Skip to content

Tasks

task

class Task

Create an asynchronous background tasks. Tasks allow you to run code according to a trigger object.

A task's trigger must inherit from BaseTrigger.

Attributes:

Name Type Description
callback Callable

The function to be called when the trigger is triggered.

trigger BaseTrigger

The trigger object that determines when the task should run.

task Optional[_Task]

The task object that is running the trigger loop.

iteration int

The number of times the task has run.

Source code in naff/models/naff/tasks/task.py
class Task:
    """
    Create an asynchronous background tasks. Tasks allow you to run code according to a trigger object.

    A task's trigger must inherit from `BaseTrigger`.

    Attributes:
        callback (Callable): The function to be called when the trigger is triggered.
        trigger (BaseTrigger): The trigger object that determines when the task should run.
        task (Optional[_Task]): The task object that is running the trigger loop.
        iteration (int): The number of times the task has run.

    """

    callback: Callable
    trigger: BaseTrigger
    task: _Task
    _stop: asyncio.Event
    iteration: int

    def __init__(self, callback: Callable, trigger: BaseTrigger) -> None:
        self.callback = callback
        self.trigger = trigger
        self._stop = asyncio.Event()
        self.task = MISSING
        self.iteration = 0

    @property
    def next_run(self) -> Optional[datetime]:
        """Get the next datetime this task will run."""
        if not self.task.done():
            return self.trigger.next_fire()
        return None

    @property
    def delta_until_run(self) -> Optional[timedelta]:
        """Get the time until the next run of this task."""
        if not self.task.done():
            return self.next_run - datetime.now()

    def on_error(self, error: Exception) -> None:
        """Error handler for this task. Called when an exception is raised during execution of the task."""
        naff.Client.default_error_handler("Task", error)

    async def __call__(self) -> None:
        try:
            if inspect.iscoroutinefunction(self.callback):
                val = await self.callback()
            else:
                val = self.callback()

            if isinstance(val, BaseTrigger):
                self.trigger = val
        except Exception as e:
            self.on_error(e)

    def _fire(self, fire_time: datetime) -> None:
        """Called when the task is being fired."""
        self.trigger.last_call_time = fire_time
        asyncio.create_task(self())
        self.iteration += 1

    async def _task_loop(self) -> None:
        """The main task loop to fire the task at the specified time based on triggers configured."""
        while not self._stop.is_set():
            fire_time = self.trigger.next_fire()
            if fire_time is None:
                return self.stop()

            try:
                await asyncio.wait_for(self._stop.wait(), max(0.0, (fire_time - datetime.now()).total_seconds()))
            except asyncio.TimeoutError:
                pass
            else:
                return None

            self._fire(fire_time)

    def start(self) -> None:
        """Start this task."""
        try:
            self._stop.clear()
            self.task = asyncio.create_task(self._task_loop())
        except RuntimeError:
            logger.error(
                "Unable to start task without a running event loop! We recommend starting tasks within an `on_startup` event."
            )

    def stop(self) -> None:
        """End this task."""
        self._stop.set()
        if self.task:
            self.task.cancel()

    def restart(self) -> None:
        """Restart this task."""
        self.stop()
        self.start()

    def reschedule(self, trigger: BaseTrigger) -> None:
        """
        Change the trigger being used by this task.

        Args:
            trigger: The new Trigger to use

        """
        self.trigger = trigger
        self.restart()

    @classmethod
    def create(cls, trigger: BaseTrigger) -> Callable[[Callable], "Task"]:
        """
        A decorator to create a task.

        Args:
            trigger: The trigger to use for this task

        """

        def wrapper(func: Callable) -> "Task":
            return cls(func, trigger)

        return wrapper

property readonly next_run: Optional[datetime.datetime]

Get the next datetime this task will run.

property readonly delta_until_run: Optional[datetime.timedelta]

Get the time until the next run of this task.

method on_error(self, error)

Error handler for this task. Called when an exception is raised during execution of the task.

Source code in naff/models/naff/tasks/task.py
def on_error(self, error: Exception) -> None:
    """Error handler for this task. Called when an exception is raised during execution of the task."""
    naff.Client.default_error_handler("Task", error)

method start(self)

Start this task.

