Tasks
task
¶
class
Task
¶
Create an asynchronous background task. Tasks allow you to run code according to a trigger object.
A task's trigger must inherit from BaseTrigger.
Attributes:
Name | Type | Description |
---|---|---|
callback |
Callable |
The function to be called when the trigger is triggered. |
trigger |
BaseTrigger |
The trigger object that determines when the task should run. |
task |
Optional[_Task] |
The task object that is running the trigger loop. |
iteration |
int |
The number of times the task has run. |
Source code in naff/models/naff/tasks/task.py
class Task:
    """
    Create an asynchronous background task. Tasks allow you to run code according to a trigger object.

    A task's trigger must inherit from `BaseTrigger`.

    Attributes:
        callback (Callable): The function to be called when the trigger is triggered.
        trigger (BaseTrigger): The trigger object that determines when the task should run.
        task (Optional[_Task]): The task object that is running the trigger loop.
        iteration (int): The number of times the task has run.

    """

    callback: Callable
    trigger: BaseTrigger
    task: _Task
    _stop: asyncio.Event
    iteration: int

    def __init__(self, callback: Callable, trigger: BaseTrigger) -> None:
        self.callback = callback
        self.trigger = trigger
        self._stop = asyncio.Event()
        self.task = MISSING
        self.iteration = 0
        # Strong references to in-flight callback tasks. The event loop only
        # keeps weak references, so a task nobody holds may be collected
        # before it finishes.
        self._pending_calls: set = set()

    @property
    def next_run(self) -> Optional[datetime]:
        """Get the next datetime this task will run, or None if the task is not running."""
        # NOTE(review): presumably the MISSING placeholder is falsy, as `stop()`
        # already relies on `if self.task:`; this guard avoids calling
        # `.done()` on it before `start()` has run.
        if self.task and not self.task.done():
            return self.trigger.next_fire()
        return None

    @property
    def delta_until_run(self) -> Optional[timedelta]:
        """Get the time until the next run of this task, or None if no run is scheduled."""
        upcoming = self.next_run
        # The trigger may have nothing left to schedule; do not subtract from None.
        if upcoming is None:
            return None
        return upcoming - datetime.now()

    def on_error(self, error: Exception) -> None:
        """Error handler for this task. Called when an exception is raised during execution of the task."""
        naff.Client.default_error_handler("Task", error)

    async def __call__(self) -> None:
        """
        Run the callback once.

        If the callback returns a `BaseTrigger`, it replaces the current trigger.
        Any exception raised by the callback is passed to `on_error`.
        """
        try:
            if inspect.iscoroutinefunction(self.callback):
                val = await self.callback()
            else:
                val = self.callback()

            if isinstance(val, BaseTrigger):
                self.trigger = val
        except Exception as e:
            self.on_error(e)

    def _fire(self, fire_time: datetime) -> None:
        """Called when the task is being fired."""
        self.trigger.last_call_time = fire_time
        call = asyncio.create_task(self())
        # Hold the task until it completes, then drop the reference.
        self._pending_calls.add(call)
        call.add_done_callback(self._pending_calls.discard)
        self.iteration += 1

    async def _task_loop(self) -> None:
        """The main task loop to fire the task at the specified time based on triggers configured."""
        while not self._stop.is_set():
            fire_time = self.trigger.next_fire()
            if fire_time is None:
                # Nothing left to schedule: shut the task down.
                return self.stop()

            try:
                # Sleep until the fire time, waking early if a stop is requested.
                await asyncio.wait_for(self._stop.wait(), max(0.0, (fire_time - datetime.now()).total_seconds()))
            except asyncio.TimeoutError:
                # The wait timed out, i.e. the fire time was reached without a stop.
                pass
            else:
                # The stop event was set while waiting.
                return None

            self._fire(fire_time)

    def start(self) -> None:
        """Start this task. Logs an error instead of raising when no event loop is running."""
        self._stop.clear()
        try:
            # Probe for a loop first so the loop coroutine is never created
            # (and left un-awaited) when it cannot be scheduled.
            asyncio.get_running_loop()
        except RuntimeError:
            logger.error(
                "Unable to start task without a running event loop! We recommend starting tasks within an `on_startup` event."
            )
            return

        # A previous loop that is still alive would keep firing alongside the
        # new one once its reference is overwritten; cancel it first.
        if self.task and not self.task.done():
            self.task.cancel()
        self.task = asyncio.create_task(self._task_loop())

    def stop(self) -> None:
        """End this task."""
        self._stop.set()
        if self.task:
            self.task.cancel()

    def restart(self) -> None:
        """Restart this task."""
        self.stop()
        self.start()

    def reschedule(self, trigger: BaseTrigger) -> None:
        """
        Change the trigger being used by this task.

        Args:
            trigger: The new Trigger to use

        """
        self.trigger = trigger
        self.restart()

    @classmethod
    def create(cls, trigger: BaseTrigger) -> Callable[[Callable], "Task"]:
        """
        A decorator to create a task.

        Args:
            trigger: The trigger to use for this task

        """

        def wrapper(func: Callable) -> "Task":
            return cls(func, trigger)

        return wrapper
