1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
| import time import datetime import threading from typing import Dict, Optional, Callable, Tuple
class SimpleCronJob:
    """Polling scheduler that runs registered callables on a one-second tick.

    Every value in ``tasks`` is a 6-tuple
    ``(last_time, delay, first_executed, interval, once, func)``:

    * ``last_time``      -- when the loop last observed or ran the task
      (``None`` until the loop has seen it once);
    * ``delay``          -- wait before the first run, measured from the tick
      on which the loop first sees the task;
    * ``first_executed`` -- whether the task has run at least once;
    * ``interval``       -- minimum gap between runs (also the wait before the
      first run when no positive delay is configured);
    * ``once``           -- remove the task after its first run;
    * ``func``           -- zero-argument callable to invoke.

    Callbacks are invoked *without* holding ``lock``, so a callback may call
    ``add_task`` or ``stop`` on the same scheduler without deadlocking.
    """

    def __init__(self):
        self.tasks: Dict[
            str,
            Tuple[
                Optional[datetime.datetime],
                datetime.timedelta,
                bool,
                datetime.timedelta,
                bool,
                Callable,
            ],
        ] = {}
        self.lock = threading.Lock()
        self.stop_flag = False
        # Lets stop() wake the polling loop instead of waiting out a sleep.
        self._stop_event = threading.Event()

    def add_task(self, task_id: str, func: Callable, delay_seconds: int = 0,
                 interval_seconds: int = 0, once: bool = False):
        """Register (or replace) a scheduled task.

        :param task_id: unique identifier of the task
        :param func: zero-argument callable to execute
        :param delay_seconds: delay before the first execution, in seconds
        :param interval_seconds: interval between periodic executions, in seconds
        :param once: whether the task is removed after its first execution
        """
        entry = (
            None,
            datetime.timedelta(seconds=delay_seconds),
            False,
            datetime.timedelta(seconds=interval_seconds),
            once,
            func,
        )
        with self.lock:
            self.tasks[task_id] = entry
            print(f"任务 {task_id} 已注册")

    def start(self):
        """Run the polling loop in the calling thread until ``stop_flag`` is set.

        Each tick claims the due tasks under the lock, runs them with the lock
        released, then waits up to one second (returning early on ``stop``).
        """
        print("开始执行定时任务服务...")
        # Clear a stale wake-up from an earlier stop(); stop_flag alone decides
        # whether the loop runs at all.
        self._stop_event.clear()
        while not self.stop_flag:
            now = datetime.datetime.now()
            with self.lock:
                due = self._claim_due_tasks(now)
            for task_id, entry in due:
                self._run_task(task_id, entry)
            self._stop_event.wait(1)

    def _claim_due_tasks(self, now):
        """Return ``[(task_id, entry)]`` for tasks due at ``now``.

        The caller must hold ``self.lock``. Due tasks have their stored entry
        replaced by the returned ``entry`` (last run = ``now``, marked as
        executed) so the same object can later be recognised by identity.
        """
        due = []
        # Snapshot the items: entries are reassigned while we walk them.
        for task_id, entry in list(self.tasks.items()):
            last_time, delay, first_executed, interval, once, func = entry
            if last_time is None:
                # First sighting only starts the clock; nothing runs yet.
                self.tasks[task_id] = (now, delay, first_executed, interval, once, func)
                continue
            awaiting_first_run = delay.total_seconds() > 0 and not first_executed
            required_gap = delay if awaiting_first_run else interval
            if now - last_time < required_gap:
                continue
            claimed = (now, delay, True, interval, once, func)
            self.tasks[task_id] = claimed
            due.append((task_id, claimed))
        return due

    def _run_task(self, task_id, entry):
        """Invoke one claimed task and drop it afterwards if it is one-shot."""
        _, _, _, _, once, func = entry
        try:
            print(f"执行任务 {task_id}")
            func()
        except Exception as e:
            # Scheduler boundary: one failing task must not kill the loop.
            print(f"任务 {task_id} 执行出错: {e}")
        if not once:
            return
        with self.lock:
            # Skip removal if the callback (or another thread) re-registered
            # or already removed this task id while it was running.
            if self.tasks.get(task_id) is not entry:
                return
            del self.tasks[task_id]
        print(f"任务 {task_id} 已完成并移除")

    def stop(self):
        """Ask the polling loop to exit and wake it if it is waiting."""
        # Flag first, event second: start() clears the event before it checks
        # the flag, so a concurrent stop() is never lost.
        self.stop_flag = True
        self._stop_event.set()
        print("定时任务服务已停止")
def example_task():
    """Print a fixed line announcing that the sample task is running."""
    message = "✅ 示例任务正在执行..."
    print(message)
if __name__ == "__main__":
    # Demo: register four tasks covering each scheduling mode, then poll
    # until the user interrupts with Ctrl+C.
    cron = SimpleCronJob()

    # (task id, callable, keyword options for add_task), in registration order.
    demo_tasks = (
        # Plain periodic task.
        ("periodic_task",
         lambda: print("🔁 周期性任务执行"),
         {"interval_seconds": 5}),
        # Periodic task with an initial delay.
        ("delayed_periodic_task",
         lambda: print("🕒 延迟+周期任务执行"),
         {"delay_seconds": 3, "interval_seconds": 6}),
        # One-shot task without a delay.
        ("one_time_task",
         lambda: print("📎 一次性任务执行"),
         {"once": True}),
        # One-shot task with an initial delay.
        ("one_time_delayed_task",
         lambda: print("⏳ 一次性延迟任务执行"),
         {"delay_seconds": 5, "once": True}),
    )
    for demo_id, demo_func, demo_options in demo_tasks:
        cron.add_task(demo_id, demo_func, **demo_options)

    try:
        cron.start()
    except KeyboardInterrupt:
        cron.stop()
|