Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] AbsorbingStateTransform #2290

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions torchrl/envs/transforms/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -8557,3 +8557,157 @@ def _inv_call(self, tensordict):
if self.sampling == self.SamplingStrategy.RANDOM:
action = action + self.jitters * torch.rand_like(self.jitters)
return tensordict.set(self.in_keys_inv[0], action)


class AbsorbingStateTransform(ObservationTransform):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need tests for this class
It should be registered in the __init__.py and put in the doc.

"""Adds an absorbing state to the observation space.

A transform that introduces an absorbing state to the environment. This absorbing state is typically used
in reinforcement learning to handle terminal states by creating an additional state that signifies
the end of an episode but allows for additional steps in the transition to better handle
learning algorithms.

Args:
max_episode_length (int): Maximum length of an episode.
in_keys (Sequence[NestedKey], optional): Keys to use for input observation. Defaults to ``"observation"``.
out_keys (Sequence[NestedKey], optional): Keys to use for output observation. Defaults to ``in_keys``.
done_key (Optional[NestedKey]): Key indicating if the episode is done. Defaults to ``"done"``.
terminate_key (Optional[NestedKey]): Key indicating if the episode is terminated. Defaults to ``"terminated"``.

Examples:
>>> from torchrl.envs import GymEnv
>>> t = AbsorbingStateTransform(max_episode_length=1000)
>>> base_env = GymEnv("HalfCheetah-v4")
>>> env = TransformedEnv(base_env, t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not very informative about the functionality ;)

"""

def __init__(
self,
max_episode_length: int,
in_keys: Sequence[NestedKey] | None = None,
out_keys: Sequence[NestedKey] | None = None,
done_key: Optional[NestedKey] = "done",
terminate_key: Optional[NestedKey] = "terminated",
):
if in_keys is None:
in_keys = "observation" # default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

["observation"] no?

if out_keys is None:
out_keys = copy(in_keys)
super().__init__(in_keys=in_keys, out_keys=out_keys)
self.max_episode_length = max_episode_length
self.done_key = done_key
self.terminate_key = terminate_key
self._done = None
self._curr_timestep = 0

def forward(self, tensordict: TensorDictBase) -> TensorDictBase:
raise RuntimeError(FORWARD_NOT_IMPLEMENTED.format(type(self)))

def _apply_transform(self, observation: torch.Tensor) -> torch.Tensor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a version of this that works for all batch sizes. This one will only work with uni of bidimensional batch sizes.

# Check if the observation is batched or not
if observation.dim() == 1:
# Single observation
if self._done:
# Return absorbing state which is [0, ..., 0, 1]
return torch.eye(observation.size(0) + 1)[-1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the observation is more than 1d?

return torch.cat((observation, torch.zeros(1)), dim=-1)

elif observation.dim() == 2:
# Batched observations
batch_size = observation.size(0)
if self._done:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need an in-place value? How does that work if one sub-env is done and the other not?
Maybe we could read the done state and change it on the fly, without using local attribute

# Create absorbing states for the batched observations
absorbing_state = torch.eye(observation.size(1) + 1)[-1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is rather wasteful, we creating a big tensor and indexing it, plus this is a view on a storage hence the original storage isn't cleared when you index.

Besides it lacks dtype and device.

You can create an incomplete eye with m and n, see the doc here

return absorbing_state.expand(batch_size, -1)
zeros = torch.zeros(batch_size, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing device and dtype

You could use observation.new_zeros

return torch.cat((observation, zeros), dim=-1)

else:
raise ValueError(
"Unsupported observation dimension: {}".format(observation.dim())
)

def _reset(
self, tensordict: TensorDictBase, tensordict_reset: TensorDictBase
) -> TensorDictBase:
self._curr_timestep = 0
self._done = False
with _set_missing_tolerance(self, True):
return self._call(tensordict_reset)

def _call(self, tensordict: TensorDictBase) -> TensorDictBase:
parent = self.parent
if parent is None:
raise RuntimeError(
f"{type(self)}.parent cannot be None: make sure this transform is executed within an environment."
)
if self._done:
for in_key, out_key in zip(self.in_keys, self.out_keys):
value = tensordict.get(in_key, default=None)
if value is not None:
observation = self._apply_transform(value)
tensordict.set(
out_key,
observation,
)
elif not self.missing_tolerance:
raise KeyError(
f"{self}: '{in_key}' not found in tensordict {tensordict}"
)
tensordict.set(
self.done_key, torch.ones_like(tensordict.get(self.done_key)).bool()
)
tensordict.set(
self.terminate_key,
torch.ones_like(tensordict.get(self.terminate_key)).bool(),
)
return tensordict
done = tensordict.get(self.done_key)
self._done = done.any()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this means that if any sub-env is done all are done?

# set dones to be true
for in_key, out_key in zip(self.in_keys, self.out_keys):
value = tensordict.get(in_key, default=None)
if value is not None:
observation = self._apply_transform(value)
tensordict.set(
out_key,
observation,
)
elif not self.missing_tolerance:
raise KeyError(
f"{self}: '{in_key}' not found in tensordict {tensordict}"
)

tensordict.set(
self.done_key, torch.zeros_like(tensordict.get(self.done_key)).bool()
)
tensordict.set(
self.terminate_key,
torch.zeros_like(tensordict.get(self.terminate_key)).bool(),
)
return tensordict

@property
def is_done(self) -> bool:
return self._done

@_apply_to_composite
def transform_observation_spec(self, observation_spec: TensorSpec) -> TensorSpec:
space = observation_spec.space

if isinstance(space, ContinuousBox):
space.low = self._apply_transform(space.low)
space.high = self._apply_transform(space.high)
observation_spec.shape = space.low.shape
else:
observation_spec.shape = self._apply_transform(
torch.zeros(observation_spec.shape)
).shape
return observation_spec

def __repr__(self) -> str:
return (
f"{self.__class__.__name__}("
f"max_episode_length={self.max_episode_length}, "
f"keys={self.in_keys})"
)
Loading