通常、機械学習モデルはjupyterノートブックに組み込まれています。コードは、非常に長い式のシートではなく、記述された関数の「ひざの上で」呼び出されるように見えます。 このようなコードをサポートすることはほとんど不可能であるため、各プロジェクトはほとんどゼロから書き直されます。 そして、本番環境でこのコードを実装することを考えることさえ怖いです。
したがって、本日、データセットおよびデータサイエンスモデルを操作するためのPythonライブラリのプレビューを厳格な裁判所に提出します。 これにより、Pythonコードは次のようになります。
my_dataset. load('/some/path'). normalize(). resize(shape=(256, 256, 256)). random_rotate(angle=(-30, 30)). random_crop(shape=(64, 64, 64)) for i in range(MAX_ITER): batch = my_dataset.next_batch(BATCH_SIZE, shuffle=True) # ,
この記事では、コードをシンプルで理解しやすく、便利にするための主要なクラスとメソッドについて学びます。
図書館はまだ最終仕上げを行っており、まだ一般公開されていません。
この記事は完全なドキュメントではなく、ライブラリの簡単な説明とその使用例にすぎません。
あなたのコメントは、ライブラリを完成させ、必要な機能を含めるのに役立ちます。
データセット
データの量は非常に大きくなる可能性があり、データの処理が開始されるまでに、たとえば徐々にデータが到着する場合など、すべてのデータがまったくない場合があります。 したがって、 Dataset
クラスはそれ自体にデータを格納しません。 インデックス-データの要素のリスト(識別子または単にシリアル番号の場合もあります)、およびデータを操作するためのメソッドが定義されているBatch
クラスが含まれます。
dataset = Dataset(index = some_index, batch_class=DataFrameBatch)
Dataset
の主な目的は、バッチの形成です。
batch = dataset.next_batch(BATCH_SIZE, shuffle=True) # batch - DataFrameBatch, # BATCH_SIZE
または、ジェネレータを呼び出すことができます:
for batch in dataset.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True): # batch - DataFrameBatch
バッチは、厳密に順序付けられた状態または無秩序に収集され、無限に繰り返されるか、データに応じて1サイクルだけ実行されます。 状況に応じて、ステップごとに異なるサイズのバッチを作成することもできます。
反復に加えて、別の便利な操作がDataset
で利用できますcv_split
データセットをトレイン、テスト、および検証に分割します。 そして、これは特に便利ですが、それぞれが再びデータセットです。
dataset.cv_split([0.7, 0.2, 0.1]) # 70 / 20 / 10 # for i in range(MAX_ITER): batch = dataset.train.next_batch(BATCH_SIZE, shuffle=True) # ,
索引
データセットの要素のアドレス指定は、インデックスを使用して行われます。 識別子のセット(クライアント、トランザクション、CTスナップショット)または単なるシリアル番号(たとえば、 numpy.arange(N)
)にすることができます。 データセットは(ほぼ)任意に大きく、RAMに収まらない場合があります。 しかし、これは必須ではありません。 結局、データ処理はバッチで実行されます。
インデックスの作成は非常に簡単です。
ds_index = DatasetIndex(sequence_of_item_ids)
シーケンスは、リスト、 numpy
配列、 pandas.Series
またはその他のpandas.Series
データ型にすることができます。
ソースデータが別のファイルに保存されている場合、これらのファイルのリストからすぐにインデックスを作成すると便利です。
ds_index = FilesIndex(path='/some/path/*.dat', no_ext=True)
ここで、インデックス要素は、指定されたディレクトリからのファイル名(拡張子なし)です。
データセットの要素(3次元CT画像など)が別のディレクトリに保存されることがあります。
ds_index = FilesIndex(path='/ct_images_??/*', dirs=True)
これにより、 /ct_images_01
、 /ct_images_02
、 /ct_images_02
などからすべてのサブディレクトリの共通インデックスが構築されます。 ファイルインデックスは、その要素のフルパスを記憶しています。 したがって、後でload
またはsave
メソッドで、パスindex.get_fullpath(index_item)
簡単に取得できます。
ほとんどの場合、インデックスを操作する必要はありませんが、必要な作業はすべて内部で行われ、すでにバッチ全体でのみ作業しています。
クラスバッチ
データのすべてのストレージロジックと処理メソッドは、 Batch
クラスで定義されます。 例としてCT画像を操作するためのクラスを作成しましょう。 CTImagesBatch
の子孫となる基本クラスBatch
には、このバッチの要素のリストとNone
初期化されたdata
属性を格納する属性index
が既にあります。 そして、これで十分であるため、コンストラクターを再定義しません。
したがって、すぐにload
action
メソッドの作成に進みます。
class CTImagesBatch(Batch): @action def load(self, src, fmt): if fmt == 'dicom': self.data = self._load_dicom(src) elif fmt == 'blosc': self.data = self._load_blosc(src) elif fmt == 'npz': self.data = self._load_npz(src) else: raise ValueError("Incorrect format") return self
まず、メソッドの前に@action
デコレータを@action
必要があります(少し後で理由がわかります)。
次に、 Batch
オブジェクトを返す必要があります。 同じクラスの新しいオブジェクト(この場合はCTImagesBatch)、または別のクラスのオブジェクト(ただし、確かにBatch
子孫)か、単純にself
返すことができます。
このアプローチにより、データに対する一連のアクションを記述することができます。 さらに、処理中、データは内容だけでなく、形式と構造も異なる場合があります。
プライベートメソッド_load_dicom
、 _load_blosc
および_load_npz
時間を無駄にしません。 特定の形式のファイルからデータをロードし、3次元のnumpy
配列[バッチサイズ、画像の幅、画像の高さ]を返すことができます。 主なことは、ここで各バッチのデータの配置方法を決定したことであり、この配列を引き続き使用します。
次に、非常に複雑な画像処理を実行するvery_complicated_processing
メソッドを記述します。 バッチ内の画像は互いに独立しているため、それらを並行して処理すると便利です。
class CTImagesBatch(Batch): ... @action @inbatch_parallel(target='threads') def very_complicated_processing(self, item, *args, **kwargs): # ... return processed_image_as_array
つまり、メソッドは単一のスナップショットを処理しているように記述され、このスナップショットのインデックスは最初のパラメーターで渡されます。
並列処理のマジックを機能させるには、並列化技術(プロセス、スレッドなど)が設定されているデコレータ、および並列化の前後に呼び出される前処理および後処理関数でメソッドをラップする必要があります。
ところで、 async
メソッドとして集中的な入出力を伴う操作を記述し、 target='async'
を使用して並列化すると、データのロードとアンロードが大幅に高速化されます。
これがすべてプログラミングの利便性を高めることは明らかですが、ここで並列処理が必要かどうかの「 思考 」を完全に排除するわけではありません。
すべてのaction
メソッドが記述されたら、バッチを操作できます。
for i in range(MAX_ITER): batch = ct_images_dataset.next_batch(BATCH_SIZE, shuffle=True) processed_batch = batch.load('/some/path/', 'dicom') .very_complicated_processing(some_arg=some_value) .resize(shape=(256, 256, 256)) .random_rotate(angle=(-30, 30)) .random_crop(shape=(64, 64, 64)) # , processed_batch
見た目は良さそうですが、どういうわけか、バッチ処理の反復とデータ処理が混在しているのは間違っています。 はい。そして、 next_batch
以外に何も存在しないように、モデルのトレーニングサイクルを可能な限り減らしたいとnext_batch
ます。
一般に、一連のaction
メソッドをデータセットレベルに移動する必要があります。
パイプライン
そしてそれを行うことができます。 結局のところ、これらすべてのaction
デコレータがフェンスで囲まれたのは無駄ではありません。 メソッドをデータセットレベルに転送するというtransferringな魔法を隠しています。 だから書くだけ:
ct_images_pipeline = ct_images_dataset.pipeline(). .load('/some/path/', 'dicom') .very_complicated_processing(some_arg=some_value) .resize(shape=(256, 256, 256)). .random_rotate(angle=(-30, 30)) .random_crop(shape=(64, 64, 64)) # ... for i in range(MAX_ITER): batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True) # ,
新しいDataset
下位クラスを作成して、これらのメソッドをすべて記述する必要はありません。 これらは対応するBatch
クラスにあり、 @action
デコレータでマークされています。つまり、 Dataset
クラスにあるかのように安全に呼び出すことができます。
もう1つのトリックは、このアプローチでは、すべてのaction
メソッドが「遅延」(遅延)になり、遅延して実行されることです。 つまり、 next_batch
呼び出されたときにこのバッチが形成された瞬間に、各バッチに対して読み込み、処理、サイズ変更などのアクションが実行されます。
また、各バッチの処理には時間がかかることがあるため、事前にバッチを作成しておくと便利です。 これは、モデルトレーニングがGPUで実行される場合に特に重要です。これは、新しいバッチを見越して単純なGPUがその高性能のすべての利点を簡単に「食べる」ことができるためです。
batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True, prefetch=3)
prefetch
パラメーターは、3つのバッチを並行して読み取る必要があることを示します。 さらに、並列化テクノロジ(プロセス、スレッド)を指定できます。
データセットの結合
実際の機械学習タスクでは、単一のデータセットを扱う必要はほとんどありません。 ほとんどの場合、少なくとも2つのデータセットXとYがあります。たとえば、家のパラメーターのデータとその値のデータです。 コンピュータービジョンタスクでは、画像自体に加えて、クラスラベル、セグメント化マスク、境界ボックスがまだあります。
一般に、複数のデータセットから並列バッチを形成できると便利です。 このために、 join
操作を実行join
か、 JointDataset
作成できます。
共同データセット
バッチの並列反復のみが必要な場合は、単一のデータセットを作成する方が便利です。
joint_dataset = JointDataset((ds_X, ds_Y))
ds_X
とds_Y
が同じインデックスに基づいていない場合、インデックスの長さが同じで、 ds_Y
が同じであることが重要です。つまり、 ds_Y[i]
の値はds_X[i]
値に対応します。 この場合、データセットの作成は少し異なります。
joint_dataset = JointDataset((ds_X, ds_Y), align='order')
そして、すべてが完全に標準的な方法で行われます:
for i in range(MAX_ITER): batch_X, batch_Y = joint_dataset.next_batch(BATCH_SIZE, shuffle=True)
next_batch
が返すのは1つのバッチではなく、各データセットからのバッチを含むタプルだけです。
当然、 JointDataset
はパイプラインで構成することもできます。
pl_images = ct_images_ds.pipeline() .load('/some/path', 'dicom') .hu_normalize() .resize(shape=(256,256,256)) .segment_lungs() pl_labels = labels_ds.pipeline() .load('/other/path', 'csv') .apply(lambda x: (x['diagnosis'] == 'C').astype('int')) full_ds = JointDataset((pl_images, pl_labels), align='same') for i in range(MAX_ITER): images_batch, labels_batch = full_ds.next_batch(BATCH_SIZE, shuffle=True) # ,
また、パイプラインはデータセットのコンポーネントであるため、画像とラベルの読み込みと処理はnext_batch
呼び出されたときにのみ開始されます。 つまり、すべての計算が実行され、必要な場合にのみバッチが形成されます。
結合操作
ただし、データセットで操作を実行し、別のデータセットからデータを適用する必要がある場合は、他の状況もあります。
これは、CT画像を使用した例で最もよく実証されます。 がんの成長の座標とサイズを読み込み、それらから3次元マスクを形成します。
pl_masks = nodules_ds.pipeline() .load('/other/path', 'csv') .calculate_3d_masks()
CT画像を読み込んでマスクを適用し、癌性領域のみを選択します。
pl_images = ct_images_ds.pipeline(). .load('/some/path', 'dicom') .hu_normalize() .resize(shape=(256, 256, 256)) .join(pl_masks) .apply_masks(op='mult')
join
は、データセットを指定します。 このため、次のaction
メソッド(この例ではapply_masks
)は、このデータセットのバッチを最初の引数として使用します。 バッチだけでなく、必要なバッチだけです。 たとえば、 ct_images_ds
の現在のバッチに画像ct_images_ds
、および14が含まれている場合、マスク付きの添付のバットは画像ct_images_ds
、および14にも適用されます。
当然、最初にjoin
せずに明示的に渡すこともできるため、この引数を考慮してapply_masks
メソッドを記述する必要があります。 さらに、 action
メソッドでは、バッチの要素のインデックスと識別子について考えることができなくなります-マスクの配列を画像の配列に適用するだけです。
繰り返しますが、 pl_images.next_batch
を呼び出すまで、画像もマスクもダウンロードと計算は開始されませんpl_images.next_batch
すべてをまとめる
それでは、完全なワークフローデータサイエンスプロジェクトがどのようになるかを見てみましょう。
- インデックスとデータセットを作成する
ct_images_index = FilesIndex(path='/ct_images_??/*', dirs=True) ct_images_dataset = Dataset(index = ct_images_index, batch_class=CTImagesBatch)
前処理を行い、処理した画像を保存します
ct_images_dataset.pipeline() .load(None, 'dicom') # dicom .hu_normalize() .resize(shape=(256, 256, 256)) .segment_lungs() .save('/preprocessed/images', 'blosc') .run(BATCH_SIZE, shuffle=False, one_pass=True)
モデルのデータの準備と拡張について説明します
ct_preprocessed_index = FilesIndex(path='/preprocessed/images/*') ct_preprocessed_dataset = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch) # ct_images_pipeline = ct_preprocessed_dataset.pipeline() .load(None, 'blosc') .split_to_patches(shape=(64, 64, 64)) # ct_masks_ds = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch) ct_masks_pipeline = ct_masks_ds.pipeline(). .load('/preprocessed/masks', 'blosc') .split_to_patches(shape=(64, 64, 64)) # full_ds = JointDataset((ct_images_pipeline, ct_masks_pipeline))
トレーニングバッチを作成し、モデルをトレーニングします
full_ds.cv_split([0.8, 0.2]) for i in range(MAX_ITER): images, masks = full_ds.train.next_batch(BATCH_SIZE, shuffle=True) # ,
- モデルの品質を確認します
for images, masks in full_ds.test.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True): #
クリアで高品質なコードの開発、複雑なデータの前処理による以前に作成されたモデルの再利用、さらには実稼働対応システムの開発まで、はるかに迅速に行える便利なライブラリがあります。
そして今、問題は次のとおりです。他にライブラリに追加する価値があるものは何か データやモデルを扱う際に何が本当に不足していますか?