view grok_interview/thread.py @ 58:ccb42d5bf8fd

[PostDog] Somewhat working copy, to be used for testing.
author June Park <parkjune1995@gmail.com>
date Sat, 20 Dec 2025 09:33:15 -0800
parents 68fa88ac73fe
children
line wrap: on
line source

# 1. data structure {}
#    - multiple threads can access it
#    - multiple threads can edit it if it is editable. (mutex)


from hashlib import sha256
from threading import Lock
from uuid import uuid4

class Subscribers:
    """Base class for objects that receive events through ``on_event``.

    Each instance gets its own ``id`` at construction time, intended for
    use as a lookup key by whatever registry holds the subscriber.
    """

    def __init__(self):
        # Store a digest *value* rather than a live hashlib object: a
        # 64-character hex string is immutable, compares by value, and can
        # be logged or serialized. Random UUID bytes keep the ids distinct
        # across instances.
        self.id: str = sha256(uuid4().bytes).hexdigest()

    def on_event(self, event) -> None:
        """Handle *event*; the base implementation deliberately does nothing.

        Subclasses override this to react to published events.

        Args:
            event: The published event object; its type is not constrained
                here.
        """
        pass

class Publisher:
    """Registry that fans events out to registered subscribers.

    Subscribers are kept in a dict keyed by ``subscriber.id``. Every read
    or write of that dict happens while holding ``self.lock``.
    """

    def __init__(self):
        # Maps subscriber.id -> subscriber. The attribute keeps its original
        # spelling so any existing reference to it continues to work.
        self._subscritions = {}
        self.lock = Lock()

    def publish_event(self, event) -> None:
        """Deliver *event* to every currently registered subscriber.

        Args:
            event: Object passed unchanged to each subscriber's
                ``on_event`` method.
        """
        # Take a snapshot under the lock so a concurrent add/remove cannot
        # resize the dict while it is being iterated.
        with self.lock:
            recipients = list(self._subscritions.values())

        # Dispatch after releasing the lock: Lock is not reentrant, so a
        # subscriber that registers or unregisters from inside on_event
        # would otherwise block forever.
        for subscriber in recipients:
            subscriber.on_event(event)

    def add_subscribers(self, subscriber: "Subscribers") -> None:
        """Register *subscriber* under its ``id``.

        A subscriber already stored under the same ``id`` is replaced.

        Args:
            subscriber: Object exposing ``id`` and ``on_event(event)``.
        """
        with self.lock:
            self._subscritions[subscriber.id] = subscriber

    def remove_subscribers(self, subscriber: "Subscribers") -> None:
        """Unregister *subscriber*.

        Args:
            subscriber: A previously registered subscriber.

        Raises:
            KeyError: If no subscriber is registered under
                ``subscriber.id``.
        """
        with self.lock:
            del self._subscritions[subscriber.id]