データサイエンスと品質コード

通常、機械学習モデルはjupyterノートブックに組み込まれています。コードは、非常に長い式のシートではなく、記述された関数の「ひざの上で」呼び出されるように見えます。 このようなコードをサポートすることはほとんど不可能であるため、各プロジェクトはほとんどゼロから書き直されます。 そして、本番環境でこのコードを実装することを考えることさえ怖いです。







したがって、本日、データセットおよびデータサイエンスモデルを操作するためのPythonライブラリのプレビューを厳格な裁判所に提出します。 これにより、Pythonコードは次のようになります。







my_dataset. load('/some/path'). normalize(). resize(shape=(256, 256, 256)). random_rotate(angle=(-30, 30)). random_crop(shape=(64, 64, 64)) for i in range(MAX_ITER): batch = my_dataset.next_batch(BATCH_SIZE, shuffle=True) #  ,     
      
      





この記事では、コードをシンプルで理解しやすく、便利にするための主要なクラスとメソッドについて学びます。









図書館はまだ最終仕上げを行っており、まだ一般公開されていません。

この記事は完全なドキュメントではなく、ライブラリの簡単な説明とその使用例にすぎません。

あなたのコメントは、ライブラリを完成させ、必要な機能を含めるのに役立ちます。







データセット



データの量は非常に大きくなる可能性があり、データの処理が開始されるまでに、たとえば徐々にデータが到着する場合など、すべてのデータがまったくない場合があります。 したがって、 Dataset



クラスはそれ自体にデータを格納しません。 インデックス-データの要素のリスト(識別子または単にシリアル番号の場合もあります)、およびデータを操作するためのメソッドが定義されているBatch



クラスが含まれます。







 dataset = Dataset(index = some_index, batch_class=DataFrameBatch)
      
      





Dataset



の主な目的は、バッチの形成です。







 batch = dataset.next_batch(BATCH_SIZE, shuffle=True) # batch -   DataFrameBatch, #  BATCH_SIZE  
      
      





または、ジェネレータを呼び出すことができます:







 for batch in dataset.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True): # batch -   DataFrameBatch
      
      





バッチは、厳密に順序付けられた状態または無秩序に収集され、無限に繰り返されるか、データに応じて1サイクルだけ実行されます。 状況に応じて、ステップごとに異なるサイズのバッチを作成することもできます。







反復に加えて、別の便利な操作がDataset



で利用できますcv_split



データセットをトレイン、テスト、および検証に分割します。 そして、これは特に便利ですが、それぞれが再びデータセットです。







 dataset.cv_split([0.7, 0.2, 0.1]) #    70 / 20 / 10 #     for i in range(MAX_ITER): batch = dataset.train.next_batch(BATCH_SIZE, shuffle=True) #  ,     
      
      





索引



データセットの要素のアドレス指定は、インデックスを使用して行われます。 識別子のセット(クライアント、トランザクション、CTスナップショット)または単なるシリアル番号(たとえば、 numpy.arange(N)



)にすることができます。 データセットは(ほぼ)任意に大きく、RAMに収まらない場合があります。 しかし、これは必須ではありません。 結局、データ処理はバッチで実行されます。







インデックスの作成は非常に簡単です。







 ds_index = DatasetIndex(sequence_of_item_ids)
      
      





シーケンスは、リスト、 numpy



配列、 pandas.Series



またはその他のpandas.Series



データ型にすることができます。







ソースデータが別のファイルに保存されている場合、これらのファイルのリストからすぐにインデックスを作成すると便利です。







 ds_index = FilesIndex(path='/some/path/*.dat', no_ext=True)
      
      





ここで、インデックス要素は、指定されたディレクトリからのファイル名(拡張子なし)です。







データセットの要素(3次元CT画像など)が別のディレクトリに保存されることがあります。







 ds_index = FilesIndex(path='/ct_images_??/*', dirs=True)
      
      





これにより、 /ct_images_01



/ct_images_02



/ct_images_02



などからすべてのサブディレクトリの共通インデックスが構築されます。 ファイルインデックスは、その要素のフルパスを記憶しています。 したがって、後でload



またはsave



メソッドで、パスindex.get_fullpath(index_item)



簡単に取得できます。







ほとんどの場合、インデックスを操作する必要はありませんが、必要な作業はすべて内部で行われ、すでにバッチ全体でのみ作業しています。







クラスバッチ



データのすべてのストレージロジックと処理メソッドは、 Batch



クラスで定義されます。 例としてCT画像を操作するためのクラスを作成しましょう。 CTImagesBatch



の子孫となる基本クラスBatch



には、このバッチの要素のリストとNone



初期化されたdata



属性を格納する属性index



が既にあります。 そして、これで十分であるため、コンストラクターを再定義しません。







したがって、すぐにload



action



メソッドの作成に進みます。







 class CTImagesBatch(Batch): @action def load(self, src, fmt): if fmt == 'dicom': self.data = self._load_dicom(src) elif fmt == 'blosc': self.data = self._load_blosc(src) elif fmt == 'npz': self.data = self._load_npz(src) else: raise ValueError("Incorrect format") return self
      
      





