Writing a Worker Task
Introduction
The Cloud Tasks Worker API provides a framework for implementing worker programs that process tasks from cloud provider queues or local task files. Workers run on compute instances or a local workstation and process tasks from the queue (or local file) in parallel using Python’s multiprocessing capabilities. The framework automatically handles the creation and destruction of processes, logging of task results, and graceful shutdown when the queue is empty and the worker is idle. When run on spot/preemptible instances, it also takes care of monitoring for the instance shutdown warning and notifying each running worker process.
Basic Usage
Here’s a simple example of how to implement a worker:
import sys
from cloud_tasks.worker import Worker
def process_task(task_id: str, task_data: dict, worker_data: WorkerData) -> tuple[bool, str | dict]:
"""Process a single task.
Args:
task_id: Unique identifier for the task
task_data: Dictionary containing task data; will have the fields:
- "task_id": Unique identifier for the task
- "data": Dictionary containing task data
worker_data: WorkerData object (useful for retrieving information about the
local environment and polling for shutdown notifications)
Returns:
Tuple of (retry: bool, result: str or dict)
- retry: False if task succeeded or failed in a way that it should not be
re-queued for some other process to try it again; True to indicate that the
task failed in a way that it should be re-queued for some other process to
try it again
- result: String or dict describing the result; this will be sent to the local
log file or the result queue to be picked up by the pool manager
"""
print(f"Processing task {task_id}")
# Your processing logic here
print(f"Task data: {task_data}")
return False, "Task completed successfully"
# Create and start the worker
if __name__ == "__main__":
worker = Worker(process_task, args=sys.argv[1:])
asyncio.run(worker.start())
Returning a Result
The top-level worker function should return a tuple of (retry, result).
Returning a retry value of False fundamentally indicates that the task should not
be re-tried. This could mean it actually succeeded in whatever you wanted it to do, or it
failed in such a way that you don’t want to retry it (for example, an unhandled exception
which is likely to recur on future attempts). Returning a retry value of True
indicates that the task failed in a way that it should be re-queued for some other process
to try it again. This normally would indicate some kind of transient error, such as
running out of disk space or memory or hitting some other kind of temporary resource
limit that you expect to not repeat.
The result value can be a string or a JSON-serializable dictionary. This value will be
returned in the results queue to the pool manager so that you can log it to a local file.
For example, you might return a dictionary that contains a flag indicating whether the
task truly succeeded or not, and a string message with more details.
If the task does not complete successfully (meaning it returned a retry value and a result data structure), there are three possibilities:
The task timed out (exceeded the time set by the
--max-runtimeoption).The task exited prematurely, e.g. due to a crash or by calling
sys.exit().The task raised an unhandled exception.
In each case, you have the option of deciding whether to automatically retry the task by
using the worker command line options --retry-on-timeout, --retry-on-exit, and
--retry-on-exception, or their corresponding environment variables. Note that if you
turn on retry for a particular type of failure, but your program will always fail in the
same way for a particular task, this could result in an infinite task loop where the task
keeps getting re-queued and retried. Thus these options should be used with caution. This
is also why it is important to monitor the returned results and abort the pool manager if
no progress is being made.
Note that if you are using a local task file, the task manager will never re-queue a task, regardless of the retry options you set.
Environment Variables and Command Line Arguments
The worker is configured using the following environment variables and/or command line
arguments. All parameters will first be set from the command line arguments, and if not
specified, will then be set from the environment variables. If neither is available, the
parameter will be set to None or the given default. When a worker is run on a remote
compute instance, the following subset of environment variables are set automatically
based on information in the Cloud Tasks configuration file (or command line arguments
given to ``manage_pool`` or ``run``), or from information derived from the instance type:
RMS_CLOUD_TASKS_PROVIDER
RMS_CLOUD_TASKS_PROJECT_ID
RMS_CLOUD_TASKS_JOB_ID
RMS_CLOUD_TASKS_QUEUE_NAME
RMS_CLOUD_TASKS_INSTANCE_TYPE
RMS_CLOUD_TASKS_INSTANCE_NUM_VCPUS
RMS_CLOUD_TASKS_INSTANCE_MEM_GB
RMS_CLOUD_TASKS_INSTANCE_SSD_GB
RMS_CLOUD_TASKS_INSTANCE_BOOT_DISK_GB
RMS_CLOUD_TASKS_INSTANCE_IS_SPOT
RMS_CLOUD_TASKS_INSTANCE_PRICE
RMS_CLOUD_TASKS_NUM_TASKS_PER_INSTANCE
RMS_CLOUD_TASKS_MAX_RUNTIME
RMS_CLOUD_TASKS_RETRY_ON_EXIT
RMS_CLOUD_TASKS_RETRY_ON_EXCEPTION
RMS_CLOUD_TASKS_RETRY_ON_TIMEOUT
Task File
- --task-file TASK_FILE
The name of a local JSON or YAML file containing tasks to process; if not specified, the worker will pull tasks from the cloud provider queue (see below). The filename can also be a cloud storage path like
gs://bucket/file,s3://bucket/file, orhttps://path/to/file. If not specified, the task manager will pull tasks from the cloud provider queue.
If specified, the task file should be in the same format as read by the load_queue command.
Overriding the Task Source
The task source (either file or queue) can be overridden by passing a task_source
argument to the Worker constructor. This can be a string or pathlib.Path or
filecache.FCPath, or a function that returns an iterator of tasks. If a filename is
passed, it will be treated as a path to a JSON or YAML file containing tasks. If a
function is passed, it will be called repeatedly to yield the tasks. If task_source
is specified, the --task-file command line argument will be ignored.
Parameters Required if Task File or Task Source is Not Specified, Optional Otherwise
- --provider PROVIDER
The cloud provider (AWS or GCP) to use to check for spot instance termination notices and for cloud-based queueing [or
RMS_CLOUD_TASKS_PROVIDER]- --job-id JOB_ID
Job ID; used to identify the cloud-based task queue name [or
RMS_CLOUD_TASKS_JOB_ID]
Optional Parameters
- --project-id PROJECT_ID
Project ID (required for GCP) [or
RMS_CLOUD_TASKS_PROJECT_ID]- --queue-name QUEUE_NAME
Cloud-based task queue name; if not specified will be derived from the job ID [or
RMS_CLOUD_TASKS_QUEUE_NAME]- --exactly-once-queue
If specified, task and event queue messages are guaranteed to be delivered exactly once to any recipient [or
RMS_CLOUD_TASKS_EXACTLY_ONCE_QUEUEis “1” or “true”]- --no-exactly-once-queue
If specified, task and event queue messages are delivered at least once, but could be delivered multiple times [or
RMS_CLOUD_TASKS_EXACTLY_ONCE_QUEUEis “0” or “false”]- --event-log-file EVENT_LOG_FILE
File to write events to if –event-log-to-file is specified (defaults to “events.log”) [or
RMS_CLOUD_TASKS_EVENT_LOG_FILE]- --event-log-to-file
If specified, events will be written to the file specified by –event-log-file or $RMS_CLOUD_TASKS_EVENT_LOG_FILE (default if –task-file is specified) [or
RMS_CLOUD_TASKS_EVENT_LOG_TO_FILEis “1” or “true”]- --no-event-log-to-file
If specified, events will not be written to a file [or
RMS_CLOUD_TASKS_EVENT_LOG_TO_FILEis “0” or “false”]- --event-log-to-queue
If specified, events will be written to a cloud-based queue (default if –task-file is not specified) [or
RMS_CLOUD_TASKS_EVENT_LOG_TO_QUEUEis “1” or “true”]- --no-event-log-to-queue
If specified, events will not be written to a cloud-based queue [or
RMS_CLOUD_TASKS_EVENT_LOG_TO_QUEUEis “0” or “false”]- --instance-type INSTANCE_TYPE
Instance type; optional information for the worker processes [or
RMS_CLOUD_TASKS_INSTANCE_TYPE]- --num-cpus N
Number of vCPUs on this computer; optional information for the worker processes [or
RMS_CLOUD_TASKS_INSTANCE_NUM_VCPUS]- --memory MEMORY_GB
Memory in GB on this computer; optional information for the worker processes [or
RMS_CLOUD_TASKS_INSTANCE_MEM_GB]- --local-ssd LOCAL_SSD_GB
Local SSD in GB on this computer; optional information for the worker processes [or
RMS_CLOUD_TASKS_INSTANCE_SSD_GB]- --boot-disk BOOT_DISK_GB
Boot disk size in GB on this computer; optional information for the worker processes [or
RMS_CLOUD_TASKS_INSTANCE_BOOT_DISK_GB]- --is-spot
If supported by the provider, specify that this is a spot instance and subject to unexpected termination [or
RMS_CLOUD_TASKS_INSTANCE_IS_SPOTis “1” or “true”]- --no-is-spot
If supported by the provider, specify that this is not a spot instance and is not subject to unexpected termination (default) [or
RMS_CLOUD_TASKS_INSTANCE_IS_SPOTis “0” or “false”]- --price PRICE_PER_HOUR
Price in USD/hour on this computer; optional information for the worker processes [or
RMS_CLOUD_TASKS_INSTANCE_PRICE]- --num-simultaneous-tasks N
Number of concurrent tasks to process (defaults to number of vCPUs, or 1 if not specified) [or
RMS_CLOUD_TASKS_NUM_TASKS_PER_INSTANCE]- --max-runtime SECONDS
Maximum allowed runtime in seconds; used to determine queue visibility timeout and to kill tasks that are running too long [or
RMS_CLOUD_TASKS_MAX_RUNTIME] (default 600 seconds)- --shutdown-grace-period SECONDS
How long to wait in seconds for processes to gracefully finish after shutdown (SIGINT, SIGTERM, or Ctrl-C) is requested before killing them (default 30) [or
RMS_CLOUD_TASKS_SHUTDOWN_GRACE_PERIOD]- --tasks-to-skip TASKS_TO_SKIP
Number of tasks to skip before processing any from the queue [or
RMS_CLOUD_TASKS_TO_SKIP]- --max-num-tasks MAX_NUM_TASKS
Maximum number of tasks to process [or
RMS_CLOUD_TASKS_MAX_NUM_TASKS]- --retry-on-exit
If specified, retry tasks on premature exit [or
RMS_CLOUD_TASKS_RETRY_ON_EXITis “1” or “true”]- --no-retry-on-exit
If specified, do not retry tasks on premature exit (default) [or
RMS_CLOUD_TASKS_RETRY_ON_EXITis “0” or “false”]- --retry-on-exception
If specified, retry tasks on unhandled exception [or
RMS_CLOUD_TASKS_RETRY_ON_EXCEPTIONis “1” or “true”]- --no-retry-on-exception
If specified, do not retry tasks on unhandled exception (default) [or
RMS_CLOUD_TASKS_RETRY_ON_EXCEPTIONis “0” or “false”]- --retry-on-timeout
If specified, tasks will be retried if they exceed the maximum runtime specified by –max-runtime [or
RMS_CLOUD_TASKS_RETRY_ON_TIMEOUTis “1” or “true”]- --no-retry-on-timeout
If specified, tasks will not be retried if they exceed the maximum runtime specified by –max-runtime (default) [or
RMS_CLOUD_TASKS_RETRY_ON_TIMEOUTis “0” or “false”]- --simulate-spot-termination-after SECONDS
Number of seconds after worker start to simulate a spot termination notice [or
RMS_CLOUD_TASKS_SIMULATE_SPOT_TERMINATION_AFTER]- --simulate-spot-termination-delay SECONDS
Number of seconds after a simulated spot termination notice to forcibly kill all running tasks [or
RMS_CLOUD_TASKS_SIMULATE_SPOT_TERMINATION_DELAY]- --verbose
Set the console log level to DEBUG instead of INFO
Specifying Additional Arguments
The worker can be configured to accept additional arguments. This is done by creating an argparse.ArgumentParser,
populating it with the arguments you want to accept, and passing it to the Worker constructor. For example:
parser = argparse.ArgumentParser()
parser.add_argument("--my-arg", type=str, required=True)
worker = Worker(process_task, args=sys.argv[1:], argparser=parser)
It is important that these user-specified arguments not conflict with the arguments already
supported by Worker.
The resulting parsed arguments can be accessed from the WorkerData object using the
args attribute. For example:
val = worker_data.args.my_arg
Logging Events
Various events can be logged to a local file or a cloud-based queue. The events are written in a structured format that can be parsed by the pool manager or other software to update the status and results of the tasks. An example entry is:
{"timestamp": "2025-05-26T01:56:26.321172",
"hostname": "rmscr-parallel-addition-job-0g23gxetnyyavtxjrul6gberr",
"event_type": "task_completed",
"task_id": "addition-task-009684",
"elapsed_time": 0.13359451293945312,
"retry": false,
"result": "Success!"
}
The timestamp and hostname fields are always present.
The event_type field can have the following values:
task_completed: Indicates that the task completed normally. Thetask_idfield will contain the task ID as given in the task file. Theretryandresultfields will contain the values returned by the task function. Theelapsed_timefield will contain the number of fractional seconds the task took to complete, including process creation and destruction overhead.task_timed_out: Indicates that the task timed out (exceeded the time set by the--max-runtimeoption). Thetask_idfield will contain the task ID as given in the task file. Theelapsed_timefield will contain the number of fractional seconds the task ran before being killed.task_exited: Indicates that the task exited prematurely. Thetask_idfield will contain the task ID as given in the task file. Theelapsed_timeandexit_codefields will contain the number of seconds the task took to complete and the exit code of the task, respectively.non_fatal_exception: Indicates that the task manager encountered an exception that was deemed non-fatal and continued to run. Theexceptionfield will contain the exception message. Thestack_tracefield will contain the stack trace of the exception.fatal_exception: Indicates that the task manager encountered an exception that was deemed fatal and has exited. No further tasks will be processed and no events collected or reported. If this occurred on a cloud-based compute instance, be aware that the instance is now costing money without performing any work. Theexceptionfield will contain the exception message. Thestack_tracefield will contain the stack trace of the exception.spot_termination: Indicates that the worker received a spot termination notice. No further tasks will be accepted and any existing tasks may be terminated prematurely if the instance is destroyed before they finish. Any existing tasks that complete before the instance is destroyed will have their results reported as usual.
Handling Spot Instance Termination
For some providers, it is possible to select instances that are preemptible (e.g. spot instances). Such instances are usually dramatically cheaper than regular instances, but they can be terminated at any time by the cloud provider with little notice. When using spot instances, the worker will monitor for the instance to be terminated and will attempt to notify all running worker processes so they can exit gracefully.
To simulate a spot termination notice and subsequent forced shutdown of the compute
instance, you can use the --simulate-spot-termination-after and
--simulate-spot-termination-delay arguments or the
RMS_CLOUD_TASKS_SIMULATE_SPOT_TERMINATION_AFTER and
RMS_CLOUD_TASKS_SIMULATE_SPOT_TERMINATION_DELAY environment variables. This is useful
for testing the worker’s shutdown behavior without waiting for an actual spot termination
notice, which is unpredictable.
It is recommended that a task check for impending termination before starting to commit
results to storage, as the writing and copying process may be interrupted by the
destruction of the instance, resulting in a partial write. This can be done by checking
the worker_data.received_termination_notice property. However, note that providers do not
guarantee a particular instance lifetime after the termination notice is sent, so a worker
must still be able to tolerate an unexpected shutdown at any point in its execution.
Running Workers on a Local Workstation
The workers can be run on a local workstation. This is useful for testing and debugging,
and also as a simple way to parallelize an existing program that does not require the
performance of cloud-based compute instances. When run locally, the top-level program
should be supplied the necessary command line arguments to specify the task source (such as
--task-file)