Basic job scheduling with APScheduler
Schedule simple jobs easily with APScheduler and use more functionality for complex tasks.

You have created a Python program, for example to back up a database, and now you want to run it every hour or every day.
There are many solutions to choose from. Here we use APScheduler, the 'Advanced Python Scheduler', a job scheduler that meets many requirements. For our task this package may seem like overkill, but let's use it anyway: once we understand the basics, we may want to use it for more complex tasks.
As always, I do this on Ubuntu 22.04.
Our backup program
There is not much to our backup program:
class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug('backup: start ...')
        # do something here
        time.sleep(5)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug('backup: ready')
For this demonstration we added two variables. To be able to check the behaviour of our scheduler, we use 'random.randint' to raise an exception once in a while.
Jobs
Of course, this package is about scheduling jobs. To do this it uses a scheduler and the following components:
- Trigger
The trigger tells when the job must run. Here we use the CronTrigger.
- Job store
The job store holds the scheduled jobs. The default job store, MemoryJobStore, simply keeps the jobs in memory. Fine for our purpose.
- Executor
The executor typically submits a job to a thread or process pool. When the job is done, it notifies the scheduler, and an event, like 'executed' or 'error', is generated. The default executor, ThreadPoolExecutor, comes with a maximum thread count of 10. Fine for our purpose.
This means that we only have to specify the Trigger component.
CronTrigger and interval
We use the CronTrigger class to get cron functionality. It then comes down to specifying the correct values for year, month, day, hour, minute and second. There are many websites with examples; here we define some settings to choose from:
...
'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
...
When we add a job, we specify the trigger as follows:
add_job(
    ...
    trigger=CronTrigger(...),
    ...
)
It is also possible to specify an interval:
add_job(
    ...
    trigger='interval',
    seconds=1,
    ...
)
This also comes in handy when testing our application.
Multiple instances
When a job is triggered, APScheduler checks whether an instance of it is already running. If so, the following message is logged:
Execution of job "job_backup ... skipped: maximum number of running instances reached (1)
Usually this is desirable, but in some cases you may want to allow multiple instances at the same time. This can be specified with the 'max_instances' parameter:
- If 1: when an instance is already running, a new instance is not started; the run is skipped.
- If 2: when an instance is already running, a new instance is started, so two instances run concurrently.
- Etc.
Start job on program start
By default, we have to wait for the CronTrigger to fire before the job runs for the first time. Often we don't want that: we want the job to run immediately after we start the program. In that case, we modify the existing job and set the 'next_run_time' parameter:
job.modify(next_run_time=datetime.datetime.now())
Events and listener
We don't really need this for our backup application, but it might be useful for you. Here we add code for three cases and increment a counter for each event:
- When a job is executed: EVENT_JOB_EXECUTED
- When a job has an error: EVENT_JOB_ERROR
- When a job execution was missed: EVENT_JOB_MISSED
When a job is added to the scheduler, it is assigned a job_id. In the listener, we recover the job_name by extracting the job_id from the event and then doing a reverse lookup. Note that the event EVENT_JOB_MISSED never reaches the listener.
The code
Below is the code to demonstrate all of this.
We added some extra code to show the run statistics.
The BackupRunner is initialized by the 'job_backup' method in the scheduler; here we can add more initialization and variables, and optionally catch errors.
Comment/uncomment some lines to try the different triggers: CronTrigger and interval.
# sched.py
import datetime
import logging
import random
import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED

apscheduler_logger = logging.getLogger('apscheduler')
# uncomment to skip messages from apscheduler
#apscheduler_logger.setLevel(logging.ERROR)

