Flow of one step
The agent observes s_t and chooses action a_t;
the environment returns the next observation and the reward r_t.
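As a concrete reference point, here is a minimal sketch of that single step using the Gymnasium API; the environment name and the random action are placeholders and are not part of Sample Factory.

```python
# Minimal sketch of one interaction step (Gymnasium API; "CartPole-v1" and the
# random action are placeholders, not Sample Factory code).
import gymnasium as gym

env = gym.make("CartPole-v1")
obs, info = env.reset()                                       # observation s_t
action = env.action_space.sample()                            # action a_t
obs, reward, terminated, truncated, info = env.step(action)   # next obs and reward r_t
```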
How sample, rollout, and batch differ
env.step() advances the env
-> the policy runs inference on the resulting observations
-> the learner performs an update
-> the next env step proceeds
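To pin down the three terms, here is a serial sketch under assumed names (collect_rollout, training_iteration, and the policy/learner objects are illustrative, not Sample Factory's API): a sample is one transition, a rollout is a fixed-length sequence of samples from one env, and a batch stacks several rollouts for the learner.

```python
# Serial sketch distinguishing sample / rollout / batch.
# All names here are assumptions for illustration only.
def collect_rollout(env, policy, obs, rollout_len=32):
    rollout = []
    for _ in range(rollout_len):
        action = policy(obs)                                  # policy inference
        next_obs, reward, terminated, truncated, _ = env.step(action)
        rollout.append((obs, action, reward))                 # one sample
        obs = env.reset()[0] if (terminated or truncated) else next_obs
    return rollout, obs                                       # one rollout

def training_iteration(envs, obs_list, policy, learner):
    batch = []
    for i, env in enumerate(envs):
        rollout, obs_list[i] = collect_rollout(env, policy, obs_list[i])
        batch.extend(rollout)                                 # batch = many rollouts
    learner.update(batch)                                     # learner update, then repeat
```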
Policy lag, intuitively
The policy that collected the data and the policy the learner is currently updating drift apart.
For example, a trajectory produced under policy_version 100 may only reach a learner that has already advanced to version 103.
What the paper does
Sample Factory is an execution system that splits the RL pipeline into components
so that CPU environment stepping, GPU inference, and GPU training all run concurrently. [1]
| Problem | Paper's approach |
|---|---|
| Idle time | Split the workload into rollout / policy / learner |
| Communication volume | Keep tensors in shared memory; queues carry only indices |
| Policy lag | Propagate weights immediately + control stale trajectories |
| Going off-policy | Correct with PPO clipping and V-trace |
Paper: pp. 2-5, Sec. 3.1-3.4
sample_factory/ splits into entry points, the training core, and supporting modules
sample_factory/
├── train.py / enjoy.py / eval.py
│ training, inference, evaluation entrypoints
├── cfg/
│ arguments.py, cfg.py
│ CLI options and default configuration
├── algo/
│ reinforcement learning execution core
├── model/
│ actor_critic.py, encoder.py, decoder.py
├── envs/
│ create_env.py and environment wrappers
├── launcher/
│ multi-run and slurm launch helpers
└── utils/
logging, timing, GPU, wandb helpers
The execution core lives in algo/
sample_factory/algo/
├── runners/
│ ├── runner_parallel.py async orchestration
│ ├── runner_serial.py sync/single-process path
│ └── runner.py shared runner base
├── sampling/
│ ├── sampler.py worker creation and wiring
│ ├── rollout_worker.py environment stepping
│ ├── inference_worker.py policy forward on GPU
│ └── batched/non_batched env-runner implementations
├── learning/
│ ├── batcher.py rollout -> training batch
│ ├── learner_worker.py learner process shell
│ └── learner.py PPO / V-trace update
├── utils/
│ ├── shared_buffers.py shared tensors
│ └── model_sharing.py weight synchronization
└── evaluators/
└── default_evaluator.py evaluation support
sampling/ divides the work between env stepping and inference
sample_factory/algo/sampling/
├── sampler.py
│ creates rollout workers and inference workers
│ connects queues and event-loop signals
├── rollout_worker.py
│ owns env runners and trajectory collection
│ sends policy requests and emits complete rollouts
├── inference_worker.py
│ batches requests from many workers
│ runs policy forward and writes shared outputs
├── batched_sampling.py
│ vectorized env runner for batched environments
├── non_batched_sampling.py
│ per-actor runner for flexible environment layouts
└── sampling_utils.py / stats.py
helper functions and runtime statistics
learning/ and algo/utils/ support the update step
sample_factory/algo/learning/
├── batcher.py
│ receives complete rollouts
│ copies them into training batches
├── learner_worker.py
│ learner process / event-loop wrapper
├── learner.py
│ PPO loss, V-trace, optimizer step
└── rnn_utils.py
recurrent sequence packing helpers
sample_factory/algo/utils/
├── shared_buffers.py
│ allocates trajectory and policy-output tensors
├── model_sharing.py
│ publishes latest weights to inference workers
├── tensor_dict.py
│ structured tensor container
└── rl_utils.py / optimizers.py
common RL math and optimizer helpers
```
[cfg/arguments.py + cfg/cfg.py]
              |
              v
          [train.py]
              |
              v
[algo/runners/runner_parallel.py]
        /            \
       v              v
[algo/sampling/]     [algo/learning/]
  sampler.py           batcher.py
  rollout_worker.py    learner_worker.py
  inference_worker.py  learner.py
  batched/non_batched
        \            /
         v          v
[algo/utils/shared_buffers.py]
[algo/utils/model_sharing.py]

[model/] and [envs/] are called from sampling/learning
```
Definition
A scheme that keeps PPO's clipped update as its foundation
while overlapping sampling and learner updates asynchronously.
Paper: p. 1, latter half of the Introduction; pp. 4-5, Sec. 3.4
| Element | Role |
|---|---|
| async sampling | keeps CPU env stepping and learner updates from waiting on each other |
| PPO clipping | keeps each policy update from moving the policy too far |
| V-trace / lag control | corrects the mismatch of samples collected under older policies |
Read the shared memory, policy_version, and V-trace sections that follow as concrete instances of this table.
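Of these, PPO clipping is the most self-contained piece, so here is a generic sketch of the clipped surrogate loss (standard PPO, written in PyTorch; the function and argument names are assumptions, not Sample Factory's learner code).

```python
import torch

def ppo_clipped_loss(logprob_new, logprob_old, advantages, clip_ratio=0.1):
    # ratio of the current policy to the policy that collected the data
    ratio = torch.exp(logprob_new - logprob_old)
    unclipped = ratio * advantages
    clipped = torch.clamp(ratio, 1.0 - clip_ratio, 1.0 + clip_ratio) * advantages
    # pessimistic (elementwise minimum) objective, negated to give a loss
    return -torch.min(unclipped, clipped).mean()
```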
| Paper term | Current implementation | Role |
|---|---|---|
| rollout worker | algo/sampling/rollout_worker.py | advances the env |
| policy worker | algo/sampling/inference_worker.py | GPU forward pass |
| learner | algo/learning/learner.py | SGD and weight updates |
| sampler / runner | sampler.py, runner_parallel.py | wiring and startup management |
Note: what the paper calls a policy worker is named InferenceWorker on the current master branch.
Code reference: [2]
Paper: p. 3, Figure 1 / Sec. 3.1
RolloutWorker
-> sends a policy request to the queue
InferenceWorker
-> batches requests and runs the GPU forward pass
-> writes action / logprob / value into the shared buffer
RolloutWorker
-> calls env.step() and completes the rollout
Batcher / Learner
-> assembles training batches, then updates the weights and policy_version
(a sketch of this cycle follows below)
Paper: p. 3, Figure 1 caption; the rollout/policy/learner description in Sec. 3.1
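A minimal single-process sketch of this cycle, focusing on the communication pattern: the queue carries only slot indices, while observations and policy outputs live in pre-allocated shared tensors. Every name here (obs_buf, out_buf, request_q, and the helper functions) is an illustrative assumption, not Sample Factory's API.

```python
import queue
import torch

NUM_SLOTS, OBS_DIM, OUT_DIM = 8, 4, 3            # OUT_DIM: action, logprob, value
obs_buf = torch.zeros(NUM_SLOTS, OBS_DIM).share_memory_()   # written by the rollout side
out_buf = torch.zeros(NUM_SLOTS, OUT_DIM).share_memory_()   # written by the inference side
request_q = queue.Queue()                        # carries only slot indices

def rollout_side_request(slot, obs):
    obs_buf[slot] = torch.as_tensor(obs)         # put the observation into shared memory
    request_q.put(slot)                          # enqueue just the index

def inference_side_step(policy):
    slots = []
    while not request_q.empty():                 # gather all pending requests
        slots.append(request_q.get())
    if slots:
        with torch.no_grad():
            out_buf[slots] = policy(obs_buf[slots])   # one batched forward pass

def rollout_side_continue(slot, env):
    action = out_buf[slot, 0]                    # read the action back from shared memory
    return env.step(action)                      # env.step() completes this sample
```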
worker_num_splits maps directly onto the double-buffered sampling of Figure 2(b). Corresponding code:
- RolloutWorker.num_splits
- RolloutWorker.env_runners
- BatchedVectorEnvRunner / NonBatchedVectorEnvRunner
Paper: p. 4, Figure 2(b) and the beginning of Sec. 3.2
```python
num_splits = cfg.worker_num_splits
for split_idx in range(num_splits):
    # each split owns its own group of envs, so the groups can alternate
    env_runner = make_env_runner(split_idx)
    env_runners.append(env_runner)

def advance_rollouts(split_idx, policy_id):
    # step only this split's envs; the other split is waiting for inference results
    complete_rollouts, _ = env_runners[split_idx].advance_rollouts(policy_id, timing)
    maybe_send_policy_request(env_runners[split_idx])
```
Corresponding code:
- shared_buffers.py::BufferMgr
- rollout_worker.py::_enqueue_policy_request()
- inference_worker.py::_batch_*()
Paper: p. 4, first half of Sec. 3.3
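Going back to worker_num_splits: the point of having two splits is that one split can be stepped on the CPU while the other split's observations are being processed on the GPU; with a single split, the rollout worker would idle during every forward pass. A minimal sketch of that alternation, with assumed helper names (wait_for_actions, send_policy_request) rather than the real RolloutWorker methods:

```python
# Alternation between env splits (illustrative only; helper names are assumptions).
def rollout_loop(env_runners, wait_for_actions, send_policy_request):
    # send an initial request for every split so the pipeline is primed
    for split_idx in range(len(env_runners)):
        send_policy_request(split_idx)
    while True:
        for split_idx, runner in enumerate(env_runners):      # typically 2 splits
            actions = wait_for_actions(split_idx)             # results of the last request
            runner.step(actions)                              # CPU: advance this split's envs
            send_policy_request(split_idx)                    # GPU: work on this split next
            # while the GPU handles split_idx, the loop moves on to the other split
```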
Countermeasures in the current implementation:
- ensure_weights_updated() pushes the latest weights to the inference side immediately
- policy_version is stored with every trajectory
- data whose lag exceeds max_policy_lag is invalidated on the learner side
Paper: pp. 4-5, first half of Sec. 3.4
```python
# InferenceWorker: pull the latest weights before running the forward pass
self.param_client.ensure_weights_updated()
# record which policy version produced these outputs
policy_outputs["policy_version"] = fill(current_policy_version)

# Learner: publish the current train step as the newest policy version
self.policy_versions_tensor[self.policy_id] = self.train_step

# Learner: invalidate samples whose lag exceeds max_policy_lag
lag_ok = curr_policy_version - buff["policy_version"] < cfg.max_policy_lag
valids = valids & lag_ok
```
V-trace can be enabled with with_vtrace=True. Corresponding code:
- learner.py
- cfg.py::with_vtrace, vtrace_rho, vtrace_c
Paper: p. 5, the V-trace / PPO clipping paragraph in the latter half of Sec. 3.4
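For reference, the core of V-trace is the pair of clipped importance weights from the IMPALA formulation; below is a generic sketch (not learner.py itself) in which rho_hat and c_hat play the roles of the vtrace_rho / vtrace_c settings above.

```python
import torch

def vtrace_weights(logprob_learner, logprob_behavior, rho_hat=1.0, c_hat=1.0):
    # importance ratio between the learner's current policy and the behavior
    # policy that actually collected the sample
    ratios = torch.exp(logprob_learner - logprob_behavior)
    rhos = torch.clamp(ratios, max=rho_hat)   # clipped weights for the TD target
    cs = torch.clamp(ratios, max=c_hat)       # clipped weights for the trace
    return rhos, cs
```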