The following 50 code examples, extracted from open-source Python projects, illustrate how to use gym.Wrapper().
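Before the examples, here is a minimal sketch of the pattern they all share: subclass gym.Wrapper, call the parent constructor, and override step() and/or reset(). The ScaleRewardWrapper name and the scale factor are illustrative only, not taken from any project below.

import gym

class ScaleRewardWrapper(gym.Wrapper):
    """Illustrative wrapper that rescales every reward by a constant factor."""
    def __init__(self, env, scale=0.1):
        gym.Wrapper.__init__(self, env)
        self.scale = scale

    def step(self, action):
        # Delegate to the wrapped env, then modify the reward on the way out
        obs, reward, done, info = self.env.step(action)
        return obs, reward * self.scale, done, info

env = ScaleRewardWrapper(gym.make('CartPole-v1'))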
def get_wrapper_by_name(env, classname):
    """Given a gym environment possibly wrapped multiple times, returns a wrapper
    of class named classname or raises ValueError if no such wrapper was applied

    Parameters
    ----------
    env: gym.Env or gym.Wrapper
        gym environment
    classname: str
        name of the wrapper

    Returns
    -------
    wrapper: gym.Wrapper
        wrapper named classname
    """
    currentenv = env
    while True:
        if classname == currentenv.class_name():
            return currentenv
        elif isinstance(currentenv, gym.Wrapper):
            currentenv = currentenv.env
        else:
            raise ValueError("Couldn't find wrapper named %s" % classname)
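A hypothetical usage of the helper above, relying on the class_name() classmethod that gym.Wrapper subclasses inherit (it returns the class's __name__, e.g. 'Monitor' for gym.wrappers.Monitor; the path is an example):

env = gym.wrappers.Monitor(gym.make('CartPole-v1'), '/tmp/monitor')
monitor = get_wrapper_by_name(env, 'Monitor')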
def env_wrapper_by_name(env, classname):
    """Given a gym environment possibly wrapped multiple times, returns a wrapper
    of class named classname or raises ValueError if no such wrapper was applied

    Parameters
    ----------
    env: gym.Env or gym.Wrapper
        gym environment
    classname: str
        name of the wrapper

    Returns
    -------
    wrapper: gym.Wrapper
        wrapper named classname
    """
    currentenv = env
    while True:
        if classname == currentenv.class_name():
            return currentenv
        elif isinstance(currentenv, gym.Wrapper):
            currentenv = currentenv.env
        else:
            raise ValueError("Couldn't find wrapper named %s" % classname)
def SetResolution(target_resolution):

    class SetResolutionWrapper(gym.Wrapper):
        """
        Doom wrapper to change screen resolution
        """
        def __init__(self, env):
            super(SetResolutionWrapper, self).__init__(env)
            if target_resolution not in resolutions:
                raise gym.error.Error('Error - The specified resolution "{}" is not supported by Vizdoom.'.format(target_resolution))
            parts = target_resolution.lower().split('x')
            width = int(parts[0])
            height = int(parts[1])
            screen_res = __import__('doom_py')
            screen_res = getattr(screen_res, 'ScreenResolution')
            screen_res = getattr(screen_res, 'RES_{}X{}'.format(width, height))
            self.screen_width, self.screen_height, self.unwrapped.screen_resolution = width, height, screen_res
            self.unwrapped.observation_space = gym.spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 3))
            self.observation_space = self.unwrapped.observation_space

    return SetResolutionWrapper
def __init__(self, env, k):
    """Stack k last frames.

    Returns lazy array, which is much more memory efficient.

    See Also
    --------
    baselines.common.atari_wrappers.LazyFrames
    """
    gym.Wrapper.__init__(self, env)
    self.k = k
    self.frames = deque([], maxlen=k)
    shp = env.observation_space.shape
    self.observation_space = spaces.Box(low=0, high=255, shape=(shp[0], shp[1], shp[2] * k))
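This __init__ is only one method of the class; in the baselines-style FrameStack it is typically paired with reset/step methods along the following lines (a sketch, assuming LazyFrames from baselines.common.atari_wrappers):

def reset(self):
    # Fill the deque with k copies of the first observation
    ob = self.env.reset()
    for _ in range(self.k):
        self.frames.append(ob)
    return self._get_ob()

def step(self, action):
    ob, reward, done, info = self.env.step(action)
    self.frames.append(ob)
    return self._get_ob(), reward, done, info

def _get_ob(self):
    assert len(self.frames) == self.k
    return LazyFrames(list(self.frames))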
def __init__(self, env):
    gym.Wrapper.__init__(self, env)
    self.observation_space = spaces.Box(low=0, high=255, shape=(84, 84, 1))
def get_wrapper_by_name(env, classname):
    currentenv = env
    while True:
        if classname in currentenv.__class__.__name__:
            return currentenv
        elif isinstance(currentenv, gym.Wrapper):
            # Walk one level down the wrapper chain
            currentenv = currentenv.env
        else:
            raise ValueError("Couldn't find wrapper named %s" % classname)
def __init__(self, env, noop_max=30):
    """Sample initial states by taking random number of no-ops on reset.
    No-op is assumed to be action 0.
    """
    gym.Wrapper.__init__(self, env)
    self.noop_max = noop_max
    self.override_num_noops = None
    self.noop_action = 0
    assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
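The behavior described by the docstring lives in the reset method, which this excerpt omits; in the baselines version it looks roughly like this sketch:

def reset(self, **kwargs):
    self.env.reset(**kwargs)
    if self.override_num_noops is not None:
        noops = self.override_num_noops
    else:
        noops = self.unwrapped.np_random.randint(1, self.noop_max + 1)
    assert noops > 0
    obs = None
    for _ in range(noops):
        # Step with the no-op action; re-reset if the episode somehow ends
        obs, _, done, _ = self.env.step(self.noop_action)
        if done:
            obs = self.env.reset(**kwargs)
    return obs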
def __init__(self, env): """Take action on reset for environments that are fixed until firing.""" gym.Wrapper.__init__(self, env) assert env.unwrapped.get_action_meanings()[1] == 'FIRE' assert len(env.unwrapped.get_action_meanings()) >= 3
def __init__(self, env): """Make end-of-life == end-of-episode, but only reset on true game over. Done by DeepMind for the DQN and co. since it helps value estimation. """ gym.Wrapper.__init__(self, env) self.lives = 0 self.was_real_done = True
def __init__(self, env, skip=4):
    """Return only every `skip`-th frame"""
    gym.Wrapper.__init__(self, env)
    # most recent raw observations (for max pooling across time steps)
    self._obs_buffer = np.zeros((2,) + env.observation_space.shape, dtype='uint8')
    self._skip = skip
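The two-slot buffer is used in step to max-pool the last two raw frames, which removes Atari sprite flicker; a sketch following the baselines pattern:

def step(self, action):
    total_reward = 0.0
    done = None
    for i in range(self._skip):
        obs, reward, done, info = self.env.step(action)
        # Keep only the last two raw frames for max pooling
        if i == self._skip - 2:
            self._obs_buffer[0] = obs
        if i == self._skip - 1:
            self._obs_buffer[1] = obs
        total_reward += reward
        if done:
            break
    max_frame = self._obs_buffer.max(axis=0)
    return max_frame, total_reward, done, info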
def list_wrappers(env: Union[Env, gym.Wrapper]):
    while isinstance(env, gym.Wrapper):
        yield env
        env = env.env
def set_meta(self, meta):
    self.meta = meta
    w = get_wrapper(self.env, Wrapper)
    if w:
        w.set_meta(meta)
def __init__(self, env):
    super(Wrapper, self).__init__(env)
    if not env.metadata.get('runtime.vectorized'):
        if self.autovectorize:
            # Circular dependency :(
            from universe import wrappers
            env = wrappers.Vectorize(env)
        else:
            raise error.Error('This wrapper can only wrap vectorized envs (i.e. where env.metadata["runtime.vectorized"] = True), not {}. Set "self.autovectorize = True" to automatically add a Vectorize wrapper.'.format(env))
    self.env = env
def SkipWrapper(repeat_count):

    class SkipWrapper(gym.Wrapper):
        """
        Generic common frame skipping wrapper
        Will perform action for `x` additional steps
        """
        def __init__(self, env):
            super(SkipWrapper, self).__init__(env)
            self.repeat_count = repeat_count
            self.stepcount = 0

        def _step(self, action):
            done = False
            total_reward = 0
            current_step = 0
            while current_step < (self.repeat_count + 1) and not done:
                self.stepcount += 1
                obs, reward, done, info = self.env.step(action)
                total_reward += reward
                current_step += 1
            if 'skip.stepcount' in info:
                raise gym.error.Error('Key "skip.stepcount" already in info. Make sure you are not stacking '
                                      'the SkipWrapper wrappers.')
            info['skip.stepcount'] = self.stepcount
            return obs, total_reward, done, info

        def _reset(self):
            self.stepcount = 0
            return self.env.reset()

    return SkipWrapper
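Since the factory returns a wrapper class, applying it takes two calls (the env name is illustrative):

env = SkipWrapper(4)(gym.make('CartPole-v0'))  # each chosen action is executed 4 + 1 = 5 consecutive steps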
def HistoryWrapper(steps):

    class HistoryWrapper(gym.Wrapper):
        """
        Track history of observations for a given number of steps
        Initial steps are zero-filled
        """
        def __init__(self, env):
            super(HistoryWrapper, self).__init__(env)
            self.steps = steps
            self.history = self._make_history()

        def _make_history(self):
            return [np.zeros(shape=self.env.observation_space.shape) for _ in range(steps)]

        def _step(self, action):
            obs, reward, done, info = self.env.step(action)
            self.history.pop(0)
            self.history.append(obs)
            return np.array(self.history), reward, done, info

        def _reset(self):
            self.history = self._make_history()
            self.history.pop(0)
            self.history.append(self.env.reset())
            return np.array(self.history)

    return HistoryWrapper
def HistoryWrapper(steps):

    class _HistoryWrapper(gym.Wrapper):
        """
        Track history of observations for a given number of steps
        Initial steps are zero-filled
        """
        def __init__(self, env):
            super(_HistoryWrapper, self).__init__(env)
            self.steps = steps
            self.history = self._make_history()
            self.observation_space = self._make_observation_space(steps, env.observation_space)

        @staticmethod
        def _make_observation_space(steps, orig_obs):
            assert isinstance(orig_obs, gym.spaces.Box)
            low = np.repeat(np.expand_dims(orig_obs.low, 0), steps, axis=0)
            high = np.repeat(np.expand_dims(orig_obs.high, 0), steps, axis=0)
            return gym.spaces.Box(low, high)

        def _make_history(self, last_item=None):
            size = self.steps if last_item is None else self.steps - 1
            res = collections.deque([np.zeros(shape=self.env.observation_space.shape)] * size)
            if last_item is not None:
                res.append(last_item)
            return res

        def _step(self, action):
            obs, reward, done, info = self.env.step(action)
            self.history.popleft()
            self.history.append(obs)
            return self.history, reward, done, info

        def _reset(self):
            self.history = self._make_history(last_item=self.env.reset())
            return self.history

    return _HistoryWrapper
def make_env(env_name, monitor_dir=None, wrappers=()):
    """
    Make gym environment with optional monitor
    :param env_name: name of the environment to create
    :param monitor_dir: optional directory to save monitor results
    :param wrappers: optional iterable of wrapper classes (or factories) applied to the env in order
    :return: environment object
    """
    env = gym.make(env_name)
    for wrapper in wrappers:
        env = wrapper(env)
    if monitor_dir:
        env = gym.wrappers.Monitor(env, monitor_dir)
    return env
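A hypothetical call combining make_env with the HistoryWrapper factory from the earlier example (the monitor path is illustrative):

env = make_env('CartPole-v1', monitor_dir='/tmp/monitor', wrappers=(HistoryWrapper(4),))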
def RepeatActionWrapper(env, repeat):
    """
    This is just a thin wrapper around `gym.wrappers.SkipWrapper` to get a consistent interface.

    :param gym.Env env: Environment to wrap
    :param int repeat: Number of times that an action will be repeated.
    :return gym.Wrapper: A wrapper that repeats an action for `repeat` steps.
    """
    from gym.wrappers import SkipWrapper
    return SkipWrapper(repeat)(env)
def __init__(self, env, noop_max=30):
    gym.Wrapper.__init__(self, env)
    self.noop_max = noop_max
    self.override_num_noops = None
    assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
def __init__(self, env):
    gym.Wrapper.__init__(self, env)
    assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
    assert len(env.unwrapped.get_action_meanings()) >= 3
def __init__(self, env):
    gym.Wrapper.__init__(self, env)
    self.lives = 0
    self.was_real_done = True
def __init__(self, env, skip=4):
    gym.Wrapper.__init__(self, env)
    # Most recent raw observations (for max pooling across time steps)
    self._obs_buffer = deque(maxlen=2)
    self._skip = skip
def __init__(self, env, k):
    gym.Wrapper.__init__(self, env)
    self.k = k
    self.frames = deque([], maxlen=k)
    shp = env.observation_space.shape
    assert shp[2] == 1  # can only stack 1-channel frames
    self.observation_space = spaces.Box(low=0, high=255, shape=(shp[0], shp[1], k))
def ToDiscrete():

    class ToDiscreteWrapper(gym.Wrapper):
        """
        Wrapper to convert MultiDiscrete action space to Discrete
        Only supports one config, which maps to the most logical discrete space possible
        """
        def __init__(self, env):
            super(ToDiscreteWrapper, self).__init__(env)
            mapping = {
                0:  [0, 0, 0, 0, 0, 0],  # NOOP
                1:  [1, 0, 0, 0, 0, 0],  # Up
                2:  [0, 0, 1, 0, 0, 0],  # Down
                3:  [0, 1, 0, 0, 0, 0],  # Left
                4:  [0, 1, 0, 0, 1, 0],  # Left + A
                5:  [0, 1, 0, 0, 0, 1],  # Left + B
                6:  [0, 1, 0, 0, 1, 1],  # Left + A + B
                7:  [0, 0, 0, 1, 0, 0],  # Right
                8:  [0, 0, 0, 1, 1, 0],  # Right + A
                9:  [0, 0, 0, 1, 0, 1],  # Right + B
                10: [0, 0, 0, 1, 1, 1],  # Right + A + B
                11: [0, 0, 0, 0, 1, 0],  # A
                12: [0, 0, 0, 0, 0, 1],  # B
                13: [0, 0, 0, 0, 1, 1],  # A + B
            }
            self.action_space = gym.spaces.multi_discrete.DiscreteToMultiDiscrete(self.action_space, mapping)

        def _step(self, action):
            return self.env._step(self.action_space(action))

    return ToDiscreteWrapper
def ToBox():

    class ToBoxWrapper(gym.Wrapper):
        """
        Wrapper to convert MultiDiscrete action space to Box
        Only supports one config, which allows all keys to be pressed
        """
        def __init__(self, env):
            super(ToBoxWrapper, self).__init__(env)
            self.action_space = gym.spaces.multi_discrete.BoxToMultiDiscrete(self.action_space)

        def _step(self, action):
            return self.env._step(self.action_space(action))

    return ToBoxWrapper
def SetPlayingMode(target_mode):
    """ target mode can be 'algo' or 'human' """

    class SetPlayingModeWrapper(gym.Wrapper):
        """
        Doom wrapper to change playing mode 'human' or 'algo'
        """
        def __init__(self, env):
            super(SetPlayingModeWrapper, self).__init__(env)
            if target_mode not in ['algo', 'human']:
                raise gym.error.Error('Error - The mode "{}" is not supported. Supported options are "algo" or "human"'.format(target_mode))
            self.unwrapped.mode = target_mode

    return SetPlayingModeWrapper
def SetPlayingMode(target_mode):
    """ target mode can be 'algo' or 'human' """

    class SetPlayingModeWrapper(gym.Wrapper):
        """
        Doom wrapper to change playing mode 'human' or 'algo'
        """
        def __init__(self, env):
            super(SetPlayingModeWrapper, self).__init__(env)
            if target_mode not in ['algo', 'human']:
                raise gym.error.Error('Error - The mode "{}" is not supported. Supported options are "algo" or "human"'.format(target_mode))
            self.unwrapped._mode = target_mode

    return SetPlayingModeWrapper
def __init__(self, env, noop_max=30): """Sample initial states by taking random number of no-ops on reset. No-op is assumed to be action 0. """ gym.Wrapper.__init__(self, env) self.noop_max = noop_max self.override_num_noops = None if isinstance(env.action_space, gym.spaces.MultiBinary): self.noop_action = np.zeros(self.env.action_space.n, dtype=np.int64) else: # used for atari environments self.noop_action = 0 assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
def __init__(self, env, k): """Stack k last frames. Returns lazy array, which is much more memory efficient. See Also -------- LazyFrames """ gym.Wrapper.__init__(self, env) self.k = k self.frames = deque([], maxlen=k) shp = env.observation_space.shape self.observation_space = spaces.Box( low=0, high=255, shape=(shp[0], shp[1], shp[2] * k))