property
readonly
next_run: Optional[datetime.datetime]
¶
Get the next datetime this task will run.
property
readonly
delta_until_run: Optional[datetime.timedelta]
¶
Get the time until the next run of this task.
method
on_error(self, error)
¶
Error handler for this task. Called when an exception is raised during execution of the task.
Source code in naff/models/naff/tasks/task.py
def on_error(self, error: Exception) -> None:
    """Handle an exception raised while this task's callback was executing."""
    # Hand the error to the client's default handler, labelled as coming from a "Task".
    report = naff.Client.default_error_handler
    report("Task", error)
method
start(self)
¶
Start this task.
Source code in naff/models/naff/tasks/task.py
def start(self) -> None:
    """Start this task. Logs an error instead of raising when no event loop is running."""
    self._stop.clear()
    try:
        # Probe for a loop first so the loop coroutine is never created
        # (and left un-awaited) when it cannot be scheduled.
        asyncio.get_running_loop()
    except RuntimeError:
        logger.error(
            "Unable to start task without a running event loop! We recommend starting tasks within an `on_startup` event."
        )
        return

    # A previous loop that is still alive would keep firing alongside the
    # new one once its reference is overwritten; cancel it first.
    if self.task and not self.task.done():
        self.task.cancel()
    self.task = asyncio.create_task(self._task_loop())
method
stop(self)
¶
End this task.
Source code in naff/models/naff/tasks/task.py
def stop(self) -> None:
    """Signal the trigger loop to end and cancel its task if one exists."""
    self._stop.set()
    running = self.task
    if not running:
        # No loop task to cancel.
        return
    running.cancel()
method
restart(self)
¶
Restart this task.
Source code in naff/models/naff/tasks/task.py
def restart(self) -> None:
    """Stop this task, then start it again."""
    # Order matters: the old loop is stopped before a new one is started.
    self.stop()
    self.start()
method
reschedule(self, trigger)
¶
Change the trigger being used by this task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trigger |
BaseTrigger |
The new Trigger to use |
required |
Source code in naff/models/naff/tasks/task.py
def reschedule(self, trigger: BaseTrigger) -> None:
    """
    Replace this task's trigger and restart the task.

    Args:
        trigger: The new Trigger to use

    """
    self.trigger = trigger
    # Restart so the loop runs against the new trigger.
    self.restart()
