import cv2
import multiprocessing
import multiprocessing.connection
import gym
import numpy as np


class Game(gym.Wrapper):
    """
    This is a wrapper for an OpenAI Gym game environment. We do a few things here:

    The observation is a tensor of size (84, 84, 4): four frames (images of the
    game screen) stacked on the last axis, i.e. each channel is a frame. The same
    action is executed for four consecutive time steps, and the pixel-wise maximum
    of the last two of those frames is what gets pushed onto the stack (see the
    sketch after this class):

        Frames   00 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15
        Actions  a1 a1 a1 a1 a2 a2 a2 a2 a3 a3 a3 a3 a4 a4 a4 a4
        Max      -- -- MM MM -- -- MM MM -- -- MM MM -- -- MM MM
        Stacked  -- -- Stack -- -- Stack -- -- Stack -- -- Stack
    """

    def __init__(self, env: gym.Env, is_stack: bool):
        gym.Wrapper.__init__(self, env)
        self.is_stack = is_stack

        if self.is_stack:
            self.observation_space = gym.spaces.Box(low=0, high=255,
                                                    shape=(84, 84, 4),
                                                    dtype=np.uint8)
        else:
            self.observation_space = gym.spaces.Box(low=0, high=255,
                                                    shape=(84, 84, 1),
                                                    dtype=np.uint8)

        # buffer to take the maximum of the last 2 frames for each action
        self.obs_2_max = np.zeros((2, 84, 84, 1), np.uint8)
        # tensor for a stack of 4 frames
        self.obs_4 = np.zeros((84, 84, 4))
        # keep track of the episode rewards
        self.rewards = []
        # and the number of lives left
        self.lives = 0

    def step(self, action):
        """
        Executes `action` for 4 time steps and returns a tuple of
        (observation, reward, done, episode_info).

        * `observation`: the stacked 4 frames (this frame and the frames for the
          last 3 actions), as described above
        * `reward`: the total reward while the action was executed
        * `done`: whether the episode finished (a life was lost)
        * `episode_info`: episode information if the episode completed
        """
        reward = 0.
        done = None

        # run for 4 steps
        for i in range(4):
            # execute the action in the OpenAI Gym environment
            obs, r, done, info = self.env.step(action)

            # add the last two frames to the buffer
            if i >= 2:
                self.obs_2_max[i % 2] = self._process_obs(obs)

            reward += r

            # get the number of lives left
            lives = self.env.unwrapped.ale.lives()
            # reset if a life is lost
            if lives < self.lives:
                done = True
            self.lives = lives

            # stop if the episode finished
            if done:
                break

        # maintain rewards for each step
        self.rewards.append(reward)

        if done:
            # if the episode is over, set the episode information and reset
            episode_info = {"reward": sum(self.rewards),
                            "length": len(self.rewards)}
            self.reset()
        else:
            episode_info = None

            # get the max of the last two frames
            obs = self.obs_2_max.max(axis=0)

            # push it onto the stack of 4 frames
            self.obs_4 = np.roll(self.obs_4, shift=-1, axis=-1)
            self.obs_4[..., -1:] = obs

        if self.is_stack:
            return self.obs_4, reward, done, episode_info
        else:
            return self.obs_4[..., 3:], reward, done, episode_info

    def reset(self):
        # reset the OpenAI Gym environment
        obs = self.env.reset()

        # take 30 no-op steps
        for _ in range(30):
            obs, _, _, _ = self.env.step(0)
        # fire and make a move
        obs, _, _, _ = self.env.step(1)
        obs, _, _, _ = self.env.step(2)

        # reset caches
        obs = self._process_obs(obs)
        self.obs_4[..., 0:] = obs
        self.obs_4[..., 1:] = obs
        self.obs_4[..., 2:] = obs
        self.obs_4[..., 3:] = obs
        self.rewards = []

        self.lives = self.env.unwrapped.ale.lives()

        if self.is_stack:
            return self.obs_4
        else:
            return self.obs_4[..., 3:]

    @staticmethod
    def _process_obs(obs):
        # convert to grayscale and resize to (84, 84)
        obs = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        obs = cv2.resize(obs, (84, 84), interpolation=cv2.INTER_AREA)
        return obs[:, :, None]  # shape (84, 84, 1)

    @staticmethod
    def _process_obs_2(obs):
        # alternative pre-processing: manual luminance conversion, resize to
        # (84, 110), then crop to (84, 84)
        img = np.reshape(obs, [210, 160, 3]).astype(np.float32)
        img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
        resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_LINEAR)
        x_t = resized_screen[18:102, :]
        x_t = np.reshape(x_t, [84, 84, 1])
        return x_t.astype(np.uint8)
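

# A small sketch (not used by the wrapper) mirroring the Frames/Actions/Max/Stacked
# table in the `Game` docstring: out of every four frames run with the same action,
# only the pixel-wise max of the last two is pushed onto the stack. The function
# name and the dummy 1x1 "frames" are illustrative assumptions.
def _stacking_sketch():
    stack = np.zeros((1, 1, 4))
    # dummy frames numbered 0..15, standing in for 16 game screens
    frames = [np.full((1, 1, 1), i, dtype=np.uint8) for i in range(16)]
    for group in range(4):  # four actions, four frames each
        # max of the last two frames of the group, e.g. max(frame 2, frame 3)
        max_frame = np.stack(frames[group * 4 + 2: group * 4 + 4]).max(axis=0)
        # shift the stack left and append the new frame, as in `Game.step`
        stack = np.roll(stack, shift=-1, axis=-1)
        stack[..., -1:] = max_frame
    # the four channels now hold frames 3, 7, 11 and 15 (each maxed with its predecessor)
    return stack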


def make_game(seed: int, is_stack: bool):
    # create the gym environment
    env = gym.make('BreakoutNoFrameskip-v4')
    env.seed(seed)
    game = Game(env, is_stack)
    return game
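

# A small usage sketch (assumed, not part of the training code) showing how the
# wrapper is driven directly; the function name, seed, and step count are
# illustrative choices.
def _example_rollout(seed: int = 47, n_steps: int = 8):
    game = make_game(seed, is_stack=True)
    obs = game.reset()  # shape (84, 84, 4)
    for _ in range(n_steps):
        # action 1 is FIRE in Breakout; `episode_info` is None until an episode ends
        obs, reward, done, episode_info = game.step(1)
    return obs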


def worker_process(remote: multiprocessing.connection.Connection, seed: int):
    # create the game
    game = make_game(seed, is_stack=True)

    # wait for instructions over the connection and execute them
    while True:
        cmd, data = remote.recv()
        if cmd == "step":
            remote.send(game.step(data))
        elif cmd == "reset":
            remote.send(game.reset())
        elif cmd == "close":
            remote.close()
            break
        else:
            raise NotImplementedError


class Worker(object):
    """
    Creates a new worker and runs it in a separate process.
    """
    child: multiprocessing.connection.Connection
    process: multiprocessing.Process

    def __init__(self, seed):
        self.child, parent = multiprocessing.Pipe()
        self.process = multiprocessing.Process(target=worker_process, args=(parent, seed))
        self.process.start()
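

if __name__ == '__main__':
    # A minimal sketch of how a trainer might drive these workers (an assumed usage,
    # not taken from the original training loop; worker count and seeds are
    # arbitrary): create a few workers, reset them, then send one random action to
    # each and collect the results over the pipes.
    workers = [Worker(seed=47 + i) for i in range(4)]

    # reset all games and collect the initial observations
    for worker in workers:
        worker.child.send(("reset", None))
    observations = [worker.child.recv() for worker in workers]

    # take one random action in each game; actions 0..3 are valid for Breakout
    for worker in workers:
        worker.child.send(("step", np.random.randint(0, 4)))
    results = [worker.child.recv() for worker in workers]

    # shut the workers down
    for worker in workers:
        worker.child.send(("close", None))
        worker.process.join()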