diff --git a/pyproject.toml b/pyproject.toml index 36947af0..16a2324a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ torch = ">=1.9.0" torchtyping = ">=0.1.4" # dev dependencies. -black = { version = "24.2", optional = true } +black = { version = "24.3", optional = true } flake8 = { version = "*", optional = true } gitmopy = { version = "*", optional = true } myst-parser = { version = "*", optional = true } diff --git a/src/gfn/containers/replay_buffer.py b/src/gfn/containers/replay_buffer.py index eebf24db..c1679d9a 100644 --- a/src/gfn/containers/replay_buffer.py +++ b/src/gfn/containers/replay_buffer.py @@ -3,6 +3,8 @@ import os from typing import TYPE_CHECKING, Literal +import torch + from gfn.containers.trajectories import Trajectories from gfn.containers.transitions import Transitions @@ -48,6 +50,7 @@ def __init__( elif objects_type == "states": self.training_objects = env.states_from_batch_shape((0,)) self.terminating_states = env.states_from_batch_shape((0,)) + self.terminating_states.log_rewards = torch.zeros((0,), device=env.device) self.objects_type = "states" else: raise ValueError(f"Unknown objects_type: {objects_type}") diff --git a/src/gfn/containers/trajectories.py b/src/gfn/containers/trajectories.py index 02da678a..35196ec3 100644 --- a/src/gfn/containers/trajectories.py +++ b/src/gfn/containers/trajectories.py @@ -14,6 +14,7 @@ from gfn.containers.base import Container from gfn.containers.transitions import Transitions +from gfn.utils.common import has_log_probs def is_tensor(t) -> bool: @@ -54,7 +55,7 @@ def __init__( is_backward: bool = False, log_rewards: TT["n_trajectories", torch.float] | None = None, log_probs: TT["max_length", "n_trajectories", torch.float] | None = None, - estimator_outputs: torch.Tensor | None = None, + estimator_outputs: TT["batch_shape", "output_dim", torch.float] | None = None, ) -> None: """ Args: @@ -325,7 +326,12 @@ def to_transitions(self) -> Transitions: ], dim=0, ) - log_probs = self.log_probs[~self.actions.is_dummy] + + # Only return logprobs if they exist. + log_probs = ( + self.log_probs[~self.actions.is_dummy] if has_log_probs(self) else None + ) + return Transitions( env=self.env, states=states, diff --git a/src/gfn/containers/transitions.py b/src/gfn/containers/transitions.py index 4b15f05e..a3c920af 100644 --- a/src/gfn/containers/transitions.py +++ b/src/gfn/containers/transitions.py @@ -11,6 +11,7 @@ from gfn.states import States from gfn.containers.base import Container +from gfn.utils.common import has_log_probs class Transitions(Container): @@ -186,7 +187,10 @@ def __getitem__(self, index: int | Sequence[int]) -> Transitions: log_rewards = ( self._log_rewards[index] if self._log_rewards is not None else None ) - log_probs = self.log_probs[index] + + # Only return logprobs if they exist. + log_probs = self.log_probs[index] if has_log_probs(self) else None + return Transitions( env=self.env, states=states, diff --git a/src/gfn/env.py b/src/gfn/env.py index 510d3820..c1569235 100644 --- a/src/gfn/env.py +++ b/src/gfn/env.py @@ -393,7 +393,6 @@ class DiscreteEnvStates(DiscreteStates): def make_actions_class(self) -> type[Actions]: env = self - n_actions = self.n_actions class DiscreteEnvActions(Actions): action_shape = env.action_shape diff --git a/src/gfn/gflownet/base.py b/src/gfn/gflownet/base.py index ece89bc3..032639a2 100644 --- a/src/gfn/gflownet/base.py +++ b/src/gfn/gflownet/base.py @@ -13,6 +13,7 @@ from gfn.modules import GFNModule from gfn.samplers import Sampler from gfn.states import States +from gfn.utils.common import has_log_probs TrainingSampleType = TypeVar( "TrainingSampleType", bound=Union[Container, tuple[States, ...]] @@ -29,14 +30,20 @@ class GFlowNet(ABC, nn.Module, Generic[TrainingSampleType]): @abstractmethod def sample_trajectories( - self, env: Env, n_samples: int, sample_off_policy: bool + self, + env: Env, + n_samples: int, + save_logprobs: bool = True, + save_estimator_outputs: bool = False, ) -> Trajectories: """Sample a specific number of complete trajectories. Args: env: the environment to sample trajectories from. n_samples: number of trajectories to be sampled. - sample_off_policy: whether to sample trajectories on / off policy. + save_logprobs: whether to save the logprobs of the actions - useful for on-policy learning. + save_estimator_outputs: whether to save the estimator outputs - useful for off-policy learning + with tempered policy Returns: Trajectories: sampled trajectories object. """ @@ -50,7 +57,9 @@ def sample_terminating_states(self, env: Env, n_samples: int) -> States: Returns: States: sampled terminating states object. """ - trajectories = self.sample_trajectories(env, n_samples, sample_off_policy=False) + trajectories = self.sample_trajectories( + env, n_samples, save_estimator_outputs=False, save_logprobs=False + ) return trajectories.last_states def logz_named_parameters(self): @@ -76,21 +85,26 @@ class PFBasedGFlowNet(GFlowNet[TrainingSampleType]): pb: GFNModule """ - def __init__(self, pf: GFNModule, pb: GFNModule, off_policy: bool): + def __init__(self, pf: GFNModule, pb: GFNModule): super().__init__() self.pf = pf self.pb = pb - self.off_policy = off_policy def sample_trajectories( - self, env: Env, n_samples: int, sample_off_policy: bool, **policy_kwargs + self, + env: Env, + n_samples: int, + save_logprobs: bool = True, + save_estimator_outputs: bool = False, + **policy_kwargs, ) -> Trajectories: """Samples trajectories, optionally with specified policy kwargs.""" sampler = Sampler(estimator=self.pf) trajectories = sampler.sample_trajectories( env, n_trajectories=n_samples, - off_policy=sample_off_policy, + save_estimator_outputs=save_estimator_outputs, + save_logprobs=save_logprobs, **policy_kwargs, ) @@ -108,6 +122,7 @@ def get_pfs_and_pbs( self, trajectories: Trajectories, fill_value: float = 0.0, + recalculate_all_logprobs: bool = False, ) -> Tuple[ TT["max_length", "n_trajectories", torch.float], TT["max_length", "n_trajectories", torch.float], @@ -117,17 +132,16 @@ def get_pfs_and_pbs( More specifically it evaluates $\log P_F (s' \mid s)$ and $\log P_B(s \mid s')$ for each transition in each trajectory in the batch. - Useful when the policy used to sample the trajectories is different from - the one used to evaluate the loss. Otherwise we can use the logprobs directly - from the trajectories. - - Note - for off policy exploration, the trajectories submitted to this method - will be sampled off policy. + Unless recalculate_all_logprobs=True, in which case we re-evaluate the logprobs of the trajectories with + the current self.pf. The following applies: + - If trajectories have log_probs attribute, use them - this is usually for on-policy learning + - Else, if trajectories have estimator_outputs attribute, transform them + into log_probs - this is usually for off-policy learning with a tempered policy + - Else, if trajectories have none of them, re-evaluate the log_probs + using the current self.pf - this is usually for off-policy learning with replay buffer Args: trajectories: Trajectories to evaluate. - estimator_outputs: Optional stored estimator outputs from previous forward - sampling (encountered, for example, when sampling off policy). fill_value: Value to use for invalid states (i.e. $s_f$ that is added to shorter trajectories). @@ -151,16 +165,18 @@ def get_pfs_and_pbs( if valid_states.batch_shape != tuple(valid_actions.batch_shape): raise AssertionError("Something wrong happening with log_pf evaluations") - if self.off_policy: - # We re-use the values calculated in .sample_trajectories(). - if trajectories.estimator_outputs is not None: + if has_log_probs(trajectories) and not recalculate_all_logprobs: + log_pf_trajectories = trajectories.log_probs + else: + if ( + trajectories.estimator_outputs is not None + and not recalculate_all_logprobs + ): estimator_outputs = trajectories.estimator_outputs[ ~trajectories.actions.is_dummy ] else: - raise Exception( - "GFlowNet is off policy, but no estimator_outputs found in Trajectories!" - ) + estimator_outputs = self.pf(valid_states) # Calculates the log PF of the actions sampled off policy. valid_log_pf_actions = self.pf.to_probability_distribution( @@ -175,9 +191,6 @@ def get_pfs_and_pbs( ) log_pf_trajectories[~trajectories.actions.is_dummy] = valid_log_pf_actions - else: - log_pf_trajectories = trajectories.log_probs - non_initial_valid_states = valid_states[~valid_states.is_initial_state] non_exit_valid_actions = valid_actions[~valid_actions.is_exit] @@ -201,13 +214,19 @@ def get_pfs_and_pbs( return log_pf_trajectories, log_pb_trajectories - def get_trajectories_scores(self, trajectories: Trajectories) -> Tuple[ + def get_trajectories_scores( + self, + trajectories: Trajectories, + recalculate_all_logprobs: bool = False, + ) -> Tuple[ TT["n_trajectories", torch.float], TT["n_trajectories", torch.float], TT["n_trajectories", torch.float], ]: """Given a batch of trajectories, calculate forward & backward policy scores.""" - log_pf_trajectories, log_pb_trajectories = self.get_pfs_and_pbs(trajectories) + log_pf_trajectories, log_pb_trajectories = self.get_pfs_and_pbs( + trajectories, recalculate_all_logprobs=recalculate_all_logprobs + ) assert log_pf_trajectories is not None total_log_pf_trajectories = log_pf_trajectories.sum(dim=0) diff --git a/src/gfn/gflownet/detailed_balance.py b/src/gfn/gflownet/detailed_balance.py index 2c9cc723..3d97b1ad 100644 --- a/src/gfn/gflownet/detailed_balance.py +++ b/src/gfn/gflownet/detailed_balance.py @@ -8,6 +8,7 @@ from gfn.env import Env from gfn.gflownet.base import PFBasedGFlowNet from gfn.modules import GFNModule, ScalarEstimator +from gfn.utils.common import has_log_probs class DBGFlowNet(PFBasedGFlowNet[Transitions]): @@ -23,7 +24,6 @@ class DBGFlowNet(PFBasedGFlowNet[Transitions]): Attributes: logF: a ScalarEstimator instance. - off_policy: If true, we need to reevaluate the log probs. forward_looking: whether to implement the forward looking GFN loss. log_reward_clip_min: If finite, clips log rewards to this value. """ @@ -33,16 +33,17 @@ def __init__( pf: GFNModule, pb: GFNModule, logF: ScalarEstimator, - off_policy: bool, forward_looking: bool = False, log_reward_clip_min: float = -float("inf"), ): - super().__init__(pf, pb, off_policy=off_policy) + super().__init__(pf, pb) self.logF = logF self.forward_looking = forward_looking self.log_reward_clip_min = log_reward_clip_min - def get_scores(self, env: Env, transitions: Transitions) -> Tuple[ + def get_scores( + self, env: Env, transitions: Transitions, recalculate_all_logprobs: bool = False + ) -> Tuple[ TT["n_transitions", float], TT["n_transitions", float], TT["n_transitions", float], @@ -52,6 +53,12 @@ def get_scores(self, env: Env, transitions: Transitions) -> Tuple[ Args: transitions: a batch of transitions. + Unless recalculate_all_logprobs=True, in which case we re-evaluate the logprobs of the transitions with + the current self.pf. The following applies: + - If transitions have log_probs attribute, use them - this is usually for on-policy learning + - Else, re-evaluate the log_probs using the current self.pf - this is usually for + off-policy learning with replay buffer + Raises: ValueError: when supplied with backward transitions. AssertionError: when log rewards of transitions are None. @@ -66,19 +73,20 @@ def get_scores(self, env: Env, transitions: Transitions) -> Tuple[ if states.batch_shape != tuple(actions.batch_shape): raise ValueError("Something wrong happening with log_pf evaluations") - if not self.off_policy: + + if has_log_probs(transitions) and not recalculate_all_logprobs: valid_log_pf_actions = transitions.log_probs else: - # Evaluate the log PF of the actions sampled off policy. - # I suppose the Transitions container should then have some + # Evaluate the log PF of the actions + module_output = self.pf( + states + ) # TODO: Inefficient duplication in case of tempered policy + # The Transitions container should then have some # estimator_outputs attribute as well, to avoid duplication here ? # See (#156). - module_output = self.pf(states) # TODO: Inefficient duplication. valid_log_pf_actions = self.pf.to_probability_distribution( states, module_output - ).log_prob( - actions.tensor - ) # Actions sampled off policy. + ).log_prob(actions.tensor) valid_log_F_s = self.logF(states).squeeze(-1) if self.forward_looking: @@ -147,9 +155,17 @@ class ModifiedDBGFlowNet(PFBasedGFlowNet[Transitions]): https://arxiv.org/abs/2202.13903 for more details. """ - def get_scores(self, transitions: Transitions) -> TT["n_trajectories", torch.float]: + def get_scores( + self, transitions: Transitions, recalculate_all_logprobs: bool = False + ) -> TT["n_trajectories", torch.float]: """DAG-GFN-style detailed balance, when all states are connected to the sink. + Unless recalculate_all_logprobs=True, in which case we re-evaluate the logprobs of the transitions with + the current self.pf. The following applies: + - If transitions have log_probs attribute, use them - this is usually for on-policy learning + - Else, re-evaluate the log_probs using the current self.pf - this is usually for + off-policy learning with replay buffer + Raises: ValueError: when backward transitions are supplied (not supported). ValueError: when the computed scores contain `inf`. @@ -164,7 +180,8 @@ def get_scores(self, transitions: Transitions) -> TT["n_trajectories", torch.flo all_log_rewards = transitions.all_log_rewards[mask] module_output = self.pf(states) pf_dist = self.pf.to_probability_distribution(states, module_output) - if not self.off_policy: + + if has_log_probs(transitions) and not recalculate_all_logprobs: valid_log_pf_actions = transitions[mask].log_probs else: # Evaluate the log PF of the actions sampled off policy. diff --git a/src/gfn/gflownet/flow_matching.py b/src/gfn/gflownet/flow_matching.py index 061761d4..5764cb8e 100644 --- a/src/gfn/gflownet/flow_matching.py +++ b/src/gfn/gflownet/flow_matching.py @@ -36,7 +36,8 @@ def __init__(self, logF: DiscretePolicyEstimator, alpha: float = 1.0): def sample_trajectories( self, env: Env, - off_policy: bool, + save_logprobs: bool, + save_estimator_outputs: bool = False, n_samples: int = 1000, **policy_kwargs: Optional[dict], ) -> Trajectories: @@ -49,7 +50,8 @@ def sample_trajectories( trajectories = sampler.sample_trajectories( env, n_trajectories=n_samples, - off_policy=off_policy, + save_estimator_outputs=save_estimator_outputs, + save_logprobs=save_logprobs, **policy_kwargs, ) return trajectories diff --git a/src/gfn/gflownet/sub_trajectory_balance.py b/src/gfn/gflownet/sub_trajectory_balance.py index f07835c3..6e8b1324 100644 --- a/src/gfn/gflownet/sub_trajectory_balance.py +++ b/src/gfn/gflownet/sub_trajectory_balance.py @@ -56,7 +56,6 @@ def __init__( pf: GFNModule, pb: GFNModule, logF: ScalarEstimator, - off_policy: bool, weighting: Literal[ "DB", "ModifiedDB", @@ -70,7 +69,7 @@ def __init__( log_reward_clip_min: float = -float("inf"), forward_looking: bool = False, ): - super().__init__(pf, pb, off_policy=off_policy) + super().__init__(pf, pb) self.logF = logF self.weighting = weighting self.lamda = lamda diff --git a/src/gfn/gflownet/trajectory_balance.py b/src/gfn/gflownet/trajectory_balance.py index dde1b667..1f8799d9 100644 --- a/src/gfn/gflownet/trajectory_balance.py +++ b/src/gfn/gflownet/trajectory_balance.py @@ -23,7 +23,6 @@ class TBGFlowNet(TrajectoryBasedGFlowNet): the DAG, or a singleton thereof, if self.logit_PB is a fixed DiscretePBEstimator. Attributes: - off_policy: Whether the GFlowNet samples trajectories on or off policy. logZ: a LogZEstimator instance. log_reward_clip_min: If finite, clips log rewards to this value. """ @@ -32,18 +31,22 @@ def __init__( self, pf: GFNModule, pb: GFNModule, - off_policy: bool, init_logZ: float = 0.0, log_reward_clip_min: float = -float("inf"), ): - super().__init__(pf, pb, off_policy=off_policy) + super().__init__(pf, pb) self.logZ = nn.Parameter( torch.tensor(init_logZ) ) # TODO: Optionally, this should be a nn.Module to support conditional GFNs. self.log_reward_clip_min = log_reward_clip_min - def loss(self, env: Env, trajectories: Trajectories) -> TT[0, float]: + def loss( + self, + env: Env, + trajectories: Trajectories, + recalculate_all_logprobs: bool = False, + ) -> TT[0, float]: """Trajectory balance loss. The trajectory balance loss is described in 2.3 of @@ -53,7 +56,9 @@ def loss(self, env: Env, trajectories: Trajectories) -> TT[0, float]: ValueError: if the loss is NaN. """ del env # unused - _, _, scores = self.get_trajectories_scores(trajectories) + _, _, scores = self.get_trajectories_scores( + trajectories, recalculate_all_logprobs=recalculate_all_logprobs + ) loss = (scores + self.logZ).pow(2).mean() if torch.isnan(loss): raise ValueError("loss is nan") @@ -65,7 +70,6 @@ class LogPartitionVarianceGFlowNet(TrajectoryBasedGFlowNet): """Dataclass which holds the logZ estimate for the Log Partition Variance loss. Attributes: - off_policy: Whether the GFlowNet samples trajectories on or off policy. log_reward_clip_min: If finite, clips log rewards to this value. Raises: @@ -76,16 +80,16 @@ def __init__( self, pf: GFNModule, pb: GFNModule, - off_policy: bool, log_reward_clip_min: float = -float("inf"), ): - super().__init__(pf, pb, off_policy=off_policy) + super().__init__(pf, pb) self.log_reward_clip_min = log_reward_clip_min def loss( self, env: Env, trajectories: Trajectories, + recalculate_all_logprobs: bool = False, ) -> TT[0, float]: """Log Partition Variance loss. @@ -93,7 +97,9 @@ def loss( [ROBUST SCHEDULING WITH GFLOWNETS](https://arxiv.org/abs/2302.05446)) """ del env # unused - _, _, scores = self.get_trajectories_scores(trajectories) + _, _, scores = self.get_trajectories_scores( + trajectories, recalculate_all_logprobs=recalculate_all_logprobs + ) loss = (scores - scores.mean()).pow(2).mean() if torch.isnan(loss): raise ValueError("loss is NaN.") diff --git a/src/gfn/samplers.py b/src/gfn/samplers.py index a2f810b6..473c303a 100644 --- a/src/gfn/samplers.py +++ b/src/gfn/samplers.py @@ -32,9 +32,13 @@ def sample_actions( env: Env, states: States, save_estimator_outputs: bool = False, - calculate_logprobs: bool = True, + save_logprobs: bool = True, **policy_kwargs: Optional[dict], - ) -> Tuple[Actions, TT["batch_shape", torch.float]]: + ) -> Tuple[ + Actions, + TT["batch_shape", torch.float] | None, + TT["batch_shape", torch.float] | None, + ]: """Samples actions from the given states. Args: @@ -42,7 +46,7 @@ def sample_actions( env: The environment to sample actions from. states: A batch of states. save_estimator_outputs: If True, the estimator outputs will be returned. - calculate_logprobs: If True, calculates the log probabilities of sampled + save_logprobs: If True, calculates and saves the log probabilities of sampled actions. policy_kwargs: keyword arguments to be passed to the `to_probability_distribution` method of the estimator. For example, for @@ -72,7 +76,7 @@ def sample_actions( with torch.no_grad(): actions = dist.sample() - if calculate_logprobs: + if save_logprobs: log_probs = dist.log_prob(actions) if torch.any(torch.isinf(log_probs)): raise RuntimeError("Log probabilities are inf. This should not happen.") @@ -89,29 +93,30 @@ def sample_actions( def sample_trajectories( self, env: Env, - off_policy: bool, states: Optional[States] = None, n_trajectories: Optional[int] = None, - debug_mode: bool = False, + save_estimator_outputs: bool = False, + save_logprobs: bool = True, **policy_kwargs, ) -> Trajectories: """Sample trajectories sequentially. Args: env: The environment to sample trajectories from. - off_policy: If True, samples actions such that we skip log probability - calculation, and we save the estimator outputs for later use. states: If given, trajectories would start from such states. Otherwise, trajectories are sampled from $s_o$ and n_trajectories must be provided. n_trajectories: If given, a batch of n_trajectories will be sampled all starting from the environment's s_0. + save_estimator_outputs: If True, the estimator outputs will be returned. This + is useful for off-policy training with tempered policy. + save_logprobs: If True, calculates and saves the log probabilities of sampled + actions. This is useful for on-policy training. policy_kwargs: keyword arguments to be passed to the `to_probability_distribution` method of the estimator. For example, for DiscretePolicyEstimators, the kwargs can contain the `temperature` parameter, `epsilon`, and `sf_bias`. In the continuous case these kwargs will be user defined. This can be used to, for example, sample off-policy. - debug_mode: if True, everything gets calculated. Returns: A Trajectories object representing the batch of sampled trajectories. @@ -119,8 +124,6 @@ def sample_trajectories( AssertionError: When both states and n_trajectories are specified. AssertionError: When states are not linear. """ - save_estimator_outputs = off_policy or debug_mode - skip_logprob_calculaion = off_policy and not debug_mode if states is None: assert ( @@ -167,7 +170,7 @@ def sample_trajectories( env, states[~dones], save_estimator_outputs=True if save_estimator_outputs else False, - calculate_logprobs=False if skip_logprob_calculaion else True, + save_logprobs=save_logprobs, **policy_kwargs, ) if estimator_outputs is not None: @@ -183,7 +186,7 @@ def sample_trajectories( all_estimator_outputs.append(estimator_outputs_padded) actions[~dones] = valid_actions - if not skip_logprob_calculaion: + if save_logprobs: # When off_policy, actions_log_probs are None. log_probs[~dones] = actions_log_probs trajectories_actions += [actions] @@ -222,7 +225,9 @@ def sample_trajectories( trajectories_states = stack_states(trajectories_states) trajectories_actions = env.Actions.stack(trajectories_actions) - trajectories_logprobs = torch.stack(trajectories_logprobs, dim=0) + trajectories_logprobs = ( + torch.stack(trajectories_logprobs, dim=0) if save_logprobs else None + ) # TODO: use torch.nested.nested_tensor(dtype, device, requires_grad). if save_estimator_outputs: diff --git a/src/gfn/states.py b/src/gfn/states.py index cb48b130..f4fa1a20 100644 --- a/src/gfn/states.py +++ b/src/gfn/states.py @@ -1,6 +1,6 @@ from __future__ import annotations # This allows to use the class name in type hints -from abc import ABC, abstractmethod +from abc import ABC from copy import deepcopy from math import prod from typing import Callable, ClassVar, List, Optional, Sequence, cast @@ -128,9 +128,12 @@ def device(self) -> torch.device: def __getitem__(self, index: int | Sequence[int] | Sequence[bool]) -> States: """Access particular states of the batch.""" - return self.__class__( + out = self.__class__( self.tensor[index] ) # TODO: Inefficient - this might make a copy of the tensor! + if self._log_rewards is not None: + out.log_rewards = self._log_rewards[index] + return out def __setitem__( self, index: int | Sequence[int] | Sequence[bool], states: States @@ -168,6 +171,11 @@ def extend(self, other: States) -> None: # This corresponds to adding a state to a trajectory self.batch_shape = (self.batch_shape[0] + other_batch_shape[0],) self.tensor = torch.cat((self.tensor, other.tensor), dim=0) + if self._log_rewards is not None: + assert other._log_rewards is not None + self._log_rewards = torch.cat( + (self._log_rewards, other._log_rewards), dim=0 + ) elif len(other_batch_shape) == len(self.batch_shape) == 2: # This corresponds to adding a trajectory to a batch of trajectories @@ -258,6 +266,10 @@ def log_rewards(self) -> TT["batch_shape", torch.float]: def log_rewards(self, log_rewards: TT["batch_shape", torch.float]) -> None: self._log_rewards = log_rewards + def sample(self, n_samples: int) -> States: + """Samples a subset of the States object.""" + return self[torch.randperm(len(self))[:n_samples]] + class DiscreteStates(States, ABC): """Base class for states of discrete environments. @@ -340,7 +352,11 @@ def __getitem__( self._check_both_forward_backward_masks_exist() forward_masks = self.forward_masks[index] backward_masks = self.backward_masks[index] - return self.__class__(states, forward_masks, backward_masks) + out = self.__class__(states, forward_masks, backward_masks) + if self.log_rewards is not None: + log_probs = self._log_rewards[index] + out.log_rewards = log_probs + return out def __setitem__( self, index: int | Sequence[int] | Sequence[bool], states: DiscreteStates diff --git a/src/gfn/utils/common.py b/src/gfn/utils/common.py index cc5b97a7..6094a179 100644 --- a/src/gfn/utils/common.py +++ b/src/gfn/utils/common.py @@ -15,3 +15,11 @@ def set_seed(seed: int, performance_mode: bool = False) -> None: if not performance_mode: torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False + + +def has_log_probs(obj): + """Returns True if the submitted object has the log_probs attribute populated.""" + if not hasattr(obj, "log_probs"): + return False + + return obj.log_probs is not None and obj.log_probs.nelement() > 0 diff --git a/testing/test_gflownet.py b/testing/test_gflownet.py index 35642020..718840bc 100644 --- a/testing/test_gflownet.py +++ b/testing/test_gflownet.py @@ -27,7 +27,7 @@ def test_trajectory_based_gflownet_generic(): ) pb_estimator = BoxPBEstimator(env=env, module=pb_module, n_components=1) - gflownet = TBGFlowNet(pf=pf_estimator, pb=pb_estimator, off_policy=False) + gflownet = TBGFlowNet(pf=pf_estimator, pb=pb_estimator) mock_trajectories = Trajectories(env) result = gflownet.to_training_samples(mock_trajectories) @@ -46,7 +46,9 @@ def test_flow_matching_gflownet_generic(): module = BoxPFNeuralNet( hidden_dim=32, n_hidden_layers=2, n_components=1, n_components_s0=1 ) - estimator = DiscretePolicyEstimator(env, module, True) + estimator = DiscretePolicyEstimator( + module, n_actions=2, preprocessor=env.preprocessor + ) gflownet = FMGFlowNet(estimator) mock_trajectories = Trajectories(env) states_tuple = gflownet.to_training_samples(mock_trajectories) @@ -79,7 +81,7 @@ def test_pytorch_inheritance(): ) pb_estimator = BoxPBEstimator(env=env, module=pb_module, n_components=1) - tbgflownet = TBGFlowNet(pf=pf_estimator, pb=pb_estimator, off_policy=False) + tbgflownet = TBGFlowNet(pf=pf_estimator, pb=pb_estimator) assert hasattr( tbgflownet.parameters(), "__iter__" ), "Expected gflownet to have iterable parameters() method inherited from nn.Module" @@ -87,7 +89,9 @@ def test_pytorch_inheritance(): tbgflownet.state_dict(), "__dict__" ), "Expected gflownet to have indexable state_dict() method inherited from nn.Module" - estimator = DiscretePolicyEstimator(env, pf_module, True) + estimator = DiscretePolicyEstimator( + pf_module, n_actions=2, preprocessor=env.preprocessor + ) fmgflownet = FMGFlowNet(estimator) assert hasattr( fmgflownet.parameters(), "__iter__" diff --git a/testing/test_parametrizations_and_losses.py b/testing/test_parametrizations_and_losses.py index f2e725bf..95b69bc6 100644 --- a/testing/test_parametrizations_and_losses.py +++ b/testing/test_parametrizations_and_losses.py @@ -57,7 +57,7 @@ def test_FM(env_name: int, ndim: int, module_name: str): ) gflownet = FMGFlowNet(log_F_edge) # forward looking by default. - trajectories = gflownet.sample_trajectories(env, off_policy=False, n_samples=10) + trajectories = gflownet.sample_trajectories(env, save_logprobs=True, n_samples=10) states_tuple = trajectories.to_non_initial_intermediary_and_terminating_states() loss = gflownet.loss(env, states_tuple) assert loss >= 0 @@ -71,11 +71,13 @@ def test_get_pfs_and_pbs(env_name: str, preprocessor_name: str): trajectories, _, pf_estimator, pb_estimator = trajectory_sampling_with_return( env_name, preprocessor_name, delta=0.1, n_components=1, n_components_s0=1 ) - gflownet_on = TBGFlowNet(pf=pf_estimator, pb=pb_estimator, off_policy=False) - gflownet_off = TBGFlowNet(pf=pf_estimator, pb=pb_estimator, off_policy=True) + gflownet_on = TBGFlowNet(pf=pf_estimator, pb=pb_estimator) + gflownet_off = TBGFlowNet(pf=pf_estimator, pb=pb_estimator) log_pfs_on, log_pbs_on = gflownet_on.get_pfs_and_pbs(trajectories) - log_pfs_off, log_pbs_off = gflownet_off.get_pfs_and_pbs(trajectories) + log_pfs_off, log_pbs_off = gflownet_off.get_pfs_and_pbs( + trajectories, recalculate_all_logprobs=True + ) @pytest.mark.parametrize("preprocessor_name", ["Identity", "KHot"]) @@ -86,10 +88,12 @@ def test_get_scores(env_name: str, preprocessor_name: str): trajectories, _, pf_estimator, pb_estimator = trajectory_sampling_with_return( env_name, preprocessor_name, delta=0.1, n_components=1, n_components_s0=1 ) - gflownet_on = TBGFlowNet(pf=pf_estimator, pb=pb_estimator, off_policy=False) - gflownet_off = TBGFlowNet(pf=pf_estimator, pb=pb_estimator, off_policy=True) + gflownet_on = TBGFlowNet(pf=pf_estimator, pb=pb_estimator) + gflownet_off = TBGFlowNet(pf=pf_estimator, pb=pb_estimator) scores_on = gflownet_on.get_trajectories_scores(trajectories) - scores_off = gflownet_off.get_trajectories_scores(trajectories) + scores_off = gflownet_off.get_trajectories_scores( + trajectories, recalculate_all_logprobs=True + ) assert all( [ torch.all(torch.abs(scores_on[i] - scores_off[i]) < 1e-4) @@ -189,28 +193,24 @@ def PFBasedGFlowNet_with_return( forward_looking=forward_looking, pf=pf, pb=pb, - off_policy=False, ) elif gflownet_name == "ModifiedDB": - gflownet = ModifiedDBGFlowNet(pf=pf, pb=pb, off_policy=False) + gflownet = ModifiedDBGFlowNet(pf=pf, pb=pb) elif gflownet_name == "TB": - gflownet = TBGFlowNet(pf=pf, pb=pb, off_policy=False) + gflownet = TBGFlowNet(pf=pf, pb=pb) elif gflownet_name == "ZVar": - gflownet = LogPartitionVarianceGFlowNet(pf=pf, pb=pb, off_policy=False) + gflownet = LogPartitionVarianceGFlowNet(pf=pf, pb=pb) elif gflownet_name == "SubTB": gflownet = SubTBGFlowNet( logF=logF, weighting=sub_tb_weighting, pf=pf, pb=pb, - off_policy=False, ) else: raise ValueError(f"Unknown gflownet {gflownet_name}") - trajectories = gflownet.sample_trajectories( - env, sample_off_policy=False, n_samples=10 - ) + trajectories = gflownet.sample_trajectories(env, save_logprobs=True, n_samples=10) training_objects = gflownet.to_training_samples(trajectories) _ = gflownet.loss(env, training_objects) @@ -307,13 +307,11 @@ def test_subTB_vs_TB( zero_logF=True, ) - trajectories = gflownet.sample_trajectories( - env, sample_off_policy=False, n_samples=10 - ) + trajectories = gflownet.sample_trajectories(env, save_logprobs=True, n_samples=10) subtb_loss = gflownet.loss(env, trajectories) if weighting == "TB": - tb_loss = TBGFlowNet(pf=pf, pb=pb, off_policy=False).loss( + tb_loss = TBGFlowNet(pf=pf, pb=pb).loss( env, trajectories ) # LogZ is default 0.0. assert (tb_loss - subtb_loss).abs() < 1e-4 diff --git a/testing/test_samplers_and_trajectories.py b/testing/test_samplers_and_trajectories.py index 71bdbc04..aa1b61b5 100644 --- a/testing/test_samplers_and_trajectories.py +++ b/testing/test_samplers_and_trajectories.py @@ -1,4 +1,4 @@ -from typing import Literal +from typing import Literal, Tuple import pytest @@ -11,18 +11,18 @@ BoxPFEstimator, BoxPFNeuralNet, ) -from gfn.modules import DiscretePolicyEstimator +from gfn.modules import DiscretePolicyEstimator, GFNModule from gfn.samplers import Sampler from gfn.utils import NeuralNet def trajectory_sampling_with_return( env_name: str, - preprocessor_name: str, + preprocessor_name: Literal["KHot", "OneHot", "Identity", "Enum"], delta: float, n_components_s0: int, n_components: int, -) -> Trajectories: +) -> Tuple[Trajectories, Trajectories, GFNModule, GFNModule]: if env_name == "HyperGrid": env = HyperGrid(ndim=2, height=8, preprocessor_name=preprocessor_name) elif env_name == "DiscreteEBM": @@ -33,10 +33,6 @@ def trajectory_sampling_with_return( if preprocessor_name != "Identity": pytest.skip("Useless tests") env = Box(delta=delta) - else: - raise ValueError("Unknown environment name") - - if env_name == "Box": pf_module = BoxPFNeuralNet( hidden_dim=32, n_hidden_layers=2, @@ -59,6 +55,10 @@ def trajectory_sampling_with_return( env=env, module=pb_module, n_components=n_components ) else: + raise ValueError("Unknown environment name") + + if env_name != "Box": + assert not isinstance(env, Box) pf_module = NeuralNet( input_dim=env.preprocessor.output_dim, output_dim=env.n_actions ) @@ -81,14 +81,17 @@ def trajectory_sampling_with_return( sampler = Sampler(estimator=pf_estimator) # Test mode collects log_probs and estimator_ouputs, not encountered in the wild. trajectories = sampler.sample_trajectories( - env, off_policy=False, n_trajectories=5, debug_mode=True + env, + save_logprobs=True, + n_trajectories=5, + save_estimator_outputs=True, ) # trajectories = sampler.sample_trajectories(env, n_trajectories=10) # TODO - why is this duplicated? states = env.reset(batch_shape=5, random=True) bw_sampler = Sampler(estimator=pb_estimator) bw_trajectories = bw_sampler.sample_trajectories( - env, off_policy=False, states=states + env, save_logprobs=True, states=states ) return trajectories, bw_trajectories, pf_estimator, pb_estimator @@ -101,11 +104,11 @@ def trajectory_sampling_with_return( @pytest.mark.parametrize("n_components", [1, 2, 5]) def test_trajectory_sampling( env_name: str, - preprocessor_name: str, + preprocessor_name: Literal["KHot", "OneHot", "Identity", "Enum"], delta: float, n_components_s0: int, n_components: int, -) -> Trajectories: +): if env_name == "HyperGrid": if delta != 0.1 or n_components_s0 != 1 or n_components != 1: pytest.skip("Useless tests") diff --git a/tutorials/examples/train_box.py b/tutorials/examples/train_box.py index 632d5b78..8bf7ec5b 100644 --- a/tutorials/examples/train_box.py +++ b/tutorials/examples/train_box.py @@ -158,14 +158,12 @@ def main(args): # noqa: C901 pf=pf_estimator, pb=pb_estimator, logF=logF_estimator, - off_policy=False, ) else: gflownet = SubTBGFlowNet( pf=pf_estimator, pb=pb_estimator, logF=logF_estimator, - off_policy=False, weighting=args.subTB_weighting, lamda=args.subTB_lambda, ) @@ -173,13 +171,11 @@ def main(args): # noqa: C901 gflownet = TBGFlowNet( pf=pf_estimator, pb=pb_estimator, - off_policy=False, ) elif args.loss == "ZVar": gflownet = LogPartitionVarianceGFlowNet( pf=pf_estimator, pb=pb_estimator, - off_policy=False, ) assert gflownet is not None, f"No gflownet for loss {args.loss}" @@ -235,7 +231,7 @@ def main(args): # noqa: C901 print(f"current optimizer LR: {optimizer.param_groups[0]['lr']}") trajectories = gflownet.sample_trajectories( - env, sample_off_policy=False, n_samples=args.batch_size + env, save_logprobs=True, n_samples=args.batch_size ) training_samples = gflownet.to_training_samples(trajectories) diff --git a/tutorials/examples/train_discreteebm.py b/tutorials/examples/train_discreteebm.py index 562bb2b4..45537686 100644 --- a/tutorials/examples/train_discreteebm.py +++ b/tutorials/examples/train_discreteebm.py @@ -71,7 +71,7 @@ def main(args): # noqa: C901 validation_info = {"l1_dist": float("inf")} for iteration in trange(n_iterations): trajectories = gflownet.sample_trajectories( - env, off_policy=False, n_samples=args.batch_size + env, save_logprobs=True, n_samples=args.batch_size ) training_samples = gflownet.to_training_samples(trajectories) diff --git a/tutorials/examples/train_hypergrid.py b/tutorials/examples/train_hypergrid.py index a051e850..2041c7ca 100644 --- a/tutorials/examples/train_hypergrid.py +++ b/tutorials/examples/train_hypergrid.py @@ -38,7 +38,6 @@ def main(args): # noqa: C901 seed = args.seed if args.seed != 0 else DEFAULT_SEED set_seed(seed) - off_policy_sampling = False if args.replay_buffer_size == 0 else True device_str = "cuda" if torch.cuda.is_available() and not args.no_cuda else "cpu" use_wandb = len(args.wandb_project) > 0 @@ -123,7 +122,6 @@ def main(args): # noqa: C901 gflownet = ModifiedDBGFlowNet( pf_estimator, pb_estimator, - off_policy_sampling, ) if args.loss in ("DB", "SubTB"): @@ -154,14 +152,12 @@ def main(args): # noqa: C901 pf=pf_estimator, pb=pb_estimator, logF=logF_estimator, - off_policy=off_policy_sampling, ) else: gflownet = SubTBGFlowNet( pf=pf_estimator, pb=pb_estimator, logF=logF_estimator, - off_policy=off_policy_sampling, weighting=args.subTB_weighting, lamda=args.subTB_lambda, ) @@ -169,13 +165,11 @@ def main(args): # noqa: C901 gflownet = TBGFlowNet( pf=pf_estimator, pb=pb_estimator, - off_policy=off_policy_sampling, ) elif args.loss == "ZVar": gflownet = LogPartitionVarianceGFlowNet( pf=pf_estimator, pb=pb_estimator, - off_policy=off_policy_sampling, ) assert gflownet is not None, f"No gflownet for loss {args.loss}" @@ -225,7 +219,10 @@ def main(args): # noqa: C901 validation_info = {"l1_dist": float("inf")} for iteration in trange(n_iterations): trajectories = gflownet.sample_trajectories( - env, n_samples=args.batch_size, sample_off_policy=off_policy_sampling + env, + n_samples=args.batch_size, + save_logprobs=args.replay_buffer_size == 0, + save_estimator_outputs=False, ) training_samples = gflownet.to_training_samples(trajectories) if replay_buffer is not None: diff --git a/tutorials/examples/train_hypergrid_simple.py b/tutorials/examples/train_hypergrid_simple.py index d21ef349..98c3ecae 100644 --- a/tutorials/examples/train_hypergrid_simple.py +++ b/tutorials/examples/train_hypergrid_simple.py @@ -35,7 +35,7 @@ pb_estimator = DiscretePolicyEstimator( module_PB, env.n_actions, is_backward=True, preprocessor=env.preprocessor ) -gflownet = TBGFlowNet(init_logZ=0.0, pf=pf_estimator, pb=pb_estimator, off_policy=True) +gflownet = TBGFlowNet(init_logZ=0.0, pf=pf_estimator, pb=pb_estimator) # Feed pf to the sampler. sampler = Sampler(estimator=pf_estimator) @@ -56,7 +56,8 @@ trajectories = sampler.sample_trajectories( env, n_trajectories=batch_size, - off_policy=True, + save_logprobs=False, + save_estimator_outputs=True, epsilon=exploration_rate, ) optimizer.zero_grad() diff --git a/tutorials/examples/train_line.py b/tutorials/examples/train_line.py index ccbbf1cf..6ce7fde6 100644 --- a/tutorials/examples/train_line.py +++ b/tutorials/examples/train_line.py @@ -228,7 +228,8 @@ def train( trajectories = gflownet.sample_trajectories( env, n_samples=batch_size, - sample_off_policy=True, + save_estimator_outputs=True, + save_logprobs=False, scale_factor=scale_schedule[iteration], # Off policy kwargs. ) training_samples = gflownet.to_training_samples(trajectories) @@ -291,7 +292,7 @@ def train( policy_std_max=policy_std_max, ) pb = StepEstimator(environment, pb_module, backward=True) - gflownet = TBGFlowNet(pf=pf, pb=pb, off_policy=True, init_logZ=0.0) + gflownet = TBGFlowNet(pf=pf, pb=pb, init_logZ=0.0) gflownet = train( gflownet, diff --git a/tutorials/notebooks/intro_gfn_continuous_line.ipynb b/tutorials/notebooks/intro_gfn_continuous_line.ipynb index 232abda1..52b35c8b 100644 --- a/tutorials/notebooks/intro_gfn_continuous_line.ipynb +++ b/tutorials/notebooks/intro_gfn_continuous_line.ipynb @@ -83,22 +83,15 @@ }, "outputs": [ { - "name": "stderr", - "output_type": "stream", - "text": [ - "/tmp/ipykernel_145438/1097605799.py:20: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).\n", - " Normal(torch.tensor(m), torch.tensor(s)) for m, s in zip(mus, self.sigmas)\n" + "ename": "TypeError", + "evalue": "Can't instantiate abstract class Line with abstract methods backward_step, step", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[2], line 154\u001b[0m\n\u001b[1;32m 151\u001b[0m plt\u001b[38;5;241m.\u001b[39mshow()\n\u001b[1;32m 153\u001b[0m \u001b[38;5;66;03m# Set up our simple environment.\u001b[39;00m\n\u001b[0;32m--> 154\u001b[0m env \u001b[38;5;241m=\u001b[39m \u001b[43mLine\u001b[49m\u001b[43m(\u001b[49m\u001b[43mmus\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43m[\u001b[49m\u001b[38;5;241;43m-\u001b[39;49m\u001b[38;5;241;43m1\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m1\u001b[39;49m\u001b[43m]\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mvariances\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43m[\u001b[49m\u001b[38;5;241;43m0.2\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m0.2\u001b[39;49m\u001b[43m]\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mn_sd\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;241;43m4.5\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43minit_value\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;241;43m0\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mn_steps_per_trajectory\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;241;43m5\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[1;32m 155\u001b[0m render(env)\n", + "\u001b[0;31mTypeError\u001b[0m: Can't instantiate abstract class Line with abstract methods backward_step, step" ] - }, - { - "data": { - "image/png": "", - "text/plain": [ - "
" - ] - }, - "metadata": {}, - "output_type": "display_data" } ], "source": [ @@ -547,6 +540,7 @@ " env,\n", " n_samples=batch_size,\n", " scale_factor=scale_schedule[iteration],\n", + " save_estimator_outputs=True,\n", " )\n", " training_samples = gflownet.to_training_samples(trajectories)\n", "\n", @@ -612,7 +606,6 @@ "gflownet = TBGFlowNet(\n", " pf=pf_estimator,\n", " pb=pb_estimator,\n", - " off_policy=True,\n", " init_logZ=0.0,\n", ")\n", "\n", @@ -766,7 +759,6 @@ "gflownet = TBGFlowNet(\n", " pf=pf_estimator,\n", " pb=pb_estimator,\n", - " off_policy=True, # No replay buffer.\n", " init_logZ=0.0,\n", ")\n", "\n", @@ -857,7 +849,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.10.11" } }, "nbformat": 4, diff --git a/tutorials/notebooks/intro_gfn_smiley.ipynb b/tutorials/notebooks/intro_gfn_smiley.ipynb index 7552ac9a..e8f95d41 100644 --- a/tutorials/notebooks/intro_gfn_smiley.ipynb +++ b/tutorials/notebooks/intro_gfn_smiley.ipynb @@ -1987,7 +1987,6 @@ "gflownet = TBGFlowNet(\n", " pf=pf_estimator,\n", " pb=pb_estimator,\n", - " off_policy=False, # No replay buffer.\n", ")\n", "\n", "# Policy parameters recieve one LR, and LogZ gets a dedicated, typically higher LR.\n",