Planification de base des tâches avec APScheduler
Planifiez facilement des tâches simples avec APScheduler et utilisez davantage de fonctionnalités pour les tâches complexes.
Vous avez créé un programme Python , par exemple pour sauvegarder une base de données, et vous voulez maintenant l'exécuter toutes les heures, ou tous les jours.
Il existe de nombreuses solutions, nous utilisons ici APScheduler, 'Advanced Python Scheduler', un planificateur de tâches qui répond à de nombreuses exigences. Pour notre tâche, ce paquet peut sembler excessif, mais utilisons-le ici et une fois que nous aurons compris les bases, nous voudrons peut-être l'utiliser pour des tâches plus complexes.
Comme toujours, je le fais sur Ubuntu 22.04.
Notre programme de sauvegarde
Il n'y a pas grand-chose à notre programme de sauvegarde :
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(5)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
Pour cette démonstration, nous avons ajouté deux variables. Pour pouvoir vérifier le comportement de notre planificateur, nous ajoutons 'random.randint' pour générer une exception de temps en temps.
Emplois
Bien entendu, ce paquetage concerne l'ordonnancement des tâches. Pour ce faire, il utilise un planificateur et les composants suivants :
- Trigger
Le déclencheur indique quand le travail doit être exécuté. Ici, nous utilisons le CronTrigger. - Job store
Le magasin de tâches (job store) contient les tâches planifiées. Le magasin de tâches par défaut, MemoryJobStore, conserve simplement les tâches en mémoire. Cela convient à notre objectif. - Exécutant
L'exécuteur soumet généralement un travail à un thread ou à un pool de processus. Lorsque le travail est terminé, il notifie l'ordonnanceur et un événement, comme 'executed' ou 'error', est généré. L'exécuteur par défaut, ThreadPoolExecutor, a un nombre maximal de threads de 10. C'est parfait pour notre objectif.
Cela signifie que nous devons seulement spécifier le composant Trigger .
CronTrigger et intervalle
Nous utilisons la fonction CronTrigger pour obtenir la fonctionnalité CRON. Il s'agit ensuite de spécifier les valeurs correctes pour l'année, le mois, le jour, l'heure, la minute et la seconde. Il existe de nombreux sites web avec des exemples, ici nous définissons quelques paramètres à choisir :
...
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
...
Lorsque nous ajoutons un travail, nous spécifions le déclencheur comme suit :
add_job(
...
trigger=CronTrigger(...)
...
)
Il est également possible de spécifier un intervalle :
add_job(
...
trigger='interval',
seconds=1,
...
)
Cela s'avère également utile lorsque nous testons notre application.
Instances multiples
Lorsqu'un travail démarre, APScheduler vérifie qu'il n'est pas déjà actif. Si c'est le cas, le message suivant s'affiche :
Execution of job "job_backup ... skipped: maximum number of running instances reached (1)
En général, cela est souhaitable, mais dans certains cas, vous pouvez vouloir autoriser plusieurs instances en même temps. Cela peut être spécifié avec le paramètre 'max_instances' :
- Si 1, lorsqu'une instance est déjà en cours d'exécution, une nouvelle instance ne sera pas démarrée mais ignorée.
- Si 2, lorsqu'une instance est déjà en cours d'exécution, une nouvelle instance sera démarrée et deux instances seront en cours d'exécution.
- Etc.
Démarrer le travail au démarrage du programme
Par défaut, nous devons attendre que CRONTrigger démarre un travail. Souvent, ce n'est pas ce que nous voulons, nous voulons que le travail soit exécuté immédiatement après le démarrage du programme. Dans ce cas, nous devons modifier le job existant et définir le paramètre 'next_run_time' :
job.modify(next_run_time=datetime.datetime.now())
Événements et auditeurs
Nous n'en avons pas vraiment besoin pour notre application de sauvegarde, mais cela pourrait vous être utile. Ici, nous ajoutons du code pour trois cas et nous incrémentons les compteurs pour chaque événement :
- Lorsqu'un travail est exécuté : EVENT_JOB_EXECUTED
- Lorsqu'un travail a une erreur : EVENT_JOB_ERROR
- Lorsqu'un travail n'a pas été exécuté : EVENT_JOB_MISSED
Lorsqu'un travail est ajouté à l'ordonnanceur, un numéro d'identification lui est attribué. Dans l'auditeur, nous extrayons le nom du travail en extrayant d'abord le travail de l'événement, puis en effectuant une recherche inversée à partir de l'identifiant du travail. Notez que l'événement EVENT_JOB_MISSED n'atteint jamais l'auditeur.
Le code
Voici le code de démonstration.
Nous avons ajouté du code supplémentaire pour afficher les statistiques d'exécution.
Le BackupRunner est initialisé par la méthode 'job_backup' de l'ordonnanceur.
Nous pouvons ici ajouter d'autres initialisations et variables, et éventuellement détecter des erreurs.
Commenter/décommenter certaines lignes pour essayer différents déclencheurs : CronTrigger et intervalle.
# sched.py
import datetime
import logging
import random
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)
logging.basicConfig(
format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(15)
#time.sleep(2)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
class JobScheduler:
def __init__(
self,
var_a,
var_b,
):
self.var_a = var_a
self.var_b = var_b
self.cron_triggers = {
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
}
# job params and stats
self.job_name_params = {
'job_backup': {
'trigger': self.cron_triggers['every10seconds'],
#'trigger': 'interval',
#'seconds': 1,
'func': self.job_backup,
'kwargs': {
'var_1': self.var_a,
'var_2': self.var_b,
},
'max_instances': 1,
'run_on_start': True,
},
}
self.job_name_stats = {
'job_backup': {
# run counters (unsafe)
'execd': 0,
'error': 0,
'missd': 0,
},
}
self.scheduler = None
# reverse lookup
self.job_id_job_names = {}
def job_backup(self, var_1, var_2):
backup_runner = BackupRunner(
var_1=var_1,
var_2=var_2,
)
backup_runner.backup()
def listener(self, event):
logger.debug(f'event = {event}')
job = self.scheduler.get_job(event.job_id)
job_name = self.job_id_job_names[job.id]
if event.code == EVENT_JOB_EXECUTED:
self.job_name_stats[job_name]['execd'] += 1
elif event.code == EVENT_JOB_ERROR:
self.job_name_stats[job_name]['error'] += 1
elif event.code == EVENT_JOB_MISSED:
self.job_name_stats[job_name]['missd'] += 1
if event.exception:
logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
def schedule(self):
logger.debug(f'()')
self.scheduler = BackgroundScheduler()
self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
self.scheduler.start()
# add jobs to scheduler
for job_name, params in self.job_name_params.items():
run_on_start = params.pop('run_on_start')
job = self.scheduler.add_job(**params)
# for reverse lookup in listener
self.job_id_job_names[job.id] = job_name
# start immediately
if run_on_start:
for job in self.scheduler.get_jobs():
if job.name == job_name:
job.modify(next_run_time=datetime.datetime.now())
break
et_start_secs = int(time.time())
while True:
run_secs = int(time.time()) - et_start_secs
for job_name, stats in self.job_name_stats.items():
execd, error, missd = stats['execd'], stats['error'], stats['missd']
logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3} error: {error:3} missd: {missd:3}')
time.sleep(1)
def main():
job_scheduler = JobScheduler(
var_a='a',
var_b='b',
)
job_scheduler.schedule()
if __name__ == '__main__':
main()
Voici quelques lignes imprimées sur la console :
...
2024-10-19 15:29:52,562 DEBUG [sched.py schedule(): 294] 36 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:53,563 DEBUG [sched.py schedule(): 294] 37 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:54,564 DEBUG [sched.py schedule(): 294] 38 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:55,014 DEBUG [sched.py backup(): 204] backup: ready
2024-10-19 15:29:55,014 INFO [base.py run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014 DEBUG [sched.py listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566 DEBUG [sched.py schedule(): 294] 39 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:56,566 DEBUG [sched.py schedule(): 294] 40 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:57,567 DEBUG [sched.py schedule(): 294] 41 job_backup execd: 1 error: 1 missd: 0
...
Résumé
L'implémentation de tâches répétitives est facile avec APscheduler. Nous n'avons abordé ici que les bases, mais cela suffira pour de nombreux cas d'utilisation. Ce qui manque, c'est un moyen facile de vérifier quels travaux sont en cours et combien d'instances d'un travail sont en cours d'exécution. Mais ce n'est qu'un avantage.
Liens / crédits
Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x
How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler
how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996
Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru
En savoir plus...
Scheduling
Récent
- Graphique de séries temporelles avec Flask, Bootstrap et Chart.js
- Utiliser IPv6 avec Microk8s
- Utilisation de Ingress pour accéder à RabbitMQ sur un cluster Microk8s
- Galerie vidéo simple avec Flask, Jinja, Bootstrap et JQuery
- Planification de base des tâches avec APScheduler
- Un commutateur de base de données avec HAProxy et HAProxy Runtime API
Les plus consultés
- Utiliser PyInstaller et Cython pour créer un exécutable Python
- Réduire les temps de réponse d'un Flask SQLAlchemy site web
- Utilisation des Python's pyOpenSSL pour vérifier les certificats SSL téléchargés d'un hôte
- Connexion à un service sur un hôte Docker à partir d'un conteneur Docker
- Utiliser UUIDs au lieu de Integer Autoincrement Primary Keys avec SQLAlchemy et MariaDb
- SQLAlchemy : Utilisation de Cascade Deletes pour supprimer des objets connexes