view asyncio_threads/bucket_questions/main.py @ 105:4de2fb74ce82

Adding a few comments for future reference.
author June Park <parkjune1995@gmail.com>
date Sat, 03 Jan 2026 10:28:54 -0800
parents 46daba6e3cf4
children
line wrap: on
line source

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict

class DistributedCache:
    """Registry that maps a user id to that user's TokenBucket.

    The storage is a plain in-process dict; nothing here talks to a
    remote store.
    """

    def __init__(self):
        # user id -> bucket; empty until set_bucket() is called.
        self._user_bucket_map: Dict[str, TokenBucket] = dict()

    def get_bucket(self, user_id: str):
        """Return the bucket registered for ``user_id``.

        Raises:
            KeyError: if no bucket has been stored for ``user_id``.
        """
        bucket = self._user_bucket_map[user_id]
        return bucket

    def set_bucket(self, user_id: str):
        """Store a newly created TokenBucket under ``user_id``.

        Any bucket previously stored for the same id is replaced.
        """
        fresh_bucket = TokenBucket()
        self._user_bucket_map[user_id] = fresh_bucket


INITIAL_VALUES = 10  # default starting balance (and refill level) of a bucket


class TokenBucket:
    """Token counter that can be shared between threads and event loops.

    The balance is guarded by a ``threading.Lock`` instead of an
    ``asyncio.Lock``: asyncio primitives are not thread-safe and cannot be
    shared between event loops, so a bucket used from several
    ``asyncio.run()`` calls in different threads could leave a waiter
    blocked forever. The critical sections below contain no ``await``, so
    the lock is only ever held for a few bytecodes and never stalls an
    event loop for long.

    Args:
        initial_values: starting balance; also the level restored by
            ``refill_tokens()``.
        consume_delay: seconds a successful ``consume_tokens()`` call
            sleeps after deducting the tokens. Defaults to the 1 second
            the method has always waited (presumably a stand-in for real
            work -- confirm with the author); pass 0 to skip the wait.
    """

    def __init__(self, initial_values=INITIAL_VALUES, consume_delay=1):
        self._tokens = initial_values
        self._refill_values = initial_values
        self._consume_delay = consume_delay
        self._lock = threading.Lock()

    def get_tokens(self):
        """Print the current balance to stdout and return it.

        Deliberately lock-free: it is called from inside the critical
        section of ``consume_tokens()`` and the lock is not re-entrant.
        """
        print(self._tokens)
        return self._tokens

    async def consume_tokens(self, token: int):
        """Try to deduct ``token`` tokens from the bucket.

        The check and the deduction happen atomically under the lock; the
        delay runs afterwards so the lock is never held across an await.

        Returns:
            bool: ``True`` if the tokens were deducted, ``False`` (with no
            delay) if the bucket held fewer than ``token``.

        Raises:
            ValueError: if ``token`` is negative, which would otherwise
                add tokens to the bucket.
        """
        if token < 0:
            raise ValueError(f"token must be non-negative, got {token}")

        with self._lock:
            if self.get_tokens() < token:
                return False
            self._tokens -= token

        await asyncio.sleep(self._consume_delay)
        return True

    async def refill_tokens(self):
        """Reset the balance to the level the bucket was created with."""
        with self._lock:
            self._tokens = self._refill_values


# Demo user ids.
user_1 = "JUNE"
user_2 = "VICTOR"

# Module-level cache shared by the helper coroutines below; each demo user
# gets a bucket with the default token count.
cache = DistributedCache()
cache.set_bucket(user_1)
cache.set_bucket(user_2)

async def refill_token_bucket(user_id: str):
    """Refill the bucket stored in the module-level cache for ``user_id``.

    Raises:
        KeyError: if the cache holds no bucket for ``user_id``.
    """
    bucket = cache.get_bucket(user_id)
    await bucket.refill_tokens()

async def use_tokens(user_id: str, tokens: int):
    """Try to consume ``tokens`` from the cached bucket of ``user_id``.

    Returns:
        The result of the bucket's ``consume_tokens()``: ``True`` when the
        tokens were deducted, ``False`` when the bucket held too few.

    Raises:
        KeyError: if the cache holds no bucket for ``user_id``.
    """
    bucket = cache.get_bucket(user_id)
    granted = await bucket.consume_tokens(tokens)
    return granted

def run():
    """Run the three demo consumptions concurrently and return the outcomes.

    All coroutines are scheduled on a single event loop. asyncio primitives
    are not thread-safe and must not be shared between event loops, so
    driving the same bucket from a separate ``asyncio.run()`` per worker
    thread can leave a waiter blocked forever. Gathering on one loop also
    surfaces results and exceptions instead of dropping them with
    never-inspected futures.

    Returns:
        list: one entry per ``use_tokens()`` call, in the order
        (user_1, user_1, user_2).
    """
    async def _consume_all():
        # gather() keeps argument order in its result list and propagates
        # the first exception raised by any of the coroutines.
        return await asyncio.gather(
            use_tokens(user_1, 10),
            use_tokens(user_1, 10),
            use_tokens(user_2, 10),
        )

    return asyncio.run(_consume_all())

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run()