Airflow Dynamic Operators used in Google Ad Manager API data pipeline

Author: Omid Vahdaty 29.2.2020

In the previous blog, GAM demystified, we created a Python script that connects to the Google Ad Manager API and downloads a file based on metrics and dimensions. In this blog we will take that script and wrap it in a new Airflow job.



How to wrap a Python script that has its own prerequisite Python packages?

I created a standalone GCE machine with Python 3 and all the Python packages required for the GAM API. I am going to use Airflow to connect to it remotely via SSH and run the script there. The instance should be the cheapest possible.

How to connect to a remote machine in GCE via Airflow?

I used a BashOperator to run the gcloud command below, which connects to the remote machine:

bash_gsutil_mv_cmd='gcloud beta compute --project ynet-data-myProjectName ssh myInstanceNAme --internal-ip --zone us-central1-a --command "sudo -u omid ls /tmp/"'
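If you prefer not to hand-write the quoting, the command string can be assembled in Python before it is passed to the BashOperator. This is only a sketch: the helper name build_gcloud_ssh_cmd and its parameters are hypothetical (they are not part of the original script), and shlex.quote is my choice for keeping the remote command intact as a single argument.

```python
import shlex


def build_gcloud_ssh_cmd(project, instance, zone, remote_cmd):
    """Return a gcloud command line that runs remote_cmd on a GCE instance over SSH."""
    parts = [
        "gcloud", "beta", "compute",
        "--project", project,
        "ssh", instance,
        "--internal-ip",
        "--zone", zone,
        "--command", remote_cmd,
    ]
    # shlex.quote wraps any argument that contains spaces or special characters
    return " ".join(shlex.quote(p) for p in parts)


cmd = build_gcloud_ssh_cmd(
    "ynet-data-myProjectName", "myInstanceNAme", "us-central1-a",
    "sudo -u omid ls /tmp/",
)
print(cmd)
```

The resulting string can be used as the bash_command of a BashOperator in the same way as the hand-written one above.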

How to make an Airflow ETL job recurrent, and why?

Each run of the Python script uses a date-based file naming convention: the start and end dates are the same day, so the output of each run is one day's worth of data. I then run all the days in parallel, based on the number of days I need to overwrite, say 3 days. Before I do that, I need to clean up the older files to avoid duplication in the data.

I am also going to create a special Python function to loop over date ranges. When I create the dynamic operators in a loop, each operator handles a different day and runs an instance of the Python script with the respective date.
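A minimal sketch of such a date loop, outside of Airflow. The daterange generator mirrors the one in the full DAG further down; the helper dates_to_reload and the fixed "today" value are assumptions added only to make the example reproducible.

```python
from datetime import date, timedelta


def daterange(start_date, end_date):
    """Yield each date from start_date up to, but not including, end_date."""
    for n in range((end_date - start_date).days):
        yield start_date + timedelta(days=n)


def dates_to_reload(today, delta):
    """Return 'YYYY-MM-DD' strings for the days between today + delta and today (delta is negative)."""
    start = today + timedelta(days=delta)
    return [d.strftime("%Y-%m-%d") for d in daterange(start, today)]


days = dates_to_reload(date(2020, 2, 29), -3)
print(days)  # ['2020-02-26', '2020-02-27', '2020-02-28']
```

Note that the end date is exclusive, so today itself is not in the list. If today should be included, pass tomorrow as the end date.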


Airflow Dynamic Operator considerations

You need to:

  1. Set the downstream of each operator, and give each operator a dynamic task name.
  2. Set retries to 0, because a failed cleanup of non-existing files is sometimes OK (think of the first run of the DAG).
  3. Set trigger_rule="all_done", so that downstream tasks still run when the API call fails, which sometimes happens and is OK.

The downstream of each operator is set dynamically; bash_cleanup, for example, is a dynamically created operator with a different task ID per date.
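To make the resulting shape concrete, here is an Airflow-free sketch that only computes the task IDs and the upstream/downstream pairs for each date. The function build_edges is hypothetical; the task ID prefixes follow the full DAG below. In the DAG itself, each pair corresponds to upstream.set_downstream(downstream), or equivalently upstream >> downstream, on the operator objects.

```python
def build_edges(dates):
    """Return (upstream, downstream) task-id pairs: one cleanup/report chain per date."""
    edges = []
    for d in dates:
        cleanup_id = "bash_cleanup_" + d        # unique task id per date
        report_id = "run_report_remotly_" + d   # unique task id per date
        edges.append(("start", cleanup_id))
        edges.append((cleanup_id, report_id))
        edges.append((report_id, "wait"))
    return edges


for upstream, downstream in build_edges(["2020-02-26", "2020-02-27"]):
    print(upstream, ">>", downstream)
```

Every date fans out from start and fans back in to wait, so the per-date chains run in parallel.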



How to load GAM API files to BigQuery via Airflow?

  1. One time, load the file manually into BigQuery using “automatically detect schema”. This creates a table that is not partitioned. (A partition based on ingestion time is not a good option in this use case.)
  2. Then use this blog to create a table with CREATE TABLE AS SELECT (CTAS) in BigQuery, using “Dimension_DATE” as your partition column.

AS SELECT * from `MyProject.DATA_LAKE_GOOGLE_US.example_report`
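For reference, a sketch of what a complete CTAS statement could look like, built as a Python string. The target table name is hypothetical (the original target is not shown), and the sketch assumes that schema auto-detection typed Dimension_DATE as DATE; if it came out as TIMESTAMP, the partition expression would need to be DATE(Dimension_DATE) instead.

```python
def build_ctas(source_table, target_table, partition_column):
    """Return a BigQuery CREATE TABLE ... AS SELECT statement partitioned by a DATE column."""
    return (
        "CREATE TABLE `{target}`\n"
        "PARTITION BY {column}\n"
        "AS SELECT * FROM `{source}`"
    ).format(target=target_table, column=partition_column, source=source_table)


sql = build_ctas(
    source_table="MyProject.DATA_LAKE_GOOGLE_US.example_report",
    # Hypothetical target name; the original post does not show it
    target_table="MyProject.DATA_LAKE_GOOGLE_US.example_report_partitioned",
    partition_column="Dimension_DATE",
)
print(sql)
```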

You can also add an optional operator to load the data at the end. Notice the write_disposition and create_disposition:

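load_to_bq_from_gcs = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    # task_id, bucket, source_objects and destination_project_dataset_table are omitted here
    create_disposition='CREATE_NEVER',  # the table already exists because of the manual CTAS
    write_disposition='WRITE_TRUNCATE',  # overwrites the existing table data
)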

The full example of an Airflow DAG that runs Python scripts on a remote machine is below, and is also committed in our Git repository.

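import datetime
from datetime import timedelta

# Airflow 1.10-style import paths
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

def daterange(start_date, end_date):
    # Yield every date from start_date up to, but not including, end_date
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)

# start & end date = delta period.
# delta must be negative, e.g. -3 to produce 3 days of history (assumed value, see "say 3 days" above).
delta = -3
# Assumption: the reference dates were lost from this listing; today is used for both.
start_date = datetime.date.today() + datetime.timedelta(delta)
end_date = datetime.date.today()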
# Static examples of the commands; inside the DAG loop the report and cleanup commands are rebuilt per date.
bash_run_report_remotly_cmd='gcloud beta compute --project myProjectName ssh myInstanceNAme --internal-ip --zone us-central1-a --command "sudo -u omid python3 /home/omid/gam_data_transfer/ --start 2020-02-27 --end 2020-02-27"'
bash_gsutil_mv_cmd='gcloud beta compute --project ynet-data-myProjectName ssh myInstanceNAme --internal-ip --zone us-central1-a --command "sudo -u omid gsutil -m mv /tmp/*report_example_using_service_account_with_date_range* gs://myBucket/google/gam/example_report"'
# The Python script appends a random temporary suffix to the file name, so we delete the files in the bucket before we proceed
bash_cleanup_cmd='gsutil rm gs://myBucket/google/gam/example_report/*report_example_using_service_account_with_date_range_2020-02-26*'
# default_dag_args is assumed to be defined earlier (it is not shown in this listing).
with models.DAG(
        # Hypothetical DAG id; the original name is not shown here
        'gam_example_report',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    start = DummyOperator(task_id='start')
    wait = DummyOperator(task_id='wait', trigger_rule="all_done")
    end = DummyOperator(task_id='end', trigger_rule="all_done")
    # Note: delta must be negative; otherwise the date range is empty and no operators are created.
    for single_date in daterange(start_date, end_date):
        # Date string used in the task ids, the file pattern and the script arguments
        temp_date = single_date.strftime("%Y-%m-%d")
        # Notice retries=0 and trigger_rule="all_done"
        bash_cleanup_cmd='gsutil rm gs://myBucket/google/gam/example_report/*report_example_using_service_account_with_date_range_'+temp_date+'*'
        bash_cleanup = BashOperator(task_id='bash_cleanup_'+temp_date,retries=0,bash_command=bash_cleanup_cmd,trigger_rule="all_done")
        # Notice retries=0 and trigger_rule="all_done"
        bash_run_report_remotly_cmd='gcloud beta compute --project ynet-data-analytics ssh scheduler2 --internal-ip --zone us-central1-a --command "sudo -u omid python3 /home/omid/gam_data_transfer/ --start '+temp_date+" --end "+temp_date+'"'
        run_report_remotly = BashOperator(task_id='run_report_remotly_'+temp_date,retries=0,bash_command=bash_run_report_remotly_cmd,trigger_rule="all_done")
        # Chain this date's tasks: start -> cleanup -> report -> wait
        start >> bash_cleanup >> run_report_remotly >> wait
    # Notice trigger_rule="all_done"
    run_gsutil_mv = BashOperator(task_id='bash_gsutil_mv_cmd',retries=0,bash_command=bash_gsutil_mv_cmd,trigger_rule="all_done")
    wait >> run_gsutil_mv >> end

I put a lot of thought into these blogs so that I can share the information in a clear and useful way.
If you have any comments, thoughts, or questions, or you need someone to consult with, feel free to contact me via LinkedIn.
