Quick Start Guide
Below is a short introduction to using Cloud Tasks, providing the minimum steps required to get started. Please see the rest of this user’s guide for more detailed information.
Step 1: Install the Cloud Tasks CLI and Python Module
Activate your Python virtual environment, as appropriate, then:
pip install rms-cloud-tasks
Step 2: Modify Your Code to be a Worker
Modify your code to have a single function entry point; we will call this
process_taskhere. The function takes three arguments:task_id: str: The ID of the task as provided in the task file (described below).task_data: Dict[str, Any]: The data for the task as provided in the task file (described below). This should be the only data your function needs to process the task; it should not use command line arguments.worker_data: cloud_tasks.worker.WorkerData: The WorkerData object. This provides access to configuration options that were set by environment variables or command line arguments as well as real-time information about the worker’s and computer’s status, such as whether the termination of a spot instance is imminent. See The Worker API for more information about theWorkerDataobject.
The function must return a tuple of two elements:
The first is bool indicating whether the task was processed successfully. This value is used to determine if the task should be retried (perhaps there was an I/O error or the process ran out of memory). Return
Trueif the task should be retried; returnFalseif the task was processed successfully and should be permanently removed from the queue. Note that if the task failed in a deterministic way (e.g. the task data was malformed), you should returnFalseso that the task is not retried; otherwise, the task will be retried indefinitely with no hope of success. In either case, you can provide more information in the second argument.The second is the result of the task. This can be any Python object that can be converted to JSON. A return value of
Noneis converted to an empty dictionary. This result is returned in the event queue so that it can be logged.
Modify your code to save its results to cloud-based storage, if needed. For example, if running on a local workstation, you might save the results to a file in a local directory, but if running on a cloud compute instance, you might save the results to a cloud-based storage bucket. One particularly easy way to handle this is to use the FileCache package, which provides a provider-independent API for reading and writing files from/to the local disk or cloud-based storage.
Modify your code’s
__main__block to use thecloud_tasksworker support:import asyncio import sys from cloud_tasks.worker import Worker, WorkerData def process_task(task_id: str, task_data: Dict[str, Any], worker_data: WorkerData) -> Tuple[bool, Any]: """ Process a task. Args: task_id: The unique ID of the task. task_data: The data required to process the task. worker_data: WorkerData object (useful for retrieving information about the local environment and polling for shutdown notifications) Returns: Tuple of (retry, result) """ [... your code ...] async def main(): # These command line arguments are used to override environment variables when # specifying the behavior of the worker process manager. They are optional # and most useful when running the worker locally. worker = Worker(process_task, args=sys.argv[1:]) await worker.start() if __name__ == "__main__": asyncio.run(main())
Step 3: Create a Task File
The data for your tasks must be provided in a JSON (.json) or YAML (.yml or
.yaml) file with the following format:
YAML:
- task_id: task-1
data:
key1: value1
key2: value2
JSON:
[
{
"task_id": "task-1",
"data": {
"key1": "value1",
"key2": "value2"
}
}
]
Both task_id and data are required keys. task_id must be a string that is unique
within all tasks that will be processed at the same time. data must be a dictionary containing
zero or more key-value pairs. The values can be as complicated as necessary but must be able to
be represented in JSON/YAML format.
Interlude - Running Tasks Locally
At this point you have done all of the preparation needed to run the tasks locally on your workstation. This could be useful for debugging your initial implementation or, if you have access to a high-end workstation with enough parallelism, you may always want to run your code locally and not take advantage of a cloud provider’s (costly) resources.
To run tasks locally, set up your environment as needed (install Python, create and
activate a virtual environment, install the dependencies and the rms-cloud-tasks
package, etc.). Then execute your worker code from the command line as follows:
python3 my_worker.py --task-file my_tasks.json
This will run your process_task function once for each task, which may be useful for
initial debugging. To increase the parallelism, you can specify the number of simultaneous
tasks to run:
python3 my_worker.py --task-file my_tasks.json --num-simultaneous-tasks 10
For full details about how the task manager is operating, you can specify the
--verbose option. Many other Environment Variables and Command Line Arguments are available.
To abort the task manager before all tasks are complete, type Ctrl-C once. This
will give the current tasks a chance to complete cleanly, and then the task manager will
exit. You can change how long to wait before the current tasks are complete with the
--shutdown-grace-period option.
If you are only going to run the worker locally, you can stop reading here, although you may be interested in Step 7: Monitor the Job below.
Step 4: Create a Startup Script
The startup script is provided to the cloud_tasks program and will be run as root on
each cloud compute instance that is started to process tasks (it will not be run on a
local workstation, as the cloud_tasks CLI is not used to manage local processes). At a
minimum, the startup script should install your project and its dependencies and then run
your worker code. It may also do more complicated operations such as setting up
authentication, attaching additional disks or GPUs, copying static data files to the local
disk, etc., as well as defining environment variables that will be accessible to your task
code. Here is an example:
apt-get update -y
apt-get install -y python3 python3-pip python3-venv git
cd
git clone https://github.com/MY-ORG/MY-REPO.git
cd MY-REPO
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
export MY_WORKER_DEST_BUCKET=gs://my-bucket/results
python3 my_worker.py
Step 5: Create a Configuration File
The configuration file will be used to configure the cloud_tasks commands.
Almost everything in the configuration file could also be specified as a command line option to the
cloud_tasks commands, but consolating all of the configuration into a single file makes it much
simpler to run commands going forward.
At a minimum you will need to specify:
provider: The cloud provider to use (awsorgcp).job_id: A unique string that identifies the job.startup_scriptorstartup_script_file: The startup script to run on the compute instace.
You will also want to set some constraints on the number of instances that can be started and what compute instance types you want to use. You may also need to specify other options depending on the cloud provider. See configuration file for more information.
Here is an example:
provider: gcp
gcp:
job_id: my-processing-job
startup_script_file: startup_script.sh
max-instances: 5
max-cpu: 8
min-memory-per-cpu: 4 # GB
max-total-price-per-hour: 1.00 # USD/hour
instance-types: "n2-"
Step 6: Load the Task Queue and Run the Worker
The cloud_tasks run command will load the task queue and then start the compute instance
pool manager.
cloud_tasks run --config myconfig.yml --task-file my_tasks.json
This will perform the following steps:
Create the task queue.
Load the tasks from
my_tasks.jsoninto the task queue (this can be done separately using the load_queue command).Based on the constraints given in the configuration file, choose the optimal compute instance type (this and all subsequent steps can be done separately using the manage_pool command).
Based on the constraints given in the configuration file, choose the optimal number of compute instances.
Create the chosen number of compute instances. Each will run the startup script.
Monitor the compute instances and replace them if they fail or are terminated.
Monitor the task queue and terminate the compute instances once it is empty, if this functionality is supported by the cloud provider.
Step 7: Monitor the Job
As tasks run, their status may optionally be sent to a local file and/or a cloud-based event queue. By default, if the tasks are read from a cloud-based task queue, their status is sent to a cloud-based event queue, and if the tasks are read from a local file, their status is sent to a local file.
If a local event log is used while tasks are running on a local workstation, the event
log can be monitored manually using standard Unix tools such as tail or less.
You may also write separate programs to process the event log and make reports as
necessary.
If a cloud-based event queue is used, Cloud Tasks provides a way to monitor the queue, report on the status of each task, save the events to a file, and collect statistics:
cloud_tasks monitor_event_queue --config myconfig.yml --output-file events.json
Step 8: Stop the Job
Once you see that all jobs have been processed, you can use the stop command. This will terminate the compute instances.
cloud_tasks stop --config myconfig.yml
Step 9: Purge the Task Queue
If you want to purge the task and event queues of any remaining contents, you can use the purge_queue command. Assuming you have successfully processed all tasks and monitored the event queue, these queues should already be empty. Either way you will now be starting with a clean slate if you want to run the job again.
cloud_tasks purge_queue --config myconfig.yml