schedule 库是一个轻量级的定时任务调度工具,它允许用户以分钟、小时、日、星期几或特定日期为单位来设置任务的执行时间。
安装
要安装 schedule 库,只需在命令行中使用 pip 命令即可:
pip install schedule
使用方法
schedule 的基本使用方法如下:
# 定义一个任务函数
def my_task():
print("任务执行中")
# 设置定时任务,每10分钟执行一次
schedule.every(10).minutes.do(my_task)
# 启动任务调度循环
while True:
schedule.run_pending()
time.sleep(1)
定时格式
schedule 支持多种定时格式,例如:
# 每10分钟执行一次
schedule.every(10).minutes.do(my_task)
# 每小时执行一次
schedule.every().hour.do(my_task)
# 每天10:30执行一次
schedule.every().day.at("10:30").do(my_task)
# 每周一执行一次
schedule.every().monday.do(my_task)
# 每周三13:15执行一次
schedule.every().wednesday.at("13:15").do(my_task)
# 每小时的第42分钟执行一次
schedule.every().hour.at(":42").do(my_task)
取消定时任务
在某些情况下,我们可能需要取消一个已经设置的定时任务:
import schedule
def my_task():
print("Hello world")
# 设置一个每日22:30执行的任务
job = schedule.every().day.at("22:30").do(my_task)
# 取消任务
schedule.cancel_job(job)
任务只执行一次
有时我们希望任务只执行一次,然后自动取消:
import schedule
import time
def job_that_executes_once():
print("只执行一次的任务")
return schedule.CancelJob
# 设置任务
schedule.every().day.at("22:30").do(job_that_executes_once)
# 启动循环
while True:
schedule.run_pending()
time.sleep(1)
获取所有任务和清除所有任务
获取所有任务
import schedule
def my_task():
print("Hello world")
# 设置任务
schedule.every().second.do(my_task)
# 获取所有任务
all_jobs = schedule.get_jobs()
清除所有任务
import schedule
def greet(name):
print(f"Hello {name}")
# 设置任务
schedule.every().second.do(greet, "World")
# 清除所有任务
schedule.clear()
任务过滤
通过给任务打标签,我们可以对任务进行分类,并根据标签获取或清除任务:
import schedule
def greet(name):
print(f"Hello {name}")
# 设置任务并打上标签
schedule.every().day.do(greet, "Andrea").tag("daily", "friend")
schedule.every().hour.do(greet, "John").tag("hourly", "friend")
schedule.every().hour.do(greet, "Monica").tag("hourly", "customer")
schedule.every().day.do(greet, "Derek").tag("daily", "guest")
# 根据标签获取任务
friends_jobs = schedule.get_jobs("friend")
# 清除特定标签的任务
schedule.clear("daily")
时间模糊
schedule 支持时间模糊设置,即在一定时间范围内执行任务:
def my_job():
print("你好")
# 在5到10秒之间执行任务
schedule.every(5).to(10).seconds.do(my_job)
到达指定时间后执行任务
使用 until() 函数,我们可以设置任务在到达特定时间后才开始执行:
import schedule
from datetime import datetime, timedelta
def my_job():
print("到达指定时间后执行的任务")
# 直到18:30才开始执行
schedule.every(1).hours.until("18:30").do(my_job)
通过这些示例,我们可以看到 schedule 库提供了丰富的功能,可以满足各种定时任务的需求。