logging.basicConfig(
    format='%(asctime)s %(levelname)8s [%(filename)-10s%(funcName)20s():%(lineno)04s] %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)


class BackupRunner:
    def __init__(self, var_1, var_2):
        self.var_1 = var_1
        self.var_2 = var_2

    def backup(self):
        logger.debug('backup: start ...')
        # do something here
        time.sleep(15)
        #time.sleep(2)
        if random.randint(0, 3) == 0:
            raise Exception('BACKUP PROBLEM')
        logger.debug('backup: ready')


class JobScheduler:
    def __init__(
        self,
        var_a,
        var_b,
    ):
        self.var_a = var_a
        self.var_b = var_b
        self.cron_triggers = {
            'every10seconds': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='*/10'),
            'everyminute': CronTrigger(year='*', month='*', day='*', hour='*', minute='*', second='0'),
            'every5minutes': CronTrigger(year='*', month='*', day='*', hour='*', minute='*/5', second='0'),
            'everyhour': CronTrigger(year='*', month='*', day='*', hour='*', minute='0', second='0'),
            'every2hours': CronTrigger(year='*', month='*', day='*', hour='*/2', minute='0', second='0'),
        }
        # job params and stats
        self.job_name_params = {
            'job_backup': {
                'trigger': self.cron_triggers['every10seconds'],
                #'trigger': 'interval',
                #'seconds': 1,
                'func': self.job_backup,
                'kwargs': {
                    'var_1': self.var_a,
                    'var_2': self.var_b,
                },
                'max_instances': 1,
                'run_on_start': True,
            },
        }
        self.job_name_stats = {
            'job_backup': {
                # run counters (unsafe)
                'execd': 0,
                'error': 0,
                'missd': 0,
            },
        }
        self.scheduler = None
        # reverse lookup
        self.job_id_job_names = {}

    def job_backup(self, var_1, var_2):
        backup_runner = BackupRunner(
            var_1=var_1,
            var_2=var_2,
        )
        backup_runner.backup()

    def listener(self, event):
        logger.debug(f'event = {event}')
        job = self.scheduler.get_job(event.job_id)
        job_name = self.job_id_job_names[job.id]
        if event.code == EVENT_JOB_EXECUTED:
            self.job_name_stats[job_name]['execd'] += 1
        elif event.code == EVENT_JOB_ERROR:
            self.job_name_stats[job_name]['error'] += 1
        elif event.code == EVENT_JOB_MISSED:
            self.job_name_stats[job_name]['missd'] += 1
        if event.exception:
            logger.debug(f'{job_name}: job exception event, exception = {event.exception}')

    def schedule(self):
        logger.debug('()')
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_listener(self.listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
        self.scheduler.start()
        # add jobs to scheduler
        for job_name, params in self.job_name_params.items():
            run_on_start = params.pop('run_on_start')
            job = self.scheduler.add_job(**params)
            # for reverse lookup in listener
            self.job_id_job_names[job.id] = job_name
            # start immediately
            if run_on_start:
                for job in self.scheduler.get_jobs():
                    if job.name == job_name:
                        job.modify(next_run_time=datetime.datetime.now())
                        break
        et_start_secs = int(time.time())
        while True:
            run_secs = int(time.time()) - et_start_secs
            for job_name, stats in self.job_name_stats.items():
                execd, error, missd = stats['execd'], stats['error'], stats['missd']
                logger.debug(f'{run_secs:3} {job_name:15s} execd: {execd:3} error: {error:3} missd: {missd:3}')
            time.sleep(1)


def main():
    job_scheduler = JobScheduler(
        var_a='a',
        var_b='b',
    )
    job_scheduler.schedule()


if __name__ == '__main__':
    main()
Here are some lines printed to the console:
...
2024-10-19 15:29:52,562 DEBUG [sched.py schedule(): 294] 36 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:53,563 DEBUG [sched.py schedule(): 294] 37 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:54,564 DEBUG [sched.py schedule(): 294] 38 job_backup execd: 0 error: 1 missd: 0
2024-10-19 15:29:55,014 DEBUG [sched.py backup(): 204] backup: ready
2024-10-19 15:29:55,014 INFO [base.py run_job(): 144] Job "JobScheduler.job_backup (trigger: cron[year='*', month='*', day='*', hour='*', minute='*', second='*/10'], next run at: 2024-10-19 15:30:00 CEST)" executed successfully
2024-10-19 15:29:55,014 DEBUG [sched.py listener(): 259] event = <JobExecutionEvent (code=4096)>
2024-10-19 15:29:55,566 DEBUG [sched.py schedule(): 294] 39 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:56,566 DEBUG [sched.py schedule(): 294] 40 job_backup execd: 1 error: 1 missd: 0
2024-10-19 15:29:57,567 DEBUG [sched.py schedule(): 294] 41 job_backup execd: 1 error: 1 missd: 0
...
Summary
Implementing repeating jobs is easy with APScheduler. Here we touched only the basics, but this will be good enough for many use cases. What is lacking is an easy way to check which jobs are running and how many instances of a job are running. But that is only a nice-to-have.
Links / credits
Advanced Python Scheduler
https://apscheduler.readthedocs.io/en/3.x
How to access return value from apscheduler?
https://stackoverflow.com/questions/55483073/how-to-access-return-value-from-apscheduler
how to fetch missed jobs from scheduler
https://stackoverflow.com/questions/50837288/how-to-fetch-missed-jobs-from-scheduler/50862996#50862996
Make sure only one worker launches the apscheduler event in a pyramid web app running multiple workers
https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru