view grok_interview/bucket_try_2.py @ 52:636eab07809d

Fixed dowa memory problems. Add few more utility functions.
author June Park <parkjune1995@gmail.com>
date Fri, 19 Dec 2025 13:30:30 -0800
parents 68fa88ac73fe
children
line wrap: on
line source

# Context
# 
# Please implement two functions: refillTokenBucket and useTokens. Two classes are already defined: DistributedCache and TokenBucket.
# 
# refillTokenBucket(user_id): Refill tokens for the specified user's bucket.
# 
# useTokens(user_id, tokens): Check if there are enough tokens in the specified user's token bucket. If so, update the remaining token count and return true; otherwise, return false. You are required to use instances of the existing classes to implement these functions.

import asyncio
from typing import Dict


class DistributedCache:
    """In-memory registry mapping a user id to that user's ``TokenBucket``."""

    def __init__(self) -> None:
        # Starts empty; entries are only created through add_user().
        self._hashmap: Dict[str, "TokenBucket"] = dict()

    def add_user(self, user_id: str) -> None:
        """Store a freshly constructed, default-sized bucket under ``user_id``.

        Any bucket already stored under the same id is replaced.
        """
        fresh_bucket = TokenBucket()
        self._hashmap[user_id] = fresh_bucket

    def get_token_bucket(self, user_id: str) -> "TokenBucket":
        """Return the bucket stored under ``user_id``.

        Raises:
            KeyError: if no bucket has been stored for ``user_id``.
        """
        bucket = self._hashmap[user_id]
        return bucket

class TokenBucket:

    def __init__(self, initial_token_value: int = 100):
        self._tokens = initial_token_value
        self.initial_token_value = initial_token_value
        self._lock = asyncio.Lock()

    async def consume(self, token: int):
        async with self._lock:
            tokens = self.get_token()
            await asyncio.sleep(0.1)
            if tokens < token:
                return False
            tokens -= token
            self.set_token(tokens)
            return True

    async def refill(self):
        async with self._lock:
            self._tokens = self.initial_token_value

    def get_token(self):
        return self._tokens

    def set_token(self, token: int):
        self._tokens = token

# Module-level cache instance used by refillTokenBucket, useTokens and main.
distCache = DistributedCache()
# Pre-register the single user id that main() exercises.
distCache.add_user("june")

async def refillTokenBucket(user_id: str) -> None:
    """Refill the token bucket belonging to ``user_id``.

    The bucket is looked up in the module-level ``distCache`` and its
    ``refill`` coroutine is awaited. The lookup raises ``KeyError`` when
    ``user_id`` has no bucket in the cache.
    """
    bucket = distCache.get_token_bucket(user_id)
    await bucket.refill()

async def useTokens(user_id: str, token: int) -> bool:
    """Attempt to spend ``token`` tokens from the bucket of ``user_id``.

    The bucket is looked up in the module-level ``distCache`` and the result
    of its ``consume`` coroutine is returned unchanged: True when the tokens
    were deducted, False when the bucket did not hold enough. The lookup
    raises ``KeyError`` when ``user_id`` has no bucket in the cache.
    """
    bucket = distCache.get_token_bucket(user_id)
    granted = await bucket.consume(token)
    return granted

async def main():
    """Demo: issue three concurrent 10-token requests for "june", then print the balance."""
    requests = [useTokens("june", 10) for _ in range(3)]
    # gather schedules all three requests concurrently; their results are not used.
    await asyncio.gather(*requests)
    remaining = distCache.get_token_bucket("june").get_token()
    print(remaining)

# Run the demo only when this file is executed as a script, so importing the
# module (e.g. to reuse useTokens) does not start an event loop as a side effect.
if __name__ == "__main__":
    asyncio.run(main())