Skip to content

Voice gateway

class OP (IntEnum)

Enumeration of the integer opcodes carried in the "op" field of voice gateway payloads.

Source code in naff/api/voice/voice_gateway.py
class OP(IntEnum):
    """Integer opcodes carried in the ``"op"`` field of voice gateway payloads."""

    # Sent by VoiceGateway._identify when opening a fresh session.
    IDENTIFY = 0
    # Sent by VoiceGateway._select_protocol after IP discovery.
    SELECT_PROTOCOL = 1
    # Handled in dispatch_opcode; its data carries "ip", "port", "ssrc" and "modes".
    READY = 2
    # Sent by VoiceGateway.send_heartbeat.
    HEARTBEAT = 3
    # Handled in dispatch_opcode; its data carries "mode" and "secret_key".
    SESSION_DESCRIPTION = 4
    # Sent by VoiceGateway.speaking.
    SPEAKING = 5
    # Handled in dispatch_opcode; used to record heartbeat latency.
    HEARTBEAT_ACK = 6
    # Sent by VoiceGateway._resume_connection.
    RESUME = 7
    # Not matched by name in dispatch_opcode; presumably the first payload read in
    # reconnect (the one carrying "heartbeat_interval") - TODO confirm.
    HELLO = 8
    # The two opcodes below are not handled explicitly; dispatch_opcode logs them as unhandled.
    RESUMED = 9
    CLIENT_DISCONNECT = 13

class VoiceGateway (WebsocketClient)

