Mercurial
comparison asyncio_threads/stop_token/main.py @ 48:46daba6e3cf4
A few Python scripts to show how to use asyncio.
| author | MrJuneJune <me@mrjunejune.com> |
|---|---|
| date | Sat, 13 Dec 2025 14:23:02 -0800 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 47:829623189a57 | 48:46daba6e3cf4 |
|---|---|
| 1 from typing import Generator | |
| 2 from typing import List | |
| 3 | |
| 4 # [Hello There, "I am <EN", "D> Something"] | |
| 5 # | | |
| 6 | |
def _held_back_length(text: str, stop_token: str) -> int:
    """Return the length of the longest proper prefix of stop_token ending text."""
    # Try the longest candidate first so the maximum amount is held back.
    for size in range(min(len(text), len(stop_token) - 1), 0, -1):
        if text.endswith(stop_token[:size]):
            return size
    return 0


def truncate_stream(text_stream: Generator[str, None, None], stop_token: str) -> Generator[str, None, None]:
    """Yield text from ``text_stream`` up to, but excluding, ``stop_token``.

    The stop token may be split across several chunks.  Text is emitted as
    soon as it is known not to belong to the stop token; only a trailing
    fragment that could still grow into the stop token is held back until
    the following chunks settle the question.

    Args:
        text_stream: Iterable of text chunks, consumed lazily.
        stop_token: Marker that ends the output.  An empty token matches
            immediately, so nothing is yielded.

    Yields:
        Non-empty pieces of text whose concatenation equals the input text
        up to the first occurrence of ``stop_token`` (or the whole input if
        the token never appears).
    """
    pending = ""
    for chunk in text_stream:
        pending += chunk

        cut = pending.find(stop_token)
        if cut != -1:
            # Stop token found: emit whatever precedes it and finish.
            if cut:
                yield pending[:cut]
            return

        # Keep only the suffix that might be the start of the stop token;
        # everything before it can never be part of a match.
        safe_len = len(pending) - _held_back_length(pending, stop_token)
        if safe_len:
            yield pending[:safe_len]
            pending = pending[safe_len:]

    # The stream ended without completing the stop token, so the held-back
    # fragment is ordinary text and must not be lost.
    if pending:
        yield pending
| 23 | |
def stream_to_list(stream: Generator[str, None, None]) -> List[str]:
    """Consume ``stream`` completely and return its chunks, in order, as a list."""
    return list(stream)
| 26 | |
def list_to_stream(chunks: List[str]) -> Generator[str, None, None]:
    """Return a generator that yields each item of ``chunks`` in order."""
    yield from chunks
| 30 | |
| 31 | |
# Demo: the first input carries the stop token inside one chunk, the second
# splits it across three chunks.  Both runs print
# ['Hello there ', "I'm doing great today."].
for _demo_chunks in (
    ["Hello there ", "I'm doing great today.<END>", "Thanks for asking.", "How are you"],
    ["Hello there ", "I'm doing great today.<E", "N", "D>", "Thanks for asking.", "How are you"],
):
    print(
        stream_to_list(
            truncate_stream(list_to_stream(_demo_chunks), "<END>")
        )
    )