まず、メソッドの前に@action



デコレータを@action



必要があります(少し後で理由がわかります)。







次に、 Batch



オブジェクトを返す必要があります。 同じクラスの新しいオブジェクト(この場合はCTImagesBatch)、または別のクラスのオブジェクト(ただし、確かにBatch



子孫)か、単純にself



返すことができます。







このアプローチにより、データに対する一連のアクションを記述することができます。 さらに、処理中、データは内容だけでなく、形式と構造も異なる場合があります。







プライベートメソッド_load_dicom



_load_blosc



および_load_npz



時間を無駄にしません。 特定の形式のファイルからデータをロードし、3次元のnumpy



配列[バッチサイズ、画像の幅、画像の高さ]を返すことができます。 主なことは、ここで各バッチのデータの配置方法を決定したことであり、この配列を引き続き使用します。







次に、非常に複雑な画像処理を実行するvery_complicated_processing



メソッドを記述します。 バッチ内の画像は互いに独立しているため、それらを並行して処理すると便利です。







 class CTImagesBatch(Batch): ... @action @inbatch_parallel(target='threads') def very_complicated_processing(self, item, *args, **kwargs): #   ... return processed_image_as_array
      
      





つまり、メソッドは単一のスナップショットを処理しているように記述され、このスナップショットのインデックスは最初のパラメーターで渡されます。







並列処理のマジックを機能させるには、並列化技術(プロセス、スレッドなど)が設定されているデコレータ、および並列化の前後に呼び出される前処理および後処理関数でメソッドをラップする必要があります。







ところで、 async



メソッドとして集中的な入出力を伴う操作を記述し、 target='async'



を使用して並列化すると、データのロードとアンロードが大幅に高速化されます。







これがすべてプログラミングの利便性を高めることは明らかですが、ここで並列処理が必要かどうかの「 思考 」を完全に排除するわけではありません。







すべてのaction



メソッドが記述されたら、バッチを操作できます。







 for i in range(MAX_ITER): batch = ct_images_dataset.next_batch(BATCH_SIZE, shuffle=True) processed_batch = batch.load('/some/path/', 'dicom') .very_complicated_processing(some_arg=some_value) .resize(shape=(256, 256, 256)) .random_rotate(angle=(-30, 30)) .random_crop(shape=(64, 64, 64)) #  ,   processed_batch   
      
      





見た目は良さそうですが、どういうわけか、バッチ処理の反復とデータ処理が混在しているのは間違っています。 はい。そして、 next_batch



以外に何も存在しないように、モデルのトレーニングサイクルを可能な限り減らしたいとnext_batch



ます。







一般に、一連のaction



メソッドをデータセットレベルに移動する必要があります。







パイプライン



そしてそれを行うことができます。 結局のところ、これらすべてのaction



デコレータがフェンスで囲まれたのは無駄ではありません。 メソッドをデータセットレベルに転送するというtransferringな魔法を隠しています。 だから書くだけ:







 ct_images_pipeline = ct_images_dataset.pipeline(). .load('/some/path/', 'dicom') .very_complicated_processing(some_arg=some_value) .resize(shape=(256, 256, 256)). .random_rotate(angle=(-30, 30)) .random_crop(shape=(64, 64, 64)) # ... for i in range(MAX_ITER): batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True) #  ,      
      
      





新しいDataset



下位クラスを作成して、これらのメソッドをすべて記述する必要はありません。 これらは対応するBatch



クラスにあり、 @action



デコレータでマークされています。つまり、 Dataset



クラスにあるかのように安全に呼び出すことができます。







もう1つのトリックは、このアプローチでは、すべてのaction



メソッドが「遅延」(遅延)になり、遅延して実行されることです。 つまり、 next_batch



呼び出されたときにこのバッチが形成された瞬間に、各バッチに対して読み込み、処理、サイズ変更などのアクションが実行されます。







また、各バッチの処理には時間がかかることがあるため、事前にバッチを作成しておくと便利です。 これは、モデルトレーニングがGPUで実行される場合に特に重要です。これは、新しいバッチを見越して単純なGPUがその高性能のすべての利点を簡単に「食べる」ことができるためです。







 batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True, prefetch=3)
      
      





prefetch



パラメーターは、3つのバッチを並行して読み取る必要があることを示します。 さらに、並列化テクノロジ(プロセス、スレッド)を指定できます。







データセットの結合



実際の機械学習タスクでは、単一のデータセットを扱う必要はほとんどありません。 ほとんどの場合、少なくとも2つのデータセットXとYがあります。たとえば、家のパラメーターのデータとその値のデータです。 コンピュータービジョンタスクでは、画像自体に加えて、クラスラベル、セグメント化マスク、境界ボックスがまだあります。







一般に、複数のデータセットから並列バッチを形成できると便利です。 このために、 join



操作を実行join



か、 JointDataset



作成できます。







共同データセット



バッチの並列反復のみが必要な場合は、単一のデータセットを作成する方が便利です。







 joint_dataset = JointDataset((ds_X, ds_Y))
      
      





