Programación básica de trabajos con APScheduler
Programe trabajos sencillos fácilmente con APScheduler y utilice más funciones para tareas complejas.
Has creado un programa Python , por ejemplo, para hacer una copia de seguridad de una base de datos, y ahora quieres ejecutarlo cada hora, o cada día.
Hay muchas soluciones para elegir, aquí utilizamos APScheduler, 'Advanced Python Scheduler', un programador de tareas que cumple muchos requisitos. Para nuestra tarea, este paquete puede parecer exagerado, pero vamos a usarlo aquí y una vez que entendamos lo básico, es posible que queramos usarlo para tareas más complejas.
Como siempre hago esto en Ubuntu 22.04.
Nuestro programa de copia de seguridad
No hay mucho a nuestro programa de copia de seguridad:
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(5)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
Para esta demostración añadimos dos variables. Para poder comprobar el comportamiento de nuestro planificador, añadimos 'random.randint' para generar una excepción de vez en cuando.
Trabajos
Por supuesto, este paquete trata sobre la programación de trabajos. Para ello utiliza un planificador y los siguientes componentes:
- Trigger
El disparador indica cuándo debe ejecutarse el trabajo. Aquí usamos el CronTrigger. - Job store
El almacén de trabajos aloja los trabajos programados. El almacén de trabajos por defecto, MemoryJobStore, simplemente mantiene los trabajos en memoria. Bien para nuestro propósito. - Ejecutor
El ejecutor normalmente envía un trabajo a un hilo o a un pool de procesos. Cuando el trabajo está hecho, notifica al planificador y se genera un evento, como 'executed' o 'error'. El ejecutor por defecto, ThreadPoolExecutor, viene con un máximo de 10 hilos. Bien para nuestro propósito.
Esto significa que sólo tenemos que especificar el componente Trigger .
CronTrigger e intervalo
Utilizamos la función CronTrigger para obtener la funcionalidad CRON. Se trata entonces de especificar los valores correctos para año, mes, día, hora, minuto y segundo. Hay muchos sitios web con ejemplos, aquí definimos algunas configuraciones para elegir:
...
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
...
Cuando añadimos un trabajo, especificamos el disparador de la siguiente manera:
add_job(
...
trigger=CronTrigger(...)
...
)
También es posible especificar un intervalo:
add_job(
...
trigger='interval',
seconds=1,
...
)
Esto también resulta útil a la hora de probar nuestra aplicación.
Instancias múltiples
Cuando se inicia un trabajo, APScheduler comprueba que no esté ya activo. Si lo está, se muestra el siguiente mensaje:
Execution of job "job_backup ... skipped: maximum number of running instances reached (1)
Por lo general, esto es deseable, pero en algunos casos es posible que desee permitir múltiples instancias al mismo tiempo. Esto se puede especificar con el parámetro 'max_instances':
- Si es 1, cuando una instancia ya se esté ejecutando, no se iniciará una nueva instancia, sino que se omitirá.
- Si es 2, cuando una instancia ya se esté ejecutando, se iniciará una nueva instancia y se ejecutarán dos instancias.
- Etc.
Iniciar el trabajo al arrancar el programa
Por defecto, tenemos que esperar a que CRONTrigger inicie un trabajo. A menudo no queremos eso, queremos que el trabajo se ejecute inmediatamente después de iniciar el programa. En estos casos, tenemos que modificar el trabajo existente y establecer el parámetro 'next_run_time':
job.modify(next_run_time=datetime.datetime.now())
Eventos y listener
Realmente no necesitamos esto para nuestra aplicación de copia de seguridad, pero puede ser útil para usted. Aquí añadimos código para tres casos e incrementamos los contadores para cada evento:
- Cuando se ejecuta un trabajo: EVENT_JOB_EXECUTED
- Cuando un trabajo tiene un error: EVENT_JOB_ERROR
- Cuando se ha omitido la ejecución de un trabajo: EVENT_JOB_MISSED
Cuando se añade un trabajo al planificador, se le asigna un job_id. En el listener, extraemos el job_name extrayendo primero el job del evento, y luego usando una búsqueda inversa usando el job_id. Observa que el evento EVENT_JOB_MISSED nunca llega al receptor.
El código
A continuación se muestra el código para demostrar esto.
Añadimos algo de código extra para mostrar las estadísticas de ejecución.
El BackupRunner es inicializado por el método 'job_backup' en el planificador.
Aquí podemos añadir más inicialización y variables y opcionalmente capturar errores.
Comenta/comenta algunas líneas para probar diferentes disparadores: CronTrigger e intervalo.
# sched.py
import datetime
import logging
import random
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)
logging.basicConfig(
format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
class BackupRunner:
def __init__(self, var_1, var_2):
self.var_1 = var_1
self.var_2 = var_2
def backup(self):
logger.debug(f'backup: start ...')
# do something here
time.sleep(15)
#time.sleep(2)
if random.randint(0, 3) == 0:
raise Exception('BACKUP PROBLEM')
logger.debug(f'backup: ready')
class JobScheduler:
def __init__(
self,
var_a,
var_b,
):
self.var_a = var_a
self.var_b = var_b
self.cron_triggers = {
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
}
# job params and stats
self.job_name_params = {
'job_backup': {
'trigger': self.cron_triggers['every10seconds'],
#'trigger': 'interval',
#'seconds': 1,
'func': self.job_backup,
'kwargs': {
'var_1': self.var_a,
'var_2': self.var_b,
},
'max_instances': 1,
'run_on_start': True,
},
}
self.job_name_stats = {
'job_backup': {
# run counters (unsafe)
'execd': 0,
'error': 0,
'missd': 0,
},
}
self.scheduler = None
# reverse lookup
self.job_id_job_names = {}
def job_backup(self, var_1, var_2):
backup_runner = BackupRunner(
var_1=var_1,
var_2=var_2,
)
backup_runner.backup()
def listener(self, event):
logger.debug(f'event = {event}')
job = self.scheduler.get_job(event.job_id)
job_name = self.job_id_job_names[job.id]
if event.code == EVENT_JOB_EXECUTED:
self.job_name_stats[job_name]['execd'] += 1
elif event.code == EVENT_JOB_ERROR:
self.job_name_stats[job_name]['error'] += 1
elif event.code == EVENT_JOB_MISSED:
self.job_name_stats[job_name]['missd'] += 1
if event.exception:
logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
def schedule(self):
logger.debug(f'()')
self.scheduler = BackgroundScheduler()
self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
self.scheduler.start()
# add jobs to scheduler
for job_name, params in self.job_name_params.items():
run_on_start = params.pop('run_on_start')
job = self.scheduler.add_job(**params)
# for reverse lookup in listener
self.job_id_job_names[job.id] = job_name
# start immediately
if run_on_start:
for job in self.scheduler.get_jobs():
if job.name == job_name:
job.modify(next_run_time=datetime.datetime.now())
break
et_start_secs = int(time.time())
while True:
run_secs = int(time.time()) - et_start_secs
for job_name, stats in self.job_name_stats.items():
execd, error, missd = stats['execd'], stats['error'], stats['missd']
logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3} error: {error:3} missd: {missd:3}')
time.sleep(1)
def main():
job_scheduler = JobScheduler(
var_a='a',
var_b='b',
)
job_scheduler.schedule()
if __name__ == '__main__':
main()
Aquí hay algunas líneas impresas en la consola:
...
2024-10-19 15:29:52,562 DEBUG [sched.py schedule(): 294] 36 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:53,563 DEBUG [sched.py schedule(): 294] 37 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:54,564 DEBUG [sched.py schedule(): 294] 38 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:55,014 DEBUG [sched.py backup(): 204] backup: ready
2024-10-19 15:29:55,014 INFO [base.py run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014 DEBUG [sched.py listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566 DEBUG [sched.py schedule(): 294] 39 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:56,566 DEBUG [sched.py schedule(): 294] 40 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:57,567 DEBUG [sched.py schedule(): 294] 41 job_backup execd: 1 error: 1 missd: 0
...
Resumen
Implementar trabajos repetitivos es fácil con APscheduler. Aquí hemos tocado sólo lo básico, pero esto será suficiente para muchos casos de uso. Lo que falta es una manera fácil de comprobar qué trabajos se están ejecutando y cuántas instancias de un trabajo se están ejecutando. Pero eso es sólo un "nice to have".
Enlaces / créditos
Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x
How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler
how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996
Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru
Leer más
Scheduling
Recientes
- Uso de Ingress para acceder a RabbitMQ en un clúster Microk8s
- Galería de vídeo simple con Flask, Jinja, Bootstrap y JQuery
- Programación básica de trabajos con APScheduler
- Un conmutador de base de datos con HAProxy y el HAProxy Runtime API
- Docker Swarm rolling updates
- Cómo ocultar las claves primarias de la base de datos UUID de su aplicación web
Más vistos
- Usando PyInstaller y Cython para crear un ejecutable de Python
- Reducir los tiempos de respuesta de las páginas de un sitio Flask SQLAlchemy web
- Usando Python's pyOpenSSL para verificar los certificados SSL descargados de un host
- Conectarse a un servicio en un host Docker desde un contenedor Docker
- Usando UUIDs en lugar de Integer Autoincrement Primary Keys con SQLAlchemy y MariaDb
- SQLAlchemy: Uso de Cascade Deletes para eliminar objetos relacionados