Python定時任務(wù)Schedule模塊介紹
互聯(lián)網(wǎng)軟件開發(fā)中可以說我們離不開定時任務(wù)的功能開發(fā)。12306支付時間限制30分鐘、定時短信通知、定時統(tǒng)計報表、定時上線活動、數(shù)據(jù)備份等。我理解的定時任務(wù)特點:周期性、定時性、離線性、低調(diào)性(夜深人靜默默無聞哈哈)。定時任務(wù)在軟件開發(fā)中起到了無私奉獻、不求回報的大無畏精神。今天給大家介紹Python定時任務(wù)Schedule。
Python Schedule優(yōu)勢:
- 簡單易用的API用于作業(yè)調(diào)度。
- 非常輕量級,沒有外部依賴性。
- 出色的測試覆蓋率。
- 在Python 3.6、3.7、3.8和3.9上測試通過。
- 進程內(nèi)的任務(wù)調(diào)用不再需要額外的進程。
Python Schedule缺點:
- 持久性的任務(wù)
- 毫秒級的任務(wù)
- 并發(fā)執(zhí)行(可以用多線程解決)
- 本地化(時區(qū)、工作日或節(jié)假日)
1、Python Schedule模塊(安裝)
#官方文檔https://schedule.readthedocs.io/en/stable/#安裝pip install schedule
2、Python Schedule模塊(Hello Schedule)
#模塊導(dǎo)入import scheduleimport time#定時執(zhí)行函數(shù)def job(): with open('schedule_history.log', 'a', encoding='utf-8') as objFile: objFile.write('Hello Schedule!!!n') #每秒循環(huán)執(zhí)行schedule.every().second.do(job)#守護進程while True: schedule.run_pending() time.sleep(1)
3、Python Schedule模塊(業(yè)務(wù)類)
#業(yè)務(wù)邏輯處理類class HandelDataService(object): def __init__(self): pass def run(self): with open('business.txt', 'a', encoding='utf-8') as fileobj: fileobj.write('Hello Businessn') import scheduleimport time#導(dǎo)入業(yè)務(wù)類from business import HandelDataService#調(diào)用類業(yè)務(wù)邏輯def handle_dispatcher(): objDataService = HandelDataService() objDataService.run() schedule.every().second.do(handle_dispatcher)while True: schedule.run_pending() time.sleep(1)
4、Python Schedule模塊(后臺運行Nohup)
nohup python3 background.py 2>&1 &
5、Python Schedule模塊(后臺運行官方推薦方式)
import threadingimport timeimport schedule#啟動一個shedule pending線程def run_schedule_thread(interval=1): schedule_thread_event = threading.Event() class ScheduleThread(threading.Thread): @classmethod def run(cls): while not schedule_thread_event.is_set(): schedule.run_pending() time.sleep(interval) schedule_thread = ScheduleThread() schedule_thread.start() return schedule_thread_eventdef background_job(): with open('demo.txt', 'a', encoding='utf-8') as fileobj: fileobj.write('Hello Pythonn')#每秒執(zhí)行一次定時任務(wù)schedule.every().second.do(background_job)#調(diào)用schedule pending線程run_schedule_event = run_schedule_thread()
6、Python Schedule模塊(Crontab)
#每3秒執(zhí)行一次schedule.every(3).seconds.do(job)#每3分鐘執(zhí)行一次schedule.every(3).minutes.do(job)#每3小時執(zhí)行一次schedule.every(3).hours.do(job)#每3天執(zhí)行一次schedule.every(3).days.do(job)#每3周執(zhí)行一次schedule.every(3).weeks.do(job)#每分鐘的23秒執(zhí)行一次schedule.every().minute.at(":23").do(job)#每小時的42分鐘執(zhí)行一次schedule.every().hour.at(":42").do(job)#每5小時的20分30秒執(zhí)行一次schedule.every(5).hours.at("20:30").do(job)#每天的10:30執(zhí)行schedule.every().day.at("10:30").do(job)#每天的10:30:42秒執(zhí)行一次schedule.every().day.at("10:30:42").do(job)#每星期一執(zhí)行一次schedule.every().monday.do(job)#每星期三的13:15分執(zhí)行一次schedule.every().wednesday.at("13:15").do(job)#每5天到10天隨機天數(shù)執(zhí)行一次schedule.every(5).to(10).seconds.do(my_job)#每小時執(zhí)行一次至2030年1月1日18點schedule.every(1).hours.until("2030-01-01 18:33").do(job)
7、Python Schedule模塊(業(yè)務(wù)傳參)
import scheduleimport time#函數(shù)傳參def job(name,address): with open('demo3.txt', 'a', encoding='utf-8') as fileobj: fileobj.write('Hello ' name ", " address 'n')#定時任務(wù)調(diào)用傳值#每秒執(zhí)行一次schedule.every().second.do(job,name="second",address="beijing city")while True: schedule.run_pending() time.sleep(1)
8、Python Schedule模塊(接口)
#取消某個任務(wù)second_job = schedule.every().second.do(job,name="second",address="beijing city")schedule.cancel_job(second_job)#任務(wù)只運行一次def job_exec_once(): #業(yè)務(wù)類調(diào)用 #增加return返回表示任務(wù)只運行一次 return schedule.CancelJobschedule.every().second.do(job_exec_once)#獲取所有任務(wù)all_jobs = schedule.get_jobs()#取消所有任務(wù)schedule.clear()#根據(jù)任務(wù)tag選擇性過濾任務(wù)schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')friends = schedule.get_jobs('friend')#根據(jù)任務(wù)tag選擇性取消任務(wù)schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')schedule.clear('daily-tasks')
個人非常喜歡Schedule代碼語法方式。一種適合人類邏輯思維閱讀的代碼設(shè)計。
感謝大家的評論、點贊、分享、關(guān)注。。。