from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta @dataclass(frozen=True) class CronSchedule: minute: str hour: str day_of_month: str month: str day_of_week: str def parse_cron_expr(expr: str) -> CronSchedule: parts = expr.strip().split() if len(parts) != 5: raise ValueError("cron expression must have exactly 5 fields") return CronSchedule(*parts) def due_key(moment: datetime) -> str: return moment.strftime("%Y%m%d%H%M") def is_due(expr: str, moment: datetime) -> bool: schedule = parse_cron_expr(expr) cron_dow = (moment.weekday() + 1) % 7 return ( _field_matches(schedule.minute, moment.minute, 0, 59) and _field_matches(schedule.hour, moment.hour, 0, 23) and _field_matches(schedule.day_of_month, moment.day, 1, 31) and _field_matches(schedule.month, moment.month, 1, 12) and _field_matches(schedule.day_of_week, cron_dow, 0, 7, sunday_alias=True) ) def next_due_after(expr: str, moment: datetime, *, max_days: int = 366) -> datetime | None: parse_cron_expr(expr) candidate = moment.replace(second=0, microsecond=0) + timedelta(minutes=1) deadline = candidate + timedelta(days=max_days) while candidate <= deadline: if is_due(expr, candidate): return candidate candidate += timedelta(minutes=1) return None def _field_matches(field: str, value: int, min_value: int, max_value: int, sunday_alias: bool = False) -> bool: for part in field.split(","): if _part_matches(part.strip(), value, min_value, max_value, sunday_alias=sunday_alias): return True return False def _part_matches(part: str, value: int, min_value: int, max_value: int, sunday_alias: bool) -> bool: if not part: return False step = 1 base = part if "/" in part: base, step_s = part.split("/", 1) if not step_s.isdigit(): return False step = int(step_s) if step <= 0: return False if base == "*": start = min_value end = max_value elif "-" in base: start_s, end_s = base.split("-", 1) if not start_s.isdigit() or not end_s.isdigit(): return False start = int(start_s) end = int(end_s) elif base.isdigit(): start = end = int(base) else: return False values = _normalize_values(range(start, end + 1), sunday_alias=sunday_alias) normalized_value = 0 if sunday_alias and value == 7 else value if normalized_value not in values: return False return (normalized_value - min(values)) % step == 0 def _normalize_values(values: range, sunday_alias: bool) -> set[int]: out: set[int] = set() for value in values: if sunday_alias and value == 7: out.add(0) else: out.add(value) return out