From 47480ff35b4da1672a513efd84cfd367169eabc2 Mon Sep 17 00:00:00 2001 From: muupan Date: Fri, 10 May 2019 03:07:55 +0900 Subject: [PATCH 01/28] tmp --- chainerrl/agents/dqn.py | 144 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 9d35092ef..3977ef7ae 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -81,6 +81,150 @@ def compute_weighted_value_loss(y, t, weights, return loss +# class Actor(object): +# +# def __init__(self, batch_greedy_action_func, explorer=None): +# self.batch_greedy_action_func = batch_greedy_action_func +# +# def act(self, obs): +# return self.batch_greedy_action_func([obs])[0] +# +# def act_and_train(self, obs): +# greedy_action = self.batch_greedy_action_func([obs])[0] +# if self.explorer is not None: +# action = self.explorer.select_action( +# self.t, lambda: greedy_action, action_value=action_value) +# else: +# action = self.explorer.select_action( +# self.t, lambda: greedy_action, action_value=action_value) +# self.t += 1 + + +class StateQFunctionActor(object): + + def __init__( + self, + model, + explorer, + phi=lambda x: x, + logger=getLogger(__name__), + batch_states=batch_states, + ): + self.model = model + self.explorer = explorer + self.phi = phi + self.logger = logger + self.batch_states = batch_states + + self.t = 0 + self.last_state = None + self.last_state = None + self.batch_last_action = None + self.batch_last_action = None + + @property + def xp(self): + return self.model.xp + + def compute_action_value(self, batch_obs): + with chainer.using_config('train', False), chainer.no_backprop_mode(): + return self.model(self.batch_states(batch_obs, self.xp, self.phi)) + + def act(self, obs): + action_value = self.compute_action_value([obs]) + action = cuda.to_cpu(action_value.greedy_actions.array)[0] + return action + + def act_and_train(self, obs, reward): + + action_value = self.compute_action_value([obs]) + greedy_action = cuda.to_cpu(action_value.greedy_actions.array)[0] + + action = self.explorer.select_action( + self.t, lambda: greedy_action, action_value=action_value) + self.t += 1 + + if self.last_state is not None: + assert self.last_action is not None + # Add a transition to the replay buffer + self.send_to_learner( + state=self.last_state, + action=self.last_action, + reward=reward, + next_state=obs, + next_action=action, + is_state_terminal=False) + + self.last_state = obs + self.last_action = action + + return self.last_action + + def stop_episode_and_train(self, state, reward, done=False): + + assert self.last_state is not None + assert self.last_action is not None + + # Add a transition to the replay buffer + self.replay_buffer.append( + state=self.last_state, + action=self.last_action, + reward=reward, + next_state=state, + next_action=self.last_action, + is_state_terminal=done) + + self.last_state = None + self.last_action = None + self.replay_buffer.stop_current_episode() + + def stop_episode(self): + pass + + def batch_act(self, batch_obs): + batch_av = self.compute_action_value(batch_obs) + batch_argmax = cuda.to_cpu(batch_av.greedy_actions.array) + return batch_argmax + + def batch_act_and_train(self, batch_obs): + batch_av = self.compute_action_value(batch_obs) + batch_argmax = cuda.to_cpu(batch_av.greedy_actions.array) + batch_action = [ + self.explorer.select_action( + self.t, lambda: batch_argmax[i], + action_value=batch_av[i:i + 1], + ) + for i in range(len(batch_obs))] + self.batch_last_obs = list(batch_obs) + self.batch_last_action = list(batch_action) + return batch_action + + def batch_observe_and_train(self, batch_obs, batch_reward, + batch_done, batch_reset): + for i in range(len(batch_obs)): + self.t += 1 + # Update the target network + if self.t % self.target_update_interval == 0: + self.sync_target_network() + if self.batch_last_obs[i] is not None: + assert self.batch_last_action[i] is not None + # Add a transition to the replay buffer + self.replay_buffer.append( + state=self.batch_last_obs[i], + action=self.batch_last_action[i], + reward=batch_reward[i], + next_state=batch_obs[i], + next_action=None, + is_state_terminal=batch_done[i], + ) + if batch_reset[i] or batch_done[i]: + self.batch_last_obs[i] = None + + def batch_observe(self, batch_obs, batch_reward, + batch_done, batch_reset): + pass + + class DQN(agent.AttributeSavingMixin, agent.BatchAgent): """Deep Q-Network algorithm. From bca85c98807ccbbb2757b487a5c6b6d057f32a35 Mon Sep 17 00:00:00 2001 From: muupan Date: Fri, 10 May 2019 21:55:02 +0900 Subject: [PATCH 02/28] Implement actor-learner parallelism --- chainerrl/agents/__init__.py | 1 + chainerrl/agents/dqn.py | 233 +++++++----------- chainerrl/agents/state_q_function_actor.py | 125 ++++++++++ examples/grasping/train_dqn_batch_grasping.py | 19 +- 4 files changed, 228 insertions(+), 150 deletions(-) create mode 100644 chainerrl/agents/state_q_function_actor.py diff --git a/chainerrl/agents/__init__.py b/chainerrl/agents/__init__.py index 471dd7564..e0a32204b 100644 --- a/chainerrl/agents/__init__.py +++ b/chainerrl/agents/__init__.py @@ -19,3 +19,4 @@ from chainerrl.agents.residual_dqn import ResidualDQN # NOQA from chainerrl.agents.sarsa import SARSA # NOQA from chainerrl.agents.trpo import TRPO # NOQA +from chainerrl.agents.state_q_function_actor import StateQFunctionActor # NOQA diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 8e6b58678..923821ba6 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -8,14 +8,19 @@ import copy from logging import getLogger +import multiprocessing as mp +import threading +import time import chainer from chainer import cuda import chainer.functions as F +import chainerrl from chainerrl import agent from chainerrl.misc.batch_states import batch_states from chainerrl.misc.copy_param import synchronize_parameters +from chainerrl.misc.copy_param import copy_param from chainerrl.recurrent import Recurrent from chainerrl.recurrent import state_reset from chainerrl.replay_buffer import batch_experiences @@ -81,150 +86,6 @@ def compute_weighted_value_loss(y, t, weights, return loss -# class Actor(object): -# -# def __init__(self, batch_greedy_action_func, explorer=None): -# self.batch_greedy_action_func = batch_greedy_action_func -# -# def act(self, obs): -# return self.batch_greedy_action_func([obs])[0] -# -# def act_and_train(self, obs): -# greedy_action = self.batch_greedy_action_func([obs])[0] -# if self.explorer is not None: -# action = self.explorer.select_action( -# self.t, lambda: greedy_action, action_value=action_value) -# else: -# action = self.explorer.select_action( -# self.t, lambda: greedy_action, action_value=action_value) -# self.t += 1 - - -class StateQFunctionActor(object): - - def __init__( - self, - model, - explorer, - phi=lambda x: x, - logger=getLogger(__name__), - batch_states=batch_states, - ): - self.model = model - self.explorer = explorer - self.phi = phi - self.logger = logger - self.batch_states = batch_states - - self.t = 0 - self.last_state = None - self.last_state = None - self.batch_last_action = None - self.batch_last_action = None - - @property - def xp(self): - return self.model.xp - - def compute_action_value(self, batch_obs): - with chainer.using_config('train', False), chainer.no_backprop_mode(): - return self.model(self.batch_states(batch_obs, self.xp, self.phi)) - - def act(self, obs): - action_value = self.compute_action_value([obs]) - action = cuda.to_cpu(action_value.greedy_actions.array)[0] - return action - - def act_and_train(self, obs, reward): - - action_value = self.compute_action_value([obs]) - greedy_action = cuda.to_cpu(action_value.greedy_actions.array)[0] - - action = self.explorer.select_action( - self.t, lambda: greedy_action, action_value=action_value) - self.t += 1 - - if self.last_state is not None: - assert self.last_action is not None - # Add a transition to the replay buffer - self.send_to_learner( - state=self.last_state, - action=self.last_action, - reward=reward, - next_state=obs, - next_action=action, - is_state_terminal=False) - - self.last_state = obs - self.last_action = action - - return self.last_action - - def stop_episode_and_train(self, state, reward, done=False): - - assert self.last_state is not None - assert self.last_action is not None - - # Add a transition to the replay buffer - self.replay_buffer.append( - state=self.last_state, - action=self.last_action, - reward=reward, - next_state=state, - next_action=self.last_action, - is_state_terminal=done) - - self.last_state = None - self.last_action = None - self.replay_buffer.stop_current_episode() - - def stop_episode(self): - pass - - def batch_act(self, batch_obs): - batch_av = self.compute_action_value(batch_obs) - batch_argmax = cuda.to_cpu(batch_av.greedy_actions.array) - return batch_argmax - - def batch_act_and_train(self, batch_obs): - batch_av = self.compute_action_value(batch_obs) - batch_argmax = cuda.to_cpu(batch_av.greedy_actions.array) - batch_action = [ - self.explorer.select_action( - self.t, lambda: batch_argmax[i], - action_value=batch_av[i:i + 1], - ) - for i in range(len(batch_obs))] - self.batch_last_obs = list(batch_obs) - self.batch_last_action = list(batch_action) - return batch_action - - def batch_observe_and_train(self, batch_obs, batch_reward, - batch_done, batch_reset): - for i in range(len(batch_obs)): - self.t += 1 - # Update the target network - if self.t % self.target_update_interval == 0: - self.sync_target_network() - if self.batch_last_obs[i] is not None: - assert self.batch_last_action[i] is not None - # Add a transition to the replay buffer - self.replay_buffer.append( - state=self.batch_last_obs[i], - action=self.batch_last_action[i], - reward=batch_reward[i], - next_state=batch_obs[i], - next_action=None, - is_state_terminal=batch_done[i], - ) - if batch_reset[i] or batch_done[i]: - self.batch_last_obs[i] = None - - def batch_observe(self, batch_obs, batch_reward, - batch_done, batch_reset): - pass - - class DQN(agent.AttributeSavingMixin, agent.BatchAgent): """Deep Q-Network algorithm. @@ -652,6 +513,90 @@ def stop_episode(self): self.model.reset_state() self.replay_buffer.stop_current_episode() + def _poll_pipe(self, pipe, shared_model): + while pipe.poll(): + i, cmd, data = pipe.recv() + self.logger.debug( + 'Learner thread received a message from actoor %s: %s %s', + i, cmd, data) + if cmd == 'get_statistics': + pipe.send(self.get_statistics()) + elif cmd == 'load': + self.load(data) + pipe.send(None) + elif cmd == 'save': + self.save(data) + pipe.send(None) + else: + raise RuntimeError( + 'Unknown command from actor: {}'.format(cmd)) + + def _poll_queue(self, queue): + while not queue.empty(): + i, cmd, data = queue.get() + if cmd == 'transition': + self.replay_buffer.append(**data, env_id=i) + self.t += 1 + elif cmd == 'stop_episode': + assert data is None + self.replay_buffer.stop_current_episode(env_id=i) + else: + raise RuntimeError( + 'Unknown command from actor: {}'.format(cmd)) + + def _learner_loop(self, shared_model, queues, pipes, stop_event): + # To stop this loop, call stop_event.set() + while not stop_event.wait(0): + # Poll actors for messages + for pipe in pipes: + self._poll_pipe(pipe, shared_model) + # Poll actors for transitions + for queue in queues: + self._poll_queue(queue) + # Update model if possible + if self.replay_updater.update_if_necessary(self.t): + copy_param(src=self.model, tgt=shared_model) + else: + time.sleep(1e-6) + + def start_actor_learner_training(self, n_actors): + # Make a copy on shared memory and share among actors and a learner + shared_model = copy.deepcopy(self.model).to_cpu() + shared_arrays = chainerrl.misc.async_.extract_params_as_shared_arrays( + shared_model) + chainerrl.misc.async_.set_shared_params(shared_model, shared_arrays) + + # Queues are used for actors to send transitions to a learner + queues = [mp.Queue() for _ in range(n_actors)] + + # Pipes are used for infrequent communication + learner_pipes, actor_pipes = list(zip(*[ + mp.Pipe() for _ in range(n_actors)])) + + def make_actor(i): + return chainerrl.agents.StateQFunctionActor( + queue=queues[i], + pipe=actor_pipes[i], + model=shared_model, + explorer=self.explorer, + phi=self.phi, + batch_states=self.batch_states, + logger=self.logger, + ) + + stop_event = threading.Event() + + learner = threading.Thread( + target=self._learner_loop, + kwargs=dict( + shared_model=shared_model, + queues=queues, + pipes=learner_pipes, + stop_event=stop_event, + ) + ) + return make_actor, learner, stop_event + def get_statistics(self): return [ ('average_q', self.average_q), diff --git a/chainerrl/agents/state_q_function_actor.py b/chainerrl/agents/state_q_function_actor.py new file mode 100644 index 000000000..d3d8fe5b5 --- /dev/null +++ b/chainerrl/agents/state_q_function_actor.py @@ -0,0 +1,125 @@ +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals +from __future__ import absolute_import +from builtins import * # NOQA +from future import standard_library +standard_library.install_aliases() # NOQA + +from logging import getLogger + +import chainer +from chainer import cuda + +from chainerrl import agent +from chainerrl.misc.batch_states import batch_states + + +class StateQFunctionActor(agent.AsyncAgent): + """Actor that acts according to the Q-function.""" + + process_idx = None + shared_attributes = [] + + def __init__( + self, + queue, + pipe, + model, + explorer, + phi=lambda x: x, + logger=getLogger(__name__), + batch_states=batch_states, + ): + self.queue = queue + self.pipe = pipe + self.model = model + self.explorer = explorer + self.phi = phi + self.logger = logger + self.batch_states = batch_states + + self.t = 0 + self.last_state = None + self.last_state = None + self.batch_last_action = None + self.batch_last_action = None + + @property + def xp(self): + return self.model.xp + + def compute_action_value(self, batch_obs): + with chainer.using_config('train', False), chainer.no_backprop_mode(): + return self.model(self.batch_states(batch_obs, self.xp, self.phi)) + + def act(self, obs): + action_value = self.compute_action_value([obs]) + action = cuda.to_cpu(action_value.greedy_actions.array)[0] + return action + + def _send_to_learner(self, transition, stop_episode=False): + self.queue.put((self.process_idx, 'transition', transition)) + if stop_episode: + self.queue.put((self.process_idx, 'stop_episode', None)) + + def act_and_train(self, obs, reward): + + action_value = self.compute_action_value([obs]) + greedy_action = cuda.to_cpu(action_value.greedy_actions.array)[0] + + action = self.explorer.select_action( + self.t, lambda: greedy_action, action_value=action_value) + self.t += 1 + + if self.last_state is not None: + assert self.last_action is not None + # Add a transition to the replay buffer + self._send_to_learner(dict( + state=self.last_state, + action=self.last_action, + reward=reward, + next_state=obs, + next_action=action, + is_state_terminal=False), + stop_episode=False, + ) + + self.last_state = obs + self.last_action = action + + return self.last_action + + def stop_episode_and_train(self, state, reward, done=False): + + assert self.last_state is not None + assert self.last_action is not None + + # Add a transition to the replay buffer + self._send_to_learner(dict( + state=self.last_state, + action=self.last_action, + reward=reward, + next_state=state, + next_action=self.last_action, + is_state_terminal=done), + stop_episode=True, + ) + + self.last_state = None + self.last_action = None + + def stop_episode(self): + pass + + def save(self, dirname): + self.pipe.send((self.process_idx, 'save', dirname)) + self.pipe.recv() + + def load(self, dirname): + self.pipe.send((self.process_idx, 'load', dirname)) + self.pipe.recv() + + def get_statistics(self): + self.pipe.send((self.process_idx, 'get_statistics', None)) + return self.pipe.recv() diff --git a/examples/grasping/train_dqn_batch_grasping.py b/examples/grasping/train_dqn_batch_grasping.py index 1aca8072c..33d19577e 100644 --- a/examples/grasping/train_dqn_batch_grasping.py +++ b/examples/grasping/train_dqn_batch_grasping.py @@ -301,19 +301,26 @@ def phi(x): args.eval_n_runs, eval_stats['mean'], eval_stats['median'], eval_stats['stdev'])) else: - experiments.train_agent_batch_with_evaluation( - agent=agent, - env=make_batch_env(test=False), - eval_env=eval_env, + + make_actor, learner, stop_event = agent.start_actor_learner_training( + args.num_envs) + + learner.start() + + experiments.train_agent_async( + processes=args.num_envs, + make_agent=make_actor, + make_env=make_env, steps=args.steps, eval_n_steps=None, eval_n_episodes=args.eval_n_runs, eval_interval=args.eval_interval, outdir=args.outdir, - save_best_so_far_agent=False, - log_interval=1000, ) + stop_event.set() + learner.join() + if __name__ == '__main__': main() From e7c816b3f56c79d90dce3cb844de24291b90afb1 Mon Sep 17 00:00:00 2001 From: muupan Date: Sat, 11 May 2019 03:15:14 +0900 Subject: [PATCH 03/28] Fix a bug of not shaaring models --- chainerrl/agents/dqn.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 923821ba6..aea2b81e2 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -564,7 +564,6 @@ def start_actor_learner_training(self, n_actors): shared_model = copy.deepcopy(self.model).to_cpu() shared_arrays = chainerrl.misc.async_.extract_params_as_shared_arrays( shared_model) - chainerrl.misc.async_.set_shared_params(shared_model, shared_arrays) # Queues are used for actors to send transitions to a learner queues = [mp.Queue() for _ in range(n_actors)] @@ -574,6 +573,8 @@ def start_actor_learner_training(self, n_actors): mp.Pipe() for _ in range(n_actors)])) def make_actor(i): + chainerrl.misc.async_.set_shared_params( + shared_model, shared_arrays) return chainerrl.agents.StateQFunctionActor( queue=queues[i], pipe=actor_pipes[i], From 091237ffc0b9f13b929ad29f0912c3f687e51cd3 Mon Sep 17 00:00:00 2001 From: muupan Date: Sat, 11 May 2019 10:37:16 +0900 Subject: [PATCH 04/28] Rename --- chainerrl/agents/dqn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index aea2b81e2..6a60b6319 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -559,7 +559,7 @@ def _learner_loop(self, shared_model, queues, pipes, stop_event): else: time.sleep(1e-6) - def start_actor_learner_training(self, n_actors): + def setup_actor_learner_training(self, n_actors): # Make a copy on shared memory and share among actors and a learner shared_model = copy.deepcopy(self.model).to_cpu() shared_arrays = chainerrl.misc.async_.extract_params_as_shared_arrays( From e88ad5a2531448cb8f3ae45f580f4a499ff9fc62 Mon Sep 17 00:00:00 2001 From: muupan Date: Sat, 11 May 2019 10:37:33 +0900 Subject: [PATCH 05/28] Add tests --- tests/agents_tests/basetest_training.py | 86 +++++++++++++++++++++++++ tests/agents_tests/test_dqn.py | 16 +++-- 2 files changed, 98 insertions(+), 4 deletions(-) diff --git a/tests/agents_tests/basetest_training.py b/tests/agents_tests/basetest_training.py index 572e96f84..5aeea7c8b 100644 --- a/tests/agents_tests/basetest_training.py +++ b/tests/agents_tests/basetest_training.py @@ -15,6 +15,8 @@ import chainerrl from chainerrl.experiments.evaluator import batch_run_evaluation_episodes +from chainerrl.experiments.evaluator import run_evaluation_episodes +from chainerrl.experiments import train_agent_async from chainerrl.experiments import train_agent_batch_with_evaluation from chainerrl.experiments import train_agent_with_evaluation from chainerrl.misc import random_seed @@ -185,3 +187,87 @@ def test_batch_training_cpu_fast(self): self._test_batch_training(-1, steps=10, require_success=False) self._test_batch_training( -1, steps=0, load_model=True, require_success=False) + + +class _TestActorLearnerTrainingMixin(object): + """Mixin for testing actor-learner training. + + Inherit this after _TestTraining to enable test cases for batch training. + """ + + def _test_actor_learner_training(self, gpu, steps=100000, load_model=False, + require_success=True): + + logging.basicConfig(level=logging.DEBUG) + + test_env, successful_return = self.make_env_and_successful_return( + test=True) + agent = self.make_agent(test_env, gpu) + + if load_model: + print('Load agent from', self.agent_dirname) + agent.load(self.agent_dirname) + agent.replay_buffer.load(self.rbuf_filename) + + def make_env(process_idx, test): + env, _ = self.make_env_and_successful_return(test=test) + return env + + make_actor, learner, stop_event = agent.setup_actor_learner_training( + n_actors=2) + + learner.start() + + # Train + train_agent_async( + processes=2, + steps=steps, + outdir=self.tmpdir, + eval_interval=200, + eval_n_steps=None, + eval_n_episodes=5, + successful_score=successful_return, + make_env=make_env, + make_agent=make_actor, + ) + + stop_event.set() + learner.join() + + # Test + n_test_runs = 5 + eval_returns = run_evaluation_episodes( + test_env, + agent, + n_steps=None, + n_episodes=n_test_runs, + ) + n_succeeded = np.sum(np.asarray(eval_returns) >= successful_return) + if require_success: + self.assertEqual(n_succeeded, n_test_runs) + + # Save + agent.save(self.agent_dirname) + agent.replay_buffer.save(self.rbuf_filename) + + @testing.attr.slow + @testing.attr.gpu + def test_actor_learner_training_gpu(self): + self._test_actor_learner_training(0, steps=100000) + self._test_actor_learner_training(0, steps=0, load_model=True) + + @testing.attr.slow + def test_actor_learner_training_cpu(self): + self._test_actor_learner_training(-1, steps=100000) + self._test_actor_learner_training(-1, steps=0, load_model=True) + + @testing.attr.gpu + def test_actor_learner_training_gpu_fast(self): + self._test_actor_learner_training(0, steps=10, require_success=False) + self._test_actor_learner_training( + 0, steps=0, load_model=True, require_success=False) + + def test_actor_learner_training_cpu_fast(self): + self._test_actor_learner_training(-1, steps=10, require_success=False) + self._test_actor_learner_training( + -1, steps=0, load_model=True, require_success=False) diff --git a/tests/agents_tests/test_dqn.py b/tests/agents_tests/test_dqn.py index ef13018c9..e88bd2cbb 100644 --- a/tests/agents_tests/test_dqn.py +++ b/tests/agents_tests/test_dqn.py @@ -16,11 +16,14 @@ from chainerrl.agents.dqn import compute_weighted_value_loss from chainerrl.agents.dqn import DQN +from basetest_training import _TestActorLearnerTrainingMixin from basetest_training import _TestBatchTrainingMixin class TestDQNOnDiscreteABC( - _TestBatchTrainingMixin, base._TestDQNOnDiscreteABC): + _TestActorLearnerTrainingMixin, + _TestBatchTrainingMixin, + base._TestDQNOnDiscreteABC): def make_dqn_agent(self, env, q_func, opt, explorer, rbuf, gpu): return DQN(q_func, opt, rbuf, gpu=gpu, gamma=0.9, explorer=explorer, @@ -38,7 +41,9 @@ def test_replay_capacity_checked(self): class TestDQNOnDiscreteABCBoltzmann( - _TestBatchTrainingMixin, base._TestDQNOnDiscreteABC): + _TestActorLearnerTrainingMixin, + _TestBatchTrainingMixin, + base._TestDQNOnDiscreteABC): def make_dqn_agent(self, env, q_func, opt, explorer, rbuf, gpu): explorer = chainerrl.explorers.Boltzmann() @@ -47,14 +52,17 @@ def make_dqn_agent(self, env, q_func, opt, explorer, rbuf, gpu): class TestDQNOnContinuousABC( - _TestBatchTrainingMixin, base._TestDQNOnContinuousABC): + _TestActorLearnerTrainingMixin, + _TestBatchTrainingMixin, + base._TestDQNOnContinuousABC): def make_dqn_agent(self, env, q_func, opt, explorer, rbuf, gpu): return DQN(q_func, opt, rbuf, gpu=gpu, gamma=0.9, explorer=explorer, replay_start_size=100, target_update_interval=100) -# Batch training with recurrent models is currently not supported +# Batch or actor-learner training with recurrent models is currently not +# supported class TestDQNOnDiscretePOABC(base._TestDQNOnDiscretePOABC): def make_dqn_agent(self, env, q_func, opt, explorer, rbuf, gpu): From d9ab2618cd1d34039eb21a5b6cac50c4c5639e68 Mon Sep 17 00:00:00 2001 From: muupan Date: Sat, 11 May 2019 15:27:32 +0900 Subject: [PATCH 06/28] Make tests pass --- chainerrl/agents/dqn.py | 14 +++-- chainerrl/replay_buffer.py | 15 ++++-- tests/agents_tests/basetest_training.py | 72 ++++++++++--------------- 3 files changed, 50 insertions(+), 51 deletions(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 6a60b6319..5908eed74 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -170,6 +170,8 @@ def __init__(self, q_function, optimizer, replay_buffer, gamma, replay_start_size=replay_start_size, update_interval=update_interval, ) + assert target_update_interval % update_interval == 0,\ + "target_update_interval should be a multiple of update_interval" self.t = 0 self.last_state = None @@ -555,9 +557,15 @@ def _learner_loop(self, shared_model, queues, pipes, stop_event): self._poll_queue(queue) # Update model if possible if self.replay_updater.update_if_necessary(self.t): - copy_param(src=self.model, tgt=shared_model) - else: - time.sleep(1e-6) + copy_param(source_link=self.model, target_link=shared_model) + # To keep the ratio of target updates to model updates, + # here we calculate back the effective current timestep from + # update_interval and number of updates so far. + effective_timestep = ( + self.optimizer.t * self.replay_updater.update_interval) + if effective_timestep % self.target_update_interval == 0: + self.sync_target_network() + time.sleep(1e-6) def setup_actor_learner_training(self, n_actors): # Make a copy on shared memory and share among actors and a learner diff --git a/chainerrl/replay_buffer.py b/chainerrl/replay_buffer.py index 83777f5da..7b7582fbe 100644 --- a/chainerrl/replay_buffer.py +++ b/chainerrl/replay_buffer.py @@ -524,15 +524,23 @@ def __init__(self, replay_buffer, update_func, batchsize, episodic_update, self.update_interval = update_interval def update_if_necessary(self, iteration): + """Update the model if the condition is met. + + Args: + iteration (int): Timestep. + + Returns: + bool: True iff the condition was updated this time. + """ if len(self.replay_buffer) < self.replay_start_size: - return + return False if (self.episodic_update and self.replay_buffer.n_episodes < self.batchsize): - return + return False if iteration % self.update_interval != 0: - return + return False for _ in range(self.n_times_update): if self.episodic_update: @@ -542,3 +550,4 @@ def update_if_necessary(self, iteration): else: transitions = self.replay_buffer.sample(self.batchsize) self.update_func(transitions) + return True diff --git a/tests/agents_tests/basetest_training.py b/tests/agents_tests/basetest_training.py index 5aeea7c8b..8e9cc110c 100644 --- a/tests/agents_tests/basetest_training.py +++ b/tests/agents_tests/basetest_training.py @@ -15,7 +15,6 @@ import chainerrl from chainerrl.experiments.evaluator import batch_run_evaluation_episodes -from chainerrl.experiments.evaluator import run_evaluation_episodes from chainerrl.experiments import train_agent_async from chainerrl.experiments import train_agent_batch_with_evaluation from chainerrl.experiments import train_agent_with_evaluation @@ -195,7 +194,7 @@ class _TestActorLearnerTrainingMixin(object): Inherit this after _TestTraining to enable test cases for batch training. """ - def _test_actor_learner_training(self, gpu, steps=100000, load_model=False, + def _test_actor_learner_training(self, gpu, steps=100000, require_success=True): logging.basicConfig(level=logging.DEBUG) @@ -204,70 +203,53 @@ def _test_actor_learner_training(self, gpu, steps=100000, load_model=False, test=True) agent = self.make_agent(test_env, gpu) - if load_model: - print('Load agent from', self.agent_dirname) - agent.load(self.agent_dirname) - agent.replay_buffer.load(self.rbuf_filename) - def make_env(process_idx, test): env, _ = self.make_env_and_successful_return(test=test) return env - make_actor, learner, stop_event = agent.setup_actor_learner_training( - n_actors=2) - - learner.start() - # Train - train_agent_async( - processes=2, - steps=steps, - outdir=self.tmpdir, - eval_interval=200, - eval_n_steps=None, - eval_n_episodes=5, - successful_score=successful_return, - make_env=make_env, - make_agent=make_actor, - ) - - stop_event.set() - learner.join() + if steps > 0: + make_actor, learner, stop_event =\ + agent.setup_actor_learner_training(n_actors=2) + + learner.start() + + train_agent_async( + processes=2, + steps=steps, + outdir=self.tmpdir, + eval_interval=200, + eval_n_steps=None, + eval_n_episodes=5, + successful_score=successful_return, + make_env=make_env, + make_agent=make_actor, + ) + + stop_event.set() + learner.join() # Test - n_test_runs = 5 - eval_returns = run_evaluation_episodes( - test_env, - agent, - n_steps=None, - n_episodes=n_test_runs, - ) - n_succeeded = np.sum(np.asarray(eval_returns) >= successful_return) - if require_success: - self.assertEqual(n_succeeded, n_test_runs) - # Save - agent.save(self.agent_dirname) - agent.replay_buffer.save(self.rbuf_filename) + # Because in actor-learner traininig the model can be updated between + # evaluation and saving, it is difficult too guarantee the learned + # model succeeds. Thus we only check if the training was successful. + + if require_success: + assert os.path.exists(os.path.join(self.tmpdir, 'successful')) @testing.attr.slow @testing.attr.gpu def test_actor_learner_training_gpu(self): self._test_actor_learner_training(0, steps=100000) - self._test_actor_learner_training(0, steps=0, load_model=True) @testing.attr.slow def test_actor_learner_training_cpu(self): self._test_actor_learner_training(-1, steps=100000) - self._test_actor_learner_training(-1, steps=0, load_model=True) @testing.attr.gpu def test_actor_learner_training_gpu_fast(self): self._test_actor_learner_training(0, steps=10, require_success=False) - self._test_actor_learner_training( - 0, steps=0, load_model=True, require_success=False) def test_actor_learner_training_cpu_fast(self): self._test_actor_learner_training(-1, steps=10, require_success=False) - self._test_actor_learner_training( - -1, steps=0, load_model=True, require_success=False) From d3c64e59a031dddcacbc5ebf771b8db70ac75399 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 02:54:21 +0900 Subject: [PATCH 07/28] Record statistics in loss computation --- chainerrl/agents/dqn.py | 50 +++++++++++++++-------------------------- 1 file changed, 18 insertions(+), 32 deletions(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 35039d41a..b7bb92ea9 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -7,6 +7,7 @@ standard_library.install_aliases() # NOQA import copy +import collections from logging import getLogger import multiprocessing as mp import threading @@ -15,6 +16,7 @@ import chainer from chainer import cuda import chainer.functions as F +import numpy as np import chainerrl from chainerrl import agent @@ -26,6 +28,11 @@ from chainerrl.replay_buffer import ReplayUpdater +def _mean_or_nan(xs): + """Return its mean a non-empty sequence, numpy.nan for a empty one.""" + return np.mean(xs) if xs else np.nan + + def compute_value_loss(y, t, clip_delta=True, batch_accumulator='mean'): """Compute a loss for value prediction problem. @@ -128,10 +135,6 @@ class DQN(agent.AttributeSavingMixin, agent.BatchAgent): target_update_method (str): 'hard' or 'soft'. soft_update_tau (float): Tau of soft target update. n_times_update (int): Number of repetition of update - average_q_decay (float): Decay rate of average Q, only used for - recording statistics - average_loss_decay (float): Decay rate of average loss, only used for - recording statistics batch_accumulator (str): 'mean' or 'sum' episodic_update_len (int or None): Subsequences of this length are used for update if set int and episodic_update=True @@ -152,8 +155,7 @@ def __init__(self, q_function, optimizer, replay_buffer, gamma, phi=lambda x: x, target_update_method='hard', soft_update_tau=1e-2, - n_times_update=1, average_q_decay=0.999, - average_loss_decay=0.99, + n_times_update=1, batch_accumulator='mean', episodic_update_len=None, logger=getLogger(__name__), @@ -207,10 +209,10 @@ def __init__(self, q_function, optimizer, replay_buffer, gamma, self.sync_target_network() # For backward compatibility self.target_q_function = self.target_model - self.average_q = 0 - self.average_q_decay = average_q_decay - self.average_loss = 0 - self.average_loss_decay = average_loss_decay + + # Statistics + self.q_record = collections.deque(maxlen=1000) + self.loss_record = collections.deque(maxlen=100) # Recurrent states of the model self.train_recurrent_states = None @@ -277,9 +279,7 @@ def update(self, experiences, errors_out=None): if has_weight: self.replay_buffer.update_errors(errors_out) - # Update stats - self.average_loss *= self.average_loss_decay - self.average_loss += (1 - self.average_loss_decay) * float(loss.array) + self.loss_record.append(float(loss.array)) self.model.cleargrads() loss.backward() @@ -295,10 +295,7 @@ def update_from_episodes(self, episodes, errors_out=None): batch_states=self.batch_states, ) loss = self._compute_loss(exp_batch, errors_out=None) - # Update stats - self.average_loss *= self.average_loss_decay - self.average_loss += (1 - self.average_loss_decay) * float(loss.array) - self.optimizer.update(lambda: loss) + self.loss_record.append(float(loss.array)) def _compute_target_values(self, exp_batch): batch_next_state = exp_batch['next_state'] @@ -354,6 +351,8 @@ def _compute_loss(self, exp_batch, errors_out=None): """ y, t = self._compute_y_and_t(exp_batch) + self.q_record.extend(cuda.to_cpu(y.array).ravel()) + if errors_out is not None: del errors_out[:] delta = F.absolute(y - t) @@ -379,10 +378,6 @@ def act(self, obs): q = float(action_value.max.array) action = cuda.to_cpu(action_value.greedy_actions.array)[0] - # Update stats - self.average_q *= self.average_q_decay - self.average_q += (1 - self.average_q_decay) * q - self.logger.debug('t:%s q:%s action_value:%s', self.t, q, action_value) return action @@ -426,10 +421,6 @@ def act_and_train(self, obs, reward): action = self.explorer.select_action( self.t, lambda: greedy_action, action_value=action_value) - # Update stats - self.average_q *= self.average_q_decay - self.average_q += (1 - self.average_q_decay) * q - self.t += 1 self.last_state = obs self.last_action = action @@ -462,7 +453,6 @@ def batch_act_and_train(self, batch_obs): with chainer.using_config('train', False), chainer.no_backprop_mode(): batch_av = self._evaluate_model_and_update_train_recurrent_states( batch_obs) - batch_maxq = batch_av.max.array batch_argmax = cuda.to_cpu(batch_av.greedy_actions.array) batch_action = [ self.explorer.select_action( @@ -473,10 +463,6 @@ def batch_act_and_train(self, batch_obs): self.batch_last_obs = list(batch_obs) self.batch_last_action = list(batch_action) - # Update stats - self.average_q *= self.average_q_decay - self.average_q += (1 - self.average_q_decay) * float(batch_maxq.mean()) - return batch_action def batch_act(self, batch_obs): @@ -673,7 +659,7 @@ def stop_episode(self): def get_statistics(self): return [ - ('average_q', self.average_q), - ('average_loss', self.average_loss), + ('average_q', _mean_or_nan(self.q_record)), + ('average_loss', _mean_or_nan(self.loss_record)), ('n_updates', self.optimizer.t), ] From 027a208990b091cd76242bb8df326caa7d33909d Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 02:56:15 +0900 Subject: [PATCH 08/28] Remove process_idx from messages --- chainerrl/agents/dqn.py | 22 +++++++++++----------- chainerrl/agents/state_q_function_actor.py | 10 +++++----- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index b7bb92ea9..c79730198 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -562,12 +562,12 @@ def stop_episode_and_train(self, state, reward, done=False): self.train_recurrent_states = None self.replay_buffer.stop_current_episode() - def _poll_pipe(self, pipe, shared_model): + def _poll_pipe(self, actor_idx, pipe, shared_model): while pipe.poll(): - i, cmd, data = pipe.recv() + cmd, data = pipe.recv() self.logger.debug( 'Learner thread received a message from actoor %s: %s %s', - i, cmd, data) + actor_idx, cmd, data) if cmd == 'get_statistics': pipe.send(self.get_statistics()) elif cmd == 'load': @@ -580,15 +580,15 @@ def _poll_pipe(self, pipe, shared_model): raise RuntimeError( 'Unknown command from actor: {}'.format(cmd)) - def _poll_queue(self, queue): + def _poll_queue(self, actor_idx, queue): while not queue.empty(): - i, cmd, data = queue.get() + cmd, data = queue.get() if cmd == 'transition': - self.replay_buffer.append(**data, env_id=i) + self.replay_buffer.append(**data, env_id=actor_idx) self.t += 1 elif cmd == 'stop_episode': assert data is None - self.replay_buffer.stop_current_episode(env_id=i) + self.replay_buffer.stop_current_episode(env_id=actor_idx) else: raise RuntimeError( 'Unknown command from actor: {}'.format(cmd)) @@ -597,11 +597,11 @@ def _learner_loop(self, shared_model, queues, pipes, stop_event): # To stop this loop, call stop_event.set() while not stop_event.wait(0): # Poll actors for messages - for pipe in pipes: - self._poll_pipe(pipe, shared_model) + for i, pipe in enumerate(pipes): + self._poll_pipe(i, pipe, shared_model) # Poll actors for transitions - for queue in queues: - self._poll_queue(queue) + for i, queue in enumerate(queues): + self._poll_queue(i, queue) # Update model if possible if self.replay_updater.update_if_necessary(self.t): copy_param(source_link=self.model, target_link=shared_model) diff --git a/chainerrl/agents/state_q_function_actor.py b/chainerrl/agents/state_q_function_actor.py index d3d8fe5b5..c3c8fd1fe 100644 --- a/chainerrl/agents/state_q_function_actor.py +++ b/chainerrl/agents/state_q_function_actor.py @@ -59,9 +59,9 @@ def act(self, obs): return action def _send_to_learner(self, transition, stop_episode=False): - self.queue.put((self.process_idx, 'transition', transition)) + self.queue.put(('transition', transition)) if stop_episode: - self.queue.put((self.process_idx, 'stop_episode', None)) + self.queue.put(('stop_episode', None)) def act_and_train(self, obs, reward): @@ -113,13 +113,13 @@ def stop_episode(self): pass def save(self, dirname): - self.pipe.send((self.process_idx, 'save', dirname)) + self.pipe.send(('save', dirname)) self.pipe.recv() def load(self, dirname): - self.pipe.send((self.process_idx, 'load', dirname)) + self.pipe.send(('load', dirname)) self.pipe.recv() def get_statistics(self): - self.pipe.send((self.process_idx, 'get_statistics', None)) + self.pipe.send(('get_statistics', None)) return self.pipe.recv() From 5dcf397caae6d07699496459bd3b1bd28c436276 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 03:47:39 +0900 Subject: [PATCH 09/28] Support recurrent in actor-learner training --- chainerrl/agents/dqn.py | 1 + chainerrl/agents/state_q_function_actor.py | 100 +++++++++++++++------ tests/agents_tests/test_dqn.py | 5 +- 3 files changed, 75 insertions(+), 31 deletions(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index c79730198..5ba88a9d0 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -638,6 +638,7 @@ def make_actor(i): phi=self.phi, batch_states=self.batch_states, logger=self.logger, + recurrent=self.recurrent, ) stop_event = threading.Event() diff --git a/chainerrl/agents/state_q_function_actor.py b/chainerrl/agents/state_q_function_actor.py index c3c8fd1fe..5e15c6288 100644 --- a/chainerrl/agents/state_q_function_actor.py +++ b/chainerrl/agents/state_q_function_actor.py @@ -28,6 +28,7 @@ def __init__( model, explorer, phi=lambda x: x, + recurrent=False, logger=getLogger(__name__), batch_states=batch_states, ): @@ -36,26 +37,47 @@ def __init__( self.model = model self.explorer = explorer self.phi = phi + self.recurrent = recurrent self.logger = logger self.batch_states = batch_states self.t = 0 self.last_state = None - self.last_state = None - self.batch_last_action = None - self.batch_last_action = None + self.last_action = None + + # Recurrent states of the model + self.train_recurrent_states = None + self.train_prev_recurrent_states = None + self.test_recurrent_states = None @property def xp(self): return self.model.xp - def compute_action_value(self, batch_obs): - with chainer.using_config('train', False), chainer.no_backprop_mode(): - return self.model(self.batch_states(batch_obs, self.xp, self.phi)) + def _evaluate_model_and_update_train_recurrent_states(self, batch_obs): + batch_xs = self.batch_states(batch_obs, self.xp, self.phi) + if self.recurrent: + self.train_prev_recurrent_states = self.train_recurrent_states + batch_av, self.train_recurrent_states = self.model( + batch_xs, self.train_recurrent_states) + else: + batch_av = self.model(batch_xs) + return batch_av + + def _evaluate_model_and_update_test_recurrent_states(self, batch_obs): + batch_xs = self.batch_states(batch_obs, self.xp, self.phi) + if self.recurrent: + batch_av, self.test_recurrent_states = self.model( + batch_xs, self.test_recurrent_states) + else: + batch_av = self.model(batch_xs) + return batch_av def act(self, obs): - action_value = self.compute_action_value([obs]) - action = cuda.to_cpu(action_value.greedy_actions.array)[0] + with chainer.using_config('train', False), chainer.no_backprop_mode(): + action_value =\ + self._evaluate_model_and_update_test_recurrent_states([obs]) + action = cuda.to_cpu(action_value.greedy_actions.array)[0] return action def _send_to_learner(self, transition, stop_episode=False): @@ -65,8 +87,10 @@ def _send_to_learner(self, transition, stop_episode=False): def act_and_train(self, obs, reward): - action_value = self.compute_action_value([obs]) - greedy_action = cuda.to_cpu(action_value.greedy_actions.array)[0] + with chainer.using_config('train', False), chainer.no_backprop_mode(): + action_value =\ + self._evaluate_model_and_update_train_recurrent_states([obs]) + greedy_action = cuda.to_cpu(action_value.greedy_actions.array)[0] action = self.explorer.select_action( self.t, lambda: greedy_action, action_value=action_value) @@ -75,15 +99,23 @@ def act_and_train(self, obs, reward): if self.last_state is not None: assert self.last_action is not None # Add a transition to the replay buffer - self._send_to_learner(dict( - state=self.last_state, - action=self.last_action, - reward=reward, - next_state=obs, - next_action=action, - is_state_terminal=False), - stop_episode=False, - ) + transition = { + 'state': self.last_state, + 'action': self.last_action, + 'reward': reward, + 'next_state': obs, + 'is_state_terminal': False, + } + if self.recurrent: + transition['recurrent_state'] =\ + self.model.get_recurrent_state_at( + self.train_prev_recurrent_states, + 0, unwrap_variable=True) + self.train_prev_recurrent_states = None + transition['next_recurrent_state'] =\ + self.model.get_recurrent_state_at( + self.train_recurrent_states, 0, unwrap_variable=True) + self._send_to_learner(transition) self.last_state = obs self.last_action = action @@ -96,21 +128,31 @@ def stop_episode_and_train(self, state, reward, done=False): assert self.last_action is not None # Add a transition to the replay buffer - self._send_to_learner(dict( - state=self.last_state, - action=self.last_action, - reward=reward, - next_state=state, - next_action=self.last_action, - is_state_terminal=done), - stop_episode=True, - ) + transition = { + 'state': self.last_state, + 'action': self.last_action, + 'reward': reward, + 'next_state': state, + 'is_state_terminal': done, + } + if self.recurrent: + transition['recurrent_state'] =\ + self.model.get_recurrent_state_at( + self.train_prev_recurrent_states, 0, unwrap_variable=True) + self.train_prev_recurrent_states = None + transition['next_recurrent_state'] =\ + self.model.get_recurrent_state_at( + self.train_recurrent_states, 0, unwrap_variable=True) + self._send_to_learner(transition, stop_episode=True) self.last_state = None self.last_action = None + if self.recurrent: + self.train_recurrent_states = None def stop_episode(self): - pass + if self.recurrent: + self.test_recurrent_states = None def save(self, dirname): self.pipe.send(('save', dirname)) diff --git a/tests/agents_tests/test_dqn.py b/tests/agents_tests/test_dqn.py index 3f562642a..d4049f5f1 100644 --- a/tests/agents_tests/test_dqn.py +++ b/tests/agents_tests/test_dqn.py @@ -61,9 +61,10 @@ def make_dqn_agent(self, env, q_func, opt, explorer, rbuf, gpu): replay_start_size=100, target_update_interval=100) -# Actor-learner training with recurrent models is currently not supported class TestDQNOnDiscretePOABC( - _TestBatchTrainingMixin, base._TestDQNOnDiscretePOABC): + _TestActorLearnerTrainingMixin, + _TestBatchTrainingMixin, + base._TestDQNOnDiscretePOABC): def make_dqn_agent(self, env, q_func, opt, explorer, rbuf, gpu): return DQN(q_func, opt, rbuf, gpu=gpu, gamma=0.9, explorer=explorer, From ca35585323d38980542dc33ba139a59413f7af0d Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 12:06:26 +0900 Subject: [PATCH 10/28] Restore unintentionally deleted update --- chainerrl/agents/dqn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 5ba88a9d0..ab6860486 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -295,6 +295,7 @@ def update_from_episodes(self, episodes, errors_out=None): batch_states=self.batch_states, ) loss = self._compute_loss(exp_batch, errors_out=None) + self.optimizer.update(lambda: loss) self.loss_record.append(float(loss.array)) def _compute_target_values(self, exp_batch): From a635b07fbcdc8d5578ed5d826240cfc8ed0aa2fe Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 12:07:17 +0900 Subject: [PATCH 11/28] Restore unintentionally deleted stop_current_episode --- chainerrl/agents/dqn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index ab6860486..36961007f 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -504,6 +504,7 @@ def batch_observe_and_train(self, batch_obs, batch_reward, if batch_reset[i] or batch_done[i]: self.batch_last_obs[i] = None self.batch_last_action[i] = None + self.replay_buffer.stop_current_episode(env_id=i) self.replay_updater.update_if_necessary(self.t) if self.recurrent: From 04101b34a967e6e5d1b54e120f84f1d9843ac3b5 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 12:44:19 +0900 Subject: [PATCH 12/28] Support actor-learner training in IQN --- chainerrl/agents/__init__.py | 1 + chainerrl/agents/iqn.py | 48 ++++++++++++++++++++++++++++++++++ tests/agents_tests/test_iqn.py | 9 +++++-- 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/chainerrl/agents/__init__.py b/chainerrl/agents/__init__.py index e0a32204b..2bdce3dd3 100644 --- a/chainerrl/agents/__init__.py +++ b/chainerrl/agents/__init__.py @@ -9,6 +9,7 @@ from chainerrl.agents.double_pal import DoublePAL # NOQA from chainerrl.agents.dpp import DPP # NOQA from chainerrl.agents.dqn import DQN # NOQA +from chainerrl.agents.implicit_quantile_state_q_function_actor import ImplicitQuantileStateQFunctionActor # NOQA from chainerrl.agents.iqn import IQN # NOQA from chainerrl.agents.nsq import NSQ # NOQA from chainerrl.agents.pal import PAL # NOQA diff --git a/chainerrl/agents/iqn.py b/chainerrl/agents/iqn.py index 7b1f3d168..c4d9fc1fe 100644 --- a/chainerrl/agents/iqn.py +++ b/chainerrl/agents/iqn.py @@ -6,12 +6,17 @@ from future import standard_library standard_library.install_aliases() # NOQA +import copy +import multiprocessing as mp +import threading + import chainer from chainer import cuda import chainer.functions as F import chainer.links as L import numpy as np +import chainerrl from chainerrl.action_value import QuantileDiscreteActionValue from chainerrl.agents import dqn from chainerrl.links import StatelessRecurrentChainList @@ -378,3 +383,46 @@ def _evaluate_model_and_update_test_recurrent_states(self, batch_obs): 0, 1, size=(len(batch_obs), self.quantile_thresholds_K)).astype('f') return tau2av(taus_tilde) + + def setup_actor_learner_training(self, n_actors): + # Override DQN.setup_actor_learner_training to use + # `ImplicitQuantileStateQFunctionActor`, not `StateQFunctionActor`. + + # Make a copy on shared memory and share among actors and a learner + shared_model = copy.deepcopy(self.model).to_cpu() + shared_arrays = chainerrl.misc.async_.extract_params_as_shared_arrays( + shared_model) + + # Queues are used for actors to send transitions to a learner + queues = [mp.Queue() for _ in range(n_actors)] + + # Pipes are used for infrequent communication + learner_pipes, actor_pipes = list(zip(*[ + mp.Pipe() for _ in range(n_actors)])) + + def make_actor(i): + chainerrl.misc.async_.set_shared_params( + shared_model, shared_arrays) + return chainerrl.agents.ImplicitQuantileStateQFunctionActor( + queue=queues[i], + pipe=actor_pipes[i], + model=shared_model, + explorer=self.explorer, + phi=self.phi, + batch_states=self.batch_states, + logger=self.logger, + recurrent=self.recurrent, + ) + + stop_event = threading.Event() + + learner = threading.Thread( + target=self._learner_loop, + kwargs=dict( + shared_model=shared_model, + queues=queues, + pipes=learner_pipes, + stop_event=stop_event, + ) + ) + return make_actor, learner, stop_event diff --git a/tests/agents_tests/test_iqn.py b/tests/agents_tests/test_iqn.py index 866c6f047..bf4b73959 100644 --- a/tests/agents_tests/test_iqn.py +++ b/tests/agents_tests/test_iqn.py @@ -16,6 +16,7 @@ from chainer import testing import basetest_dqn_like as base +from basetest_training import _TestActorLearnerTrainingMixin from basetest_training import _TestBatchTrainingMixin import chainerrl from chainerrl.agents import iqn @@ -26,7 +27,9 @@ 'quantile_thresholds_N_prime': [1, 7], })) class TestIQNOnDiscreteABC( - _TestBatchTrainingMixin, base._TestDQNOnDiscreteABC): + _TestActorLearnerTrainingMixin, + _TestBatchTrainingMixin, + base._TestDQNOnDiscreteABC): def make_q_func(self, env): obs_size = env.observation_space.low.size @@ -53,7 +56,9 @@ def make_dqn_agent(self, env, q_func, opt, explorer, rbuf, gpu): class TestIQNOnDiscretePOABC( - _TestBatchTrainingMixin, base._TestDQNOnDiscretePOABC): + _TestActorLearnerTrainingMixin, + _TestBatchTrainingMixin, + base._TestDQNOnDiscretePOABC): def make_q_func(self, env): obs_size = env.observation_space.low.size From c1567cb28ac071fe96bdc031b36fffe19f5a4926 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 12:46:13 +0900 Subject: [PATCH 13/28] Add a missing file --- ...mplicit_quantile_state_q_function_actor.py | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 chainerrl/agents/implicit_quantile_state_q_function_actor.py diff --git a/chainerrl/agents/implicit_quantile_state_q_function_actor.py b/chainerrl/agents/implicit_quantile_state_q_function_actor.py new file mode 100644 index 000000000..ac26d62e6 --- /dev/null +++ b/chainerrl/agents/implicit_quantile_state_q_function_actor.py @@ -0,0 +1,53 @@ +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals +from __future__ import absolute_import +from builtins import * # NOQA +from future import standard_library +standard_library.install_aliases() # NOQA + +from chainerrl.agents import state_q_function_actor + + +class ImplicitQuantileStateQFunctionActor( + state_q_function_actor.StateQFunctionActor): + """Actor that acts according to the implicit quantile Q-function. + + This actor specialization is required because the interface of an implicit + quantile Q-function is different from that of a usual Q-function. + """ + + def __init__(self, *args, **kwargs): + # K=32 were used in the IQN paper's experiments + # (personal communication) + self.quantile_thresholds_K = kwargs.pop('quantile_thresholds_K', 32) + super().__init__(*args, **kwargs) + + @property + def xp(self): + return self.model.xp + + def _evaluate_model_and_update_train_recurrent_states(self, batch_obs): + batch_xs = self.batch_states(batch_obs, self.xp, self.phi) + if self.recurrent: + self.train_prev_recurrent_states = self.train_recurrent_states + tau2av, self.train_recurrent_states = self.model( + batch_xs, self.train_recurrent_states) + else: + tau2av = self.model(batch_xs) + taus_tilde = self.xp.random.uniform( + 0, 1, + size=(len(batch_obs), self.quantile_thresholds_K)).astype('f') + return tau2av(taus_tilde) + + def _evaluate_model_and_update_test_recurrent_states(self, batch_obs): + batch_xs = self.batch_states(batch_obs, self.xp, self.phi) + if self.recurrent: + tau2av, self.test_recurrent_states = self.model( + batch_xs, self.test_recurrent_states) + else: + tau2av = self.model(batch_xs) + taus_tilde = self.xp.random.uniform( + 0, 1, + size=(len(batch_obs), self.quantile_thresholds_K)).astype('f') + return tau2av(taus_tilde) From 2296f5c5269be7f7440abe8998803a0be962ec48 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 13:04:46 +0900 Subject: [PATCH 14/28] Record q for IQN --- chainerrl/agents/iqn.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chainerrl/agents/iqn.py b/chainerrl/agents/iqn.py index c4d9fc1fe..9bcc5ee72 100644 --- a/chainerrl/agents/iqn.py +++ b/chainerrl/agents/iqn.py @@ -345,6 +345,8 @@ def _compute_loss(self, exp_batch, errors_out=None): with chainer.no_backprop_mode(): t = self._compute_target_values(exp_batch) + self.q_record.extend(cuda.to_cpu(y.array.mean(axis=1)).ravel()) + eltwise_loss = compute_eltwise_huber_quantile_loss(y, t, taus) if errors_out is not None: From 1ef78603a9bf174940b70e1145b5c17b7ce0a07d Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 13:04:57 +0900 Subject: [PATCH 15/28] Forward quantile_thresholds_K --- chainerrl/agents/iqn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/chainerrl/agents/iqn.py b/chainerrl/agents/iqn.py index 9bcc5ee72..5dff3b1d2 100644 --- a/chainerrl/agents/iqn.py +++ b/chainerrl/agents/iqn.py @@ -414,6 +414,7 @@ def make_actor(i): batch_states=self.batch_states, logger=self.logger, recurrent=self.recurrent, + quantile_thresholds_K=self.quantile_thresholds_K, ) stop_event = threading.Event() From 4f5a9f13c06d3aec022fb90359e13c2aa7fddaea Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 15:05:21 +0900 Subject: [PATCH 16/28] Support copy_param between different devices --- chainerrl/misc/copy_param.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/chainerrl/misc/copy_param.py b/chainerrl/misc/copy_param.py index 97469c950..f15451f94 100644 --- a/chainerrl/misc/copy_param.py +++ b/chainerrl/misc/copy_param.py @@ -12,6 +12,7 @@ def copy_param(target_link, source_link): """Copy parameters of a link to another link.""" target_params = dict(target_link.namedparams()) + target_device = target_link.device for param_name, param in source_link.namedparams(): if target_params[param_name].array is None: raise TypeError( @@ -19,15 +20,15 @@ def copy_param(target_link, source_link): 'not initialized.\nPlease try to forward dummy input ' 'beforehand to determine parameter shape of the model.'.format( param_name)) - target_params[param_name].array[...] = param.array + target_params[param_name].array[...] = target_device.send(param.array) # Copy Batch Normalization's statistics target_links = dict(target_link.namedlinks()) for link_name, link in source_link.namedlinks(): if isinstance(link, L.BatchNormalization): target_bn = target_links[link_name] - target_bn.avg_mean[...] = link.avg_mean - target_bn.avg_var[...] = link.avg_var + target_bn.avg_mean[...] = target_device.send(link.avg_mean) + target_bn.avg_var[...] = target_device.send(link.avg_var) def soft_copy_param(target_link, source_link, tau): From 4d384607154c38f54363cee9760dcaca1c548d17 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 15:06:00 +0900 Subject: [PATCH 17/28] Set OMP_NUM_THREADS=1 --- examples/grasping/train_dqn_batch_grasping.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/grasping/train_dqn_batch_grasping.py b/examples/grasping/train_dqn_batch_grasping.py index 33d19577e..36a49eb9d 100644 --- a/examples/grasping/train_dqn_batch_grasping.py +++ b/examples/grasping/train_dqn_batch_grasping.py @@ -9,6 +9,9 @@ import functools import os +# Prevent numpy from using multiple threads +os.environ['OMP_NUM_THREADS'] = '1' # NOQA + import chainer from chainer import functions as F from chainer import links as L From f97903ff684764f094ea417f425e18dd6f026ff3 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 15:06:12 +0900 Subject: [PATCH 18/28] Rename function --- examples/grasping/train_dqn_batch_grasping.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/grasping/train_dqn_batch_grasping.py b/examples/grasping/train_dqn_batch_grasping.py index 36a49eb9d..479edff4b 100644 --- a/examples/grasping/train_dqn_batch_grasping.py +++ b/examples/grasping/train_dqn_batch_grasping.py @@ -305,7 +305,7 @@ def phi(x): eval_stats['stdev'])) else: - make_actor, learner, stop_event = agent.start_actor_learner_training( + make_actor, learner, stop_event = agent.setup_actor_learner_training( args.num_envs) learner.start() From 3e779351799c24bf00c725e8c77583a167702938 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 16:20:08 +0900 Subject: [PATCH 19/28] Use poller thread to parallelize learning and polling --- chainerrl/agents/dqn.py | 86 ++++++++++++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 19 deletions(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 36961007f..fdfa02ffb 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -199,6 +199,11 @@ def __init__(self, q_function, optimizer, replay_buffer, gamma, replay_start_size=replay_start_size, update_interval=update_interval, ) + self.minibatch_size = minibatch_size + self.episodic_update_len = episodic_update_len + self.replay_start_size = replay_start_size + self.update_interval = update_interval + assert target_update_interval % update_interval == 0,\ "target_update_interval should be a multiple of update_interval" @@ -564,7 +569,15 @@ def stop_episode_and_train(self, state, reward, done=False): self.train_recurrent_states = None self.replay_buffer.stop_current_episode() - def _poll_pipe(self, actor_idx, pipe, shared_model): + def _can_start_replay(self): + if len(self.replay_buffer) < self.replay_start_size: + return False + if (self.recurrent + and self.replay_buffer.n_episodes < self.minibatch_size): + return False + return True + + def _poll_pipe(self, actor_idx, pipe): while pipe.poll(): cmd, data = pipe.recv() self.logger.debug( @@ -582,39 +595,72 @@ def _poll_pipe(self, actor_idx, pipe, shared_model): raise RuntimeError( 'Unknown command from actor: {}'.format(cmd)) - def _poll_queue(self, actor_idx, queue): + def _poll_queue(self, actor_idx, queue, replay_buffer_lock): while not queue.empty(): cmd, data = queue.get() if cmd == 'transition': - self.replay_buffer.append(**data, env_id=actor_idx) + with replay_buffer_lock: + self.replay_buffer.append(**data, env_id=actor_idx) self.t += 1 elif cmd == 'stop_episode': assert data is None - self.replay_buffer.stop_current_episode(env_id=actor_idx) + with replay_buffer_lock: + self.replay_buffer.stop_current_episode(env_id=actor_idx) else: raise RuntimeError( 'Unknown command from actor: {}'.format(cmd)) - def _learner_loop(self, shared_model, queues, pipes, stop_event): + def _learner_loop(self, queues, pipes, shared_model, replay_buffer_lock, + stop_event): + + poller = threading.Thread( + target=self._poller_loop, + kwargs=dict( + queues=queues, + pipes=pipes, + replay_buffer_lock=replay_buffer_lock, + stop_event=stop_event, + ) + ) + poller.start() + # To stop this loop, call stop_event.set() while not stop_event.wait(0): + time.sleep(1e-6) + # Update model if possible + if not self._can_start_replay(): + continue + if self.recurrent: + with replay_buffer_lock: + episodes = self.replay_buffer.sample_episodes( + self.minibatch_size, self.episodic_update_len) + self.update_from_episodes(episodes) + else: + with replay_buffer_lock: + transitions = self.replay_buffer.sample( + self.minibatch_size) + self.update(transitions) + copy_param(source_link=self.model, + target_link=shared_model) + # To keep the ratio of target updates to model updates, + # here we calculate back the effective current timestep + # from update_interval and number of updates so far. + effective_timestep = self.optimizer.t * self.update_interval + if effective_timestep % self.target_update_interval == 0: + self.sync_target_network() + + poller.join() + + def _poller_loop(self, queues, pipes, replay_buffer_lock, stop_event): + # To stop this loop, call stop_event.set() + while not stop_event.wait(0): + time.sleep(1e-6) # Poll actors for messages for i, pipe in enumerate(pipes): - self._poll_pipe(i, pipe, shared_model) + self._poll_pipe(i, pipe) # Poll actors for transitions for i, queue in enumerate(queues): - self._poll_queue(i, queue) - # Update model if possible - if self.replay_updater.update_if_necessary(self.t): - copy_param(source_link=self.model, target_link=shared_model) - # To keep the ratio of target updates to model updates, - # here we calculate back the effective current timestep from - # update_interval and number of updates so far. - effective_timestep = ( - self.optimizer.t * self.replay_updater.update_interval) - if effective_timestep % self.target_update_interval == 0: - self.sync_target_network() - time.sleep(1e-6) + self._poll_queue(i, queue, replay_buffer_lock) def setup_actor_learner_training(self, n_actors): # Make a copy on shared memory and share among actors and a learner @@ -644,13 +690,15 @@ def make_actor(i): ) stop_event = threading.Event() + replay_buffer_lock = threading.Lock() learner = threading.Thread( target=self._learner_loop, kwargs=dict( - shared_model=shared_model, queues=queues, pipes=learner_pipes, + shared_model=shared_model, + replay_buffer_lock=replay_buffer_lock, stop_event=stop_event, ) ) From 5a3cf6d8fece2de3b2530c7ba5310fb3ff300b2c Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 17:11:50 +0900 Subject: [PATCH 20/28] Use replay_buffer_lock in IQN as well --- chainerrl/agents/iqn.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chainerrl/agents/iqn.py b/chainerrl/agents/iqn.py index 5dff3b1d2..bfa89ab7e 100644 --- a/chainerrl/agents/iqn.py +++ b/chainerrl/agents/iqn.py @@ -418,6 +418,7 @@ def make_actor(i): ) stop_event = threading.Event() + replay_buffer_lock = threading.Lock() learner = threading.Thread( target=self._learner_loop, @@ -425,6 +426,7 @@ def make_actor(i): shared_model=shared_model, queues=queues, pipes=learner_pipes, + replay_buffer_lock=replay_buffer_lock, stop_event=stop_event, ) ) From 339b701ab8e794b8c019daa24fa6ef2664bd52a4 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 17:59:11 +0900 Subject: [PATCH 21/28] Support mixing cpu and gpu recurrent states --- chainerrl/links/stateless_recurrent.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/chainerrl/links/stateless_recurrent.py b/chainerrl/links/stateless_recurrent.py index 93a2ce06b..c3e7433f2 100644 --- a/chainerrl/links/stateless_recurrent.py +++ b/chainerrl/links/stateless_recurrent.py @@ -312,12 +312,21 @@ def get_recurrent_state_at(link, recurrent_state, indices, unwrap_variable): raise ValueError('{} is not a recurrent link'.format(link)) +def _to_device_variable_or_ndarray(device, x): + if isinstance(x, chainer.Variable): + x.to_device(device) + return x + else: + return chainer.dataset.to_device(device, x) + + def concatenate_recurrent_states(link, split_recurrent_states): if isinstance(link, L.NStepLSTM): # shape: (n_layers, batch_size, out_size) n_layers = link.n_layers out_size = link.out_size xp = link.xp + device = link.device hs = [] cs = [] for i, srs in enumerate(split_recurrent_states): @@ -326,6 +335,8 @@ def concatenate_recurrent_states(link, split_recurrent_states): c = xp.zeros((n_layers, 1, out_size), dtype=np.float32) else: h, c = srs + h = _to_device_variable_or_ndarray(device, h) + c = _to_device_variable_or_ndarray(device, c) if h.ndim == 2: assert h.shape == (n_layers, out_size) assert c.shape == (n_layers, out_size) @@ -341,12 +352,14 @@ def concatenate_recurrent_states(link, split_recurrent_states): n_layers = link.n_layers out_size = link.out_size xp = link.xp + device = link.device hs = [] for i, srs in enumerate(split_recurrent_states): if srs is None: h = xp.zeros((n_layers, 1, out_size), dtype=np.float32) else: h = srs + h = _to_device_variable_or_ndarray(device, h) if h.ndim == 2: assert h.shape == (n_layers, out_size) # add batch axis From c895d7c70ea8b971bab88c51d7f6106997054156 Mon Sep 17 00:00:00 2001 From: muupan Date: Sun, 12 May 2019 18:00:46 +0900 Subject: [PATCH 22/28] Use pipes only --- chainerrl/agents/dqn.py | 23 +++------------------- chainerrl/agents/iqn.py | 5 ----- chainerrl/agents/state_q_function_actor.py | 6 ++---- 3 files changed, 5 insertions(+), 29 deletions(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 36961007f..b0423c19a 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -567,10 +567,8 @@ def stop_episode_and_train(self, state, reward, done=False): def _poll_pipe(self, actor_idx, pipe, shared_model): while pipe.poll(): cmd, data = pipe.recv() - self.logger.debug( - 'Learner thread received a message from actoor %s: %s %s', - actor_idx, cmd, data) if cmd == 'get_statistics': + assert data is None pipe.send(self.get_statistics()) elif cmd == 'load': self.load(data) @@ -578,14 +576,7 @@ def _poll_pipe(self, actor_idx, pipe, shared_model): elif cmd == 'save': self.save(data) pipe.send(None) - else: - raise RuntimeError( - 'Unknown command from actor: {}'.format(cmd)) - - def _poll_queue(self, actor_idx, queue): - while not queue.empty(): - cmd, data = queue.get() - if cmd == 'transition': + elif cmd == 'transition': self.replay_buffer.append(**data, env_id=actor_idx) self.t += 1 elif cmd == 'stop_episode': @@ -595,15 +586,12 @@ def _poll_queue(self, actor_idx, queue): raise RuntimeError( 'Unknown command from actor: {}'.format(cmd)) - def _learner_loop(self, shared_model, queues, pipes, stop_event): + def _learner_loop(self, shared_model, pipes, stop_event): # To stop this loop, call stop_event.set() while not stop_event.wait(0): # Poll actors for messages for i, pipe in enumerate(pipes): self._poll_pipe(i, pipe, shared_model) - # Poll actors for transitions - for i, queue in enumerate(queues): - self._poll_queue(i, queue) # Update model if possible if self.replay_updater.update_if_necessary(self.t): copy_param(source_link=self.model, target_link=shared_model) @@ -622,9 +610,6 @@ def setup_actor_learner_training(self, n_actors): shared_arrays = chainerrl.misc.async_.extract_params_as_shared_arrays( shared_model) - # Queues are used for actors to send transitions to a learner - queues = [mp.Queue() for _ in range(n_actors)] - # Pipes are used for infrequent communication learner_pipes, actor_pipes = list(zip(*[ mp.Pipe() for _ in range(n_actors)])) @@ -633,7 +618,6 @@ def make_actor(i): chainerrl.misc.async_.set_shared_params( shared_model, shared_arrays) return chainerrl.agents.StateQFunctionActor( - queue=queues[i], pipe=actor_pipes[i], model=shared_model, explorer=self.explorer, @@ -649,7 +633,6 @@ def make_actor(i): target=self._learner_loop, kwargs=dict( shared_model=shared_model, - queues=queues, pipes=learner_pipes, stop_event=stop_event, ) diff --git a/chainerrl/agents/iqn.py b/chainerrl/agents/iqn.py index 5dff3b1d2..b91fa9e3b 100644 --- a/chainerrl/agents/iqn.py +++ b/chainerrl/agents/iqn.py @@ -395,9 +395,6 @@ def setup_actor_learner_training(self, n_actors): shared_arrays = chainerrl.misc.async_.extract_params_as_shared_arrays( shared_model) - # Queues are used for actors to send transitions to a learner - queues = [mp.Queue() for _ in range(n_actors)] - # Pipes are used for infrequent communication learner_pipes, actor_pipes = list(zip(*[ mp.Pipe() for _ in range(n_actors)])) @@ -406,7 +403,6 @@ def make_actor(i): chainerrl.misc.async_.set_shared_params( shared_model, shared_arrays) return chainerrl.agents.ImplicitQuantileStateQFunctionActor( - queue=queues[i], pipe=actor_pipes[i], model=shared_model, explorer=self.explorer, @@ -423,7 +419,6 @@ def make_actor(i): target=self._learner_loop, kwargs=dict( shared_model=shared_model, - queues=queues, pipes=learner_pipes, stop_event=stop_event, ) diff --git a/chainerrl/agents/state_q_function_actor.py b/chainerrl/agents/state_q_function_actor.py index 5e15c6288..c3be1a606 100644 --- a/chainerrl/agents/state_q_function_actor.py +++ b/chainerrl/agents/state_q_function_actor.py @@ -23,7 +23,6 @@ class StateQFunctionActor(agent.AsyncAgent): def __init__( self, - queue, pipe, model, explorer, @@ -32,7 +31,6 @@ def __init__( logger=getLogger(__name__), batch_states=batch_states, ): - self.queue = queue self.pipe = pipe self.model = model self.explorer = explorer @@ -81,9 +79,9 @@ def act(self, obs): return action def _send_to_learner(self, transition, stop_episode=False): - self.queue.put(('transition', transition)) + self.pipe.send(('transition', transition)) if stop_episode: - self.queue.put(('stop_episode', None)) + self.pipe.send(('stop_episode', None)) def act_and_train(self, obs, reward): From 00a5b4a99d2523dceb921a27b13a16d668528baa Mon Sep 17 00:00:00 2001 From: muupan Date: Tue, 21 May 2019 20:34:21 +0900 Subject: [PATCH 23/28] Add StoppableThread --- chainerrl/misc/__init__.py | 1 + chainerrl/misc/stoppable_thread.py | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) create mode 100644 chainerrl/misc/stoppable_thread.py diff --git a/chainerrl/misc/__init__.py b/chainerrl/misc/__init__.py index 1219bee5e..da0c6964a 100644 --- a/chainerrl/misc/__init__.py +++ b/chainerrl/misc/__init__.py @@ -6,3 +6,4 @@ from chainerrl.misc import env_modifiers # NOQA from chainerrl.misc.is_return_code_zero import is_return_code_zero # NOQA from chainerrl.misc.random_seed import set_random_seed # NOQA +from chainerrl.misc.stoppable_thread import StoppableThread # NOQA diff --git a/chainerrl/misc/stoppable_thread.py b/chainerrl/misc/stoppable_thread.py new file mode 100644 index 000000000..f501044d2 --- /dev/null +++ b/chainerrl/misc/stoppable_thread.py @@ -0,0 +1,20 @@ +import threading + + +class StoppableThread(threading.Thread): + """Thread with an event object to stop itself. + + Args: + stop_event (threading.Event): Event that stops the thread if it is set. + *args, **kwargs: Forwarded to `threading.Thread`. + """ + + def __init__(self, stop_event, *args, **kwargs): + super(StoppableThread, self).__init__(*args, **kwargs) + self.stop_event = stop_event + + def stop(self): + self.stop_event.set() + + def is_stopped(self): + self.stop_event.is_set() From b6c6b53d86ed4bbfed4a59215ae2fda251665a9e Mon Sep 17 00:00:00 2001 From: muupan Date: Tue, 21 May 2019 21:46:53 +0900 Subject: [PATCH 24/28] Modify actor-learner interface to accept n_updates --- chainerrl/agents/dqn.py | 100 ++++++++++++--------- chainerrl/agents/iqn.py | 26 ++++-- chainerrl/experiments/train_agent_async.py | 66 ++++++++------ chainerrl/misc/async_.py | 9 +- tests/agents_tests/basetest_training.py | 10 ++- 5 files changed, 124 insertions(+), 87 deletions(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index aa0853066..523eb1087 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -10,7 +10,6 @@ import copy from logging import getLogger import multiprocessing as mp -import threading import time import chainer @@ -578,47 +577,48 @@ def _can_start_replay(self): return True def _poll_pipe(self, actor_idx, pipe, replay_buffer_lock): - while pipe.poll(): - cmd, data = pipe.recv() - if cmd == 'get_statistics': - assert data is None - pipe.send(self.get_statistics()) - elif cmd == 'load': - self.load(data) - pipe.send(None) - elif cmd == 'save': - self.save(data) - pipe.send(None) - elif cmd == 'transition': - with replay_buffer_lock: - self.replay_buffer.append(**data, env_id=actor_idx) - elif cmd == 'stop_episode': - assert data is None - with replay_buffer_lock: - self.replay_buffer.stop_current_episode(env_id=actor_idx) - else: - raise RuntimeError( - 'Unknown command from actor: {}'.format(cmd)) + if pipe.closed: + return + try: + while pipe.poll(): + cmd, data = pipe.recv() + if cmd == 'get_statistics': + assert data is None + pipe.send(self.get_statistics()) + elif cmd == 'load': + self.load(data) + pipe.send(None) + elif cmd == 'save': + self.save(data) + pipe.send(None) + elif cmd == 'transition': + with replay_buffer_lock: + self.replay_buffer.append(**data, env_id=actor_idx) + elif cmd == 'stop_episode': + assert data is None + with replay_buffer_lock: + self.replay_buffer.stop_current_episode( + env_id=actor_idx) + else: + raise RuntimeError( + 'Unknown command from actor: {}'.format(cmd)) + except EOFError: + pipe.close() def _learner_loop(self, pipes, shared_model, replay_buffer_lock, - stop_event): - - poller = threading.Thread( - target=self._poller_loop, - kwargs=dict( - pipes=pipes, - replay_buffer_lock=replay_buffer_lock, - stop_event=stop_event, - ) - ) - poller.start() + stop_event, n_updates=None): # To stop this loop, call stop_event.set() - while not stop_event.wait(0): + while not stop_event.is_set(): time.sleep(1e-6) # Update model if possible if not self._can_start_replay(): continue + if n_updates is not None: + assert self.optimizer.t <= n_updates + if self.optimizer.t == n_updates: + stop_event.set() + break if self.recurrent: with replay_buffer_lock: episodes = self.replay_buffer.sample_episodes( @@ -638,17 +638,15 @@ def _learner_loop(self, pipes, shared_model, replay_buffer_lock, if effective_timestep % self.target_update_interval == 0: self.sync_target_network() - poller.join() - def _poller_loop(self, pipes, replay_buffer_lock, stop_event): # To stop this loop, call stop_event.set() - while not stop_event.wait(0): + while not stop_event.is_set(): time.sleep(1e-6) # Poll actors for messages for i, pipe in enumerate(pipes): self._poll_pipe(i, pipe, replay_buffer_lock) - def setup_actor_learner_training(self, n_actors): + def setup_actor_learner_training(self, n_actors, n_updates=None): # Make a copy on shared memory and share among actors and a learner shared_model = copy.deepcopy(self.model).to_cpu() shared_arrays = chainerrl.misc.async_.extract_params_as_shared_arrays( @@ -671,19 +669,33 @@ def make_actor(i): recurrent=self.recurrent, ) - stop_event = threading.Event() - replay_buffer_lock = threading.Lock() + replay_buffer_lock = mp.Lock() - learner = threading.Thread( + poller_stop_event = mp.Event() + poller = chainerrl.misc.StoppableThread( + target=self._poller_loop, + kwargs=dict( + pipes=learner_pipes, + replay_buffer_lock=replay_buffer_lock, + stop_event=poller_stop_event, + ), + stop_event=poller_stop_event, + ) + + learner_stop_event = mp.Event() + learner = chainerrl.misc.StoppableThread( target=self._learner_loop, kwargs=dict( shared_model=shared_model, pipes=learner_pipes, replay_buffer_lock=replay_buffer_lock, - stop_event=stop_event, - ) + stop_event=learner_stop_event, + n_updates=n_updates, + ), + stop_event=learner_stop_event, ) - return make_actor, learner, stop_event + + return make_actor, learner, poller def stop_episode(self): if self.recurrent: diff --git a/chainerrl/agents/iqn.py b/chainerrl/agents/iqn.py index d0c5850af..ff5fabdee 100644 --- a/chainerrl/agents/iqn.py +++ b/chainerrl/agents/iqn.py @@ -386,7 +386,7 @@ def _evaluate_model_and_update_test_recurrent_states(self, batch_obs): size=(len(batch_obs), self.quantile_thresholds_K)).astype('f') return tau2av(taus_tilde) - def setup_actor_learner_training(self, n_actors): + def setup_actor_learner_training(self, n_actors, n_updates=None): # Override DQN.setup_actor_learner_training to use # `ImplicitQuantileStateQFunctionActor`, not `StateQFunctionActor`. @@ -413,16 +413,30 @@ def make_actor(i): quantile_thresholds_K=self.quantile_thresholds_K, ) - stop_event = threading.Event() replay_buffer_lock = threading.Lock() - learner = threading.Thread( + poller_stop_event = mp.Event() + poller = chainerrl.misc.StoppableThread( + target=self._poller_loop, + kwargs=dict( + pipes=learner_pipes, + replay_buffer_lock=replay_buffer_lock, + stop_event=poller_stop_event, + ), + stop_event=poller_stop_event, + ) + + learner_stop_event = mp.Event() + learner = chainerrl.misc.StoppableThread( target=self._learner_loop, kwargs=dict( shared_model=shared_model, pipes=learner_pipes, replay_buffer_lock=replay_buffer_lock, - stop_event=stop_event, - ) + stop_event=learner_stop_event, + n_updates=n_updates, + ), + stop_event=learner_stop_event, ) - return make_actor, learner, stop_event + + return make_actor, learner, poller diff --git a/chainerrl/experiments/train_agent_async.py b/chainerrl/experiments/train_agent_async.py index bc698b1ac..6ed34e7ae 100644 --- a/chainerrl/experiments/train_agent_async.py +++ b/chainerrl/experiments/train_agent_async.py @@ -10,13 +10,15 @@ import multiprocessing as mp import os +import numpy as np + from chainerrl.experiments.evaluator import AsyncEvaluator from chainerrl.misc import async_ from chainerrl.misc import random_seed def train_loop(process_idx, env, agent, steps, outdir, counter, - episodes_counter, training_done, + episodes_counter, stop_event, max_episode_len=None, evaluator=None, eval_env=None, successful_score=None, logger=None, global_step_hooks=[]): @@ -58,7 +60,7 @@ def train_loop(process_idx, env, agent, steps, outdir, counter, reset = (episode_len == max_episode_len or info.get('needs_reset', False)) - if done or reset or global_t >= steps or training_done.value: + if done or reset or global_t >= steps or stop_event.is_set(): agent.stop_episode_and_train(obs, r, done) if process_idx == 0: @@ -75,10 +77,8 @@ def train_loop(process_idx, env, agent, steps, outdir, counter, if (eval_score is not None and successful_score is not None and eval_score >= successful_score): - with training_done.get_lock(): - if not training_done.value: - training_done.value = True - successful = True + stop_event.set() + successful = True # Break immediately in order to avoid an additional # call of agent.act_and_train break @@ -87,7 +87,7 @@ def train_loop(process_idx, env, agent, steps, outdir, counter, episodes_counter.value += 1 global_episodes = episodes_counter.value - if global_t >= steps or training_done.value: + if global_t >= steps or stop_event.is_set(): break # Start a new episode @@ -129,21 +129,26 @@ def set_shared_objects(agent, shared_objects): setattr(agent, attr, new_value) -def train_agent_async(outdir, processes, make_env, - profile=False, - steps=8 * 10 ** 7, - eval_interval=10 ** 6, - eval_n_steps=None, - eval_n_episodes=10, - max_episode_len=None, - step_offset=0, - successful_score=None, - agent=None, - make_agent=None, - global_step_hooks=[], - save_best_so_far_agent=True, - logger=None, - ): +def train_agent_async( + outdir, + processes, + make_env, + profile=False, + steps=8 * 10 ** 7, + eval_interval=10 ** 6, + eval_n_steps=None, + eval_n_episodes=10, + max_episode_len=None, + step_offset=0, + successful_score=None, + agent=None, + make_agent=None, + global_step_hooks=[], + save_best_so_far_agent=True, + logger=None, + random_seeds=None, + stop_event=None, +): """Train agent asynchronously using multiprocessing. Either `agent` or `make_agent` must be specified. @@ -171,6 +176,10 @@ def train_agent_async(outdir, processes, make_env, if the score (= mean return of evaluation episodes) exceeds the best-so-far score, the current agent is saved. logger (logging.Logger): Logger used in this function. + random_seeds (array-like of ints or None): Random seeds for processes. + If set to None, [0, 1, ..., processes-1] are used. + stop_event (multiprocessing.Event or None): Event to stop training. + If set to None, a new Event object is created and used internally. Returns: Trained agent. @@ -183,7 +192,9 @@ def train_agent_async(outdir, processes, make_env, counter = mp.Value('l', 0) episodes_counter = mp.Value('l', 0) - training_done = mp.Value('b', False) # bool + + if stop_event is None: + stop_event = mp.Event() if agent is None: assert make_agent is not None @@ -205,8 +216,11 @@ def train_agent_async(outdir, processes, make_env, logger=logger, ) + if random_seeds is None: + random_seeds = np.arange(processes) + def run_func(process_idx): - random_seed.set_random_seed(process_idx) + random_seed.set_random_seed(random_seeds[process_idx]) env = make_env(process_idx, test=False) if evaluator is None: @@ -232,7 +246,7 @@ def f(): max_episode_len=max_episode_len, evaluator=evaluator, successful_score=successful_score, - training_done=training_done, + stop_event=stop_event, eval_env=eval_env, global_step_hooks=global_step_hooks, logger=logger) @@ -250,4 +264,6 @@ def f(): async_.run_async(processes, run_func) + stop_event.set() + return agent diff --git a/chainerrl/misc/async_.py b/chainerrl/misc/async_.py index 317d20877..d246a744a 100644 --- a/chainerrl/misc/async_.py +++ b/chainerrl/misc/async_.py @@ -12,8 +12,6 @@ import chainer import numpy as np -from chainerrl.misc import random_seed - class AbnormalExitWarning(Warning): """Warning category for abnormal subprocess exit.""" @@ -125,13 +123,8 @@ def run_async(n_process, run_func): processes = [] - def set_seed_and_run(process_idx, run_func): - random_seed.set_random_seed(np.random.randint(0, 2 ** 32)) - run_func(process_idx) - for process_idx in range(n_process): - processes.append(mp.Process(target=set_seed_and_run, args=( - process_idx, run_func))) + processes.append(mp.Process(target=run_func, args=(process_idx,))) for p in processes: p.start() diff --git a/tests/agents_tests/basetest_training.py b/tests/agents_tests/basetest_training.py index 8e9cc110c..e5e9bed60 100644 --- a/tests/agents_tests/basetest_training.py +++ b/tests/agents_tests/basetest_training.py @@ -209,11 +209,11 @@ def make_env(process_idx, test): # Train if steps > 0: - make_actor, learner, stop_event =\ + make_actor, learner, poller =\ agent.setup_actor_learner_training(n_actors=2) + poller.start() learner.start() - train_agent_async( processes=2, steps=steps, @@ -224,10 +224,12 @@ def make_env(process_idx, test): successful_score=successful_return, make_env=make_env, make_agent=make_actor, + stop_event=learner.stop_event, ) - - stop_event.set() + learner.stop() learner.join() + poller.stop() + poller.join() # Test From e68611d93b92e68330f3dd619b8b281a018b7d14 Mon Sep 17 00:00:00 2001 From: muupan Date: Tue, 21 May 2019 21:47:56 +0900 Subject: [PATCH 25/28] Update example as well --- examples/grasping/train_dqn_batch_grasping.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/examples/grasping/train_dqn_batch_grasping.py b/examples/grasping/train_dqn_batch_grasping.py index 479edff4b..67d1d97b5 100644 --- a/examples/grasping/train_dqn_batch_grasping.py +++ b/examples/grasping/train_dqn_batch_grasping.py @@ -305,11 +305,11 @@ def phi(x): eval_stats['stdev'])) else: - make_actor, learner, stop_event = agent.setup_actor_learner_training( + make_actor, learner, poller = agent.setup_actor_learner_training( args.num_envs) + poller.start() learner.start() - experiments.train_agent_async( processes=args.num_envs, make_agent=make_actor, @@ -319,10 +319,12 @@ def phi(x): eval_n_episodes=args.eval_n_runs, eval_interval=args.eval_interval, outdir=args.outdir, + stop_event=learner.stop_event, ) - - stop_event.set() + learner.stop() learner.join() + poller.stop() + poller.join() if __name__ == '__main__': From 7d48c0423a1c43c554c25ddbcb04caceec76daad Mon Sep 17 00:00:00 2001 From: muupan Date: Tue, 21 May 2019 22:41:07 +0900 Subject: [PATCH 26/28] Call device.use in case gpu>=1 --- chainerrl/agents/dqn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 523eb1087..154273b85 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -608,6 +608,7 @@ def _poll_pipe(self, actor_idx, pipe, replay_buffer_lock): def _learner_loop(self, pipes, shared_model, replay_buffer_lock, stop_event, n_updates=None): + self.model.device.use() # To stop this loop, call stop_event.set() while not stop_event.is_set(): time.sleep(1e-6) From ef21c290cd4ce2e182cec0995d25df88ddd6672d Mon Sep 17 00:00:00 2001 From: muupan Date: Tue, 21 May 2019 23:11:38 +0900 Subject: [PATCH 27/28] Synchronize model in poller, not learner --- chainerrl/agents/dqn.py | 15 +++++++++------ chainerrl/agents/iqn.py | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 154273b85..b12eccd36 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -605,9 +605,10 @@ def _poll_pipe(self, actor_idx, pipe, replay_buffer_lock): except EOFError: pipe.close() - def _learner_loop(self, pipes, shared_model, replay_buffer_lock, - stop_event, n_updates=None): + def _learner_loop(self, pipes, replay_buffer_lock, stop_event, + n_updates=None): + # Device.use should be called in a new thread self.model.device.use() # To stop this loop, call stop_event.set() while not stop_event.is_set(): @@ -630,8 +631,6 @@ def _learner_loop(self, pipes, shared_model, replay_buffer_lock, transitions = self.replay_buffer.sample( self.minibatch_size) self.update(transitions) - copy_param(source_link=self.model, - target_link=shared_model) # To keep the ratio of target updates to model updates, # here we calculate back the effective current timestep # from update_interval and number of updates so far. @@ -639,13 +638,17 @@ def _learner_loop(self, pipes, shared_model, replay_buffer_lock, if effective_timestep % self.target_update_interval == 0: self.sync_target_network() - def _poller_loop(self, pipes, replay_buffer_lock, stop_event): + def _poller_loop(self, shared_model, pipes, replay_buffer_lock, + stop_event): # To stop this loop, call stop_event.set() while not stop_event.is_set(): time.sleep(1e-6) # Poll actors for messages for i, pipe in enumerate(pipes): self._poll_pipe(i, pipe, replay_buffer_lock) + # Synchronize shared model + copy_param(source_link=self.model, + target_link=shared_model) def setup_actor_learner_training(self, n_actors, n_updates=None): # Make a copy on shared memory and share among actors and a learner @@ -676,6 +679,7 @@ def make_actor(i): poller = chainerrl.misc.StoppableThread( target=self._poller_loop, kwargs=dict( + shared_model=shared_model, pipes=learner_pipes, replay_buffer_lock=replay_buffer_lock, stop_event=poller_stop_event, @@ -687,7 +691,6 @@ def make_actor(i): learner = chainerrl.misc.StoppableThread( target=self._learner_loop, kwargs=dict( - shared_model=shared_model, pipes=learner_pipes, replay_buffer_lock=replay_buffer_lock, stop_event=learner_stop_event, diff --git a/chainerrl/agents/iqn.py b/chainerrl/agents/iqn.py index ff5fabdee..24c28b194 100644 --- a/chainerrl/agents/iqn.py +++ b/chainerrl/agents/iqn.py @@ -419,6 +419,7 @@ def make_actor(i): poller = chainerrl.misc.StoppableThread( target=self._poller_loop, kwargs=dict( + shared_model=shared_model, pipes=learner_pipes, replay_buffer_lock=replay_buffer_lock, stop_event=poller_stop_event, @@ -430,7 +431,6 @@ def make_actor(i): learner = chainerrl.misc.StoppableThread( target=self._learner_loop, kwargs=dict( - shared_model=shared_model, pipes=learner_pipes, replay_buffer_lock=replay_buffer_lock, stop_event=learner_stop_event, From aa4a534062402ce6ec05fe91efaaf93a6ad86679 Mon Sep 17 00:00:00 2001 From: muupan Date: Thu, 5 Mar 2020 17:44:03 +0900 Subject: [PATCH 28/28] Fix remaining conflict --- chainerrl/agents/dqn.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/chainerrl/agents/dqn.py b/chainerrl/agents/dqn.py index 7f4871685..4e055c9a7 100644 --- a/chainerrl/agents/dqn.py +++ b/chainerrl/agents/dqn.py @@ -502,11 +502,7 @@ def batch_observe_and_train(self, batch_obs, batch_reward, self.model.get_recurrent_state_at( self.train_recurrent_states, i, unwrap_variable=True) -<<<<<<< HEAD - self.replay_buffer.append(**transition, env_id=i) -======= self.replay_buffer.append(env_id=i, **transition) ->>>>>>> master if batch_reset[i] or batch_done[i]: self.batch_last_obs[i] = None self.batch_last_action[i] = None