Data scientists and engineers have made Apache Airflow a leading open source tool to create data pipelines due to its active open source community, familiar Python development as directed acyclic graph (DAG) workflows, and extensive library of prebuilt integrations. However, managing the connections and variables that these pipelines depend on can be a challenge, especially when dealing with multiple environments or teams. In this article, we’ll look at connections and variables and how to easily move from storing these in each environment to storing centrally using an alternate secrets backend, specifically AWS Secrets Manager.

Background

Whether you are using Amazon Managed Workflows for Apache Airflow (MWAA), another managed service, self-managing on premises, or self-managing using cloud compute such as Amazon Elastic Compute Cloud (Amazon EC2), you will need to create and maintain connections and variables. Apache Airflow depends on these to connect to downstream services and software and to provide the context needed for operators and sensors. When running Apache Airflow on Amazon Web Services (AWS), you can achieve connectivity to other managed services through an execution role when using Amazon MWAA, or through an attached role in a self-managed environment. However, this approach does not cover the connection information required for APIs and downstream systems that are not AWS services, nor does it provide a repository for arbitrary variables.

Apache Airflow provides a mechanism to store Connections and Variables in the Metadata database (metadatabase), which is how most users handle this requirement, but this approach has a few limitations. First, you must populate this information in each environment, whether manually, directly in the metadatabase, or via the command-line interface, and the command-line route sends connection passwords and variables as plain text. Second, each environment holds its own copy of the connections and variables, which multiplies the effort whenever a credential or value changes. Third, should you need to migrate or restore an environment without the original metadata, you will have to recreate all connections and variables.

The Apache Airflow community thought of this and allows you to provide an alternative secrets backend. When specified, Apache Airflow will first check this alternate backend when a connection or variable is requested. If the alternate backend contains the needed value, it is returned; if not, Apache Airflow will check the metadatabase for the value and return that instead.

Flowchart: Apache Airflow checks the alternate secrets backend first and falls back to the metadatabase.
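The lookup order described above can be sketched in a few lines of Python. This is only an illustration of the fallback logic, not Apache Airflow's actual implementation: the `lookup` function and the two dictionaries standing in for the alternate backend and the metadatabase are hypothetical.

```python
from typing import Mapping, Optional


def lookup(key: str,
           alternate_backend: Optional[Mapping[str, str]],
           metadatabase: Mapping[str, str]) -> Optional[str]:
    """Return the value for key, preferring the alternate backend."""
    # 1. If an alternate backend is configured and has the key, use it.
    if alternate_backend is not None and key in alternate_backend:
        return alternate_backend[key]
    # 2. Otherwise fall back to the metadatabase (None if absent there too).
    return metadatabase.get(key)


# Hypothetical stand-ins for AWS Secrets Manager and the metadatabase.
secrets_manager = {"region": "us-east-1"}
metadb = {"region": "eu-west-1", "team": "data"}

print(lookup("region", secrets_manager, metadb))  # found in the alternate backend
print(lookup("team", secrets_manager, metadb))    # falls back to the metadatabase
```

A value present in both places is served from the alternate backend, which is why a migrated connection takes effect without deleting the metadatabase copy.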

Setup

Using AWS Secrets Manager as a secrets backend for Apache Airflow is straightforward. There are two Apache Airflow configuration entries to specify. If self-managed, this is achieved by editing airflow.cfg. In Amazon MWAA, you specify them using the following configuration overrides:

YAML:

AirflowConfigurationOptions:
  secrets.backend: airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend
  secrets.backend_kwargs: '{"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}'

Console:

Airflow configuration options screenshot

Note: When using an AWS IAM assumed role as with Amazon MWAA, do not set "profile_name": "default" as indicated in the Apache Airflow documentation.

The JSON in secrets.backend_kwargs specifies the prefix for the connection or variable in AWS Secrets Manager. For example, if my JSON value for the above is:

{
  "connections_prefix": "my-airflow-env-1/connections",
  "variables_prefix": "my-airflow-env-1/variables"
}

Then my variable my_variable_name will be stored in AWS Secrets Manager as:

my-airflow-env-1/variables/my_variable_name

With a plain-text value:

screenshot showing plaintext secret value example 12345
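The naming rule is simply the prefix, a slash, and the variable or connection ID. A minimal sketch follows, assuming a helper named `secret_name` (hypothetical, not part of Apache Airflow) and the example prefixes used in this article:

```python
def secret_name(prefix: str, key: str, sep: str = "/") -> str:
    """Join a configured prefix and a key into a Secrets Manager secret name."""
    # Tolerate stray whitespace or a trailing separator in the configured prefix.
    return f"{prefix.strip().rstrip(sep)}{sep}{key}"


variable_secret = secret_name("my-airflow-env-1/variables", "my_variable_name")
connection_secret = secret_name("my-airflow-env-1/connections", "snowflake_conn")
print(variable_secret)
print(connection_secret)
```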

Configuring a connection is a little more involved. For example, the following Snowflake connection as defined in the Apache Airflow user interface:

Screenshot showing Conn Id: snowflake_conn; Conn Type: Snowflake; Host: myaccount.us-east-1.snowflakecomputing.com; schema: myschema; login: myusername; password: ****; port: 1234, and extra info with account, warehouse, database, and region info

will have the name my-airflow-env-1/connections/snowflake_conn and the following plain-text value in AWS Secrets Manager:

snowflake://myusername:mypassword@myaccount.us-east-1.snowflakecomputing.com:1234/myschema?account=myaccount&warehouse=mywarehouse&database=mydatabase&region=us-east-1
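To make the mapping between the connection fields and the URI explicit, here is a sketch that assembles and then re-parses the example value using only the Python standard library. The `build_conn_uri` helper is hypothetical; within Apache Airflow you would normally rely on the Connection object's own get_uri() method, which may encode special characters differently.

```python
from urllib.parse import parse_qsl, quote, urlencode, urlsplit


def build_conn_uri(conn_type, login, password, host, port, schema, extra=None):
    """Assemble a connection URI of the form type://login:password@host:port/schema?extra."""
    # Percent-encode credentials so characters such as '@' or '/' stay unambiguous.
    auth = f"{quote(login, safe='')}:{quote(password, safe='')}@"
    uri = f"{conn_type}://{auth}{host}:{port}/{quote(schema, safe='')}"
    if extra:
        uri += "?" + urlencode(extra)  # extra fields become query parameters
    return uri


uri = build_conn_uri(
    conn_type="snowflake",
    login="myusername",
    password="mypassword",
    host="myaccount.us-east-1.snowflakecomputing.com",
    port=1234,
    schema="myschema",
    extra={
        "account": "myaccount",
        "warehouse": "mywarehouse",
        "database": "mydatabase",
        "region": "us-east-1",
    },
)
print(uri)

# Reading the fields back out of the URI.
parts = urlsplit(uri)
extra = dict(parse_qsl(parts.query))
print(parts.username, parts.hostname, parts.port, parts.path.lstrip("/"), extra)
```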

Note: When using an AWS IAM role to connect to AWS Secrets Manager, either with Amazon MWAA’s Execution Role or an assumed role in Amazon EC2, you must provide AWS Secrets Manager access to that role via the AWS IAM console:

screenshot of 7 permission policies applied
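If you prefer a narrowly scoped policy over a broad managed one, the following sketch generates an IAM policy document limited to secrets under the configured prefixes. Treat it as a starting point under stated assumptions: the `secrets_policy` helper, the account ID, and the Region are placeholders, and the exact set of actions your environment needs may differ. The read actions serve the secrets backend; the create and put actions are only needed by the migration DAG later in this article.

```python
import json


def secrets_policy(region, account_id, prefixes):
    """Build an IAM policy document scoped to secrets under the given prefixes."""
    resources = [
        f"arn:aws:secretsmanager:{region}:{account_id}:secret:{prefix}/*"
        for prefix in prefixes
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue",   # read by the secrets backend
                    "secretsmanager:DescribeSecret",
                    "secretsmanager:CreateSecret",     # used by the migration DAG
                    "secretsmanager:PutSecretValue",   # used by the migration DAG
                ],
                "Resource": resources,
            }
        ],
    }


# Placeholder Region and account ID; replace with your own values.
policy = secrets_policy(
    region="us-east-1",
    account_id="123456789012",
    prefixes=["airflow/connections", "airflow/variables"],
)
print(json.dumps(policy, indent=2))
```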

Migration

You may be saying, “This seems great, but I have dozens (or even hundreds) of connections and variables that I would need to migrate. There’s no way I can justify that effort.” The great news is that you already have a tool to achieve this: Apache Airflow.

Warning: Standard charges apply for the use of AWS Secrets Manager. Before running the below DAG (directed acyclic graph), it is recommended that you eliminate unneeded connections and variables from your environment (e.g., the default connections that are present with all Apache Airflow installations). You may also choose to edit the DAG below to filter out unused connections (e.g., skipping those with blank logins or hosts). There is a 30-day free trial available for eligible accounts. Refer to AWS Secrets Manager Pricing for details.

Note: This example has been written and tested with Airflow 1.10.12 to ensure compatibility with Amazon MWAA. Airflow 2.0 support can be achieved by modifying the DAG Python imports, a sample of which will be available on GitHub.

The first thing we’ll do is set up our DAG and imports:

from airflow import DAG, settings, secrets
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Connection, Variable
from airflow.contrib.hooks.aws_hook import AwsHook
from datetime import timedelta
import os
import json

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

We’ll then create a function that will use a secretsmanager boto3 client to add/update a given key to Secrets Manager. Because create and update are separate calls, we’re trying the former, and, if it fails, we’ll assume that the secret already exists and just needs to be updated:

def write_to_sm_fn(name,value,client):
    print("Writing ",name,"=",value,"to AWS Secrets Manager...")
    try:
        response = client.create_secret(Name=name,SecretString=value)
    except:
        print(name," exists, overwriting...")
        response = client.put_secret_value(SecretId=name,SecretString=value)
    print(response)
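The function above treats any failure of create_secret as "already exists" and prints the secret value to the task log. A stricter variant is sketched below: it falls back to an update only for the specific "exists" error and never logs the value. To keep the sketch runnable without AWS access it uses `FakeSecretsClient`, a hypothetical in-memory stand-in; with a real boto3 client you would pass `client.exceptions.ResourceExistsException` as `exists_error`.

```python
class FakeSecretsClient:
    """Hypothetical in-memory stand-in for a boto3 secretsmanager client."""

    class ResourceExists(Exception):
        pass

    def __init__(self):
        self.store = {}

    def create_secret(self, Name, SecretString):
        if Name in self.store:
            raise self.ResourceExists(Name)
        self.store[Name] = SecretString
        return {"Name": Name}

    def put_secret_value(self, SecretId, SecretString):
        self.store[SecretId] = SecretString
        return {"Name": SecretId}


def upsert_secret(name, value, client, exists_error):
    """Create the secret, or update it only if it already exists."""
    try:
        client.create_secret(Name=name, SecretString=value)
        action = "created"
    except exists_error:
        # Any other error (permissions, throttling) propagates instead of being masked.
        client.put_secret_value(SecretId=name, SecretString=value)
        action = "updated"
    print(name, action)  # log the name only, never the secret value
    return action


fake = FakeSecretsClient()
first = upsert_secret("airflow/variables/k", "v1", fake, FakeSecretsClient.ResourceExists)
second = upsert_secret("airflow/variables/k", "v2", fake, FakeSecretsClient.ResourceExists)
```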

The main function that our PythonOperator will call starts by determining the correct secrets prefixes to use by querying the backend_kwargs defined for our environment:

def write_all_to_aws_sm_fn(**kwargs):
    connections_prefix='airflow/connections'
    variables_prefix='airflow/variables'
    backend_kwargs = kwargs['conf'].get(section='secrets', key='backend_kwargs')
    if backend_kwargs:
        x = json.loads(backend_kwargs)
        connections_prefix=x['connections_prefix'].strip().rstrip('/')
        variables_prefix=x['variables_prefix'].strip().rstrip('/')
        print("using connections_prefix=",connections_prefix,",variables_prefix=",variables_prefix,"...")
    else:
        print("backend_kwargs undefined--using defaults connections_prefix=",connections_prefix,",variables_prefix=",variables_prefix)
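The prefix handling can be isolated and tested on its own. The sketch below is a hypothetical helper, `resolve_prefixes`, that mirrors the logic above but also tolerates a backend_kwargs value in which only one of the two prefixes is set, where the original would raise a KeyError.

```python
import json

DEFAULTS = {
    "connections_prefix": "airflow/connections",
    "variables_prefix": "airflow/variables",
}


def resolve_prefixes(backend_kwargs):
    """Return both prefixes, falling back to the defaults for any that are missing."""
    parsed = json.loads(backend_kwargs) if backend_kwargs else {}
    return {
        # Normalize whitespace and a trailing slash, as the DAG function does.
        key: str(parsed.get(key, default)).strip().rstrip("/")
        for key, default in DEFAULTS.items()
    }


unset = resolve_prefixes(None)
partial = resolve_prefixes('{"connections_prefix": "my-airflow-env-1/connections/"}')
print(unset)
print(partial)
```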

We’ll then create a secretsmanager boto3 client using AwsHook, and a SQLAlchemy session to populate the Connection and Variable models:

    session = settings.Session()
    hook = AwsHook()
    client = hook.get_client_type('secretsmanager')

Now we’ll query and create the entries for the Connections defined in our Apache Airflow environment. To make things easier, Apache Airflow provides a utility function, get_uri(), to generate a connection string from a Connection object. We can use airflow.models.Connection along with SQLAlchemy to get a list of Connection objects that we can convert to URIs, and then use boto3 to push these to AWS Secrets Manager.

    query = session.query(Connection)
    print(query.count()," connections: ")
    for curr_entry in query:
        curr_id=connections_prefix+'/'+curr_entry.conn_id
        curr_val=curr_entry.get_uri()
        write_to_sm_fn(name=curr_id, value=curr_val, client=client)
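The warning earlier suggested filtering out unused connections before writing them. One possible reading of that advice is sketched here: skip a connection only when both its host and its login are blank. The `should_migrate` helper is hypothetical, and `SimpleNamespace` objects stand in for Connection rows so the sketch runs without Apache Airflow; in the loop above you would call it on each `curr_entry` before `write_to_sm_fn`.

```python
from types import SimpleNamespace


def should_migrate(conn):
    """Keep a connection unless both its host and its login are blank."""
    host = (conn.host or "").strip()
    login = (conn.login or "").strip()
    return bool(host or login)


# Stand-ins for rows returned by session.query(Connection).
rows = [
    SimpleNamespace(conn_id="snowflake_conn",
                    host="myaccount.us-east-1.snowflakecomputing.com",
                    login="myusername"),
    SimpleNamespace(conn_id="unused_default", host=None, login=""),
    SimpleNamespace(conn_id="http_only", host="example.com", login=None),
]

kept = [row.conn_id for row in rows if should_migrate(row)]
print(kept)
```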

We can use a similar method to retrieve a list of Variable objects, call get_val() on each one to retrieve its value, and push the values to AWS Secrets Manager via boto3 as well. After that, we’ll return from the function.

    query = session.query(Variable)
    print(query.count()," variables: ")
    for curr_entry in query:
        curr_id=variables_prefix+'/'+curr_entry.key
        curr_val=curr_entry.get_val()
        write_to_sm_fn(name=curr_id, value=curr_val, client=client)
    return "OK"

The DAG itself is just a host for the PythonOperator that calls the above function.

with DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(1),
    schedule_interval=None
) as dag:
    write_all_to_aws_sm = PythonOperator(
        task_id="write_all_to_aws_sm",
        python_callable=write_all_to_aws_sm_fn,
        provide_context=True
    )

The complete DAG is available on GitHub.

Summary

This article covered why centrally managed secrets and variables are important, how to configure Apache Airflow to use AWS Secrets Manager, and how to automate migration of your existing connections and variables from your metadatabase to AWS Secrets Manager.