Mercurial
diff asyncio_threads/stop_token/main.py @ 48:46daba6e3cf4
Few python scrtips to show how to use asychio.
| author | MrJuneJune <me@mrjunejune.com> |
|---|---|
| date | Sat, 13 Dec 2025 14:23:02 -0800 |
| parents | |
| children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/asyncio_threads/stop_token/main.py Sat Dec 13 14:23:02 2025 -0800 @@ -0,0 +1,42 @@ +from typing import Generator +from typing import List + +# [Hello There, "I am <EN", "D> Something"] +# | + +def truncate_stream(text_stream: Generator[str, None, None], stop_token: str) -> Generator[str, None, None]: + buffer = "" + for chunk in text_stream: + can_be_prefix = False + buffer += chunk + if stop_token not in buffer: + for end in range(1, len(stop_token)): + if stop_token[:end] in buffer[-1 * len(stop_token):]: + can_be_prefix = True + if not can_be_prefix: + yield chunk + buffer = "" + else: + pos = buffer.find(stop_token) + yield buffer[:pos] + return + +def stream_to_list(stream: Generator[str, None, None]) -> List[str]: + return [chunk for chunk in stream] + +def list_to_stream(chunks: List[str]) -> Generator[str, None, None]: + for chunk in chunks: + yield chunk + + +print( + stream_to_list( + truncate_stream(list_to_stream(["Hello there ", "I'm doing great today.<END>", "Thanks for asking.", "How are you"]), "<END>") + ) +) + +print( + stream_to_list( + truncate_stream(list_to_stream(["Hello there ", "I'm doing great today.<E", "N", "D>", "Thanks for asking.", "How are you"]), "<END>") + ) +)