angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Planification de base des tâches avec APScheduler

Planifiez facilement des tâches simples avec APScheduler et utilisez davantage de fonctionnalités pour les tâches complexes.

19 octobre 2024
Dans Scheduling
post main image
https://unsplash.com/@sarahs_captured_moments

Vous avez créé un programme Python , par exemple pour sauvegarder une base de données, et vous voulez maintenant l'exécuter toutes les heures, ou tous les jours.

Il existe de nombreuses solutions, nous utilisons ici APScheduler, 'Advanced Python Scheduler', un planificateur de tâches qui répond à de nombreuses exigences. Pour notre tâche, ce paquet peut sembler excessif, mais utilisons-le ici et une fois que nous aurons compris les bases, nous voudrons peut-être l'utiliser pour des tâches plus complexes.

Comme toujours, je le fais sur Ubuntu 22.04.

Notre programme de sauvegarde

Il n'y a pas grand-chose à notre programme de sauvegarde :

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(5)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

Pour cette démonstration, nous avons ajouté deux variables. Pour pouvoir vérifier le comportement de notre planificateur, nous ajoutons 'random.randint' pour générer une exception de temps en temps.

Emplois

Bien entendu, ce paquetage concerne l'ordonnancement des tâches. Pour ce faire, il utilise un planificateur et les composants suivants :

  • Trigger
    Le déclencheur indique quand le travail doit être exécuté. Ici, nous utilisons le CronTrigger.
  • Job store
    Le magasin de tâches (job store) contient les tâches planifiées. Le magasin de tâches par défaut, MemoryJobStore, conserve simplement les tâches en mémoire. Cela convient à notre objectif.
  • Exécutant
    L'exécuteur soumet généralement un travail à un thread ou à un pool de processus. Lorsque le travail est terminé, il notifie l'ordonnanceur et un événement, comme 'executed' ou 'error', est généré. L'exécuteur par défaut, ThreadPoolExecutor, a un nombre maximal de threads de 10. C'est parfait pour notre objectif.

Cela signifie que nous devons seulement spécifier le composant Trigger .

CronTrigger et intervalle

Nous utilisons la fonction CronTrigger pour obtenir la fonctionnalité CRON. Il s'agit ensuite de spécifier les valeurs correctes pour l'année, le mois, le jour, l'heure, la minute et la seconde. Il existe de nombreux sites web avec des exemples, ici nous définissons quelques paramètres à choisir :

...
'every10seconds': CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='*/10'),
'everyminute':    CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='0'),
...

Lorsque nous ajoutons un travail, nous spécifions le déclencheur comme suit :

    add_job(
        ...
        trigger=CronTrigger(...)
        ...
    )

Il est également possible de spécifier un intervalle :

    add_job(
        ...
        trigger='interval',
        seconds=1,
        ...
    )

Cela s'avère également utile lorsque nous testons notre application.

Instances multiples

Lorsqu'un travail démarre, APScheduler vérifie qu'il n'est pas déjà actif. Si c'est le cas, le message suivant s'affiche :

Execution of job "job_backup ... skipped: maximum number of running instances reached (1)

En général, cela est souhaitable, mais dans certains cas, vous pouvez vouloir autoriser plusieurs instances en même temps. Cela peut être spécifié avec le paramètre 'max_instances' :

  • Si 1, lorsqu'une instance est déjà en cours d'exécution, une nouvelle instance ne sera pas démarrée mais ignorée.
  • Si 2, lorsqu'une instance est déjà en cours d'exécution, une nouvelle instance sera démarrée et deux instances seront en cours d'exécution.
  • Etc.

Démarrer le travail au démarrage du programme

Par défaut, nous devons attendre que CRONTrigger démarre un travail. Souvent, ce n'est pas ce que nous voulons, nous voulons que le travail soit exécuté immédiatement après le démarrage du programme. Dans ce cas, nous devons modifier le job existant et définir le paramètre 'next_run_time' :

job.modify(next_run_time=datetime.datetime.now())

Événements et auditeurs

Nous n'en avons pas vraiment besoin pour notre application de sauvegarde, mais cela pourrait vous être utile. Ici, nous ajoutons du code pour trois cas et nous incrémentons les compteurs pour chaque événement :

  • Lorsqu'un travail est exécuté : EVENT_JOB_EXECUTED
  • Lorsqu'un travail a une erreur : EVENT_JOB_ERROR
  • Lorsqu'un travail n'a pas été exécuté : EVENT_JOB_MISSED

