まだタイトルない

アウトプット用です

【未完】PyTorch LightningとWeights & Biasesを使った画像タスクパイプラインを作成し始めた

こんにちは

2021年の夏の連休も2021年に引き続きほぼ予定がなかったので、何らかのコンペに充てる気満々だったのですが、いろんな要因で時間を捧げるコンペはありませんでした。

コンペはしてなかったのですが、PyTorch LightningとWeights & Biases を使った画像タスクのパイプライン作成をしていました。いままではそういった物は持ってなくて最初のサブまでとても時間がかかっていたので、パイプラインをいじって学習して推論して初サブまでが早くなればいいと思っています。

今回は作ったものを載せつつ自分用のメモを書いていこうと思います。間違いや改善点ありましたらご指摘ください。

Config

class CFG:
    debug = True
    exp_name = "ex005_local"
    seed = 29
    # model
    model_name = 'efficientnet_b0'
    img_size = 224
    in_chans = 1
    target_col = 'target' # 目標値のある列名
    target_size = 1
    # optimizer
    optimizer_name = 'RAdam'
    lr = 1e-3
    weight_decay = 1e-5
    amsgrad = False
    # scheduler
    epochs = 10
    scheduler = 'CosineAnnealingLR'
    T_max = 10
    min_lr = 1e-5
    # criterion
    criterion_name = 'CrossEntropyLoss'
    
    # training
    train = True
    inference = False
    n_fold = 5
    trn_fold = [0]
    precision = 16 #[16, 32, 64]
    grad_acc = 1
    # DataLoader
    loader = {
        "train": {
            "batch_size": 128,
            "num_workers": 0,
            "shuffle": True,
            "pin_memory": True,
            "drop_last": True
        },
        "valid": {
            "batch_size": 128,
            "num_workers": 0,
            "shuffle": False,
            "pin_memory": True,
            "drop_last": False
        }
    }
  • wandbにconfigも記録するためにはdict型にする必要があるみたいなのでこの設定の持ち方は検討が必要な気がしています
  • そもそもこの設定の持ち方、dataclassでもなさそうだしなんていう呼び方をするのでしょう・・・?インスタンス作らなくてもCFG.model_nameで値取れますよね

Directory

# 環境によって処理を変えるためのもの
import sys
IN_COLAB = 'google.colab' in sys.modules
IN_KAGGLE = 'kaggle_web_client' in sys.modules
LOCAL = not (IN_KAGGLE or IN_COLAB)

if IN_KAGGLE:
    INPUT_DIR = Path('../input/BirdClef2021')
    OUTPUT_DIR = './'
elif IN_COLAB:
    INPUT_DIR = Path('/content/input/')
    OUTPUT_DIR = f'/content/drive/MyDrive/kaggle/BirdClef2021/{CFG.exp_name}/'
if LOCAL:
    INPUT_DIR = Path("F:/Kaggle/BirdClef2021/data/input/")
    OUTPUT_DIR = f'F:/Kaggle/BirdClef2021/data/output/{CFG.exp_name}/'

TRAIN_DIR = INPUT_DIR / "train"
TEST_DIR = INPUT_DIR / "test"

df_train = pd.read_csv(INPUT_DIR / "train_labels.csv")
df_test = pd.read_csv(INPUT_DIR / "sample_submission.csv")

if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

def get_filepath(name, folder=TRAIN_DIR):
    path = os.path.join(folder, name[0], f'{name}.npy')
    return path

df_train['image_path'] = df_train['id'].apply(lambda x: get_filepath(x, TRAIN_DIR))
df_test['image_path'] = df_test['id'].apply(lambda x: get_filepath(x, TEST_DIR))
  • 環境に応じた入力データの場所と出力先の設定をします
    • kaggle,colab,localの判断できるの結構便利だと思ってる
  • データのpath用の列も予め用意しておいて問題ないと思います

transform

