Mercurial
annotate asyncio_threads/stop_token/main.py @ 134:75e7aac67fa2 websocket-blog
Closing blog branch since it is written
| author | June Park <parkjune1995@gmail.com> |
|---|---|
| date | Fri, 09 Jan 2026 09:30:30 -0800 |
| parents | 46daba6e3cf4 |
| children |
| rev | line source |
|---|---|
|
48
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
1 from typing import Generator |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
2 from typing import List |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
3 |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
4 # [Hello There, "I am <EN", "D> Something"] |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
5 # | |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
6 |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
7 def truncate_stream(text_stream: Generator[str, None, None], stop_token: str) -> Generator[str, None, None]: |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
8 buffer = "" |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
9 for chunk in text_stream: |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
10 can_be_prefix = False |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
11 buffer += chunk |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
12 if stop_token not in buffer: |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
13 for end in range(1, len(stop_token)): |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
14 if stop_token[:end] in buffer[-1 * len(stop_token):]: |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
15 can_be_prefix = True |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
16 if not can_be_prefix: |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
17 yield chunk |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
18 buffer = "" |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
19 else: |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
20 pos = buffer.find(stop_token) |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
21 yield buffer[:pos] |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
22 return |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
23 |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
24 def stream_to_list(stream: Generator[str, None, None]) -> List[str]: |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
25 return [chunk for chunk in stream] |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
26 |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
27 def list_to_stream(chunks: List[str]) -> Generator[str, None, None]: |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
28 for chunk in chunks: |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
29 yield chunk |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
30 |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
31 |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
32 print( |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
33 stream_to_list( |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
34 truncate_stream(list_to_stream(["Hello there ", "I'm doing great today.<END>", "Thanks for asking.", "How are you"]), "<END>") |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
35 ) |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
36 ) |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
37 |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
38 print( |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
39 stream_to_list( |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
40 truncate_stream(list_to_stream(["Hello there ", "I'm doing great today.<E", "N", "D>", "Thanks for asking.", "How are you"]), "<END>") |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
41 ) |
|
46daba6e3cf4
Few python scrtips to show how to use asychio.
MrJuneJune <me@mrjunejune.com>
parents:
diff
changeset
|
42 ) |