Source code in naff/api/voice/voice_gateway.py
class VoiceGateway(WebsocketClient):
    guild_id: str
    heartbeat_interval: int
    session_id: str
    token: str
    encryptor: Encryption

    ssrc: int
    me_ip: str
    me_port: int
    voice_ip: str
    voice_port: int
    voice_modes: list[str]
    selected_mode: str
    socket: socket.socket
    ready: Event

    def __init__(self, state, voice_state: dict, voice_server: dict) -> None:
        super().__init__(state)

        self._voice_server_update = asyncio.Event()
        self.ws_url = f"wss://{voice_server['endpoint']}?v=4"
        self.session_id = voice_state["session_id"]
        self.token = voice_server["token"]
        self.guild_id = voice_server["guild_id"]

        self.sock_sequence = 0
        self.timestamp = 0
        self.ready = Event()
        self.cond = None

    async def wait_until_ready(self) -> None:
        await asyncio.to_thread(self.ready.wait)

    async def run(self) -> None:
        """Start receiving events from the websocket."""
        while True:
            if self._stopping is None:
                self._stopping = asyncio.create_task(self._close_gateway.wait())
            receiving = asyncio.create_task(self.receive())
            done, _ = await asyncio.wait({self._stopping, receiving}, return_when=asyncio.FIRST_COMPLETED)

            if receiving in done:
                # Note that we check for a received message first, because if both completed at
                # the same time, we don't want to discard that message.
                msg = await receiving
            else:
                # This has to be the stopping task, which we join into the current task (even
                # though that doesn't give any meaningful value in the return).
                await self._stopping
                receiving.cancel()
                return

            op = msg.get("op")
            data = msg.get("d")
            seq = msg.get("s")

            if seq:
                self.sequence = seq

            # This may try to reconnect the connection so it is best to wait
            # for it to complete before receiving more - that way there's less
            # possible race conditions to consider.
            await self.dispatch_opcode(data, op)

    async def receive(self, force=False) -> str:
        buffer = bytearray()

        while True:
            if not force:
                await self._closed.wait()

            resp = await self.ws.receive()

            if resp.type == WSMsgType.CLOSE:
                logger.debug(f"Disconnecting from voice gateway! Reason: {resp.data}::{resp.extra}")
                if resp.data in (4006, 4009, 4014, 4015):
                    # these are all recoverable close codes, anything else means we're foobared
                    # codes: session expired, session timeout, disconnected, server crash
                    self.ready.clear()
                    # docs state only resume on 4015
                    await self.reconnect(resume=resp.data == 4015)
                    continue
                raise VoiceWebSocketClosed(resp.data)

            elif resp.type is WSMsgType.CLOSED:
                if force:
                    raise RuntimeError("Discord unexpectedly closed the underlying socket during force receive!")

                if not self._closed.is_set():
                    # Because we are waiting for the even before we receive, this shouldn't be
                    # possible - the CLOSING message should be returned instead. Either way, if this
                    # is possible after all we can just wait for the event to be set.
                    await self._closed.wait()
                else:
                    # This is an odd corner-case where the underlying socket connection was closed
                    # unexpectedly without communicating the WebSocket closing handshake. We'll have
                    # to reconnect ourselves.
                    await self.reconnect(resume=True)

            elif resp.type is WSMsgType.CLOSING:
                if force:
                    raise RuntimeError("WebSocket is unexpectedly closing during force receive!")

                # This happens when the keep-alive handler is reconnecting the connection even
                # though we waited for the event before hand, because it got to run while we waited
                # for data to come in. We can just wait for the event again.
                await self._closed.wait()
                continue

            if resp.data is None:
                continue

            if isinstance(resp.data, bytes):
                buffer.extend(resp.data)

                if len(resp.data) < 4 or resp.data[-4:] != b"\x00\x00\xff\xff":
                    # message isn't complete yet, wait
                    continue

                msg = self._zlib.decompress(buffer)
                msg = msg.decode("utf-8")
            else:
                msg = resp.data

            try:
                msg = OverriddenJson.loads(msg)
            except Exception as e:
                logger.error(e)

            return msg

    async def dispatch_opcode(self, data, op) -> None:
        match op:
            case OP.HEARTBEAT_ACK:
                self.latency.append(time.perf_counter() - self._last_heartbeat)

                if self._last_heartbeat != 0 and self.latency[-1] >= 15:
                    logger.warning(f"High Latency! Voice heartbeat took {self.latency[-1]:.1f}s to be acknowledged!")
                else:
                    logger.debug(f"❤ Heartbeat acknowledged after {self.latency[-1]:.5f} seconds")

                return self._acknowledged.set()

            case OP.READY:
                logger.debug("Discord send VC Ready! Establishing a socket connection...")
                self.voice_ip = data["ip"]
                self.voice_port = data["port"]
                self.ssrc = data["ssrc"]
                self.voice_modes = [mode for mode in data["modes"] if mode in Encryption.SUPPORTED]

                if len(self.voice_modes) == 0:
                    logger.critical("NO VOICE ENCRYPTION MODES SHARED WITH GATEWAY!")

                await self.establish_voice_socket()

            case OP.SESSION_DESCRIPTION:
                logger.info(f"Voice connection established; using {data['mode']}")
                self.encryptor = Encryption(data["secret_key"])
                self.ready.set()
                if self.cond:
                    with self.cond:
                        self.cond.notify()

            case _:
                return logger.debug(f"Unhandled OPCODE: {op} = {data = }")

    async def reconnect(self, *, resume: bool = False, code: int = 1012) -> None:
        async with self._race_lock:
            self._closed.clear()

            if self.ws is not None:
                await self.ws.close(code=code)

            self.ws = None

            if not resume:
                logger.debug("Waiting for updated server information...")
                try:
                    await asyncio.wait_for(self._voice_server_update.wait(), timeout=5)
                except asyncio.TimeoutError:
                    self._kill_bee_gees.set()
                    self.close()
                    logger.debug("Terminating VoiceGateway due to disconnection")
                    return

                self._voice_server_update.clear()

            self.ws = await self.state.client.http.websocket_connect(self.ws_url)

            try:
                hello = await self.receive(force=True)
                self.heartbeat_interval = hello["d"]["heartbeat_interval"] / 1000
            except RuntimeError:
                # sometimes the initial connection fails with voice gateways, handle that
                return await self.reconnect(resume=resume, code=code)

            if not resume:
                await self._identify()
            else:
                await self._resume_connection()

            self._closed.set()
            self._acknowledged.set()

    async def _resume_connection(self) -> None:
        if self.ws is None:
            raise RuntimeError

        payload = {
            "op": OP.RESUME,
            "d": {"server_id": self.guild_id, "session_id": self.session_id, "token": self.token},
        }
        await self.ws.send_json(payload)

    async def establish_voice_socket(self) -> None:
        """Establish the socket connection to discord"""
        logger.debug("IP Discovery in progress...")

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.setblocking(False)

        packet = bytearray(70)
        struct.pack_into(">H", packet, 0, 1)  # 1 = Send
        struct.pack_into(">H", packet, 2, 70)  # 70 = Length
        struct.pack_into(">I", packet, 4, self.ssrc)

        self.socket.sendto(packet, (self.voice_ip, self.voice_port))
        resp = await self.loop.sock_recv(self.socket, 70)
        logger.debug(f"Voice Initial Response Received: {resp}")

        ip_start = 4
        ip_end = resp.index(0, ip_start)
        self.me_ip = resp[ip_start:ip_end].decode("ascii")

        self.me_port = struct.unpack_from(">H", resp, len(resp) - 2)[0]
        logger.debug(f"IP Discovered: {self.me_ip} #{self.me_port}")

        await self._select_protocol()

    def generate_packet(self, data: bytes) -> bytes:
        """Generate a packet to be sent to the voice socket."""
        header = bytearray(12)
        header[0] = 0x80
        header[1] = 0x78

        struct.pack_into(">H", header, 2, self.sock_sequence)
        struct.pack_into(">I", header, 4, self.timestamp)
        struct.pack_into(">I", header, 8, self.ssrc)

        return self.encryptor.encrypt(self.voice_modes[0], header, data)

    def send_packet(self, data: bytes, encoder, needs_encode=True) -> None:
        """Send a packet to the voice socket"""
        self.sock_sequence += 1
        if self.sock_sequence > 0xFFFF:
            self.sock_sequence = 0

        if self.timestamp > 0xFFFFFFFF:
            self.timestamp = 0

        if needs_encode:
            data = encoder.encode(data)
        packet = self.generate_packet(data)

        self.socket.sendto(packet, (self.voice_ip, self.voice_port))
        self.timestamp += encoder.samples_per_frame

    async def send_heartbeat(self) -> None:
        await self.send_json({"op": OP.HEARTBEAT, "d": random.uniform(0.0, 1.0)})
        logger.debug("❤ Voice Connection is sending Heartbeat")

    async def _identify(self) -> None:
        """Send an identify payload to the voice gateway."""
        payload = {
            "op": OP.IDENTIFY,
            "d": {
                "server_id": self.guild_id,
                "user_id": self.state.client.user.id,
                "session_id": self.session_id,
                "token": self.token,
            },
        }
        serialized = OverriddenJson.dumps(payload)
        await self.ws.send_str(serialized)

        logger.debug("Voice Connection has identified itself to Voice Gateway")

    async def _select_protocol(self) -> None:
        """Inform Discord of our chosen protocol."""
        payload = {
            "op": OP.SELECT_PROTOCOL,
            "d": {
                "protocol": "udp",
                "data": {"address": self.me_ip, "port": self.me_port, "mode": self.voice_modes[0]},
            },
        }
        await self.send_json(payload)

    async def speaking(self, is_speaking: bool = True) -> None:
        """
        Tell the gateway if we're sending audio or not.

        Args:
            is_speaking: If we're sending audio or not

        """
        payload = {
            "op": OP.SPEAKING,
            "d": {
                "speaking": 1 << 0 if is_speaking else 0,
                "delay": 0,
                "ssrc": self.ssrc,
            },
        }
        await self.ws.send_json(payload)

    def set_new_voice_server(self, payload: dict) -> None:
        """
        Set a new voice server to connect to.

        Args:
            payload: New voice server connection data

        """
        self.ws_url = f"wss://{payload['endpoint']}?v=4"
        self.token = payload["token"]
        self.guild_id = payload["guild_id"]
        self._voice_server_update.set()

