dify/api/libs/datetime_utils.py
Harry b41538d8c7 feat(trigger): reinforcement schedule trigger debugging with cron calculation
- Implemented a caching mechanism for schedule trigger debug events using Redis to optimize performance.
- Added methods to create and manage schedule debug runtime configurations, including cron expression handling.
- Updated the ScheduleTriggerDebugEventPoller to utilize the new caching and event creation logic.
- Removed the deprecated build_schedule_pool_key function from event handling.
2025-10-28 23:34:08 +08:00

34 lines
1019 B
Python

import abc
import datetime
from typing import Protocol
class _NowFunction(Protocol):
@abc.abstractmethod
def __call__(self, tz: datetime.timezone | None) -> datetime.datetime:
pass
# _now_func is a callable with the _NowFunction signature.
# Its sole purpose is to abstract time retrieval, enabling
# developers to mock this behavior in tests and time-dependent scenarios.
_now_func: _NowFunction = datetime.datetime.now
def naive_utc_now() -> datetime.datetime:
"""Return a naive datetime object (without timezone information)
representing current UTC time.
"""
return _now_func(datetime.UTC).replace(tzinfo=None)
def ensure_naive_utc(dt: datetime.datetime) -> datetime.datetime:
"""Return the datetime as naive UTC (tzinfo=None).
If the input is timezone-aware, convert to UTC and drop the tzinfo.
Assumes naive datetimes are already expressed in UTC.
"""
if dt.tzinfo is None:
return dt
return dt.astimezone(datetime.UTC).replace(tzinfo=None)