angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Базовое планирование заданий с помощью APScheduler

С помощью APScheduler легко планировать простые задания, а для сложных задач использовать более широкие функциональные возможности.

19 октября 2024
post main image
https://unsplash.com/@sarahs_captured_moments

Вы создали программу Python , например, для резервного копирования базы данных, и теперь хотите запускать ее каждый час или каждый день.

Существует множество решений, и мы используем APScheduler, 'Advanced Python Scheduler', планировщик заданий, который отвечает многим требованиям. Для нашей задачи этот пакет может показаться излишеством, но давайте воспользуемся им здесь, а когда мы поймем основы, то, возможно, захотим использовать его для более сложных задач.

Как всегда, я делаю это на Ubuntu 22.04.

Наша программа резервного копирования

В нашей программе резервного копирования нет ничего особенного:

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(5)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

Для этой демонстрации мы добавили две переменные. Чтобы иметь возможность проверить поведение нашего планировщика, мы добавили 'random.randint', чтобы время от времени генерировать исключение.

Рабочие места

Конечно же, этот пакет предназначен для планирования заданий. Для этого он использует планировщик и следующие компоненты:

  • Trigger
    Триггер указывает, когда должно быть запущено задание. Здесь мы используем CronTrigger.
  • Job store
    Хранилище заданий вмещает запланированные задания. Хранилище заданий по умолчанию, MemoryJobStore, просто хранит задания в памяти. Вполне подходит для наших целей.
  • Исполнитель
    Исполнитель обычно отправляет задание в поток или пул процессов. Когда задание выполнено, он уведомляет планировщик, и генерируется событие, например 'executed' или 'error'. Исполнитель по умолчанию, ThreadPoolExecutor, имеет максимальное количество потоков 10. Вполне подходит для нашей цели.

Это означает, что нам нужно указать только компонент Trigger .

CronTrigger и интервал

Мы используем функцию CronTrigger , чтобы получить функциональность CRON. Затем нужно указать правильные значения года, месяца, дня, часа, минуты и секунды. Существует множество сайтов с примерами, здесь же мы определяем несколько параметров на выбор:

...
'every10seconds': CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='*/10'),
'everyminute':    CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='0'),
...

Когда мы добавляем задание, мы указываем триггер следующим образом:

    add_job(
        ...
        trigger=CronTrigger(...)
        ...
    )

Также можно указать интервал:

    add_job(
        ...
        trigger='interval',
        seconds=1,
        ...
    )

Это также пригодится при тестировании нашего приложения.

Несколько экземпляров

Когда задание запускается, APScheduler проверяет, не является ли оно уже активным. Если оно активно, отображается следующее сообщение:

Execution of job "job_backup ... skipped: maximum number of running instances reached (1)

Обычно это желательно, но в некоторых случаях вы можете разрешить одновременную работу нескольких экземпляров. Это можно указать с помощью параметра 'max_instances':

  • Если 1, то, когда экземпляр уже запущен, новый экземпляр не будет запущен, а будет пропущен.
  • Если 2, то, когда экземпляр уже запущен, будет запущен новый экземпляр, и будут запущены два экземпляра.
  • И т. д.

Запуск задания при старте программы

По умолчанию мы должны ждать, пока CRONTrigger запустит задание. Часто мы не хотим этого, а хотим, чтобы задание запускалось сразу после запуска программы. В таких случаях нам нужно изменить существующее задание и установить параметр 'next_run_time':

job.modify(next_run_time=datetime.datetime.now())

События и слушатель

Для нашего приложения резервного копирования это не очень нужно, но может быть полезно для вас. Здесь мы добавим код для трех случаев и увеличим счетчики для каждого события:

  • Когда выполняется задание: EVENT_JOB_EXECUTED
  • Когда в задании возникает ошибка: EVENT_JOB_ERROR
  • Когда выполнение задания было пропущено: EVENT_JOB_MISSED

Когда задание добавляется в планировщик, ему присваивается идентификатор job_id. В слушателе мы извлекаем имя_задания, сначала извлекая задание из события, а затем используя обратный поиск по идентификатору задания. Обратите внимание, что событие EVENT_JOB_MISSED никогда не попадает в слушатель.

Код

