AutoLine源碼分析之調度管理器
什麼是AutoLine開源平台
AutoLine開源平台是一個開源自動化測試解決方案,基於RobotFramework進行二次開發,支持RobotFramework幾乎所有的庫。
源碼地址
github地址: https://github.com/small99/AutoLine
碼 雲 地 址:https://gitee.com/lym51/AutoLine
AutoLine採用了Apscheduler庫來實現AutoLine的測試用例的執行任務的調度管理
什麼是Apscheduler
APScheduler:Python下強大的任務調度工具,可以完成定時任務,周期任務等,它是跨平台的,用於取代Linux下的cron daemon或者Windows下的task scheduler。
Apscheduler內置三種調度調度系統:
Cron風格
間隔性執行
僅在某個時間執行一次
在AutoLine開源平台中,我們採用了cron風格的支持以實現自由靈活的調度控制
Apscheduler支持哪些存儲方式
Memory
SQLAlchemy (any RDBMS supported by SQLAlchemy works)
MongoDB
Redis
RethinkDB
ZooKeeper
AutoLine開源平台採用了SQLAlchemy MySQL存儲調度任務
下面我們一起看看AutoLine下對調度的封裝源碼:
源碼結構如下
__init__ 你懂的
setup 初始化調度器
start 啟動調度器
is_running 判斷調度是否在運行
shutdown 關閉調度
load_job_list 載入所有項目任務
add_job 新增調度任務
update_job 更新調度任務
remove_job 移除調度任務
pause_job 暫停調度任務
resume_job 恢復調度任務
get_jobs 獲取所有任務
print_jobs 在控制台輸出所有任務
setup源碼分析
setup主要用於配置Apscheduler啟動時載入的配置
defsetup(self):
self.scheduler = BackgroundScheduler({
# 設置調度任務存儲mysql連接串
"apscheduler.jobstores.default": {
"type":"sqlalchemy",
"url":self.app.config["TRIGGER_DATABASE_URL"]
},
# 設置調度執行器進程池信息
"apscheduler.executors.processpool": {
"type":"threadpool",
"max_workers":"30"
},
# 設置調度其他配置
"apscheduler.job_defaults.coalesce":"false",
"apscheduler.job_defaults.max_instances":"4",
"apscheduler.timezone":"UTC",
})
load_job_list源碼分析
用於載入所有設置了有效cron"表達式的項目進行自動調度,一般初始化啟動時,調用一次即可
defload_job_list(self):
withself.app.app_context():
# 查詢所有項目
projects = AutoProject.query.all()
forpinprojects:
ifp.enableandself.scheduler.get_job(p.id)is None:
cron = p.cron.replace("
","").strip().split(" ")
# 判斷cron表達式是否有效
iflen(cron)
continue
# 新增任務
j=self.scheduler.add_job(func=run_job,
trigger="cron",
name=p.name,
replace_existing=True,
minute=cron[],
hour=cron[1],
day=cron[2],
month=cron[3],
day_of_week=cron[4],
id="%s"% p.id,
args=(p.id,))
else:
self.update_job(p.id)
其他函數就不一一展示了,請直接查閱代碼
最後附上Apscheduler的官方手冊鏈接:http://apscheduler.readthedocs.io/en/latest/userguide.html
※AutoLine源碼分析之靜態頁面模板及對應API介紹
※adb devices命令 unauthorized解決方案
TAG:開源優測 |