8. Adding Reward Customization

The complete code for this part of the tutorial can be found here.

# file structure
- cutting_2d
    - main.py  # modified
    - env
        - core_env.py  # modified
        - inventory.py
        - maze_state.py
        - maze_action.py
        - renderer.py
        - maze_env.py  # modified
        - events.py
        - kpi_calculator.py
    - space_interfaces
        - dict_action_conversion.py
        - dict_observation_conversion.py
    - reward
        - default_reward.py  # new

8.1. Reward

In this part of the tutorial we show how to reuse the event system for reward shaping and customization via the RewardAggregatorInterface.

The DefaultRewardAggregator does the following:

  • Requests the required event interfaces via get_interfaces (here CuttingEvents and InventoryEvents).

  • Collects rewards and penalties according to relevant events.

  • Aggregates the individual event rewards and penalties to a single scalar reward signal.

Note that this reward aggregator can have any form as long as it provides a scalar reward function that can be used for training. This gives a lot of flexibility in shaping rewards without the need to change the actual implementation of the environment (more on this topic).

reward/default_reward.py
from abc import abstractmethod
from typing import List

from maze.core.env.reward import RewardAggregatorInterface

from ..env.events import CuttingEvents, InventoryEvents


class CuttingRewardAggregator(RewardAggregatorInterface):
    """Interface for cutting reward aggregators."""

    @abstractmethod
    def collect_rewards(self) -> List[float]:
        """Assign rewards and penalties according to respective events.
        :return: List of individual event rewards.
        """


class DefaultRewardAggregator(CuttingRewardAggregator):
    """Default reward scheme for the 2D cutting env.

    :param invalid_action_penalty: Negative reward assigned for an invalid cutting specification.
    :param raw_piece_usage_penalty: Negative reward assigned for starting a new raw inventory piece.
    """

    def __init__(self, invalid_action_penalty: float, raw_piece_usage_penalty: float):
        super().__init__()
        self.invalid_action_penalty = invalid_action_penalty
        self.raw_piece_usage_penalty = raw_piece_usage_penalty

    def get_interfaces(self):
        """Specification of the event interfaces this subscriber wants to receive events from.
        Every subscriber must implement this configuration method.
        :return: A list of interface classes"""
        return [CuttingEvents, InventoryEvents]

    def collect_rewards(self) -> List[float]:
        """Assign rewards and penalties according to respective events.
        :return: List of individual event rewards.
        """

        rewards: List[float] = []

        # penalty for starting a new raw inventory piece
        for _ in self.query_events(InventoryEvents.piece_replenished):
            rewards.append(self.raw_piece_usage_penalty)

        # penalty for selecting an invalid piece for cutting
        for _ in self.query_events(CuttingEvents.invalid_piece_selected):
            rewards.append(self.invalid_action_penalty)

        # penalty for specifying invalid cutting parameters
        for _ in self.query_events(CuttingEvents.invalid_cut):
            rewards.append(self.invalid_action_penalty)

        return rewards

    @classmethod
    def to_scalar_reward(cls, reward: List[float]) -> float:
        """Aggregate sub-rewards to scalar reward.

        This method is useful for example in a multi-agent setting
        where we could sum over multiple actors to assign a joint reward.

        :param reward: The list of individual rewards to aggregate (e.g. per-agent rewards in a multi-agent RL setting).
        :return: The scalar reward returned by the environment.
        """
        return sum(reward)
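
Because the reward computation is decoupled from the environment, switching to a different reward scheme only requires swapping the aggregator. The snippet below is a hypothetical variant (not part of the tutorial code) that additionally rewards every valid cut; it could, for example, be appended to reward/default_reward.py, so the imports and the DefaultRewardAggregator defined above are already available.

class ValidCutRewardAggregator(DefaultRewardAggregator):
    """Hypothetical reward scheme that additionally rewards each successfully performed cut.

    :param invalid_action_penalty: Negative reward assigned for an invalid cutting specification.
    :param raw_piece_usage_penalty: Negative reward assigned for starting a new raw inventory piece.
    :param valid_cut_reward: Positive reward assigned for every valid cut.
    """

    def __init__(self, invalid_action_penalty: float, raw_piece_usage_penalty: float, valid_cut_reward: float):
        super().__init__(invalid_action_penalty, raw_piece_usage_penalty)
        self.valid_cut_reward = valid_cut_reward

    def collect_rewards(self) -> List[float]:
        """Extend the default penalties with a bonus for valid cuts.
        :return: List of individual event rewards.
        """
        rewards = super().collect_rewards()

        # bonus for every valid_cut event dispatched during the step
        for _ in self.query_events(CuttingEvents.valid_cut):
            rewards.append(self.valid_cut_reward)

        return rewards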

8.2. Updating the Core- and MazeEnv

We also have to make a few modifications in the CoreEnv:

  • Initialize the reward aggregator in the constructor.

  • Instead of accumulating the reward in the if-else branches of the step function, we collect it only once at the end via the reward aggregator. The conversion to a scalar value is performed in the step function of the MazeEnv (sketched after the CoreEnv listing below).

env/core_env.py
...
from ..reward.default_reward import CuttingRewardAggregator


class Cutting2DCoreEnvironment(CoreEnv):
    """Environment for cutting 2D pieces based on the customer demand. Works as follows:
    ...
    :param reward_aggregator: Either an instantiated aggregator or a configuration dictionary.
    """

    def __init__(self, max_pieces_in_inventory: int, raw_piece_size: (int, int), static_demand: (int, int),
                 reward_aggregator: CuttingRewardAggregator):
        super().__init__()

        ...

        # init reward and register it with pubsub
        self.reward_aggregator = reward_aggregator
        self.pubsub.register_subscriber(self.reward_aggregator)

    def step(self, maze_action: Cutting2DMazeAction) -> Tuple[Cutting2DMazeState, np.array, bool, Dict[Any, Any]]:
        """Summary of the step (simplified, not necessarily respecting the actual order in the code):
        1. Check if the selected piece to cut is valid (i.e. in inventory, large enough etc.)
        2. Attempt the cutting
        3. Replenish a fresh piece if needed
        4. Aggregate the reward from the events dispatched during the step

        :param maze_action: Cutting maze_action to take.
        :return: state, reward, done, info
        """

        info = {}
        replenishment_needed = False

        # check if valid piece id was selected
        if maze_action.piece_id >= self.inventory.size():
            self.cutting_events.invalid_piece_selected()
        # perform cutting
        else:
            piece_to_cut = self.inventory.pieces[maze_action.piece_id]

            # attempt the cut
            if self.inventory.cut(maze_action, self.current_demand):
                self.cutting_events.valid_cut(current_demand=self.current_demand, piece_to_cut=piece_to_cut,
                                              raw_piece_size=self.raw_piece_size)
                replenishment_needed = piece_to_cut == self.raw_piece_size
            else:
                # dispatch an event for the invalid cutting attempt (penalized by the reward aggregator)
                self.cutting_events.invalid_cut(current_demand=self.current_demand, piece_to_cut=piece_to_cut,
                                                raw_piece_size=self.raw_piece_size)

        # check if replenishment is required
        if replenishment_needed:
            self.inventory.replenish_piece()
            # the resulting piece_replenished event is penalized by the reward aggregator

        # step execution finished, write step statistics
        self.inventory.log_step_statistics()

        # aggregate reward from events
        reward = self.reward_aggregator.collect_rewards()

        # compile env state
        maze_state = self.get_maze_state()

        return maze_state, reward, False, info
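
The CoreEnv now returns the list of individual event rewards; the reduction to a scalar happens one level up in the MazeEnv. The following is a simplified sketch of what that reward handling could look like in the step function of env/maze_env.py; the action and observation conversion calls are assumed from the previous tutorial sections, and the actual wrapper contains additional logic.

# env/maze_env.py (simplified sketch of the reward handling, not the complete implementation)
def step(self, action):
    """Convert the action, step the core env and reduce the event rewards to a single scalar."""
    maze_action = self.action_conversion.space_to_maze(action, self.core_env.get_maze_state())
    maze_state, reward, done, info = self.core_env.step(maze_action)

    # reduce the list of event rewards to one scalar training signal
    scalar_reward = self.core_env.reward_aggregator.to_scalar_reward(reward)

    observation = self.observation_conversion.maze_to_space(maze_state)
    return observation, scalar_reward, done, info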

Finally, we update the maze_env_factory function for instantiating the trainable MazeEnv, and we are all set up for training with event-based, customized rewards.

env/maze_env.py
...
from ..reward.default_reward import DefaultRewardAggregator


def maze_env_factory(max_pieces_in_inventory: int, raw_piece_size: (int, int),
                     static_demand: (int, int)) -> Cutting2DEnvironment:
    """Convenience factory function that compiles a trainable maze environment.
    (for argument details see: Cutting2DCoreEnvironment)
    """

    # init reward aggregator
    reward_aggregator = DefaultRewardAggregator(invalid_action_penalty=-2, raw_piece_usage_penalty=-1)

    # init core environment
    core_env = Cutting2DCoreEnvironment(max_pieces_in_inventory=max_pieces_in_inventory,
                                        raw_piece_size=raw_piece_size,
                                        static_demand=static_demand,
                                        reward_aggregator=reward_aggregator)

    # init maze environment including observation and action interfaces
    action_conversion = ActionConversion(max_pieces_in_inventory=max_pieces_in_inventory)
    observation_conversion = ObservationConversion(raw_piece_size=raw_piece_size,
                                                   max_pieces_in_inventory=max_pieces_in_inventory)
    return Cutting2DEnvironment(core_env, action_conversion, observation_conversion)
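
With the factory in place, a short random rollout is a quick way to sanity check the new reward signal. The snippet below is only an illustration; the constructor arguments are example values, not necessarily the ones used in main.py.

# quick sanity check of the event-based reward (example values)
env = maze_env_factory(max_pieces_in_inventory=200,
                       raw_piece_size=(100, 100),
                       static_demand=(30, 15))

observation = env.reset()
for _ in range(10):
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    # reward is already the scalar aggregated from the dispatched events
    print(reward)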

8.3. Where to Go Next

As the reward is implemented via a reward aggregator that is methodologically identical to the initial version, there is no need to retrain the model for now. However, we highly recommend proceeding with the more advanced tutorial on Structured Environments and Action Masking.