Ниже приведен код для демонстрации этого.
Мы добавили немного дополнительного кода для отображения статистики выполнения.
BackupRunner инициализируется методом 'job_backup' в планировщике.
Здесь мы можем добавить больше инициализации и переменных и, по желанию, отловить ошибки.
Закомментируйте/отмените некоторые строки, чтобы попробовать различные триггеры: CronTrigger и интервал.

# sched.py
import datetime
import logging
import random
import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED

apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)

logging.basicConfig(
    format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(15)
        #time.sleep(2)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

class JobScheduler:
    def __init__(
        self,
        var_a,
        var_b,
    ):
        self.var_a = var_a
        self.var_b = var_b

        self.cron_triggers = {
            'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
            'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
            'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
            'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
            'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
        }

        # job params and stats
        self.job_name_params = {
            'job_backup': {
                'trigger': self.cron_triggers['every10seconds'],
                #'trigger': 'interval',
                #'seconds': 1,
                'func': self.job_backup,
                'kwargs': {
                    'var_1': self.var_a,
                    'var_2': self.var_b,
                },
                'max_instances': 1,
                'run_on_start': True,
            },
        }
        self.job_name_stats = {
            'job_backup': {
                # run counters (unsafe)
                'execd': 0,
                'error': 0,
                'missd': 0,
            },
        }
        self.scheduler = None
        # reverse lookup
        self.job_id_job_names = {}

    def job_backup(self, var_1, var_2):
        backup_runner = BackupRunner(
            var_1=var_1,
            var_2=var_2,
        )
        backup_runner.backup()

    def listener(self, event):
        logger.debug(f'event = {event}')
        job = self.scheduler.get_job(event.job_id)
        job_name = self.job_id_job_names[job.id]
        if event.code == EVENT_JOB_EXECUTED:
                self.job_name_stats[job_name]['execd'] += 1
        elif event.code == EVENT_JOB_ERROR:
                self.job_name_stats[job_name]['error'] += 1
        elif event.code == EVENT_JOB_MISSED:
                self.job_name_stats[job_name]['missd'] += 1
        if event.exception:
            logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
        
    def schedule(self):
        logger.debug(f'()')
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
        self.scheduler.start()
        # add jobs to scheduler
        for job_name, params in self.job_name_params.items():
            run_on_start = params.pop('run_on_start')
            job = self.scheduler.add_job(**params)
            # for reverse lookup in listener
            self.job_id_job_names[job.id] = job_name
            # start immediately
            if run_on_start:
                for job in self.scheduler.get_jobs():
                    if job.name == job_name:
                        job.modify(next_run_time=datetime.datetime.now())
                        break

        et_start_secs = int(time.time())
        while True:
            run_secs = int(time.time()) - et_start_secs
            for job_name, stats in self.job_name_stats.items():
                execd, error, missd = stats['execd'], stats['error'], stats['missd']
                logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3}   error: {error:3}   missd: {missd:3}')
            time.sleep(1)

def main():
    job_scheduler = JobScheduler(
        var_a='a',
        var_b='b',
    )
    job_scheduler.schedule()

if __name__ == '__main__':
    main()

Вот несколько строк, выведенных в консоль:

...
2024-10-19 15:29:52,562    DEBUG [sched.py              schedule(): 294]  36 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:53,563    DEBUG [sched.py              schedule(): 294]  37 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:54,564    DEBUG [sched.py              schedule(): 294]  38 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:55,014    DEBUG [sched.py                backup(): 204] backup: ready
2024-10-19 15:29:55,014     INFO [base.py                run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014    DEBUG [sched.py              listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566    DEBUG [sched.py              schedule(): 294]  39 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:56,566    DEBUG [sched.py              schedule(): 294]  40 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:57,567    DEBUG [sched.py              schedule(): 294]  41 job_backup      execd:   1   error:   1   missd:   0
...

Summary

Реализовать повторяющиеся задания очень просто с помощью APscheduler. Здесь мы затронули только основы, но этого будет достаточно для многих случаев использования. Чего не хватает, так это простого способа проверить, какие задания выполняются и сколько экземпляров задания запущено. Но это лишь приятная мелочь.

Ссылки / кредиты

Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x

How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler

how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996

Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru

Подробнее

Scheduling

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.