victory的博客

长安一片月,万户捣衣声

0%

python | 定时任务

Python定时任务调度

定时任务调度(Scheduled Task Execution)是一种在预定时间或周期性地自动执行特定操作的机制。它广泛应用于自动化运维、数据处理、日志清理、监控告警、定时推送等场景。


🕰️ 定时任务调度的核心概念

概念 说明
任务(Task) 要执行的具体操作,通常是一个函数或命令。
调度器(Scheduler) 管理和触发任务执行的组件。
触发条件(Trigger) 决定任务何时执行的规则,如:固定间隔、延迟执行、cron 表达式等。
执行方式 可同步或异步执行任务。

📌 定时任务的常见类型

  1. 一次性任务(One-time Task)
    • 执行一次后自动结束。
    • 示例:5 秒后发送一条通知。
  2. 周期性任务(Recurring Task)
    • 按照固定间隔重复执行。
    • 示例:每 5 分钟检查服务器状态。
  3. 延迟首次执行任务(Delayed First Execution)
    • 首次执行有延迟,之后按周期执行。
    • 示例:启动后等待 10 秒再开始心跳检测。
  4. 基于时间表达式的任务(Cron-like Task)
    • 使用类似 cron 的语法定义执行时间点。
    • 示例:每天凌晨 2:00 执行数据库备份。

✅ 定时任务调度的应用场景

场景 描述
数据同步 每小时从远程服务器拉取最新数据
日志清理 每天凌晨删除过期日志文件
报表生成 每月第一天自动生成统计报表
健康检查 每隔一段时间检测服务可用性
自动提醒 每天上午9点推送待办事项

🧩 定时任务调度的实现方式(Python 中)

1. 简单轮询 + sleep

  • 如你提供的 SimpleCronJob 类所示。
  • 利用 while 循环 + time.sleep() 实现秒级轮询。
  • 适用于轻量级任务。

2. 使用第三方库

  • APScheduler:功能强大的任务调度器,支持 cron、日期、间隔三种触发器。
  • Celery Beat:分布式任务调度系统,适合大型项目。
  • schedule:简洁易用的库,适合脚本化任务。

3. 操作系统级定时任务

  • Linux 下使用 crontab
  • Windows 下使用“任务计划程序”。

🔁 示例对比:不同方式实现每 5 秒打印一次

方式一:原生 Python(当前类)

1
cron.add_task("periodic_task", lambda: print("🔁 周期性任务执行"), interval_seconds=5)

方式二:使用 schedule

1
2
3
4
5
6
7
8
9
10
11
import schedule
import time

def job():
print("Scheduled task executed")

schedule.every(5).seconds.do(job)

while True:
schedule.run_pending()
time.sleep(1)

方式三:使用 APScheduler

1
2
3
4
5
6
7
8
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
print("APScheduled task executed")

sched = BlockingScheduler()
sched.add_job(job, 'interval', seconds=5)
sched.start()

🧠 小结

特性 当前类 SimpleCronJob APScheduler Celery Beat crontab
易用性 ✅ 简单直接 ✅ 支持多种触发器 ⚠️ 分布式复杂 ✅ 系统级配置
功能丰富度 ❌ 较基础 ✅ 强大 ✅ 极其强大 ⚠️ 仅基本调度
多线程支持 ✅ 已封装 ✅ 支持 ✅ 支持 ❌ 单进程
持久化 ❌ 不支持 ✅ 可持久化 ✅ 支持 ❌ 不支持
适用场景 本地测试、小型脚本 中型应用 大型分布式系统 系统维护任务

📎 总结一句话:

定时任务调度就是让程序在指定时间或条件下自动执行某些操作,从而实现自动化流程管理。

python定时任务调度实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import time
import datetime
import threading
from typing import Dict, Optional, Callable, Tuple

class SimpleCronJob:
def __init__(self):
self.tasks: Dict[str, Tuple[datetime.datetime, datetime.timedelta, bool, bool, Callable]] = {}
self.lock = threading.Lock()
self.stop_flag = False

def add_task(self, task_id: str, func: Callable, delay_seconds: int = 0, interval_seconds: int = 0, once: bool = False):
"""
添加一个定时任务。

:param task_id: 任务唯一标识符
:param func: 要执行的函数
:param delay_seconds: 首次执行延迟时间(秒)
:param interval_seconds: 周期执行间隔时间(秒)
:param once: 是否为一次性任务
"""
with self.lock:
self.tasks[task_id] = (
None, # last_time
datetime.timedelta(seconds=delay_seconds),
False, # first_executed
datetime.timedelta(seconds=interval_seconds),
once,
func
)
print(f"任务 {task_id} 已注册")

def start(self):
print("开始执行定时任务服务...")
while not self.stop_flag:
now = datetime.datetime.now()

with self.lock:
for task_id, (last_time, delay_time, first_executed, interval_time, once, func) in list(self.tasks.items()):
if delay_time.total_seconds() > 0 and not first_executed:
if last_time is None:
self.tasks[task_id] = (now, delay_time, first_executed, interval_time, once, func)
continue
elif (now - last_time) < delay_time:
continue
else:
self.tasks[task_id] = (now, delay_time, True, interval_time, once, func)

elif last_time is None:
self.tasks[task_id] = (now, delay_time, first_executed, interval_time, once, func)
continue
elif (now - last_time) < interval_time:
continue

# 执行任务
try:
print(f"执行任务 {task_id}")
func()
except Exception as e:
print(f"任务 {task_id} 执行出错: {e}")

# 更新最后执行时间
self.tasks[task_id] = (now, delay_time, True, interval_time, once, func)

# 如果是一次性任务,执行后移除
if once:
del self.tasks[task_id]
print(f"任务 {task_id} 已完成并移除")

time.sleep(1) # 模拟 CRON_TIME_INTERVAL

def stop(self):
self.stop_flag = True
print("定时任务服务已停止")


# 示例任务函数
def example_task():
print("✅ 示例任务正在执行...")

if __name__ == "__main__":
cron = SimpleCronJob()

# 1. 周期性任务:每5秒执行一次
cron.add_task("periodic_task", lambda: print("🔁 周期性任务执行"), interval_seconds=5)

# 2. 延迟性任务:首次延迟3秒后开始,之后每6秒执行一次
cron.add_task("delayed_periodic_task",
lambda: print("🕒 延迟+周期任务执行"),
delay_seconds=3,
interval_seconds=6)

# 3. 一次性任务:立即执行一次(无延迟)
cron.add_task("one_time_task",
lambda: print("📎 一次性任务执行"),
once=True)

# 4. 一次性延迟任务:延迟5秒后执行一次
cron.add_task("one_time_delayed_task",
lambda: print("⏳ 一次性延迟任务执行"),
delay_seconds=5,
once=True)

try:
cron.start()
except KeyboardInterrupt:
cron.stop()