Pytorch

CIFAR-10 CNN으로 학습하기 [Pytorch]

JEO96 2022. 5. 12. 23:41
반응형

 

 

1. 서론

이번 글에서 이미지를 torch.nn.conv2d를 사용하여 CNN 모델을 구현하여 학습하는 코드를 작성해보려 한다. 

2. CIFAR-10

우선 CIFAR-10 dataset은 32 * 32 픽셀의 컬러 이미지로 50000개는 학습 데이터 10000개는 테스트 데이터로 구성되었으며 10개의 클래스로 labeling 되어있다.

3. 코드

3.1 import package

import ssl
import torch
import torch.nn as nn
from torchvision import transforms, datasets

ssl 모듈은 CIFAR10이 urllib 오류로 다운이 안될 때 해결하기 위해 추가한 모듈이다.

3.2 CIFAR-10 다운 및 데이터셋 구성

BATCH_SIZE = 32
train_dataset = datasets.CIFAR10(root="./data/",
                                 train=True,
                                 download=True,
                                 transform=transforms.ToTensor())

test_dataset = datasets.CIFAR10(root="./data/",
                                train=False,
                                download=True,
                                transform=transforms.ToTensor())

train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=BATCH_SIZE,
                                           shuffle=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=BATCH_SIZE,
                                          shuffle=False)

print(train_loader.dataset)
# Dataset CIFAR10
#    Number of datapoints: 50000
#    Root location: ./data/
#    Split: Train
#    StandardTransform
#    Transform: ToTensor()

CIFAR10은 본 프로젝트의 data폴더 안에 다운로드하고 학습 데이터는 5만 장 테스트 데이터는 1만 장으로 구성되며 데이터 형태는 Tensor로 하였다. 그리고 학습 데이터는 순서를 섞어주었고 학습 데이터와 평가 데이터 모두 batch size를 32로 주었다.

for (X_train, Y_train) in train_loader:
    print(f"X_train: {X_train.size()} type: {X_train.type()}")
    print(f"Y_train: {Y_train.size()} type: {Y_train.type()}")
    break
# X_train: torch.Size([32, 3, 32, 32]) type: torch.FloatTensor
# Y_train: torch.Size([32]) type: torch.LongTensor

train_loader에 for 문을 사용하면 X_train은 [batch size, channel, width, height]으로 구성되어있다. 여기서 channel은 R, G, B로 3개의 channel로 이루어져 있다.

3.3 CNN 모델

