airflow

Why the author of "We’re All Using Airflow Wrong and How to Fix It" needs to attend my meetup!

In the article “We’re All Using Airflow Wrong and How to Fix It“, the author claims there are three reasons (cons) why we are using Airflow wrongly:

  1. First, because each step of this DAG is a different functional task, each step is created using a different Airflow Operator. Developers must spend time researching, understanding, using, and debugging the Operator they want to use
  2. This also means that each time a developer wants to perform a new type of task, they must repeat all of these steps with a new Operator, which sometimes may end up hitting an unexpected permissions error
  3. The third problem is that Operators are executed on the Airflow workers themselves. The Airflow Scheduler, which runs on Kubernetes Pod A, will indicate to a Worker, which runs on Kubernetes Pod B, that an Operator is ready to be executed. At that point, the Worker will pick up the Operator and execute the work directly on Pod B. This means that all Python package dependencies from each workflow will need to be installed on each Airflow Worker for Operators to be executed successfully

Why I think the author is wrong, or at the very least inaccurate:

  1. The suggested cons and solution are correct for the use case the author mentions, but not all use cases are the same. It seems the author is describing a use case where a cluster running thousands of nightly operators is required.
  2. Regarding reason #1, sticking to a short list of Operators from the documentation, together with usage examples, should resolve this. Downloading custom-made operators is not a good idea. Managing packages per node is a non-issue if you are using a single node with the LocalExecutor.
  3. Regarding reason #2: yes, using Airflow Operators correctly requires a learning curve, but in most use cases you learn an Operator once and then use it A LOT in your DAGs. A new Operator might take a day to learn, yet only 10 minutes to implement the next time. This principle is called a learning curve, and it applies to any new technology or ETL tool.
  4. There are many ways to install Airflow, one of them being a single node with the LocalExecutor. Some of the POCs and blogs I have done on Airflow clearly show that you can achieve a lot in terms of performance with one instance, which renders reason #3 invalid.

Don’t get me wrong…

The author has suggested a nice solution, but has failed to address how that solution matches other use cases with small nightly Airflow workloads. Matching the use case to the technical solution IS a leading principle in my meetup.

You are more than welcome to review some of our other Airflow use-case and meetup blogs:

  1. https://big-data-demystified.ninja/2020/02/09/data-engineering-use-cases-airflow-cost-reduction-bq-ml/
  2. Airflow demystified (investing.com use case)

Some more blogs about Airflow

——————————————————————————————————————————

I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

airflow, Data Engineering

How to convert a Colab notebook into Python and schedule it in Apache Airflow?

Using Colab from Google is a wonderful way to get started on data research, but once you want to go to production you need to schedule it, e.g. daily via Apache Airflow, and convert the notebook to make it fully Python compatible. Simply exporting your Colab notebook (via Colab -> File -> Download .py) into a Python script.py will result in the Colab-specific lines being automatically commented out.

First, confirm you are working in a plain vanilla Python environment, not something like Colab or Datalab.

I used this documentation to write a full example. I started with the Python prerequisites:

pip install pandas google-colab google-cloud-bigquery
pip install google-cloud-bigquery-storage
pip install pyarrow

All the unsupported Python code from Colab will be commented out. I started going over those lines one by one. The first challenge was querying BigQuery and putting the result in a dataframe. The example below is also committed in our Big Data Demystified git. Notice that the dataframe is stored in memory on the Airflow machine, so you might need some tuning to handle big result sets from BigQuery (see the sketch after the script). Example of the Python script.py:

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

# Download query results.
query_string = """select * from t"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())
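
If the result set is too big to comfortably fit in a single in-memory dataframe on the Airflow machine, one option is to iterate over the query result in pages instead of materializing everything at once. This is a sketch of mine, not part of the original script; the page size and the per-row processing are placeholders:

# Sketch: stream the query result page by page instead of one big dataframe.
rows = bqclient.query(query_string).result(page_size=10000)

row_count = 0
for row in rows:      # rows are fetched lazily, one page at a time
    row_count += 1    # replace with your own per-row processing
print("processed", row_count, "rows")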

How to schedule python script via Apache Airflow?

  1. You may put your Python script.py in your Apache Airflow dags folder. It appears Airflow does not mind reading a normal Python script there and simply ignores Python files that do not return a DAG object. Alternatively, you may put the script on your CI/CD server and call it remotely after connecting to that server via SSH.
  2. I first used the Airflow PythonOperator example to write the following DAG. That created some problems, because the DAG's Python code expected modules that had not yet been installed via pip install; the imports are evaluated when the DAG file is imported, which causes an error loading the DAG.
  3. I used BashOperators in the end; just be sure to add a bash operator in your DAG script that installs the missing packages. Assume the machine will die on you, this is the cloud after all :). Notice: this is a hack, not a recommended approach.
  4. The only problem I have not solved yet is the need to hard-code the full path to the Python script.py (see the sketch after the example DAG below).
  5. The example DAG is committed in our git:
from __future__ import print_function

import datetime

from airflow import models
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

today_date = datetime.datetime.now().strftime("%Y%m%d")


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # Retries are disabled here; set 'retries' > 0 to retry after the delay below
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with DAG(dag_id='monitor_dag', schedule_interval=None, default_args=default_dag_args) as dag:

    bash_prerequisites_install_cmd = """sudo apt install -y python-pip"""
    bash_prerequisites_install = BashOperator(task_id='bash_prerequisites_install', bash_command=bash_prerequisites_install_cmd)

    bash_pip_install_cmd = """sudo pip install pandas google-colab google-cloud-bigquery google-cloud-bigquery-storage pyarrow pyTelegramBotAPI"""
    bash_pip_install = BashOperator(task_id='bash_pip_install', bash_command=bash_pip_install_cmd)

    bash_colab_export_script_cmd = """python /home/omid/gs_dags/script.py"""
    bash_colab_export_scriptTask = BashOperator(task_id='bash_colab_export_script', bash_command=bash_colab_export_script_cmd)

    bash_prerequisites_install >> bash_pip_install >> bash_colab_export_scriptTask
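
Regarding point 4 above (the hard-coded full path to script.py), one possible workaround is to build the path from the DAG file's own location, assuming script.py sits next to the DAG file in the dags folder. This is a sketch of mine, not part of the original DAG:

import os

# Sketch: derive the script location from this DAG file's own directory,
# assuming script.py lives alongside the DAG file in the dags folder.
DAGS_FOLDER = os.path.dirname(os.path.abspath(__file__))
bash_colab_export_script_cmd = "python " + os.path.join(DAGS_FOLDER, "script.py")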

——————————————————————————————————————————


To join our meetups:
https://www.meetup.com/Big-Data-Demystified/

airflow

Similarweb API data pipeline | Airflow & cost reduction

The documentation of the Similarweb API is available here:

API integration documentation.

A sample API call for desktop visits; notice the domain, key, start, end, country, and granularity parameters. This returns the average page views per desktop visit for the given domain & country.

curl --location --request GET 'https://api.similarweb.com/v1/website/bbc.com/traffic-and-engagement/visits?api_key={{similarweb_api_key}}&start_date=2017-11&end_date=2018-01&country=gb&granularity=monthly&main_domain_only=false&format=json'

For investing.com (notice the max date range is 2 months back; this blog was written in 2020-01, so 2019-11 is the latest available):

curl --location --request GET 'https://api.similarweb.com/v1/website/investing.com/traffic-and-engagement/visits?api_key={{similarweb_api_key}}&start_date=2019-11&end_date=2019-11&country=gb&granularity=monthly&main_domain_only=false&format=json'

so….

What we need is:

  1. A list of domains, country codes, dates, and API calls
  2. An Airflow script to loop over all the possible permutations of API calls
  3. A list of competitors to compare traffic against

The challenges with this specific Airflow DAG are:

  1. Using dynamic operators: controlling how many iterations run in parallel by using dynamic task IDs.
  2. Continuing after the loop of Airflow operators by controlling the downstream order of execution, e.g. inside the loop: start.set_downstream(bash_*).
  3. The number of Operators in this DAG is a bit large. Consider what the trigger for load_to_bg_GET_DESKTOP_TRAFFIC should be, perhaps one_success (assume some of the API calls will fail)? A trigger-rule example is committed in our git, and a sketch follows the DAG below.
  4. Consider moving bash_gsutil_mv_files_to_ingestion further downstream to cut the number of operators in half. I left it in the loop simply because I wanted to test Airflow's behaviour with 2 operators in the same loop.

The example is also committed in our git:

from datetime import timedelta, date

import datetime

from airflow import models
from airflow.contrib.operators import gcs_to_bq
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# default_dag_args, DST_BUCKET and dst_table are assumed to be defined
# earlier in the full DAG (see the git repo); they are not shown here.


def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)


# start & end date define the delta period covered by the loop (2 days back)
delta = -2
start_date = datetime.date.today() + datetime.timedelta(delta)
end_date = datetime.date.today()

 
with models.DAG('similar_web_api_pipeline', schedule_interval=None, default_args=default_dag_args) as dag:

    start = DummyOperator(task_id='start')
    wait = DummyOperator(task_id='wait')

    for single_date in daterange(start_date, end_date):
        # one API-call task per date; task ids must be unique, so the date is appended
        bash_cmd = """curl --location --request GET 'https://api.similarweb.com/v1/website/big-data-demystified.ninja/traffic-and-engagement/visits?api_key=myApiKey123456789&start_date=2019-11&end_date=2019-11&country=gb&granularity=monthly&main_domain_only=false&format=json' > /tmp/file_""" + single_date.strftime("%Y%m%d") + '.json'
        bash_api_call_GET_DESKTOP_TRAFFIC = BashOperator(task_id='bash_api_call_GET_DESKTOP_TRAFFIC' + single_date.strftime("%Y%m%d"), bash_command=bash_cmd)

        # move the downloaded json file to the ingestion bucket
        bash_cmd2 = """gsutil mv /tmp/file_""" + single_date.strftime("%Y%m%d") + '.json gs://data_lake/similar_web_desktop_traffic/'
        bash_gsutil_mv_files_to_ingestion = BashOperator(task_id='bash_gsutil_mv_files_to_ingestion' + single_date.strftime("%Y%m%d"), bash_command=bash_cmd2)

        start.set_downstream(bash_api_call_GET_DESKTOP_TRAFFIC)
        bash_api_call_GET_DESKTOP_TRAFFIC.set_downstream(bash_gsutil_mv_files_to_ingestion)
        bash_gsutil_mv_files_to_ingestion.set_downstream(wait)

    load_to_bg_GET_DESKTOP_TRAFFIC = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='load_to_bg_GET_DESKTOP_TRAFFIC',
        source_objects=['*'],
        write_disposition='WRITE_TRUNCATE',  # overwrite the destination table on every run
        create_disposition='CREATE_IF_NEEDED',
        bucket=DST_BUCKET,
        destination_project_dataset_table=dst_table,
        autodetect='true')

    end = DummyOperator(task_id='end')

    wait >> load_to_bg_GET_DESKTOP_TRAFFIC >> end
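
On challenge 3 above, here is a minimal sketch of the one_success idea (my assumption of how it could be wired, not the exact code from the git repo): give the wait task a trigger rule so that the load step still runs even if some of the API-call tasks fail.

from airflow.utils.trigger_rule import TriggerRule

# Sketch: 'wait' fires as soon as at least one upstream task has succeeded,
# so a few failed API calls will not block load_to_bg_GET_DESKTOP_TRAFFIC.
wait = DummyOperator(task_id='wait', trigger_rule=TriggerRule.ONE_SUCCESS)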

Here is the cost reduction bit:

  1. Use BQ to load the data via an external table, then use CTAS or another Airflow operator to convert the table from a row-oriented raw form into a columnar native table (see the sketch below).
  2. Alternatively, you can load the JSON directly into BQ as a native table.
  3. Further, change the process to a delta load, i.e. let it run each month and fetch only the last available month.
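
As a rough illustration of option 1, a BigQueryOperator can run the CTAS that turns the raw staging table into a columnar native table. This is a sketch under my own assumptions; the dataset, table and task names are placeholders, not the ones used in the original pipeline:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Sketch: CTAS from the raw data-lake table into a native, columnar table.
# Dataset and table names below are placeholders.
ctas_sql = """
CREATE OR REPLACE TABLE `MyProject.DWH.similar_web_desktop_native` AS
SELECT *
FROM `MyProject.DATA_LAKE.similar_web_desktop`
"""

ctas_to_native_table = BigQueryOperator(
    task_id='ctas_to_native_table',
    sql=ctas_sql,
    use_legacy_sql=False)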

If you are wondering how to parse the JSON once BQ has identified the schema via schema auto-detection, below is an example. Notice the different parsing methods for an array of structs, a struct of structs, and a plain struct, and notice how I avoided using UNNEST since I know the API call only returns one row.

CREATE OR REPLACE VIEW
  `MyProject.DATA_LAKE.similar_web_desktop_transformed` AS
SELECT
  avg_visits,
  date,
  device,
  r.country AS country,
  r.domain AS domain,
  r.granularity AS granularity
FROM (
  SELECT
    CAST(visits[OFFSET(0)].visits AS int64) AS avg_visits,  # array of structs
    visits[OFFSET(0)].date AS date,                         # array of structs
    meta.device AS device,                                  # struct
    meta.request AS r                                       # struct of structs
    #, meta.status AS status
  FROM
    `MyProject.DATA_LAKE.similar_web_desktop` )

  
 

Let's say you want to offload the domain list to an external data source, like a DB. How would you do that? Well, one way is to keep it in Airflow's own DB, i.e. using XCom. But then you would end up maintaining a large DB for Airflow.

I came up with another way, using GCS: keep files in GCS whose names hold the desired values, and then read them from Airflow as in the example below. Note it is just a toy example to illustrate a concept that worked for me and saved the cost of a larger DB instance. Also notice the dynamic name I give each bash task_id; it is built a bit differently because Airflow only allows alphanumeric characters in task ids. The full example is also committed in our GitHub:

from google.cloud import storage

from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# default_dag_args is assumed to be defined as in the previous examples

client = storage.Client()
i = 0
with models.DAG('loop_over_gcs_bucket_files_example', schedule_interval=None, default_args=default_dag_args) as dag:

    start = DummyOperator(task_id='start')
    wait = DummyOperator(task_id='wait', trigger_rule=TriggerRule.ONE_SUCCESS)

    for blob in client.list_blobs('data_lake', prefix='similar_web_desktop_traffic'):
        # task ids must only contain alphanumeric chars, so append a counter
        bash_cmd = "echo " + str(blob.name)
        i = i + 1
        bash_operator = BashOperator(task_id='bash_operator' + str(i), bash_command=bash_cmd)
        start.set_downstream(bash_operator)
        bash_operator.set_downstream(wait)

    end = DummyOperator(task_id='end')

    wait >> end

——————————————————————————————————————————


airflow

Airflow Demystified | Big Data Demystified

In this lecture I will share with you how we use Airflow in investing.com

Some more blogs about Airflow

——————————————————————————————————————————