async method run(self)

Start receiving events from the websocket.

Source code in naff/api/voice/voice_gateway.py
async def run(self) -> None:
    """Receive payloads from the websocket and dispatch them until the gateway is told to stop."""
    while True:
        if self._stopping is None:
            self._stopping = asyncio.create_task(self._close_gateway.wait())
        receiving = asyncio.create_task(self.receive())
        finished, _pending = await asyncio.wait(
            {self._stopping, receiving},
            return_when=asyncio.FIRST_COMPLETED,
        )

        if receiving not in finished:
            # Only the stopping task completed: join it (its result is meaningless),
            # abandon the pending receive and leave the loop.
            await self._stopping
            receiving.cancel()
            return

        # A received message always wins, even if the stop request completed at the
        # same moment, so that the message is not thrown away.
        msg = await receiving

        sequence = msg.get("s")
        if sequence:
            self.sequence = sequence

        # Dispatching may reconnect, so finish it before starting the next receive;
        # that leaves fewer race conditions to reason about.
        await self.dispatch_opcode(msg.get("d"), msg.get("op"))

async method receive(self, force)

Receive a full event payload from the WebSocket.

Parameters:

Name Type Description Default
force

Whether to force the receiving, ignoring safety measures such as the read-lock. This option also means that exceptions are raised when a reconnection would normally be tried.

