comparison asyncio_threads/stop_token/main.py @ 48:46daba6e3cf4

A few Python scripts to show how to use asyncio.
author MrJuneJune <me@mrjunejune.com>
date Sat, 13 Dec 2025 14:23:02 -0800
parents
children
comparison
equal deleted inserted replaced
47:829623189a57 48:46daba6e3cf4
1 from typing import Generator
2 from typing import List
3
4 # Example stream: ["Hello there", "I am <EN", "D> something"]
5 # The stop token "<END>" can be split across consecutive chunks.
6
def truncate_stream(
    text_stream: Generator[str, None, None], stop_token: str
) -> Generator[str, None, None]:
    """Yield text from *text_stream* up to, but not including, *stop_token*.

    The stop token may arrive split across several consecutive chunks. To
    handle that, the trailing part of the pending text that could still be
    the beginning of the stop token is held back until later chunks either
    complete the token or rule it out; everything before it is emitted
    right away.

    Args:
        text_stream: Iterable of text chunks, consumed lazily.
        stop_token: Marker at which the output ends. An empty string
            matches immediately, so nothing is emitted.

    Yields:
        Non-empty pieces of text whose concatenation equals the input text
        preceding the first occurrence of *stop_token*, or the whole input
        if the token never occurs. Piece boundaries need not match the
        input chunk boundaries.
    """
    buffer = ""
    for chunk in text_stream:
        buffer += chunk

        pos = buffer.find(stop_token)
        if pos != -1:
            # Stop token found: emit what precedes it and end the stream.
            if pos:
                yield buffer[:pos]
            return

        # Find the longest suffix of the buffer that is a proper prefix of
        # the stop token; only that part may still turn into a match.
        held = 0
        for size in range(min(len(stop_token) - 1, len(buffer)), 0, -1):
            if buffer.endswith(stop_token[:size]):
                held = size
                break

        safe_len = len(buffer) - held
        if safe_len:
            yield buffer[:safe_len]
        buffer = buffer[safe_len:]

    # The stream ended without completing the stop token, so whatever was
    # held back is ordinary text and must not be dropped.
    if buffer:
        yield buffer
23
def stream_to_list(stream: Generator[str, None, None]) -> List[str]:
    """Consume *stream* and return all of its chunks as a list, in order."""
    return list(stream)
26
def list_to_stream(chunks: List[str]) -> Generator[str, None, None]:
    """Yield each element of *chunks* in order, one at a time."""
    yield from chunks
30
31
# Demo: first the stop token arrives whole inside one chunk, then the same
# text with the token split across three consecutive chunks.
_DEMO_STREAMS = (
    ["Hello there ", "I'm doing great today.<END>", "Thanks for asking.", "How are you"],
    ["Hello there ", "I'm doing great today.<E", "N", "D>", "Thanks for asking.", "How are you"],
)

for _demo_chunks in _DEMO_STREAMS:
    _truncated = truncate_stream(list_to_stream(_demo_chunks), "<END>")
    print(stream_to_list(_truncated))