Airflow & Google Batch With PythonVirtualenvOperator
Overview
Effective use of Airflow (and the Python ecosystem in general) is plagued by dependency management problems. There are many solutions for this (venv, conda, poetry), but within Airflow it is easy to assume that all packages will simply work together.
In addition, with vendor-specific Airflow releases such as Cloud Composer, there are often conflicts between the well-defined, supported package setup and the cutting-edge packages needed to deliver the latest business functionality.
A good example of this is Google's newest batch compute service, Google Batch. Its Python client is only up to version 0.9.0 (as of Mar 2023), and its dependencies do not match the versions of the Google Cloud packages pinned in the latest Cloud Composer image definitions.
In order to use Google Batch, we must use a Python environment specific to the task. This can be achieved in a few ways:
- Containerisation - often overkill for lightweight tasks
- Virtual environments - see below
Submitting a Google Batch job with Python
The steps to submit a Google Batch job with Python are thankfully simple, and a collection of the example resources can be found here. There are two key example functions:
- submit_job
- monitor_job
The job is submitted with:
def create_container_job(
    project_id: str, region: str, job_name: str, network: str, subnetwork: str
):
    ...
I have modified the create_container_job function to use a custom network and subnetwork, as this is more relevant for non-default applications.
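For reference, a minimal sketch of that modified function, closely following Google's published batch_v1 container sample (the container image, commands, retry settings, and resource names here are illustrative placeholders, not values from the original post):

from google.cloud import batch_v1


def create_container_job(
    project_id: str, region: str, job_name: str, network: str, subnetwork: str
) -> batch_v1.Job:
    client = batch_v1.BatchServiceClient()

    # Define what the job runs: a single container runnable
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = "gcr.io/google-containers/busybox"  # placeholder image
    runnable.container.entrypoint = "/bin/sh"
    runnable.container.commands = ["-c", "echo Hello world!"]

    task = batch_v1.TaskSpec()
    task.runnables = [runnable]
    task.max_retry_count = 2
    task.max_run_duration = "3600s"

    group = batch_v1.TaskGroup()
    group.task_count = 1
    group.task_spec = task

    # The custom network/subnetwork modification: attach the Batch VMs to a
    # specific VPC rather than the default network
    network_interface = batch_v1.AllocationPolicy.NetworkInterface()
    network_interface.network = f"projects/{project_id}/global/networks/{network}"
    network_interface.subnetwork = (
        f"projects/{project_id}/regions/{region}/subnetworks/{subnetwork}"
    )
    network_policy = batch_v1.AllocationPolicy.NetworkPolicy()
    network_policy.network_interfaces = [network_interface]

    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.network = network_policy

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    create_request.parent = f"projects/{project_id}/locations/{region}"

    return client.create_job(create_request)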
The job can be monitored with
def get_job(project_id: str, region: str, job_name: str):
within a checking loop. We can import these local functions into the Airflow DAG by copying the file into the Composer environment's GCS bucket, which is synced to the Airflow workers.
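A minimal sketch of such a checking loop, polling until the job reaches a terminal state (the wait_for_job helper and poll interval are my own additions, not part of the sample):

import time

from google.cloud import batch_v1


def get_job(project_id: str, region: str, job_name: str) -> batch_v1.Job:
    client = batch_v1.BatchServiceClient()
    return client.get_job(
        name=f"projects/{project_id}/locations/{region}/jobs/{job_name}"
    )


def wait_for_job(
    project_id: str, region: str, job_name: str, poll_seconds: int = 30
) -> batch_v1.Job:
    # States after which the job status will not change any further
    terminal_states = {
        batch_v1.JobStatus.State.SUCCEEDED,
        batch_v1.JobStatus.State.FAILED,
    }
    while True:
        job = get_job(project_id, region, job_name)
        if job.status.state in terminal_states:
            return job
        time.sleep(poll_seconds)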
Batch with the Virtual environment operator
To run Batch with the PythonVirtualenvOperator (here via its @task.virtualenv decorator form), the following can be used:
@task.virtualenv(
    task_id="virtualenv_batch",
    requirements=["google-cloud-batch==0.9.0"],  # TODO: track package until integrated into Composer
    system_site_packages=True,  # TODO: sadly need this to import local modules (batch_job_utils), even if it breaks the google-cloud setup - maybe another solution?
)
def batch_virtualenv():
    ...
This operator creates a dedicated virtual environment for the duration of the task and installs the requested packages into it, leaving the worker's own environment untouched.
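Putting it together, the task body might call the two helpers like this (the project, region, network, and job names are placeholders; batch_job_utils is the local module mentioned above, which must be importable on the worker):

import pendulum
from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2023, 3, 1), catchup=False)
def batch_dag():
    @task.virtualenv(
        task_id="virtualenv_batch",
        requirements=["google-cloud-batch==0.9.0"],
        system_site_packages=True,
    )
    def batch_virtualenv():
        # Imports must live inside the function: it executes in the new venv
        from batch_job_utils import create_container_job, wait_for_job

        create_container_job(
            project_id="my-project",
            region="europe-west2",
            job_name="example-job",
            network="my-vpc",
            subnetwork="my-subnet",
        )
        wait_for_job("my-project", "europe-west2", "example-job")

    batch_virtualenv()


batch_dag()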
Next steps
To implement this without having to import local files, you might want to create a private package to install in the environment, importing the relevant config from the external environment.
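For example, if the helpers were published as a private package (the package name and VCS URL below are hypothetical), system_site_packages could then be switched off:

@task.virtualenv(
    task_id="virtualenv_batch",
    requirements=[
        "google-cloud-batch==0.9.0",
        # hypothetical private package, installed from a VCS URL or a private index
        "my-batch-utils @ git+https://github.com/my-org/my-batch-utils.git@v0.1.0",
    ],
    system_site_packages=False,  # no longer needed once the helpers are packaged
)
def batch_virtualenv():
    ...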