Airflow Dynamic Operators used in a Google Ad Manager API data pipeline
Author: Omid Vahdaty 29.2.2020
Following the previous blog, GAM demystified, we created a Python script that connects to the Google Ad Manager API and downloads a file based on metrics and dimensions. In this blog we will use that script and wrap it in a new Airflow job.
Challenges:
How to wrap a Python script with its own prerequisite Python packages?
I created a standalone GCE machine with Python 3 and all the Python packages required for the GAM API. Airflow connects to it remotely via SSH and runs the script there. The instance should be the cheapest possible.
How to connect to a remote machine in GCE via Airflow?
I used a BashOperator to run the gcloud command below, which connects to the remote machine:
bash_gsutil_mv_cmd='gcloud beta compute --project ynet-data-myProjectName ssh myInstanceNAme --internal-ip --zone us-central1-a --command "sudo -u omid ls /tmp/"'
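To show how such a command plugs into Airflow, here is a minimal, self-contained sketch of a BashOperator running it. The DAG id, task id and start_date below are placeholders for illustration only; the real DAG appears at the end of this post.

import datetime
from airflow import models
from airflow.operators.bash_operator import BashOperator

# the gcloud ssh command shown above
bash_gsutil_mv_cmd = ('gcloud beta compute --project ynet-data-myProjectName ssh myInstanceNAme '
                      '--internal-ip --zone us-central1-a --command "sudo -u omid ls /tmp/"')

with models.DAG('gam_remote_ssh_example',                 # placeholder DAG id
                schedule_interval=None,
                start_date=datetime.datetime(2020, 2, 1)) as dag:
    run_remote_ls = BashOperator(
        task_id='run_remote_ls',                          # placeholder task id
        bash_command=bash_gsutil_mv_cmd)                  # runs gcloud ssh from the Airflow worker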
How to make an Airflow ETL job recurrent, and why?
Each run of the Python script produces a file whose name contains the date; the start and end dates are the same day, so the output of each run is one day's worth of data. I then run all of these in parallel, based on the number of days that need to be overwritten, say 3 days. Before doing that, I need to make sure older files are cleaned up, to avoid duplication in the data.
I am also going to create a small Python function to loop over date ranges, so that when I create the dynamic operators in a loop, each operator runs on a different day, invoking the Python script with the respective date. A minimal sketch of that helper follows.
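Here is the date-range helper on its own (the same function appears in the full DAG below); the print loop just shows the dates it yields:

import datetime
from datetime import timedelta

def daterange(start_date, end_date):
    # yields each date from start_date (inclusive) up to end_date (exclusive)
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)

# delta = -3 gives the 3 days before today; each date will become one dynamic operator
start_date = datetime.date.today() + datetime.timedelta(days=-3)
end_date = datetime.date.today()
for single_date in daterange(start_date, end_date):
    print(single_date.strftime("%Y-%m-%d"))   # this string is passed as --start/--end to the report script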
Airflow Dynamic Operator considerations. You need to:
- Set the downstream of each operator, and use a dynamic task id per operator.
- Set retries to 0, because a cleanup that finds no existing files is sometimes OK (think of the first Airflow run).
- Set trigger_rule="all_done", because sometimes the API may fail, and that is OK.
Example of setting the downstream operator dynamically (bash_cleanup is a dynamically created operator with a different task id per date):
start.set_downstream(bash_cleanup)
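Putting these considerations together, the loop body creates each per-date operator roughly like this. This is condensed from the full DAG at the end of the post; it assumes the daterange helper, start_date, end_date, the start DummyOperator and the BashOperator import defined there.

for single_date in daterange(start_date, end_date):
    temp_date = single_date.strftime("%Y-%m-%d")
    bash_cleanup_cmd = ('gsutil rm gs://myBucket/google/gam/example_report/'
                        '*report_example_using_service_account_with_date_range_' + temp_date + '*')
    bash_cleanup = BashOperator(
        task_id='bash_cleanup_' + temp_date,   # dynamic task id per date
        retries=0,                             # nothing to delete on the first run is fine
        bash_command=bash_cleanup_cmd,
        trigger_rule="all_done")               # keep going even if an upstream task failed
    start.set_downstream(bash_cleanup)         # wire each dynamic task under the start task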
How to load GAM API files to BigQuery via Airflow?
- Load the file manually into BigQuery once, using "automatically detect schema". This creates a table which is not partitioned (partitioning by ingestion time is not a good option in this use case).
- Then, as described in this blog, use CREATE TABLE AS SELECT in BigQuery with "Dimension_DATE" as your partition column:
CREATE TABLE `MyProject.DATA_LAKE_GOOGLE_US.example_report_partitioned`
PARTITION BY Dimension_DATE AS
SELECT * FROM `MyProject.DATA_LAKE_GOOGLE_US.example_report`
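If you would rather keep that CTAS step inside Airflow instead of running it manually, it can also be executed with the BigQueryOperator. This is only a sketch, not part of the original pipeline; the task id is a placeholder, the operator should be created inside the DAG's with block, and depending on your Airflow version the SQL parameter is sql or the older bql.

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# create the operator inside the "with models.DAG(...)" block of the full example below
create_partitioned_table = BigQueryOperator(
    task_id='create_partitioned_table',      # placeholder task id
    use_legacy_sql=False,                    # PARTITION BY requires standard SQL
    sql='''CREATE TABLE `MyProject.DATA_LAKE_GOOGLE_US.example_report_partitioned`
           PARTITION BY Dimension_DATE AS
           SELECT * FROM `MyProject.DATA_LAKE_GOOGLE_US.example_report`''')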
You can also add an optional operator to load the data at the end; notice the write_disposition and create_disposition:
from airflow.contrib.operators import gcs_to_bq

load_to_bq_from_gcs = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='load_to_bq_from_gcs',
    bucket='myBucket',
    source_objects=['google/gam/example_report/*'],
    skip_leading_rows=1,
    create_disposition='CREATE_NEVER',       # the table already exists (created by the manual CTAS)
    write_disposition='WRITE_TRUNCATE',      # overwrite the table contents on every load
    destination_project_dataset_table='DATA_LAKE_GOOGLE_US.example_report_partitioned')
The full example of an Airflow DAG that runs Python scripts on a remote machine is below, and is also committed in our Git repository.
import datetime
from datetime import timedelta

from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# default_dag_args (owner, start_date, retries, etc.) is assumed to be defined earlier in the file.

def daterange(start_date, end_date):
    # yields each date from start_date (inclusive) up to end_date (exclusive)
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)

# start & end date define the delta period.
# delta must be a negative number, e.g. -3 to produce the last 3 days of history.
delta = -3
start_date = datetime.date.today() + datetime.timedelta(delta)
end_date = datetime.date.today()

bash_run_report_remotly_cmd = 'gcloud beta compute --project myProjectName ssh myInstanceNAme --internal-ip --zone us-central1-a --command "sudo -u omid python3 /home/omid/gam_data_transfer/report_example_using_service_account_with_date_range.py --start 2020-02-27 --end 2020-02-27"'

bash_gsutil_mv_cmd = 'gcloud beta compute --project ynet-data-myProjectName ssh myInstanceNAme --internal-ip --zone us-central1-a --command "sudo -u omid gsutil -m mv /tmp/*report_example_using_service_account_with_date_range* gs://myBucket/google/gam/example_report"'

# the python script adds a random temporary suffix to the file name, so we delete the files in the bucket before we proceed
bash_cleanup_cmd = 'gsutil rm gs://myBucket/google/gam/example_report/*report_example_using_service_account_with_date_range_2020-02-26*'

with models.DAG(
        'run_example_gam_report_remote_machine',
        # "@once" runs the DAG a single time; switch to a daily schedule to make it recurrent
        schedule_interval="@once",
        default_args=default_dag_args) as dag:

    start = DummyOperator(task_id='start')
    wait = DummyOperator(task_id='wait', trigger_rule="all_done")
    end = DummyOperator(task_id='end', trigger_rule="all_done")

    # delta must be negative so the date range is not empty - otherwise no operators are created
    for single_date in daterange(start_date, end_date):
        temp_date = single_date.strftime("%Y-%m-%d")

        # notice trigger_rule="all_done"
        bash_cleanup_cmd = 'gsutil rm gs://myBucket/google/gam/example_report/*report_example_using_service_account_with_date_range_' + temp_date + '*'
        bash_cleanup = BashOperator(task_id='bash_cleanup_' + temp_date, retries=0, bash_command=bash_cleanup_cmd, trigger_rule="all_done")

        # notice trigger_rule="all_done"
        bash_run_report_remotly_cmd = 'gcloud beta compute --project ynet-data-analytics ssh scheduler2 --internal-ip --zone us-central1-a --command "sudo -u omid python3 /home/omid/gam_data_transfer/report_example_using_service_account_with_date_range.py --start ' + temp_date + " --end " + temp_date + '"'
        run_report_remotly = BashOperator(task_id='run_report_remotly_' + temp_date, retries=0, bash_command=bash_run_report_remotly_cmd, trigger_rule="all_done")

        start.set_downstream(bash_cleanup)
        bash_cleanup.set_downstream(run_report_remotly)
        run_report_remotly.set_downstream(wait)

    # notice trigger_rule="all_done"
    run_gsutil_mv = BashOperator(task_id='bash_gsutil_mv_cmd', retries=0, bash_command=bash_gsutil_mv_cmd, trigger_rule="all_done")
    wait >> run_gsutil_mv >> end
——————————————————————————————————————————
I put a lot of thought into these blogs, so I could share the information in a clear and useful way.
If you have any comments, thoughts, questions, or you need someone to consult with,
feel free to contact me via LinkedIn: