The Worker API

Worker module for the cloud task processing system.

This module provides tools for integrating existing worker code with the cloud task processing system. It abstracts away the details of cloud provider integration, allowing any worker to process tasks from cloud-based queues.

class cloud_tasks.worker.Worker(user_worker_function: Callable[[str, Dict[str, Any], WorkerData], Tuple[bool, str]], *, task_source: str | Path | FCPath | Callable[[], Iterable[Dict[str, Any]]] | None = None, args: Sequence[str] | None = None, argparser: ArgumentParser | None = None)

Bases: object

Worker class for processing tasks from queues using multiprocessing.

__init__(user_worker_function: Callable[[str, Dict[str, Any], WorkerData], Tuple[bool, str]], *, task_source: str | Path | FCPath | Callable[[], Iterable[Dict[str, Any]]] | None = None, args: Sequence[str] | None = None, argparser: ArgumentParser | None = None)

Initialize the worker.

Parameters:
  • user_worker_function – The function to execute for each task. It is called with the task ID (a str), the task data dictionary, and a WorkerData object as arguments, and must return a (bool, str) tuple.

  • task_source – Optional task source: a filename (str), a pathlib.Path, a filecache.FCPath, or a zero-argument function that returns an iterable of task dictionaries. If specified, it overrides any task source given on the command line or through environment variables.

  • args – Optional sequence of command line arguments, in the same form as sys.argv[1:] (that is, without the program name).

  • argparser – Optional argument parser to use. If provided, Worker adds its own command line arguments to this parser before parsing, so user-defined options are parsed alongside them. The resulting argparse.Namespace is available as the args attribute of WorkerData.
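As a sketch of the two optional hooks above: task_source can be any zero-argument callable that returns an iterable of task dictionaries, and argparser can be a parser already populated with your own options. The task dictionary layout used here ("task_id" plus a "data" payload), the function names make_tasks and build_parser, and the --scale option are all hypothetical; check the package documentation for the exact task format it expects.

```python
import argparse
from typing import Any, Dict, Iterator


def make_tasks() -> Iterator[Dict[str, Any]]:
    """Hypothetical task source: yields one task dictionary per input value.

    ASSUMPTION: each task is a dict with a "task_id" and a "data" payload.
    """
    for i in range(3):
        yield {"task_id": f"task-{i:03d}", "data": {"value": i}}


def build_parser() -> argparse.ArgumentParser:
    """Parser holding a user-defined option; Worker adds its own options to it."""
    parser = argparse.ArgumentParser(description="Example worker")
    # Hypothetical user option, later readable as worker_data.args.scale
    parser.add_argument("--scale", type=float, default=1.0)
    return parser


# With cloud_tasks installed, the callable itself (not its result) is passed:
#   Worker(process_task, task_source=make_tasks, argparser=build_parser(),
#          args=["--scale", "2"])
# where process_task is your user_worker_function.
```

Passing make_tasks uncalled matches the Callable[[], Iterable[Dict[str, Any]]] type in the signature, so the worker decides when to start drawing tasks.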

async start() → None

Start the worker and begin processing tasks.
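A minimal end-to-end sketch, assuming the cloud_tasks package is installed. Because start() is a coroutine, a plain script can drive it with asyncio.run. The process_task signature follows the user_worker_function type above; treating the returned (bool, str) pair as a retry flag plus a result message is an assumption, and the "value" key in the task data is hypothetical.

```python
import asyncio
from typing import TYPE_CHECKING, Any, Dict, Tuple

if TYPE_CHECKING:  # only needed for the type annotation below
    from cloud_tasks.worker import WorkerData


def process_task(
    task_id: str, task_data: Dict[str, Any], worker_data: "WorkerData"
) -> Tuple[bool, str]:
    """User worker function, called once per task.

    ASSUMPTION: the bool requests a retry and the str is a result message.
    """
    value = task_data.get("value")  # hypothetical task field
    if value is None:
        # Malformed task: retrying would not help, so report why and move on
        return False, f"{task_id}: missing 'value'"
    return False, f"{task_id}: {value * 2}"


def main() -> None:
    # Imported here so process_task can be tested without the package installed
    from cloud_tasks.worker import Worker

    # No task_source given: tasks come from the command line or environment
    worker = Worker(process_task)
    asyncio.run(worker.start())


if __name__ == "__main__":
    main()
```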

class cloud_tasks.worker.WorkerData

Bases: object

Class containing properties that can be safely inherited by child processes.

__init__()

property received_shutdown_request: bool

Whether the worker has received a shutdown request. This is set when the user presses Ctrl-C at the terminal or the process otherwise receives a SIGINT or SIGTERM signal.

property received_termination_notice: bool

Whether the worker has received a termination notice. This is set when the instance is about to be terminated, for example because a spot instance is being reclaimed or for system maintenance.
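Long-running tasks can poll these two properties and stop early instead of being killed mid-work. The sketch below processes a hypothetical "items" list in chunks and checks both flags between chunks. It assumes the flags are visible from the task process (as the WorkerData description suggests), and returning True to request a retry is an assumption about the meaning of the first element of the return tuple.

```python
from typing import Any, Dict, Tuple

CHUNK_SIZE = 100  # hypothetical chunk size between flag checks


def sum_items(task_id: str, task_data: Dict[str, Any], worker_data: Any) -> Tuple[bool, str]:
    """Sums task_data["items"] in chunks, stopping early if the worker is going away."""
    items = task_data["items"]  # hypothetical task field
    total = 0
    for start in range(0, len(items), CHUNK_SIZE):
        if worker_data.received_termination_notice or worker_data.received_shutdown_request:
            # ASSUMPTION: True asks the system to retry this task later
            return True, f"{task_id}: interrupted after {start} items"
        total += sum(items[start:start + CHUNK_SIZE])
    return False, f"{task_id}: total={total}"
```

Checking between chunks keeps each check cheap while bounding how much work is lost when a notice arrives.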

args: Namespace | None

The argparse.Namespace containing the parsed command line arguments, including any additional arguments defined by the user through argparser.

provider: str | None

The cloud provider to use (AWS or GCP)

project_id: str | None

The project ID to use (only for GCP)

job_id: str | None

The job ID to use

queue_name: str | None

The queue name to use

exactly_once_queue: bool

Whether to use an exactly-once queue

event_log_to_queue: bool

Whether to log events to a cloud-based queue

event_log_queue_name: str | None

The name of the cloud-based queue to log events to

event_log_to_file: bool

Whether to log events to a file

event_log_file: str | None

The name of the file to log events to

instance_type: str | None

The instance type this worker is running on

num_cpus: int | None

The number of vCPUs on this computer

memory_gb: int | None

The amount of memory on this computer, in GB

local_ssd_gb: int | None

The amount of local SSD storage on this computer, in GB

boot_disk_gb: int | None

The size of the boot disk on this computer, in GB

is_spot: bool

Whether the instance is a spot instance

price_per_hour: float | None

The price per hour for the instance

num_simultaneous_tasks: int

The number of simultaneous tasks to process

max_runtime: int

The maximum runtime for a task, in seconds (1 hour, i.e. 3600 seconds, by default)

shutdown_grace_period: int

The time in seconds to wait for tasks to complete during shutdown

retry_on_exit: bool

Whether to retry tasks on premature exit

retry_on_exception: bool

Whether to retry tasks on unhandled exception

retry_on_timeout: bool

Whether to retry tasks on timeout

simulate_spot_termination_after: float | None

The number of seconds after worker start to simulate a spot termination notice

simulate_spot_termination_delay: float | None

The number of seconds after a simulated spot termination notice to forcibly kill all running tasks
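The resource attributes can be used inside a task to size its work to the machine. This sketch divides vCPUs and memory evenly among num_simultaneous_tasks; the helper name per_task_budget, the even split, and the fallbacks used when an attribute is None (for example, when running locally) are assumptions, not behavior of the library.

```python
import os
from typing import Any, Optional, Tuple


def per_task_budget(worker_data: Any) -> Tuple[int, Optional[float]]:
    """Return (threads, memory_gb) available to one task, split evenly across task slots."""
    slots = max(1, worker_data.num_simultaneous_tasks)
    # Fall back to the local CPU count when num_cpus is unknown (None)
    cpus = worker_data.num_cpus or os.cpu_count() or 1
    threads = max(1, cpus // slots)
    # Memory budget is unknown if memory_gb is None
    memory = worker_data.memory_gb / slots if worker_data.memory_gb is not None else None
    return threads, memory
```

For example, a 16-vCPU, 64 GB machine running 4 simultaneous tasks gives each task 4 threads and 16 GB.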