False
Source code in naff/api/voice/voice_gateway.py
async def receive(self, force=False) -> dict:
    """
    Receive a full event payload from the WebSocket.

    Args:
        force: Skip waiting on the ``_closed`` event and raise instead of reconnecting

    Returns:
        The decoded JSON payload.

    Raises:
        VoiceWebSocketClosed: The gateway closed with a non-recoverable close code.
        RuntimeError: The socket closed (or asked to close) during a forced receive.

    """
    buffer = bytearray()

    while True:
        if not force:
            await self._closed.wait()

        resp = await self.ws.receive()

        if resp.type == WSMsgType.CLOSE:
            logger.debug(f"Disconnecting from voice gateway! Reason: {resp.data}::{resp.extra}")
            if resp.data in (4006, 4009, 4014, 4015):
                # these are all recoverable close codes, anything else means we're foobared
                # codes: session expired, session timeout, disconnected, server crash
                self.ready.clear()
                if force:
                    # A forced receive must raise where a reconnection would normally be
                    # tried, so the caller decides how to recover.
                    raise RuntimeError("Discord unexpectedly wants to close the WebSocket during force receive!")
                # docs state only resume on 4015
                await self.reconnect(resume=resp.data == 4015)
                continue
            raise VoiceWebSocketClosed(resp.data)

        elif resp.type is WSMsgType.CLOSED:
            if force:
                raise RuntimeError("Discord unexpectedly closed the underlying socket during force receive!")

            if not self._closed.is_set():
                # Because we are waiting for the event before we receive, this shouldn't be
                # possible - the CLOSING message should be returned instead. Either way, if this
                # is possible after all we can just wait for the event to be set.
                await self._closed.wait()
            else:
                # This is an odd corner-case where the underlying socket connection was closed
                # unexpectedly without communicating the WebSocket closing handshake. We'll have
                # to reconnect ourselves.
                await self.reconnect(resume=True)
            # A CLOSED marker carries no payload to decode; read from the new connection.
            continue

        elif resp.type is WSMsgType.CLOSING:
            if force:
                raise RuntimeError("WebSocket is unexpectedly closing during force receive!")

            # This happens when the keep-alive handler is reconnecting the connection even
            # though we waited for the event before hand, because it got to run while we waited
            # for data to come in. We can just wait for the event again.
            await self._closed.wait()
            continue

        if resp.data is None:
            continue

        if isinstance(resp.data, bytes):
            buffer.extend(resp.data)

            if len(resp.data) < 4 or resp.data[-4:] != b"\x00\x00\xff\xff":
                # message isn't complete yet, wait
                continue

            msg = self._zlib.decompress(buffer)
            msg = msg.decode("utf-8")
        else:
            msg = resp.data

        try:
            return OverriddenJson.loads(msg)
        except Exception as e:
            # Callers index into the payload, so handing back the undecoded frame would
            # only crash them later. Drop the frame and wait for the next one.
            logger.error(e)
            buffer = bytearray()

inherited property readonly average_latency: float

Get the average latency of the connection.

async inherited method send(self, data, bypass)

Send data to the websocket.

Parameters:

Name Type Description Default
data str

The data to send

required
bypass

Should the rate limit be ignored for this send (used for heartbeats)

False
Source code in naff/api/voice/voice_gateway.py
async def send(self, data: str, bypass=False) -> None:
    """
    Write a string to the websocket while holding the race lock.

    Nothing is sent (a warning is logged instead) when no websocket is connected.

    Args:
        data: The data to send
        bypass: Should the rate limit be ignored for this send (used for heartbeats)

    """
    logger.debug(f"Sending data to websocket: {data}")

    async with self._race_lock:
        if self.ws is None:
            logger.warning("Attempted to send data while websocket is not connected!")
            return None

        if not bypass:
            # Wait for the rate limiter before touching the socket.
            await self.rl_manager.rate_limit()

        await self.ws.send_str(data)

async inherited method send_json(self, data, bypass)

Send JSON data to the websocket.

Parameters:

Name Type Description Default
data dict

The data to send

required
bypass

Should the rate limit be ignored for this send (used for heartbeats)

False
Source code in naff/api/voice/voice_gateway.py
async def send_json(self, data: dict, bypass=False) -> None:
    """
    Serialize a payload to JSON and pass it on to ``send``.

    Args:
        data: The data to send
        bypass: Should the rate limit be ignored for this send (used for heartbeats)

    """
    await self.send(OverriddenJson.dumps(data), bypass)

async method establish_voice_socket(self)

Establish the UDP socket connection to Discord.

