MONAIを用いた胸部セグメンテーションチュートリアル
医療画像解析フレームワークであるMONAIを用いた胸部セグメンテーションのチュートリアルをデータ作成から行います。また、評価はWandBで行います。
Created on October 26|Last edited on December 29
Comment
はじめに全体の流れMONAIとは何か概要MONAIの主な目的MONAIでできることMONAIの開発方法MONAIについてのまとめMONAIについての参照データ準備環境のセットアップデータセットのダウンロードセグメンテーション画像の確認データセット作成学習:MONAIを用いたセグメンテーションデータの分割前処理モデルの設定学習ループの実行推論:MONAIを用いたセグメンテーション予測結果の確認おわりに
はじめに
- 東北大学と協力し放射線分野でのLLM開発
- 医療ドメインでのLLMデータセット作成支援
- 医療従事者向けの医療AI研修
- 医療分野での音声認識データセット販売
本記事では、医学分野で注目される深層学習フレームワークのMONAIを紹介します。
放射線医療現場でのAIを用いたプロジェクトを進めるにあたり、さまざまな実験を試す必要があります。そこで、この記事では放射線治療現場でのAIを用いた自動輪郭抽出の実験管理をWeight&Biases(wandB)のプラットフォームを使って管理する方法について共有します。
このチュートリアル記事は通常のMONAIの技術記事では扱わない、DICOMデータから実際にどのようにMONAIで扱える形式にするのかについても解説します。
Open in Colabからコード全体を取得できます。
この記事を読むことで得られる知識
- MONAIとは何か
- 放射線治療現場でのAIを用いた自動輪郭抽出の現状を理解
- DICOMデータからのMONAIの利用方法
- Weight&Biasesを用いた実験管理
全体の流れ
この技術記事では、以下の4つの構成で進めます。
- MONAIとは何か
- データ準備
- 学習
- 推論
GitHub: GitHub
MONAIとは何か
概要
MONAI(Medical Open Network for AI)は、医療画像解析のためのオープンソースの深層学習(ディープラーニング)フレームワークです。
このフレームワークは、医療画像解析のタスクを効率的に実行できるように設計されており、特にPythonプログラミング言語とPyTorchディープラーニングライブラリをベースにしています。
MONAIの主な目的
主な目的な以下の通りです。
- 医療画像解析に特化したディープラーニングの機能を提供すること。
- コミュニティによる開発と改善を促進すること。
- 研究者や開発者が簡単にアクセスできる、再利用可能なコンポーネントを提供すること。
MONAIでできること
医療画像解析に関連するさまざまな以下のような機能を提供しています
- 医療画像の前処理
- データ拡張
- データ変換
- ネットワークアーキテクチャ
- 損失関数
- 評価指標
- 後処理(ポストプロセッシング)
など
これにより、医療画像解析のタスクを効果的に実行し、より良い結果を得ることができます。
MONAIの開発方法
MONAIはコミュニティ主導の開発が行われており、研究者や開発者が共同でフレームワークの改善や新機能の追加に取り組んでいます。
これにより、最新の研究成果や技術が迅速に実装され、広く利用可能になることが期待されています。
最新の研究成果が学会で発表されるより前に実装されていたりなど、開発が活発です。
MONAIについてのまとめ
MONAIは医療画像解析の分野での深層学習アプリケーション開発を支援するための、オープンソースで柔軟なフレームワークです。
このフレームワークを使用することで、開発者や研究者は医療画像解析の問題に対して効果的な解決策を構築できます。
MONAIについての参照
MONAIについて更に調べたい方は以下の資料を参照ください。MONAIドキュメントのAPIガイドなどよくまとまっておりわかりやすいです。
データ準備
このパートでは学習するデータを作成します。 AAPM2017で実施された胸部放射線のセグメンテーションチャレンジ「Auto-Segmentation for Thoracic Radiation Treatment Planning: A Grand Challenge」のデータを利用します。
今回はMONAIという医療系画像処理でよく使われるフレームワークを利用します。 MONAIで利用する場合は、少し工夫をする必要があります。 具体的には、領域予測したい領域を1、背景を0にしてDICOM形式(.dcm)から機械学習でよく利用されるNumpy(.npy)で保存します。
GitHub: GitHub
環境のセットアップ
データ作成で必要なライブラリをインストールします。
!pip install monai[pydicom]==1.0.0!pip install rt_utils -q!pip install nibabel -q!pip install pydicom -q!pip install wandb -q
データセットのダウンロード
以下のコマンドを実行することでデータをダウンロードできます。
!gdown --id 1TXH4PnMcU-23irtZZShZ80ClpJPedZL_ -O ./LCTSC_DICOM.zip!unzip LCTSC_DICOM.zip
セグメンテーション画像の確認
ダウンロードした画像を確認しましょう。DICOM画像のシリーズとRT(放射線療法)構造データから特定の領域(ここでは"肺(Lung_R)")のマスクを生成し、そのマスクの一部を表示します。
RT構造データは、患者の体内の特定の領域(例えば、腫瘍や器官)を識別するための情報を含んでおり、これを利用して3Dマスクを生成し、そのマスクを用いて画像上での領域の位置を確認することができます。
from rt_utils import RTStructBuilder # RT Structure(放射線治療計画)ファイルの操作に必要なライブラリimport matplotlib.pyplot as pltpatient_dir_path = "./LCTSC_DICOM/LCTSC-Train-S1-007/12-13-2003-NA-RTRCCTTHORAX8FHigh Adult-15875/0.000000-CTP1B3RespCT 3.0 B30f 50 Ex-86710"segment_path = "./LCTSC_DICOM/LCTSC-Train-S1-007/12-13-2003-NA-RTRCCTTHORAX8FHigh Adult-15875/1.000000-.simplified-17709/1-1.dcm"# 既存のRT構造データを読み込みますrtstruct = RTStructBuilder.create_from(dicom_series_path=patient_dir_path,rt_struct_path=segment_path)print(rtstruct.get_roi_names())# RT構造データから3Dマスクを取得しますmask_3d = rtstruct.get_roi_mask_by_name("Lung_R")# "Lung_R"という領域の一部を可視化first_mask_slice = mask_3d[:, :, 50]plt.imshow(first_mask_slice)plt.show()
データセット作成
内臓のセグメンテーションしたい領域を1、背景を0にしてDICOM形式(.dcm)から機械学習でよく利用されるNumpy(.npy)で保存します。
マスクデータの作成
各患者で学習用に右肺(Lung_R)の0と1の値で構成されたマスクデータを作成します。
3ステップでマスクデータを保存します。
- 再帰的に各患者ごとCTが入った0始まりのディレクトリと、セグメンテーションアノテーションされた1始まりのディレクトリ内のDICOMを取得
- 取得したパスをAIモデルで学習するためにCSVに保存
- 各患者ごとのマスクデータをDICOM形式からnumpy形式に変換し保存
それではまず、DICOMを取得していきましょう。
# Step1import osfrom glob import glob# DICOM画像が保存されているディレクトリのルートパスを設定patient_dcm_root_dir = "."# 患者CTデータのディレクトリを再帰的に取得search_dcm_dir = f"{patient_dcm_root_dir}/*/*/*/0*"patient_dcm_dirs = list(glob(search_dcm_dir))patient_dcm_dirs = sorted(patient_dcm_dirs)patient_dcm_dirs = [os.path.relpath(path, patient_dcm_root_dir) for path in patient_dcm_dirs]# 患者セグメンテーションアノテーションされたDICOMを再帰的に取得search_mask_dir = f"{patient_dcm_root_dir}/*/*/*/1*"patient_mask_dirs = list(glob(search_mask_dir))patient_mask_dirs = sorted(patient_mask_dirs)patient_mask_dcm_paths = [p + "/1-1.dcm" for p in patient_mask_dirs] # 各マスク画像のファイルパス(1-1.dcm)を取得patient_mask_dcm_paths = [os.path.relpath(path, patient_dcm_root_dir) for path in patient_mask_dcm_paths]print(len(patient_dcm_dirs), len(patient_mask_dcm_paths))
続いて、AIモデルで学習するためにCSVに保存していきます。
# Step2import pandas as pddf = pd.DataFrame()# 患者のDICOMディレクトリとマスクDICOMのパスを格納します。df["patient_dcm_dir"] = patient_dcm_dirsdf["patient_mask_dcm_path"] = patient_mask_dcm_paths# numpy形式で保存するマスクのパスdf["patient_mask_save_np_path"] = df['patient_dcm_dir'].apply(lambda x: x.split('/')[1]) + ".npy"# csvファイルの保存df.to_csv("train.csv", index=False)df.head()
最後に、各患者ごとのマスクデータをDICOM形式からnumpy形式に変換し保存します。
# Step3import numpy as np# データフレームの各行について、それぞれの患者のDICOMディレクトリとマスクDICOMのパス、保存するマスクのnumpy形式のパスを取得します。for index, row in df.iterrows():patient_dcm_dir = os.path.join(patient_dcm_root_dir,row['patient_dcm_dir'])patient_mask_dcm_path = os.path.join(patient_dcm_root_dir,row['patient_mask_dcm_path'])patient_mask_save_np_path = row['patient_mask_save_np_path']# DICOMファイルとRT構造ファイルからRT構造を作成します。rtstruct = RTStructBuilder.create_from(dicom_series_path=patient_dcm_dir,rt_struct_path=patient_mask_dcm_path)# "Lung_R"という名前の右肺の領域に対応する3Dマスクを取得します。mask_3d = rtstruct.get_roi_mask_by_name("Lung_R")print(mask_3d.shape)# 3Dマスクをnumpy形式で保存np.save(patient_mask_save_np_path, mask_3d)
学習:MONAIを用いたセグメンテーション
はじめに、実験に再現性を持たせるために乱数固定します。
import torchimport numpy as npimport randomdef seed_everything(seed_value):random.seed(seed_value) # Pythonの乱数を固定np.random.seed(seed_value) # Numpyの乱数を固定torch.manual_seed(seed_value) # PyTorchの乱数を固定if torch.cuda.is_available():torch.cuda.manual_seed(seed_value) # GPUを使う場合、PyTorchの乱数を固定torch.cuda.manual_seed_all(seed_value) # 全てのGPUを使う場合、PyTorchの乱数を固定torch.backends.cudnn.deterministic = True # cuDNNの決定論的モードをONtorch.backends.cudnn.benchmark = False # ベンチマークモードをOFFseed_everything(42)
学習に必要なライブラリをインポートします。
import osimport numpy as npimport pandas as pdimport matplotlib.pyplot as plt# 医療画像処理とAI関連ライブラリimport pydicom # DICOM形式の医療画像を読み書きするためのライブラリfrom rt_utils import RTStructBuilder # RT Structure(放射線治療計画)ファイルの操作に必要なライブラリimport torchfrom monai.transforms import ResizeD, Compose, LoadImaged, AddChanneld, ScaleIntensityd, ToTensordfrom monai.networks.nets import UNetfrom monai.networks.layers import Normfrom monai.metrics import DiceMetricfrom monai.losses import DiceLossfrom monai.data import DataLoader, Datasetfrom monai.utils import set_determinismfrom monai.inferers import sliding_window_inferenceimport wandb
データの分割
学習データ、評価データ、テストデータに分割します。それぞれ、12名(75.0%)、2名(12.5%)、2名(12.5%)で分割しています。
test_df = df[:2]val_df = df[2:4]train_df = df[4:]train_df.shape, val_df.shape, test_df.shape
前処理
MONAIでデータを扱えるように前処理を行います。
# 訓練、検証、テストのデータリストを初期化train_data = []val_data = []test_data = []patient_dcm_root_dir = './'# 訓練データfor index, row in train_df.iterrows():train_dict = {}# 画像とラベル(マスク)のフルパスをディクショナリに保存train_dict["image"] = os.path.join(patient_dcm_root_dir,row['patient_dcm_dir'])train_dict["label"] = os.path.join(patient_dcm_root_dir,row['patient_mask_save_np_path'])# ディクショナリを訓練データリストに追加train_data.append(train_dict)# 検証データfor index, row in val_df.iterrows():val_dict = {}# 画像とラベル(マスク)のフルパスをディクショナリに保存val_dict["image"] = os.path.join(patient_dcm_root_dir,row['patient_dcm_dir'])val_dict["label"] = os.path.join(patient_dcm_root_dir,row['patient_mask_save_np_path'])# ディクショナリを検証データリストに追加val_data.append(val_dict)# テストデータfor index, row in test_df.iterrows():test_dict = {}# 画像とラベル(マスク)のフルパスをディクショナリに保存test_dict["image"] = os.path.join(patient_dcm_root_dir,row['patient_dcm_dir'])test_dict["label"] = os.path.join(patient_dcm_root_dir,row['patient_mask_save_np_path'])# ディクショナリをテストデータリストに追加test_data.append(test_dict)
学習時に行う画像の読み込みや変換の処理を設定します。
# 訓練データのトランスフォームを定義# 画像とラベルのロード、次元の追加、リサイズ、強度のスケーリング、そしてテンソルへの変換を行いますtrain_transforms = Compose([LoadImaged(keys=["image", "label"]),AddChanneld(keys=["image", "label"]),ResizeD(keys=["image", "label"], spatial_size=(96, 96, 96)),ScaleIntensityd(keys="image"),ToTensord(keys=["image", "label"])])# 検証データのトランスフォームを定義val_transforms = Compose([LoadImaged(keys=["image", "label"]),AddChanneld(keys=["image", "label"]),ResizeD(keys=["image", "label"], spatial_size=(96, 96, 96)),ScaleIntensityd(keys="image"),ToTensord(keys=["image", "label"])])
学習時にデータを効率的に取り出せるようにデータローダを作成します。
# 訓練データセットとデータローダーを作成train_ds = Dataset(data=train_data, transform=train_transforms)train_loader = DataLoader(train_ds, batch_size=2, shuffle=True)# 検証データセットとデータローダーを作成val_ds = Dataset(data=val_data, transform=val_transforms)val_loader = DataLoader(val_ds, batch_size=2)# テストデータセットとデータローダーを作成test_ds = Dataset(data=test_data, transform=val_transforms)test_loader = DataLoader(test_ds, batch_size=1)
モデルの設定
学習用のモデルと学習に利用する変数を設定します。モデルはU-Netを3次元に拡張した3D U-Netを利用します。
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')config = {"num_workers": 4,# train settings"train_batch_size": 2,"val_batch_size": 2,"learning_rate": 1e-3,"max_epochs": 100,"val_interval": 1, # 検証の間隔"device": device,"model_type": "unet","model_params": dict(dimensions=3,in_channels=1,out_channels=1,channels=(16, 32, 64, 128, 256),strides=(2, 2, 2, 2),num_res_units=2,norm=Norm.BATCH,),"loss_type": "DiceLoss","optimizer_type": "Adam","metric": 'DiceMetric'}# モデルを定義model = UNet(**config['model_params']).to(device)# 損失関数DiceLossloss_function = DiceLoss(sigmoid=True)# オプティマイザoptimizer = torch.optim.Adam(model.parameters(), config['learning_rate'])# 最大エポック数max_epochs = config['max_epochs']# 検証の間隔val_interval = config['val_interval']# 最高の評価指標の値とその時のエポック数を保存するための変数best_metric = -1best_metric_epoch = -1# エポックごとの損失値と評価指標の値を保存するためのリストepoch_loss_values = list()metric_values = list()# 評価関数dice_metric = DiceMetric(include_background=True, reduction='mean')
学習状態を記録す��wandBの設定を行います。実行時にAPI keyを入力するように言われるので、入力しましょう。
wandB起動時に学習時の設定を保存しています。これにより、実験時のパラメータがわかり、再現実験をしやすくなります。
wandb.init(project="Monai test", config=config)
学習ループの実行
モデルの設定が終わりましたので、学習していきましょう。
今回はエポックごとに評価データを用いて評価値を算出し、学習データに過学習しないようにしています。また、各エポックごとにモデルの評価値をwandBで記録します。
# 最大エポック数だけループを回すfor epoch in range(max_epochs):print('-' * 10)print(f"epoch {epoch + 1}/{max_epochs}")model.train()epoch_loss = 0step = 0# 訓練データローダーからバッチを取得for batch_data in train_loader:step += 1 # ステップ数をインクリメントinputs, labels = batch_data['image'].to(device), batch_data['label'].to(device)optimizer.zero_grad()outputs = model(inputs)loss = loss_function(outputs, labels)loss.backward()optimizer.step()epoch_loss += loss.item()print(f"{step}/{len(train_ds) // train_loader.batch_size}, train_loss: {loss.item():.4f}")epoch_loss /= stepepoch_loss_values.append(epoch_loss)print(f"epoch {epoch + 1} average loss: {epoch_loss:.4f}")wandb.log({'train/loss': epoch_loss}, step=epoch+1)# 検証間隔ごとに検証を行うif (epoch + 1) % val_interval == 0:model.eval()with torch.no_grad():metric_sum = 0.metric_count = 0for val_data in val_loader:val_inputs, val_labels = val_data['image'].to(device), val_data['label'].to(device)roi_size = (160, 160, 160)sw_batch_size = 4# 検証結果を取得val_outputs = sliding_window_inference(val_inputs, roi_size, sw_batch_size, model)# Diceメトリックを計算dice_metric(y_pred=val_outputs, y=val_labels)metric = dice_metric.aggregate().item()metric_values.append(metric, step=epoch+1)wandb.log({'val/metric': metric})# メトリックが改善していればモデルを保存if metric > best_metric:best_metric = metricbest_metric_epoch = epoch + 1torch.save(model.state_dict(), 'best_metric_model.pth')print('saved new best metric model')print(f"current epoch: {epoch + 1} current mean dice: {metric:.4f}"f" best mean dice: {best_metric:.4f} at epoch: {best_metric_epoch}")print(f"train completed, best_metric: {best_metric:.4f} at epoch: {best_metric_epoch}")# ベストスコアをwandbに記録wandb.log({"best_metric": best_metric, "best_metric_epoch": best_metric_epoch})
wandBでlossと評価値を確認すると確認するとepoch毎にlossは下がっていっていますが、評価値は40epochを境に下がっていっています。過学習していることがwandBを見ることでよくわかります。wandBでは学習しながらグラフを見ることができるため、学習がうまくできているかなどをリアルタイムで確認できます。
Run set
1
Run set
1
推論:MONAIを用いたセグメンテーション
テストデータに対して推論を行い、評価を確認します。測した結果はwandbのTableに保存します。
正解画像と予測画像を横並びに表示し、実際にどの程度セグメンテーションできているかを視覚的に確認できるようにしています。
CTは3Dデータなので、3Dでスライドしながら結果を確認していきます。
# 学習済みの重みをロードmodel_path = "best_metric_model.pth"model.load_state_dict(torch.load(model_path))model.eval() # モデルを評価モードに設定columns = ['ground_truth', 'prediction', 'metric']test_table = wandb.Table(columns=columns)# テストデータに対して推論を行うwith torch.no_grad(): # 勾配の計算を無効化for i, test_data in enumerate(test_loader):test_inputs, test_labels = test_data['image'].to(device), test_data['label'].to(device)roi_size = (160, 160, 160)sw_batch_size = 4# モデルを介して入力を伝播し、推論結果を取得test_outputs = sliding_window_inference(test_inputs, roi_size, sw_batch_size, model)# sigmoid関数を適用し0~1の値に変換test_outputs_binary = torch.sigmoid(test_outputs)# 0.5を閾値として、それ以下の値を0、それ以上の値を1に変換test_outputs_binary = (test_outputs_binary >= 0.5).float()test_outputs_np = test_outputs_binary.cpu().numpy()test_labels_np = test_labels.cpu().numpy()# 可視化のために次元を変換 (1, 96, 96, 96) => (96, 96, 96)test_outputs_np = np.reshape(test_outputs_np, (96, 96, 96))test_labels_np = np.reshape(test_labels_np, (96, 96, 96))# NumPy配列をファイルに保存np.save(f'test_output_{i}.npy', test_outputs_np)# Diceメトリックを計算metric = dice_metric(y_pred=test_outputs, y=test_labels)metric = metric.cpu().item()test_table.add_data([wandb.Image(img) for img in test_labels_np], [wandb.Image(img) for img in test_outputs_np], metric)test_metric = dice_metric.aggregate().item() # Diceメトリックの平均を計算print(f"Dice metric on test data: {test_metric:.4f}")wandb.log({'test_metric': test_metric})wandb.log({"test_predictions" : test_table})wandb.finish()
予測結果の確認
wandBで予測結果を確認した結果が下の図になります。
左側の列が正解画像、真ん中が予測画像、右側の列が評価値となっています。実際に画像を見てみると、あまりうまく予測できていないことがわかります。
wandBを利用することで3D画像でも簡単に予測結果の確認ができました。
おわりに
この記事では放射線治療現場でのAIを用いた自動輪郭抽出の実験管理をWeight&Biases(wandB)のプラットフォームを使って管理する方法について共有しました。
wandBを利用することで、学習の管理から結果の管理まで簡単にすることができます。とても便利なので、ぜひ利用してみてみください。
Add a comment
Tags: Articles, Community Posts
Iterate on AI agents and models faster. Try Weights & Biases today.