Best practices for KubernetesPodOperator in Cloud Composer

In this post I will go through best practices for using the KubernetesPodOperator, with examples. I will share DAGs and Terraform scripts, so it should be easy to test everything out for yourself.

Quite a few of the questions I get when talking about Cloud Composer are about how to use it for autoscaling job workloads. With the (current) out-of-the-box settings Cloud Composer gives you, you are quite limited in the scaling you can achieve.

For example, Cloud Composer is not able to scale horizontally with work demand, so the usual practice is to provision a cluster size and configuration that can handle any expected workload. Challenges surface when you want to go beyond those initial settings. Let's say you suddenly have a job that requires 10Gi of memory, but your worker nodes are small n1 machines. What do you do then, in a cost-efficient manner?

What about Python libraries and dependency issues? If you use the PythonOperator without a virtual environment, it quickly gets messy.

KubernetesPodOperator is a brilliant operator that leverages native Kubernetes functionality to execute its work: it handles horizontal scaling, and all dependencies are baked into the Docker image. You don't even have to use Python, since it is all containerised!

I have simplified the architecture in the diagram below:

Simplified illustration of Composer internals

Usually the tasks you run in Composer run inside the green dots. Each green dot picks x tasks in parallel from the task queue, so if you are running one CPU-heavy workload, it might affect other jobs running in Composer as well. If you are running a particularly memory-heavy job, the entire worker pod might get killed, failing all the tasks it was working on!

What the KubernetesPodOperator allows us to do is to spawn processes as blue dots in a separate node pool (machine) and allocate resources to them, so they will not take resources from the “normal” jobs. Horizontal scaling comes out of the box, since we are telling Kubernetes how much CPU and memory each pod needs: if Kubernetes does not have enough capacity to place a blue dot, it will spin up a new VM if the node pool size allows it, or wait gracefully for resources to become available.

KubernetesPodOperator is getting more traction these days, and I would like to share some hard-learned experiences: best practices, if you will.

Running Pods in Composer

There are at least two valid operators that can do this:

1. KubernetesPodOperator

Runs a pod in the current Composer cluster

2. GKEPodOperator

Runs a pod in any GKE cluster. Note that this operator has a known bug regarding xcom, so using it together with xcom will not work. (https://stackoverflow.com/questions/58349627/airflow-gkepodoperator-xcom-push-returns-none/59285712#59285712)

I would suggest using KubernetesPodOperator, mostly due to the xcom bug, which Google has shown little interest in fixing.

Create a separate node pool to run your tasks

Create a separate autoscaled node pool to run your workloads. If you do not do this, your pods will run in the Composer k8s cluster, and they will compete for resources there. Since the default config of an airflow-worker is to request 0 bytes of memory(!), airflow-workers are first on the list to get evicted if there is a resource shortage!

You also need to create rules that prevent Kubernetes from migrating Composer core services to your new node pool, by using taints and tolerations.

The reason for this is simple: workloads will crash, and if by accident (this has happened to me) Airflow services crash while you have an expensive node pool up and running, there is a pretty good chance that Kubernetes will place the Airflow service on that node. That will prevent the pool from scaling down, since Kubernetes will not take down services by itself to place them somewhere else!
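In the repo this is done with Terraform; as a rough sketch of the same thing with the google-cloud-container Python client (assuming a recent version of the library; the project, location, cluster, pool name and taint are all placeholders):

```python
# Sketch: create an autoscaled node pool, tainted so that only pods which
# explicitly tolerate the taint can be scheduled on it. Names are placeholders.
from google.cloud import container_v1

client = container_v1.ClusterManagerClient()

node_pool = container_v1.NodePool(
    name="pod-pool",  # hypothetical dedicated pool for KubernetesPodOperator tasks
    initial_node_count=1,
    config=container_v1.NodeConfig(
        machine_type="n1-highmem-2",
        # Keep Composer core services (and everything else) out of this pool.
        taints=[
            container_v1.NodeTaint(
                key="dedicated",
                value="pods",
                effect=container_v1.NodeTaint.Effect.NO_SCHEDULE,
            )
        ],
    ),
    # Let the pool scale down to zero when no pods are running.
    autoscaling=container_v1.NodePoolAutoscaling(
        enabled=True,
        min_node_count=0,
        max_node_count=3,
    ),
)

client.create_node_pool(
    parent="projects/my-project/locations/europe-west1-b/clusters/my-composer-cluster",
    node_pool=node_pool,
)
```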

Instruct Composer where to place your tasks

You need to explicitly tell Composer which node pool you want your pods to run in, and set the correct tolerations. If you do not do this, they will run in the default node pool, where Airflow itself is running. Not a good idea!

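A minimal sketch of such a task, matching the hypothetical pod-pool and dedicated=pods taint from the node pool sketch above (the image path is a placeholder; cloud.google.com/gke-nodepool is the built-in GKE node label):

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

task = KubernetesPodOperator(
    task_id="heavy-job",
    name="heavy-job",
    namespace="default",
    image="eu.gcr.io/my-project/my-job:latest",  # placeholder image
    # Pin the pod to the dedicated pool via the built-in GKE node label...
    affinity={
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [{
                    "matchExpressions": [{
                        "key": "cloud.google.com/gke-nodepool",
                        "operator": "In",
                        "values": ["pod-pool"],
                    }]
                }]
            }
        }
    },
    # ...and tolerate the pool's taint so the scheduler is allowed to place it there.
    tolerations=[{
        "key": "dedicated",
        "operator": "Equal",
        "value": "pods",
        "effect": "NoSchedule",
    }],
)
```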

Since this config can be quite verbose, it is quite common to create a package in the dags folder where you add the bootstrapping code for it, reducing the visual noise in your jobs.

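A sketch of what such a helper package might look like (the module path, function name and defaults are hypothetical):

```python
# dags/pod_utils/__init__.py -- a hypothetical helper package in the dags folder.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

POD_POOL_AFFINITY = {
    "nodeAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": {
            "nodeSelectorTerms": [{
                "matchExpressions": [{
                    "key": "cloud.google.com/gke-nodepool",
                    "operator": "In",
                    "values": ["pod-pool"],
                }]
            }]
        }
    }
}

POD_POOL_TOLERATIONS = [
    {"key": "dedicated", "operator": "Equal", "value": "pods", "effect": "NoSchedule"}
]


def make_pod_task(task_id, image, **kwargs):
    """Build a KubernetesPodOperator with the node pool boilerplate pre-filled."""
    return KubernetesPodOperator(
        task_id=task_id,
        name=task_id,
        namespace="default",
        image=image,
        affinity=kwargs.pop("affinity", POD_POOL_AFFINITY),
        tolerations=kwargs.pop("tolerations", POD_POOL_TOLERATIONS),
        **kwargs,
    )
```

A DAG then only needs make_pod_task("my-job", "eu.gcr.io/my-project/my-job:latest") to get a correctly placed pod.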

Above: Example of moving verbose code into a package to reduce visual noise

Always specify resource requests!

This is probably one of the most important things to do. If you wrap pod creation in a utils function, it is even possible to dynamically select the node pool based on the resource request (more than 5Gi goes to a memory-heavy pool, while less than 5Gi goes to a cheaper one; see the sketch below).

If you do not do this, you will get problems with autoscaling (Kubernetes will think your jobs require 0 resources and will attempt to place all jobs on the same node) and with pods being evicted (once you run out of resources).
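As a sketch, building on the hypothetical make_pod_task helper above (the 5Gi threshold and pool names are just examples):

```python
# Hypothetical extension of make_pod_task: choose the node pool from the
# memory request, and always pass an explicit resource request.
def node_pool_affinity(pool):
    """Build the nodeAffinity dict pinning a pod to the given GKE node pool."""
    return {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [{
                    "matchExpressions": [{
                        "key": "cloud.google.com/gke-nodepool",
                        "operator": "In",
                        "values": [pool],
                    }]
                }]
            }
        }
    }


def make_sized_pod_task(task_id, image, memory_gi, **kwargs):
    # More than 5Gi goes to the memory-heavy pool, the rest to the cheaper one.
    pool = "highmem-pool" if memory_gi > 5 else "pod-pool"
    return make_pod_task(
        task_id,
        image,
        affinity=node_pool_affinity(pool),
        # The request is what the scheduler and autoscaler use to place the pod.
        resources={"request_memory": "%dGi" % memory_gi, "request_cpu": "500m"},
        **kwargs,
    )
```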

Specify a sufficient startup time

The default startup time for a pod is 120 seconds. If a pod takes longer than this to start up, the task will fail in Composer only. The Kubernetes scheduler will still have the pod in its pool and will still attempt to start it (and may succeed!), giving you some very hard-to-debug cases.

I will illustrate by example:

Job illustration

We have a node pool with only one node (n1-highmem-2, with 2 vCPUs and 13Gi of memory). The 3Gi tasks each take 120 seconds to finish.

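A sketch of the DAG used here (the node pool affinity and tolerations from earlier are omitted for brevity; DAG id, image and task names are placeholders):

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

with DAG(
    "pod_startup_illustration",  # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
    concurrency=10,  # Composer will try to start all ten tasks at once
) as dag:
    for i in range(10):
        KubernetesPodOperator(
            task_id="sleeper-%d" % i,
            name="sleeper-%d" % i,
            namespace="default",
            image="busybox",
            cmds=["sh", "-c", "sleep 120"],  # each task takes two minutes
            resources={"request_memory": "3Gi"},  # only a few fit on the single 13Gi node
            startup_timeout_seconds=60,  # too short: pods waiting for capacity fail in Composer
        )
```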

Result of the actual job

Result in Composer
How it looks in GKE

If we look in Kubernetes at this time, it looks something like this. The reds and greens seem to be somewhat in sync… BUT, the Kubernetes scheduler has not given up yet!

So what has happened?

The 3Gi tasks each request 3Gi of memory and take 2 minutes to finish. The startup time for this example is set to 60 seconds. Concurrency is set to 10, so Composer will try to start all 10 at the same time. Composer is able to start 3 jobs, then it has to wait… and the rest of the jobs fail in Composer. But the Kubernetes scheduler keeps trying well past that point (it does not actually care about how much time has passed; it will retry a specific number of times before giving up), so you can see errors like this. Set concurrency and startup times (at least 600 seconds) accordingly!
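The fix is a one-line change on each task, sketched here on one of the tasks from the example above:

```python
KubernetesPodOperator(
    task_id="sleeper-0",
    name="sleeper-0",
    namespace="default",
    image="busybox",
    cmds=["sh", "-c", "sleep 120"],
    resources={"request_memory": "3Gi"},
    # Give the pod enough time to wait for capacity (or a freshly
    # autoscaled node) before Composer declares the task failed.
    startup_timeout_seconds=600,
)
```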

If you look in this repo, you can find all the sources used in this post. I usually use this material in workshops with clients, so it may contain some additional things I have not gone through in detail.

