Grundlegende Auftragsplanung mit APScheduler
Planen Sie einfache Aufgaben mit APScheduler und nutzen Sie weitere Funktionen für komplexe Aufgaben.
Sie haben ein Python Programm erstellt, zum Beispiel, um eine Datenbank zu sichern, und wollen es nun jede Stunde oder jeden Tag laufen lassen.
Es gibt viele Lösungen zur Auswahl, hier verwenden wir APScheduler, 'Advanced Python Scheduler', einen Job Scheduler, der viele Anforderungen erfüllt. Für unsere Aufgabe mag dieses Paket übertrieben erscheinen, aber wir wollen es hier verwenden, und sobald wir die Grundlagen verstanden haben, werden wir es vielleicht für komplexere Aufgaben verwenden wollen.
Wie immer mache ich dies auf Ubuntu 22.04.
Unser Sicherungsprogramm
Es gibt nicht viel zu unserem Sicherungsprogramm:
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(5)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
Für diese Demonstration haben wir zwei Variablen hinzugefügt. Um das Verhalten unseres Schedulers überprüfen zu können, fügen wir 'random.randint' hinzu, um ab und zu eine Ausnahme zu erzeugen.
Aufträge
In diesem Paket geht es natürlich um die Planung von Aufträgen. Zu diesem Zweck werden ein Scheduler und die folgenden Komponenten verwendet:
- Trigger
Der Trigger gibt an, wann der Job laufen muss. Hier verwenden wir den CronTrigger. - Job store
Der Jobspeicher beherbergt einen geplanten Job. Der Standard-Job-Store, MemoryJobStore, hält die Jobs einfach im Speicher. Das ist für unseren Zweck ausreichend. - Executor
Der Executor übergibt in der Regel einen Auftrag an einen Thread oder Prozesspool. Wenn der Auftrag erledigt ist, benachrichtigt er den Scheduler und erzeugt ein Ereignis, wie 'executed' oder 'error'. Der Standard-Executor, ThreadPoolExecutor, hat eine maximale Thread-Anzahl von 10. Für unseren Zweck ist das gut.
Das bedeutet, dass wir nur die Komponente Trigger angeben müssen.
CronTrigger und Intervall
Wir verwenden die Funktion CronTrigger , um die CRON-Funktionalität zu erhalten. Es kommt dann darauf an, die richtigen Werte für Jahr, Monat, Tag, Stunde, Minute und Sekunde anzugeben. Es gibt viele Websites mit Beispielen, hier definieren wir einige Einstellungen, aus denen wir wählen können:
...
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
...
Wenn wir einen Job hinzufügen, geben wir den Auslöser wie folgt an:
add_job(
...
trigger=CronTrigger(...)
...
)
Es ist auch möglich, ein Intervall anzugeben:
add_job(
...
trigger='interval',
seconds=1,
...
)
Dies ist auch beim Testen unserer Anwendung nützlich.
Mehrere Instanzen
Wenn ein Job startet, prüft APScheduler , ob er nicht bereits aktiv ist. Wenn dies der Fall ist, wird die folgende Meldung angezeigt:
Execution of job "job_backup ... skipped: maximum number of running instances reached (1)
Normalerweise ist dies wünschenswert, aber in manchen Fällen möchten Sie vielleicht mehrere Instanzen gleichzeitig zulassen. Dies kann mit dem Parameter 'max_instances' festgelegt werden:
- Wenn 1, dann wird, wenn eine Instanz bereits läuft, eine neue Instanz nicht gestartet, sondern übersprungen
- Wenn 2, dann wird, wenn eine Instanz bereits läuft, eine neue Instanz gestartet, und es laufen zwei Instanzen.
- Etc.
Job bei Programmstart starten
Standardmäßig müssen wir auf CRONTrigger warten, um einen Job zu starten. Oft wollen wir das nicht, sondern den Job sofort nach dem Programmstart starten. In diesen Fällen müssen wir den bestehenden Job ändern und den Parameter 'next_run_time' setzen:
job.modify(next_run_time=datetime.datetime.now())
Ereignisse und Listener
Für unsere Backup-Anwendung brauchen wir das eigentlich nicht, aber es könnte für Sie nützlich sein. Hier fügen wir Code für drei Fälle hinzu und erhöhen die Zähler für jedes Ereignis:
- Wenn ein Auftrag ausgeführt wird: EVENT_JOB_EXECUTED
- Wenn ein Auftrag einen Fehler hat: EVENT_JOB_ERROR
- Wenn ein Auftrag nicht ausgeführt wurde: EVENT_JOB_MISSED
Wenn ein Job dem Scheduler hinzugefügt wird, erhält er eine Job_id. Im Listener extrahieren wir den job_name, indem wir zuerst den Job aus dem Ereignis extrahieren und dann eine umgekehrte Suche anhand der job_id durchführen. Beachten Sie, dass das Ereignis EVENT_JOB_MISSED den Listener nie erreicht.
Der Code
Nachfolgend finden Sie den Code zur Veranschaulichung.
Wir haben zusätzlichen Code hinzugefügt, um die Laufstatistiken anzuzeigen.
Der BackupRunner wird durch die Methode 'job_backup' im Scheduler initialisiert.
Hier können wir weitere Initialisierungen und Variablen hinzufügen und optional Fehler abfangen.
Kommentieren Sie einige Zeilen, um verschiedene Auslöser auszuprobieren: CronTrigger und Intervall.
# sched.py
import datetime
import logging
import random
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)
logging.basicConfig(
format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(15)
#time.sleep(2)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
class JobScheduler:
def __init__(
self,
var_a,
var_b,
):
self.var_a = var_a
self.var_b = var_b
self.cron_triggers = {
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
}
# job params and stats
self.job_name_params = {
'job_backup': {
'trigger': self.cron_triggers['every10seconds'],
#'trigger': 'interval',
#'seconds': 1,
'func': self.job_backup,
'kwargs': {
'var_1': self.var_a,
'var_2': self.var_b,
},
'max_instances': 1,
'run_on_start': True,
},
}
self.job_name_stats = {
'job_backup': {
# run counters (unsafe)
'execd': 0,
'error': 0,
'missd': 0,
},
}
self.scheduler = None
# reverse lookup
self.job_id_job_names = {}
def job_backup(self, var_1, var_2):
backup_runner = BackupRunner(
var_1=var_1,
var_2=var_2,
)
backup_runner.backup()
def listener(self, event):
logger.debug(f'event = {event}')
job = self.scheduler.get_job(event.job_id)
job_name = self.job_id_job_names[job.id]
if event.code == EVENT_JOB_EXECUTED:
self.job_name_stats[job_name]['execd'] += 1
elif event.code == EVENT_JOB_ERROR:
self.job_name_stats[job_name]['error'] += 1
elif event.code == EVENT_JOB_MISSED:
self.job_name_stats[job_name]['missd'] += 1
if event.exception:
logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
def schedule(self):
logger.debug(f'()')
self.scheduler = BackgroundScheduler()
self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
self.scheduler.start()
# add jobs to scheduler
for job_name, params in self.job_name_params.items():
run_on_start = params.pop('run_on_start')
job = self.scheduler.add_job(**params)
# for reverse lookup in listener
self.job_id_job_names[job.id] = job_name
# start immediately
if run_on_start:
for job in self.scheduler.get_jobs():
if job.name == job_name:
job.modify(next_run_time=datetime.datetime.now())
break
et_start_secs = int(time.time())
while True:
run_secs = int(time.time()) - et_start_secs
for job_name, stats in self.job_name_stats.items():
execd, error, missd = stats['execd'], stats['error'], stats['missd']
logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3} error: {error:3} missd: {missd:3}')
time.sleep(1)
def main():
job_scheduler = JobScheduler(
var_a='a',
var_b='b',
)
job_scheduler.schedule()
if __name__ == '__main__':
main()
Hier sind einige Zeilen, die auf der Konsole ausgegeben werden:
...
2024-10-19 15:29:52,562 DEBUG [sched.py schedule(): 294] 36 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:53,563 DEBUG [sched.py schedule(): 294] 37 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:54,564 DEBUG [sched.py schedule(): 294] 38 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:55,014 DEBUG [sched.py backup(): 204] backup: ready
2024-10-19 15:29:55,014 INFO [base.py run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014 DEBUG [sched.py listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566 DEBUG [sched.py schedule(): 294] 39 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:56,566 DEBUG [sched.py schedule(): 294] 40 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:57,567 DEBUG [sched.py schedule(): 294] 41 job_backup execd: 1 error: 1 missd: 0
...
Zusammenfassung
Die Implementierung von sich wiederholenden Jobs ist mit APscheduler einfach. Wir haben hier nur die Grundlagen berührt, aber das wird für viele Anwendungsfälle ausreichen. Was fehlt, ist eine einfache Möglichkeit zu überprüfen, welche Jobs laufen und wie viele Instanzen eines Jobs laufen. Aber das ist nur ein "nice to have".
Links / Impressum
Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x
How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler
how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996
Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru
Mehr erfahren
Scheduling
Neueste
- Zeitreihenchart mit Flask, Bootstrap und Chart.js
- Verwendung von IPv6 mit Microk8s
- Verwendung von Ingress für den Zugriff auf RabbitMQ auf einem Microk8s -Cluster
- Einfache Videogalerie mit Flask, Jinja, Bootstrap und JQuery
- Grundlegende Auftragsplanung mit APScheduler
- Ein Datenbankschalter mit HAProxy und der HAProxy Runtime API
Meistgesehen
- Grundlegende Auftragsplanung mit APScheduler
- Verhinderung des Versands doppelter Nachrichten an ein entferntes System
- LSTM mehrstufige hyperparameter Optimierung mit Keras Tuner
- Dokumentieren einer Flask RESTful API mit OpenAPI (Swagger) mit APISpec
- Verwendung von Pythons pyOpenSSL zur Überprüfung von SSL-Zertifikaten, die von einem Host heruntergeladen wurden
- Verwendung von IPv6 mit Microk8s