angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Grundlegende Auftragsplanung mit APScheduler

Planen Sie einfache Aufgaben mit APScheduler und nutzen Sie weitere Funktionen für komplexe Aufgaben.

19 Oktober 2024
post main image
https://unsplash.com/@sarahs_captured_moments

Sie haben ein Python Programm erstellt, zum Beispiel, um eine Datenbank zu sichern, und wollen es nun jede Stunde oder jeden Tag laufen lassen.

Es gibt viele Lösungen zur Auswahl, hier verwenden wir APScheduler, 'Advanced Python Scheduler', einen Job Scheduler, der viele Anforderungen erfüllt. Für unsere Aufgabe mag dieses Paket übertrieben erscheinen, aber wir wollen es hier verwenden, und sobald wir die Grundlagen verstanden haben, werden wir es vielleicht für komplexere Aufgaben verwenden wollen.

Wie immer mache ich dies auf Ubuntu 22.04.

Unser Sicherungsprogramm

Es gibt nicht viel zu unserem Sicherungsprogramm:

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(5)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

Für diese Demonstration haben wir zwei Variablen hinzugefügt. Um das Verhalten unseres Schedulers überprüfen zu können, fügen wir 'random.randint' hinzu, um ab und zu eine Ausnahme zu erzeugen.

Aufträge

In diesem Paket geht es natürlich um die Planung von Aufträgen. Zu diesem Zweck werden ein Scheduler und die folgenden Komponenten verwendet:

  • Trigger
    Der Trigger gibt an, wann der Job laufen muss. Hier verwenden wir den CronTrigger.
  • Job store
    Der Jobspeicher beherbergt einen geplanten Job. Der Standard-Job-Store, MemoryJobStore, hält die Jobs einfach im Speicher. Das ist für unseren Zweck ausreichend.
  • Executor
    Der Executor übergibt in der Regel einen Auftrag an einen Thread oder Prozesspool. Wenn der Auftrag erledigt ist, benachrichtigt er den Scheduler und erzeugt ein Ereignis, wie 'executed' oder 'error'. Der Standard-Executor, ThreadPoolExecutor, hat eine maximale Thread-Anzahl von 10. Für unseren Zweck ist das gut.

Das bedeutet, dass wir nur die Komponente Trigger angeben müssen.

CronTrigger und Intervall

Wir verwenden die Funktion CronTrigger , um die CRON-Funktionalität zu erhalten. Es kommt dann darauf an, die richtigen Werte für Jahr, Monat, Tag, Stunde, Minute und Sekunde anzugeben. Es gibt viele Websites mit Beispielen, hier definieren wir einige Einstellungen, aus denen wir wählen können:

...
'every10seconds': CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='*/10'),
'everyminute':    CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='0'),
...

Wenn wir einen Job hinzufügen, geben wir den Auslöser wie folgt an:

    add_job(
        ...
        trigger=CronTrigger(...)
        ...
    )

Es ist auch möglich, ein Intervall anzugeben:

    add_job(
        ...
        trigger='interval',
        seconds=1,
        ...
    )

Dies ist auch beim Testen unserer Anwendung nützlich.

Mehrere Instanzen

Wenn ein Job startet, prüft APScheduler , ob er nicht bereits aktiv ist. Wenn dies der Fall ist, wird die folgende Meldung angezeigt:

Execution of job "job_backup ... skipped: maximum number of running instances reached (1)

Normalerweise ist dies wünschenswert, aber in manchen Fällen möchten Sie vielleicht mehrere Instanzen gleichzeitig zulassen. Dies kann mit dem Parameter 'max_instances' festgelegt werden:

  • Wenn 1, dann wird, wenn eine Instanz bereits läuft, eine neue Instanz nicht gestartet, sondern übersprungen
  • Wenn 2, dann wird, wenn eine Instanz bereits läuft, eine neue Instanz gestartet, und es laufen zwei Instanzen.
  • Etc.

Job bei Programmstart starten

Standardmäßig müssen wir auf CRONTrigger warten, um einen Job zu starten. Oft wollen wir das nicht, sondern den Job sofort nach dem Programmstart starten. In diesen Fällen müssen wir den bestehenden Job ändern und den Parameter 'next_run_time' setzen:

job.modify(next_run_time=datetime.datetime.now())

Ereignisse und Listener

Für unsere Backup-Anwendung brauchen wir das eigentlich nicht, aber es könnte für Sie nützlich sein. Hier fügen wir Code für drei Fälle hinzu und erhöhen die Zähler für jedes Ereignis:

  • Wenn ein Auftrag ausgeführt wird: EVENT_JOB_EXECUTED
  • Wenn ein Auftrag einen Fehler hat: EVENT_JOB_ERROR
  • Wenn ein Auftrag nicht ausgeführt wurde: EVENT_JOB_MISSED

