Basis taakplanning met APScheduler
Plan eenvoudige taken eenvoudig met APScheduler en gebruik meer functionaliteit voor complexe taken.
Je hebt een Python programma gemaakt, bijvoorbeeld om een back-up te maken van een database, en nu wil je het elk uur uitvoeren, of elke dag.
Er zijn veel oplossingen om uit te kiezen, hier gebruiken we APScheduler, 'Advanced Python Scheduler', een taakplanner die aan veel eisen voldoet. Voor onze taak lijkt dit pakket misschien overkill, maar laten we het hier gebruiken en als we eenmaal de basis begrijpen, willen we het misschien gebruiken voor complexere taken.
Zoals altijd doe ik dit op Ubuntu 22.04.
Ons back-upprogramma
Ons back-upprogramma stelt niet veel voor:
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(5)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
Voor deze demonstratie hebben we twee variabelen toegevoegd. Om het gedrag van onze planner te kunnen controleren, voegen we 'random.randint' toe om af en toe een uitzondering te genereren.
Jobs
Natuurlijk gaat dit pakket over het plannen van jobs. Hiervoor wordt gebruik gemaakt van een scheduler en de volgende componenten:
- Trigger
De trigger vertelt wanneer de taak moet worden uitgevoerd. Hier gebruiken we de CronTrigger. - Job store
De job store biedt plaats aan geplande taken. De standaard job store, MemoryJobStore, bewaart de taken gewoon in het geheugen. Prima voor ons doel. - Executor
De executor dient een taak in bij een thread of process pool. Als de taak klaar is, wordt de scheduler op de hoogte gebracht en wordt een event, zoals 'executed' of 'error', gegenereerd. De standaard uitvoerder, ThreadPoolExecutor, heeft een maximaal aantal threads van 10. Dit is prima voor ons doel. Prima voor ons doel.
Dit betekent dat we alleen de Trigger component hoeven te specificeren.
CronTrigger en interval
We gebruiken de CronTrigger functie om CRON functionaliteit te krijgen. Het komt er dan op aan om de juiste waarden op te geven voor jaar, maand, dag, uur, minuut en seconde. Er zijn veel websites met voorbeelden, hier definiëren we enkele instellingen waaruit we kunnen kiezen:
...
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
...
Wanneer we een taak toevoegen, specificeren we de trigger als volgt:
add_job(
...
trigger=CronTrigger(...)
...
)
Het is ook mogelijk om een interval op te geven:
add_job(
...
trigger='interval',
seconds=1,
...
)
Dit komt ook van pas bij het testen van onze applicatie.
Meerdere instanties
Wanneer een taak wordt gestart, controleert APScheduler of deze niet al actief is. Als dat wel het geval is, wordt het volgende bericht weergegeven:
Execution of job "job_backup ... skipped: maximum number of running instances reached (1)
Meestal is dit wenselijk, maar in sommige gevallen wil je misschien meerdere instanties tegelijk toestaan. Dit kan worden opgegeven met de parameter 'max_instances':
- Als 1, dan wordt, als er al een instantie draait, een nieuwe instantie niet gestart maar overgeslagen.
- Als 2, als er al een instantie draait, dan wordt er een nieuwe instantie gestart en draaien er twee instanties.
- Enz.
Taak starten bij programmastart
Standaard moeten we wachten tot CRONTrigger een job start. Vaak willen we dat niet, we willen dat de taak direct wordt uitgevoerd nadat we het programma hebben gestart. In deze gevallen moeten we de bestaande job aanpassen en de parameter 'next_run_time' instellen:
job.modify(next_run_time=datetime.datetime.now())
Gebeurtenissen en luisteraar
We hebben dit niet echt nodig voor onze back-uptoepassing, maar het kan nuttig zijn voor jou. Hier voegen we code toe voor drie gevallen en incrementeren we tellers voor elke gebeurtenis:
- Wanneer een taak wordt uitgevoerd: EVENT_JOB_EXECUTED
- Wanneer een taak een fout heeft: EVENT_JOB_ERROR
- Wanneer een taakuitvoering is gemist: EVENT_JOB_MISSED
Wanneer een taak wordt toegevoegd aan de planner, krijgt het een job_id toegewezen. In de listener halen we de job_naam eruit door eerst de job uit de gebeurtenis te halen en dan een omgekeerde lookup te gebruiken met behulp van de job_id. Merk op dat de gebeurtenis EVENT_JOB_MISSED nooit de luisteraar bereikt.
De code
Hieronder staat de code om dit te demonstreren.
We hebben wat extra code toegevoegd om de run-statistieken te laten zien.
De BackupRunner wordt geïnitialiseerd door de 'job_backup' methode in de scheduler.
Hier kunnen we meer initialisatie en variabelen toevoegen en optioneel fouten opvangen.
Commentarieer/decommentarieer wat regels om verschillende triggers te proberen: CronTrigger en interval.
# sched.py
import datetime
import logging
import random
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)
logging.basicConfig(
format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(15)
#time.sleep(2)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
class JobScheduler:
def __init__(
self,
var_a,
var_b,
):
self.var_a = var_a
self.var_b = var_b
self.cron_triggers = {
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
}
# job params and stats
self.job_name_params = {
'job_backup': {
'trigger': self.cron_triggers['every10seconds'],
#'trigger': 'interval',
#'seconds': 1,
'func': self.job_backup,
'kwargs': {
'var_1': self.var_a,
'var_2': self.var_b,
},
'max_instances': 1,
'run_on_start': True,
},
}
self.job_name_stats = {
'job_backup': {
# run counters (unsafe)
'execd': 0,
'error': 0,
'missd': 0,
},
}
self.scheduler = None
# reverse lookup
self.job_id_job_names = {}
def job_backup(self, var_1, var_2):
backup_runner = BackupRunner(
var_1=var_1,
var_2=var_2,
)
backup_runner.backup()
def listener(self, event):
logger.debug(f'event = {event}')
job = self.scheduler.get_job(event.job_id)
job_name = self.job_id_job_names[job.id]
if event.code == EVENT_JOB_EXECUTED:
self.job_name_stats[job_name]['execd'] += 1
elif event.code == EVENT_JOB_ERROR:
self.job_name_stats[job_name]['error'] += 1
elif event.code == EVENT_JOB_MISSED:
self.job_name_stats[job_name]['missd'] += 1
if event.exception:
logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
def schedule(self):
logger.debug(f'()')
self.scheduler = BackgroundScheduler()
self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
self.scheduler.start()
# add jobs to scheduler
for job_name, params in self.job_name_params.items():
run_on_start = params.pop('run_on_start')
job = self.scheduler.add_job(**params)
# for reverse lookup in listener
self.job_id_job_names[job.id] = job_name
# start immediately
if run_on_start:
for job in self.scheduler.get_jobs():
if job.name == job_name:
job.modify(next_run_time=datetime.datetime.now())
break
et_start_secs = int(time.time())
while True:
run_secs = int(time.time()) - et_start_secs
for job_name, stats in self.job_name_stats.items():
execd, error, missd = stats['execd'], stats['error'], stats['missd']
logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3} error: {error:3} missd: {missd:3}')
time.sleep(1)
def main():
job_scheduler = JobScheduler(
var_a='a',
var_b='b',
)
job_scheduler.schedule()
if __name__ == '__main__':
main()
Hier zijn enkele regels die naar de console worden afgedrukt:
...
2024-10-19 15:29:52,562 DEBUG [sched.py schedule(): 294] 36 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:53,563 DEBUG [sched.py schedule(): 294] 37 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:54,564 DEBUG [sched.py schedule(): 294] 38 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:55,014 DEBUG [sched.py backup(): 204] backup: ready
2024-10-19 15:29:55,014 INFO [base.py run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014 DEBUG [sched.py listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566 DEBUG [sched.py schedule(): 294] 39 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:56,566 DEBUG [sched.py schedule(): 294] 40 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:57,567 DEBUG [sched.py schedule(): 294] 41 job_backup execd: 1 error: 1 missd: 0
...
Samenvatting
Het implementeren van herhalende taken is eenvoudig met APscheduler. We hebben hier alleen de basis aangeraakt, maar dit is goed genoeg voor veel use cases. Wat ontbreekt is een gemakkelijke manier om te controleren welke taken worden uitgevoerd en hoeveel instanties van een taak worden uitgevoerd. Maar dat is alleen maar leuk om te hebben.
Links / credits
Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x
How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler
how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996
Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru
Lees meer
Scheduling
Recent
Meest bekeken
- Basis taakplanning met APScheduler
- Met behulp van Python's pyOpenSSL om SSL-certificaten die van een host zijn gedownload te controleren
- LSTM meerstappen-optimalisatie hyperparameter met Keras Tuner
- Voorkomen dat dubbele berichten naar een extern systeem worden gestuurd
- Documenteren van een Flask RESTful API met OpenAPI (Swagger) met gebruikmaking van APISpec
- Testen van de RabbitMQ Pika publicatievoorbeelden