def get_transforms(phase: str):
    if phase == 'train':
        return Compose([
            Resize(CFG.img_size, CFG.img_size),
            Transpose(p=0.5),
            HorizontalFlip(p=0.5),
            VerticalFlip(p=0.5),
            ShiftScaleRotate(p=0.5),
            Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225],
            ),
            ToTensorV2(p=1.0),
        ])
    elif phase == 'valid':
        return Compose([
            Resize(CFG.img_size, CFG.img_size),
            Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225],
            ),
            ToTensorV2(p=1.0),
        ])
  • normalizeしてなかったらtrainがuint8のままモデルに突っ込まれてエラー出てたのでここも要改善

dataset

class TrainDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.image_paths = df['image_path'].values
        self.targets = df[CFG.target_col].values
        self.transform = transform
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        file_path = self.image_paths[idx]
        # タスクに合わせたロード方法
        #image = np.load(file_path).astype(np.float32)# (6, 273, 256)
        #image = np.vstack(image).transpose((1, 0))# (1638, 256) -> (256, 1638)
        image = cv2.imread(file_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        if self.transform:
            image = self.transform(image=image)["image"]
        else:
            image = image[np.newaxis,:,:]
            image = torch.from_numpy(image).float()
        #label = torch.tensor(self.targets[idx]).float()
        label = torch.tensor(self.targets[idx]).long()
        return image, label

LightningDataModule

PyTorch Lightning要素

class DataModule(pl.LightningDataModule):
    
    def __init__(self, train_data, valid_data, test_data):
        super().__init__()
        self.train_data = train_data
        self.valid_data = valid_data
        self.test_data = test_data
        
    # 必ず呼び出される関数
    def setup(self, stage=None):
        self.train_dataset = TrainDataset(self.train_data, transform=get_transforms(phase='train'))
        self.valid_dataset = TrainDataset(self.valid_data, transform=get_transforms(phase='valid'))
        self.test_dataset = TrainDataset(self.test_data, transform=get_transforms(phase='valid'))
        
    # Trainer.fit() 時に呼び出される
    def train_dataloader(self):
        return DataLoader(self.train_dataset,
                          batch_size=CFG.loader["train"]["batch_size"],
                          shuffle=True,
                          num_workers=CFG.loader["train"]["num_workers"],
                          pin_memory=True,
                          drop_last=True)

    # Trainer.fit() 時に呼び出される
    def val_dataloader(self):
        return DataLoader(self.valid_dataset,
                          batch_size=CFG.loader["valid"]["batch_size"],
                          shuffle=False,
                          num_workers=CFG.loader["valid"]["num_workers"],
                          pin_memory=True,
                          drop_last=False)

    def test_dataloader(self):
        return DataLoader(self.test_dataset,
                          batch_size=CFG.loader["valid"]["batch_size"],
                          shuffle=False,
                          num_workers=CFG.loader["valid"]["num_workers"],
                          pin_memory=True,
                          drop_last=False)

LightningModule

これもPyTorch Lightning要素

class Trainer(pl.LightningModule):
    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.model = get_model(cfg)
        self.criterion = get_criterion()
    
    def forward(self, x):
        output = self.model(x)
        return output
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        # mixup とかしたい場合はここに差し込む
        output = self.forward(x)
        labels = y
        loss = self.criterion(output, labels)
        try:
            train_score = accuracy_score(labels.detach().cpu(), output.argmax(1).detach().cpu())
            self.log("train_score", train_score, prog_bar=True, logger=True)
            self.log('train_loss', loss)
        except:
            pass
        
        return {"loss": loss, "predictions": output, "labels": labels}
    
    def training_epoch_end(self, outputs):
        preds = []
        labels = []
        
        for output in outputs:
            preds += output['predictions']
            labels += output['labels']

        labels = torch.stack(labels)
        preds = torch.stack(preds)

        train_score = accuracy_score(labels.detach().cpu(), preds.argmax(1).detach().cpu())
        self.log("mean_train_score", train_score, prog_bar=True, logger=True)
        self.log("lr", self.optimizer.param_groups[0]['lr'], prog_bar=True, logger=True)
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        output = self.forward(x)
        labels = y#.unsqueeze(1)
        loss = self.criterion(output, labels)
        self.log('val_loss', loss, on_step= True, prog_bar=True, logger=True)
        return {"predictions": output, "labels": labels}
    
    def validation_epoch_end(self, outputs):
        preds = []
        labels = []
        
        for output in outputs:
            preds += output['predictions']
            labels += output['labels']

        labels = torch.stack(labels)
        preds = torch.stack(preds)

        val_score = accuracy_score(labels.detach().cpu(), preds.argmax(1).detach().cpu())
        self.log("val_score", val_score, prog_bar=True, logger=True)
        
    def test_step(self, batch, batch_idx):
        x = batch        
        output = self(x).sigmoid()
        return output
    
    def configure_optimizers(self):
        ### optimizer
        #optimizer = get_optimizer(self.parameters(), self.cfg)
        self.optimizer = get_optimizer(self, self.cfg)
        self.scheduler = get_scheduler(self.optimizer)
        return {'optimizer': self.optimizer, 'lr_scheduler': self.scheduler}

ログを記録したい値は、self.log に指定すると自動で記録される。epochは特に指定してないですが記録されちゃいます。

f:id:teyoblog:20210817122457p:plain

  • train_scoreとtrain_lossがstepでlogつけてるのですが明らかに頻度が少ないのでtryからだして様子を見るなど残課題が残っています。

main部分

def train() -> None:
    for fold in range(CFG.n_fold):
        if not fold in CFG.trn_fold:
            continue
        print(f"{'='*38} Fold: {fold} {'='*38}")
        # Logger
        #======================================================
        lr_monitor = LearningRateMonitor(logging_interval='step')
        # 学習済重みを保存するために必要
        loss_checkpoint = ModelCheckpoint(
            dirpath=OUTPUT_DIR,
            filename=f"best_loss_fold{fold}",
            monitor="val_loss",
            save_last=True,
            save_top_k=1,
            save_weights_only=True,
            mode="min",
        )
        auc_checkpoint = ModelCheckpoint(
            dirpath=OUTPUT_DIR,
            filename=f"best_auc_fold{fold}",
            monitor="val_score",
            save_top_k=1,
            save_weights_only=True,
            mode="max",
        )
        
        wandb_logger = WandbLogger(
            project='atma11',
            group= f'{CFG.exp_name}',
            name = f'Fold{fold}',
            save_dir=OUTPUT_DIR
        )
        
        data_module = DataModule(
          df_train[df_train['fold']!=fold],
          df_train[df_train['fold']==fold], 
          df_train[df_train['fold']==fold], 
        )
        data_module.setup()
        #early_stopping_callback = EarlyStopping(monitor='val_auc',mode="max", patience=2)
        
        trainer = pl.Trainer(
            logger=wandb_logger,
            callbacks=[loss_checkpoint, auc_checkpoint, lr_monitor],
            default_root_dir=OUTPUT_DIR,
            gpus=1,
            progress_bar_refresh_rate=1,
            accumulate_grad_batches=CFG.grad_acc,
            max_epochs=CFG.epochs,
            precision=CFG.precision,
            benchmark=False,
            deterministic=True,
        )
        model = Trainer(CFG)
        trainer.fit(model, data_module)
        wandb.finish()
  • 複数fold継続して学習できるようにしてます
  • ModelCheckpoint
    • monitorに指定した値(self.logのname?)で更新されたら保存、最後は保存等指定できる
    • 複数実験で保存するファイル名がかぶっても上書きされないようです
    • 私の環境ではTrainer側のdefault_root_dirでなくModelCheckpoint側のdirpathディレクトリを指定しないとその直下にweightは保存してもらえませんでした。
  • WandbLogger
    • projectはコンペ名で良さそう
    • group名が同じものに関してはまとめられる。グラフの線は平均値、分散が影として描画されます。
    • グラフ同士で比較したい場合はgroupを分ける必要がある。 f:id:teyoblog:20210817122533p:plain

その他

Split

df_train["fold"] = -1
Fold = StratifiedKFold(n_splits=CFG.n_fold, shuffle=True, random_state=CFG.seed)
for n, (train_index, val_index) in enumerate(Fold.split(df_train, df_train[CFG.target_col])):
    df_train.loc[val_index, 'fold'] = int(n)
df_train['fold'] = df_train['fold'].astype(int)
print(df_train.groupby(['fold', CFG.target_col]).size())

Get関数

# ====================================================
# model
# ====================================================
class CustomModel(nn.Module):
    def __init__(self, cfg, pretrained=False):
        super().__init__()
        self.cfg = cfg
        self.model = timm.create_model(model_name=self.cfg.model_name,
                                       pretrained=pretrained,
                                       in_chans=self.cfg.in_chans,
                                       num_classes=self.cfg.target_size)
        
    def forward(self, x):
        output = self.model(x)
        return output
    
def get_model(cfg):
    model = CustomModel(cfg, pretrained=cfg.pretrained)
    # plだと要らない?
    # device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # model.to(device)
    # model.eval()
    return model

# ====================================================
# criterion
# ====================================================
def get_criterion():
    if CFG.criterion_name == 'BCEWithLogitsLoss':
        # plだとto(device)いらない
        criterion = nn.BCEWithLogitsLoss(reduction="mean")
    if CFG.criterion_name == 'CrossEntropyLoss':
        criterion = nn.CrossEntropyLoss()
    else:
        raise NotImplementedError
    return criterion
# ====================================================
# optimizer
# ====================================================
def get_optimizer(model: nn.Module, config: dict):
    """
    input:
    model:model
    config:optimizer_nameやlrが入ったものを渡す
    
    output:optimizer
    """
    optimizer_name = config.optimizer_name
    if 'Adam' == optimizer_name:
        return Adam(model.parameters(),
                    lr=config.lr,
                    weight_decay=config.weight_decay,
                    amsgrad=config.amsgrad)
    elif 'RAdam' == optimizer_name:
        return optim.RAdam(model.parameters(),
                           lr=config.lr,
                           weight_decay=config.weight_decay)
    else:
        raise NotImplementedError

# ====================================================
# scheduler
# ====================================================
def get_scheduler(optimizer):
    if CFG.scheduler=='ReduceLROnPlateau':
        """
        factor : 学習率の減衰率
        patience : 何ステップ向上しなければ減衰するかの値
        eps : nanとかInf回避用の微小数
        """
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=CFG.factor, patience=CFG.patience, verbose=True, eps=CFG.eps)
    elif CFG.scheduler=='CosineAnnealingLR':
        """
        T_max : 1 半周期のステップサイズ
        eta_min : 最小学習率(極小値)
        """
        scheduler = CosineAnnealingLR(optimizer, T_max=CFG.T_max, eta_min=CFG.min_lr, last_epoch=-1)
    elif CFG.scheduler=='CosineAnnealingWarmRestarts':
        """
        T_0 : 初期の繰りかえし回数
        T_mult : サイクルのスケール倍率
        """
        scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=CFG.T_0, T_mult=1, eta_min=CFG.min_lr, last_epoch=-1)
    else:
        raise NotImplementedError
    return scheduler
  • timm.create_modelnum_classes指定しない例が多い気がしますがどういうことを考慮してるのか教えてもらいたいです。

参考にしたサイト

pytorch-lightning.readthedocs.io

qiita.com

⚡️Pytorch Lightning + Grid Mask + Ranger Opt + W&B | Kaggle

あとがき

とりあえず連休中PyTorch Lightningと戯れたことをアウトプットしてみましたが結構残課題があったり未確認な点も多かったりと中途半端なものになってしまいました。きれいなものを作ることが目的ではないですが、今後はコンペの実践を通してブラッシュアップして行きたいと思います。

今までのコンペはいわゆるピュアpythonで取り組んでましたが、ループのコード、ログの設定、モデル保存の条件式等学習の本質的なコードと同量くらいの機能的なコードがありましたがその部分がライブラリの昨日に統一されてきれいにもなり、フォーマットを気にすることもなくなり良かったと思ってます。今後は今回載せたパイプラインの改善だけでなく他のタスク向けのものも作ったりしながらコーディング技術と機械学習への理解を深めていきたいです。

これにて私の連休は終了です!