Similarweb API Data Pipeline | Airflow & Cost Reduction

The Similarweb API documentation is available here:

API integration documentation.

Sample API call for desktop visits. Notice the domain, API key, start, end, country, and granularity parameters. This returns the average monthly desktop visits for the given domain and country.

curl --location --request GET 'https://api.similarweb.com/v1/website/bbc.com/traffic-and-engagement/visits?api_key={{similarweb_api_key}}&start_date=2017-11&end_date=2018-01&country=gb&granularity=monthly&main_domain_only=false&format=json'

And for investing.com (notice the data lags about two months behind; this blog was written in 2020-01, so 2019-11 is the latest available month):

curl --location --request GET 'https://api.similarweb.com/v1/website/investing.com/traffic-and-engagement/visits?api_key={{similarweb_api_key}}&start_date=2019-11&end_date=2019-11&country=gb&granularity=monthly&main_domain_only=false&format=json'

So, what we need is:

  1. A list of domains, country codes, dates, and APIs.
  2. An Airflow script that loops over all the possible permutations of API calls.
  3. A list of competitors to compare traffic against.
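Steps 1 and 2 can be sketched in plain Python with itertools.product. This is a minimal sketch: the domain and country lists and the API key placeholder are illustrative, not the real configuration.

```python
from datetime import date, timedelta
from itertools import product

# Illustrative inputs -- replace with your real lists.
domains = ["bbc.com", "investing.com"]
countries = ["gb", "us"]

def daterange(start_date, end_date):
    for n in range((end_date - start_date).days):
        yield start_date + timedelta(n)

start = date(2019, 11, 1)
end = date(2019, 11, 3)

API_URL = ("https://api.similarweb.com/v1/website/{domain}"
           "/traffic-and-engagement/visits"
           "?api_key={key}&start_date={d}&end_date={d}"
           "&country={country}&granularity=monthly"
           "&main_domain_only=false&format=json")

# Every permutation of domain x country x date becomes one API call.
calls = [API_URL.format(domain=dom, key="{{similarweb_api_key}}",
                        d=day.strftime("%Y-%m"), country=c)
         for dom, c, day in product(domains, countries, daterange(start, end))]
```

Each entry in `calls` then maps to one BashOperator (or HTTP call) in the DAG loop below.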

The challenges with this specific Airflow DAG are:

  1. Using dynamic operators: controlling how many iterations run in parallel by using dynamic task IDs.
  2. Continuing after the loop of Airflow operators by controlling the downstream order of execution, e.g. inside the loop: start.set_downstream(bash_*).
  3. The number of operators in this DAG is a bit large. Consider what the trigger for load_to_bg_GET_DESKTOP_TRAFFIC should be; perhaps one_success (assume some of the API calls will fail)? A trigger-rule example is committed in our git.
  4. Consider moving bash_gsutil_mv_files_to_ingestion further downstream to cut the number of operators in half. I left it where it is simply because I wanted to test Airflow's behaviour with two operators in the same loop.
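On challenge 1: Airflow task IDs may only contain alphanumerics plus a few safe characters, so dynamic IDs built from dates or filenames need sanitizing. A small helper (hypothetical, not part of the original DAG) might look like:

```python
import re

def safe_task_id(prefix, suffix):
    # Replace anything outside a conservative task_id charset with '_'.
    return prefix + re.sub(r'[^0-9a-zA-Z._-]', '_', suffix)

# e.g. a date-stamped task id for the loop below:
safe_task_id('bash_api_call_GET_DESKTOP_TRAFFIC_', '2019/11/01')
```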

The full example is also committed in our git:

from datetime import timedelta, date

def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)

# start & end date = delta period (-3 days?)
delta = -2
start_date = date.today() + timedelta(delta)
end_date = date.today()
with models.DAG('similar_web_api_pipeline', schedule_interval=None, default_args=default_dag_args) as dag:
    start = DummyOperator(task_id='start')
    wait = DummyOperator(task_id='wait')

    for single_date in daterange(start_date, end_date):
        # note: the start_date/end_date query parameters are hardcoded in this example
        bash_cmd = """curl --location --request GET 'https://api.similarweb.com/v1/website/big-data-demystified.ninja/traffic-and-engagement/visits?api_key=myApiKey123456789&start_date=2019-11&end_date=2019-11&country=gb&granularity=monthly&main_domain_only=false&format=json' > /tmp/file_""" + single_date.strftime("%Y%m%d") + '.json'
        bash_api_call_GET_DESKTOP_TRAFFIC = BashOperator(
            task_id='bash_api_call_GET_DESKTOP_TRAFFIC' + single_date.strftime("%Y%m%d"),
            bash_command=bash_cmd)

        bash_cmd2 = """gsutil mv /tmp/file_""" + single_date.strftime("%Y%m%d") + '.json gs://data_lake/similar_web_desktop_traffic/'
        bash_gsutil_mv_files_to_ingestion = BashOperator(
            task_id='bash_gsutil_mv_files_to_ingestion' + single_date.strftime("%Y%m%d"),
            bash_command=bash_cmd2)

        start.set_downstream(bash_api_call_GET_DESKTOP_TRAFFIC)
        bash_api_call_GET_DESKTOP_TRAFFIC.set_downstream(bash_gsutil_mv_files_to_ingestion)
        bash_gsutil_mv_files_to_ingestion.set_downstream(wait)
				
    load_to_bg_GET_DESKTOP_TRAFFIC = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='load_to_bg_GET_DESKTOP_TRAFFIC',
        source_objects=['*'],
        write_disposition='WRITE_TRUNCATE',  # overwrite
        create_disposition='CREATE_IF_NEEDED',
        bucket=DST_BUCKET,
        destination_project_dataset_table=dst_table,
        autodetect='true')
    end = DummyOperator(task_id='end')

wait >> load_to_bg_GET_DESKTOP_TRAFFIC >> end

Here is the cost reduction bit:

  1. Use BQ to load the data via an external table, then use CTAS or another Airflow operator to convert the table from row to columnar format.
  2. Alternatively, you can load the JSON directly into BQ via a native table.
  3. Further, change the process into a delta, i.e. let it run each month and fetch the last available month.
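The delta idea in step 3 can be sketched as a helper that computes the last available month, assuming the roughly two-month data lag mentioned earlier (the function name and lag parameter are my own, not from the original DAG):

```python
from datetime import date

def latest_available_month(today, lag_months=2):
    # Similarweb data lags ~2 months behind; walk back lag_months from today.
    y, m = today.year, today.month - lag_months
    while m <= 0:
        m += 12
        y -= 1
    return f"{y:04d}-{m:02d}"

# Written in 2020-01, so the latest available month is 2019-11:
latest_available_month(date(2020, 1, 15))  # → "2019-11"
```

A monthly DAG run could pass this value as both start_date and end_date of the API call.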

If you are wondering how to parse the JSON once BQ has identified the schema via auto-detect, below is an example. Notice the different parsing methods for an array of structs, a struct of structs, and a plain struct. Also notice how I avoided UNNEST, since I know the API call returns only one row.

CREATE OR REPLACE VIEW
  `MyProject.DATA_LAKE.similar_web_desktop_transformed` AS
SELECT
  avg_visits,
  date,
  device,
  r.country AS country,
  r.domain AS domain,
  r.granularity AS granularity
FROM (
  SELECT
    CAST(visits[OFFSET(0)].visits AS int64) AS avg_visits,  # array of structs
    visits[OFFSET(0)].date AS date,                         # array of structs
    meta.device AS device,                                  # struct
    meta.request AS r                                       # struct of structs
    #meta.status AS status
  FROM
    `MyProject.DATA_LAKE.similar_web_desktop` )
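The same field access can be illustrated in Python on a mocked response shaped like the auto-detected schema above (the sample payload values are made up for illustration):

```python
import json

# Illustrative payload: visits is an array of structs,
# meta.request is a struct of structs, meta.device a plain field.
sample = json.loads("""
{
  "meta": {
    "request": {"domain": "bbc.com", "country": "gb", "granularity": "Monthly"},
    "device": "Desktop",
    "status": "Success"
  },
  "visits": [{"date": "2019-11-01", "visits": 123456789.0}]
}
""")

row = {
    "avg_visits": int(sample["visits"][0]["visits"]),  # visits[OFFSET(0)].visits
    "date": sample["visits"][0]["date"],               # visits[OFFSET(0)].date
    "device": sample["meta"]["device"],                # meta.device
    "country": sample["meta"]["request"]["country"],   # struct of structs
    "domain": sample["meta"]["request"]["domain"],
    "granularity": sample["meta"]["request"]["granularity"],
}
```

Because the call returns a single element in `visits`, indexing element 0 directly mirrors the `OFFSET(0)` access in the view, with no need to flatten the array.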
  
 

Let's say you want to offload the domain list to an external data source like a DB. How would you do that? Well, one way is to keep it in Airflow's own DB, i.e. using XCom. But then you would end up maintaining a large Airflow DB.

I came up with another way using GCS: keep a file in GCS whose filename holds the desired value, and read it from Airflow as in the example below. Notice this is just a toy example to illustrate a concept that worked for me and saved money on larger DB instances. Also notice the dynamic name I gave the bash task_id; it is built a bit differently because Airflow only allows alphanumeric task names. The full example is also committed in our GitHub.

from google.cloud import storage

client = storage.Client()
i = 0
with models.DAG('loop_over_gcs_bucket_files_example', schedule_interval=None, default_args=default_dag_args) as dag:
    start = DummyOperator(task_id='start')
    wait = DummyOperator(task_id='wait', trigger_rule=TriggerRule.ONE_SUCCESS)
    for blob in client.list_blobs('data_lake', prefix='similar_web_desktop_traffic'):
        # task_id must only contain alphanumeric chars
        bash_cmd = "echo " + str(blob.name)
        i = i + 1
        bash_operator = BashOperator(task_id='bash_operator' + str(i), bash_command=bash_cmd)
        start.set_downstream(bash_operator)
        bash_operator.set_downstream(wait)
    end = DummyOperator(task_id='end')

wait >> end

——————————————————————————————————————————

I put a lot of thought into these blogs to share the information in a clear and useful way. If you have any comments, thoughts, or questions, or you need someone to consult with,

feel free to contact me via LinkedIn: