Mercurial
view asyncio_threads/stop_token/main.py @ 71:75de5903355c
Gigantic changes that update the Dowa library to be more aligned with stb-style arrays and hashmaps. Updated Seobeo to cache on the server side instead of using file-level caching. Deleted a bunch of things I don't really use.
| author | June Park <parkjune1995@gmail.com> |
|---|---|
| date | Sun, 28 Dec 2025 20:34:22 -0800 |
| parents | 46daba6e3cf4 |
| children |
line wrap: on
line source
from typing import Generator, Iterable, List


def _partial_match_len(text: str, token: str) -> int:
    """Return the length of the longest proper prefix of *token* that ends *text*.

    A non-zero result means the tail of *text* could be the start of *token*
    once more input arrives, so that many trailing characters must be held back.
    """
    # Longest candidate first; a proper prefix is at most len(token) - 1 long.
    for size in range(min(len(text), len(token) - 1), 0, -1):
        if text.endswith(token[:size]):
            return size
    return 0


def truncate_stream(
    text_stream: Iterable[str], stop_token: str
) -> Generator[str, None, None]:
    """Yield text from *text_stream* up to, but not including, *stop_token*.

    The stop token may be split across several chunks, e.g.
    ``["Hello there", "I am <EN", "D> something"]`` with ``"<END>"`` stops
    after ``"I am "``.  Only the trailing characters that could still turn
    into the stop token are held back; everything before them is emitted
    immediately.  Held text that turns out not to be part of a stop token is
    emitted later (or flushed when the stream ends), so no input is lost.

    Args:
        text_stream: Iterable of text chunks, consumed lazily.
        stop_token: Marker at which output ends.  An empty token matches
            immediately, so nothing is emitted.

    Yields:
        Non-empty pieces of text preceding the first occurrence of
        *stop_token*.  Chunk boundaries may differ from the input's.
    """
    # Text received but not yet emitted; after each iteration it is only the
    # (possibly empty) suffix that could be the beginning of the stop token.
    pending = ""
    for chunk in text_stream:
        pending += chunk

        stop_at = pending.find(stop_token)
        if stop_at != -1:
            if stop_at:
                yield pending[:stop_at]
            return

        keep = _partial_match_len(pending, stop_token)
        safe_end = len(pending) - keep
        if safe_end:
            yield pending[:safe_end]
        pending = pending[safe_end:]

    # The stream ended without a stop token: the held tail is ordinary text.
    if pending:
        yield pending


def stream_to_list(stream: Iterable[str]) -> List[str]:
    """Collect every chunk of *stream* into a list."""
    return list(stream)


def list_to_stream(chunks: List[str]) -> Generator[str, None, None]:
    """Yield the items of *chunks* one at a time, in order."""
    yield from chunks


# Demo: the stop token arrives whole inside a single chunk.
print(
    stream_to_list(
        truncate_stream(
            list_to_stream(
                ["Hello there ", "I'm doing great today.<END>", "Thanks for asking.", "How are you"]
            ),
            "<END>",
        )
    )
)
# Demo: the stop token is split across three consecutive chunks.
print(
    stream_to_list(
        truncate_stream(
            list_to_stream(
                [
                    "Hello there ",
                    "I'm doing great today.<E",
                    "N",
                    "D>",
                    "Thanks for asking.",
                    "How are you",
                ]
            ),
            "<END>",
        )
    )
)