Source code in naff/models/naff/tasks/task.py
def start(self) -> None:
    """Start this task."""
    try:
        self._stop.clear()
        self.task = asyncio.create_task(self._task_loop())
    except RuntimeError:
        logger.error(
            "Unable to start task without a running event loop! We recommend starting tasks within an `on_startup` event."
        )

method stop(self)

End this task.

Source code in naff/models/naff/tasks/task.py
def stop(self) -> None:
    """End this task."""
    self._stop.set()
    if self.task:
        self.task.cancel()

method restart(self)

Restart this task.

Source code in naff/models/naff/tasks/task.py
def restart(self) -> None:
    """Restart this task."""
    self.stop()
    self.start()

method reschedule(self, trigger)

Change the trigger being used by this task.

Parameters:

Name Type Description Default
trigger BaseTrigger

The new Trigger to use

required
Source code in naff/models/naff/tasks/task.py
def reschedule(self, trigger: BaseTrigger) -> None:
    """
    Change the trigger being used by this task.

    Args:
        trigger: The new Trigger to use

    """
    self.trigger = trigger
    self.restart()

classmethod method create(trigger)

A decorator to create a task.

Parameters:

Name Type Description Default
trigger BaseTrigger

The trigger to use for this task

required
Source code in naff/models/naff/tasks/task.py
@classmethod
def create(cls, trigger: BaseTrigger) -> Callable[[Callable], "Task"]:
    """
    A decorator to create a task.

    Args:
        trigger: The trigger to use for this task

    """

    def wrapper(func: Callable) -> "Task":
        return cls(func, trigger)

    return wrapper

triggers

class BaseTrigger (ABC)

Source code in naff/models/naff/tasks/triggers.py
class BaseTrigger(ABC):
    last_call_time: datetime

    def __new__(cls, *args, **kwargs) -> "BaseTrigger":
        new_cls = super().__new__(cls)
        new_cls.last_call_time = datetime.now()
        return new_cls

    def __or__(self, other: "BaseTrigger") -> "OrTrigger":
        return OrTrigger(self, other)

    @abstractmethod
    def next_fire(self) -> Optional[datetime]:
        """
        Return the next datetime to fire on.

        Returns:
            Datetime if one can be determined. If no datetime can be determined, return None

        """
        ...

method next_fire(self)

Return the next datetime to fire on.

Returns:

Type Description
Optional[datetime.datetime]

Datetime if one can be determined. If no datetime can be determined, return None

Source code in naff/models/naff/tasks/triggers.py
@abstractmethod
def next_fire(self) -> Optional[datetime]:
    """
    Return the next datetime to fire on.

    Returns:
        Datetime if one can be determined. If no datetime can be determined, return None

    """
    ...

class IntervalTrigger (BaseTrigger)

Trigger the task every set interval.

Attributes:

Name Type Description
seconds Union[int, float]

How many seconds between intervals

minutes Union[int, float]

How many minutes between intervals

hours Union[int, float]

How many hours between intervals

days Union[int, float]

How many days between intervals

weeks Union[int, float]

How many weeks between intervals

Source code in naff/models/naff/tasks/triggers.py
class IntervalTrigger(BaseTrigger):
    """
    Trigger the task every set interval.

    Attributes:
        seconds Union[int, float]: How many seconds between intervals
        minutes Union[int, float]: How many minutes between intervals
        hours Union[int, float]: How many hours between intervals
        days Union[int, float]: How many days between intervals
        weeks Union[int, float]: How many weeks between intervals

    """

    _t = Union[int, float]

    def __init__(self, seconds: _t = 0, minutes: _t = 0, hours: _t = 0, days: _t = 0, weeks: _t = 0) -> None:
        self.delta = timedelta(days=days, seconds=seconds, minutes=minutes, hours=hours, weeks=weeks)

        # lazy check for negatives
        if (datetime.now() + self.delta) < datetime.now():
            raise ValueError("Interval values must result in a time in the future!")

    def next_fire(self) -> Optional[datetime]:
        return self.last_call_time + self.delta

method next_fire(self)

Return the next datetime to fire on.

Returns:

Type Description
Optional[datetime.datetime]

Datetime if one can be determined. If no datetime can be determined, return None

Source code in naff/models/naff/tasks/triggers.py
def next_fire(self) -> Optional[datetime]:
    return self.last_call_time + self.delta

class DateTrigger (BaseTrigger)