이번에 구현한 CNN모델은 아래와 같이 구현하였다. X_train을 CNN에 넣으면 첫 번째로 self.conv1에 들어간다. 위에서 확인했듯이 학습 데이터의 channel은 3이므로 nn.Conv2d의 in_channels는 3으로 맞춰주어야 하며 out_channels는 다음 nn.Conv2d의 in_channels로 맞춰주면 된다. kernel_size는 3으로 주었는데 3 * 3 filter를 사용하겠다는 것이고 padding은 zero padding의 개수를 나타낸다. kernel_size가 3인 경우 padding은 1을 주로 사용하며 kernel_size가 5인 경우 padding는 2를 주로 사용한다. stride는 필터를 이동시키는 칸 수를 나타낸다.

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(
            in_channels=3,
            out_channels=8,
            kernel_size=3,
            padding=1)
        self.conv2 = nn.Conv2d(
            in_channels=8,
            out_channels=16,
            kernel_size=3,
            padding=1)
        self.pool = nn.MaxPool2d(
            kernel_size=2,
            stride=2
        )
        self.fc1 = nn.Linear(8 * 8 * 16, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = torch.relu(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = torch.relu(x)
        x = self.pool(x)

        x = x.view(-1, 8 * 8 * 16)
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.fc2(x)
        x = torch.relu(x)
        x = self.fc3(x)
        x = torch.log_softmax(x, dim=1)
        return x

여기서 view는 tensor의 형 변환을 해준다. (-1, 8 * 8 * 16)을 사용하였는데 channels가 3에서 16으로 바뀌었으며 image size는 MaxPooling을 2번 사용하여 32 * 32 -> 16 * 16 -> 8 * 8로 바뀌어 x.view(-1, 8 * 8 * 16)을 사용하여 [32, 1024]로 바뀌었다.

3.3 모델 생성, 장비, optimizer, criterion 설정

DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

print(f"Using PyTorch version: {torch.__version__}, Device: {DEVICE}")

model = CNN().to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

3.3 학습 함수

def train(model, train_loader, optimizer, log_interval):
    model.train()
    for batch_idx, (image, label) in enumerate(train_loader):
        image = image.to(DEVICE)
        label = label.to(DEVICE)
        optimizer.zero_grad()
        output = model(image)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

        if batch_idx % log_interval == 0:
            print(
                f"train Epoch: {Epoch} [{batch_idx * len(image)}/{len(train_loader.dataset)}({100. * batch_idx / len(train_loader):.0f}%)]\tTrain Loss: {loss.item()}")

3.4 평가 함수

def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for image, label in test_loader:
            image = image.to(DEVICE)
            label = label.to(DEVICE)
            output = model(image)
            test_loss += criterion(output, label).item()
            prediction = output.max(1, keepdim=True)[1]
            correct += prediction.eq(label.view_as(prediction)).sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)
    return test_loss, test_accuracy

3.5 학습

EPOCHS = 10
for Epoch in range(1, EPOCHS + 1):
    train(model, train_loader, optimizer, log_interval=200)
    test_loss, test_accuracy = evaluate(model, test_loader)
    print(f"\n[EPOCH: {Epoch}]\tTest Loss: {test_loss:.4f}\tTest Accuracy: {test_accuracy} % \n")

4. 전체 코드

import ssl
import torch
import torch.nn as nn
from torchvision import transforms, datasets

# ssl._create_default_https_context = ssl._create_unverified_context


BATCH_SIZE = 32
train_dataset = datasets.CIFAR10(root="./data/",
                                 train=True,
                                 download=True,
                                 transform=transforms.ToTensor())

test_dataset = datasets.CIFAR10(root="./data/",
                                train=False,
                                download=True,
                                transform=transforms.ToTensor())

train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=BATCH_SIZE,
                                           shuffle=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=BATCH_SIZE,
                                          shuffle=False)

print(train_loader.dataset)

for (X_train, Y_train) in train_loader:
    print(f"X_train: {X_train.size()} type: {X_train.type()}")
    print(f"Y_train: {Y_train.size()} type: {Y_train.type()}")
    break


class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(
            in_channels=3,
            out_channels=8,
            kernel_size=3,
            padding=1)
        self.conv2 = nn.Conv2d(
            in_channels=8,
            out_channels=16,
            kernel_size=3,
            padding=1)
        self.pool = nn.MaxPool2d(
            kernel_size=2,
            stride=2
        )
        self.fc1 = nn.Linear(8 * 8 * 16, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = torch.relu(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = torch.relu(x)
        x = self.pool(x)

        x = x.view(-1, 8 * 8 * 16)
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.fc2(x)
        x = torch.relu(x)
        x = self.fc3(x)
        x = torch.log_softmax(x, dim=1)
        return x


DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

print(f"Using PyTorch version: {torch.__version__}, Device: {DEVICE}")

model = CNN().to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()


def train(model, train_loader, optimizer, log_interval):
    model.train()
    for batch_idx, (image, label) in enumerate(train_loader):
        image = image.to(DEVICE)
        label = label.to(DEVICE)
        optimizer.zero_grad()
        output = model(image)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

        if batch_idx % log_interval == 0:
            print(
                f"train Epoch: {Epoch} [{batch_idx * len(image)}/{len(train_loader.dataset)}({100. * batch_idx / len(train_loader):.0f}%)]\tTrain Loss: {loss.item()}")


def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for image, label in test_loader:
            image = image.to(DEVICE)
            label = label.to(DEVICE)
            output = model(image)
            test_loss += criterion(output, label).item()
            prediction = output.max(1, keepdim=True)[1]
            correct += prediction.eq(label.view_as(prediction)).sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)
    return test_loss, test_accuracy

EPOCHS = 10
for Epoch in range(1, EPOCHS + 1):
    train(model, train_loader, optimizer, log_interval=200)
    test_loss, test_accuracy = evaluate(model, test_loader)
    print(f"\n[EPOCH: {Epoch}]\tTest Loss: {test_loss:.4f}\tTest Accuracy: {test_accuracy} % \n")

 

반응형