Wenn ein Job dem Scheduler hinzugefügt wird, erhält er eine Job_id. Im Listener extrahieren wir den job_name, indem wir zuerst den Job aus dem Ereignis extrahieren und dann eine umgekehrte Suche anhand der job_id durchführen. Beachten Sie, dass das Ereignis EVENT_JOB_MISSED den Listener nie erreicht.

Der Code

Nachfolgend finden Sie den Code zur Veranschaulichung.
Wir haben zusätzlichen Code hinzugefügt, um die Laufstatistiken anzuzeigen.
Der BackupRunner wird durch die Methode 'job_backup' im Scheduler initialisiert.
Hier können wir weitere Initialisierungen und Variablen hinzufügen und optional Fehler abfangen.
Kommentieren Sie einige Zeilen, um verschiedene Auslöser auszuprobieren: CronTrigger und Intervall.

# sched.py
import datetime
import logging
import random
import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED

apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)

logging.basicConfig(
    format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(15)
        #time.sleep(2)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

class JobScheduler:
    def __init__(
        self,
        var_a,
        var_b,
    ):
        self.var_a = var_a
        self.var_b = var_b

        self.cron_triggers = {
            'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
            'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
            'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
            'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
            'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
        }

        # job params and stats
        self.job_name_params = {
            'job_backup': {
                'trigger': self.cron_triggers['every10seconds'],
                #'trigger': 'interval',
                #'seconds': 1,
                'func': self.job_backup,
                'kwargs': {
                    'var_1': self.var_a,
                    'var_2': self.var_b,
                },
                'max_instances': 1,
                'run_on_start': True,
            },
        }
        self.job_name_stats = {
            'job_backup': {
                # run counters (unsafe)
                'execd': 0,
                'error': 0,
                'missd': 0,
            },
        }
        self.scheduler = None
        # reverse lookup
        self.job_id_job_names = {}

    def job_backup(self, var_1, var_2):
        backup_runner = BackupRunner(
            var_1=var_1,
            var_2=var_2,
        )
        backup_runner.backup()

    def listener(self, event):
        logger.debug(f'event = {event}')
        job = self.scheduler.get_job(event.job_id)
        job_name = self.job_id_job_names[job.id]
        if event.code == EVENT_JOB_EXECUTED:
                self.job_name_stats[job_name]['execd'] += 1
        elif event.code == EVENT_JOB_ERROR:
                self.job_name_stats[job_name]['error'] += 1
        elif event.code == EVENT_JOB_MISSED:
                self.job_name_stats[job_name]['missd'] += 1
        if event.exception:
            logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
        
    def schedule(self):
        logger.debug(f'()')
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
        self.scheduler.start()
        # add jobs to scheduler
        for job_name, params in self.job_name_params.items():
            run_on_start = params.pop('run_on_start')
            job = self.scheduler.add_job(**params)
            # for reverse lookup in listener
            self.job_id_job_names[job.id] = job_name
            # start immediately
            if run_on_start:
                for job in self.scheduler.get_jobs():
                    if job.name == job_name:
                        job.modify(next_run_time=datetime.datetime.now())
                        break

        et_start_secs = int(time.time())
        while True:
            run_secs = int(time.time()) - et_start_secs
            for job_name, stats in self.job_name_stats.items():
                execd, error, missd = stats['execd'], stats['error'], stats['missd']
                logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3}   error: {error:3}   missd: {missd:3}')
            time.sleep(1)

def main():
    job_scheduler = JobScheduler(
        var_a='a',
        var_b='b',
    )
    job_scheduler.schedule()

if __name__ == '__main__':
    main()

Hier sind einige Zeilen, die auf der Konsole ausgegeben werden:

...
2024-10-19 15:29:52,562    DEBUG [sched.py              schedule(): 294]  36 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:53,563    DEBUG [sched.py              schedule(): 294]  37 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:54,564    DEBUG [sched.py              schedule(): 294]  38 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:55,014    DEBUG [sched.py                backup(): 204] backup: ready
2024-10-19 15:29:55,014     INFO [base.py                run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014    DEBUG [sched.py              listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566    DEBUG [sched.py              schedule(): 294]  39 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:56,566    DEBUG [sched.py              schedule(): 294]  40 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:57,567    DEBUG [sched.py              schedule(): 294]  41 job_backup      execd:   1   error:   1   missd:   0
...

Zusammenfassung

Die Implementierung von sich wiederholenden Jobs ist mit APscheduler einfach. Wir haben hier nur die Grundlagen berührt, aber das wird für viele Anwendungsfälle ausreichen. Was fehlt, ist eine einfache Möglichkeit zu überprüfen, welche Jobs laufen und wie viele Instanzen eines Jobs laufen. Aber das ist nur ein "nice to have".

Links / Impressum

Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x

How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler

how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996

Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru

Mehr erfahren

Scheduling

Einen Kommentar hinterlassen

Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

Kommentare

Eine Antwort hinterlassen

Antworten Sie anonym oder melden Sie sich an, um zu antworten.