Trigger the task once, when the specified datetime is reached.

Attributes:

Name Type Description
target_datetime datetime

A datetime representing the date/time to run this task

Source code in naff/models/naff/tasks/triggers.py
class DateTrigger(BaseTrigger):
    """
    Trigger the task once, when the specified datetime is reached.

    Attributes:
        target_datetime datetime: A datetime representing the date/time to run this task

    """

    def __init__(self, target_datetime: datetime) -> None:
        self.target = target_datetime

    def next_fire(self) -> Optional[datetime]:
        if datetime.now() < self.target:
            return self.target
        return None

method next_fire(self)

Return the next datetime to fire on.

Returns:

Type Description
Optional[datetime.datetime]

Datetime if one can be determined. If no datetime can be determined, return None

Source code in naff/models/naff/tasks/triggers.py
def next_fire(self) -> Optional[datetime]:
    if datetime.now() < self.target:
        return self.target
    return None

class TimeTrigger (BaseTrigger)

Trigger the task every day, at a specified (24 hour clock) time.

Attributes:

Name Type Description
hour int

The hour of the day (24 hour clock)

minute int

The minute of the hour

seconds int

The seconds of the minute

utc bool

If this time is in UTC

Source code in naff/models/naff/tasks/triggers.py
class TimeTrigger(BaseTrigger):
    """
    Trigger the task every day, at a specified (24 hour clock) time.

    Attributes:
        hour int: The hour of the day (24 hour clock)
        minute int: The minute of the hour
        seconds int: The seconds of the minute
        utc bool: If this time is in UTC

    """

    def __init__(self, hour: int = 0, minute: int = 0, seconds: Union[int, float] = 0, utc: bool = True) -> None:
        self.target_time = (hour, minute, seconds)
        self.tz = timezone.utc if utc else None

    def next_fire(self) -> Optional[datetime]:
        now = datetime.now()
        target = datetime(
            now.year, now.month, now.day, self.target_time[0], self.target_time[1], self.target_time[2], tzinfo=self.tz
        )
        if target.tzinfo == timezone.utc:
            target = target.astimezone(now.tzinfo)
            target = target.replace(tzinfo=None)

        if target <= self.last_call_time:
            target += timedelta(days=1)
        return target

method next_fire(self)

Return the next datetime to fire on.

Returns:

Type Description
Optional[datetime.datetime]

Datetime if one can be determined. If no datetime can be determined, return None

Source code in naff/models/naff/tasks/triggers.py
def next_fire(self) -> Optional[datetime]:
    now = datetime.now()
    target = datetime(
        now.year, now.month, now.day, self.target_time[0], self.target_time[1], self.target_time[2], tzinfo=self.tz
    )
    if target.tzinfo == timezone.utc:
        target = target.astimezone(now.tzinfo)
        target = target.replace(tzinfo=None)

    if target <= self.last_call_time:
        target += timedelta(days=1)
    return target

class OrTrigger (BaseTrigger)

Trigger a task when any sub-trigger is fulfilled.

Source code in naff/models/naff/tasks/triggers.py
class OrTrigger(BaseTrigger):
    """Trigger a task when any sub-trigger is fulfilled."""

    def __init__(self, *trigger: BaseTrigger) -> None:
        self.triggers: List[BaseTrigger] = list(trigger)

    def _get_delta(self, d: BaseTrigger) -> timedelta:
        next_fire = d.next_fire()
        if not next_fire:
            return float("inf")
        return abs(next_fire - self.last_call_time)

    def __or__(self, other: "BaseTrigger") -> "OrTrigger":
        self.triggers.append(other)
        return self

    def next_fire(self) -> Optional[datetime]:
        if len(self.triggers) == 1:
            return self.triggers[0].next_fire()
        trigger = min(self.triggers, key=self._get_delta)
        return trigger.next_fire()

method next_fire(self)

Return the next datetime to fire on.

Returns:

Type Description
Optional[datetime.datetime]

Datetime if one can be determined. If no datetime can be determined, return None

Source code in naff/models/naff/tasks/triggers.py
def next_fire(self) -> Optional[datetime]:
    if len(self.triggers) == 1:
        return self.triggers[0].next_fire()
    trigger = min(self.triggers, key=self._get_delta)
    return trigger.next_fire()