classmethod
method
create(trigger)
¶
A decorator to create a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trigger |
BaseTrigger |
The trigger to use for this task |
required |
Source code in naff/models/naff/tasks/task.py
@classmethod
def create(cls, trigger: BaseTrigger) -> Callable[[Callable], "Task"]:
    """
    Build a decorator that turns a function into a task.

    Args:
        trigger: The trigger to use for this task

    Returns:
        A decorator that wraps the decorated function in an instance of this class

    """

    def wrapper(func: Callable) -> "Task":
        # `cls` rather than `Task`, so subclasses produce their own type.
        return cls(func, trigger)

    return wrapper
triggers
¶
class
BaseTrigger (ABC)
¶
Source code in naff/models/naff/tasks/triggers.py
class BaseTrigger(ABC):
    """Abstract base class for triggers; records the time the trigger last fired."""

    last_call_time: datetime

    def __new__(cls, *args, **kwargs) -> "BaseTrigger":
        # Stamp the creation time at allocation, independent of any subclass `__init__`.
        instance = super().__new__(cls)
        instance.last_call_time = datetime.now()
        return instance

    def __or__(self, other: "BaseTrigger") -> "OrTrigger":
        """Combine two triggers with `|` into an `OrTrigger`."""
        combined = OrTrigger(self, other)
        return combined

    @abstractmethod
    def next_fire(self) -> Optional[datetime]:
        """
        Return the next datetime to fire on.

        Returns:
            Datetime if one can be determined. If no datetime can be determined, return None

        """
        ...
method
next_fire(self)
¶
Return the next datetime to fire on.
Returns:
Type | Description |
---|---|
Optional[datetime.datetime] |
Datetime if one can be determined. If no datetime can be determined, return None |
Source code in naff/models/naff/tasks/triggers.py
@abstractmethod
def next_fire(self) -> Optional[datetime]:
    """
    Compute when this trigger should fire next.

    Returns:
        Datetime if one can be determined. If no datetime can be determined, return None

    """
    ...
class
IntervalTrigger (BaseTrigger)
¶
Trigger the task every set interval.
Attributes:
Name | Type | Description |
---|---|---|
seconds |
Union[int, float] |
How many seconds between intervals |
minutes |
Union[int, float] |
How many minutes between intervals |
hours |
Union[int, float] |
How many hours between intervals |
days |
Union[int, float] |
How many days between intervals |
weeks |
Union[int, float] |
How many weeks between intervals |
Source code in naff/models/naff/tasks/triggers.py
class IntervalTrigger(BaseTrigger):
    """
    Trigger the task every set interval.

    Attributes:
        seconds Union[int, float]: How many seconds between intervals
        minutes Union[int, float]: How many minutes between intervals
        hours Union[int, float]: How many hours between intervals
        days Union[int, float]: How many days between intervals
        weeks Union[int, float]: How many weeks between intervals

    Raises:
        ValueError: If the combined interval is negative

    """

    _t = Union[int, float]

    def __init__(self, seconds: _t = 0, minutes: _t = 0, hours: _t = 0, days: _t = 0, weeks: _t = 0) -> None:
        self.delta = timedelta(days=days, seconds=seconds, minutes=minutes, hours=hours, weeks=weeks)

        # Check the sign of the interval directly. Comparing two separate
        # `datetime.now()` readings made a zero interval pass or fail depending
        # on clock resolution, and could overflow for very large intervals.
        if self.delta < timedelta(0):
            raise ValueError("Interval values must result in a time in the future!")

    def next_fire(self) -> Optional[datetime]:
        """Return the previous fire time plus the configured interval."""
        return self.last_call_time + self.delta
method
next_fire(self)
¶
Return the next datetime to fire on.
Returns:
Type | Description |
---|---|
Optional[datetime.datetime] |
Datetime if one can be determined. If no datetime can be determined, return None |
Source code in naff/models/naff/tasks/triggers.py
def next_fire(self) -> Optional[datetime]:
    """Return the previous fire time plus the configured interval."""
    previous = self.last_call_time
    return previous + self.delta
