angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Basis taakplanning met APScheduler

Plan eenvoudige taken eenvoudig met APScheduler en gebruik meer functionaliteit voor complexe taken.

19 oktober 2024
post main image
https://unsplash.com/@sarahs_captured_moments

Je hebt een Python programma gemaakt, bijvoorbeeld om een back-up te maken van een database, en nu wil je het elk uur uitvoeren, of elke dag.

Er zijn veel oplossingen om uit te kiezen, hier gebruiken we APScheduler, 'Advanced Python Scheduler', een taakplanner die aan veel eisen voldoet. Voor onze taak lijkt dit pakket misschien overkill, maar laten we het hier gebruiken en als we eenmaal de basis begrijpen, willen we het misschien gebruiken voor complexere taken.

Zoals altijd doe ik dit op Ubuntu 22.04.

Ons back-upprogramma

Ons back-upprogramma stelt niet veel voor:

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(5)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

Voor deze demonstratie hebben we twee variabelen toegevoegd. Om het gedrag van onze planner te kunnen controleren, voegen we 'random.randint' toe om af en toe een uitzondering te genereren.

Jobs

Natuurlijk gaat dit pakket over het plannen van jobs. Hiervoor wordt gebruik gemaakt van een scheduler en de volgende componenten:

  • Trigger
    De trigger vertelt wanneer de taak moet worden uitgevoerd. Hier gebruiken we de CronTrigger.
  • Job store
    De job store biedt plaats aan geplande taken. De standaard job store, MemoryJobStore, bewaart de taken gewoon in het geheugen. Prima voor ons doel.
  • Executor
    De executor dient een taak in bij een thread of process pool. Als de taak klaar is, wordt de scheduler op de hoogte gebracht en wordt een event, zoals 'executed' of 'error', gegenereerd. De standaard uitvoerder, ThreadPoolExecutor, heeft een maximaal aantal threads van 10. Dit is prima voor ons doel. Prima voor ons doel.

Dit betekent dat we alleen de Trigger component hoeven te specificeren.

CronTrigger en interval

We gebruiken de CronTrigger functie om CRON functionaliteit te krijgen. Het komt er dan op aan om de juiste waarden op te geven voor jaar, maand, dag, uur, minuut en seconde. Er zijn veel websites met voorbeelden, hier definiëren we enkele instellingen waaruit we kunnen kiezen:

...
'every10seconds': CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='*/10'),
'everyminute':    CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='0'),
...

Wanneer we een taak toevoegen, specificeren we de trigger als volgt:

    add_job(
        ...
        trigger=CronTrigger(...)
        ...
    )

Het is ook mogelijk om een interval op te geven:

    add_job(
        ...
        trigger='interval',
        seconds=1,
        ...
    )

Dit komt ook van pas bij het testen van onze applicatie.

Meerdere instanties

Wanneer een taak wordt gestart, controleert APScheduler of deze niet al actief is. Als dat wel het geval is, wordt het volgende bericht weergegeven:

Execution of job "job_backup ... skipped: maximum number of running instances reached (1)

Meestal is dit wenselijk, maar in sommige gevallen wil je misschien meerdere instanties tegelijk toestaan. Dit kan worden opgegeven met de parameter 'max_instances':

  • Als 1, dan wordt, als er al een instantie draait, een nieuwe instantie niet gestart maar overgeslagen.
  • Als 2, als er al een instantie draait, dan wordt er een nieuwe instantie gestart en draaien er twee instanties.
  • Enz.

Taak starten bij programmastart

Standaard moeten we wachten tot CRONTrigger een job start. Vaak willen we dat niet, we willen dat de taak direct wordt uitgevoerd nadat we het programma hebben gestart. In deze gevallen moeten we de bestaande job aanpassen en de parameter 'next_run_time' instellen:

job.modify(next_run_time=datetime.datetime.now())

Gebeurtenissen en luisteraar

We hebben dit niet echt nodig voor onze back-uptoepassing, maar het kan nuttig zijn voor jou. Hier voegen we code toe voor drie gevallen en incrementeren we tellers voor elke gebeurtenis:

  • Wanneer een taak wordt uitgevoerd: EVENT_JOB_EXECUTED
  • Wanneer een taak een fout heeft: EVENT_JOB_ERROR
  • Wanneer een taakuitvoering is gemist: EVENT_JOB_MISSED

Wanneer een taak wordt toegevoegd aan de planner, krijgt het een job_id toegewezen. In de listener halen we de job_naam eruit door eerst de job uit de gebeurtenis te halen en dan een omgekeerde lookup te gebruiken met behulp van de job_id. Merk op dat de gebeurtenis EVENT_JOB_MISSED nooit de luisteraar bereikt.

