comparison asyncio_threads/bucket_questions/main.py @ 48:46daba6e3cf4

Few python scrtips to show how to use asychio.
author MrJuneJune <me@mrjunejune.com>
date Sat, 13 Dec 2025 14:23:02 -0800
parents
children
comparison
equal deleted inserted replaced
47:829623189a57 48:46daba6e3cf4
1 import asyncio
2 from concurrent.futures import ThreadPoolExecutor
3 from typing import Dict
4
5 class DistributedCache:
6
7 def __init__(self):
8 self._user_bucket_map: Dict[str, TokenBucket] = {}
9
10
11 def get_bucket(self, user_id: str):
12 return self._user_bucket_map[user_id]
13
14 def set_bucket(self, user_id: str):
15 self._user_bucket_map[user_id] = TokenBucket()
16
17
18 INITIAL_VALUES = 10
19
20 class TokenBucket:
21
22 def __init__(self, initial_values = INITIAL_VALUES):
23 self._tokens = initial_values
24 self._refill_values = initial_values
25 self._lock = asyncio.Lock()
26
27 def get_tokens(self):
28 print(self._tokens)
29 return self._tokens
30
31 async def consume_tokens(self, token: int):
32 async with self._lock:
33 if self.get_tokens() < token:
34 return False
35
36 await asyncio.sleep(1)
37 self._tokens -= token
38 return True
39
40 async def refill_tokens(self):
41 async with self._lock:
42 self._tokens = self._refill_values
43
44
45 cache = DistributedCache()
46
47 user_1 = "JUNE"
48 user_2 = "VICTOR"
49 cache.set_bucket(user_1)
50 cache.set_bucket(user_2)
51
52 async def refill_token_bucket(user_id: str):
53 await cache.get_bucket(user_id).refill_tokens()
54
55 async def use_tokens(user_id: str, tokens: int):
56 return await cache.get_bucket(user_id).consume_tokens(tokens)
57
58 def run():
59 with ThreadPoolExecutor(max_workers=3) as thread:
60 thread.submit(asyncio.run, use_tokens(user_1, 10))
61 thread.submit(asyncio.run, use_tokens(user_1, 10))
62 thread.submit(asyncio.run, use_tokens(user_2, 10))
63
64 if __name__ == "__main__":
65 run()