angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Programación básica de trabajos con APScheduler

Programe trabajos sencillos fácilmente con APScheduler y utilice más funciones para tareas complejas.

19 octubre 2024
post main image
https://unsplash.com/@sarahs_captured_moments

Has creado un programa Python , por ejemplo, para hacer una copia de seguridad de una base de datos, y ahora quieres ejecutarlo cada hora, o cada día.

Hay muchas soluciones para elegir, aquí utilizamos APScheduler, 'Advanced Python Scheduler', un programador de tareas que cumple muchos requisitos. Para nuestra tarea, este paquete puede parecer exagerado, pero vamos a usarlo aquí y una vez que entendamos lo básico, es posible que queramos usarlo para tareas más complejas.

Como siempre hago esto en Ubuntu 22.04.

Nuestro programa de copia de seguridad

No hay mucho a nuestro programa de copia de seguridad:

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(5)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

Para esta demostración añadimos dos variables. Para poder comprobar el comportamiento de nuestro planificador, añadimos 'random.randint' para generar una excepción de vez en cuando.

Trabajos

Por supuesto, este paquete trata sobre la programación de trabajos. Para ello utiliza un planificador y los siguientes componentes:

  • Trigger
    El disparador indica cuándo debe ejecutarse el trabajo. Aquí usamos el CronTrigger.
  • Job store
    El almacén de trabajos aloja los trabajos programados. El almacén de trabajos por defecto, MemoryJobStore, simplemente mantiene los trabajos en memoria. Bien para nuestro propósito.
  • Ejecutor
    El ejecutor normalmente envía un trabajo a un hilo o a un pool de procesos. Cuando el trabajo está hecho, notifica al planificador y se genera un evento, como 'executed' o 'error'. El ejecutor por defecto, ThreadPoolExecutor, viene con un máximo de 10 hilos. Bien para nuestro propósito.

Esto significa que sólo tenemos que especificar el componente Trigger .

CronTrigger e intervalo

Utilizamos la función CronTrigger para obtener la funcionalidad CRON. Se trata entonces de especificar los valores correctos para año, mes, día, hora, minuto y segundo. Hay muchos sitios web con ejemplos, aquí definimos algunas configuraciones para elegir:

...
'every10seconds': CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='*/10'),
'everyminute':    CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='0'),
...

Cuando añadimos un trabajo, especificamos el disparador de la siguiente manera:

    add_job(
        ...
        trigger=CronTrigger(...)
        ...
    )

También es posible especificar un intervalo:

    add_job(
        ...
        trigger='interval',
        seconds=1,
        ...
    )

Esto también resulta útil a la hora de probar nuestra aplicación.

Instancias múltiples

Cuando se inicia un trabajo, APScheduler comprueba que no esté ya activo. Si lo está, se muestra el siguiente mensaje:

Execution of job "job_backup ... skipped: maximum number of running instances reached (1)

Por lo general, esto es deseable, pero en algunos casos es posible que desee permitir múltiples instancias al mismo tiempo. Esto se puede especificar con el parámetro 'max_instances':

  • Si es 1, cuando una instancia ya se esté ejecutando, no se iniciará una nueva instancia, sino que se omitirá.
  • Si es 2, cuando una instancia ya se esté ejecutando, se iniciará una nueva instancia y se ejecutarán dos instancias.
  • Etc.

Iniciar el trabajo al arrancar el programa

Por defecto, tenemos que esperar a que CRONTrigger inicie un trabajo. A menudo no queremos eso, queremos que el trabajo se ejecute inmediatamente después de iniciar el programa. En estos casos, tenemos que modificar el trabajo existente y establecer el parámetro 'next_run_time':

job.modify(next_run_time=datetime.datetime.now())

Eventos y listener

Realmente no necesitamos esto para nuestra aplicación de copia de seguridad, pero puede ser útil para usted. Aquí añadimos código para tres casos e incrementamos los contadores para cada evento:

  • Cuando se ejecuta un trabajo: EVENT_JOB_EXECUTED
  • Cuando un trabajo tiene un error: EVENT_JOB_ERROR
  • Cuando se ha omitido la ejecución de un trabajo: EVENT_JOB_MISSED

Cuando se añade un trabajo al planificador, se le asigna un job_id. En el listener, extraemos el job_name extrayendo primero el job del evento, y luego usando una búsqueda inversa usando el job_id. Observa que el evento EVENT_JOB_MISSED nunca llega al receptor.

