Skip to content
This repository has been archived by the owner on Mar 13, 2023. It is now read-only.

Commit

Permalink
feat: use async iterator to speedup http chunking (#620)
Browse files Browse the repository at this point in the history
* feat: get total retrieved from iterator

* feat: use async iterator to speedup http chunking
  • Loading branch information
LordOfPolls authored Sep 4, 2022
1 parent 27b657f commit 5bcc33b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 23 deletions.
51 changes: 28 additions & 23 deletions naff/models/discord/guild.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import time
from asyncio import QueueEmpty
from collections import namedtuple
from functools import cmp_to_key
from typing import List, Optional, Union, Set, Dict, Any, TYPE_CHECKING
Expand Down Expand Up @@ -119,6 +120,26 @@ def _process_dict(cls, data: Dict[str, Any], client: "Client") -> Dict[str, Any]
return super()._process_dict(data, client)


class MemberIterator(AsyncIterator):
def __init__(self, guild: "Guild", limit: int = 0) -> None:
super().__init__(limit)
self.guild = guild
self._more = True

async def fetch(self) -> list:
if self._more:
expected = self.get_limit

rcv = await self.guild._client.http.list_members(
self.guild.id, limit=expected, after=self.last["id"] if self.last else MISSING
)
if not rcv:
raise QueueEmpty
self._more = len(rcv) == expected
return rcv
raise QueueEmpty


@define()
class Guild(BaseGuild):
"""Guilds in Discord represent an isolated collection of users and channels, and are often referred to as "servers" in the UI."""
Expand Down Expand Up @@ -501,31 +522,15 @@ async def edit_nickname(self, new_nickname: Absent[str] = MISSING, reason: Absen
async def http_chunk(self) -> None:
"""Populates all members of this guild using the REST API."""
start_time = time.perf_counter()
members = []

# request all guild members
after = MISSING
while True:
if members:
after = members[-1]["user"]["id"]
rcv: list = await self._client.http.list_members(self.id, limit=1000, after=after)
members.extend(rcv)
if len(rcv) < 1000:
# we're done
break

# process all members
s = time.monotonic()
for member in members:

iterator = MemberIterator(self)
async for member in iterator:
self._client.cache.place_member_data(self.id, member)
if (time.monotonic() - s) > 0.05:
# look, i get this *could* be a thread, but because it needs to modify data in the main thread,
# it is still blocking. So by periodically yielding to the event loop, we can avoid blocking, and still
# process this data properly
await asyncio.sleep(0)
s = time.monotonic()

self.chunked.set()
logger.info(f"Cached {len(members)} members for {self.id} in {time.perf_counter() - start_time:.2f} seconds")
logger.info(
f"Cached {iterator.total_retrieved} members for {self.id} in {time.perf_counter() - start_time:.2f} seconds"
)

async def gateway_chunk(self, wait=True, presences=True) -> None:
"""
Expand Down
5 changes: 5 additions & 0 deletions naff/models/misc/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ def get_limit(self) -> int:
"""Get how the maximum number of items that should be retrieved."""
return min(self._limit - len(self._retrieved_objects), 100) if self._limit else 100

@property
def total_retrieved(self) -> int:
"""Get the total number of objects this iterator has retrieved."""
return len(self._retrieved_objects)

async def add_object(self, obj) -> None:
"""Add an object to iterator's queue."""
return await self._queue.put(obj)
Expand Down

0 comments on commit 5bcc33b

Please sign in to comment.