Using the Airflow ShortCircuitOperator to Stop Bad Data From Reaching ETL Pipelines
I’m a huge fan of Apache Airflow and how the open source tool enables data engineers to scale data pipelines by more precisely orchestrating workloads.
But what happens when Airflow testing doesn’t catch all of your bad data? What if “unknown unknown” data quality issues fall through the cracks and affect your Airflow jobs?
One helpful but underutilized solution is to leverage the Airflow ShortCircuitOperator to create data circuit breakers to prevent bad data from flowing across your data pipelines.
Data circuit breakers are powerful, but as with most data quality tactics, the nuances of how they are implemented are critical. Otherwise, you can make a bad problem worse.
In this post, we’ll cover:
- What is an Airflow circuit breaker and how do they help with data reliability?
- Airflow circuit breaker challenges
- How to build a circuit breaker using the ShortCircuitOperator within Airflow DAGs
- Airflow Circuit Breaker Best Practices
- How data observability platforms helps with Airflow circuit breaker implementation
- We didn’t start the fire
What is an Airflow circuit breaker and how do they help with data reliability?
In electrical engineering, a circuit breaker is a safety device that protects your home from damage caused by an overcurrent or a short. When the breaker encounters those electrical incidents it breaks the current to prevent an even worse issue, like a fire, from occurring.
Data circuit breakers are essentially data tests on steroids and the philosophy is the same. When the data does not meet your defined quality or integrity thresholds in your Airflow DAG the pipeline is stopped, preventing a worse outcome, like a CEO getting bad information, from occurring.
While data circuit breakers are most frequently used to prevent bad data from entering the storage layer, they can be deployed at multiple stages prior to the BI dashboards being updated– between transformation steps or after an ETL or ELT job executes, for example.
Using the Airflow ShortCircuitOperator to create a circuit breaker is a tactic that sits within the prevention stage of the data reliability lifecycle. And, just as it’s impossible for human beings to anticipate and write tests to identify every way data can break, it’s also impossible to set up circuit breakers to prevent every instance of bad data that will flow through a pipeline (it’s also inadvisable but more on that later).
For that reason, both data testing and data circuit breakers work best to reduce data downtime when paired with data observability or end-to-end data monitoring and alerting.
Proactive monitoring and alerting can also supplement and help overcome the challenges with Apache Airflow’s native monitoring and logging capabilities at scale. Specifically, that Airflow pipelines are not data aware. They run tasks, but they don’t know what’s in those tasks, which requires you to dig into execution data that is rarely sufficient for incident resolution.
Airflow circuit breaker challenges
Circuit breakers leveraging the Airflow ShortCircuitOperator should be the most critical of your tests from the underlying query operation and only consist of the most well-defined logic that mandates your pipeline should stop running.
You should also only leverage circuit breakers when you completely understand the history and what types of incidents and thresholds constitute a trigger.
For example, a data model requiring absolutely no null columns could be an ideal circuit breaker, but if some small range of null columns were acceptable, that’s likely a poor circuit breaker.
The reason for this is the Airflow ShortCircuitOperator, by design, introduces data downtime when the circuit breaker is tripped and needs to be reset. This is also happening in your pipeline completely automatically – it’s not like you’re comparing results in a console or looking at a spreadsheet.
This means that while they can prevent data issues from occurring, the AirflowShortCircuitOperator can also wreak havoc on your pipeline with delayed jobs creating chain reactions of data failures downstream.
Some best practices to mitigate these risks are to start iteratively on less critical pipelines and take advantage of your staging environment before deploying to production.
Also, make sure the rules–whether queries or some sort of custom logic–have reasonable timeouts. If your batch job takes one hour to complete, it’s probably not reasonable for your circuit breaker to take another hour as you’ve then just doubled the SLA of your pipeline.
You can go crazy with the number of data tests you have, but circuit break with the Apache ShortCircuitOperator sparingly and only on the assumptions that can’t be broken for your data to be considered valid.
Ask yourself, “if this test failed would I want the entire data team paged immediately to help resolve it?”
Okay, that’s enough caveat emptor. Let’s build a circuit breaker.
How to build a circuit breaker using the ShortCircuitOperator within Airflow DAGs
In Image 1, above, we have a simple DAG with two circuit breakers always_false and always_true between example_elt_job_1 and example_elt_job_2. When the data trips the always_false_circuit, example_elt_job_2 will be skipped. Let’s look at the code.
The code in Image 2 is very simple, but illustrates the point of where to put circuit breakers in your pipeline. You’d replace the circuit breakers above with your own business logic and we have a placeholder for the bash command.
You can also use the TaskFlow API paradigm in Airflow 2.X as seen below.
The code in Image 3 extracts items from our fake database (in dollars) and sends them over.
We’re then transforming them with a variable called USD to Euro conversion rate, which in the real world would likely be introduced from calling some third-party API, table, or other entity. That conversion rate is negative here, which is an obvious error and just not possible.
This is a type of incident you’d want to write a circuit breaker for. So the code here iterates through the items and checks to see if any of them was negative. If that condition is ever met, it trips the circuit.
Of course, other circuit breakers with additional thresholds could have been added. For example having a null conversion rate would also likely warrant a circuit breaker. So what happens when you start scaling and have multiple circuit breakers?
Airflow Task Groups to the rescue! Task Groups can help take a messy visual of 10 different circuit breakers and rather than have to rotate or parse through them, with task groups you can just convert it into a single circuit visually.
Airflow Circuit Breaker Best Practices
I do have some suggestions for how to add circuit breakers to your pipeline such as:
- Do not limit yourself to one type of operator: You can use any operator, not just the ShortCircuitOperator to create a custom circuit breaker. You can also leverage tools like dbt, Great Expectations, or Monte Carlo as a circuit breaker.
- Try raising an AirflowSkipException instead of an AirflowException when a circuit is closed: This increases visibility and prevents automatic retries.
- Do not merge multiple circuits into one operator. This makes it harder to trace exactly what issue or threshold tripped the circuit.
- Do not merge a circuit breaker into the operator performing the task to evaluate. This makes it more difficult to determine if the job failed or if the circuit was tripped.
- Include a mechanism to bypass or skip a circuit breaker.
- Generalize repeated patterns as plugins or custom operators.
- Leverage the metastore and log information in warehouses and lakes to create freshness circuit breakers to see if your table is actually updated. In Snowflake: SELECT CONVERT_TIMEZONE( ‘UTC’, last_altered) last_altered FROM information_schema.tables…With Delta Lake: DESCRIBE DETAIL…or via deltaLog.snapshot
How data observability platforms helps with Airflow circuit breaker implementation
You can use a data observability platform like Monte Carlo to automate, customize, and simplify circuit breaking in your Airflow DAGs or other orchestrators. Here’s how:
- Create an API token for Monte Carlo. See details here.
- Create a new Airflow HTTP Connection with the MCD API ID and Token in “extra” with the following format:
- Install airflow-mcd from PyPI. Normally this can be done by adding the python package to your requirements.txt file for Airflow.
- Add the SimpleCircuitBreakerOperator operator to your Airflow DAG. For instance:
This operator can leverage any new or existing rule monitors you have created in Monte Carlo via the Dashboard, API, SDK or IaC (monitors as code). Check out the full documentation here.
Note that the SimpleCircuitBreakerOperator supports multiple options and you can find all the details here. If you prefer you can leverage Monte Carlo’s SDK (pycarlo) and/or our API to circuit break too.
We didn’t start the fire
The Airflow ShortCircuitOperator and circuit breakers in general are useful and easy to implement, but they are only one tool in your data reliability stack. They need to be combined with other monitoring, alerting, lineage and testing for maximum effectiveness.
The power they wield is considerable, so be careful with how you implement them so they reduce, rather than exacerbate, data downtime. But most of all, have fun with how you instrument your circuit breakers!
Interested in improving data reliability through data observability? Talk to us! Interested in learning more about circuit breakers? Watch the webinar: