Skip to content

Commit

Permalink
Add seed manager
Browse files Browse the repository at this point in the history
  • Loading branch information
chraibi committed Oct 24, 2024
1 parent be8e8b8 commit e51c7ef
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 7 deletions.
4 changes: 3 additions & 1 deletion simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ def init_motivation_model(
number_high_value=int(_data["motivation_parameters"]["number_high_value"]),
nagents=number_agents,
agent_ids=ped_ids,
agent_positions=ped_positions,
competition_decay_reward=competition_decay_reward,
competition_max=competition_max,
percent=percent,
motivation_door_center=motivation_door_center,
evc=False,
)
# =================
Expand Down Expand Up @@ -438,7 +440,7 @@ def init_and_run_simulation(
motivation_file = _trajectory_path.with_name(
_trajectory_path.stem + "_motivation.csv"
)
logging.info(f"{motivation_file = }")
logging.info(f"{motivation_file.stem = }")
simulation = init_simulation(
_data, _time_step, _fps, _trajectory_path, from_file=True
)
Expand Down
73 changes: 67 additions & 6 deletions src/motivation_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,54 @@

from .logger_config import log_debug
import math
import hashlib
from enum import Enum, auto

Point: TypeAlias = Tuple[float, float]


class SeedOperation(Enum):
"""Enumeration of different seeding operations for tracking purposes."""

AGENT_VALUE_ASSIGNMENT = auto()
POSITION_PROBABILITY = auto()
HIGH_VALUE_SELECTION = auto()
GENERAL_RANDOMIZATION = auto()


@dataclass
class SeedManager:
"""Manages seeds for reproducible randomization across different operations."""

base_seed: int
_operation_seeds: Dict[SeedOperation, int] = field(default_factory=dict)

def __post_init__(self):
"""Initialize operation-specific seeds based on the base seed."""
for operation in SeedOperation:
# Create unique seeds for each operation using hash function
operation_string = f"{self.base_seed}_{operation.name}"
hash_value = hashlib.md5(operation_string.encode()).hexdigest()
# Convert first 8 characters of hash to integer
self._operation_seeds[operation] = int(hash_value[:8], 16)

def get_operation_seed(
self, operation: SeedOperation, sub_id: Optional[int] = None
) -> int:
"""Get a deterministic seed for a specific operation and optional sub-operation."""
operation_base = self._operation_seeds[operation]
if sub_id is not None:
# Combine operation seed with sub_id for unique but deterministic result
return operation_base + sub_id * 1000
return operation_base

def set_seed_for_operation(
self, operation: SeedOperation, sub_id: Optional[int] = None
) -> None:
"""Set the random seed for a specific operation."""
random.seed(self.get_operation_seed(operation, sub_id))


class MotivationStrategy(ABC):
"""Abstract class for strategy model."""

Expand Down Expand Up @@ -91,10 +135,10 @@ def plot(self) -> List[Figure]:
class EVCStrategy(MotivationStrategy):
"""Motivation theory based on E.V.C (model4)."""

motivation_door_center: Point
agent_ids: List[int] = field(default_factory=list)
agent_positions: List[Point] = (field(default_factory=list),)
agent_positions: List[Point] = field(default_factory=list)
pedestrian_value: Dict[int, float] = field(default_factory=dict)
motivation_door_center: Point = tuple()
width: float = 1.0
height: float = 1.0
max_reward: int = 0
Expand All @@ -112,18 +156,22 @@ class EVCStrategy(MotivationStrategy):
alpha = 1.0
spatial_bias: float = 5.0 # To account for absolute distances
distance_scale: float = 2.0 # Scale factor for distance normalization
seed_manager: Optional[SeedManager] = None

def calculate_exit_distance(self, pos: Point) -> float:
"""Calculate distance from a position to the door center.
Returns absolute distance in spatial units."""
Returns absolute distance in spatial units.
"""
dx = pos[0] - self.motivation_door_center[0]
dy = pos[1] - self.motivation_door_center[1]
return math.sqrt(dx * dx + dy * dy)

def get_high_value_probability(self, pos: Point) -> float:
"""Calculate probability of being high value based on position.
Returns higher probability for positions closer to door."""
Returns higher probability for positions closer to door.
"""
distance = self.calculate_exit_distance(pos)
# Convert distance to probability using exponential decay
# Scale the distance to control the rate of probability decay
Expand All @@ -137,20 +185,29 @@ def get_derived_seed(base_seed: int, operation_id: int) -> int:

def __post_init__(self) -> None:
"""Initialize array pedestrian_value with random values in min max interval."""
if self.seed is not None:
random.seed(self.seed)
if self.seed is not None and self.seed_manager is None:
self.seed_manager = SeedManager(self.seed)

if self.number_high_value > self.nagents:
logging.warning(
f"Configuration error: {self.number_high_value} > {self.nagents}. Change value to match!"
)
self.number_high_value = self.nagents

# Set seed for position probability calculations
if self.seed_manager:
self.seed_manager.set_seed_for_operation(SeedOperation.POSITION_PROBABILITY)

# Calculate probabilities for each agent based on position
agent_probabilities = [
(agent_id, self.get_high_value_probability(pos))
for agent_id, pos in zip(self.agent_ids, self.agent_positions)
]

# Set seed for high value selection
if self.seed_manager:
self.seed_manager.set_seed_for_operation(SeedOperation.HIGH_VALUE_SELECTION)

# Sort agents by their probability of being high value
sorted_agents = sorted(
agent_probabilities,
Expand All @@ -165,6 +222,10 @@ def __post_init__(self) -> None:
)
# high_value_agents = set(random.sample(self.agent_ids, self.number_high_value))
for n in self.agent_ids:
if self.seed_manager:
self.seed_manager.set_seed_for_operation(
SeedOperation.AGENT_VALUE_ASSIGNMENT, sub_id=n
)
if n in high_value_agents:
# This agent gets a high value
self.pedestrian_value[n] = self.value(
Expand Down
13 changes: 13 additions & 0 deletions src/simutilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ def create_motivation_strategy(params: Dict[str, Any]) -> mm.MotivationStrategy:
width=params["width"], height=params["height"]
)
elif strategy in ["EVC", "EC-V"]:
door_point1 = (
params["motivation_doors"][0][0][0],
params["motivation_doors"][0][0][1],
)
door_point2 = (
params["motivation_doors"][0][1][0],
params["motivation_doors"][0][1][1],
)
x_door = 0.5 * (door_point1[0] + door_point2[0])
y_door = 0.5 * (door_point1[1] + door_point2[1])
motivation_door_center = (x_door, y_door)
return mm.EVCStrategy(
width=params["width"],
height=params["height"],
Expand All @@ -110,6 +121,8 @@ def create_motivation_strategy(params: Dict[str, Any]) -> mm.MotivationStrategy:
min_value_low=params["min_value_low"],
number_high_value=params["number_high_value"],
agent_ids=list(range(params["number_agents"])),
# agent_positions=ped_positions,
motivation_door_center=motivation_door_center,
competition_decay_reward=params["competition_decay_reward"],
competition_max=params["competition_max"],
percent=params["percent"],
Expand Down

0 comments on commit e51c7ef

Please sign in to comment.