This article is about configuring Airflow with the KubernetesExecutor and deploying it to Kubernetes. This configuration ensures that Airflow takes advantage of Kubernetes scalability by scheduling an individual container for each task.
Requirements
Airflow is well known to be a great scheduler for parallel tasks. Below is a set of requirements for running Airflow at scale:
- tasks should run in parallel in different containers, one temporary container per task
- all logs should be centralised and available at all times
- variables should be imported automatically from AWS Secrets Manager
- Google authentication should be enabled in Airflow
The solution
Luckily, other people have had the exact same requirements, so we can take advantage of the tooling they built.
Temporary containers
This can be achieved quite easily by configuring Airflow to use the KubernetesExecutor:
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-env
  labels:
    app: airflow
data:
  TZ: Etc/UTC
  POSTGRES_HOST: "MY_PSQL"
  POSTGRES_PORT: "5432"
  POSTGRES_DB: "airflow"
  POSTGRES_USER: "airflow"
  POSTGRES_PASSWORD: "MY_PASS"
  REDIS_HOST: "MY_REDIS_HOST"
  REDIS_PORT: "6379"
  REDIS_PASSWORD: ""
  FLOWER_PORT: "5555"
  AIRFLOW__CORE__EXECUTOR: "KubernetesExecutor"
  FERNET_KEY: "oniqx7yno09xmpe9umpqxR390U-0="
  AIRFLOW__CORE__FERNET_KEY: "oniqx7yno09xmpe9umpqxR390U-0="
  DO_WAIT_INITDB: "true"
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: "postgresql+psycopg2://airflow:MY_PASS@MY_PSQL:5432/airflow"
  AIRFLOW__CELERY__RESULT_BACKEND: "db+postgresql://airflow:MY_PASS@MY_PSQL:5432/airflow"
  AIRFLOW__CORE__DONOT_PICKLE: "false"
  AIRFLOW__CELERY__FLOWER_URL_PREFIX: ""
  AIRFLOW__CELERY__WORKER_CONCURRENCY: "10"
  AIRFLOW__CORE__DAGS_FOLDER: "/usr/local/airflow/dags"
  AIRFLOW__WEBSERVER__BASE_URL: "https://MY_AIRFLOW_URL"
  AIRFLOW__CORE__ENABLE_XCOM_PICKLING: "false"
  AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE: "/repo-sync/dags-repo/kubernetes/pod_templates/pod.yaml"
  AIRFLOW__KUBERNETES__NAMESPACE: "default"
  AIRFLOW__KUBERNETES__DELETE_WORKER_PODS: "True"
  AIRFLOW__KUBERNETES__DELETE_WORKER_PODS_ON_FAILURE: "True"
  AIRFLOW__CORE__STORE_SERIALIZED_DAGS: "False"
  AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "30"
  AIRFLOW__CORE__STORE_DAG_CODE: "False"
  AIRFLOW__WEBSERVER__AUTHENTICATE: "True"
  AIRFLOW__WEBSERVER__AUTH_BACKEND: "airflow.contrib.auth.backends.google_auth"
  AIRFLOW__GOOGLE__CLIENT_ID: "MY_CLIENT_ID"
  AIRFLOW__GOOGLE__CLIENT_SECRET: "MY_SECRET"
  AIRFLOW__GOOGLE__OAUTH_CALLBACK_ROUTE: "/oauth2callback"
  AIRFLOW__GOOGLE__DOMAIN: "MY_DOMAIN"
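The scheduler and webserver manifests are not shown here; as a minimal sketch (the Deployment name, labels and args below are assumptions, not part of the original setup), the ConfigMap can be injected into the scheduler like this:
---
# Hypothetical scheduler Deployment, shown only to illustrate how the
# airflow-env ConfigMap is consumed; adjust the image and entrypoint to your setup.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler
  labels:
    app: airflow
    component: scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow
      component: scheduler
  template:
    metadata:
      labels:
        app: airflow
        component: scheduler
    spec:
      serviceAccountName: airflow
      containers:
        - name: scheduler
          image: airflow/airflow:1.10.12
          args: ["scheduler"]   # assumes the image entrypoint accepts the Airflow subcommand
          envFrom:
            - configMapRef:
                name: airflow-env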
The pod template will usually be the same Airflow container image with some extra packages added, depending on what the DAGs will be required to do. git-sync
will be used for the initial sync of the DAGs to the temporary pod. The example below will also stream the logs to STDOUT while the task is running; at the end the logs will be pushed to S3.
---
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker
  labels:
    app: airflow
    component: worker
  annotations:
    iam.amazonaws.com/role: Airflow
  namespace: default
spec:
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  serviceAccountName: airflow
  hostNetwork: false
  initContainers:
    - name: "git-sync"
      env:
        - name: "GIT_SYNC_REPO"
          value: "dags-repo.git"
        - name: "GIT_SYNC_BRANCH"
          value: "master"
        - name: "GIT_SYNC_DEST"
          value: "dags-repo"
        - name: "GIT_SYNC_SSH"
          value: "true"
        - name: "GIT_SYNC_WAIT"
          value: "60"
        - name: "GIT_SYNC_ROOT"
          value: "/tmp/git"
        - name: "GIT_KNOWN_HOSTS"
          value: "false"
        - name: "GIT_SYNC_ADD_USER"
          value: "true"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
        - name: "GIT_SSH_KEY_FILE"
          value: "/etc/git-secret/ssh"
      image: "k8s.gcr.io/git-sync/git-sync:v3.1.7"
      volumeMounts:
        - name: dags-data
          mountPath: /tmp/git
        - name: ssh-key
          mountPath: /etc/git-secret/
  containers:
    - name: airflow-worker-base
      imagePullPolicy: IfNotPresent
      image: airflow/airflow:1.10.12
      lifecycle:
        postStart:
          exec:
            command: ["/bin/sh", "-c", "ln -sfn /repo-sync/dags-repo/dags /usr/local/airflow/dags && ln -sfn /repo-sync/dags-repo/libs /usr/local/airflow/libs && ln -sfn /home/airflow/.local /root/.local; mkdir -p ${AIRFLOW_HOME}/logs && (sleep 30; tail -f ${AIRFLOW_HOME}/logs/**/**/**/*.log >> /proc/1/fd/1 ) & "]
      envFrom:
        - configMapRef:
            name: "airflow-env"
      volumeMounts:
        - name: dags-data
          mountPath: /repo-sync
  volumes:
    - name: dags-data
      emptyDir: {}
    - name: "ssh-key"
      secret:
        secretName: "ssh-key"
        defaultMode: 0755
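The git-sync init container mounts a Secret named ssh-key at /etc/git-secret, which is not shown above. A minimal sketch of that Secret, with the private key as a placeholder, could look like this:
---
# Hypothetical "ssh-key" Secret for git-sync; replace the placeholder with a
# deploy key that has read access to the DAGs repository.
apiVersion: v1
kind: Secret
metadata:
  name: ssh-key
  labels:
    app: airflow
stringData:
  ssh: |
    -----BEGIN OPENSSH PRIVATE KEY-----
    <private deploy key goes here>
    -----END OPENSSH PRIVATE KEY-----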
Centralize logs
Airflow has support for streaming logs to an S3 bucket. The following config will be required in Airflow, for example as extra entries in the airflow-env ConfigMap above:
AIRFLOW__CORE__REMOTE_LOGGING: "True"
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: "s3://my-bucket-for-airflow-logs"
AIRFLOW__CORE__REMOTE_LOG_CONN_ID: "" # the IAM role will have read/write access to this bucket
AIRFLOW__CORE__ENCRYPT_S3_LOGS: "False"
AIRFLOW__HIVE__WORKER_CLASS: "sync"
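As the comment above suggests, with an empty remote log connection ID the S3 access relies on the pod's IAM role (the iam.amazonaws.com/role: Airflow annotation in the pod template). As a sketch, that role needs roughly the following permissions on the log bucket; the policy is shown here in YAML form and would be attached to the role as the equivalent JSON document:
# Sketch of the S3 permissions the "Airflow" IAM role needs for remote logging;
# shown in YAML form, attach the equivalent JSON policy to the role.
Version: "2012-10-17"
Statement:
  - Effect: Allow
    Action:
      - s3:ListBucket
    Resource: arn:aws:s3:::my-bucket-for-airflow-logs
  - Effect: Allow
    Action:
      - s3:GetObject
      - s3:PutObject
    Resource: arn:aws:s3:::my-bucket-for-airflow-logs/*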
AWS Secrets Manager for variables
For this we will create the variables.json file as a secret in AWS Secrets Manager and use the Kubernetes External Secrets addon to create the corresponding Secret in Kubernetes. The Secret will be created automatically and kept in sync by this addon.
Below is the External Secret configuration:
---
apiVersion: 'kubernetes-client.io/v1'
kind: ExternalSecret
metadata:
  name: airflow-variables
secretDescriptor:
  backendType: secretsManager
  data:
    - key: "arn:aws:secretsmanager:SECRET_REGION:MY_AWS_ACCOUNT:secret:devops/airflow/variables.json"
      name: "variables.json"
Below is the custom script attached to the Airflow scheduler to import the variables automatically when they are updated in Secrets Manager.
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-scripts
  labels:
    app: airflow
data:
  import-variables.sh: |
    #!/bin/sh -e
    # Re-import the Airflow variables whenever variables.json changes (detected via md5sum)
    while sleep 30
    do
      if [ -f ${AIRFLOW_HOME}/vars_md5sum ]
      then
        md5sum_sm=`md5sum /airflow-variables/variables.json | awk '{print $1}'`
        md5sum_imported=`cat ${AIRFLOW_HOME}/vars_md5sum`
        if [ "$md5sum_sm" != "$md5sum_imported" ]
        then
          echo "[INFO]: Change detected, importing the variables"
          airflow variables -i /airflow-variables/variables.json || true
          md5sum /airflow-variables/variables.json | awk '{print $1}' > ${AIRFLOW_HOME}/vars_md5sum
        fi
      else
        airflow variables -i /airflow-variables/variables.json || true
        md5sum /airflow-variables/variables.json | awk '{print $1}' > ${AIRFLOW_HOME}/vars_md5sum
      fi
    done
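How the script is attached to the scheduler is not shown in the article; one way to wire it up, as a hypothetical sketch, is a sidecar container in the scheduler pod that mounts the airflow-scripts ConfigMap together with the airflow-variables Secret and runs the script in its loop:
# Hypothetical fragment of the scheduler pod spec: a sidecar runs
# import-variables.sh against the mounted variables.json.
containers:
  - name: import-variables
    image: airflow/airflow:1.10.12
    command: ["/bin/sh", "/scripts/import-variables.sh"]
    envFrom:
      - configMapRef:
          name: airflow-env   # gives the sidecar the metadata DB connection
    volumeMounts:
      - name: scripts
        mountPath: /scripts
      - name: airflow-variables
        mountPath: /airflow-variables
        readOnly: true
volumes:
  - name: scripts
    configMap:
      name: airflow-scripts
      defaultMode: 0755
  - name: airflow-variables
    secret:
      secretName: airflow-variables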
Conclusion
Using the above snippets with the configuration recommended by Airflow will result in a scalable solution which will reduce the AWS bill. It requires a minimal permanent infrastructure: only the Airflow scheduler and the Airflow webserver.
This configuration has been tested with Airflow 1.10.12.