Skip to content

Client

The bot client.

Note

By default, all non-privileged intents will be enabled

Attributes:

Name Type Description
intents

Union[int, Intents]: The intents to use

default_prefix None

Union[str, Iterable[str]]: The default prefix (or prefixes) to use for prefixed commands. Defaults to your bot being mentioned.

generate_prefixes None

Callable[..., Coroutine]: A coroutine that returns a string or an iterable of strings to determine prefixes.

status

Status: The status the bot should log in with (IE ONLINE, DND, IDLE)

activity

Union[Activity, str]: The activity the bot should log in "playing"

sync_interactions None

bool: Should application commands be synced with discord?

delete_unused_application_cmds

bool: Delete any commands from discord that aren't implemented in this client

enforce_interaction_perms

bool: Enforce discord application command permissions, locally

fetch_members None

bool: Should the client fetch members from guilds upon startup (this will delay the client being ready)

auto_defer None

AutoDefer: A system to automatically defer commands after a set duration

interaction_context Type[InteractionContext]

Type[InteractionContext]: InteractionContext: The object to instantiate for Interaction Context

prefixed_context Type[PrefixedContext]

Type[PrefixedContext]: The object to instantiate for Prefixed Context

component_context Type[ComponentContext]

Type[ComponentContext]: The object to instantiate for Component Context

autocomplete_context Type[AutocompleteContext]

Type[AutocompleteContext]: The object to instantiate for Autocomplete Context

modal_context Type[ModalContext]

Type[ModalContext]: The object to instantiate for Modal Context

hybrid_context Type[HybridContext]

Type[HybridContext]: The object to instantiate for Hybrid Context

global_pre_run_callback

Callable[..., Coroutine]: A coroutine to run before every command is executed

global_post_run_callback

Callable[..., Coroutine]: A coroutine to run after every command is executed

send_command_tracebacks bool

bool: Should the traceback of command errors be sent in reply to the command invocation

total_shards

int: The total number of shards in use

shard_id

int: The zero based int ID of this shard

debug_scope None

Snowflake_Type: Force all application commands to be registered within this scope

asyncio_debug

bool: Enable asyncio debug features

basic_logging

bool: Utilise basic logging to output library data to console. Do not use in combination with Client.logger

logging_level

int: The level of logging to use for basic_logging. Do not use in combination with Client.logger

logger None

logging.Logger: The logger NAFF should use. Do not use in combination with Client.basic_logging and Client.logging_level. Note: Different loggers with multiple clients are not supported

Optionally, you can configure the caches here, by specifying the name of the cache, followed by a dict-style object to use. It is recommended to use smart_cache.create_cache to configure the cache here. as an example, this is a recommended attribute message_cache=create_cache(250, 50),

Note

Setting a message cache hard limit to None is not recommended, as it could result in extremely high memory usage, we suggest a sane limit.