Lorsqu'un travail est ajouté à l'ordonnanceur, un numéro d'identification lui est attribué. Dans l'auditeur, nous extrayons le nom du travail en extrayant d'abord le travail de l'événement, puis en effectuant une recherche inversée à partir de l'identifiant du travail. Notez que l'événement EVENT_JOB_MISSED n'atteint jamais l'auditeur.

Le code

Voici le code de démonstration.
Nous avons ajouté du code supplémentaire pour afficher les statistiques d'exécution.
Le BackupRunner est initialisé par la méthode 'job_backup' de l'ordonnanceur.
Nous pouvons ici ajouter d'autres initialisations et variables, et éventuellement détecter des erreurs.
Commenter/décommenter certaines lignes pour essayer différents déclencheurs : CronTrigger et intervalle.

# sched.py
import datetime
import logging
import random
import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED

apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)

logging.basicConfig(
    format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(15)
        #time.sleep(2)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

class JobScheduler:
    def __init__(
        self,
        var_a,
        var_b,
    ):
        self.var_a = var_a
        self.var_b = var_b

        self.cron_triggers = {
            'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
            'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
            'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
            'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
            'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
        }

        # job params and stats
        self.job_name_params = {
            'job_backup': {
                'trigger': self.cron_triggers['every10seconds'],
                #'trigger': 'interval',
                #'seconds': 1,
                'func': self.job_backup,
                'kwargs': {
                    'var_1': self.var_a,
                    'var_2': self.var_b,
                },
                'max_instances': 1,
                'run_on_start': True,
            },
        }
        self.job_name_stats = {
            'job_backup': {
                # run counters (unsafe)
                'execd': 0,
                'error': 0,
                'missd': 0,
            },
        }
        self.scheduler = None
        # reverse lookup
        self.job_id_job_names = {}

    def job_backup(self, var_1, var_2):
        backup_runner = BackupRunner(
            var_1=var_1,
            var_2=var_2,
        )
        backup_runner.backup()

    def listener(self, event):
        logger.debug(f'event = {event}')
        job = self.scheduler.get_job(event.job_id)
        job_name = self.job_id_job_names[job.id]
        if event.code == EVENT_JOB_EXECUTED:
                self.job_name_stats[job_name]['execd'] += 1
        elif event.code == EVENT_JOB_ERROR:
                self.job_name_stats[job_name]['error'] += 1
        elif event.code == EVENT_JOB_MISSED:
                self.job_name_stats[job_name]['missd'] += 1
        if event.exception:
            logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
        
    def schedule(self):
        logger.debug(f'()')
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
        self.scheduler.start()
        # add jobs to scheduler
        for job_name, params in self.job_name_params.items():
            run_on_start = params.pop('run_on_start')
            job = self.scheduler.add_job(**params)
            # for reverse lookup in listener
            self.job_id_job_names[job.id] = job_name
            # start immediately
            if run_on_start:
                for job in self.scheduler.get_jobs():
                    if job.name == job_name:
                        job.modify(next_run_time=datetime.datetime.now())
                        break

        et_start_secs = int(time.time())
        while True:
            run_secs = int(time.time()) - et_start_secs
            for job_name, stats in self.job_name_stats.items():
                execd, error, missd = stats['execd'], stats['error'], stats['missd']
                logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3}   error: {error:3}   missd: {missd:3}')
            time.sleep(1)

def main():
    job_scheduler = JobScheduler(
        var_a='a',
        var_b='b',
    )
    job_scheduler.schedule()

if __name__ == '__main__':
    main()

Voici quelques lignes imprimées sur la console :

...
2024-10-19 15:29:52,562    DEBUG [sched.py              schedule(): 294]  36 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:53,563    DEBUG [sched.py              schedule(): 294]  37 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:54,564    DEBUG [sched.py              schedule(): 294]  38 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:55,014    DEBUG [sched.py                backup(): 204] backup: ready
2024-10-19 15:29:55,014     INFO [base.py                run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014    DEBUG [sched.py              listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566    DEBUG [sched.py              schedule(): 294]  39 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:56,566    DEBUG [sched.py              schedule(): 294]  40 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:57,567    DEBUG [sched.py              schedule(): 294]  41 job_backup      execd:   1   error:   1   missd:   0
...

Résumé

L'implémentation de tâches répétitives est facile avec APscheduler. Nous n'avons abordé ici que les bases, mais cela suffira pour de nombreux cas d'utilisation. Ce qui manque, c'est un moyen facile de vérifier quels travaux sont en cours et combien d'instances d'un travail sont en cours d'exécution. Mais ce n'est qu'un avantage.

Liens / crédits

Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x

How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler

how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996

Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru

En savoir plus...

Scheduling

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.