view grok_interview/job_scheduler.py @ 64:a30944e5719e

Added a vibe-coded Markdown-to-HTML script since it is useful for me. Updated Dowa so that it can be compiled without dirent on Windows.
author June Park <parkjune1995@gmail.com>
date Tue, 23 Dec 2025 15:18:46 -0800
parents 68fa88ac73fe
children
line wrap: on
line source

# Job Scheduler
# 
# Implement a concurrent job scheduler that can schedule multiple workers to complete job tasks. The scheduler must handle asynchronous tasks and be able to dynamically increase or decrease the number of workers. It must handle task execution failures, for example, by retrying or logging. Provide examples with at least three different tasks and examples of scheduling strategies.

import asyncio
from asyncio import Task
import random

# Rename to match standard library function
async def job(name: str, delay: int = 1):
    """Simulate a flaky unit of work.

    Suspends for ``delay`` seconds via ``asyncio.sleep`` and then draws a
    random integer between 1 and 10 inclusive. A draw of 5 or less counts as
    success and prints ``name``; any higher draw raises.

    Args:
        name: Label printed when the job succeeds.
        delay: Seconds to sleep before the outcome is decided.

    Raises:
        Exception: When the random draw is greater than 5.
    """
    await asyncio.sleep(delay)
    roll = random.randint(1, 10)
    if roll <= 5:
        print(name)
        return
    raise Exception("Something went wrong")

class JobScheduler:
    """Collect asyncio tasks and wait for all of them together.

    The scheduler does not create tasks itself: callers hand in ``Task``
    objects (for example from ``asyncio.create_task``) and ``run`` waits for
    every registered task, reporting the ones that ended with an exception.
    """

    def __init__(self):
        # Registered tasks, kept in submission order.
        self._jobs: list[Task] = []

    def add_task(self, task: Task):
        """Register ``task`` so that ``run`` waits for it."""
        self._jobs.append(task)
        print(f"Added job: {task.get_name()}") # Added for debugging

    async def run(self):
        """Wait for every registered task and report the failures.

        Exceptions raised by tasks do not propagate out of this method;
        instead each failed task is reported by name on stdout so failures
        are not silently discarded.

        Returns:
            A list with one entry per registered task, in submission order:
            the task's return value, or the exception instance the task
            ended with (including ``CancelledError`` for cancelled tasks).
        """
        print("Starting concurrent jobs...")
        # Snapshot the list so outcomes stay aligned with their tasks even if
        # add_task is called while we are waiting.
        jobs = list(self._jobs)
        outcomes = await asyncio.gather(*jobs, return_exceptions=True)
        for task, outcome in zip(jobs, outcomes):
            # CancelledError derives from BaseException, not Exception.
            if isinstance(outcome, BaseException):
                print(f"Job {task.get_name()} failed: {outcome!r}")
        print("All jobs completed.")
        return outcomes

async def main():
    """Run the demo: schedule three named jobs and wait for all of them."""
    scheduler = JobScheduler()
    # (label printed by the job, delay in seconds, task name)
    demo_jobs = (
        ("JUNE", 3, "JUNE_JOB"),
        ("PARK", 2, "PARK_JOB"),
        ("HELLO", 1, "HELLO_JOB"),
    )
    for label, seconds, task_name in demo_jobs:
        # create_task starts the coroutine on the running loop right away;
        # the scheduler only keeps track of it.
        pending = asyncio.create_task(job(label, seconds), name=task_name)
        scheduler.add_task(pending)
    await scheduler.run()


if __name__ == "__main__":
    # Executed as a script: start an event loop and run main() to completion.
    asyncio.run(main())