Mercurial
view asyncio_threads/bucket_questions/main.py @ 205:e07b4b5a66bb
Bad named files.
| author | MrJuneJune <me@mrjunejune.com> |
|---|---|
| date | Sun, 15 Feb 2026 11:07:52 -0800 |
| parents | 46daba6e3cf4 |
| children |
line wrap: on
line source
"""Per-user token buckets exercised from several threads, each with its own event loop."""

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# Default capacity (and refill level) of a newly created bucket.
INITIAL_VALUES = 10


class TokenBucket:
    """Fixed-capacity token bucket that can be shared across threads and event loops.

    Args:
        initial_values: Number of tokens the bucket starts with; ``refill_tokens``
            restores the bucket to this same amount.
        consume_delay: Seconds of simulated latency awaited after a successful
            consume. Defaults to the 1 second the demo has always used; pass 0
            to skip the wait.
    """

    def __init__(self, initial_values=INITIAL_VALUES, *, consume_delay=1.0):
        self._tokens = initial_values
        self._refill_values = initial_values
        self._consume_delay = consume_delay
        # A threading.Lock, not an asyncio.Lock: run() drives one bucket from
        # several threads, each with its own event loop. asyncio primitives
        # are not thread-safe and must not be shared between loops (a contended
        # asyncio.Lock here hangs or raises RuntimeError). The lock is only
        # ever held for a few non-awaiting statements, so it never stalls a loop.
        self._lock = threading.Lock()

    def get_tokens(self):
        """Print and return the current token count (does not take the lock)."""
        print(self._tokens)
        return self._tokens

    async def consume_tokens(self, token: int):
        """Try to take ``token`` tokens from the bucket.

        The availability check and the decrement happen atomically under the
        lock, so two concurrent callers can never both spend the same tokens.

        Returns:
            True if the tokens were taken, False if the bucket held fewer than
            ``token`` tokens (in which case nothing is changed and no delay
            is awaited).
        """
        with self._lock:
            if self.get_tokens() < token:
                return False
            self._tokens -= token
        # Simulated latency happens outside the lock: holding a thread lock
        # across an await would block every other coroutine on this loop.
        await asyncio.sleep(self._consume_delay)
        return True

    async def refill_tokens(self):
        """Reset the bucket to the token count it was created with."""
        with self._lock:
            self._tokens = self._refill_values


class DistributedCache:
    """In-process mapping from user id to that user's ``TokenBucket``."""

    def __init__(self):
        self._user_bucket_map: Dict[str, "TokenBucket"] = {}

    def get_bucket(self, user_id: str):
        """Return the bucket registered for ``user_id``.

        Raises:
            KeyError: if ``set_bucket`` was never called for ``user_id``.
        """
        return self._user_bucket_map[user_id]

    def set_bucket(self, user_id: str):
        """Register a fresh default bucket for ``user_id``, replacing any existing one."""
        self._user_bucket_map[user_id] = TokenBucket()


# Module-level demo state: one shared cache with a bucket for each demo user.
cache = DistributedCache()
user_1 = "JUNE"
user_2 = "VICTOR"
cache.set_bucket(user_1)
cache.set_bucket(user_2)


async def refill_token_bucket(user_id: str):
    """Refill the bucket belonging to ``user_id`` in the shared cache."""
    await cache.get_bucket(user_id).refill_tokens()


async def use_tokens(user_id: str, tokens: int):
    """Consume ``tokens`` from ``user_id``'s bucket; return True on success."""
    return await cache.get_bucket(user_id).consume_tokens(tokens)


def _use_tokens_in_new_loop(user_id: str, tokens: int) -> bool:
    """Worker-thread entry point: run ``use_tokens`` on a private event loop."""
    # The coroutine is created inside the worker so it is never left
    # un-awaited if the task fails to start.
    return asyncio.run(use_tokens(user_id, tokens))


def run() -> List[bool]:
    """Issue three concurrent consume requests from separate threads.

    Two requests compete for ``user_1``'s bucket and one targets ``user_2``.

    Returns:
        The outcome of each request, in submission order. Which of the two
        ``user_1`` requests wins is decided by thread scheduling.

    Raises:
        Exception: any exception raised inside a worker is re-raised here
            instead of being silently dropped with the discarded future.
    """
    requests = [(user_1, 10), (user_1, 10), (user_2, 10)]
    with ThreadPoolExecutor(max_workers=3) as thread:
        pending = [
            thread.submit(_use_tokens_in_new_loop, user_id, tokens)
            for user_id, tokens in requests
        ]
        return [future.result() for future in pending]


if __name__ == "__main__":
    run()