
Airflow @ Agari

Siddharth Anand | August 31st, 2015

Workflow Schedulers

Workflow schedulers are systems that are responsible for the periodic execution of workflows in a reliable and scalable manner. Workflow schedulers are pervasive – for instance, any company that has a data warehouse, a specialized database typically used for reporting, uses a workflow scheduler to coordinate nightly data loads into it. Of more interest to companies like Agari is the use of workflow schedulers to reliably execute complex and business-critical “big” data science workloads! Agari, an email security company that tackles the problem of phishing, is increasingly leveraging data science, machine learning, and big data practices typically seen in data-driven companies like LinkedIn, Google, and Facebook in order to meet the demands of burgeoning data and dynamism around modeling. In a previous post, I described how we leverage AWS to build a scalable data pipeline at Agari. In this post, I discuss our need for a workflow scheduler to improve the reliability of our data pipelines, using the previous post’s pipeline as a working example.

[Figure: Agari data pipeline]

Scheduling Workflows @ Agari – A Smarter Cron

At the time of the writing of the previous post, we were still using Linux cron to schedule our periodic workflows and were in need of a workflow (a.k.a. DAG) scheduler. Why? In my previous post, I described how we loaded and processed data from on-premises Collectors (i.e. Collectors that live within our enterprise customers’ data centers). Cron is a good first solution when it comes to kicking off a periodic data load, but it stops short of what we need. We need an execution engine that does the following:

  • Provides an easy means to create a new DAG and manage existing DAGs
  • Kicks off periodic data loads involving a DAG of tasks
  • Retries failed tasks multiple times to work around intermittent problems
  • Reports both successful and failed executions of a DAG via email
  • Provides compelling UI visualization to provide insightful feedback at a glance
  • Provides centralized logging — a central place to collect logs
  • Provides configuration management
  • Provides a powerful CLI for easy-integration with automation
  • Provides state capture
    • for any run, we would like to know the input and configuration used for that run. This is particularly important in the application of models for the purposes of scoring and categorization. As we modify our models, we need a way to track which model versions were used for a particular run, for both diagnostic and attribution needs.

When using Cron, a developer needs to write a program for Cron to call. The developer not only needs to write code to define and execute the DAG, but also to handle logging, configuration management, metrics and insights, failure handling (e.g. retrying failed tasks or specifying timeouts for long-running tasks), reporting (e.g. via email on success or failure), and state capture. Soon, every developer is re-inventing the wheel. DAG schedulers take care of these ancillary needs — the developer just needs to focus on defining the DAG.

Enter Airflow 

Earlier this summer, around the time that I was on the hunt for a good DAG scheduler, Airbnb open-sourced their DAG scheduler, Airflow — it met all of the needs listed above.

DAG Creation

Airflow provides a very easy mechanism to define DAGs: a developer defines a DAG in a Python script. The DAG is then automatically loaded into the DAG engine and scheduled for its first run. Modifying a DAG is as easy as modifying the Python script! The ease with which a developer can get started on Airflow contributes greatly to its draw.

[Figure: A DAG defined as code]
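To make the idea concrete, a DAG file is just a Python module that instantiates a DAG object and wires a few operators together. The snippet below is a minimal sketch, not our actual pipeline code; the DAG id, task names, schedule, email address, and commands are placeholders.

```python
# A minimal, illustrative DAG definition -- not our production pipeline code.
# The DAG id, schedule, email address, and commands are placeholders.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "data-eng",
    "start_date": datetime(2015, 7, 1),
    "retries": 3,                        # retry failed tasks to ride out intermittent problems
    "retry_delay": timedelta(minutes=5),
    "email": ["data-eng@example.com"],   # where failure notifications go
    "email_on_failure": True,
}

dag = DAG("example_import", default_args=default_args,
          schedule_interval=timedelta(days=1))

extract = BashOperator(task_id="extract", bash_command="echo extract", dag=dag)
load = BashOperator(task_id="load", bash_command="echo load", dag=dag)

# load runs only after extract succeeds
load.set_upstream(extract)
```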

Once your DAG has been loaded into the engine, you will see it on the Airflow home screen. On this page, you can easily hide your DAG from the scheduler by toggling an on/off switch — this is useful if one of your downstream systems is undergoing lengthy maintenance. Although Airflow handles failures, sometimes it’s best to just disable the DAG to avoid unnecessary failure alerts. In the screenshot below, the “cousin domains” DAG is disabled.

[Screenshot: Airflow home screen listing all DAGs]

DAG Scheduling

Airflow provides a few handy views of your DAG. The first is the Graph View, which shows us that the run kicks off via the execution of 2 Spark jobs: the first converts any unprocessed collector files from Avro into date-partitioned Parquet files, and the second runs aggregation and scoring for a particular date (i.e. the date of the run). As the second Spark job writes its output to S3, S3 “object created” notifications are sent to an SQS queue!

[Screenshot: Graph View of the DAG]
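For illustration, the two Spark jobs could be expressed as Airflow tasks along these lines; the spark-submit invocations and task ids below are placeholders rather than our real job definitions.

```python
# Illustrative only: the spark-submit invocations and task ids are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG("example_import", start_date=datetime(2015, 7, 1))

convert_to_parquet = BashOperator(
    task_id="convert_avro_to_parquet",
    bash_command="spark-submit convert_to_parquet.py --date {{ ds }}",
    dag=dag,
)

aggregate_and_score = BashOperator(
    task_id="aggregate_and_score",
    bash_command="spark-submit aggregate_and_score.py --date {{ ds }}",
    dag=dag,
)

# Scoring runs only after the Parquet conversion succeeds; its S3 output then
# triggers the "object created" notifications that land in SQS.
aggregate_and_score.set_upstream(convert_to_parquet)
```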

The next task (i.e. check_for_sqs_message_branch_condition) provides a very cool feature not present in other DAG schedulers — a Branch Condition Task. This task type allows execution to proceed down one of several paths in the DAG for a particular run. In our case, if we check and find no data in SQS, we abandon further downstream processing and send an email notification that data is missing from SQS! If all is well and messages do show up in SQS, we proceed down the main line of our pipeline, which involves a few more tasks (a sketch of the branch task itself appears after this list):

  • wait_for_new_data_in_db
    • ensure that newly generated data is being successfully written to the db
  • wait_for_empty_queue
    • wait for the SQS queue to drain
  • send_email_notification_flow_successful
    • query the DB for the count of records imported
    • send the counts in a “successful” email to the engineering team
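A sketch of what such a branch task can look like, using Airflow’s BranchPythonOperator: the callable returns the task_id of the path to follow. The SQS check here is stubbed out (in practice it would query the queue, e.g. via boto), and the “data missing” notification task id is a placeholder.

```python
# Sketch of a branch condition task using BranchPythonOperator. The SQS check
# is stubbed out, and the "data missing" task id is a placeholder.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG("example_import", start_date=datetime(2015, 7, 1))

def sqs_has_messages():
    # In practice, query the SQS queue's message count here (e.g. via boto).
    return True

def choose_branch():
    # Return the task_id of the path this run should follow.
    if sqs_has_messages():
        return "wait_for_new_data_in_db"
    return "send_email_notification_data_missing"

check_for_sqs_message_branch_condition = BranchPythonOperator(
    task_id="check_for_sqs_message_branch_condition",
    python_callable=choose_branch,
    dag=dag,
)
```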

Over time, we can quickly assess the run status for multiple days by referring to Airflow’s Tree View. In the image below, a vertical column of squares corresponds to all of the tasks in a day’s run of a DAG. For July 26, the run completed successfully because all squares are forest green!

[Screenshot: Tree View with all runs complete]

Airflow Command-line Interface

Airflow also has a very powerful command-line interface, one that we leverage in automation. One particularly useful command, “backfill”, allows us to re-run a DAG over a set of days. In the Tree View below, we observe a 30-day backfill in progress. Some days are complete (e.g. July 26-30), some are in progress (e.g. July 31, Aug 1, Aug 2, Aug 3), and some have not been scheduled yet (e.g. Aug 16). Airflow executes tasks in parallel when it can, based on the DAG definition and optional time-precedence rules (e.g. a previous day’s task must be successful before executing the current day’s run).

[Screenshot: Tree View showing a 30-day backfill in progress]
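As an example of how this fits into automation, a script can simply shell out to the CLI’s backfill command; the DAG id and date range below are placeholders.

```python
# Illustrative automation snippet: kick off a 30-day backfill via the Airflow
# CLI. The DAG id and date range are placeholders.
import subprocess

subprocess.check_call([
    "airflow", "backfill",
    "-s", "2015-07-15",   # start date of the window to re-run
    "-e", "2015-08-14",   # end date of the window
    "example_import",
])
```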

DAG Metrics and Insights

For every DAG execution, Airflow captures the run state, including any parameters and configuration used for that run, and puts this state at your fingertips. We can leverage this run state to capture information such as the versions of the machine-learned models used in our pipeline. This helps us with both issue diagnosis and attribution.

[Screenshot: Task execution state logged for a run]
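One way to fold such information into the captured state — a hypothetical sketch, not necessarily how our pipeline does it — is to have the scoring task push the model version it used into Airflow’s XCom store, where it is persisted alongside the run; the task id and version value below are placeholders.

```python
# Hypothetical sketch: record the model version used for a run via XCom so it
# can be inspected later for diagnostics and attribution.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG("example_import", start_date=datetime(2015, 7, 1))

def score_with_model(**context):
    model_version = "v42"  # in practice, read from config or the model artifact
    # Persist the version alongside this run's state for later inspection.
    context["ti"].xcom_push(key="model_version", value=model_version)
    # ... apply the model here ...

score = PythonOperator(
    task_id="score_with_model",
    python_callable=score_with_model,
    provide_context=True,
    dag=dag,
)
```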

In terms of pipeline execution, we care about speeding up our pipelines. In the Gantt chart below, we can view how long our Spark jobs take, in hopes of speeding them up. Additionally, our wait_for_empty_queue phase can be sped up by more aggressive auto-scaling (i.e. via AWS Auto Scaling Groups). For example, we currently scale out the importers 4 at a time; if we scale them out 8 at a time or increase the max ASG pool size from 20 to 40, we can reduce the time spent in this stage of our pipeline.

[Screenshot: Gantt chart of a run]

We also care about the time variability of runs. For example, will a run always take 30 minutes to complete, or does the time vary wildly? As shown in the Task Duration chart, two of our stages, the two Spark jobs, have large time variability. The time variability in these 2 tasks results in large time variability in our overall job completion times. Hence, this chart clearly tells us where we should spend time on speed and scalability enhancements in order to get more predictable run times. Once we solve that problem, we can consider turning on another Airflow feature: SLAs (Service-Level Agreements).

[Screenshot: Task Duration chart]
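As a rough sketch, turning SLAs on amounts to attaching an sla to the task whose duration we want to police; the 30-minute threshold and the spark-submit invocation below are illustrative.

```python
# Sketch: attach an SLA to a task so Airflow records (and can alert on) runs
# that take longer than expected. The 30-minute threshold is illustrative.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG("example_import", start_date=datetime(2015, 7, 1))

aggregate_and_score = BashOperator(
    task_id="aggregate_and_score",
    bash_command="spark-submit aggregate_and_score.py --date {{ ds }}",
    sla=timedelta(minutes=30),
    dag=dag,
)
```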

DAG Configuration

Another handy feature of Airflow is Variables. Variables allow us to feed environment-specific (e.g. Prod, QA, Dev) configuration to our DAGs via an Admin screen. This takes the config out of our Git repo and places it squarely in the UI and the Airflow metadata DB. It also allows us to make changes on the fly without checking changes into Git and waiting for deployments.

[Screenshot: Variables admin screen]
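Inside a DAG file, reading a Variable is a one-liner; the variable names below are placeholders.

```python
# Sketch of pulling environment-specific config from Airflow Variables inside a
# DAG file. The variable names are placeholders; values are managed in the
# Admin UI and stored in the Airflow metadata DB.
from airflow.models import Variable

s3_bucket = Variable.get("collector_s3_bucket")
environment = Variable.get("deploy_environment")  # e.g. "prod", "qa", "dev"
```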

Other Cool Features

Airflow also lets you specify task pools and task priorities, in addition to the powerful CLI mentioned above, which we leverage in automation.

 

Why Airflow?

Airflow is easy to set up as an admin (e.g. via pip install if you only want releases). It has a terrific UI. It is developer friendly, since it allows a developer to set up a simple DAG and test it in minutes. How does it compare with leading alternatives such as Spotify’s Luigi, LinkedIn’s Azkaban, and Oozie? Having previously worked at LinkedIn and used Azkaban, I wanted a DAG scheduler with UI functionality at least on par with Azkaban’s; the UI in Spotify’s Luigi is not very useful. However, Azkaban requires some build automation to package even relatively simple DAGs into a zip file — the zip file wraps a directory containing a tree-structured representation of code and config, and modifying a DAG requires hopping through that tree of config. Oozie, at least when I last used it, required DAG definitions in an XML file — this made even simple DAGs a nightmare to grok. Spotify’s Luigi and Airbnb’s Airflow both provide DAG definition in a single file — both leverage Python. An additional requirement was that the DAG scheduler be cloud-friendly. Since both Luigi and Airflow were born in the cloud, that was one less headache to worry about. In short, I wanted the UI sophistication of Azkaban along with the cloud-friendliness and the ease of DAG management and definition of Luigi — Airbnb’s Airflow was that right mix.

 

Our revised architecture is shown below.

[Figure: Revised architecture]

Caveats

It is important to mention that Airflow, which was open-sourced only a few months ago, is still a work in progress. It has a lot of promise, a dedicated and capable team, and a small but growing community. As an early adopter, Agari is committed to making this a successful project, whether by tripping over bugs and reporting them, suggesting features and enhancements, or contributing to the code base.

 