ds_X



ds_Y



が同じインデックスに基づいていない場合、インデックスの長さが同じで、 ds_Y



が同じであることが重要です。つまり、 ds_Y[i]



の値はds_X[i]



値に対応します。 この場合、データセットの作成は少し異なります。







 joint_dataset = JointDataset((ds_X, ds_Y), align='order')
      
      





そして、すべてが完全に標準的な方法で行われます:







 for i in range(MAX_ITER): batch_X, batch_Y = joint_dataset.next_batch(BATCH_SIZE, shuffle=True)
      
      





next_batch



が返すのは1つのバッチではなく、各データセットからのバッチを含むタプルだけです。







当然、 JointDataset



はパイプラインで構成することもできます。







 pl_images = ct_images_ds.pipeline() .load('/some/path', 'dicom') .hu_normalize() .resize(shape=(256,256,256)) .segment_lungs() pl_labels = labels_ds.pipeline() .load('/other/path', 'csv') .apply(lambda x: (x['diagnosis'] == 'C').astype('int')) full_ds = JointDataset((pl_images, pl_labels), align='same') for i in range(MAX_ITER): images_batch, labels_batch = full_ds.next_batch(BATCH_SIZE, shuffle=True) #    ,       
      
      





また、パイプラインはデータセットのコンポーネントであるため、画像とラベルの読み込みと処理はnext_batch



呼び出されたときにのみ開始されます。 つまり、すべての計算が実行され、必要な場合にのみバッチが形成されます。







結合操作



ただし、データセットで操作を実行し、別のデータセットからデータを適用する必要がある場合は、他の状況もあります。







これは、CT画像を使用した例で最もよく実証されます。 がんの成長の座標とサイズを読み込み、それらから3次元マスクを形成します。







 pl_masks = nodules_ds.pipeline() .load('/other/path', 'csv') .calculate_3d_masks()
      
      





CT画像を読み込んでマスクを適用し、癌性領域のみを選択します。







 pl_images = ct_images_ds.pipeline(). .load('/some/path', 'dicom') .hu_normalize() .resize(shape=(256, 256, 256)) .join(pl_masks) .apply_masks(op='mult')
      
      





join



は、データセットを指定します。 このため、次のaction



メソッド(この例ではapply_masks



)は、このデータセットのバッチを最初の引数として使用します。 バッチだけでなく、必要なバッチだけです。 たとえば、 ct_images_ds



の現在のバッチに画像ct_images_ds



、および14が含まれている場合、マスク付きの添付のバットは画像ct_images_ds



、および14にも適用されます。







当然、最初にjoin



せずに明示的に渡すこともできるため、この引数を考慮してapply_masks



メソッドを記述する必要があります。 さらに、 action



メソッドでは、バッチの要素のインデックスと識別子について考えることができなくなります-マスクの配列を画像の配列に適用するだけです。







繰り返しますが、 pl_images.next_batch



を呼び出すまで、画像もマスクもダウンロードと計算は開始されませんpl_images.next_batch









すべてをまとめる



それでは、完全なワークフローデータサイエンスプロジェクトがどのようになるかを見てみましょう。







  1. インデックスとデータセットを作成する

     ct_images_index = FilesIndex(path='/ct_images_??/*', dirs=True) ct_images_dataset = Dataset(index = ct_images_index, batch_class=CTImagesBatch)
          
          



  2. 前処理を行い、処理した画像を保存します







     ct_images_dataset.pipeline() .load(None, 'dicom') #     dicom     .hu_normalize() .resize(shape=(256, 256, 256)) .segment_lungs() .save('/preprocessed/images', 'blosc') .run(BATCH_SIZE, shuffle=False, one_pass=True)
          
          





  3. モデルのデータの準備と拡張について説明します







     ct_preprocessed_index = FilesIndex(path='/preprocessed/images/*') ct_preprocessed_dataset = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch) # ct_images_pipeline = ct_preprocessed_dataset.pipeline() .load(None, 'blosc') .split_to_patches(shape=(64, 64, 64)) # ct_masks_ds = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch) ct_masks_pipeline = ct_masks_ds.pipeline(). .load('/preprocessed/masks', 'blosc') .split_to_patches(shape=(64, 64, 64)) # full_ds = JointDataset((ct_images_pipeline, ct_masks_pipeline))
          
          





  4. トレーニングバッチを作成し、モデルをトレーニングします







     full_ds.cv_split([0.8, 0.2]) for i in range(MAX_ITER): images, masks = full_ds.train.next_batch(BATCH_SIZE, shuffle=True) #  ,      
          
          





  5. モデルの品質を確認します

     for images, masks in full_ds.test.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True): #    
          
          





クリアで高品質なコードの開発、複雑なデータの前処理による以前に作成されたモデルの再利用、さらには実稼働対応システムの開発まで、はるかに迅速に行える便利なライブラリがあります。







そして今、問題は次のとおりです。他にライブラリに追加する価値があるものは何か データやモデルを扱う際に何が本当に不足していますか?








All Articles