Mercurial
view asyncio_threads/bucket_questions/single_thread_async.py @ 212:84826b3c655b
[MrJuneJune] Forgot to add assets.
| author | MrJuneJune <me@mrjunejune.com> |
|---|---|
| date | Sun, 15 Feb 2026 21:38:36 -0800 |
| parents | 46daba6e3cf4 |
| children |
line wrap: on
line source
"""Token-bucket demo driven by a single-threaded asyncio event loop.

Each user owns one ``TokenBucket``.  Consuming and refilling a bucket are
serialized by a per-bucket ``asyncio.Lock`` so that the check-then-decrement
in ``consume_tokens`` cannot interleave with another coroutine touching the
same bucket.
"""

import asyncio
from typing import Dict

# Number of tokens a bucket starts with and is reset to on refill.
INITIAL_VALUES = 10


class TokenBucket:
    """A fixed-capacity bucket of tokens guarded by an ``asyncio.Lock``."""

    def __init__(self, initial_values=INITIAL_VALUES):
        """Create a bucket holding ``initial_values`` tokens.

        Args:
            initial_values: Starting balance; also the value the bucket is
                reset to by ``refill_tokens``.
        """
        self._tokens = initial_values
        self._refill_values = initial_values
        self._lock = asyncio.Lock()

    def get_tokens(self):
        """Print the current balance and return it.

        The ``print`` is a trace of every balance check; it also fires when
        ``consume_tokens`` inspects the balance.
        """
        print(self._tokens)
        return self._tokens

    async def consume_tokens(self, token: int) -> bool:
        """Try to take ``token`` tokens out of the bucket.

        Args:
            token: Number of tokens to consume; must not be negative.

        Returns:
            ``True`` if the balance covered the request and was decremented,
            ``False`` if the balance was too low (the balance is untouched).

        Raises:
            ValueError: If ``token`` is negative.  Without this check a
                negative amount would pass the balance test and *increase*
                the balance.
        """
        if token < 0:
            raise ValueError(f"token must be non-negative, got {token}")

        async with self._lock:
            if self.get_tokens() < token:
                return False
            # The pause happens while the lock is held, so any other
            # consume/refill call on this bucket waits until the decrement
            # below has been applied.
            await asyncio.sleep(1)
            self._tokens -= token
            return True

    async def refill_tokens(self) -> None:
        """Reset the balance to the bucket's initial value, under the lock."""
        async with self._lock:
            self._tokens = self._refill_values


class DistributedCache:
    """Maps a user id to that user's ``TokenBucket``.

    Despite the name, storage is a plain in-process dictionary.
    """

    def __init__(self):
        self._user_bucket_map: Dict[str, TokenBucket] = {}

    def get_bucket(self, user_id: str) -> TokenBucket:
        """Return the bucket registered for ``user_id``.

        Raises:
            KeyError: If no bucket was registered for ``user_id``.
        """
        return self._user_bucket_map[user_id]

    def set_bucket(self, user_id: str) -> None:
        """Register a fresh default ``TokenBucket`` for ``user_id``.

        An existing bucket for the same user is replaced.
        """
        self._user_bucket_map[user_id] = TokenBucket()


# Demo fixtures: one shared cache with a bucket for each of two users.
cache = DistributedCache()
user_1 = "JUNE"
user_2 = "VICTOR"
cache.set_bucket(user_1)
cache.set_bucket(user_2)


async def refill_token_bucket(user_id: str) -> None:
    """Refill the bucket that ``cache`` holds for ``user_id``."""
    await cache.get_bucket(user_id).refill_tokens()


async def use_tokens(user_id: str, tokens: int) -> bool:
    """Consume ``tokens`` from the bucket of ``user_id``; see ``consume_tokens``."""
    return await cache.get_bucket(user_id).consume_tokens(tokens)


async def run():
    """Run consume/refill calls for both users concurrently and print the results.

    ``asyncio.gather`` returns the results in the order the awaitables were
    passed, regardless of completion order.
    """
    return_value = await asyncio.gather(
        use_tokens(user_1, 10),
        refill_token_bucket(user_1),
        use_tokens(user_1, 10),
        use_tokens(user_2, 10),
    )
    print(return_value)


if __name__ == "__main__":
    asyncio.run(run())