Базовое планирование заданий с помощью APScheduler
С помощью APScheduler легко планировать простые задания, а для сложных задач использовать более широкие функциональные возможности.
Вы создали программу Python , например, для резервного копирования базы данных, и теперь хотите запускать ее каждый час или каждый день.
Существует множество решений, и мы используем APScheduler, 'Advanced Python Scheduler', планировщик заданий, который отвечает многим требованиям. Для нашей задачи этот пакет может показаться излишеством, но давайте воспользуемся им здесь, а когда мы поймем основы, то, возможно, захотим использовать его для более сложных задач.
Как всегда, я делаю это на Ubuntu 22.04.
Наша программа резервного копирования
В нашей программе резервного копирования нет ничего особенного:
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(5)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
Для этой демонстрации мы добавили две переменные. Чтобы иметь возможность проверить поведение нашего планировщика, мы добавили 'random.randint', чтобы время от времени генерировать исключение.
Рабочие места
Конечно же, этот пакет предназначен для планирования заданий. Для этого он использует планировщик и следующие компоненты:
- Trigger
Триггер указывает, когда должно быть запущено задание. Здесь мы используем CronTrigger. - Job store
Хранилище заданий вмещает запланированные задания. Хранилище заданий по умолчанию, MemoryJobStore, просто хранит задания в памяти. Вполне подходит для наших целей. - Исполнитель
Исполнитель обычно отправляет задание в поток или пул процессов. Когда задание выполнено, он уведомляет планировщик, и генерируется событие, например 'executed' или 'error'. Исполнитель по умолчанию, ThreadPoolExecutor, имеет максимальное количество потоков 10. Вполне подходит для нашей цели.
Это означает, что нам нужно указать только компонент Trigger .
CronTrigger и интервал
Мы используем функцию CronTrigger , чтобы получить функциональность CRON. Затем нужно указать правильные значения года, месяца, дня, часа, минуты и секунды. Существует множество сайтов с примерами, здесь же мы определяем несколько параметров на выбор:
...
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
...
Когда мы добавляем задание, мы указываем триггер следующим образом:
add_job(
...
trigger=CronTrigger(...)
...
)
Также можно указать интервал:
add_job(
...
trigger='interval',
seconds=1,
...
)
Это также пригодится при тестировании нашего приложения.
Несколько экземпляров
Когда задание запускается, APScheduler проверяет, не является ли оно уже активным. Если оно активно, отображается следующее сообщение:
Execution of job "job_backup ... skipped: maximum number of running instances reached (1)
Обычно это желательно, но в некоторых случаях вы можете разрешить одновременную работу нескольких экземпляров. Это можно указать с помощью параметра 'max_instances':
- Если 1, то, когда экземпляр уже запущен, новый экземпляр не будет запущен, а будет пропущен.
- Если 2, то, когда экземпляр уже запущен, будет запущен новый экземпляр, и будут запущены два экземпляра.
- И т. д.
Запуск задания при старте программы
По умолчанию мы должны ждать, пока CRONTrigger запустит задание. Часто мы не хотим этого, а хотим, чтобы задание запускалось сразу после запуска программы. В таких случаях нам нужно изменить существующее задание и установить параметр 'next_run_time':
job.modify(next_run_time=datetime.datetime.now())
События и слушатель
Для нашего приложения резервного копирования это не очень нужно, но может быть полезно для вас. Здесь мы добавим код для трех случаев и увеличим счетчики для каждого события:
- Когда выполняется задание: EVENT_JOB_EXECUTED
- Когда в задании возникает ошибка: EVENT_JOB_ERROR
- Когда выполнение задания было пропущено: EVENT_JOB_MISSED
Когда задание добавляется в планировщик, ему присваивается идентификатор job_id. В слушателе мы извлекаем имя_задания, сначала извлекая задание из события, а затем используя обратный поиск по идентификатору задания. Обратите внимание, что событие EVENT_JOB_MISSED никогда не попадает в слушатель.
Код
Ниже приведен код для демонстрации этого.
Мы добавили немного дополнительного кода для отображения статистики выполнения.
BackupRunner инициализируется методом 'job_backup' в планировщике.
Здесь мы можем добавить больше инициализации и переменных и, по желанию, отловить ошибки.
Закомментируйте/отмените некоторые строки, чтобы попробовать различные триггеры: CronTrigger и интервал.
# sched.py
import datetime
import logging
import random
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)
logging.basicConfig(
format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(15)
#time.sleep(2)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
class JobScheduler:
def __init__(
self,
var_a,
var_b,
):
self.var_a = var_a
self.var_b = var_b
self.cron_triggers = {
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
}
# job params and stats
self.job_name_params = {
'job_backup': {
'trigger': self.cron_triggers['every10seconds'],
#'trigger': 'interval',
#'seconds': 1,
'func': self.job_backup,
'kwargs': {
'var_1': self.var_a,
'var_2': self.var_b,
},
'max_instances': 1,
'run_on_start': True,
},
}
self.job_name_stats = {
'job_backup': {
# run counters (unsafe)
'execd': 0,
'error': 0,
'missd': 0,
},
}
self.scheduler = None
# reverse lookup
self.job_id_job_names = {}
def job_backup(self, var_1, var_2):
backup_runner = BackupRunner(
var_1=var_1,
var_2=var_2,
)
backup_runner.backup()
def listener(self, event):
logger.debug(f'event = {event}')
job = self.scheduler.get_job(event.job_id)
job_name = self.job_id_job_names[job.id]
if event.code == EVENT_JOB_EXECUTED:
self.job_name_stats[job_name]['execd'] += 1
elif event.code == EVENT_JOB_ERROR:
self.job_name_stats[job_name]['error'] += 1
elif event.code == EVENT_JOB_MISSED:
self.job_name_stats[job_name]['missd'] += 1
if event.exception:
logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
def schedule(self):
logger.debug(f'()')
self.scheduler = BackgroundScheduler()
self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
self.scheduler.start()
# add jobs to scheduler
for job_name, params in self.job_name_params.items():
run_on_start = params.pop('run_on_start')
job = self.scheduler.add_job(**params)
# for reverse lookup in listener
self.job_id_job_names[job.id] = job_name
# start immediately
if run_on_start:
for job in self.scheduler.get_jobs():
if job.name == job_name:
job.modify(next_run_time=datetime.datetime.now())
break
et_start_secs = int(time.time())
while True:
run_secs = int(time.time()) - et_start_secs
for job_name, stats in self.job_name_stats.items():
execd, error, missd = stats['execd'], stats['error'], stats['missd']
logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3} error: {error:3} missd: {missd:3}')
time.sleep(1)
def main():
job_scheduler = JobScheduler(
var_a='a',
var_b='b',
)
job_scheduler.schedule()
if __name__ == '__main__':
main()
Вот несколько строк, выведенных в консоль:
...
2024-10-19 15:29:52,562 DEBUG [sched.py schedule(): 294] 36 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:53,563 DEBUG [sched.py schedule(): 294] 37 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:54,564 DEBUG [sched.py schedule(): 294] 38 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:55,014 DEBUG [sched.py backup(): 204] backup: ready
2024-10-19 15:29:55,014 INFO [base.py run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014 DEBUG [sched.py listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566 DEBUG [sched.py schedule(): 294] 39 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:56,566 DEBUG [sched.py schedule(): 294] 40 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:57,567 DEBUG [sched.py schedule(): 294] 41 job_backup execd: 1 error: 1 missd: 0
...
Summary
Реализовать повторяющиеся задания очень просто с помощью APscheduler. Здесь мы затронули только основы, но этого будет достаточно для многих случаев использования. Чего не хватает, так это простого способа проверить, какие задания выполняются и сколько экземпляров задания запущено. Но это лишь приятная мелочь.
Ссылки / кредиты
Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x
How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler
how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996
Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru
Подробнее
Scheduling
Недавний
- Использование Ingress для доступа к RabbitMQ на кластере Microk8s
- Простая видеогалерея с Flask, Jinja, Bootstrap и JQuery
- Базовое планирование заданий с помощью APScheduler
- Коммутатор базы данных с HAProxy и HAProxy Runtime API
- Docker Swarm rolling updates
- Скрытие первичных ключей базы данных UUID вашего веб-приложения
Большинство просмотренных
- Использование PyInstaller и Cython для создания исполняемого файла Python
- Уменьшение времени отклика на запросы на странице Flask SQLAlchemy веб-сайта
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Подключение к службе на хосте Docker из контейнера Docker
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов