DFP Data Transfer Files Use Case | Airflow and BigQuery 93% Cost Reduction
Author: Omid Vahdaty 27.11.2019
DFP Data Transfer Files Use Case:
DFP Data Transfer delivers impression-level data files to a GCS bucket. The amount of raw data is highly correlated with your web traffic. A big data challenge!
The big data problems with the DFP data transfer files being delivered:
- Data transfer file names contain a timestamp in the server's timezone instead of your local time.
- Your local time is in a column called Time. The column contains an unsupported datetime format: '2019-11-12-20:15:17'; notice the "-" between the date and the time (see the parsing sketch after this list).
- There are several file types per table: Network and NetworkBackfill, each of them for both impressions and clicks, i.e. 4 file names per hour in the same bucket (not in different folders).
- All the files of all the tables are supplied in the same bucket.
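To make the Time format issue concrete, here is a minimal sketch in plain Python (not part of the pipeline) showing how such a value can be parsed once the extra "-" is accounted for; the sample value is the one quoted above:
# Minimal sketch: parsing the DFP Time column format described above.
from datetime import datetime

raw_time = '2019-11-12-20:15:17'   # note the "-" separating the date from the time
parsed = datetime.strptime(raw_time, '%Y-%m-%d-%H:%M:%S')
print(parsed.date(), parsed.time())  # 2019-11-12 20:15:17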
So... What is the problem with DFP data transfer files?
- You can't load the data simply, because there is no date column to partition the data by. Loading all the data nightly into an unpartitioned table and then repartitioning it into another table is doable, BUT VERY COSTLY.
- You can't transform the data at the GCS level (we are talking about a serious amount of TBs).
- How can you orchestrate it? How will you make the process incremental, instead of loading EVERYTHING each night?
What is the solution for loading DFP data transfer files?
BigQuery sharded table loading via an Airflow dynamic workflow, template_fields and a loop:
- Load incrementally (every day): load each day's 24 files, which share the same date, into a separate one-day table shard (similar to the GA_Sessions_* tables). Base the shard date on the date from the file name.
- Use Airflow to load the data via a dynamic workflow.
- Once you have the tables, create a view on top of the sharded tables with the correct dates (a sketch follows this list).
- Transform the data incrementally.
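As a sketch of the view step above: a BigQueryOperator placed inside the DAG shown below could (re)create a standard-SQL view over the daily shards, with the shard suffix parsed into a proper date column. The view name MyDataSet.network_impressions_view is hypothetical; the shard prefix matches the MyDataSet.table_shard_ prefix used in the DAG.
# Hedged sketch only -- goes inside the `with models.DAG(...)` block shown below.
# The view name is hypothetical; the wildcard matches the daily shards loaded by the DAG.
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

create_unified_view = BigQueryOperator(
    task_id='create_unified_view',
    use_legacy_sql=False,
    sql="""
        CREATE OR REPLACE VIEW `MyDataSet.network_impressions_view` AS
        SELECT
          PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) AS dfp_date,
          *
        FROM `MyDataSet.table_shard_*`
    """)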
Airflow dynamic workflow with a loop over operators and templated fields
Also committed in our Git
import datetime
import os
import logging

from airflow import DAG
from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
#from airflow.operators import dummy_operator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import BashOperator

# Import operator from plugins
from airflow.contrib.operators import gcs_to_gcs
from airflow.utils import trigger_rule

# Output file for job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'MyBucket',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

# Path to GCS buckets. no need to add gs://
DST_BUCKET = ('MyBucket')

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

network_table = 'MyDataSet.table_shard_'

from datetime import timedelta, date

def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)

### start & end date = delta period.
## -3 days?
delta = -3
start_date = datetime.date.today() + datetime.timedelta(delta)
end_date = datetime.date.today()
today_macro = '{{ ds_nodash }}'

with models.DAG(
        'BigQueryShardsLoading',
        schedule_interval='@once',
        default_args=default_dag_args) as dag:

    for single_date in daterange(start_date, end_date):
        load_to_bq_from_gcs_NetworkImpressions = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
            task_id='load_to_bq_from_gcs_' + single_date.strftime("%Y%m%d"),
            source_objects=['my_file_prefix_' + single_date.strftime("%Y%m%d") + '*'],
            skip_leading_rows=1,
            write_disposition='WRITE_TRUNCATE',  # overwrite?
            create_disposition='CREATE_IF_NEEDED',
            bucket=DST_BUCKET,
            destination_project_dataset_table=network_table + single_date.strftime("%Y%m%d"),
            autodetect='true')
Notice the loop: it saves you the need to write a duplicate operator per day, and lets you dynamically increase or decrease the number of days to overwrite each night.
for single_date in daterange(start_date, end_date):
And notice template_fields – task_id, as suggested in the Airflow docs; each dynamically generated task requires its own unique name:
task_id='load_to_bq_from_gcs_'+single_date.strftime("%Y%m%d"),
Also notice template_fields – source_objects, as suggested in the Airflow docs; this saves you the need to repartition the files into different folders to distinguish between the 4 different file names:
source_objects=[ 'my_file_prefix_'+single_date.strftime("%Y%m%d")+'*' ],
Also notice template_fields – destination_project_dataset_table, as suggested in the Airflow docs; this allows you to create a table per day based on the file name, i.e. a shard:
destination_project_dataset_table=network_table+single_date.strftime("%Y%m%d")
From there, parsing the date in the next layer of your DAG should be as simple as:
select
  _TABLE_SUFFIX as dfp_date,
  cast(SUBSTR(Time, 0, 10) as date) as impression_date,
  cast(SUBSTR(Time, 12, 8) as time) as impression_time,
  cast(concat(SUBSTR(Time, 0, 10), 'T', SUBSTR(Time, 12, 8)) as datetime) as impression_datetime,
  *
from `MyDataSet.table_shard_*`
limit 1
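To also make the transform step incremental (the last bullet of the solution above), a per-day transform task could be added inside the same loop of the DAG, writing each shard into the matching partition of a date-partitioned table. This is only a sketch under assumptions: the destination table MyDataSet.impressions_parsed and its ingestion-time partitioning are hypothetical, not part of the original pipeline.
# Hedged sketch: placed inside the same `for single_date in daterange(...)` loop.
# Overwrites only the matching partition of a hypothetical ingestion-time
# partitioned table, MyDataSet.impressions_parsed.
shard = single_date.strftime("%Y%m%d")
transform_shard = BigQueryOperator(
    task_id='transform_shard_' + shard,
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',  # overwrite just this day's partition
    destination_dataset_table='MyDataSet.impressions_parsed$' + shard,
    sql="""
        select
          cast(SUBSTR(Time, 0, 10) as date) as impression_date,
          cast(SUBSTR(Time, 12, 8) as time) as impression_time,
          *
        from `MyDataSet.table_shard_{shard}`
    """.format(shard=shard))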
Just to give a clear perspective:
- This job used to cost us $140 per day.
- Now it costs about $20 per day.
- We have more ideas for decreasing it to $10 per day using the SuperQuery Airflow connector. We will disclose them once we implement them.
——————————————————————————————————————————
I put a lot of thought into these blogs, so I could share the information in a clear and useful way.
If you have any comments, thoughts, or questions, or if you need someone to consult with,
feel free to contact me via LinkedIn: