
Basic job scheduling with APScheduler

Schedule simple jobs easily with APScheduler, and use its more advanced functionality for complex tasks.

19 October 2024 Updated 19 October 2024
Post main image: https://unsplash.com/@sarahs_captured_moments

You have created a Python program, for example one that backs up a database, and now you want to run it every hour or every day.

There are many solutions to choose from; here we use APScheduler, the 'Advanced Python Scheduler', a job scheduler that meets many requirements. For our task this package may seem like overkill, but let's use it anyway: once we understand the basics, we may also want to use it for more complex tasks.

As always, I do this on Ubuntu 22.04.
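
APScheduler can be installed from PyPI; the code in this post uses the 3.x API (see the documentation link below):

pip install apscheduler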

Our backup program

There is not much to our backup program:

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(5)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

For this demonstration we added two variables. To be able to check the behaviour of our scheduler, we use 'random.randint' to raise an exception once in a while.

Jobs

Of course, this package is about scheduling jobs. To do this, it uses a scheduler and the following components:

  • Trigger
    The trigger tells when the job must run. Here we use the CronTrigger.
  • Job store
    The job store holds the scheduled jobs. The default job store, MemoryJobStore, simply keeps the jobs in memory. Fine for our purpose.
  • Executor
    The executor typically submits a job to a thread or process pool. When the job is done, it notifies the scheduler and an event, like 'executed' or 'error', is generated. The default executor, ThreadPoolExecutor, comes with a maximum thread count of 10. Fine for our purpose.

This means that we only have to specify the Trigger component.
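
To see how these components fit together, here is a minimal sketch. It is not our backup program (the 'hello' function is just a stand-in); it relies on the default job store and executor and only passes a trigger:

import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

def hello():
    print('hello')

# the default MemoryJobStore and ThreadPoolExecutor are used, we only specify the trigger
scheduler = BackgroundScheduler()
scheduler.add_job(hello, trigger=CronTrigger(second='*/10'))
scheduler.start()

# BackgroundScheduler runs its jobs in threads, keep the main thread alive
while True:
    time.sleep(1)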

CronTrigger and interval

We use the CronTrigger class to get CRON functionality. It then comes down to specifying the correct values for year, month, day, hour, minute and second. There are many websites with examples; here we define some settings to choose from:

...
'every10seconds': CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='*/10'),
'everyminute':    CronTrigger(year='*',   month='*',   day='*', hour='*',   minute='*',   second='0'),
...

When we add a job, we specify the trigger as follows:

    add_job(
        ...
        trigger=CronTrigger(...),
        ...
    )

It is also possible to specify an interval:

    add_job(
        ...
        trigger='interval',
        seconds=1,
        ...
    )

This also comes in handy when testing our application.

Multiple instances

When a job is due to run, APScheduler checks whether an instance of it is still running. If it is, the run is skipped and the following message is displayed:

Execution of job "job_backup ... skipped: maximum number of running instances reached (1)

Usually this is desirable, but in some cases you may want to allow multiple instances at the same time. This can be specified with the 'max_instances' parameter of 'add_job' (see the example after this list):

  • If 1, then, when an instance is already running, a new instance will not be started; the run is skipped.
  • If 2, then, when an instance is already running, a new instance will be started, and two instances will be running.
  • Etc.
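
For example, in the style of the 'add_job' snippets above:

    add_job(
        ...
        trigger=CronTrigger(...),
        max_instances=2,
        ...
    )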

Start job on program start

By default, we have to wait for the CronTrigger to fire before the job runs. Often we don't want that; we want the job to run immediately after we start the program. In that case, we modify the job after adding it and set its 'next_run_time':

job.modify(next_run_time=datetime.datetime.now())
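
The job object to modify can be taken from the return value of 'add_job', or looked up later with 'get_job'. A minimal sketch:

job = scheduler.add_job(
    ...
    trigger=CronTrigger(...),
    ...
)
# run once right away, after that the trigger takes over again
job.modify(next_run_time=datetime.datetime.now())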

Events and listener

We don't really need this for our backup application, but it might be useful for you. Here we add code for three cases and increment a counter for each event:

  • When a job is executed: EVENT_JOB_EXECUTED
  • When a job has an error: EVENT_JOB_ERROR
  • When a job execution was missed: EVENT_JOB_MISSED

When a job is added to the scheduler, it gets assigned a job_id. In the listener, we determine the job_name by taking the job_id from the event and doing a reverse lookup in the dictionary we filled when adding the jobs. Note that in this setup the EVENT_JOB_MISSED event never reached the listener.
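
Stripped to its essentials, a listener is just a function that receives the event and is registered with an event mask, for example:

def listener(event):
    # event.job_id identifies the job, event.exception is set for error events
    print(event.job_id, event.code, event.exception)

scheduler.add_listener(listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)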

The code

Below is the code to demonstrate this.
We added some extra code to show the run statistics.
The BackupRunner is initialized by the 'job_backup' method in the scheduler.
Here we can add more initialization and variables and optionally catch errors.
Comment/uncomment some lines to try the different triggers: CronTrigger and interval.

# sched.py
import datetime
import logging
import random
import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED

apscheduler_logger = logging.getLogger('apscheduler')
# uncomment the next line to suppress messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)

logging.basicConfig(
    format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug(f'backup: start ...')
        # do something here
        time.sleep(15)
        #time.sleep(2)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug(f'backup: ready')

class JobScheduler:
    def __init__(
        self,
        var_a,
        var_b,
    ):
        self.var_a = var_a
        self.var_b = var_b

        self.cron_triggers = {
            'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
            'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
            'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
            'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
            'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
        }

        # job params and stats
        self.job_name_params = {
            'job_backup': {
                'trigger': self.cron_triggers['every10seconds'],
                #'trigger': 'interval',
                #'seconds': 1,
                'func': self.job_backup,
                'kwargs': {
                    'var_1': self.var_a,
                    'var_2': self.var_b,
                },
                'max_instances': 1,
                'run_on_start': True,
            },
        }
        self.job_name_stats = {
            'job_backup': {
                # run counters (unsafe)
                'execd': 0,
                'error': 0,
                'missd': 0,
            },
        }
        self.scheduler = None
        # reverse lookup
        self.job_id_job_names = {}

    def job_backup(self, var_1, var_2):
        backup_runner = BackupRunner(
            var_1=var_1,
            var_2=var_2,
        )
        backup_runner.backup()

    def listener(self, event):
        logger.debug(f'event = {event}')
        job = self.scheduler.get_job(event.job_id)
        job_name = self.job_id_job_names[job.id]
        if event.code == EVENT_JOB_EXECUTED:
            self.job_name_stats[job_name]['execd'] += 1
        elif event.code == EVENT_JOB_ERROR:
            self.job_name_stats[job_name]['error'] += 1
        elif event.code == EVENT_JOB_MISSED:
            self.job_name_stats[job_name]['missd'] += 1
        if event.exception:
            logger.debug(f'{job_name}: job exception event, exception = {event.exception}')
        
    def schedule(self):
        logger.debug(f'()')
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
        self.scheduler.start()
        # add jobs to scheduler
        for job_name, params in self.job_name_params.items():
            run_on_start = params.pop('run_on_start')
            job = self.scheduler.add_job(**params)
            # for reverse lookup in listener
            self.job_id_job_names[job.id] = job_name
            # start immediately
            if run_on_start:
                for job in self.scheduler.get_jobs():
                    if job.name == job_name:
                        job.modify(next_run_time=datetime.datetime.now())
                        break

        et_start_secs = int(time.time())
        while True:
            run_secs = int(time.time()) - et_start_secs
            for job_name, stats in self.job_name_stats.items():
                execd, error, missd = stats['execd'], stats['error'], stats['missd']
                logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3}   error: {error:3}   missd: {missd:3}')
            time.sleep(1)

def main():
    job_scheduler = JobScheduler(
        var_a='a',
        var_b='b',
    )
    job_scheduler.schedule()

if __name__ == '__main__':
    main()

Here are some lines printed to the console:

...
2024-10-19 15:29:52,562    DEBUG [sched.py              schedule(): 294]  36 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:53,563    DEBUG [sched.py              schedule(): 294]  37 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:54,564    DEBUG [sched.py              schedule(): 294]  38 job_backup      execd:   0   error:   1   missd:   0
2024-10-19 15:29:55,014    DEBUG [sched.py                backup(): 204] backup: ready
2024-10-19 15:29:55,014     INFO [base.py                run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014    DEBUG [sched.py              listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566    DEBUG [sched.py              schedule(): 294]  39 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:56,566    DEBUG [sched.py              schedule(): 294]  40 job_backup      execd:   1   error:   1   missd:   0
2024-10-19 15:29:57,567    DEBUG [sched.py              schedule(): 294]  41 job_backup      execd:   1   error:   1   missd:   0
...

Summary

Implementing repeating jobs is easy with APScheduler. Here we touched only the basics, but this will be good enough for many use cases. What is lacking is an easy way to check which jobs are running and how many instances of a job are running, but that is only a nice-to-have.
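
What is available is a list of the scheduled jobs and their next run times, for example via 'get_jobs', but this does not tell us how many instances are currently running:

for job in scheduler.get_jobs():
    print(job.id, job.name, job.next_run_time)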

Links / credits

Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x

How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler

how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996

Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru
