Scheduler documentation

scheduling

This is a general-purpose scheduler. It does best-effort scheduling and execution of expired items in the order they were added. This means there is no guarantee that tasks are executed on time, every time. In fact, they will always be late, even if only by milliseconds. If a task must be done by a certain time, schedule it early, but remember that scheduling remains best effort.

The scheduler is meant to be used as follows: add a scheduling queue, then add tasks to that queue, either to be put in the task queue ASAP or at an absolute or relative time in the future. The task queue should be consumed by a worker thread.
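The workflow described above can be sketched with the standard library alone. The ToyScheduler class below is a hypothetical stand-in that only mimics the add_queue, add_task and get_task calls named in this document. It is not the ocspd implementation: its add_task signature and its run_expired helper are assumptions made to keep the example self-contained.

```python
import datetime
import queue
import threading


class ToyScheduler:
    """Hypothetical stand-in for the documented pattern, not ocspd itself."""

    def __init__(self):
        self._queues = {}    # queue name -> queue.Queue consumed by workers
        self._schedule = []  # (due time, queue name, item), in insertion order

    def add_queue(self, name, max_size=0):
        if name in self._queues:
            raise KeyError("Queue name already taken: {}".format(name))
        self._queues[name] = queue.Queue(max_size)

    def add_task(self, name, item, sched_time=None):
        if sched_time is None:
            # ASAP: bypass the schedule and hand the item straight to workers.
            self._queues[name].put(item, block=False)
        else:
            self._schedule.append((sched_time, name, item))

    def run_expired(self, now):
        # Move every item whose due time has passed into its task queue,
        # keeping the order in which the items were added.
        due = [entry for entry in self._schedule if entry[0] <= now]
        self._schedule = [entry for entry in self._schedule if entry[0] > now]
        for _, name, item in due:
            self._queues[name].put(item, block=False)

    def get_task(self, name, blocking=True, timeout=None):
        return self._queues[name].get(blocking, timeout)


scheduler = ToyScheduler()
scheduler.add_queue("renew")
start = datetime.datetime(2024, 1, 1, 12, 0, 0)
scheduler.add_task("renew", "cert-a")  # ASAP
scheduler.add_task("renew", "cert-b", start + datetime.timedelta(seconds=30))
scheduler.run_expired(start + datetime.timedelta(seconds=60))

processed = []


def worker():
    # Consume the task queue until it stays empty for a short while.
    while True:
        try:
            processed.append(scheduler.get_task("renew", timeout=0.1))
        except queue.Empty:
            return


thread = threading.Thread(target=worker)
thread.start()
thread.join()
```

The ASAP item reaches the worker first because it never enters the schedule, while the timed item only becomes visible to the worker once the expiry check has run.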

This module defines the following objects:

class ocspd.scheduling.ScheduledTaskContext(task_name, subject, sched_time=None, **attributes)[source]

A context for scheduled tasks. The context can be updated with an exception count for the last exception, so that it can be rescheduled if that is the appropriate action.

__init__(task_name, subject, sched_time=None, **attributes)[source]

Initialise a ScheduledTaskContext with a task name, subject and optional scheduled time. Any remaining keyword arguments are set as attributes of the task context.

Parameters:
  • task_name (str) – A task name corresponding to an existing queue in the target scheduler.
  • sched_time (datetime.datetime|int) – Absolute time (datetime.datetime object) or relative time in seconds (int) at which to schedule the task.
  • subject (obj) – A subject for the context instance. This can be whatever object you want to pass along to the worker.
  • attributes (kwargs) – Any additional data you want to assign to the context. Avoid using names already defined in the context: scheduler, task, subject, sched_time, reschedule.
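
As an illustration of the parameters above, the following sketch shows how extra keyword arguments could become attributes and how a relative sched_time could be turned into an absolute one. SketchContext and absolute_time are hypothetical names. The guard against reserved names and the place where the conversion happens are assumptions, not documented ocspd behaviour.

```python
import datetime


class SketchContext:
    """Hypothetical context class; attribute names follow the signature."""

    def __init__(self, task_name, subject, sched_time=None, **attributes):
        self.scheduler = None  # would be set by the scheduler later
        self.task_name = task_name
        self.subject = subject
        self.sched_time = sched_time
        for name, value in attributes.items():
            # Assumption: refuse extras that would shadow existing names.
            if hasattr(self, name):
                raise AttributeError("Reserved attribute name: " + name)
            setattr(self, name, value)


def absolute_time(sched_time, now):
    """Turn a relative number of seconds into an absolute datetime."""
    if sched_time is None or isinstance(sched_time, datetime.datetime):
        return sched_time
    return now + datetime.timedelta(seconds=sched_time)


now = datetime.datetime(2024, 1, 1, 12, 0, 0)
ctx = SketchContext("renew", "cert.pem", sched_time=30, retries=0)
due = absolute_time(ctx.sched_time, now)
```

Here retries is an arbitrary extra attribute chosen for the example; a worker could read it back as ctx.retries.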
scheduler = None

This attribute will be set automatically when the context is passed to a scheduler.

reschedule(sched_time=None)[source]

Reschedule this context itself.

Parameters: sched_time (datetime.datetime) – When this context should be added back to the task queue.
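
A worker might combine the exception count mentioned in the class description with reschedule() roughly as follows. This is a sketch under assumptions: RetryContext, handle_failure, the exception_count attribute name, and the max_retries and delay parameters are all hypothetical, and the stand-in reschedule() only records the requested time instead of talking to a scheduler.

```python
import datetime


class RetryContext:
    """Hypothetical context that only records where it was rescheduled to."""

    def __init__(self, task_name, subject):
        self.task_name = task_name
        self.subject = subject
        self.sched_time = None
        self.exception_count = 0  # assumed attribute name

    def reschedule(self, sched_time=None):
        # A real context would hand itself back to its scheduler here.
        self.sched_time = sched_time


def handle_failure(ctx, now, max_retries=3, delay=60):
    """Count the failure and reschedule unless the retry budget is spent."""
    ctx.exception_count += 1
    if ctx.exception_count > max_retries:
        return False
    ctx.reschedule(now + datetime.timedelta(seconds=delay))
    return True


now = datetime.datetime(2024, 1, 1, 12, 0, 0)
ctx = RetryContext("renew", "cert.pem")
results = [handle_failure(ctx, now) for _ in range(4)]
```

With these defaults the first three failures lead to a reschedule and the fourth is given up on.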

class ocspd.scheduling.SchedulerThread(*args, **kwargs)[source]

This object can be used to schedule tasks for contexts.

The context should be a ScheduledTaskContext or an extension of it. When the scheduled time has passed, the context is added back to the internal task queue(s), where it can be consumed by a worker thread. When you schedule a task, you can choose to have it added to the task queue ASAP or at a specified absolute or relative point in time. If you add it with an absolute time in the past, or a negative relative number, it is added to the task queue the first time the scheduler checks for expired tasks. If you want to run a task ASAP, you probably don’t want that. Pass sched_time=None instead, which bypasses the scheduling mechanism and places your task directly into the worker queue.

__init__(*args, **kwargs)[source]

Initialise the thread’s arguments and its parent threading.Thread.

Parameters:
  • queues (iterable) – A list, tuple or any iterable that returns strings that should be the names of queues.
  • sleep (int|float) – The sleep time in seconds between checking the expired items in the queue (default=1)
Raises:

KeyError – If the queue name is already taken (only when queues kwarg is used).
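
The role of the queues and sleep keyword arguments can be illustrated with a small polling thread. PollingThread is a hypothetical class written for this example, not the SchedulerThread source. It assumes that the thread removes its own keyword arguments before calling threading.Thread.__init__ and that it waits sleep seconds between expiry checks. The add and stop methods are also assumptions.

```python
import datetime
import queue
import threading


class PollingThread(threading.Thread):
    """Hypothetical polling loop, not the ocspd SchedulerThread."""

    def __init__(self, *args, **kwargs):
        # Take our own keyword arguments out before calling the parent.
        names = kwargs.pop("queues", [])
        sleep = kwargs.pop("sleep", 1)
        super().__init__(*args, **kwargs)
        self.sleep = sleep
        self.queues = {}
        for name in names:
            if name in self.queues:
                raise KeyError("Queue name already taken: {}".format(name))
            self.queues[name] = queue.Queue()
        self.pending = []  # (due time, queue name, item)
        self._pending_lock = threading.Lock()
        self._stop_requested = threading.Event()

    def add(self, due, name, item):
        with self._pending_lock:
            self.pending.append((due, name, item))

    def run(self):
        while not self._stop_requested.is_set():
            now = datetime.datetime.now()
            with self._pending_lock:
                expired = [p for p in self.pending if p[0] <= now]
                self.pending = [p for p in self.pending if p[0] > now]
            for _, name, item in expired:
                self.queues[name].put(item)
            # Wait between checks, but wake up early when asked to stop.
            self._stop_requested.wait(self.sleep)

    def stop(self):
        self._stop_requested.set()


poller = PollingThread(queues=["renew"], sleep=0.01, daemon=True)
past = datetime.datetime.now() - datetime.timedelta(seconds=5)
poller.add(past, "renew", "cert.pem")  # already expired when first checked
poller.start()
item = poller.queues["renew"].get(timeout=2)
poller.stop()
poller.join()
```

A smaller sleep value shortens the delay between a task expiring and a worker seeing it, at the cost of more frequent wake-ups.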

schedule = None

The schedule contains items indexed by time.

scheduled_by_context = None

Keeping the tasks in reverse order helps with faster unscheduling.

scheduled_by_queue = None

Keeping the tasks per queue name helps with faster queue deletion.

scheduled_by_subject = None

To allow removing by subject we keep the scheduled tasks by subject.
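
One plausible reading of these four attributes is a primary index by time plus three lookup tables that make removal cheap. The sketch below follows that reading. The container types, the Ctx tuple, and the schedule_task and unschedule helpers are assumptions for illustration, and the real attributes may be organised differently.

```python
import collections
import datetime

# Hypothetical, hashable stand-in for a task context.
Ctx = collections.namedtuple("Ctx", ["task_name", "subject"])

schedule = collections.defaultdict(list)              # time -> contexts
scheduled_by_context = {}                             # context -> time
scheduled_by_queue = collections.defaultdict(list)    # queue name -> contexts
scheduled_by_subject = collections.defaultdict(list)  # subject -> contexts


def unschedule(ctx):
    # The context -> time table avoids scanning every scheduled time.
    when = scheduled_by_context.pop(ctx, None)
    if when is None:
        return False
    lookups = (
        (schedule, when),
        (scheduled_by_queue, ctx.task_name),
        (scheduled_by_subject, ctx.subject),
    )
    for index, key in lookups:
        index[key].remove(ctx)
        if not index[key]:
            del index[key]  # drop empty buckets
    return True


def schedule_task(ctx, when):
    unschedule(ctx)  # a context is scheduled at most once
    schedule[when].append(ctx)
    scheduled_by_context[ctx] = when
    scheduled_by_queue[ctx.task_name].append(ctx)
    scheduled_by_subject[ctx.subject].append(ctx)


noon = datetime.datetime(2024, 1, 1, 12, 0, 0)
later = noon + datetime.timedelta(hours=1)
renew_a = Ctx("renew", "cert-a")
parse_a = Ctx("parse", "cert-a")
schedule_task(renew_a, noon)
schedule_task(parse_a, noon)
schedule_task(renew_a, later)  # replaces the earlier entry for renew_a
```

Because the example contexts are named tuples, two contexts with the same fields compare equal; a class-based context would normally be compared by identity instead.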

add_queue(name, max_size=0)[source]

Add a scheduled queue to the scheduler.

Parameters:
  • name (str) – A unique name for the queue.
  • max_size (int) – Maximum queue depth, [default=0 (unlimited)].
Raises:

KeyError – If the queue name is already taken.

remove_queue(name)[source]

Remove a scheduled queue from the scheduler.

Parameters: name (str) – The name of the existing queue.
Raises: KeyError – If the queue doesn’t exist.
add_task(ctx)[source]

Add a ScheduledTaskContext to the task queue, either ASAP or at a specific time.

If the context is not unique, the already scheduled task is cancelled before the new task is scheduled.

Parameters:

ctx (ScheduledTaskContext) – A context containing data for a worker thread.

Raises:
  • queue.Full – If the underlying task queue is full.
  • TypeError – If the passed context is not a ScheduledTaskContext.
  • KeyError – If the task queue doesn’t exist.
cancel_task(ctx)[source]

Remove a task from the scheduler.

Note

Tasks that were already queued for a worker to process can no longer be cancelled.

Parameters: ctx (ScheduledTaskContext) – A context containing data for a worker thread.
Return bool: True if the task was successfully cancelled, False otherwise.
get_task(task_name, blocking=True, timeout=None)[source]

Get a task context from the task queue named task_name.

Parameters:
  • task_name (str) – Task name that refers to an existing scheduler queue.
  • blocking (bool) – Wait until there is something to return from the queue.
Raises:
  • queue.Empty – If the underlying task queue is empty and blocking is False, or the timeout expires.
  • KeyError – If the task queue does not exist.
task_done(task_name)[source]

Mark a task as done on a queue. This increments the queue’s counter of completed tasks.

Parameters: task_name (str) – The task queue name.
Raises: KeyError – If the task queue does not exist.
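
The get_task and task_done descriptions refer to an underlying task queue. Assuming that queue behaves like the standard library's queue.Queue, and that the two methods simply forward to its get() and task_done(), a worker loop could look like the sketch below. It uses a plain queue.Queue rather than the scheduler itself, so only the queue semantics are shown.

```python
import queue

tasks = queue.Queue()
for name in ("cert-a", "cert-b"):
    tasks.put(name)

handled = []
while True:
    try:
        # Blocking get with a timeout: wait briefly, then give up.
        ctx = tasks.get(True, 0.05)
    except queue.Empty:
        break
    try:
        handled.append(ctx)  # the actual work would happen here
    finally:
        # One task_done() per successful get(), even when processing fails.
        tasks.task_done()

tasks.join()  # returns immediately because every item was marked done
```

Calling task_done() in a finally block keeps a later join() from hanging when a task raises.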
run()[source]

Start the scheduler thread.

run_all()[source]

Run all tasks currently queued, regardless of their scheduled time.

_run(all_tasks=False)[source]

Runs all scheduled tasks that have a scheduled time < now.

cancel_by_subject(subject)[source]

Cancel scheduled tasks by the task’s context’s subject.

This comes down to: delete anything from the scheduler that relates to my object X.

Parameters: subject (obj) – The object you want all scheduled tasks cancelled for.
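
The idea of deleting everything that relates to one object can be sketched with a subject-keyed lookup table. The Ctx tuple, the table layout, and the placeholder cancel_task function below are assumptions for illustration, and the actual method may work differently or return a value.

```python
import collections

# Hypothetical, hashable stand-in for a task context.
Ctx = collections.namedtuple("Ctx", ["task_name", "subject"])

# Assumed index: subject -> contexts that are still scheduled for it.
scheduled_by_subject = collections.defaultdict(list)
for ctx in (
    Ctx("renew", "cert-a"),
    Ctx("parse", "cert-a"),
    Ctx("renew", "cert-b"),
):
    scheduled_by_subject[ctx.subject].append(ctx)

cancelled = []


def cancel_task(ctx):
    # Placeholder for the per-context cancellation.
    cancelled.append(ctx)
    return True


def cancel_by_subject(subject):
    # Remove the subject's whole bucket, then cancel each of its contexts.
    for ctx in scheduled_by_subject.pop(subject, []):
        cancel_task(ctx)


cancel_by_subject("cert-a")
```

Tasks for other subjects, such as cert-b here, stay scheduled.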