class
DateTrigger (BaseTrigger)
¶
Trigger the task once, when the specified datetime is reached.
Attributes:
Name | Type | Description |
---|---|---|
target_datetime |
datetime |
A datetime representing the date/time to run this task |
Source code in naff/models/naff/tasks/triggers.py
class DateTrigger(BaseTrigger):
    """
    Trigger the task once, when the specified datetime is reached.

    Attributes:
        target_datetime datetime: A datetime representing the date/time to run this task.
            Naive values are compared against `datetime.now()` as-is; timezone-aware
            values are converted to naive local time first.

    """

    def __init__(self, target_datetime: datetime) -> None:
        self.target = target_datetime

    def next_fire(self) -> Optional[datetime]:
        """
        Return the target datetime while it is still in the future.

        Returns:
            The target as a naive local datetime, or None once it has passed

        """
        target = self.target
        if target.tzinfo is not None:
            # An aware datetime cannot be compared with the naive `datetime.now()`
            # below; express it as naive local time first.
            target = target.astimezone().replace(tzinfo=None)

        if datetime.now() < target:
            return target
        return None
method
next_fire(self)
¶
Return the next datetime to fire on.
Returns:
Type | Description |
---|---|
Optional[datetime.datetime] |
Datetime if one can be determined. If no datetime can be determined, return None |
Source code in naff/models/naff/tasks/triggers.py
def next_fire(self) -> Optional[datetime]:
    """
    Return the target datetime while it is still in the future.

    Returns:
        The target as a naive local datetime, or None once it has passed

    """
    target = self.target
    if target.tzinfo is not None:
        # An aware datetime cannot be compared with the naive `datetime.now()`
        # below; express it as naive local time first.
        target = target.astimezone().replace(tzinfo=None)

    if datetime.now() < target:
        return target
    return None
class
TimeTrigger (BaseTrigger)
¶
Trigger the task every day, at a specified (24 hour clock) time.
Attributes:
Name | Type | Description |
---|---|---|
hour |
int |
The hour of the day (24 hour clock) |
minute |
int |
The minute of the hour |
seconds |
int |
The seconds of the minute |
utc |
bool |
If this time is in UTC |
Source code in naff/models/naff/tasks/triggers.py
class TimeTrigger(BaseTrigger):
    """
    Trigger the task every day, at a specified (24 hour clock) time.

    Attributes:
        hour int: The hour of the day (24 hour clock)
        minute int: The minute of the hour
        seconds Union[int, float]: The seconds of the minute; a fractional part is kept as microseconds
        utc bool: If this time is in UTC

    """

    def __init__(self, hour: int = 0, minute: int = 0, seconds: Union[int, float] = 0, utc: bool = True) -> None:
        self.target_time = (hour, minute, seconds)
        self.tz = timezone.utc if utc else None

    def next_fire(self) -> Optional[datetime]:
        """
        Return the next occurrence of the configured time of day.

        Returns:
            A naive local datetime that is later than `last_call_time`

        """
        hour, minute, seconds = self.target_time
        # `datetime()` only accepts integer seconds; carry any fraction as microseconds.
        whole, fraction = divmod(seconds, 1)
        second = int(whole)
        microsecond = min(999_999, round(fraction * 1_000_000))

        # Take "today" in the trigger's own timezone. Pairing the local calendar
        # date with a UTC time of day lands on the wrong day whenever the local
        # and UTC dates differ.
        today = datetime.now(self.tz)
        target = datetime(today.year, today.month, today.day, hour, minute, second, microsecond, tzinfo=self.tz)

        if self.tz is None:
            if target <= self.last_call_time:
                target += timedelta(days=1)
            return target

        # `last_call_time` is naive local time, so compare in that form.
        local_target = target.astimezone().replace(tzinfo=None)
        if local_target <= self.last_call_time:
            # Add the day before converting so the result stays on the configured time of day.
            local_target = (target + timedelta(days=1)).astimezone().replace(tzinfo=None)
        return local_target
