Airflow & Google Batch With PythonVirtualenvOperator

Overview

Effective use of Airflow (and of the Python ecosystem in general) is plagued by dependency management problems. There are many solutions for this (venv, conda, Poetry), but within Airflow it is easy to assume that all packages will simply work together.

In addition, with vendor-specific Airflow releases such as Cloud Composer, there are often conflicts between the well-defined, supported package setup and the latest, cutting-edge packages needed to incorporate new business functionality.

A good example of this is Google’s newest compute service, Google Batch. Its Python client is only at version 0.9.0 (as of Mar 2023), and its dependencies do not match the versions of the Google Cloud packages pinned in the latest Cloud Composer image definitions.

In order to use Google Batch, we must use a Python environment specific to the task. This can be achieved in a few ways:

  1. Containerisation - often overkill for lightweight tasks
  2. Virtual environments - see below

Submitting a Google Batch job with Python

The steps to submit a Google Batch job with Python are thankfully simple, and a collection of the example resources can be found here. There are two key example functions:

  1. submit a job
  2. monitor a job

A job is submitted with:

def create_container_job(
    project_id: str, region: str, job_name: str, network: str, subnetwork: str
):

I have modified create_container_job to use a custom network and subnetwork, as this is more relevant for non-default applications.
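
As a rough sketch, the body of create_container_job might look like the following. It is based on the public google-cloud-batch samples, with the network/subnetwork wiring added; the container image, command, and retry settings are placeholder assumptions, not the exact values from my setup:

from google.cloud import batch_v1


def create_container_job(
    project_id: str, region: str, job_name: str, network: str, subnetwork: str
) -> batch_v1.Job:
    client = batch_v1.BatchServiceClient()

    # The container to run - image and command are placeholders
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = "gcr.io/google-containers/busybox"
    runnable.container.entrypoint = "/bin/sh"
    runnable.container.commands = ["-c", "echo Hello world!"]

    task = batch_v1.TaskSpec()
    task.runnables = [runnable]
    task.max_retry_count = 2
    task.max_run_duration = "3600s"

    group = batch_v1.TaskGroup()
    group.task_count = 1
    group.task_spec = task

    # Custom network/subnetwork instead of the default VPC
    network_interface = batch_v1.AllocationPolicy.NetworkInterface()
    network_interface.network = f"projects/{project_id}/global/networks/{network}"
    network_interface.subnetwork = (
        f"projects/{project_id}/regions/{region}/subnetworks/{subnetwork}"
    )
    network_policy = batch_v1.AllocationPolicy.NetworkPolicy()
    network_policy.network_interfaces = [network_interface]

    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.network = network_policy

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    create_request.parent = f"projects/{project_id}/locations/{region}"

    return client.create_job(create_request)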

The job can be monitored with

def get_job(project_id: str, region: str, job_name: str):

within a checking loop. We can import these local functions into the Airflow DAG by copying the file into the same GCS bucket that holds the DAGs.
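
A minimal version of that checking loop might look like this, assuming the get_job signature above; the poll interval and the choice of terminal states are my own assumptions:

import time

from google.cloud import batch_v1


def get_job(project_id: str, region: str, job_name: str) -> batch_v1.Job:
    client = batch_v1.BatchServiceClient()
    return client.get_job(
        name=f"projects/{project_id}/locations/{region}/jobs/{job_name}"
    )


def wait_for_job(
    project_id: str, region: str, job_name: str, poll_seconds: int = 30
) -> batch_v1.Job:
    # Poll until the job reaches a terminal state
    terminal_states = {
        batch_v1.JobStatus.State.SUCCEEDED,
        batch_v1.JobStatus.State.FAILED,
    }
    while True:
        job = get_job(project_id, region, job_name)
        if job.status.state in terminal_states:
            return job
        time.sleep(poll_seconds)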

Batch with the virtual environment operator

To run Batch with the PythonVirtualenvOperator (here via the @task.virtualenv decorator), the following can be used:

@task.virtualenv(
    task_id="virtualenv_batch",
    requirements=["google-cloud-batch==0.9.0"],  # TODO: track package until integrated into Composer
    system_site_packages=True,  # TODO: sadly need this to import local modules (batch_job_utils), even if it breaks the google-cloud setup - maybe another solution?
)
def batch_virtualenv():
    ...

This operator creates a dedicated virtual environment for the duration of the task and installs the requested packages into it.
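
Putting it together, the task body might look like the sketch below. Note that imports must happen inside the function, since the body is executed in the fresh virtualenv rather than in the scheduler process; the project, region, and network names are placeholders, and batch_job_utils is the local module holding the functions above:

@task.virtualenv(
    task_id="virtualenv_batch",
    requirements=["google-cloud-batch==0.9.0"],
    system_site_packages=True,
)
def batch_virtualenv():
    # Imports must live inside the function: it runs in the new
    # virtualenv, not in the worker's default environment.
    from batch_job_utils import create_container_job, wait_for_job

    create_container_job(
        project_id="my-project",  # placeholder values
        region="europe-west2",
        job_name="example-batch-job",
        network="my-network",
        subnetwork="my-subnetwork",
    )
    wait_for_job("my-project", "europe-west2", "example-batch-job")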

Next steps

To implement this without having to import local files, you might want to create a private package to install into the environment, importing the relevant config from the external environment.
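
For example, the requirements list could point at a hypothetical private package instead of relying on system_site_packages (the package name and repository URL below are illustrative only):

@task.virtualenv(
    task_id="virtualenv_batch",
    requirements=[
        "google-cloud-batch==0.9.0",
        # Hypothetical private package wrapping batch_job_utils
        "my-batch-utils @ git+https://github.com/my-org/my-batch-utils.git@v0.1.0",
    ],
    system_site_packages=False,  # no longer needed for local imports
)
def batch_virtualenv():
    ...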