De code

Hieronder staat de code om dit te demonstreren.
We hebben wat extra code toegevoegd om de run-statistieken te laten zien.
De BackupRunner wordt geïnitialiseerd door de 'job_backup' methode in de scheduler.
Hier kunnen we meer initialisatie en variabelen toevoegen en optioneel fouten opvangen.
Commentarieer/decommentarieer wat regels om verschillende triggers te proberen: CronTrigger en interval.

# sched.py
import datetime
import logging
import random
import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED

apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)

logging.basicConfig(
    format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(15)
        #time.sleep(2)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

class JobScheduler:
    def __init__(
        self,
        var_a,
        var_b,
    ):
        self.var_a = var_a
        self.var_b = var_b

        self.cron_triggers = {
            'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
            'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
            'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
            'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
            'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
        }

        # job params and stats
        self.job_name_params = {
            'job_backup': {
                'trigger': self.cron_triggers['every10seconds'],
                #'trigger': 'interval',
                #'seconds': 1,
                'func': self.job_backup,
                'kwargs': {
                    'var_1': self.var_a,
                    'var_2': self.var_b,
                },
                'max_instances': 1,
                'run_on_start': True,
            },
        }
        self.job_name_stats = {
            'job_backup': {
                # run counters (unsafe)
                'execd': 0,
                'error': 0,
                'missd': 0,
            },
        }
        self.scheduler = None
        # reverse lookup
        self.job_id_job_names = {}

    def job_backup(self, var_1, var_2):
        backup_runner = BackupRunner(
            var_1=var_1,
            var_2=var_2,
        )
        backup_runner.backup()

    def listener(self, event):
        logger.debug(f'event = {event}')
        job = self.scheduler.get_job(event.job_id)
        job_name = self.job_id_job_names[job.id]
        if event.code == EVENT_JOB_EXECUTED:
                self.job_name_stats[job_name]['execd'] += 1
        elif event.code == EVENT_JOB_ERROR:
                self.job_name_stats[job_name]['error'] += 1
        elif event.code == EVENT_JOB_MISSED:
                self.job_name_stats[job_name]['missd'] += 1
        if event.exception:
            logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
        
    def schedule(self):
        logger.debug(f'()')
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
        self.scheduler.start()
        # add jobs to scheduler
        for job_name, params in self.job_name_params.items():
            run_on_start = params.pop('run_on_start')
            job = self.scheduler.add_job(**params)
            # for reverse lookup in listener
            self.job_id_job_names[job.id] = job_name
            # start immediately
            if run_on_start:
                for job in self.scheduler.get_jobs():
                    if job.name == job_name:
                        job.modify(next_run_time=datetime.datetime.now())
                        break

        et_start_secs = int(time.time())
        while True:
            run_secs = int(time.time()) - et_start_secs
            for job_name, stats in self.job_name_stats.items():
                execd, error, missd = stats['execd'], stats['error'], stats['missd']
                logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3}   error: {error:3}   missd: {missd:3}')
            time.sleep(1)

def main():
    job_scheduler = JobScheduler(
        var_a='a',
        var_b='b',
    )
    job_scheduler.schedule()

if __name__ == '__main__':
    main()

Hier zijn enkele regels die naar de console worden afgedrukt:

...
2024-10-19 15:29:52,562    DEBUG [sched.py              schedule(): 294]  36 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:53,563    DEBUG [sched.py              schedule(): 294]  37 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:54,564    DEBUG [sched.py              schedule(): 294]  38 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:55,014    DEBUG [sched.py                backup(): 204] backup: ready
2024-10-19 15:29:55,014     INFO [base.py                run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014    DEBUG [sched.py              listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566    DEBUG [sched.py              schedule(): 294]  39 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:56,566    DEBUG [sched.py              schedule(): 294]  40 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:57,567    DEBUG [sched.py              schedule(): 294]  41 job_backup      execd:   1   error:   1   missd:   0
...

Samenvatting

Het implementeren van herhalende taken is eenvoudig met APscheduler. We hebben hier alleen de basis aangeraakt, maar dit is goed genoeg voor veel use cases. Wat ontbreekt is een gemakkelijke manier om te controleren welke taken worden uitgevoerd en hoeveel instanties van een taak worden uitgevoerd. Maar dat is alleen maar leuk om te hebben.

Links / credits

Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x

How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler

how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996

Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru

Lees meer

Scheduling

Laat een reactie achter

Reageer anoniem of log in om commentaar te geven.

Opmerkingen

Laat een antwoord achter

Antwoord anoniem of log in om te antwoorden.