Source code in naff/api/voice/voice_gateway.py
async def establish_voice_socket(self) -> None:
    """Open the UDP socket, discover our own address through the voice server and select the protocol."""
    logger.debug("IP Discovery in progress...")

    self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.socket.setblocking(False)

    # 70-byte discovery request: type (1 = Send), length (70) and our ssrc, all big-endian.
    discovery = bytearray(70)
    struct.pack_into(">HHI", discovery, 0, 1, 70, self.ssrc)

    server_address = (self.voice_ip, self.voice_port)
    self.socket.sendto(discovery, server_address)
    resp = await self.loop.sock_recv(self.socket, 70)
    logger.debug(f"Voice Initial Response Received: {resp}")

    # Our address is read as a NUL-terminated ASCII string starting at byte 4.
    address_end = resp.index(0, 4)
    self.me_ip = resp[4:address_end].decode("ascii")

    # Our port is the big-endian unsigned short in the last two bytes.
    (self.me_port,) = struct.unpack_from(">H", resp, len(resp) - 2)
    logger.debug(f"IP Discovered: {self.me_ip} #{self.me_port}")

    await self._select_protocol()

method generate_packet(self, data)

Generate a packet to be sent to the voice socket.

Source code in naff/api/voice/voice_gateway.py
def generate_packet(self, data: bytes) -> bytes:
    """
    Build the 12-byte packet header and hand header plus payload to the encryptor.

    Header layout (big-endian): 0x80, 0x78, 16-bit sequence, 32-bit timestamp, 32-bit ssrc.
    """
    header = bytearray(12)
    struct.pack_into(
        ">BBHII",
        header,
        0,
        0x80,
        0x78,
        self.sock_sequence,
        self.timestamp,
        self.ssrc,
    )

    # The first shared mode is the one used for encryption.
    mode = self.voice_modes[0]
    return self.encryptor.encrypt(mode, header, data)

method send_packet(self, data, encoder, needs_encode)

Send a packet to the voice socket

Source code in naff/api/voice/voice_gateway.py
def send_packet(self, data: bytes, encoder, needs_encode=True) -> None:
    """
    Advance the packet counters, build one voice packet and send it over the UDP socket.

    ``encoder`` must provide ``samples_per_frame`` and, when ``needs_encode`` is true, ``encode``.
    """
    # The sequence number restarts at 0 once it no longer fits in 16 bits.
    next_sequence = self.sock_sequence + 1
    self.sock_sequence = 0 if next_sequence > 0xFFFF else next_sequence

    # The timestamp restarts at 0 once it no longer fits in 32 bits.
    if self.timestamp > 0xFFFFFFFF:
        self.timestamp = 0

    payload = encoder.encode(data) if needs_encode else data
    packet = self.generate_packet(payload)

    destination = (self.voice_ip, self.voice_port)
    self.socket.sendto(packet, destination)
    self.timestamp += encoder.samples_per_frame

async method send_heartbeat(self)

Send a heartbeat to the gateway.

Source code in naff/api/voice/voice_gateway.py
async def send_heartbeat(self) -> None:
    """Send a heartbeat payload carrying a random float in [0.0, 1.0] to the gateway."""
    heartbeat = {"op": OP.HEARTBEAT, "d": random.uniform(0.0, 1.0)}
    await self.send_json(heartbeat)
    logger.debug("❤ Voice Connection is sending Heartbeat")

async method speaking(self, is_speaking)

Tell the gateway if we're sending audio or not.

Parameters:

Name Type Description Default
is_speaking bool

If we're sending audio or not

True
Source code in naff/api/voice/voice_gateway.py
async def speaking(self, is_speaking: bool = True) -> None:
    """
    Announce to the gateway whether audio is being sent.

    Args:
        is_speaking: If we're sending audio or not

    """
    # 1 marks us as speaking, 0 as silent.
    speaking_flag = 1 if is_speaking else 0
    details = {"speaking": speaking_flag, "delay": 0, "ssrc": self.ssrc}
    await self.ws.send_json({"op": OP.SPEAKING, "d": details})

method set_new_voice_server(self, payload)

Set a new voice server to connect to.

Parameters:

Name Type Description Default
payload dict

New voice server connection data

required
Source code in naff/api/voice/voice_gateway.py
def set_new_voice_server(self, payload: dict) -> None:
    """
    Store the connection details of a new voice server and signal that they arrived.

    Args:
        payload: New voice server connection data; must provide "endpoint", "token" and "guild_id"

    """
    endpoint = payload["endpoint"]
    self.ws_url = f"wss://{endpoint}?v=4"
    self.token = payload["token"]
    self.guild_id = payload["guild_id"]

    # Flag the update so anything waiting on this event can proceed.
    self._voice_server_update.set()