Dagster と W&B を使用して、MLOps パイプラインをオーケストレーションし、ML アセットを管理します。W&B とのインテグレーションにより、Dagster 内で次のことを簡単に行えます。
W&B Dagster インテグレーションでは、W&B 専用の Dagster リソースと IO Manager を利用できます。
wandb_resource: W&B API への認証と通信に使用する Dagster リソース。
wandb_artifacts_io_manager: W&B Artifacts を利用するための Dagster IO Manager。
このガイドでは、Dagster で W&B を使用するための前提条件を満たす方法、ops と assets で W&B Artifacts を作成して使用する方法、W&B Launch の使い方、および推奨されるベストプラクティスを紹介します。
W&B 内で Dagster を使用するには、次のリソースが必要です。
- W&B APIキー。
- W&B entity (ユーザーまたはチーム) : entity とは、W&B Runs と Artifacts の送信先となるユーザー名またはチーム名です。run をログする前に、W&B App UI でアカウントまたはチーム entity を必ず作成してください。entity を指定しない場合、run はデフォルトの entity (通常はユーザー名) に送信されます。デフォルトの entity は、Settings の Project Defaults で変更してください。
- W&B project: W&B Runs が保存される project の名です。
W&B entity は、W&B App で対象のユーザーまたはチームのプロフィールページを確認して検索できます。既存の W&B project を使用することも、新しく作成することもできます。新しい project は、W&B App のホームページ、またはユーザー/チームのプロフィールページで作成できます。project が存在しない場合は、最初に使用したときに自動的に作成されます。
- W&B にログインします。注: W&B Server を使用している場合は、インスタンスのホスト名を管理者に確認してください。
- User Settings でAPIキーを作成します。本番環境では、そのキーの所有者としてサービスアカウントを使用することを推奨します。
- そのAPIキー用の環境変数を設定します:
export WANDB_API_KEY=YOUR_KEY。
以下の例では、Dagster のコード内でAPIキーを指定する場所を示します。wandb_config のネストされた辞書内に、entity と project 名を必ず指定してください。別の W&B Project を使用する場合は、ops や assets ごとに異なる wandb_config の値を渡せます。渡せるキーの詳細については、以下の Configuration セクションを参照してください。
例: @job の設定# これを config.yaml に追加します
# または、Dagit の Launchpad や JobDefinition.execute_in_process で設定することもできます
# Reference: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
wandb_config:
config:
entity: my_entity # ご自身の W&B entity に置き換えてください
project: my_project # ご自身の W&B project に置き換えてください
@job(
resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
"io_manager": wandb_artifacts_io_manager,
}
)
def simple_job_example():
my_op()
例: assets を使用する @repository の設定from dagster_wandb import wandb_artifacts_io_manager, wandb_resource
from dagster import (
load_assets_from_package_module,
make_values_resource,
repository,
with_resources,
)
from . import assets
@repository
def my_repository():
return [
*with_resources(
load_assets_from_package_module(assets),
resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
"wandb_artifacts_manager": wandb_artifacts_io_manager.configured(
{"cache_duration_in_minutes": 60} # ファイルは 1 時間だけキャッシュします
),
},
resource_config_by_key={
"wandb_config": {
"config": {
"entity": "my_entity", # ご自身の W&B entity に置き換えてください
"project": "my_project", # ご自身の W&B project に置き換えてください
}
}
},
),
]
この例では、@job の例とは異なり、IO Manager のキャッシュ期間も設定しています。
以下の設定オプションは、このインテグレーションで提供される W&B 固有の Dagster リソース および IO Manager の設定として使用されます。
wandb_resource: W&B API との通信に使用する Dagster の リソース です。指定された APIキーを使用して自動的に認証されます。プロパティ:
api_key: (str, required): W&B API と通信するために必要な W&B APIキーです。
host: (str, optional): 使用する API ホストサーバーです。W&B Server を使用している場合にのみ必要です。デフォルトでは、Public Cloud ホスト https://api.wandb.ai が使用されます。
wandb_artifacts_io_manager: W&B Artifacts を利用するための Dagster の IO Manager です。プロパティ:
base_dir: (int, optional) ローカルストレージおよびキャッシュに使用するベースディレクトリーです。W&B Artifacts と W&B Run のログは、このディレクトリーに書き込まれ、ここから読み取られます。デフォルトでは DAGSTER_HOME ディレクトリーが使用されます。
cache_duration_in_minutes: (int, optional) W&B Artifacts と W&B Run のログをローカルストレージに保持する時間を定義します。その時間のあいだ開かれていないファイルとディレクトリーのみがキャッシュから削除されます。キャッシュの削除は、IO Manager の実行終了時に行われます。キャッシュを完全に無効にしたい場合は、0 に設定できます。同じマシン上で実行されるジョブ間で artifact が再利用される場合、キャッシュにより速度が向上します。デフォルトは 30 日です。
run_id: (str, optional): 再開に使用する、この run の一意の ID です。この ID は project 内で一意である必要があり、run を削除すると同じ ID を再利用することはできません。短い説明的な名前には name フィールドを使用し、runs 間で比較するためにハイパーパラメーターを保存するには config を使用します。ID には次の特殊文字を含めることはできません: /\\#?%:.. Dagster 内で実験管理を行う場合は、IO Manager が run を再開できるように Run ID を設定する必要があります。デフォルトでは Dagster Run ID (例: 7e4df022-1bf2-44b5-a383-bb852df4077e) に設定されます。
run_name: (str, optional) UI でこの run を識別しやすくするための短い表示名です。デフォルトでは、dagster-run-[8 first characters of the Dagster Run ID] 形式の文字列になります。たとえば dagster-run-7e4df022 です。
run_tags: (list[str], optional): UI でこの run の tags の一覧に表示される文字列のリストです。tags は、runs をまとめて整理したり、baseline や production のような一時的なラベルを付けたりするのに役立ちます。UI では tags の追加や削除を簡単に行えるほか、特定の tag を持つ runs のみにフィルターすることもできます。インテグレーションで使用されるすべての W&B Run には、dagster_wandb tag が付きます。
W&B Artifact とのインテグレーションでは、Dagster の IO Manager を利用します。
IO Managers は、ユーザーが提供するオブジェクトで、asset または op の出力を保存し、それを下流の asset または op への入力として読み込む役割を担います。たとえば、IO Manager はファイルシステム上のファイルにオブジェクトを保存したり、そこから読み込んだりできます。
このインテグレーションでは、W&B Artifacts 用の IO Manager を提供しています。これにより、任意の Dagster @op または @asset で W&B Artifacts をネイティブに作成し、利用できます。以下は、Python の list を含む、データセット タイプの W&B Artifact を生成する @asset のシンプルな例です。
@asset(
name="my_artifact",
metadata={
"wandb_artifact_arguments": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
return [1, 2, 3] # これは Artifact に保存されます
@op、@asset、@multi_asset にメタデータ設定のアノテーションを追加することで、Artifacts に書き込めます。同様に、Dagster の外部で作成された W&B Artifacts も利用できます。
先に進む前に、W&B Artifacts の使い方を十分に理解しておくことをお勧めします。Artifacts ガイドを参照してください。
Python 関数からオブジェクトを返すことで、W&B Artifact に書き込めます。W&B では、次のオブジェクトがサポートされます。
- Python オブジェクト (int、dict、list…)
- W&B オブジェクト (Table、Image、Graph…)
- W&B Artifact オブジェクト
以下の例では、Dagster の asset (@asset) を使って W&B Artifacts に書き込む方法を示します。
Python オブジェクト
W&B オブジェクト
W&B Artifact
pickle モジュールでシリアライズできるものはすべて pickle 化され、インテグレーションによって作成された Artifact に追加されます。Dagster 内でその Artifact を読み取る際には、内容は unpickle 化されます (詳細は Read artifacts を参照してください) 。@asset(
name="my_artifact",
metadata={
"wandb_artifact_arguments": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
return [1, 2, 3]
W&B は複数の Pickle ベースのシリアライズモジュール (pickle、dill、cloudpickle、joblib) をサポートしています。ONNX や PMML などの、より高度なシリアライズ方式を使用することもできます。詳細は Serialization セクションを参照してください。 Table や Image などの W&B オブジェクトは、インテグレーションによって作成された Artifact に追加されます。この例では、Artifact に Table を追加します。import wandb
@asset(
name="my_artifact",
metadata={
"wandb_artifact_arguments": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset_in_table():
return wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
複雑なユースケースでは、独自の Artifact オブジェクトを作成する必要がある場合があります。その場合でも、インテグレーションの両側でメタデータを拡張するなどの便利な追加機能を利用できます。import wandb
MY_ASSET = "my_asset"
@asset(
name=MY_ASSET,
io_manager_key="wandb_artifacts_manager",
)
def create_artifact():
artifact = wandb.Artifact(MY_ASSET, "dataset")
table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
artifact.add(table, "my_table")
return artifact
wandb_artifact_configuration という設定辞書は、@op、@asset、@multi_asset に設定できます。この辞書は、デコレータの引数で metadata として渡す必要があります。この設定は、W&B Artifacts に対する IO Manager の読み取りと書き込みを制御するために必須です。
@op の場合は、Out の metadata 引数を通じて出力 metadata に指定します。
@asset の場合は、asset の metadata 引数に指定します。
@multi_asset の場合は、各出力の metadata に AssetOut の metadata 引数を通じて指定します。
以下のコード例は、@op、@asset、@multi_asset の各計算で辞書を設定する方法を示しています。
@op の例
@asset の例
@multi_asset の例
@op の例:@op(
out=Out(
metadata={
"wandb_artifact_configuration": {
"name": "my_artifact",
"type": "dataset",
}
}
)
)
def create_dataset():
return [1, 2, 3]
@asset の例:@asset(
name="my_artifact",
metadata={
"wandb_artifact_configuration": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
return [1, 2, 3]
@asset にはすでに名前があるため、設定で name を渡す必要はありません。インテグレーションによって、Artifact 名は asset 名に設定されます。@multi_asset の例:@multi_asset(
name="create_datasets",
outs={
"first_table": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "training_dataset",
}
},
io_manager_key="wandb_artifacts_manager",
),
"second_table": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "validation_dataset",
}
},
io_manager_key="wandb_artifacts_manager",
),
},
group_name="my_multi_asset_group",
)
def create_datasets():
first_table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
second_table = wandb.Table(columns=["d", "e"], data=[[4, 5]])
return first_table, second_table
サポートされるプロパティ:
name: (str) この artifact の人が読みやすい名前です。UI でこの artifact を識別したり、use_artifact 呼び出しで参照したりするために使用します。名前には、文字、数字、アンダースコア、ハイフン、ドットを使用できます。名前は project 全体で一意である必要があります。@op では必須です。
type: (str) artifact のタイプです。artifact を整理し、区別するために使用します。一般的なタイプには dataset や model がありますが、文字、数字、アンダースコア、ハイフン、ドットを含む任意の文字列を使用できます。出力がまだ Artifact でない場合は必須です。
description: (str) artifact の説明を記述する自由形式のテキストです。説明は UI で Markdown としてレンダリングされるため、表やリンクなどを配置するのに適しています。
aliases: (list[str]) Artifact に適用する 1 つ以上のエイリアスを含む配列です。インテグレーションは、設定の有無にかかわらず、そのリストに “latest” タグも追加します。これは、モデルやデータセットのバージョン管理に効果的な方法です。
add_dirs: (list[dict[str, Any]]) :Artifact に含める各ローカルディレクトリの設定を含む配列です。
add_files: (list[dict[str, Any]]) :Artifact に含める各ローカルファイルの設定を含む配列です。
add_references: (list[dict[str, Any]]) :Artifact に含める各外部参照の設定を含む配列です。
serialization_module: (dict) 使用するシリアライズモジュールの設定です。詳細については、Serialization セクションを参照してください。
name: (str) シリアライズモジュールの名前です。指定可能な値は pickle、dill、cloudpickle、joblib です。このモジュールはローカル環境で利用可能である必要があります。
parameters: (dict[str, Any]) シリアライズ関数に渡す任意の引数です。そのモジュールの dump メソッドと同じパラメーターを指定できます。たとえば、{"compress": 3, "protocol": 4} です。
高度な例:
@asset(
name="my_advanced_artifact",
metadata={
"wandb_artifact_configuration": {
"type": "dataset",
"description": "My *Markdown* description",
"aliases": ["my_first_alias", "my_second_alias"],
"add_dirs": [
{
"name": "My directory",
"local_path": "path/to/directory",
}
],
"add_files": [
{
"name": "validation_dataset",
"local_path": "path/to/data.json",
},
{
"is_tmp": True,
"local_path": "path/to/temp",
},
],
"add_references": [
{
"uri": "https://picsum.photos/200/300",
"name": "External HTTP reference to an image",
},
{
"uri": "s3://my-bucket/datasets/mnist",
"name": "External S3 reference",
},
],
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_advanced_artifact():
return [1, 2, 3]
この asset は、インテグレーションの両側で有用なメタデータ付きでマテリアライズされます。
- W&B 側: ソース インテグレーションの名前とバージョン、使用された Python のバージョン、pickle プロトコルのバージョンなど。
- Dagster 側:
- Dagster Run ID
- W&B Run: ID、名、パス、URL
- W&B Artifact: ID、名、タイプ、バージョン、サイズ、URL
- W&B Entity
- W&B Project
次の画像は、Dagster の asset に追加された W&B のメタデータを示しています。この情報は、インテグレーションによって Dagster に伝播されます。
次の画像は、指定した設定が W&B Artifact 上で有用なメタデータによって拡充されたことを示しています。この情報は、再現性と保守性の向上に役立ちます。インテグレーションがなければ利用できません。
mypy などの静的型チェッカーを使用する場合は、次のように設定タイプ定義オブジェクトを import してください。from dagster_wandb import WandbArtifactConfiguration
このインテグレーションは、Dagster のパーティションをネイティブでサポートしています。
以下は、DailyPartitionsDefinition を使用したパーティションの例です。
@asset(
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
name="my_daily_partitioned_asset",
compute_kind="wandb",
metadata={
"wandb_artifact_configuration": {
"type": "dataset",
}
},
)
def create_my_daily_partitioned_asset(context):
partition_key = context.asset_partition_key_for_output()
context.log.info(f"Creating partitioned asset for {partition_key}")
return random.randint(0, 100)
このコードは、各パーティションごとに 1 つの W&B Artifact を生成します。Artifact パネル (UI) では、末尾にパーティションキーが追加された asset 名の下に各 artifact が表示されます。たとえば、my_daily_partitioned_asset.2023-01-01、my_daily_partitioned_asset.2023-01-02、my_daily_partitioned_asset.2023-01-03 です。複数の次元にまたがってパーティション化された asset では、各次元がドット区切り形式で表示されます。たとえば、my_asset.car.blue です。
このインテグレーションでは、1 回の run で複数のパーティションをマテリアライズできません。asset をマテリアライズするには、複数の run を実行する必要があります。これは、asset のマテリアライズ時に Dagit から実行できます。
W&B Artifacts の読み取りは、書き込みとほぼ同じです。wandb_artifact_configuration という設定ディクショナリを @op または @asset に設定できます。唯一の違いは、出力ではなく入力に対して設定する必要があることです。
@op の場合、これは In の metadata 引数を通じて入力メタデータに指定します。Artifact の名を
明示的に渡す必要があります。
@asset の場合、これは Asset In metadata 引数を通じて入力メタデータに指定します。親アセット の名がそれと一致するはずなので、Artifact 名は渡さないでください。
インテグレーションの外部で作成された Artifact に依存させたい場合は、SourceAsset を使用する必要があります。これは常にそのアセット の最新バージョンを読み取ります。
以下の例は、さまざまな op から Artifact を読み取る方法を示しています。
@op から artifact を読み取る@op(
ins={
"artifact": In(
metadata={
"wandb_artifact_configuration": {
"name": "my_artifact",
}
}
)
},
io_manager_key="wandb_artifacts_manager"
)
def read_artifact(context, artifact):
context.log.info(artifact)
別の @asset によって作成された artifact を読み取る@asset(
name="my_asset",
ins={
"artifact": AssetIn(
# 入力引数の名前を変更したくない場合は、'key' を削除できます
key="parent_dagster_asset_name",
input_manager_key="wandb_artifacts_manager",
)
},
)
def read_artifact(context, artifact):
context.log.info(artifact)
Dagster の外部で作成された Artifact を読み取る:my_artifact = SourceAsset(
key=AssetKey("my_artifact"), # W&B Artifact の名前
description="Artifact created outside Dagster",
io_manager_key="wandb_artifacts_manager",
)
@asset
def read_artifact(context, my_artifact):
context.log.info(my_artifact)
以下の設定は、IO Manager がデコレートされた関数に入力として収集・提供する内容を指定するために使用します。以下の読み取りパターンをサポートしています。
- Artifact 内に含まれる名前付きオブジェクトを取得するには、get を使用します:
@asset(
ins={
"table": AssetIn(
key="my_artifact_with_table",
metadata={
"wandb_artifact_configuration": {
"get": "my_table",
}
},
input_manager_key="wandb_artifacts_manager",
)
}
)
def get_table(context, table):
context.log.info(table.get_column("a"))
- Artifact に含まれるダウンロード済みファイルのローカルパスを取得するには、get_path を使用してください:
@asset(
ins={
"path": AssetIn(
key="my_artifact_with_file",
metadata={
"wandb_artifact_configuration": {
"get_path": "name_of_file",
}
},
input_manager_key="wandb_artifacts_manager",
)
}
)
def get_path(context, path):
context.log.info(path)
- 内容がローカルにダウンロードされた Artifact オブジェクト全体を取得するには:
@asset(
ins={
"artifact": AssetIn(
key="my_artifact",
input_manager_key="wandb_artifacts_manager",
)
},
)
def get_artifact(context, artifact):
context.log.info(artifact.name)
サポートされているプロパティ
get: (str) artifact の相対名で指定された場所にある W&B オブジェクトを取得します。
get_path: (str) artifact の相対名で指定された場所にあるファイルのパスを取得します。
デフォルトでは、このインテグレーションは標準の pickle モジュールを使用しますが、一部のオブジェクトはこれに対応していません。たとえば、yield を含む関数を pickle 化しようとすると、エラーが発生します。
dill、cloudpickle、joblib など、他の Pickle ベースのシリアル化モジュールもサポートしています。また、シリアル化済みの文字列を返すか、Artifact を直接作成することで、ONNX や PMML などの、より高度なシリアル化方式を使用することもできます。適切な選択はユースケースによって異なるため、このトピックに関する関連文献を参照してください。
Pickle は安全ではないことが知られています。セキュリティが重要な場合は、W&B オブジェクトのみを使用してください。データに署名し、ハッシュキーはお使いのシステムに保存することを推奨します。より複雑なユースケースについては、お気軽にお問い合わせください。喜んでお手伝いします。
使用するシリアル化は、wandb_artifact_configuration 内の serialization_module 辞書で設定できます。Dagster を実行しているマシンで、そのモジュールが利用可能であることを確認してください。
その Artifact を読み取る際、どのシリアル化モジュールを使用するかは、インテグレーションが自動的に判断します。
現在サポートされているモジュールは、pickle、dill、cloudpickle、joblib です。
以下は、joblib でシリアライズした「モデル」を作成し、それを推論に使用する簡略化した例です。
@asset(
name="my_joblib_serialized_model",
compute_kind="Python",
metadata={
"wandb_artifact_configuration": {
"type": "model",
"serialization_module": {
"name": "joblib"
},
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_model_serialized_with_joblib():
# これは実際のMLモデルではありませんが、pickleモジュールでは実現できません
return lambda x, y: x + y
@asset(
name="inference_result_from_joblib_serialized_model",
compute_kind="Python",
ins={
"my_joblib_serialized_model": AssetIn(
input_manager_key="wandb_artifacts_manager",
)
},
metadata={
"wandb_artifact_configuration": {
"type": "results",
}
},
io_manager_key="wandb_artifacts_manager",
)
def use_model_serialized_with_joblib(
context: OpExecutionContext, my_joblib_serialized_model
):
inference_result = my_joblib_serialized_model(1, 2)
context.log.info(inference_result) # 出力: 3
return inference_result
ONNX や PMML などの交換用ファイル形式を使用するのは一般的です。インテグレーションはこれらの形式をサポートしていますが、Pickle ベースのシリアル化に比べると、少し手間がかかります。
これらの形式を使用する方法は 2 つあります。
- モデルを選択した形式に変換し、その形式の文字列表現を通常の Python オブジェクトであるかのように返します。インテグレーションはその文字列を pickle 化します。後でその文字列を使ってモデルを再構築できます。
- シリアライズしたモデルを含む新しいローカルファイルを作成し、
add_file 設定を使用して、そのファイルを含むカスタム Artifact を作成します。
以下は、Scikit-learn モデルを ONNX 形式でシリアライズする例です。
import numpy
import onnxruntime as rt
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from dagster import AssetIn, AssetOut, asset, multi_asset
@multi_asset(
compute_kind="Python",
outs={
"my_onnx_model": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "model",
}
},
io_manager_key="wandb_artifacts_manager",
),
"my_test_set": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "test_set",
}
},
io_manager_key="wandb_artifacts_manager",
),
},
group_name="onnx_example",
)
def create_onnx_model():
# Inspired from https://onnx.ai/sklearn-onnx/
# Train a model.
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y)
clr = RandomForestClassifier()
clr.fit(X_train, y_train)
# Convert into ONNX format
initial_type = [("float_input", FloatTensorType([None, 4]))]
onx = convert_sklearn(clr, initial_types=initial_type)
# Write artifacts (model + test_set)
return onx.SerializeToString(), {"X_test": X_test, "y_test": y_test}
@asset(
name="experiment_results",
compute_kind="Python",
ins={
"my_onnx_model": AssetIn(
input_manager_key="wandb_artifacts_manager",
),
"my_test_set": AssetIn(
input_manager_key="wandb_artifacts_manager",
),
},
group_name="onnx_example",
)
def use_onnx_model(context, my_onnx_model, my_test_set):
# https://onnx.ai/sklearn-onnx/ を参照
# ONNX Runtime で予測を計算する
sess = rt.InferenceSession(my_onnx_model)
input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name
pred_onx = sess.run(
[label_name], {input_name: my_test_set["X_test"].astype(numpy.float32)}
)[0]
context.log.info(pred_onx)
return pred_onx
パーティションの使用
このインテグレーションは、Dagster partitions をネイティブにサポートしています。
アセット の 1 つ、複数、またはすべてのパーティションを選択して読み取ることができます。
すべてのパーティションは辞書として提供され、キーと値はそれぞれパーティションキーと Artifact の内容を表します。
すべてのパーティションを読み取る
特定のパーティションを読み取る
上流の @asset のすべてのパーティションを読み取り、それらは辞書として渡されます。この辞書では、キーと値がそれぞれパーティションキーと Artifact の内容に対応します。@asset(
compute_kind="wandb",
ins={"my_daily_partitioned_asset": AssetIn()},
output_required=False,
)
def read_all_partitions(context, my_daily_partitioned_asset):
for partition, content in my_daily_partitioned_asset.items():
context.log.info(f"partition={partition}, content={content}")
AssetIn の partition_mapping 設定を使用すると、特定のパーティションを選択できます。この例では、TimeWindowPartitionMapping を使用しています。@asset(
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
compute_kind="wandb",
ins={
"my_daily_partitioned_asset": AssetIn(
partition_mapping=TimeWindowPartitionMapping(start_offset=-1)
)
},
output_required=False,
)
def read_specific_partitions(context, my_daily_partitioned_asset):
for partition, content in my_daily_partitioned_asset.items():
context.log.info(f"partition={partition}, content={content}")
設定オブジェクト metadata は、W&B が project 内の異なる Artifact パーティションとどのようにやり取りするかを定義します。
オブジェクト metadata には wandb_artifact_configuration というキーがあり、その中にネストされた partitions オブジェクトが含まれています。
partitions オブジェクトは、各パーティションの名をその設定に対応付けます。各パーティションの設定では、そこからデータを取得する方法を指定できます。これらの設定には、各パーティションの要件に応じて、get、version、alias などのキーを含めることができます。
設定キー
get:
get キーは、データの取得元となる W&B Object (Table、Image など) の名を指定します。
version:
version キーは、Artifact の特定のバージョンを取得したい場合に使用します。
alias:
alias キーを使用すると、Artifact をそのエイリアスで取得できます。
ワイルドカード設定
ワイルドカード "*" は、設定されていないすべてのパーティションを表します。これにより、partitions オブジェクトで明示的に指定されていないパーティションに対するデフォルト設定を指定できます。
例えば、
"*": {
"get": "default_table_name",
},
この設定では、明示的に設定されていないすべてのパーティションについて、default_table_name という名前の表からデータを取得します。
特定のパーティションの設定
各パーティションのキーを使って個別の設定を指定することで、特定のパーティションではワイルドカード設定を上書きできます。
たとえば、
"yellow": {
"get": "custom_table_name",
},
この設定では、yellow という名前のパーティションのデータは、ワイルドカード設定を上書きして、custom_table_name という名前の表から取得されます。
バージョン管理とエイリアス
バージョン管理やエイリアス指定のために、設定で version キーおよび alias キーを個別に指定できます。
バージョンについては、
"orange": {
"version": "v0",
},
この設定では、orange Artifact パーティションの v0 バージョンからデータを取得します。
エイリアスについては、
"blue": {
"alias": "special_alias",
},
この設定では、エイリアス special_alias が付いた Artifact パーティションの表 default_table_name からデータを取得します (設定内では blue として参照されます) 。
高度な使い方
インテグレーションの高度な使い方については、以下のコード全体の例を参照してください。
現在活発に開発中のベータ版プロダクトです
Launch にご興味がある場合は、W&B Launch のカスタマーパイロットプログラムへの参加について担当のアカウントチームにお問い合わせください。
ベータプログラムの対象となるには、パイロットのお客様は AWS EKS または SageMaker を使用する必要があります。今後は追加のプラットフォームもサポートする予定です。
続行する前に、W&B Launch の使い方を十分に理解しておくことをお勧めします。Launch ガイドをご覧ください。
Dagster インテグレーションは、次のことに役立ちます。
- Dagster インスタンス内で 1 つまたは複数の Launch agent を実行する。
- Dagster インスタンス内でローカルの Launch ジョブを実行する。
- オンプレミスまたはクラウドでリモートの Launch ジョブを実行する。
このインテグレーションでは、インポート可能な @op である run_launch_agent を提供します。これにより Launch エージェントを起動し、手動で停止するまで長時間実行プロセスとして動作させます。
エージェントは、Launch キュー をポーリングし、ジョブを順に実行するプロセスです (または、実行のために外部サービスへディスパッチします) 。
詳しくは、Launch ページを参照してください。
また、Launchpad ではすべてのプロパティについて役立つ説明を確認できます。
簡単な例
# これを config.yaml に追加してください
# または、Dagit の Launchpad や JobDefinition.execute_in_process で設定することもできます
# Reference: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
wandb_config:
config:
entity: my_entity # W&B の entity に置き換えてください
project: my_project # W&B の project に置き換えてください
ops:
run_launch_agent:
config:
max_jobs: -1
queues:
- my_dagster_queue
from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource
from dagster import job, make_values_resource
@job(
resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
},
)
def run_launch_agent_example():
run_launch_agent()
このインテグレーションは、インポート可能な @op である run_launch_job を提供します。これを使用して Launch ジョブを実行できます。
Launch ジョブを実行するには、キューに割り当てる必要があります。キューは新しく作成することも、デフォルトのものを使用することもできます。そのキューをリッスンしているアクティブなエージェントが存在することを確認してください。Dagster インスタンス内でエージェントを実行することもできますが、Kubernetes 上でデプロイ可能なエージェントの使用も検討してください。
Launch ページを参照してください。
Launchpad では、すべてのプロパティについて役立つ説明も確認できます。
簡単な例
# これを config.yaml に追加してください
# または、Dagit の Launchpad や JobDefinition.execute_in_process で設定することもできます
# 参考: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
wandb_config:
config:
entity: my_entity # W&B の entity に置き換えてください
project: my_project # W&B の project に置き換えてください
ops:
my_launched_job:
config:
entry_point:
- python
- train.py
queue: my_dagster_queue
uri: https://github.com/wandb/example-dagster-integration-with-launch
from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource
from dagster import job, make_values_resource
@job(resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
},
)
def run_launch_job_example():
run_launch_job.alias("my_launched_job")() # alias でジョブ名を変更します
-
IO Manager を使用して Artifacts を読み書きしてください。
Artifact.download() や Run.log_artifact() を直接使用することは避けてください。これらのメソッドはインテグレーションによって処理されます。代わりに、Artifact に保存したいデータを返し、残りの処理はインテグレーションに任せてください。この方法により、Artifact のリネージをより適切に取得できます。
-
複雑なユース ケースでのみ、Artifact オブジェクトを自分で構築してください。
Python オブジェクトと W&B オブジェクトは、ops/assets から返すようにしてください。Artifact のバンドルはインテグレーションが処理します。
複雑なユース ケースでは、Dagster ジョブ内で直接 Artifact を構築できます。ソース インテグレーション名とバージョン、使用した Python バージョン、pickle プロトコルのバージョンなどのメタデータをより充実させるため、Artifact オブジェクトをインテグレーションに渡すことをお勧めします。
-
ファイル、ディレクトリ、外部参照は、メタデータを通じて Artifacts に追加してください。
インテグレーションの
wandb_artifact_configuration オブジェクトを使用して、任意のファイル、ディレクトリ、または外部参照 (Amazon S3、GCS、HTTP…) を追加してください。詳細は、Artifact configuration section の高度な例を参照してください。
-
Artifact が生成される場合は、@op ではなく @asset を使用してください。
Artifacts はアセットです。Dagster がそのアセットを管理する場合は、アセット を使用することをお勧めします。これにより、Dagit Asset Catalog での可観測性が向上します。
-
Dagster の外部で作成された Artifact を利用するには、SourceAsset を使用してください。
これにより、外部で作成された Artifacts を読み取る際にもインテグレーションを活用できます。そうでない場合は、インテグレーションによって作成された Artifacts しか使用できません。
-
大規模モデルのトレーニングを専用コンピュートでオーケストレーションするには、W&B Launch を使用してください。
小規模なモデルであれば Dagster クラスター内でトレーニングでき、GPU ノードを備えた Kubernetes クラスターで Dagster を実行することもできます。大規模モデルのトレーニングには W&B Launch を使用することをお勧めします。これにより、インスタンスの過負荷を防ぎ、より適切なコンピュートを利用できます。
-
Dagster 内で実験管理を行う場合は、W&B Run ID を Dagster Run ID の値に設定してください。
Run resumable にすることと、W&B Run ID を Dagster Run ID または任意の文字列に設定することの両方をお勧めします。この推奨に従うことで、Dagster 内でモデルをトレーニングする際、W&B メトリクスと W&B Artifacts が同じ W&B Run に保存されます。
W&B Run ID を Dagster Run ID に設定してください。
wandb.init(
id=context.run_id,
resume="allow",
...
)
または、任意の W&B Run ID を指定し、それを IO Manager の設定に渡すこともできます。
wandb.init(
id="my_resumable_run_id",
resume="allow",
...
)
@job(
resource_defs={
"io_manager": wandb_artifacts_io_manager.configured(
{"wandb_run_id": "my_resumable_run_id"}
),
}
)
-
大きな W&B Artifacts では、
get または get_path を使って必要なデータだけを取得してください。
デフォルトでは、このインテグレーションは Artifact 全体をダウンロードします。非常に大きな artifact を使用している場合は、必要な特定のファイルやオブジェクトだけを取得するとよいでしょう。これにより、速度とリソース使用率が向上します。
-
Python オブジェクトでは、ユースケースに合わせて pickling モジュールを使い分けてください。
デフォルトでは、W&B インテグレーションは標準の pickle モジュールを使用します。ただし、一部のオブジェクトはこれに対応していません。たとえば、
yield を含む関数は pickle 化しようとするとエラーになります。W&B は、その他の Pickle ベースのシリアル化モジュール (dill、cloudpickle、joblib) もサポートしています。
シリアライズ済みの文字列を返すか、Artifact を直接作成することで、ONNX や PMML のような、より高度なシリアル化方式を使用することもできます。どの方法が適切かはユースケースによって異なるため、このテーマに関する公開文献を参照してください。