こんにちは
2021年の夏の連休も2021年に引き続きほぼ予定がなかったので、何らかのコンペに充てる気満々だったのですが、いろんな要因で時間を捧げるコンペはありませんでした。
コンペはしてなかったのですが、PyTorch LightningとWeights & Biases を使った画像タスクのパイプライン作成をしていました。いままではそういった物は持ってなくて最初のサブまでとても時間がかかっていたので、パイプラインをいじって学習して推論して初サブまでが早くなればいいと思っています。
今回は作ったものを載せつつ自分用のメモを書いていこうと思います。間違いや改善点ありましたらご指摘ください。
Config
class CFG: debug = True exp_name = "ex005_local" seed = 29 # model model_name = 'efficientnet_b0' img_size = 224 in_chans = 1 target_col = 'target' # 目標値のある列名 target_size = 1 # optimizer optimizer_name = 'RAdam' lr = 1e-3 weight_decay = 1e-5 amsgrad = False # scheduler epochs = 10 scheduler = 'CosineAnnealingLR' T_max = 10 min_lr = 1e-5 # criterion criterion_name = 'CrossEntropyLoss' # training train = True inference = False n_fold = 5 trn_fold = [0] precision = 16 #[16, 32, 64] grad_acc = 1 # DataLoader loader = { "train": { "batch_size": 128, "num_workers": 0, "shuffle": True, "pin_memory": True, "drop_last": True }, "valid": { "batch_size": 128, "num_workers": 0, "shuffle": False, "pin_memory": True, "drop_last": False } }
- wandbにconfigも記録するためにはdict型にする必要があるみたいなのでこの設定の持ち方は検討が必要な気がしています
- そもそもこの設定の持ち方、dataclassでもなさそうだしなんていう呼び方をするのでしょう・・・?インスタンス作らなくても
CFG.model_name
で値取れますよね
Directory
# 環境によって処理を変えるためのもの import sys IN_COLAB = 'google.colab' in sys.modules IN_KAGGLE = 'kaggle_web_client' in sys.modules LOCAL = not (IN_KAGGLE or IN_COLAB) if IN_KAGGLE: INPUT_DIR = Path('../input/BirdClef2021') OUTPUT_DIR = './' elif IN_COLAB: INPUT_DIR = Path('/content/input/') OUTPUT_DIR = f'/content/drive/MyDrive/kaggle/BirdClef2021/{CFG.exp_name}/' if LOCAL: INPUT_DIR = Path("F:/Kaggle/BirdClef2021/data/input/") OUTPUT_DIR = f'F:/Kaggle/BirdClef2021/data/output/{CFG.exp_name}/' TRAIN_DIR = INPUT_DIR / "train" TEST_DIR = INPUT_DIR / "test" df_train = pd.read_csv(INPUT_DIR / "train_labels.csv") df_test = pd.read_csv(INPUT_DIR / "sample_submission.csv") if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR) def get_filepath(name, folder=TRAIN_DIR): path = os.path.join(folder, name[0], f'{name}.npy') return path df_train['image_path'] = df_train['id'].apply(lambda x: get_filepath(x, TRAIN_DIR)) df_test['image_path'] = df_test['id'].apply(lambda x: get_filepath(x, TEST_DIR))
- 環境に応じた入力データの場所と出力先の設定をします
- kaggle,colab,localの判断できるの結構便利だと思ってる
- データのpath用の列も予め用意しておいて問題ないと思います
transform
def get_transforms(phase: str): if phase == 'train': return Compose([ Resize(CFG.img_size, CFG.img_size), Transpose(p=0.5), HorizontalFlip(p=0.5), VerticalFlip(p=0.5), ShiftScaleRotate(p=0.5), Normalize( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], ), ToTensorV2(p=1.0), ]) elif phase == 'valid': return Compose([ Resize(CFG.img_size, CFG.img_size), Normalize( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], ), ToTensorV2(p=1.0), ])
- normalizeしてなかったらtrainがuint8のままモデルに突っ込まれてエラー出てたのでここも要改善
dataset
class TrainDataset(Dataset): def __init__(self, df, transform=None): self.df = df self.image_paths = df['image_path'].values self.targets = df[CFG.target_col].values self.transform = transform def __len__(self): return len(self.df) def __getitem__(self, idx): file_path = self.image_paths[idx] # タスクに合わせたロード方法 #image = np.load(file_path).astype(np.float32)# (6, 273, 256) #image = np.vstack(image).transpose((1, 0))# (1638, 256) -> (256, 1638) image = cv2.imread(file_path) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if self.transform: image = self.transform(image=image)["image"] else: image = image[np.newaxis,:,:] image = torch.from_numpy(image).float() #label = torch.tensor(self.targets[idx]).float() label = torch.tensor(self.targets[idx]).long() return image, label
LightningDataModule
PyTorch Lightning要素
class DataModule(pl.LightningDataModule): def __init__(self, train_data, valid_data, test_data): super().__init__() self.train_data = train_data self.valid_data = valid_data self.test_data = test_data # 必ず呼び出される関数 def setup(self, stage=None): self.train_dataset = TrainDataset(self.train_data, transform=get_transforms(phase='train')) self.valid_dataset = TrainDataset(self.valid_data, transform=get_transforms(phase='valid')) self.test_dataset = TrainDataset(self.test_data, transform=get_transforms(phase='valid')) # Trainer.fit() 時に呼び出される def train_dataloader(self): return DataLoader(self.train_dataset, batch_size=CFG.loader["train"]["batch_size"], shuffle=True, num_workers=CFG.loader["train"]["num_workers"], pin_memory=True, drop_last=True) # Trainer.fit() 時に呼び出される def val_dataloader(self): return DataLoader(self.valid_dataset, batch_size=CFG.loader["valid"]["batch_size"], shuffle=False, num_workers=CFG.loader["valid"]["num_workers"], pin_memory=True, drop_last=False) def test_dataloader(self): return DataLoader(self.test_dataset, batch_size=CFG.loader["valid"]["batch_size"], shuffle=False, num_workers=CFG.loader["valid"]["num_workers"], pin_memory=True, drop_last=False)
LightningModule
これもPyTorch Lightning要素
class Trainer(pl.LightningModule): def __init__(self, cfg): super().__init__() self.cfg = cfg self.model = get_model(cfg) self.criterion = get_criterion() def forward(self, x): output = self.model(x) return output def training_step(self, batch, batch_idx): x, y = batch # mixup とかしたい場合はここに差し込む output = self.forward(x) labels = y loss = self.criterion(output, labels) try: train_score = accuracy_score(labels.detach().cpu(), output.argmax(1).detach().cpu()) self.log("train_score", train_score, prog_bar=True, logger=True) self.log('train_loss', loss) except: pass return {"loss": loss, "predictions": output, "labels": labels} def training_epoch_end(self, outputs): preds = [] labels = [] for output in outputs: preds += output['predictions'] labels += output['labels'] labels = torch.stack(labels) preds = torch.stack(preds) train_score = accuracy_score(labels.detach().cpu(), preds.argmax(1).detach().cpu()) self.log("mean_train_score", train_score, prog_bar=True, logger=True) self.log("lr", self.optimizer.param_groups[0]['lr'], prog_bar=True, logger=True) def validation_step(self, batch, batch_idx): x, y = batch output = self.forward(x) labels = y#.unsqueeze(1) loss = self.criterion(output, labels) self.log('val_loss', loss, on_step= True, prog_bar=True, logger=True) return {"predictions": output, "labels": labels} def validation_epoch_end(self, outputs): preds = [] labels = [] for output in outputs: preds += output['predictions'] labels += output['labels'] labels = torch.stack(labels) preds = torch.stack(preds) val_score = accuracy_score(labels.detach().cpu(), preds.argmax(1).detach().cpu()) self.log("val_score", val_score, prog_bar=True, logger=True) def test_step(self, batch, batch_idx): x = batch output = self(x).sigmoid() return output def configure_optimizers(self): ### optimizer #optimizer = get_optimizer(self.parameters(), self.cfg) self.optimizer = get_optimizer(self, self.cfg) self.scheduler = get_scheduler(self.optimizer) return {'optimizer': self.optimizer, 'lr_scheduler': self.scheduler}
ログを記録したい値は、self.log
に指定すると自動で記録される。epochは特に指定してないですが記録されちゃいます。
- train_scoreとtrain_lossがstepでlogつけてるのですが明らかに頻度が少ないのでtryからだして様子を見るなど残課題が残っています。
main部分
def train() -> None: for fold in range(CFG.n_fold): if not fold in CFG.trn_fold: continue print(f"{'='*38} Fold: {fold} {'='*38}") # Logger #====================================================== lr_monitor = LearningRateMonitor(logging_interval='step') # 学習済重みを保存するために必要 loss_checkpoint = ModelCheckpoint( dirpath=OUTPUT_DIR, filename=f"best_loss_fold{fold}", monitor="val_loss", save_last=True, save_top_k=1, save_weights_only=True, mode="min", ) auc_checkpoint = ModelCheckpoint( dirpath=OUTPUT_DIR, filename=f"best_auc_fold{fold}", monitor="val_score", save_top_k=1, save_weights_only=True, mode="max", ) wandb_logger = WandbLogger( project='atma11', group= f'{CFG.exp_name}', name = f'Fold{fold}', save_dir=OUTPUT_DIR ) data_module = DataModule( df_train[df_train['fold']!=fold], df_train[df_train['fold']==fold], df_train[df_train['fold']==fold], ) data_module.setup() #early_stopping_callback = EarlyStopping(monitor='val_auc',mode="max", patience=2) trainer = pl.Trainer( logger=wandb_logger, callbacks=[loss_checkpoint, auc_checkpoint, lr_monitor], default_root_dir=OUTPUT_DIR, gpus=1, progress_bar_refresh_rate=1, accumulate_grad_batches=CFG.grad_acc, max_epochs=CFG.epochs, precision=CFG.precision, benchmark=False, deterministic=True, ) model = Trainer(CFG) trainer.fit(model, data_module) wandb.finish()
- 複数fold継続して学習できるようにしてます
- ModelCheckpoint
- monitorに指定した値(self.logのname?)で更新されたら保存、最後は保存等指定できる
- 複数実験で保存するファイル名がかぶっても上書きされないようです
- 私の環境ではTrainer側の
default_root_dir
でなくModelCheckpoint側のdirpath
にディレクトリを指定しないとその直下にweightは保存してもらえませんでした。
- WandbLogger
project
はコンペ名で良さそう- group名が同じものに関してはまとめられる。グラフの線は平均値、分散が影として描画されます。
- グラフ同士で比較したい場合はgroupを分ける必要がある。
その他
Split
df_train["fold"] = -1 Fold = StratifiedKFold(n_splits=CFG.n_fold, shuffle=True, random_state=CFG.seed) for n, (train_index, val_index) in enumerate(Fold.split(df_train, df_train[CFG.target_col])): df_train.loc[val_index, 'fold'] = int(n) df_train['fold'] = df_train['fold'].astype(int) print(df_train.groupby(['fold', CFG.target_col]).size())
Get関数
# ==================================================== # model # ==================================================== class CustomModel(nn.Module): def __init__(self, cfg, pretrained=False): super().__init__() self.cfg = cfg self.model = timm.create_model(model_name=self.cfg.model_name, pretrained=pretrained, in_chans=self.cfg.in_chans, num_classes=self.cfg.target_size) def forward(self, x): output = self.model(x) return output def get_model(cfg): model = CustomModel(cfg, pretrained=cfg.pretrained) # plだと要らない? # device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # model.to(device) # model.eval() return model # ==================================================== # criterion # ==================================================== def get_criterion(): if CFG.criterion_name == 'BCEWithLogitsLoss': # plだとto(device)いらない criterion = nn.BCEWithLogitsLoss(reduction="mean") if CFG.criterion_name == 'CrossEntropyLoss': criterion = nn.CrossEntropyLoss() else: raise NotImplementedError return criterion # ==================================================== # optimizer # ==================================================== def get_optimizer(model: nn.Module, config: dict): """ input: model:model config:optimizer_nameやlrが入ったものを渡す output:optimizer """ optimizer_name = config.optimizer_name if 'Adam' == optimizer_name: return Adam(model.parameters(), lr=config.lr, weight_decay=config.weight_decay, amsgrad=config.amsgrad) elif 'RAdam' == optimizer_name: return optim.RAdam(model.parameters(), lr=config.lr, weight_decay=config.weight_decay) else: raise NotImplementedError # ==================================================== # scheduler # ==================================================== def get_scheduler(optimizer): if CFG.scheduler=='ReduceLROnPlateau': """ factor : 学習率の減衰率 patience : 何ステップ向上しなければ減衰するかの値 eps : nanとかInf回避用の微小数 """ scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=CFG.factor, patience=CFG.patience, verbose=True, eps=CFG.eps) elif CFG.scheduler=='CosineAnnealingLR': """ T_max : 1 半周期のステップサイズ eta_min : 最小学習率(極小値) """ scheduler = CosineAnnealingLR(optimizer, T_max=CFG.T_max, eta_min=CFG.min_lr, last_epoch=-1) elif CFG.scheduler=='CosineAnnealingWarmRestarts': """ T_0 : 初期の繰りかえし回数 T_mult : サイクルのスケール倍率 """ scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=CFG.T_0, T_mult=1, eta_min=CFG.min_lr, last_epoch=-1) else: raise NotImplementedError return scheduler
timm.create_model
でnum_classes
指定しない例が多い気がしますがどういうことを考慮してるのか教えてもらいたいです。
参考にしたサイト
pytorch-lightning.readthedocs.io
⚡️Pytorch Lightning + Grid Mask + Ranger Opt + W&B | Kaggle
あとがき
とりあえず連休中PyTorch Lightningと戯れたことをアウトプットしてみましたが結構残課題があったり未確認な点も多かったりと中途半端なものになってしまいました。きれいなものを作ることが目的ではないですが、今後はコンペの実践を通してブラッシュアップして行きたいと思います。
今までのコンペはいわゆるピュアpythonで取り組んでましたが、ループのコード、ログの設定、モデル保存の条件式等学習の本質的なコードと同量くらいの機能的なコードがありましたがその部分がライブラリの昨日に統一されてきれいにもなり、フォーマットを気にすることもなくなり良かったと思ってます。今後は今回載せたパイプラインの改善だけでなく他のタスク向けのものも作ったりしながらコーディング技術と機械学習への理解を深めていきたいです。
これにて私の連休は終了です!