Scheduler documentation¶
Table of Contents
Scheduler source code¶
scheduling¶
This is a general purpose scheduler. It does best effort scheduling and execution of expired items in the order they are added. This also means that there is no guarantee the tasks will be executed on time every time, in fact they will always be late, even if just by milliseconds. If you need it to be done on time, you schedule it early, but remember that it will still be best effort.
The way this scheduler is supposed to be used is to add a scheduling queue, then you can add tasks to the queue to either be put in a task queue ASAP, or at or an absolute time in the future. The queue should be consumed by a worker thread.
This module defines the following objects:
ocspd.scheduling.ScheduledTaskContext
- A context that wraps around any data you want to pass to the scheduler and which will be added to the task queue when the schedule time expires.
ocspd.scheduling.SchedulerThread
- An object that is capable of scheduling and unscheduling tasks that you can define with
ocspd.scheduling.ScheduledTaskContext
.
-
class
ocspd.scheduling.
ScheduledTaskContext
(task_name, subject, sched_time=None, **attributes)[source]¶ A context for scheduled tasks, this context can be updated with an exception count for the last exception, so it can be re-scheduled if it is the appropriate action.
-
__init__
(task_name, subject, sched_time=None, **attributes)[source]¶ Initialise a
ScheduledTaskContext
with a task name, subject and optional scheduled time. Any remaining keyword arguments are set as attributes of the task context.Parameters: - task (str) – A task corresponding to an existing queue in the target scheduler.
- sched_time (datetime.datetime|int) – Absolute time (datetime.datetime object) or relative time in seconds (int) to schedule the task.
- subject (obj) – A subject for the context instance this can be whatever object you want to pass along to the worker.
- attributes (kwargs) – Any additional data you want to assign to
the context, avoid using names already defined in the context:
scheduler
,task
,subject
,sched_time
,reschedule
.
-
scheduler
= None¶ This attribute will be set automatically when the context is passed to a scheduler.
-
reschedule
(sched_time=None)[source]¶ Reschedule this context itself.
Parameters: sched_time (datetime.datetime) – When should this context be added back to the task queue
-
__weakref__
¶ list of weak references to the object (if defined)
-
-
class
ocspd.scheduling.
SchedulerThread
(*args, **kwargs)[source]¶ This object can be used to schedule tasks for contexts.
The context should be a
ScheduledTaskContext
or an extension of it.. When the scheduled time has passed, the context will be added back to the internal task queue(s), where it can be consumed by a worker thread. When a task is scheduled you can choose to have it added to the task queue ASAP or at a specified absolute or relative point in time. If you add it with an absolute time in the past, or a negative relative number, it will be added to the task queue the first time the scheduler checks expired tasks schedule times. If you want to run a task ASAP, you probably don’t that, you should passsched_time=None
instead, it will bypass the scheduling mechanism and place your task directly into the worker queue.-
__init__
(*args, **kwargs)[source]¶ Initialise the thread’s arguments and its parent
threading.Thread
.Parameters: - queues (iterable) – A list, tuple or any iterable that returns strings that should be the names of queues.
- sleep (int|float) – The sleep time in seconds between checking the expired items in the queue (default=1)
Raises: KeyError – If the queue name is already taken (only when queues kwarg is used).
-
schedule
= None¶ The schedule contains items indexed by time.
-
scheduled_by_context
= None¶ Keeping the tasks in reverse order helps for faster unscheduling.
-
scheduled_by_queue
= None¶ Keeping the tasks per queue name helps faster queue deletion.
-
scheduled_by_subject
= None¶ To allow removing by subject we keep the scheduled tasks by subject.
-
add_queue
(name, max_size=0)[source]¶ Add a scheduled queue to the scheduler.
Parameters: Raises: KeyError – If the queue name is already taken.
-
remove_queue
(name)[source]¶ Remove a scheduled queue from the scheduler.
Parameters: name (str) – The name of the existing queue. Raises: KeyError – If the queue doesn’t exist.
-
add_task
(ctx)[source]¶ Add a
ScheduledTaskContext
to be added to the task queue either ASAP, or at a specific time.If the context is not unique, the scheduled task will be cancelled before scheduling the new task.
Parameters: ctx (ScheduledTaskContext) – A context containing data for a worker thread.
Raises:
-
cancel_task
(ctx)[source]¶ Remove a task from the scheduler.
Note
Tasks that were already queued for a worker to process can’t be canceled anymore.
Parameters: ctx (ScheduledTaskContext) – A context containing data for a worker thread. Return bool: True for successfully cancelled task or False.
-
get_task
(task_name, blocking=True, timeout=None)[source]¶ Get a task context from the task queue
task
.Parameters: Raises: - Queue.Empty – If the underlying task queue is empty and blocking is False or the timout expires.
- KeyError – If the task queue does not exist.
-