Mercurial
view asyncio_threads/bucket_questions/single_thread_async.py @ 109:1c446ab6f945
[MrJuneJune] New Blog about writing Seobeo library.
| author | June Park <parkjune1995@gmail.com> |
|---|---|
| date | Sat, 03 Jan 2026 19:37:51 -0800 |
| parents | 46daba6e3cf4 |
| children |
line wrap: on
line source
"""Single-threaded asyncio demo of a per-user token bucket.

Each user id maps to a ``TokenBucket`` whose balance is guarded by an
``asyncio.Lock``.  Running the module as a script schedules several
consume/refill coroutines concurrently and prints their results.
"""
import asyncio
from typing import Dict


class DistributedCache:
    """In-memory registry mapping a user id to that user's ``TokenBucket``."""

    def __init__(self):
        self._user_bucket_map: Dict[str, "TokenBucket"] = {}

    def get_bucket(self, user_id: str) -> "TokenBucket":
        """Return the bucket registered for ``user_id``.

        Raises:
            KeyError: if ``set_bucket`` was never called for ``user_id``.
        """
        return self._user_bucket_map[user_id]

    def set_bucket(self, user_id: str) -> None:
        """Store a new default ``TokenBucket`` for ``user_id``.

        Any bucket previously stored under the same id is replaced.
        """
        self._user_bucket_map[user_id] = TokenBucket()


# Default starting balance, which is also the level a refill restores.
INITIAL_VALUES = 10

# Default number of seconds ``consume_tokens`` sleeps while holding the lock.
DEFAULT_CONSUME_DELAY = 1


class TokenBucket:
    """Token counter whose mutations are serialized by an ``asyncio.Lock``."""

    def __init__(
        self,
        initial_values: int = INITIAL_VALUES,
        consume_delay: float = DEFAULT_CONSUME_DELAY,
    ):
        """Create a bucket holding ``initial_values`` tokens.

        Args:
            initial_values: starting balance; ``refill_tokens`` resets the
                balance to this same value.
            consume_delay: seconds a successful ``consume_tokens`` call
                sleeps, with the lock held, before deducting the tokens.
        """
        self._tokens = initial_values
        self._refill_values = initial_values
        self._consume_delay = consume_delay
        self._lock = asyncio.Lock()

    def get_tokens(self) -> int:
        """Print the current balance to stdout and return it."""
        # The print is kept so the script's output is unchanged; note that
        # every balance check inside consume_tokens() also goes through here.
        print(self._tokens)
        return self._tokens

    async def consume_tokens(self, token: int) -> bool:
        """Try to deduct ``token`` tokens from the bucket.

        Args:
            token: number of tokens to deduct; must not be negative.

        Returns:
            ``True`` if the balance covered the request and was reduced,
            ``False`` if the balance was too low (balance left untouched).

        Raises:
            ValueError: if ``token`` is negative.
        """
        # A negative amount would pass the balance check below and then
        # *increase* the balance via ``-=``, so reject it up front.
        if token < 0:
            raise ValueError(f"token must be non-negative, got {token}")

        async with self._lock:
            if self.get_tokens() < token:
                return False
            # The lock stays held across this await, so other consume and
            # refill calls on this bucket wait until the deduction is done.
            await asyncio.sleep(self._consume_delay)
            self._tokens -= token
            return True

    async def refill_tokens(self) -> None:
        """Reset the balance to the bucket's initial value."""
        async with self._lock:
            self._tokens = self._refill_values


cache = DistributedCache()
user_1 = "JUNE"
user_2 = "VICTOR"
cache.set_bucket(user_1)
cache.set_bucket(user_2)


async def refill_token_bucket(user_id: str) -> None:
    """Refill the bucket that the module-level ``cache`` holds for ``user_id``."""
    await cache.get_bucket(user_id).refill_tokens()


async def use_tokens(user_id: str, tokens: int) -> bool:
    """Consume ``tokens`` from ``user_id``'s bucket; return whether it succeeded."""
    return await cache.get_bucket(user_id).consume_tokens(tokens)


async def run() -> None:
    """Run the demo: concurrent consumes and a refill across two users."""
    # gather() returns results in argument order; the refill entry is None
    # because refill_token_bucket() has no return value.
    return_value = await asyncio.gather(
        use_tokens(user_1, 10),
        refill_token_bucket(user_1),
        use_tokens(user_1, 10),
        use_tokens(user_2, 10),
    )
    print(return_value)


if __name__ == "__main__":
    asyncio.run(run())