method
next_fire(self)
¶
Return the next datetime to fire on.
Returns:
Type | Description |
---|---|
Optional[datetime.datetime] |
Datetime if one can be determined. If no datetime can be determined, return None |
Source code in naff/models/naff/tasks/triggers.py
def next_fire(self) -> Optional[datetime]:
    """
    Return the next occurrence of the configured time of day.

    Returns:
        A naive local datetime that is later than `last_call_time`

    """
    hour, minute, seconds = self.target_time
    # `datetime()` only accepts integer seconds; carry any fraction as microseconds.
    whole, fraction = divmod(seconds, 1)
    second = int(whole)
    microsecond = min(999_999, round(fraction * 1_000_000))

    # Take "today" in the trigger's own timezone. Pairing the local calendar
    # date with a UTC time of day lands on the wrong day whenever the local
    # and UTC dates differ.
    today = datetime.now(self.tz)
    target = datetime(today.year, today.month, today.day, hour, minute, second, microsecond, tzinfo=self.tz)

    if self.tz is None:
        if target <= self.last_call_time:
            target += timedelta(days=1)
        return target

    # `last_call_time` is naive local time, so compare in that form.
    local_target = target.astimezone().replace(tzinfo=None)
    if local_target <= self.last_call_time:
        # Add the day before converting so the result stays on the configured time of day.
        local_target = (target + timedelta(days=1)).astimezone().replace(tzinfo=None)
    return local_target
class
OrTrigger (BaseTrigger)
¶
Trigger a task when any sub-trigger is fulfilled.
Source code in naff/models/naff/tasks/triggers.py
class OrTrigger(BaseTrigger):
    """
    Trigger a task when any sub-trigger is fulfilled.

    NOTE(review): nothing in this class updates the sub-triggers' own
    `last_call_time`; confirm whether the caller is expected to do so.
    """

    def __init__(self, *trigger: BaseTrigger) -> None:
        self.triggers: List[BaseTrigger] = list(trigger)

    def _get_delta(self, d: BaseTrigger) -> timedelta:
        """Return how far `d`'s next fire time is from this trigger's `last_call_time`."""
        next_fire = d.next_fire()
        if next_fire is None:
            # Use a timedelta so the value can be ordered against real deltas;
            # `float("inf")` cannot be compared with a timedelta.
            return timedelta.max
        return abs(next_fire - self.last_call_time)

    def __or__(self, other: "BaseTrigger") -> "OrTrigger":
        """Add another trigger to this one in place and return self."""
        self.triggers.append(other)
        return self

    def next_fire(self) -> Optional[datetime]:
        """
        Return the sub-trigger fire time closest to `last_call_time`.

        Returns:
            The chosen datetime, or None when there are no sub-triggers or none of them has a next fire time

        """
        # Ask each sub-trigger once and skip those with nothing scheduled.
        candidates = [moment for moment in (sub.next_fire() for sub in self.triggers) if moment is not None]
        if not candidates:
            return None
        return min(candidates, key=lambda moment: abs(moment - self.last_call_time))
method
next_fire(self)
¶
Return the next datetime to fire on.
Returns:
Type | Description |
---|---|
Optional[datetime.datetime] |
Datetime if one can be determined. If no datetime can be determined, return None |
Source code in naff/models/naff/tasks/triggers.py
def next_fire(self) -> Optional[datetime]:
    """
    Return the sub-trigger fire time closest to `last_call_time`.

    Returns:
        The chosen datetime, or None when there are no sub-triggers or none of them has a next fire time

    """
    # Ask each sub-trigger once and skip those with nothing scheduled, so an
    # exhausted sub-trigger can neither win the selection nor break the ordering.
    candidates = [moment for moment in (sub.next_fire() for sub in self.triggers) if moment is not None]
    if not candidates:
        return None
    return min(candidates, key=lambda moment: abs(moment - self.last_call_time))