El código

A continuación se muestra el código para demostrar esto.
Añadimos algo de código extra para mostrar las estadísticas de ejecución.
El BackupRunner es inicializado por el método 'job_backup' en el planificador.
Aquí podemos añadir más inicialización y variables y opcionalmente capturar errores.
Comenta/comenta algunas líneas para probar diferentes disparadores: CronTrigger e intervalo.

# sched.py
import datetime
import logging
import random
import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED

apscheduler_logger = logging.getLogger('apscheduler')
# decomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)

logging.basicConfig(
    format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(15)
        #time.sleep(2)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

class JobScheduler:
    def __init__(
        self,
        var_a,
        var_b,
    ):
        self.var_a = var_a
        self.var_b = var_b

        self.cron_triggers = {
            'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
            'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
            'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
            'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
            'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
        }

        # job params and stats
        self.job_name_params = {
            'job_backup': {
                'trigger': self.cron_triggers['every10seconds'],
                #'trigger': 'interval',
                #'seconds': 1,
                'func': self.job_backup,
                'kwargs': {
                    'var_1': self.var_a,
                    'var_2': self.var_b,
                },
                'max_instances': 1,
                'run_on_start': True,
            },
        }
        self.job_name_stats = {
            'job_backup': {
                # run counters (unsafe)
                'execd': 0,
                'error': 0,
                'missd': 0,
            },
        }
        self.scheduler = None
        # reverse lookup
        self.job_id_job_names = {}

    def job_backup(self, var_1, var_2):
        backup_runner = BackupRunner(
            var_1=var_1,
            var_2=var_2,
        )
        backup_runner.backup()

    def listener(self, event):
        logger.debug(f'event = {event}')
        job = self.scheduler.get_job(event.job_id)
        job_name = self.job_id_job_names[job.id]
        if event.code == EVENT_JOB_EXECUTED:
                self.job_name_stats[job_name]['execd'] += 1
        elif event.code == EVENT_JOB_ERROR:
                self.job_name_stats[job_name]['error'] += 1
        elif event.code == EVENT_JOB_MISSED:
                self.job_name_stats[job_name]['missd'] += 1
        if event.exception:
            logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
        
    def schedule(self):
        logger.debug(f'()')
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
        self.scheduler.start()
        # add jobs to scheduler
        for job_name, params in self.job_name_params.items():
            run_on_start = params.pop('run_on_start')
            job = self.scheduler.add_job(**params)
            # for reverse lookup in listener
            self.job_id_job_names[job.id] = job_name
            # start immediately
            if run_on_start:
                for job in self.scheduler.get_jobs():
                    if job.name == job_name:
                        job.modify(next_run_time=datetime.datetime.now())
                        break

        et_start_secs = int(time.time())
        while True:
            run_secs = int(time.time()) - et_start_secs
            for job_name, stats in self.job_name_stats.items():
                execd, error, missd = stats['execd'], stats['error'], stats['missd']
                logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3}   error: {error:3}   missd: {missd:3}')
            time.sleep(1)

def main():
    job_scheduler = JobScheduler(
        var_a='a',
        var_b='b',
    )
    job_scheduler.schedule()

if __name__ == '__main__':
    main()

Aquí hay algunas líneas impresas en la consola:

...
2024-10-19 15:29:52,562    DEBUG [sched.py              schedule(): 294]  36 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:53,563    DEBUG [sched.py              schedule(): 294]  37 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:54,564    DEBUG [sched.py              schedule(): 294]  38 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:55,014    DEBUG [sched.py                backup(): 204] backup: ready
2024-10-19 15:29:55,014     INFO [base.py                run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014    DEBUG [sched.py              listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566    DEBUG [sched.py              schedule(): 294]  39 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:56,566    DEBUG [sched.py              schedule(): 294]  40 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:57,567    DEBUG [sched.py              schedule(): 294]  41 job_backup      execd:   1   error:   1   missd:   0
...

Resumen

Implementar trabajos repetitivos es fácil con APscheduler. Aquí hemos tocado sólo lo básico, pero esto será suficiente para muchos casos de uso. Lo que falta es una manera fácil de comprobar qué trabajos se están ejecutando y cuántas instancias de un trabajo se están ejecutando. Pero eso es sólo un "nice to have".

Enlaces / créditos

Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x

How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler

how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996

Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru

Leer más

Scheduling

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.