Source code in naff/client/client.py
class Client(
    processors.AutoModEvents,
    processors.ChannelEvents,
    processors.GuildEvents,
    processors.MemberEvents,
    processors.MessageEvents,
    processors.ReactionEvents,
    processors.RoleEvents,
    processors.StageEvents,
    processors.ThreadEvents,
    processors.UserEvents,
    processors.VoiceEvents,
):
    """
    The bot client.

    note:
        By default, all non-privileged intents will be enabled

    Attributes:
        intents: Union[int, Intents]: The intents to use

        default_prefix: Union[str, Iterable[str]]: The default prefix (or prefixes) to use for prefixed commands. Defaults to your bot being mentioned.
        generate_prefixes: Callable[..., Coroutine]: A coroutine that returns a string or an iterable of strings to determine prefixes.
        status: Status: The status the bot should log in with (IE ONLINE, DND, IDLE)
        activity: Union[Activity, str]: The activity the bot should log in "playing"

        sync_interactions: bool: Should application commands be synced with discord?
        delete_unused_application_cmds: bool: Delete any commands from discord that aren't implemented in this client
        enforce_interaction_perms: bool: Enforce discord application command permissions, locally
        fetch_members: bool: Should the client fetch members from guilds upon startup (this will delay the client being ready)

        auto_defer: AutoDefer: A system to automatically defer commands after a set duration
        interaction_context: Type[InteractionContext]: InteractionContext: The object to instantiate for Interaction Context
        prefixed_context: Type[PrefixedContext]: The object to instantiate for Prefixed Context
        component_context: Type[ComponentContext]: The object to instantiate for Component Context
        autocomplete_context: Type[AutocompleteContext]: The object to instantiate for Autocomplete Context
        modal_context: Type[ModalContext]: The object to instantiate for Modal Context
        hybrid_context: Type[HybridContext]: The object to instantiate for Hybrid Context

        global_pre_run_callback: Callable[..., Coroutine]: A coroutine to run before every command is executed
        global_post_run_callback: Callable[..., Coroutine]: A coroutine to run after every command is executed
        send_command_tracebacks: bool: Should the traceback of command errors be sent in reply to the command invocation

        total_shards: int: The total number of shards in use
        shard_id: int: The zero based int ID of this shard

        debug_scope: Snowflake_Type: Force all application commands to be registered within this scope
        asyncio_debug: bool: Enable asyncio debug features
        basic_logging: bool: Utilise basic logging to output library data to console. Do not use in combination with `Client.logger`
        logging_level: int: The level of logging to use for basic_logging. Do not use in combination with `Client.logger`
        logger: logging.Logger: The logger NAFF should use. Do not use in combination with `Client.basic_logging` and `Client.logging_level`. Note: Different loggers with multiple clients are not supported

    Optionally, you can configure the caches here, by specifying the name of the cache, followed by a dict-style object to use.
    It is recommended to use `smart_cache.create_cache` to configure the cache here.
    as an example, this is a recommended attribute `message_cache=create_cache(250, 50)`,

    !!! note
        Setting a message cache hard limit to None is not recommended, as it could result in extremely high memory usage, we suggest a sane limit.

    """

    def __init__(
        self,
        *,
        activity: Union[Activity, str] = None,
        auto_defer: Absent[Union[AutoDefer, bool]] = MISSING,
        autocomplete_context: Type[AutocompleteContext] = AutocompleteContext,
        component_context: Type[ComponentContext] = ComponentContext,
        debug_scope: Absent["Snowflake_Type"] = MISSING,
        default_prefix: str | Iterable[str] = MENTION_PREFIX,
        delete_unused_application_cmds: bool = False,
        enforce_interaction_perms: bool = True,
        fetch_members: bool = False,
        generate_prefixes: Absent[Callable[..., Coroutine]] = MISSING,
        global_post_run_callback: Absent[Callable[..., Coroutine]] = MISSING,
        global_pre_run_callback: Absent[Callable[..., Coroutine]] = MISSING,
        intents: Union[int, Intents] = Intents.DEFAULT,
        interaction_context: Type[InteractionContext] = InteractionContext,
        logger: logging.Logger = logger,
        owner_ids: Iterable["Snowflake_Type"] = (),
        modal_context: Type[ModalContext] = ModalContext,
        prefixed_context: Type[PrefixedContext] = PrefixedContext,
        hybrid_context: Type[HybridContext] = HybridContext,
        send_command_tracebacks: bool = True,
        shard_id: int = 0,
        status: Status = Status.ONLINE,
        sync_interactions: bool = True,
        sync_ext: bool = True,
        total_shards: int = 1,
        basic_logging: bool = False,
        logging_level: int = logging.INFO,
        **kwargs,
    ) -> None:
        if basic_logging:
            logging.basicConfig()
            logger.setLevel(logging_level)

        # Set Up logger and overwrite the constant
        self.logger = logger
        """The logger NAFF should use. Do not use in combination with `Client.basic_logging` and `Client.logging_level`. Note: Different loggers with multiple clients are not supported"""
        constants.logger = logger

        # Configuration
        self.sync_interactions = sync_interactions
        """Should application commands be synced"""
        self.del_unused_app_cmd: bool = delete_unused_application_cmds
        """Should unused application commands be deleted?"""
        self.sync_ext: bool = sync_ext
        """Should we sync whenever a extension is (un)loaded"""
        self.debug_scope = to_snowflake(debug_scope) if debug_scope is not MISSING else MISSING
        """Sync global commands as guild for quicker command updates during debug"""
        self.default_prefix = default_prefix
        """The default prefix to be used for prefixed commands"""
        self.generate_prefixes = generate_prefixes if generate_prefixes is not MISSING else self.generate_prefixes
        """A coroutine that returns a prefix or an iterable of prefixes, for dynamic prefixes"""
        self.send_command_tracebacks: bool = send_command_tracebacks
        """Should the traceback of command errors be sent in reply to the command invocation"""
        if auto_defer is True:
            auto_defer = AutoDefer(enabled=True)
        else:
            auto_defer = auto_defer or AutoDefer()
        self.auto_defer = auto_defer
        """A system to automatically defer commands after a set duration"""
        self.intents = intents if isinstance(intents, Intents) else Intents(intents)

        # resources

        self.http: HTTPClient = HTTPClient()
        """The HTTP client to use when interacting with discord endpoints"""

        # context objects
        self.interaction_context: Type[InteractionContext] = interaction_context
        """The object to instantiate for Interaction Context"""
        self.prefixed_context: Type[PrefixedContext] = prefixed_context
        """The object to instantiate for Prefixed Context"""
        self.component_context: Type[ComponentContext] = component_context
        """The object to instantiate for Component Context"""
        self.autocomplete_context: Type[AutocompleteContext] = autocomplete_context
        """The object to instantiate for Autocomplete Context"""
        self.modal_context: Type[ModalContext] = modal_context
        """The object to instantiate for Modal Context"""
        self.hybrid_context: Type[HybridContext] = hybrid_context
        """The object to instantiate for Hybrid Context"""

        # flags
        self._ready = asyncio.Event()
        self._closed = False
        self._startup = False

        self._guild_event = asyncio.Event()
        self.guild_event_timeout = 3
        """How long to wait for guilds to be cached"""

        # Sharding
        self.total_shards = total_shards
        self._connection_state: ConnectionState = ConnectionState(self, intents, shard_id)

        self.enforce_interaction_perms = enforce_interaction_perms

        self.fetch_members = fetch_members
        """Fetch the full members list of all guilds on startup"""

        self._mention_reg = MISSING

        # caches
        self.cache: GlobalCache = GlobalCache(self, **{k: v for k, v in kwargs.items() if hasattr(GlobalCache, k)})
        # these store the last sent presence data for change_presence
        self._status: Status = status
        if isinstance(activity, str):
            self._activity = Activity.create(name=str(activity))
        else:
            self._activity: Activity = activity

        self._user: Absent[NaffUser] = MISSING
        self._app: Absent[Application] = MISSING

        # collections
        self.prefixed_commands: Dict[str, PrefixedCommand] = {}
        """A dictionary of registered prefixed commands: `{name: command}`"""
        self.interactions: Dict["Snowflake_Type", Dict[str, InteractionCommand]] = {}
        """A dictionary of registered application commands: `{cmd_id: command}`"""
        self._component_callbacks: Dict[str, Callable[..., Coroutine]] = {}
        self._modal_callbacks: Dict[str, Callable[..., Coroutine]] = {}
        self._interaction_scopes: Dict["Snowflake_Type", "Snowflake_Type"] = {}
        self.processors: Dict[str, Callable[..., Coroutine]] = {}
        self.__modules = {}
        self.ext = {}
        """A dictionary of mounted ext"""
        self.listeners: Dict[str, List] = {}
        self.waits: Dict[str, List] = {}
        self.owner_ids: set[Snowflake_Type] = set(owner_ids)

        self.async_startup_tasks: list[Coroutine] = []
        """A list of coroutines to run during startup"""

        # callbacks
        if global_pre_run_callback:
            if asyncio.iscoroutinefunction(global_pre_run_callback):
                self.pre_run_callback: Callable[..., Coroutine] = global_pre_run_callback
            else:
                raise TypeError("Callback must be a coroutine")
        else:
            self.pre_run_callback = MISSING

        if global_post_run_callback:
            if asyncio.iscoroutinefunction(global_post_run_callback):
                self.post_run_callback: Callable[..., Coroutine] = global_post_run_callback
            else:
                raise TypeError("Callback must be a coroutine")
        else:
            self.post_run_callback = MISSING

        super().__init__()
        self._sanity_check()

    @property
    def is_closed(self) -> bool:
        """Returns True if the bot has closed."""
        return self._closed

    @property
    def is_ready(self) -> bool:
        """Returns True if the bot is ready."""
        return self._ready.is_set()

    @property
    def latency(self) -> float:
        """Returns the latency of the websocket connection."""
        return self._connection_state.latency

    @property
    def average_latency(self) -> float:
        """Returns the average latency of the websocket connection."""
        return self._connection_state.average_latency

    @property
    def start_time(self) -> datetime:
        """The start time of the bot."""
        return self._connection_state.start_time

    @property
    def gateway_started(self) -> bool:
        """Returns if the gateway has been started."""
        return self._connection_state.gateway_started.is_set()

    @property
    def user(self) -> NaffUser:
        """Returns the bot's user."""
        return self._user

    @property
    def app(self) -> Application:
        """Returns the bots application."""
        return self._app

    @property
    def owner(self) -> Optional["User"]:
        """Returns the bot's owner'."""
        try:
            return self.app.owner
        except TypeError:
            return MISSING

    @property
    def owners(self) -> List["User"]:
        """Returns the bot's owners as declared via `client.owner_ids`."""
        return [self.get_user(u_id) for u_id in self.owner_ids]

    @property
    def guilds(self) -> List["Guild"]:
        """Returns a list of all guilds the bot is in."""
        return self.user.guilds

    @property
    def status(self) -> Status:
        """
        Get the status of the bot.

        IE online, afk, dnd

        """
        return self._status

    @property
    def activity(self) -> Activity:
        """Get the activity of the bot."""
        return self._activity

    @property
    def application_commands(self) -> List[InteractionCommand]:
        """A list of all application commands registered within the bot."""
        commands = []
        for scope in self.interactions.keys():
            commands += [cmd for cmd in self.interactions[scope].values() if cmd not in commands]

        return commands

    @property
    def ws(self) -> GatewayClient:
        """Returns the websocket client."""
        return self._connection_state.gateway

    def get_guild_websocket(self, id: "Snowflake_Type") -> GatewayClient:
        return self.ws

    def _sanity_check(self) -> None:
        """Checks for possible and common errors in the bot's configuration."""
        logger.debug("Running client sanity checks...")
        contexts = {
            self.interaction_context: InteractionContext,
            self.prefixed_context: PrefixedContext,
            self.component_context: ComponentContext,
            self.autocomplete_context: AutocompleteContext,
            self.modal_context: ModalContext,
            self.hybrid_context: HybridContext,
        }
        for obj, expected in contexts.items():
            if not issubclass(obj, expected):
                raise TypeError(f"{obj.__name__} must inherit from {expected.__name__}")

        if self.del_unused_app_cmd:
            logger.warning(
                "As `delete_unused_application_cmds` is enabled, the client must cache all guilds app-commands, this could take a while."
            )

        if Intents.GUILDS not in self._connection_state.intents:
            logger.warning("GUILD intent has not been enabled; this is very likely to cause errors")

        if self.fetch_members and Intents.GUILD_MEMBERS not in self._connection_state.intents:
            raise BotException("Members Intent must be enabled in order to use fetch members")
        elif self.fetch_members:
            logger.warning("fetch_members enabled; startup will be delayed")

        if len(self.processors) == 0:
            logger.warning("No Processors are loaded! This means no events will be processed!")

    async def generate_prefixes(self, bot: "Client", message: Message) -> str | Iterable[str]:
        """
        A method to get the bot's default_prefix, can be overridden to add dynamic prefixes.

        !!! note
            To easily override this method, simply use the `generate_prefixes` parameter when instantiating the client

        Args:
            bot: A reference to the client
            message: A message to determine the prefix from.

        Returns:
            A string or an iterable of strings to use as a prefix. By default, this will return `client.default_prefix`

        """
        return self.default_prefix

    def _queue_task(self, coro: Listener, event: BaseEvent, *args, **kwargs) -> asyncio.Task:
        async def _async_wrap(_coro: Listener, _event: BaseEvent, *_args, **_kwargs) -> None:
            try:
                if len(_event.__attrs_attrs__) == 2:
                    # override_name & bot
                    await _coro()
                else:
                    await _coro(_event, *_args, **_kwargs)
            except asyncio.CancelledError:
                pass
            except Exception as e:
                if isinstance(event, events.Error):
                    # No infinite loops please
                    self.default_error_handler(repr(event), e)
                else:
                    self.dispatch(events.Error(repr(event), e))

        wrapped = _async_wrap(coro, event, *args, **kwargs)

        return asyncio.create_task(wrapped, name=f"naff:: {event.resolved_name}")

    @staticmethod
    def default_error_handler(source: str, error: BaseException) -> None:
        """
        The default error logging behaviour.

        Args:
            source: The source of this error
            error: The exception itself

        """
        out = traceback.format_exception(error)

        if isinstance(error, HTTPException):
            # HTTPException's are of 3 known formats, we can parse them for human readable errors
            try:
                errors = error.search_for_message(error.errors)
                out = f"HTTPException: {error.status}|{error.response.reason}: " + "\n".join(errors)
            except Exception:  # noqa : S110
                pass

        logger.error(
            "Ignoring exception in {}:{}{}".format(source, "\n" if len(out) > 1 else " ", "".join(out)),
        )

    @Listener.create()
    async def _on_error(self, event: events.Error) -> None:
        await self.on_error(event.source, event.error, *event.args, **event.kwargs)

    async def on_error(self, source: str, error: Exception, *args, **kwargs) -> None:
        """
        Catches all errors dispatched by the library.

        By default it will format and print them to console

        Override this to change error handling behaviour

        """
        self.default_error_handler(source, error)

    async def on_command_error(self, ctx: Context, error: Exception, *args, **kwargs) -> None:
        """
        Catches all errors dispatched by commands.

        By default it will call `Client.on_error`

        Override this to change error handling behavior

        """
        self.dispatch(events.Error(f"cmd /`{ctx.invoke_target}`", error, args, kwargs, ctx))
        try:
            if isinstance(error, errors.CommandOnCooldown):
                await ctx.send(
                    embeds=Embed(
                        description=f"This command is on cooldown!\n"
                        f"Please try again in {int(error.cooldown.get_cooldown_time())} seconds",
                        color=BrandColors.FUCHSIA,
                    )
                )
            elif isinstance(error, errors.MaxConcurrencyReached):
                await ctx.send(
                    embeds=Embed(
                        description="This command has reached its maximum concurrent usage!\n"
                        "Please try again shortly.",
                        color=BrandColors.FUCHSIA,
                    )
                )
            elif isinstance(error, errors.CommandCheckFailure):
                await ctx.send(
                    embeds=Embed(
                        description="You do not have permission to run this command!",
                        color=BrandColors.YELLOW,
                    )
                )
            elif self.send_command_tracebacks:
                out = "".join(traceback.format_exception(error))
                if self.http.token is not None:
                    out = out.replace(self.http.token, "[REDACTED TOKEN]")
                await ctx.send(
                    embeds=Embed(
                        title=f"Error: {type(error).__name__}",
                        color=BrandColors.RED,
                        description=f"```\n{out[:EMBED_MAX_DESC_LENGTH-8]}```",
                    )
                )
        except errors.NaffException:
            pass

    async def on_command(self, ctx: Context) -> None:
        """
        Called *after* any command is ran.

        By default, it will simply log the command, override this to change that behaviour

        Args:
            ctx: The context of the command that was called

        """
        if isinstance(ctx, PrefixedContext):
            symbol = "@"
        elif isinstance(ctx, InteractionContext):
            symbol = "/"
        else:
            symbol = "?"  # likely custom context
        logger.info(f"Command Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")

    async def on_component_error(self, ctx: ComponentContext, error: Exception, *args, **kwargs) -> None:
        """
        Catches all errors dispatched by components.

        By default it will call `Naff.on_error`

        Override this to change error handling behavior

        """
        return self.dispatch(events.Error(f"Component Callback for {ctx.custom_id}", error, args, kwargs, ctx))

    async def on_component(self, ctx: ComponentContext) -> None:
        """
        Called *after* any component callback is ran.

        By default, it will simply log the component use, override this to change that behaviour

        Args:
            ctx: The context of the component that was called

        """
        symbol = "¢"
        logger.info(f"Component Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")

    async def on_autocomplete_error(self, ctx: AutocompleteContext, error: Exception, *args, **kwargs) -> None:
        """
        Catches all errors dispatched by autocompletion options.

        By default it will call `Naff.on_error`

        Override this to change error handling behavior

        """
        return self.dispatch(
            events.Error(
                f"Autocomplete Callback for /{ctx.invoke_target} - Option: {ctx.focussed_option}",
                error,
                args,
                kwargs,
                ctx,
            )
        )

    async def on_autocomplete(self, ctx: AutocompleteContext) -> None:
        """
        Called *after* any autocomplete callback is ran.

        By default, it will simply log the autocomplete callback, override this to change that behaviour

        Args:
            ctx: The context of the command that was called

        """
        symbol = "$"
        logger.info(f"Autocomplete Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")

    @Listener.create()
    async def on_resume(self) -> None:
        self._ready.set()

    @Listener.create()
    async def _on_websocket_ready(self, event: events.RawGatewayEvent) -> None:
        """
        Catches websocket ready and determines when to dispatch the client `READY` signal.

        Args:
            event: The websocket ready packet

        """
        data = event.data
        expected_guilds = {to_snowflake(guild["id"]) for guild in data["guilds"]}
        self._user._add_guilds(expected_guilds)

        if not self._startup:
            while True:
                try:  # wait to let guilds cache
                    await asyncio.wait_for(self._guild_event.wait(), self.guild_event_timeout)
                except asyncio.TimeoutError:
                    logger.warning("Timeout waiting for guilds cache: Not all guilds will be in cache")
                    break
                self._guild_event.clear()

                if len(self.cache.guild_cache) == len(expected_guilds):
                    # all guilds cached
                    break

            if self.fetch_members:
                # ensure all guilds have completed chunking
                for guild in self.guilds:
                    if guild and not guild.chunked.is_set():
                        logger.debug(f"Waiting for {guild.id} to chunk")
                        await guild.chunked.wait()

            # run any pending startup tasks
            if self.async_startup_tasks:
                try:
                    await asyncio.gather(*self.async_startup_tasks)
                except Exception as e:
                    self.dispatch(events.Error("async-extension-loader", e))

            # cache slash commands
            if not self._startup:
                await self._init_interactions()

            self._startup = True
            self.dispatch(events.Startup())

        else:
            # reconnect ready
            ready_guilds = set()

            async def _temp_listener(_event: events.RawGatewayEvent) -> None:
                ready_guilds.add(_event.data["id"])

            listener = Listener.create("_on_raw_guild_create")(_temp_listener)
            self.add_listener(listener)

            while True:
                try:
                    await asyncio.wait_for(self._guild_event.wait(), self.guild_event_timeout)
                    if len(ready_guilds) == len(expected_guilds):
                        break
                except asyncio.TimeoutError:
                    break

            self.listeners["raw_guild_create"].remove(listener)

        self._ready.set()
        self.dispatch(events.Ready())

    async def login(self, token) -> None:
        """
        Login to discord via http.

        !!! note
            You will need to run Naff.start_gateway() before you start receiving gateway events.

        Args:
            token str: Your bot's token

        """
        # i needed somewhere to put this call,
        # login will always run after initialisation
        # so im gathering commands here
        self._gather_commands()

        logger.debug("Attempting to login")
        me = await self.http.login(token.strip())
        self._user = NaffUser.from_dict(me, self)
        self.cache.place_user_data(me)
        self._app = Application.from_dict(await self.http.get_current_bot_information(), self)
        self._mention_reg = re.compile(rf"^(<@!?{self.user.id}*>\s)")

        if self.app.owner:
            self.owner_ids.add(self.app.owner.id)

        self.dispatch(events.Login())

    async def astart(self, token) -> None:
        """
        Asynchronous method to start the bot.

        Args:
            token: Your bot's token

        Returns:

        """
        await self.login(token)
        try:
            await self._connection_state.start()
        finally:
            await self.stop()

    def start(self, token) -> None:
        """
        Start the bot.

        info:
            This is the recommended method to start the bot

        Args:
            token: Your bot's token

        """
        try:
            asyncio.run(self.astart(token))
        except KeyboardInterrupt:
            # ignore, cus this is useless and can be misleading to the
            # user
            pass

    async def start_gateway(self) -> None:
        """Starts the gateway connection."""
        try:
            await self._connection_state.start()
        finally:
            await self.stop()

    async def stop(self) -> None:
        """Shutdown the bot."""
        logger.debug("Stopping the bot.")
        self._ready.clear()
        await self.http.close()
        await self._connection_state.stop()

    def dispatch(self, event: events.BaseEvent, *args, **kwargs) -> None:
        """
        Dispatch an event.

        Args:
            event: The event to be dispatched.

        """
        listeners = self.listeners.get(event.resolved_name, [])
        if listeners:
            logger.debug(f"Dispatching Event: {event.resolved_name}")
            event.bot = self
            for _listen in listeners:
                try:
                    self._queue_task(_listen, event, *args, **kwargs)
                except Exception as e:
                    raise BotException(
                        f"An error occurred attempting during {event.resolved_name} event processing"
                    ) from e

        _waits = self.waits.get(event.resolved_name, [])
        if _waits:
            index_to_remove = []
            for i, _wait in enumerate(_waits):
                result = _wait(event)
                if result:
                    index_to_remove.append(i)

            for idx in sorted(index_to_remove, reverse=True):
                _waits.pop(idx)

    async def wait_until_ready(self) -> None:
        """Waits for the client to become ready."""
        await self._ready.wait()

    def wait_for(
        self,
        event: Union[str, "BaseEvent"],
        checks: Absent[Optional[Callable[..., bool]]] = MISSING,
        timeout: Optional[float] = None,
    ) -> Any:
        """
        Waits for a WebSocket event to be dispatched.

        Args:
            event: The name of event to wait.
            checks: A predicate to check what to wait for.
            timeout: The number of seconds to wait before timing out.

        Returns:
            The event object.

        """
        event = get_event_name(event)

        if event not in self.waits:
            self.waits[event] = []

        future = asyncio.Future()
        self.waits[event].append(Wait(event, checks, future))

        return asyncio.wait_for(future, timeout)

    async def wait_for_modal(
        self,
        modal: "Modal",
        author: Optional["Snowflake_Type"] = None,
        timeout: Optional[float] = None,
    ) -> ModalContext:
        """
        Wait for a modal response.

        Args:
            modal: The modal we're waiting for.
            author: The user we're waiting for to reply
            timeout: A timeout in seconds to stop waiting

        Returns:
            The context of the modal response

        Raises:
            `asyncio.TimeoutError` if no response is received that satisfies the predicate before timeout seconds have passed

        """
        author = to_snowflake(author) if author else None

        def predicate(event) -> bool:
            if modal.custom_id != event.context.custom_id:
                return False
            if author and author != to_snowflake(event.context.author):
                return False
            return True

        resp = await self.wait_for("modal_response", predicate, timeout)
        return resp.context

    async def wait_for_component(
        self,
        messages: Union[Message, int, list] = None,
        components: Optional[
            Union[List[List[Union["BaseComponent", dict]]], List[Union["BaseComponent", dict]], "BaseComponent", dict]
        ] = None,
        check: Optional[Callable] = None,
        timeout: Optional[float] = None,
    ) -> "Component":
        """
        Waits for a component to be sent to the bot.

        Args:
            messages: The message object to check for.
            components: The components to wait for.
            check: A predicate to check what to wait for.
            timeout: The number of seconds to wait before timing out.

        Returns:
            `Component` that was invoked. Use `.context` to get the `ComponentContext`.

        Raises:
            `asyncio.TimeoutError` if timed out

        """
        if not (messages or components):
            raise ValueError("You must specify messages or components (or both)")

        message_ids = (
            to_snowflake_list(messages) if isinstance(messages, list) else to_snowflake(messages) if messages else None
        )
        custom_ids = list(get_components_ids(components)) if components else None

        # automatically convert improper custom_ids
        if custom_ids and not all(isinstance(x, str) for x in custom_ids):
            custom_ids = [str(i) for i in custom_ids]

        def _check(event: Component) -> bool:
            ctx: ComponentContext = event.context
            # if custom_ids is empty or there is a match
            wanted_message = not message_ids or ctx.message.id in (
                [message_ids] if isinstance(message_ids, int) else message_ids
            )
            wanted_component = not custom_ids or ctx.custom_id in custom_ids
            if wanted_message and wanted_component:
                if check is None or check(event):
                    return True
                return False
            return False

        return await self.wait_for("component", checks=_check, timeout=timeout)

    def listen(self, event_name: Absent[str] = MISSING) -> Listener:
        """
        A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.

        Args:
            event_name: The event name to use, if not the coroutine name

        Returns:
            A listener that can be used to hook into the event.

        """

        def wrapper(coro: Callable[..., Coroutine]) -> Listener:
            listener = Listener.create(event_name)(coro)
            self.add_listener(listener)
            return listener

        return wrapper

    def add_event_processor(self, event_name: Absent[str] = MISSING) -> Callable[..., Coroutine]:
        """
        A decorator to be used to add event processors.

        Args:
            event_name: The event name to use, if not the coroutine name

        Returns:
            A function that can be used to hook into the event.

        """

        def wrapper(coro: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
            name = event_name
            if name is MISSING:
                name = coro.__name__
            name = name.lstrip("_")
            name = name.removeprefix("on_")
            self.processors[name] = coro
            return coro

        return wrapper

    def add_listener(self, listener: Listener) -> None:
        """
        Add a listener for an event, if no event is passed, one is determined.

        Args:
            listener Listener: The listener to add to the client

        """
        # check that the required intents are enabled
        event_class_name = "".join([name.capitalize() for name in listener.event.split("_")])
        if event_class := globals().get(event_class_name):
            if required_intents := _INTENT_EVENTS.get(event_class):  # noqa
                if not any(required_intent in self.intents for required_intent in required_intents):
                    self.logger.warning(
                        f"Event `{listener.event}` will not work since the required intent is not set -> Requires any of: `{required_intents}`"
                    )

        if listener.event not in self.listeners:
            self.listeners[listener.event] = []
        self.listeners[listener.event].append(listener)

    def add_interaction(self, command: InteractionCommand) -> bool:
        """
        Add a slash command to the client.

        Args:
            command InteractionCommand: The command to add

        """
        if self.debug_scope:
            command.scopes = [self.debug_scope]

        # for SlashCommand objs without callback (like objects made to hold group info etc)
        if command.callback is None:
            return False

        for scope in command.scopes:
            if scope not in self.interactions:
                self.interactions[scope] = {}
            elif command.resolved_name in self.interactions[scope]:
                old_cmd = self.interactions[scope][command.resolved_name]
                raise ValueError(f"Duplicate Command! {scope}::{old_cmd.resolved_name}")

            if self.enforce_interaction_perms:
                command.checks.append(command._permission_enforcer)  # noqa : w0212

            self.interactions[scope][command.resolved_name] = command

        return True

    def add_hybrid_command(self, command: HybridCommand) -> bool:
        if self.debug_scope:
            command.scopes = [self.debug_scope]

        if command.callback is None:
            return False

        if command.is_subcommand:
            prefixed_base = self.prefixed_commands.get(str(command.name))
            if not prefixed_base:
                prefixed_base = _base_subcommand_generator(
                    str(command.name), list(command.name.to_locale_dict().values()), str(command.description)
                )
                self.add_prefixed_command(prefixed_base)

            if command.group_name:  # if this is a group command
                _prefixed_cmd = prefixed_base
                prefixed_base = prefixed_base.subcommands.get(str(command.group_name))

                if not prefixed_base:
                    prefixed_base = _base_subcommand_generator(
                        str(command.group_name),
                        list(command.group_name.to_locale_dict().values()),
                        str(command.group_description),
                        group=True,
                    )
                    _prefixed_cmd.add_command(prefixed_base)

            new_command = _prefixed_from_slash(command)
            new_command._parse_parameters()
            prefixed_base.add_command(new_command)
        else:
            new_command = _prefixed_from_slash(command)
            self.add_prefixed_command(new_command)

        return self.add_interaction(command)

    def add_prefixed_command(self, command: PrefixedCommand) -> None:
        """
        Add a prefixed command to the client.

        Args:
            command PrefixedCommand: The command to add

        """
        # check that the required intent is enabled or the prefix is a mention
        prefixes = (
            self.default_prefix
            if not isinstance(self.default_prefix, str) and not self.default_prefix == MENTION_PREFIX
            else (self.default_prefix,)
        )
        if (MENTION_PREFIX not in prefixes) and (Intents.GUILD_MESSAGE_CONTENT not in self.intents):
            self.logger.warning(
                f"Prefixed commands will not work since the required intent is not set -> Requires: `{Intents.GUILD_MESSAGE_CONTENT.__repr__()}` or usage of the default `MENTION_PREFIX` as the prefix"
            )

        command._parse_parameters()

        if self.prefixed_commands.get(command.name):
            raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {command.name}.")
        self.prefixed_commands[command.name] = command

        for alias in command.aliases:
            if self.prefixed_commands.get(alias):
                raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {alias}.")
            self.prefixed_commands[alias] = command

    def add_component_callback(self, command: ComponentCommand) -> None:
        """
        Add a component callback to the client.

        Args:
            command: The command to add

        """
        for listener in command.listeners:
            # I know this isn't an ideal solution, but it means we can lookup callbacks with O(1)
            if listener not in self._component_callbacks.keys():
                self._component_callbacks[listener] = command
                continue
            else:
                raise ValueError(f"Duplicate Component! Multiple component callbacks for `{listener}`")

    def add_modal_callback(self, command: ModalCommand) -> None:
        """
        Add a modal callback to the client.

        Args:
            command: The command to add
        """
        for listener in command.listeners:
            if listener not in self._modal_callbacks.keys():
                self._modal_callbacks[listener] = command
                continue
            else:
                raise ValueError(f"Duplicate Component! Multiple modal callbacks for `{listener}`")

    def _gather_commands(self) -> None:
        """Gathers commands from __main__ and self."""

        def process(_cmds) -> None:

            for func in _cmds:
                if isinstance(func, ModalCommand):
                    self.add_modal_callback(func)
                elif isinstance(func, ComponentCommand):
                    self.add_component_callback(func)
                elif isinstance(func, HybridCommand):
                    self.add_hybrid_command(func)
                elif isinstance(func, InteractionCommand):
                    self.add_interaction(func)
                elif (
                    isinstance(func, PrefixedCommand) and not func.is_subcommand
                ):  # subcommands will be added with main comamnds
                    self.add_prefixed_command(func)
                elif isinstance(func, Listener):
                    self.add_listener(func)

            logger.debug(f"{len(_cmds)} commands have been loaded from `__main__` and `client`")

        process(
            [obj for _, obj in inspect.getmembers(sys.modules["__main__"]) if isinstance(obj, (BaseCommand, Listener))]
        )
        process(
            [
                obj.copy_with_binding(self)
                for _, obj in inspect.getmembers(self)
                if isinstance(obj, (BaseCommand, Listener))
            ]
        )

        [wrap_partial(obj, self) for _, obj in inspect.getmembers(self) if isinstance(obj, Task)]

    async def _init_interactions(self) -> None:
        """
        Initialise slash commands.

        If `sync_interactions` this will submit all registered slash
        commands to discord. Otherwise, it will get the list of
        interactions and cache their scopes.

        """
        # allow for ext and main to share the same decorator
        try:
            if self.sync_interactions:
                await self.synchronise_interactions()
            else:
                await self._cache_interactions(warn_missing=False)
        except Exception as e:
            self.dispatch(events.Error("Interaction Syncing", e))

    async def _cache_interactions(self, warn_missing: bool = False) -> None:
        """Get all interactions used by this bot and cache them."""
        if warn_missing or self.del_unused_app_cmd:
            bot_scopes = {g.id for g in self.cache.guild_cache.values()}
            bot_scopes.add(GLOBAL_SCOPE)
        else:
            bot_scopes = set(self.interactions)

        req_lock = asyncio.Lock()

        async def wrap(*args, **kwargs) -> Absent[List[Dict]]:
            async with req_lock:
                # throttle this
                await asyncio.sleep(0.1)
            try:
                return await self.http.get_application_commands(*args, **kwargs)
            except Forbidden:
                return MISSING

        results = await asyncio.gather(*[wrap(self.app.id, scope) for scope in bot_scopes])
        results = dict(zip(bot_scopes, results))

        for scope, remote_cmds in results.items():
            if remote_cmds == MISSING:
                logger.debug(f"Bot was not invited to guild {scope} with `application.commands` scope")
                continue

            remote_cmds = {cmd_data["name"]: cmd_data for cmd_data in remote_cmds}
            found = set()  # this is a temporary hack to fix subcommand detection
            if scope in self.interactions:
                for cmd in self.interactions[scope].values():
                    cmd_name = str(cmd.name)
                    cmd_data = remote_cmds.get(cmd_name, MISSING)
                    if cmd_data is MISSING:
                        if cmd_name not in found:
                            if warn_missing:
                                logger.error(
                                    f'Detected yet to sync slash command "/{cmd_name}" for scope '
                                    f"{'global' if scope == GLOBAL_SCOPE else scope}"
                                )
                        continue
                    else:
                        found.add(cmd_name)
                    self._interaction_scopes[str(cmd_data["id"])] = scope
                    cmd.cmd_id[scope] = int(cmd_data["id"])

            if warn_missing:
                for cmd_data in remote_cmds.values():
                    logger.error(
                        f"Detected unimplemented slash command \"/{cmd_data['name']}\" for scope "
                        f"{'global' if scope == GLOBAL_SCOPE else scope}"
                    )

    async def synchronise_interactions(
        self, *, scopes: list["Snowflake_Type"] = MISSING, delete_commands: Absent[bool] = MISSING
    ) -> None:
        """
        Synchronise registered interactions with discord.

        Args:
            scopes: Optionally specify which scopes are to be synced
            delete_commands: Override the client setting and delete commands
        """
        s = time.perf_counter()
        _delete_cmds = self.del_unused_app_cmd if delete_commands is MISSING else delete_commands
        await self._cache_interactions()

        if scopes is not MISSING:
            cmd_scopes = scopes
        elif self.del_unused_app_cmd:
            # if we're deleting unused commands, we check all scopes
            cmd_scopes = [to_snowflake(g_id) for g_id in self._user._guild_ids] + [GLOBAL_SCOPE]
        else:
            # if we're not deleting, just check the scopes we have cmds registered in
            cmd_scopes = list(set(self.interactions) | {GLOBAL_SCOPE})

        local_cmds_json = application_commands_to_dict(self.interactions)

        async def sync_scope(cmd_scope) -> None:

            sync_needed_flag = False  # a flag to force this scope to synchronise
            sync_payload = []  # the payload to be pushed to discord

            try:
                try:
                    remote_commands = await self.http.get_application_commands(self.app.id, cmd_scope)
                except Forbidden:
                    logger.warning(f"Bot is lacking `application.commands` scope in {cmd_scope}!")
                    return

                for local_cmd in self.interactions.get(cmd_scope, {}).values():
                    # get remote equivalent of this command
                    remote_cmd_json = next(
                        (v for v in remote_commands if int(v["id"]) == local_cmd.cmd_id.get(cmd_scope)), None
                    )
                    # get json representation of this command
                    local_cmd_json = next((c for c in local_cmds_json[cmd_scope] if c["name"] == str(local_cmd.name)))

                    # this works by adding any command we *want* on Discord, to a payload, and synchronising that
                    # this allows us to delete unused commands, add new commands, or do nothing in 1 or less API calls

                    if sync_needed(local_cmd_json, remote_cmd_json):
                        # determine if the local and remote commands are out-of-sync
                        sync_needed_flag = True
                        sync_payload.append(local_cmd_json)
                    elif not _delete_cmds and remote_cmd_json:
                        _remote_payload = {
                            k: v for k, v in remote_cmd_json.items() if k not in ("id", "application_id", "version")
                        }
                        sync_payload.append(_remote_payload)
                    elif _delete_cmds:
                        sync_payload.append(local_cmd_json)

                sync_payload = [json.loads(_dump) for _dump in {json.dumps(_cmd) for _cmd in sync_payload}]

                if sync_needed_flag or (_delete_cmds and len(sync_payload) < len(remote_commands)):
                    # synchronise commands if flag is set, or commands are to be deleted
                    logger.info(f"Overwriting {cmd_scope} with {len(sync_payload)} application commands")
                    sync_response: list[dict] = await self.http.overwrite_application_commands(
                        self.app.id, sync_payload, cmd_scope
                    )
                    self._cache_sync_response(sync_response, cmd_scope)
                else:
                    logger.debug(f"{cmd_scope} is already up-to-date with {len(remote_commands)} commands.")

            except Forbidden as e:
                raise InteractionMissingAccess(cmd_scope) from e
            except HTTPException as e:
                self._raise_sync_exception(e, local_cmds_json, cmd_scope)

        await asyncio.gather(*[sync_scope(scope) for scope in cmd_scopes])

        t = time.perf_counter() - s
        logger.debug(f"Sync of {len(cmd_scopes)} scopes took {t} seconds")

    def get_application_cmd_by_id(self, cmd_id: "Snowflake_Type") -> Optional[InteractionCommand]:
        """
        Get a application command from the internal cache by its ID.

        Args:
            cmd_id: The ID of the command

        Returns:
            The command, if one with the given ID exists internally, otherwise None

        """
        scope = self._interaction_scopes.get(str(cmd_id), MISSING)
        cmd_id = int(cmd_id)  # ensure int ID
        if scope != MISSING:
            for cmd in self.interactions[scope].values():
                if int(cmd.cmd_id.get(scope)) == cmd_id:
                    return cmd
        return None

    @staticmethod
    def _raise_sync_exception(e: HTTPException, cmds_json: dict, cmd_scope: "Snowflake_Type") -> NoReturn:
        try:
            if isinstance(e.errors, dict):
                for cmd_num in e.errors.keys():
                    cmd = cmds_json[cmd_scope][int(cmd_num)]
                    output = e.search_for_message(e.errors[cmd_num], cmd)
                    if len(output) > 1:
                        output = "\n".join(output)
                        logger.error(f"Multiple Errors found in command `{cmd['name']}`:\n{output}")
                    else:
                        logger.error(f"Error in command `{cmd['name']}`: {output[0]}")
            else:
                raise e from None
        except Exception:
            # the above shouldn't fail, but if it does, just raise the exception normally
            raise e from None

    def _cache_sync_response(self, sync_response: list[dict], scope: "Snowflake_Type") -> None:
        for cmd_data in sync_response:
            self._interaction_scopes[cmd_data["id"]] = scope
            if cmd_data["name"] in self.interactions[scope]:
                self.interactions[scope][cmd_data["name"]].cmd_id[scope] = int(cmd_data["id"])
            else:
                # sub_cmd
                for sc in cmd_data["options"]:
                    if sc["type"] == OptionTypes.SUB_COMMAND:
                        if f"{cmd_data['name']} {sc['name']}" in self.interactions[scope]:
                            self.interactions[scope][f"{cmd_data['name']} {sc['name']}"].cmd_id[scope] = int(
                                cmd_data["id"]
                            )
                    elif sc["type"] == OptionTypes.SUB_COMMAND_GROUP:
                        for _sc in sc["options"]:
                            if f"{cmd_data['name']} {sc['name']} {_sc['name']}" in self.interactions[scope]:
                                self.interactions[scope][f"{cmd_data['name']} {sc['name']} {_sc['name']}"].cmd_id[
                                    scope
                                ] = int(cmd_data["id"])

    @overload
    async def get_context(self, data: ComponentChannelInteractionData, interaction: Literal[True]) -> ComponentContext:
        ...

    @overload
    async def get_context(
        self, data: AutocompleteChannelInteractionData, interaction: Literal[True]
    ) -> AutocompleteContext:
        ...

    # as of right now, discord_typings doesn't include anything like this
    # @overload
    # async def get_context(self, data: ModalSubmitInteractionData, interaction: Literal[True]) -> ModalContext:
    #     ...

    @overload
    async def get_context(self, data: InteractionData, interaction: Literal[True]) -> InteractionContext:
        ...

    @overload
    async def get_context(
        self, data: dict, interaction: Literal[True]
    ) -> ComponentContext | AutocompleteContext | ModalContext | InteractionContext:
        # fallback case since some data isn't typehinted properly
        ...

    @overload
    async def get_context(self, data: Message, interaction: Literal[False] = False) -> PrefixedContext:
        ...

    async def get_context(
        self, data: InteractionData | dict | Message, interaction: bool = False
    ) -> ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext:
        """
        Return a context object based on data passed.

        note:
            If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.

        Args:
            data: The data of the event
            interaction: Is this an interaction or not?

        Returns:
            Context object

        """
        # this line shuts up IDE warnings
        cls: ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext

        if interaction:
            match data["type"]:
                case InteractionTypes.MESSAGE_COMPONENT:
                    cls = self.component_context.from_dict(data, self)

                case InteractionTypes.AUTOCOMPLETE:
                    cls = self.autocomplete_context.from_dict(data, self)

                case InteractionTypes.MODAL_RESPONSE:
                    cls = self.modal_context.from_dict(data, self)

                case _:
                    cls = self.interaction_context.from_dict(data, self)

            if not cls.channel:
                try:
                    cls.channel = await self.cache.fetch_channel(data["channel_id"])
                except Forbidden:
                    cls.channel = BaseChannel.from_dict_factory(
                        {"id": data["channel_id"], "type": ChannelTypes.GUILD_TEXT}, self
                    )

        else:
            cls = self.prefixed_context.from_message(self, data)
            if not cls.channel:
                cls.channel = await self.cache.fetch_channel(data._channel_id)

        return cls

    async def _run_slash_command(self, command: SlashCommand, ctx: InteractionContext) -> Any:
        """Overrideable method that executes slash commands, can be used to wrap callback execution"""
        return await command(ctx, **ctx.kwargs)

    async def _run_prefixed_command(self, command: PrefixedCommand, ctx: PrefixedContext) -> Any:
        """Overrideable method that executes prefixed commands, can be used to wrap callback execution"""
        return await command(ctx)

    @processors.Processor.define("raw_interaction_create")
    async def _dispatch_interaction(self, event: RawGatewayEvent) -> None:
        """
        Identify and dispatch interaction of slash commands or components.

        Args:
            raw interaction event

        """
        interaction_data = event.data

        if interaction_data["type"] in (
            InteractionTypes.PING,
            InteractionTypes.APPLICATION_COMMAND,
            InteractionTypes.AUTOCOMPLETE,
        ):
            interaction_id = interaction_data["data"]["id"]
            name = interaction_data["data"]["name"]
            scope = self._interaction_scopes.get(str(interaction_id))

            if scope in self.interactions:
                ctx = await self.get_context(interaction_data, True)

                ctx.command: SlashCommand = self.interactions[scope][ctx.invoke_target]  # type: ignore
                logger.debug(f"{scope} :: {ctx.command.name} should be called")

                if ctx.command.auto_defer:
                    auto_defer = ctx.command.auto_defer
                elif ctx.command.extension and ctx.command.extension.auto_defer:
                    auto_defer = ctx.command.extension.auto_defer
                else:
                    auto_defer = self.auto_defer

                if auto_opt := getattr(ctx, "focussed_option", None):
                    try:
                        await ctx.command.autocomplete_callbacks[auto_opt](ctx, **ctx.kwargs)
                    except Exception as e:
                        await self.on_autocomplete_error(ctx, e)
                    finally:
                        await self.on_autocomplete(ctx)
                else:
                    try:
                        await auto_defer(ctx)
                        if self.pre_run_callback:
                            await self.pre_run_callback(ctx, **ctx.kwargs)
                        await self._run_slash_command(ctx.command, ctx)
                        if self.post_run_callback:
                            await self.post_run_callback(ctx, **ctx.kwargs)
                    except Exception as e:
                        await self.on_command_error(ctx, e)
                    finally:
                        await self.on_command(ctx)
            else:
                logger.error(f"Unknown cmd_id received:: {interaction_id} ({name})")

        elif interaction_data["type"] == InteractionTypes.MESSAGE_COMPONENT:
            # Buttons, Selects, ContextMenu::Message
            ctx = await self.get_context(interaction_data, True)
            component_type = interaction_data["data"]["component_type"]

            self.dispatch(events.Component(ctx))
            if callback := self._component_callbacks.get(ctx.custom_id):
                ctx.command = callback
                try:
                    if self.pre_run_callback:
                        await self.pre_run_callback(ctx)
                    await callback(ctx)
                    if self.post_run_callback:
                        await self.post_run_callback(ctx)
                except Exception as e:
                    await self.on_component_error(ctx, e)
                finally:
                    await self.on_component(ctx)
            if component_type == ComponentTypes.BUTTON:
                self.dispatch(events.Button(ctx))
            if component_type == ComponentTypes.SELECT:
                self.dispatch(events.Select(ctx))

        elif interaction_data["type"] == InteractionTypes.MODAL_RESPONSE:
            ctx = await self.get_context(interaction_data, True)
            self.dispatch(events.ModalResponse(ctx))

            # todo: Polls remove this icky code duplication - love from past-polls ❤️
            if callback := self._modal_callbacks.get(ctx.custom_id):
                ctx.command = callback

                try:
                    if self.pre_run_callback:
                        await self.pre_run_callback(ctx)
                    await callback(ctx)
                    if self.post_run_callback:
                        await self.post_run_callback(ctx)
                except Exception as e:
                    await self.on_component_error(ctx, e)
                finally:
                    await self.on_component(ctx)

        else:
            raise NotImplementedError(f"Unknown Interaction Received: {interaction_data['type']}")

    @Listener.create("message_create")
    async def _dispatch_prefixed_commands(self, event: MessageCreate) -> None:
        """Determine if a prefixed command is being triggered, and dispatch it."""
        message = event.message

        if not message.content:
            return

        if not message.author.bot:
            prefixes: str | Iterable[str] = await self.generate_prefixes(self, message)

            if isinstance(prefixes, str) or prefixes == MENTION_PREFIX:
                # its easier to treat everything as if it may be an iterable
                # rather than building a special case for this
                prefixes = (prefixes,)  # type: ignore

            prefix_used = None

            for prefix in prefixes:
                if prefix == MENTION_PREFIX:
                    if mention := self._mention_reg.search(message.content):  # type: ignore
                        prefix = mention.group()
                    else:
                        continue

                if message.content.startswith(prefix):
                    prefix_used = prefix
                    break

            if prefix_used:
                context = await self.get_context(message)
                context.prefix = prefix_used

                # interestingly enough, we cannot count on ctx.invoke_target
                # being correct as its hard to account for newlines and the like
                # with the way we get subcommands here
                # we'll have to reconstruct it by getting the content_parameters
                # then removing the prefix and the parameters from the message
                # content
                content_parameters = message.content.removeprefix(prefix_used)  # type: ignore
                command = self  # yes, this is a hack

                while True:
                    first_word: str = get_first_word(content_parameters)  # type: ignore
                    if isinstance(command, PrefixedCommand):
                        new_command = command.subcommands.get(first_word)
                    else:
                        new_command = command.prefixed_commands.get(first_word)
                    if not new_command or not new_command.enabled:
                        break

                    command = new_command
                    content_parameters = content_parameters.removeprefix(first_word).strip()

                    if command.subcommands and command.hierarchical_checking:
                        try:
                            await new_command._can_run(context)  # will error out if we can't run this command
                        except Exception as e:
                            if new_command.error_callback:
                                await new_command.error_callback(e, context)
                            elif new_command.extension and new_command.extension.extension_error:
                                await new_command.extension.extension_error(context)
                            else:
                                await self.on_command_error(context, e)
                            return

                if not isinstance(command, PrefixedCommand):
                    command = None

                if command and command.enabled:
                    # yeah, this looks ugly
                    context.command = command
                    context.invoke_target = (
                        message.content.removeprefix(prefix_used).removesuffix(content_parameters).strip()  # type: ignore
                    )
                    context.args = get_args(context.content_parameters)
                    try:
                        if self.pre_run_callback:
                            await self.pre_run_callback(context)
                        await self._run_prefixed_command(command, context)
                        if self.post_run_callback:
                            await self.post_run_callback(context)
                    except Exception as e:
                        await self.on_command_error(context, e)
                    finally:
                        await self.on_command(context)

    @Listener.create("disconnect")
    async def _disconnect(self) -> None:
        self._ready.clear()

    def get_extensions(self, name: str) -> list[Extension]:
        """
        Get all ext with a name or extension name.

        Args:
            name: The name of the extension, or the name of it's extension

        Returns:
            List of Extensions
        """
        if name not in self.ext.keys():
            return [ext for ext in self.ext.values() if ext.extension_name == name]

        return [self.ext.get(name, None)]

    def get_ext(self, name: str) -> Extension | None:
        """
        Get a extension with a name or extension name.

        Args:
            name: The name of the extension, or the name of it's extension

        Returns:
            A extension, if found
        """
        if ext := self.get_extensions(name):
            return ext[0]
        return None

    def load_extension(self, name: str, package: str = None, **load_kwargs) -> None:
        """
        Load an extension with given arguments.

        Args:
            name: The name of the extension.
            package: The package the extension is in
            load_kwargs: The auto-filled mapping of the load keyword arguments

        """
        name = importlib.util.resolve_name(name, package)
        if name in self.__modules:
            raise Exception(f"{name} already loaded")

        module = importlib.import_module(name, package)
        try:
            setup = getattr(module, "setup", None)
            if not setup:
                raise ExtensionLoadException(
                    f"{name} lacks an entry point. Ensure you have a function called `setup` defined in that file"
                ) from None
            setup(self, **load_kwargs)
        except ExtensionLoadException:
            raise
        except Exception as e:
            del sys.modules[name]
            raise ExtensionLoadException(f"Unexpected Error loading {name}") from e

        else:
            logger.debug(f"Loaded Extension: {name}")
            self.__modules[name] = module

            if self.sync_ext and self._ready.is_set():
                try:
                    asyncio.get_running_loop()
                except RuntimeError:
                    return
                asyncio.create_task(self.synchronise_interactions())

    def unload_extension(self, name, package=None, **unload_kwargs) -> None:
        """
        Unload an extension with given arguments.

        Args:
            name: The name of the extension.
            package: The package the extension is in
            unload_kwargs: The auto-filled mapping of the unload keyword arguments

        """
        name = importlib.util.resolve_name(name, package)
        module = self.__modules.get(name)

        if module is None:
            raise ExtensionNotFound(f"No extension called {name} is loaded")

        try:
            teardown = getattr(module, "teardown")
            teardown(**unload_kwargs)
        except AttributeError:
            pass

        for ext in self.get_extensions(name):
            ext.drop(**unload_kwargs)

        del sys.modules[name]
        del self.__modules[name]

        if self.sync_ext and self._ready.is_set():
            if self.sync_ext and self._ready.is_set():
                try:
                    asyncio.get_running_loop()
                except RuntimeError:
                    return
                asyncio.create_task(self.synchronise_interactions())

    def reload_extension(
        self, name, package=None, *, load_kwargs: Mapping[str, Any] = None, unload_kwargs: Mapping[str, Any] = None
    ) -> None:
        """
        Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.

        Args:
            name: The name of the extension.
            package: The package the extension is in
            load_kwargs: The manually-filled mapping of the load keyword arguments
            unload_kwargs: The manually-filled mapping of the unload keyword arguments

        """
        name = importlib.util.resolve_name(name, package)
        module = self.__modules.get(name)

        if module is None:
            logger.warning("Attempted to reload extension thats not loaded. Loading extension instead")
            return self.load_extension(name, package)

        if not load_kwargs:
            load_kwargs = {}
        if not unload_kwargs:
            unload_kwargs = {}

        self.unload_extension(name, package, **unload_kwargs)
        self.load_extension(name, package, **load_kwargs)

        # todo: maybe add an ability to revert to the previous version if unable to load the new one

    async def fetch_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
        """
        Fetch a guild.

        Note:
            This method is an alias for the cache which will either return a cached object, or query discord for the object
            if its not already cached.

        Args:
            guild_id: The ID of the guild to get

        Returns:
            Guild Object if found, otherwise None

        """
        try:
            return await self.cache.fetch_guild(guild_id)
        except NotFound:
            return None

    def get_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
        """
        Get a guild.

        Note:
            This method is an alias for the cache which will return a cached object.

        Args:
            guild_id: The ID of the guild to get

        Returns:
            Guild Object if found, otherwise None

        """
        return self.cache.get_guild(guild_id)

    async def create_guild_from_template(
        self,
        template_code: Union["GuildTemplate", str],
        name: str,
        icon: Absent[UPLOADABLE_TYPE] = MISSING,
    ) -> Optional[Guild]:
        """
        Creates a new guild based on a template.

        note:
            This endpoint can only be used by bots in less than 10 guilds.

        Args:
            template_code: The code of the template to use.
            name: The name of the guild (2-100 characters)
            icon: Location or File of icon to set

        Returns:
            The newly created guild object

        """
        if isinstance(template_code, GuildTemplate):
            template_code = template_code.code

        if icon:
            icon = to_image_data(icon)
        guild_data = await self.http.create_guild_from_guild_template(template_code, name, icon)
        return Guild.from_dict(guild_data, self)

    async def fetch_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
        """
        Fetch a channel.

        Note:
            This method is an alias for the cache which will either return a cached object, or query discord for the object
            if its not already cached.

        Args:
            channel_id: The ID of the channel to get

        Returns:
            Channel Object if found, otherwise None

        """
        try:
            return await self.cache.fetch_channel(channel_id)
        except NotFound:
            return None

    def get_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
        """
        Get a channel.

        Note:
            This method is an alias for the cache which will return a cached object.

        Args:
            channel_id: The ID of the channel to get

        Returns:
            Channel Object if found, otherwise None

        """
        return self.cache.get_channel(channel_id)

    async def fetch_user(self, user_id: "Snowflake_Type") -> Optional[User]:
        """
        Fetch a user.

        Note:
            This method is an alias for the cache which will either return a cached object, or query discord for the object
            if its not already cached.

        Args:
            user_id: The ID of the user to get

        Returns:
            User Object if found, otherwise None

        """
        try:
            return await self.cache.fetch_user(user_id)
        except NotFound:
            return None

    def get_user(self, user_id: "Snowflake_Type") -> Optional[User]:
        """
        Get a user.

        Note:
            This method is an alias for the cache which will return a cached object.

        Args:
            user_id: The ID of the user to get

        Returns:
            User Object if found, otherwise None

        """
        return self.cache.get_user(user_id)

    async def fetch_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
        """
        Fetch a member from a guild.

        Note:
            This method is an alias for the cache which will either return a cached object, or query discord for the object
            if its not already cached.

        Args:
            user_id: The ID of the member
            guild_id: The ID of the guild to get the member from

        Returns:
            Member object if found, otherwise None

        """
        try:
            return await self.cache.fetch_member(guild_id, user_id)
        except NotFound:
            return None

    def get_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
        """
        Get a member from a guild.

        Note:
            This method is an alias for the cache which will return a cached object.

        Args:
            user_id: The ID of the member
            guild_id: The ID of the guild to get the member from

        Returns:
            Member object if found, otherwise None

        """
        return self.cache.get_member(guild_id, user_id)

    async def fetch_scheduled_event(
        self, guild_id: "Snowflake_Type", scheduled_event_id: "Snowflake_Type", with_user_count: bool = False
    ) -> Optional["ScheduledEvent"]:
        """
        Fetch a scheduled event by id.

        Args:
            event_id: The id of the scheduled event.

        Returns:
            The scheduled event if found, otherwise None

        """
        try:
            scheduled_event_data = await self.http.get_scheduled_event(guild_id, scheduled_event_id, with_user_count)
            return ScheduledEvent.from_dict(scheduled_event_data, self)
        except NotFound:
            return None

    async def fetch_custom_emoji(self, emoji_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[CustomEmoji]:
        """
        Fetch a custom emoji by id.

        Args:
            emoji_id: The id of the custom emoji.
            guild_id: The id of the guild the emoji belongs to.

        Returns:
            The custom emoji if found, otherwise None.

        """
        try:
            return await self.cache.fetch_emoji(guild_id, emoji_id)
        except NotFound:
            return None

    def get_custom_emoji(
        self, emoji_id: "Snowflake_Type", guild_id: Optional["Snowflake_Type"] = None
    ) -> Optional[CustomEmoji]:
        """
        Get a custom emoji by id.

        Args:
            emoji_id: The id of the custom emoji.
            guild_id: The id of the guild the emoji belongs to.

        Returns:
            The custom emoji if found, otherwise None.

        """
        emoji = self.cache.get_emoji(emoji_id)
        if emoji and (not guild_id or emoji._guild_id == to_snowflake(guild_id)):
            return emoji
        return None

    async def fetch_sticker(self, sticker_id: "Snowflake_Type") -> Optional[Sticker]:
        """
        Fetch a sticker by ID.

        Args:
            sticker_id: The ID of the sticker.

        Returns:
            A sticker object if found, otherwise None

        """
        try:
            sticker_data = await self.http.get_sticker(sticker_id)
            return Sticker.from_dict(sticker_data, self)
        except NotFound:
            return None

    async def fetch_nitro_packs(self) -> Optional[List["StickerPack"]]:
        """
        List the sticker packs available to Nitro subscribers.

        Returns:
            A list of StickerPack objects if found, otherwise returns None

        """
        try:
            packs_data = await self.http.list_nitro_sticker_packs()
            return [StickerPack.from_dict(data, self) for data in packs_data]

        except NotFound:
            return None

    async def fetch_voice_regions(self) -> List["VoiceRegion"]:
        """
        List the voice regions available on Discord.

        Returns:
            A list of voice regions.

        """
        regions_data = await self.http.list_voice_regions()
        regions = VoiceRegion.from_list(regions_data)
        return regions

    async def connect_to_vc(
        self, guild_id: "Snowflake_Type", channel_id: "Snowflake_Type", muted: bool = False, deafened: bool = False
    ) -> ActiveVoiceState:
        """
        Connect the bot to a voice channel.

        Args:
            guild_id: id of the guild the voice channel is in.
            channel_id: id of the voice channel client wants to join.
            muted: Whether the bot should be muted when connected.
            deafened: Whether the bot should be deafened when connected.

        Returns:
            The new active voice state on successfully connection.

        """
        return await self._connection_state.voice_connect(guild_id, channel_id, muted, deafened)

    def get_bot_voice_state(self, guild_id: "Snowflake_Type") -> Optional[ActiveVoiceState]:
        """
        Get the bot's voice state for a guild.

        Args:
            guild_id: The target guild's id.

        Returns:
            The bot's voice state for the guild if connected, otherwise None.

        """
        return self._connection_state.get_voice_state(guild_id)

    async def change_presence(
        self, status: Optional[Union[str, Status]] = Status.ONLINE, activity: Optional[Union[Activity, str]] = None
    ) -> None:
        """
        Change the bots presence.

        Args:
            status: The status for the bot to be. i.e. online, afk, etc.
            activity: The activity for the bot to be displayed as doing.

        Note::
            Bots may only be `playing` `streaming` `listening` `watching` or `competing`, other activity types are likely to fail.

        """
        await self._connection_state.change_presence(status, activity)

property readonly is_closed: bool

Returns True if the bot has closed.

property readonly is_ready: bool

Returns True if the bot is ready.

property readonly latency: float

Returns the latency of the websocket connection.

property readonly average_latency: float

Returns the average latency of the websocket connection.

property readonly start_time: datetime

The start time of the bot.

property readonly gateway_started: bool

Returns if the gateway has been started.

property readonly user: NaffUser

Returns the bot's user.

property readonly app: Application

Returns the bots application.

property readonly owner: Optional[User]

Returns the bot's owner'.

property readonly owners: List[User]

Returns the bot's owners as declared via client.owner_ids.

property readonly guilds: List[Guild]

Returns a list of all guilds the bot is in.

property readonly status: Status

Get the status of the bot.

IE online, afk, dnd

property readonly activity: Activity

Get the activity of the bot.

property readonly application_commands: List[naff.models.naff.application_commands.InteractionCommand]

A list of all application commands registered within the bot.

property readonly ws: GatewayClient

Returns the websocket client.

async method generate_prefixes(self, bot, message)

A method to get the bot's default_prefix, can be overridden to add dynamic prefixes.

Note

To easily override this method, simply use the generate_prefixes parameter when instantiating the client

Parameters:

Name Type Description Default
bot Client

A reference to the client

required
message Message

A message to determine the prefix from.

required

Returns:

Type Description
str | collections.abc.Iterable[str]

A string or an iterable of strings to use as a prefix. By default, this will return client.default_prefix

Source code in naff/client/client.py
async def generate_prefixes(self, bot: "Client", message: Message) -> str | Iterable[str]:
    """
    A method to get the bot's default_prefix, can be overridden to add dynamic prefixes.

    !!! note
        To easily override this method, simply use the `generate_prefixes` parameter when instantiating the client

    Args:
        bot: A reference to the client
        message: A message to determine the prefix from.

    Returns:
        A string or an iterable of strings to use as a prefix. By default, this will return `client.default_prefix`

    """
    return self.default_prefix

staticmethod method default_error_handler(source, error)

The default error logging behaviour.

Parameters:

Name Type Description Default
source str

The source of this error

required
error BaseException

The exception itself

required
Source code in naff/client/client.py
@staticmethod
def default_error_handler(source: str, error: BaseException) -> None:
    """
    The default error logging behaviour.

    Args:
        source: The source of this error
        error: The exception itself

    """
    out = traceback.format_exception(error)

    if isinstance(error, HTTPException):
        # HTTPException's are of 3 known formats, we can parse them for human readable errors
        try:
            errors = error.search_for_message(error.errors)
            out = f"HTTPException: {error.status}|{error.response.reason}: " + "\n".join(errors)
        except Exception:  # noqa : S110
            pass

    logger.error(
        "Ignoring exception in {}:{}{}".format(source, "\n" if len(out) > 1 else " ", "".join(out)),
    )

async method on_error(self, source, error, *args, **kwargs)

Catches all errors dispatched by the library.

By default it will format and print them to console

Override this to change error handling behaviour

Source code in naff/client/client.py
async def on_error(self, source: str, error: Exception, *args, **kwargs) -> None:
    """
    Catches all errors dispatched by the library.

    By default it will format and print them to console

    Override this to change error handling behaviour

    """
    self.default_error_handler(source, error)

async method on_command_error(self, ctx, error, *args, **kwargs)

Catches all errors dispatched by commands.

By default it will call Client.on_error

Override this to change error handling behavior

Source code in naff/client/client.py
async def on_command_error(self, ctx: Context, error: Exception, *args, **kwargs) -> None:
    """
    Catches all errors dispatched by commands.

    By default it will call `Client.on_error`

    Override this to change error handling behavior

    """
    self.dispatch(events.Error(f"cmd /`{ctx.invoke_target}`", error, args, kwargs, ctx))
    try:
        if isinstance(error, errors.CommandOnCooldown):
            await ctx.send(
                embeds=Embed(
                    description=f"This command is on cooldown!\n"
                    f"Please try again in {int(error.cooldown.get_cooldown_time())} seconds",
                    color=BrandColors.FUCHSIA,
                )
            )
        elif isinstance(error, errors.MaxConcurrencyReached):
            await ctx.send(
                embeds=Embed(
                    description="This command has reached its maximum concurrent usage!\n"
                    "Please try again shortly.",
                    color=BrandColors.FUCHSIA,
                )
            )
        elif isinstance(error, errors.CommandCheckFailure):
            await ctx.send(
                embeds=Embed(
                    description="You do not have permission to run this command!",
                    color=BrandColors.YELLOW,
                )
            )
        elif self.send_command_tracebacks:
            out = "".join(traceback.format_exception(error))
            if self.http.token is not None:
                out = out.replace(self.http.token, "[REDACTED TOKEN]")
            await ctx.send(
                embeds=Embed(
                    title=f"Error: {type(error).__name__}",
                    color=BrandColors.RED,
                    description=f"```\n{out[:EMBED_MAX_DESC_LENGTH-8]}```",
                )
            )
    except errors.NaffException:
        pass

async method on_command(self, ctx)

Called after any command is ran.

By default, it will simply log the command, override this to change that behaviour

Parameters:

Name Type Description Default
ctx Context

The context of the command that was called

required
Source code in naff/client/client.py
async def on_command(self, ctx: Context) -> None:
    """
    Called *after* any command is ran.

    By default, it will simply log the command, override this to change that behaviour

    Args:
        ctx: The context of the command that was called

    """
    if isinstance(ctx, PrefixedContext):
        symbol = "@"
    elif isinstance(ctx, InteractionContext):
        symbol = "/"
    else:
        symbol = "?"  # likely custom context
    logger.info(f"Command Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")

async method on_component_error(self, ctx, error, *args, **kwargs)

Catches all errors dispatched by components.

By default it will call Naff.on_error

Override this to change error handling behavior

Source code in naff/client/client.py
async def on_component_error(self, ctx: ComponentContext, error: Exception, *args, **kwargs) -> None:
    """
    Catches all errors dispatched by components.

    By default it will call `Naff.on_error`

    Override this to change error handling behavior

    """
    return self.dispatch(events.Error(f"Component Callback for {ctx.custom_id}", error, args, kwargs, ctx))

async method on_component(self, ctx)

Called after any component callback is ran.

By default, it will simply log the component use, override this to change that behaviour

Parameters:

Name Type Description Default
ctx ComponentContext

The context of the component that was called

required
Source code in naff/client/client.py
async def on_component(self, ctx: ComponentContext) -> None:
    """
    Called *after* any component callback is ran.

    By default, it will simply log the component use, override this to change that behaviour

    Args:
        ctx: The context of the component that was called

    """
    symbol = "¢"
    logger.info(f"Component Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")

async method on_autocomplete_error(self, ctx, error, *args, **kwargs)

Catches all errors dispatched by autocompletion options.

By default it will call Naff.on_error

Override this to change error handling behavior

Source code in naff/client/client.py
async def on_autocomplete_error(self, ctx: AutocompleteContext, error: Exception, *args, **kwargs) -> None:
    """
    Catches all errors dispatched by autocompletion options.

    By default it will call `Naff.on_error`

    Override this to change error handling behavior

    """
    return self.dispatch(
        events.Error(
            f"Autocomplete Callback for /{ctx.invoke_target} - Option: {ctx.focussed_option}",
            error,
            args,
            kwargs,
            ctx,
        )
    )

async method on_autocomplete(self, ctx)

Called after any autocomplete callback is ran.

By default, it will simply log the autocomplete callback, override this to change that behaviour

Parameters:

Name Type Description Default
ctx AutocompleteContext

The context of the command that was called

required
Source code in naff/client/client.py
async def on_autocomplete(self, ctx: AutocompleteContext) -> None:
    """
    Called *after* any autocomplete callback is ran.

    By default, it will simply log the autocomplete callback, override this to change that behaviour

    Args:
        ctx: The context of the command that was called

    """
    symbol = "$"
    logger.info(f"Autocomplete Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")

async method login(self, token)

Login to discord via http.

Note

You will need to run Naff.start_gateway() before you start receiving gateway events.

Parameters:

Name Type Description Default
token str

Your bot's token

required
Source code in naff/client/client.py
async def login(self, token) -> None:
    """
    Login to discord via http.

    !!! note
        You will need to run Naff.start_gateway() before you start receiving gateway events.

    Args:
        token str: Your bot's token

    """
    # i needed somewhere to put this call,
    # login will always run after initialisation
    # so im gathering commands here
    self._gather_commands()

    logger.debug("Attempting to login")
    me = await self.http.login(token.strip())
    self._user = NaffUser.from_dict(me, self)
    self.cache.place_user_data(me)
    self._app = Application.from_dict(await self.http.get_current_bot_information(), self)
    self._mention_reg = re.compile(rf"^(<@!?{self.user.id}*>\s)")

    if self.app.owner:
        self.owner_ids.add(self.app.owner.id)

    self.dispatch(events.Login())

async method astart(self, token)

Asynchronous method to start the bot.

Parameters:

Name Type Description Default
token

Your bot's token

required
Source code in naff/client/client.py
async def astart(self, token) -> None:
    """
    Asynchronous method to start the bot.

    Args:
        token: Your bot's token

    Returns:

    """
    await self.login(token)
    try:
        await self._connection_state.start()
    finally:
        await self.stop()

method start(self, token)

Start the bot.

Info

This is the recommended method to start the bot

Parameters:

Name Type Description Default
token

Your bot's token

required
Source code in naff/client/client.py
def start(self, token) -> None:
    """
    Start the bot.

    info:
        This is the recommended method to start the bot

    Args:
        token: Your bot's token

    """
    try:
        asyncio.run(self.astart(token))
    except KeyboardInterrupt:
        # ignore, cus this is useless and can be misleading to the
        # user
        pass

async method start_gateway(self)

Starts the gateway connection.

Source code in naff/client/client.py
async def start_gateway(self) -> None:
    """Starts the gateway connection."""
    try:
        await self._connection_state.start()
    finally:
        await self.stop()

async method stop(self)

Shutdown the bot.

Source code in naff/client/client.py
async def stop(self) -> None:
    """Shutdown the bot."""
    logger.debug("Stopping the bot.")
    self._ready.clear()
    await self.http.close()
    await self._connection_state.stop()

method dispatch(self, event, *args, **kwargs)

Dispatch an event.

Parameters:

Name Type Description Default
event BaseEvent

The event to be dispatched.

required
Source code in naff/client/client.py
def dispatch(self, event: events.BaseEvent, *args, **kwargs) -> None:
    """
    Dispatch an event.

    Args:
        event: The event to be dispatched.

    """
    listeners = self.listeners.get(event.resolved_name, [])
    if listeners:
        logger.debug(f"Dispatching Event: {event.resolved_name}")
        event.bot = self
        for _listen in listeners:
            try:
                self._queue_task(_listen, event, *args, **kwargs)
            except Exception as e:
                raise BotException(
                    f"An error occurred attempting during {event.resolved_name} event processing"
                ) from e

    _waits = self.waits.get(event.resolved_name, [])
    if _waits:
        index_to_remove = []
        for i, _wait in enumerate(_waits):
            result = _wait(event)
            if result:
                index_to_remove.append(i)

        for idx in sorted(index_to_remove, reverse=True):
            _waits.pop(idx)

async method wait_until_ready(self)

Waits for the client to become ready.

Source code in naff/client/client.py
async def wait_until_ready(self) -> None:
    """Waits for the client to become ready."""
    await self._ready.wait()

method wait_for(self, event, checks, timeout)

Waits for a WebSocket event to be dispatched.

Parameters:

Name Type Description Default
event Union[str, BaseEvent]

The name of event to wait.

required
checks Union[Callable[..., bool], NoneType, naff.client.const.Missing]

A predicate to check what to wait for.

Missing
timeout Optional[float]

The number of seconds to wait before timing out.

None

Returns:

Type Description
Any

The event object.

Source code in naff/client/client.py
def wait_for(
    self,
    event: Union[str, "BaseEvent"],
    checks: Absent[Optional[Callable[..., bool]]] = MISSING,
    timeout: Optional[float] = None,
) -> Any:
    """
    Waits for a WebSocket event to be dispatched.

    Args:
        event: The name of event to wait.
        checks: A predicate to check what to wait for.
        timeout: The number of seconds to wait before timing out.

    Returns:
        The event object.

    """
    event = get_event_name(event)

    if event not in self.waits:
        self.waits[event] = []

    future = asyncio.Future()
    self.waits[event].append(Wait(event, checks, future))

    return asyncio.wait_for(future, timeout)

async method wait_for_modal(self, modal, author, timeout)

Wait for a modal response.

Parameters:

Name Type Description Default
modal Modal

The modal we're waiting for.

required
author Optional[Snowflake_Type]

The user we're waiting for to reply

None
timeout Optional[float]

A timeout in seconds to stop waiting

None

Returns:

Type Description
ModalContext

The context of the modal response

Source code in naff/client/client.py
async def wait_for_modal(
    self,
    modal: "Modal",
    author: Optional["Snowflake_Type"] = None,
    timeout: Optional[float] = None,
) -> ModalContext:
    """
    Wait for a modal response.

    Args:
        modal: The modal we're waiting for.
        author: The user we're waiting for to reply
        timeout: A timeout in seconds to stop waiting

    Returns:
        The context of the modal response

    Raises:
        `asyncio.TimeoutError` if no response is received that satisfies the predicate before timeout seconds have passed

    """
    author = to_snowflake(author) if author else None

    def predicate(event) -> bool:
        if modal.custom_id != event.context.custom_id:
            return False
        if author and author != to_snowflake(event.context.author):
            return False
        return True

    resp = await self.wait_for("modal_response", predicate, timeout)
    return resp.context

async method wait_for_component(self, messages, components, check, timeout)

Waits for a component to be sent to the bot.

Parameters:

Name Type Description Default
messages Union[naff.models.discord.message.Message, int, list]

The message object to check for.

None
components Union[List[List[Union[BaseComponent, dict]]], List[Union[BaseComponent, dict]], BaseComponent, dict]

The components to wait for.

None
check Optional[Callable]

A predicate to check what to wait for.

None
timeout Optional[float]

The number of seconds to wait before timing out.

None

Returns:

Type Description
Component

Component that was invoked. Use .context to get the ComponentContext.

Source code in naff/client/client.py
async def wait_for_component(
    self,
    messages: Union[Message, int, list] = None,
    components: Optional[
        Union[List[List[Union["BaseComponent", dict]]], List[Union["BaseComponent", dict]], "BaseComponent", dict]
    ] = None,
    check: Optional[Callable] = None,
    timeout: Optional[float] = None,
) -> "Component":
    """
    Waits for a component to be sent to the bot.

    Args:
        messages: The message object to check for.
        components: The components to wait for.
        check: A predicate to check what to wait for.
        timeout: The number of seconds to wait before timing out.

    Returns:
        `Component` that was invoked. Use `.context` to get the `ComponentContext`.

    Raises:
        `asyncio.TimeoutError` if timed out

    """
    if not (messages or components):
        raise ValueError("You must specify messages or components (or both)")

    message_ids = (
        to_snowflake_list(messages) if isinstance(messages, list) else to_snowflake(messages) if messages else None
    )
    custom_ids = list(get_components_ids(components)) if components else None

    # automatically convert improper custom_ids
    if custom_ids and not all(isinstance(x, str) for x in custom_ids):
        custom_ids = [str(i) for i in custom_ids]

    def _check(event: Component) -> bool:
        ctx: ComponentContext = event.context
        # if custom_ids is empty or there is a match
        wanted_message = not message_ids or ctx.message.id in (
            [message_ids] if isinstance(message_ids, int) else message_ids
        )
        wanted_component = not custom_ids or ctx.custom_id in custom_ids
        if wanted_message and wanted_component:
            if check is None or check(event):
                return True
            return False
        return False

    return await self.wait_for("component", checks=_check, timeout=timeout)

method listen(self, event_name)

A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.

Parameters:

Name Type Description Default
event_name Union[str, naff.client.const.Missing]

The event name to use, if not the coroutine name

Missing

Returns:

Type Description
Listener

A listener that can be used to hook into the event.

Source code in naff/client/client.py
def listen(self, event_name: Absent[str] = MISSING) -> Listener:
    """
    A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.

    Args:
        event_name: The event name to use, if not the coroutine name

    Returns:
        A listener that can be used to hook into the event.

    """

    def wrapper(coro: Callable[..., Coroutine]) -> Listener:
        listener = Listener.create(event_name)(coro)
        self.add_listener(listener)
        return listener

    return wrapper

method add_event_processor(self, event_name)

A decorator to be used to add event processors.

Parameters:

Name Type Description Default
event_name Union[str, naff.client.const.Missing]

The event name to use, if not the coroutine name

Missing

Returns:

Type Description
Callable[..., Coroutine]

A function that can be used to hook into the event.

Source code in naff/client/client.py
def add_event_processor(self, event_name: Absent[str] = MISSING) -> Callable[..., Coroutine]:
    """
    A decorator to be used to add event processors.

    Args:
        event_name: The event name to use, if not the coroutine name

    Returns:
        A function that can be used to hook into the event.

    """

    def wrapper(coro: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
        name = event_name
        if name is MISSING:
            name = coro.__name__
        name = name.lstrip("_")
        name = name.removeprefix("on_")
        self.processors[name] = coro
        return coro

    return wrapper

method add_listener(self, listener)

Add a listener for an event, if no event is passed, one is determined.

Parameters:

Name Type Description Default
listener Listener

The listener to add to the client

required
Source code in naff/client/client.py
def add_listener(self, listener: Listener) -> None:
    """
    Add a listener for an event, if no event is passed, one is determined.

    Args:
        listener Listener: The listener to add to the client

    """
    # check that the required intents are enabled
    event_class_name = "".join([name.capitalize() for name in listener.event.split("_")])
    if event_class := globals().get(event_class_name):
        if required_intents := _INTENT_EVENTS.get(event_class):  # noqa
            if not any(required_intent in self.intents for required_intent in required_intents):
                self.logger.warning(
                    f"Event `{listener.event}` will not work since the required intent is not set -> Requires any of: `{required_intents}`"
                )

    if listener.event not in self.listeners:
        self.listeners[listener.event] = []
    self.listeners[listener.event].append(listener)

method add_interaction(self, command)

Add a slash command to the client.

Parameters:

Name Type Description Default
command InteractionCommand

The command to add

required
Source code in naff/client/client.py
def add_interaction(self, command: InteractionCommand) -> bool:
    """
    Add a slash command to the client.

    Args:
        command InteractionCommand: The command to add

    """
    if self.debug_scope:
        command.scopes = [self.debug_scope]

    # for SlashCommand objs without callback (like objects made to hold group info etc)
    if command.callback is None:
        return False

    for scope in command.scopes:
        if scope not in self.interactions:
            self.interactions[scope] = {}
        elif command.resolved_name in self.interactions[scope]:
            old_cmd = self.interactions[scope][command.resolved_name]
            raise ValueError(f"Duplicate Command! {scope}::{old_cmd.resolved_name}")

        if self.enforce_interaction_perms:
            command.checks.append(command._permission_enforcer)  # noqa : w0212

        self.interactions[scope][command.resolved_name] = command

    return True

method add_prefixed_command(self, command)

Add a prefixed command to the client.

Parameters:

Name Type Description Default
command PrefixedCommand

The command to add

required
Source code in naff/client/client.py
def add_prefixed_command(self, command: PrefixedCommand) -> None:
    """
    Add a prefixed command to the client.

    Args:
        command PrefixedCommand: The command to add

    """
    # check that the required intent is enabled or the prefix is a mention
    prefixes = (
        self.default_prefix
        if not isinstance(self.default_prefix, str) and not self.default_prefix == MENTION_PREFIX
        else (self.default_prefix,)
    )
    if (MENTION_PREFIX not in prefixes) and (Intents.GUILD_MESSAGE_CONTENT not in self.intents):
        self.logger.warning(
            f"Prefixed commands will not work since the required intent is not set -> Requires: `{Intents.GUILD_MESSAGE_CONTENT.__repr__()}` or usage of the default `MENTION_PREFIX` as the prefix"
        )

    command._parse_parameters()

    if self.prefixed_commands.get(command.name):
        raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {command.name}.")
    self.prefixed_commands[command.name] = command

    for alias in command.aliases:
        if self.prefixed_commands.get(alias):
            raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {alias}.")
        self.prefixed_commands[alias] = command

method add_component_callback(self, command)

Add a component callback to the client.

Parameters:

Name Type Description Default
command ComponentCommand

The command to add

required
Source code in naff/client/client.py
def add_component_callback(self, command: ComponentCommand) -> None:
    """
    Add a component callback to the client.

    Args:
        command: The command to add

    """
    for listener in command.listeners:
        # I know this isn't an ideal solution, but it means we can lookup callbacks with O(1)
        if listener not in self._component_callbacks.keys():
            self._component_callbacks[listener] = command
            continue
        else:
            raise ValueError(f"Duplicate Component! Multiple component callbacks for `{listener}`")

method add_modal_callback(self, command)

Add a modal callback to the client.

Parameters:

Name Type Description Default
command ModalCommand

The command to add

required
Source code in naff/client/client.py
def add_modal_callback(self, command: ModalCommand) -> None:
    """
    Add a modal callback to the client.

    Args:
        command: The command to add
    """
    for listener in command.listeners:
        if listener not in self._modal_callbacks.keys():
            self._modal_callbacks[listener] = command
            continue
        else:
            raise ValueError(f"Duplicate Component! Multiple modal callbacks for `{listener}`")

async method synchronise_interactions(self, *, scopes, delete_commands)

Synchronise registered interactions with discord.

Parameters:

Name Type Description Default
scopes list

Optionally specify which scopes are to be synced

Missing
delete_commands Union[bool, naff.client.const.Missing]

Override the client setting and delete commands

Missing
Source code in naff/client/client.py
async def synchronise_interactions(
    self, *, scopes: list["Snowflake_Type"] = MISSING, delete_commands: Absent[bool] = MISSING
) -> None:
    """
    Synchronise registered interactions with discord.

    Args:
        scopes: Optionally specify which scopes are to be synced
        delete_commands: Override the client setting and delete commands
    """
    s = time.perf_counter()
    _delete_cmds = self.del_unused_app_cmd if delete_commands is MISSING else delete_commands
    await self._cache_interactions()

    if scopes is not MISSING:
        cmd_scopes = scopes
    elif self.del_unused_app_cmd:
        # if we're deleting unused commands, we check all scopes
        cmd_scopes = [to_snowflake(g_id) for g_id in self._user._guild_ids] + [GLOBAL_SCOPE]
    else:
        # if we're not deleting, just check the scopes we have cmds registered in
        cmd_scopes = list(set(self.interactions) | {GLOBAL_SCOPE})

    local_cmds_json = application_commands_to_dict(self.interactions)

    async def sync_scope(cmd_scope) -> None:

        sync_needed_flag = False  # a flag to force this scope to synchronise
        sync_payload = []  # the payload to be pushed to discord

        try:
            try:
                remote_commands = await self.http.get_application_commands(self.app.id, cmd_scope)
            except Forbidden:
                logger.warning(f"Bot is lacking `application.commands` scope in {cmd_scope}!")
                return

            for local_cmd in self.interactions.get(cmd_scope, {}).values():
                # get remote equivalent of this command
                remote_cmd_json = next(
                    (v for v in remote_commands if int(v["id"]) == local_cmd.cmd_id.get(cmd_scope)), None
                )
                # get json representation of this command
                local_cmd_json = next((c for c in local_cmds_json[cmd_scope] if c["name"] == str(local_cmd.name)))

                # this works by adding any command we *want* on Discord, to a payload, and synchronising that
                # this allows us to delete unused commands, add new commands, or do nothing in 1 or less API calls

                if sync_needed(local_cmd_json, remote_cmd_json):
                    # determine if the local and remote commands are out-of-sync
                    sync_needed_flag = True
                    sync_payload.append(local_cmd_json)
                elif not _delete_cmds and remote_cmd_json:
                    _remote_payload = {
                        k: v for k, v in remote_cmd_json.items() if k not in ("id", "application_id", "version")
                    }
                    sync_payload.append(_remote_payload)
                elif _delete_cmds:
                    sync_payload.append(local_cmd_json)

            sync_payload = [json.loads(_dump) for _dump in {json.dumps(_cmd) for _cmd in sync_payload}]

            if sync_needed_flag or (_delete_cmds and len(sync_payload) < len(remote_commands)):
                # synchronise commands if flag is set, or commands are to be deleted
                logger.info(f"Overwriting {cmd_scope} with {len(sync_payload)} application commands")
                sync_response: list[dict] = await self.http.overwrite_application_commands(
                    self.app.id, sync_payload, cmd_scope
                )
                self._cache_sync_response(sync_response, cmd_scope)
            else:
                logger.debug(f"{cmd_scope} is already up-to-date with {len(remote_commands)} commands.")

        except Forbidden as e:
            raise InteractionMissingAccess(cmd_scope) from e
        except HTTPException as e:
            self._raise_sync_exception(e, local_cmds_json, cmd_scope)

    await asyncio.gather(*[sync_scope(scope) for scope in cmd_scopes])

    t = time.perf_counter() - s
    logger.debug(f"Sync of {len(cmd_scopes)} scopes took {t} seconds")

method get_application_cmd_by_id(self, cmd_id)

Get a application command from the internal cache by its ID.

Parameters:

Name Type Description Default
cmd_id Snowflake_Type

The ID of the command

required

Returns:

Type Description
Optional[naff.models.naff.application_commands.InteractionCommand]

The command, if one with the given ID exists internally, otherwise None

Source code in naff/client/client.py
def get_application_cmd_by_id(self, cmd_id: "Snowflake_Type") -> Optional[InteractionCommand]:
    """
    Get a application command from the internal cache by its ID.

    Args:
        cmd_id: The ID of the command

    Returns:
        The command, if one with the given ID exists internally, otherwise None

    """
    scope = self._interaction_scopes.get(str(cmd_id), MISSING)
    cmd_id = int(cmd_id)  # ensure int ID
    if scope != MISSING:
        for cmd in self.interactions[scope].values():
            if int(cmd.cmd_id.get(scope)) == cmd_id:
                return cmd
    return None

async method get_context(self, data, interaction)

Return a context object based on data passed.

Note

If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.

Parameters:

Name Type Description Default
data Union[discord_typings.interactions.receiving.ApplicationCommandGuildInteractionData, discord_typings.interactions.receiving.ApplicationCommandChannelInteractionData, discord_typings.interactions.receiving.GuildUserCommandInteractionData, discord_typings.interactions.receiving.ChannelUserCommandInteractionData, discord_typings.interactions.receiving.ComponentGuildInteractionData, discord_typings.interactions.receiving.ComponentChannelInteractionData, discord_typings.interactions.receiving.AutocompleteGuildInteractionData, discord_typings.interactions.receiving.AutocompleteChannelInteractionData, discord_typings.interactions.receiving.ModalGuildInteractionData, discord_typings.interactions.receiving.ModalChannelInteractionData, dict, naff.models.discord.message.Message]

The data of the event

required
interaction bool

Is this an interaction or not?

False

Returns:

Type Description
naff.models.naff.context.ComponentContext | naff.models.naff.context.AutocompleteContext | naff.models.naff.context.ModalContext | naff.models.naff.context.InteractionContext | naff.models.naff.context.PrefixedContext

Context object

Source code in naff/client/client.py
async def get_context(
    self, data: InteractionData | dict | Message, interaction: bool = False
) -> ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext:
    """
    Return a context object based on data passed.

    note:
        If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.

    Args:
        data: The data of the event
        interaction: Is this an interaction or not?

    Returns:
        Context object

    """
    # this line shuts up IDE warnings
    cls: ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext

    if interaction:
        match data["type"]:
            case InteractionTypes.MESSAGE_COMPONENT:
                cls = self.component_context.from_dict(data, self)

            case InteractionTypes.AUTOCOMPLETE:
                cls = self.autocomplete_context.from_dict(data, self)

            case InteractionTypes.MODAL_RESPONSE:
                cls = self.modal_context.from_dict(data, self)

            case _:
                cls = self.interaction_context.from_dict(data, self)

        if not cls.channel:
            try:
                cls.channel = await self.cache.fetch_channel(data["channel_id"])
            except Forbidden:
                cls.channel = BaseChannel.from_dict_factory(
                    {"id": data["channel_id"], "type": ChannelTypes.GUILD_TEXT}, self
                )

    else:
        cls = self.prefixed_context.from_message(self, data)
        if not cls.channel:
            cls.channel = await self.cache.fetch_channel(data._channel_id)

    return cls

method get_extensions(self, name)

Get all ext with a name or extension name.

Parameters:

Name Type Description Default
name str

The name of the extension, or the name of it's extension

required

Returns:

Type Description
list

List of Extensions

Source code in naff/client/client.py
def get_extensions(self, name: str) -> list[Extension]:
    """
    Get all ext with a name or extension name.

    Args:
        name: The name of the extension, or the name of it's extension

    Returns:
        List of Extensions
    """
    if name not in self.ext.keys():
        return [ext for ext in self.ext.values() if ext.extension_name == name]

    return [self.ext.get(name, None)]

method get_ext(self, name)

Get a extension with a name or extension name.

Parameters:

Name Type Description Default
name str

The name of the extension, or the name of it's extension

required

Returns:

Type Description
naff.models.naff.extension.Extension | None

A extension, if found

Source code in naff/client/client.py
def get_ext(self, name: str) -> Extension | None:
    """
    Get a extension with a name or extension name.

    Args:
        name: The name of the extension, or the name of it's extension

    Returns:
        A extension, if found
    """
    if ext := self.get_extensions(name):
        return ext[0]
    return None

method load_extension(self, name, package, **load_kwargs)

Load an extension with given arguments.

Parameters:

Name Type Description Default
name str

The name of the extension.

required
package str

The package the extension is in

None
load_kwargs

The auto-filled mapping of the load keyword arguments

{}
Source code in naff/client/client.py
def load_extension(self, name: str, package: str = None, **load_kwargs) -> None:
    """
    Load an extension with given arguments.

    Args:
        name: The name of the extension.
        package: The package the extension is in
        load_kwargs: The auto-filled mapping of the load keyword arguments

    """
    name = importlib.util.resolve_name(name, package)
    if name in self.__modules:
        raise Exception(f"{name} already loaded")

    module = importlib.import_module(name, package)
    try:
        setup = getattr(module, "setup", None)
        if not setup:
            raise ExtensionLoadException(
                f"{name} lacks an entry point. Ensure you have a function called `setup` defined in that file"
            ) from None
        setup(self, **load_kwargs)
    except ExtensionLoadException:
        raise
    except Exception as e:
        del sys.modules[name]
        raise ExtensionLoadException(f"Unexpected Error loading {name}") from e

    else:
        logger.debug(f"Loaded Extension: {name}")
        self.__modules[name] = module

        if self.sync_ext and self._ready.is_set():
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                return
            asyncio.create_task(self.synchronise_interactions())

method unload_extension(self, name, package, **unload_kwargs)

Unload an extension with given arguments.

Parameters:

Name Type Description Default
name

The name of the extension.

required
package

The package the extension is in

None
unload_kwargs

The auto-filled mapping of the unload keyword arguments

{}
Source code in naff/client/client.py
def unload_extension(self, name, package=None, **unload_kwargs) -> None:
    """
    Unload an extension with given arguments.

    Args:
        name: The name of the extension.
        package: The package the extension is in
        unload_kwargs: The auto-filled mapping of the unload keyword arguments

    """
    name = importlib.util.resolve_name(name, package)
    module = self.__modules.get(name)

    if module is None:
        raise ExtensionNotFound(f"No extension called {name} is loaded")

    try:
        teardown = getattr(module, "teardown")
        teardown(**unload_kwargs)
    except AttributeError:
        pass

    for ext in self.get_extensions(name):
        ext.drop(**unload_kwargs)

    del sys.modules[name]
    del self.__modules[name]

    if self.sync_ext and self._ready.is_set():
        if self.sync_ext and self._ready.is_set():
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                return
            asyncio.create_task(self.synchronise_interactions())

method reload_extension(self, name, package, *, load_kwargs, unload_kwargs)

Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.

Parameters:

Name Type Description Default
name

The name of the extension.

required
package

The package the extension is in

None
load_kwargs Mapping[str, Any]

The manually-filled mapping of the load keyword arguments

None
unload_kwargs Mapping[str, Any]

The manually-filled mapping of the unload keyword arguments

None
Source code in naff/client/client.py
def reload_extension(
    self, name, package=None, *, load_kwargs: Mapping[str, Any] = None, unload_kwargs: Mapping[str, Any] = None
) -> None:
    """
    Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.

    Args:
        name: The name of the extension.
        package: The package the extension is in
        load_kwargs: The manually-filled mapping of the load keyword arguments
        unload_kwargs: The manually-filled mapping of the unload keyword arguments

    """
    name = importlib.util.resolve_name(name, package)
    module = self.__modules.get(name)

    if module is None:
        logger.warning("Attempted to reload extension thats not loaded. Loading extension instead")
        return self.load_extension(name, package)

    if not load_kwargs:
        load_kwargs = {}
    if not unload_kwargs:
        unload_kwargs = {}

    self.unload_extension(name, package, **unload_kwargs)
    self.load_extension(name, package, **load_kwargs)

    # todo: maybe add an ability to revert to the previous version if unable to load the new one

async method fetch_guild(self, guild_id)

Fetch a guild.

Note

This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.

Parameters:

Name Type Description Default
guild_id Snowflake_Type

The ID of the guild to get

required

Returns:

Type Description
Optional[naff.models.discord.guild.Guild]

Guild Object if found, otherwise None

Source code in naff/client/client.py
async def fetch_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
    """
    Fetch a guild.

    Note:
        This method is an alias for the cache which will either return a cached object, or query discord for the object
        if its not already cached.

    Args:
        guild_id: The ID of the guild to get

    Returns:
        Guild Object if found, otherwise None

    """
    try:
        return await self.cache.fetch_guild(guild_id)
    except NotFound:
        return None

method get_guild(self, guild_id)

Get a guild.

Note

This method is an alias for the cache which will return a cached object.

Parameters:

Name Type Description Default
guild_id Snowflake_Type

The ID of the guild to get

required

Returns:

Type Description
Optional[naff.models.discord.guild.Guild]

Guild Object if found, otherwise None

Source code in naff/client/client.py
def get_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
    """
    Get a guild.

    Note:
        This method is an alias for the cache which will return a cached object.

    Args:
        guild_id: The ID of the guild to get

    Returns:
        Guild Object if found, otherwise None

    """
    return self.cache.get_guild(guild_id)

async method create_guild_from_template(self, template_code, name, icon)

Creates a new guild based on a template.

Note

This endpoint can only be used by bots in less than 10 guilds.

Parameters:

Name Type Description Default
template_code Union[GuildTemplate, str]

The code of the template to use.

required
name str

The name of the guild (2-100 characters)

required
icon Union[naff.models.discord.file.File, io.IOBase, BinaryIO, pathlib.Path, str, naff.client.const.Missing]

Location or File of icon to set

Missing

Returns:

Type Description
Optional[naff.models.discord.guild.Guild]

The newly created guild object

Source code in naff/client/client.py
async def create_guild_from_template(
    self,
    template_code: Union["GuildTemplate", str],
    name: str,
    icon: Absent[UPLOADABLE_TYPE] = MISSING,
) -> Optional[Guild]:
    """
    Creates a new guild based on a template.

    note:
        This endpoint can only be used by bots in less than 10 guilds.

    Args:
        template_code: The code of the template to use.
        name: The name of the guild (2-100 characters)
        icon: Location or File of icon to set

    Returns:
        The newly created guild object

    """
    if isinstance(template_code, GuildTemplate):
        template_code = template_code.code

    if icon:
        icon = to_image_data(icon)
    guild_data = await self.http.create_guild_from_guild_template(template_code, name, icon)
    return Guild.from_dict(guild_data, self)

async method fetch_channel(self, channel_id)

Fetch a channel.

Note

This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.

Parameters:

Name Type Description Default
channel_id Snowflake_Type

The ID of the channel to get

required

Returns:

Type Description
Optional[TYPE_ALL_CHANNEL]

Channel Object if found, otherwise None

Source code in naff/client/client.py
async def fetch_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
    """
    Fetch a channel.

    Note:
        This method is an alias for the cache which will either return a cached object, or query discord for the object
        if its not already cached.

    Args:
        channel_id: The ID of the channel to get

    Returns:
        Channel Object if found, otherwise None

    """
    try:
        return await self.cache.fetch_channel(channel_id)
    except NotFound:
        return None

method get_channel(self, channel_id)

Get a channel.

Note

This method is an alias for the cache which will return a cached object.

Parameters:

Name Type Description Default
channel_id Snowflake_Type

The ID of the channel to get

required

Returns:

Type Description
Optional[TYPE_ALL_CHANNEL]

Channel Object if found, otherwise None

Source code in naff/client/client.py
def get_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
    """
    Get a channel.

    Note:
        This method is an alias for the cache which will return a cached object.

    Args:
        channel_id: The ID of the channel to get

    Returns:
        Channel Object if found, otherwise None

    """
    return self.cache.get_channel(channel_id)

async method fetch_user(self, user_id)

Fetch a user.

Note

This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.

Parameters:

Name Type Description Default
user_id Snowflake_Type

The ID of the user to get

required

Returns:

Type Description
Optional[naff.models.discord.user.User]

User Object if found, otherwise None

Source code in naff/client/client.py
async def fetch_user(self, user_id: "Snowflake_Type") -> Optional[User]:
    """
    Fetch a user.

    Note:
        This method is an alias for the cache which will either return a cached object, or query discord for the object
        if its not already cached.

    Args:
        user_id: The ID of the user to get

    Returns:
        User Object if found, otherwise None

    """
    try:
        return await self.cache.fetch_user(user_id)
    except NotFound:
        return None

method get_user(self, user_id)

Get a user.

Note

This method is an alias for the cache which will return a cached object.

Parameters:

Name Type Description Default
user_id Snowflake_Type

The ID of the user to get

required

Returns:

Type Description
Optional[naff.models.discord.user.User]

User Object if found, otherwise None

Source code in naff/client/client.py
def get_user(self, user_id: "Snowflake_Type") -> Optional[User]:
    """
    Get a user.

    Note:
        This method is an alias for the cache which will return a cached object.

    Args:
        user_id: The ID of the user to get

    Returns:
        User Object if found, otherwise None

    """
    return self.cache.get_user(user_id)

async method fetch_member(self, user_id, guild_id)

Fetch a member from a guild.

Note

This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.

Parameters:

Name Type Description Default
user_id Snowflake_Type

The ID of the member

required
guild_id Snowflake_Type

The ID of the guild to get the member from

required

Returns:

Type Description
Optional[naff.models.discord.user.Member]

Member object if found, otherwise None

Source code in naff/client/client.py
async def fetch_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
    """
    Fetch a member from a guild.

    Note:
        This method is an alias for the cache which will either return a cached object, or query discord for the object
        if its not already cached.

    Args:
        user_id: The ID of the member
        guild_id: The ID of the guild to get the member from

    Returns:
        Member object if found, otherwise None

    """
    try:
        return await self.cache.fetch_member(guild_id, user_id)
    except NotFound:
        return None

method get_member(self, user_id, guild_id)

Get a member from a guild.

Note

This method is an alias for the cache which will return a cached object.

Parameters:

Name Type Description Default
user_id Snowflake_Type

The ID of the member

required
guild_id Snowflake_Type

The ID of the guild to get the member from

required

Returns:

Type Description
Optional[naff.models.discord.user.Member]

Member object if found, otherwise None

Source code in naff/client/client.py
def get_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
    """
    Get a member from a guild.

    Note:
        This method is an alias for the cache which will return a cached object.

    Args:
        user_id: The ID of the member
        guild_id: The ID of the guild to get the member from

    Returns:
        Member object if found, otherwise None

    """
    return self.cache.get_member(guild_id, user_id)

async method fetch_scheduled_event(self, guild_id, scheduled_event_id, with_user_count)

Fetch a scheduled event by id.

Parameters:

Name Type Description Default
event_id

The id of the scheduled event.

required

Returns:

Type Description
Optional[ScheduledEvent]

The scheduled event if found, otherwise None

Source code in naff/client/client.py
async def fetch_scheduled_event(
    self, guild_id: "Snowflake_Type", scheduled_event_id: "Snowflake_Type", with_user_count: bool = False
) -> Optional["ScheduledEvent"]:
    """
    Fetch a scheduled event by id.

    Args:
        event_id: The id of the scheduled event.

    Returns:
        The scheduled event if found, otherwise None

    """
    try:
        scheduled_event_data = await self.http.get_scheduled_event(guild_id, scheduled_event_id, with_user_count)
        return ScheduledEvent.from_dict(scheduled_event_data, self)
    except NotFound:
        return None

async method fetch_custom_emoji(self, emoji_id, guild_id)

Fetch a custom emoji by id.

Parameters:

Name Type Description Default
emoji_id Snowflake_Type

The id of the custom emoji.

required
guild_id Snowflake_Type

The id of the guild the emoji belongs to.

required

Returns:

Type Description
Optional[naff.models.discord.emoji.CustomEmoji]

The custom emoji if found, otherwise None.

Source code in naff/client/client.py
async def fetch_custom_emoji(self, emoji_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[CustomEmoji]:
    """
    Fetch a custom emoji by id.

    Args:
        emoji_id: The id of the custom emoji.
        guild_id: The id of the guild the emoji belongs to.

    Returns:
        The custom emoji if found, otherwise None.

    """
    try:
        return await self.cache.fetch_emoji(guild_id, emoji_id)
    except NotFound:
        return None

method get_custom_emoji(self, emoji_id, guild_id)

Get a custom emoji by id.

Parameters:

Name Type Description Default
emoji_id Snowflake_Type

The id of the custom emoji.

required
guild_id Optional[Snowflake_Type]

The id of the guild the emoji belongs to.

None

Returns:

Type Description
Optional[naff.models.discord.emoji.CustomEmoji]

The custom emoji if found, otherwise None.

Source code in naff/client/client.py
def get_custom_emoji(
    self, emoji_id: "Snowflake_Type", guild_id: Optional["Snowflake_Type"] = None
) -> Optional[CustomEmoji]:
    """
    Get a custom emoji by id.

    Args:
        emoji_id: The id of the custom emoji.
        guild_id: The id of the guild the emoji belongs to.

    Returns:
        The custom emoji if found, otherwise None.

    """
    emoji = self.cache.get_emoji(emoji_id)
    if emoji and (not guild_id or emoji._guild_id == to_snowflake(guild_id)):
        return emoji
    return None

async method fetch_sticker(self, sticker_id)

Fetch a sticker by ID.

Parameters:

Name Type Description Default
sticker_id Snowflake_Type

The ID of the sticker.

required

Returns:

Type Description
Optional[naff.models.discord.sticker.Sticker]

A sticker object if found, otherwise None

Source code in naff/client/client.py
async def fetch_sticker(self, sticker_id: "Snowflake_Type") -> Optional[Sticker]:
    """
    Fetch a sticker by ID.

    Args:
        sticker_id: The ID of the sticker.

    Returns:
        A sticker object if found, otherwise None

    """
    try:
        sticker_data = await self.http.get_sticker(sticker_id)
        return Sticker.from_dict(sticker_data, self)
    except NotFound:
        return None

async method fetch_nitro_packs(self)

List the sticker packs available to Nitro subscribers.

Returns:

Type Description
Optional[List[StickerPack]]

A list of StickerPack objects if found, otherwise returns None

Source code in naff/client/client.py
async def fetch_nitro_packs(self) -> Optional[List["StickerPack"]]:
    """
    List the sticker packs available to Nitro subscribers.

    Returns:
        A list of StickerPack objects if found, otherwise returns None

    """
    try:
        packs_data = await self.http.list_nitro_sticker_packs()
        return [StickerPack.from_dict(data, self) for data in packs_data]

    except NotFound:
        return None

async method fetch_voice_regions(self)

List the voice regions available on Discord.

Returns:

Type Description
List[VoiceRegion]

A list of voice regions.

Source code in naff/client/client.py
async def fetch_voice_regions(self) -> List["VoiceRegion"]:
    """
    List the voice regions available on Discord.

    Returns:
        A list of voice regions.

    """
    regions_data = await self.http.list_voice_regions()
    regions = VoiceRegion.from_list(regions_data)
    return regions

async method connect_to_vc(self, guild_id, channel_id, muted, deafened)

Connect the bot to a voice channel.

Parameters:

Name Type Description Default
guild_id Snowflake_Type

id of the guild the voice channel is in.

required
channel_id Snowflake_Type

id of the voice channel client wants to join.

required
muted bool

Whether the bot should be muted when connected.

False
deafened bool

Whether the bot should be deafened when connected.

False

Returns:

Type Description
ActiveVoiceState

The new active voice state on successfully connection.

Source code in naff/client/client.py
async def connect_to_vc(
    self, guild_id: "Snowflake_Type", channel_id: "Snowflake_Type", muted: bool = False, deafened: bool = False
) -> ActiveVoiceState:
    """
    Connect the bot to a voice channel.

    Args:
        guild_id: id of the guild the voice channel is in.
        channel_id: id of the voice channel client wants to join.
        muted: Whether the bot should be muted when connected.
        deafened: Whether the bot should be deafened when connected.

    Returns:
        The new active voice state on successfully connection.

    """
    return await self._connection_state.voice_connect(guild_id, channel_id, muted, deafened)

method get_bot_voice_state(self, guild_id)

Get the bot's voice state for a guild.

Parameters:

Name Type Description Default
guild_id Snowflake_Type

The target guild's id.

required

Returns:

Type Description
Optional[naff.models.naff.active_voice_state.ActiveVoiceState]

The bot's voice state for the guild if connected, otherwise None.

Source code in naff/client/client.py
def get_bot_voice_state(self, guild_id: "Snowflake_Type") -> Optional[ActiveVoiceState]:
    """
    Get the bot's voice state for a guild.

    Args:
        guild_id: The target guild's id.

    Returns:
        The bot's voice state for the guild if connected, otherwise None.

    """
    return self._connection_state.get_voice_state(guild_id)

async method change_presence(self, status, activity)

Change the bots presence.

Parameters:

Name Type Description Default
status Union[str, naff.models.discord.enums.Status]

The status for the bot to be. i.e. online, afk, etc.

<Status.ONLINE: 'online'>
activity Union[naff.models.discord.activity.Activity, str]

The activity for the bot to be displayed as doing.

None

Note:: Bots may only be playing streaming listening watching or competing, other activity types are likely to fail.

Source code in naff/client/client.py
async def change_presence(
    self, status: Optional[Union[str, Status]] = Status.ONLINE, activity: Optional[Union[Activity, str]] = None
) -> None:
    """
    Change the bots presence.

    Args:
        status: The status for the bot to be. i.e. online, afk, etc.
        activity: The activity for the bot to be displayed as doing.

    Note::
        Bots may only be `playing` `streaming` `listening` `watching` or `competing`, other activity types are likely to fail.

    """
    await self._connection_state.change_presence(status, activity)