Airflow Demystified | Airflow examples of jobs | GCP Composer and FAQ’s

I Started recently integrating airflow into one my Data Pipelines. Since the learning curve is steep, each working example will be committed into GitHub and shown here. the problem with this wordpress template, is that it is not flexible enough to show code properly, especially for indentation. I appologize for that. You are welcome to add/send me more examples

For now , you can either view the example and indent it on your IDE, or use examples i committed to our git repository:

Big Data Demystified Git

Quick overview of Airflow by the below presentation.

Working example of running a query on bigQuery and saving the results into a new table

import datetime
import os
import logging

from airflow import DAG
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

today_date = datetime.datetime.now().strftime(“%Y%m%d”)

#table_name = ‘omid.test_results’ + ‘$’ + today_date
table_name = ‘DATA_LAKE_INGESTION_US.Daily_Stats’

yesterday = datetime.datetime.combine(
datetime.datetime.today() – datetime.timedelta(1),
datetime.datetime.min.time())

default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
‘start_date’: yesterday,
# To email on failure or retry set ’email’ arg to your email and enable
# emailing here.
’email_on_failure’: False,
’email_on_retry’: False,
# If a task fails, retry it once after waiting at least 5 minutes
‘retries’: 0,
‘retry_delay’: datetime.timedelta(minutes=5),
‘project_id’: models.Variable.get(‘gcp_project’)
}

with DAG(dag_id=’Daily_Stats_Dag’,
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:

start = DummyOperator(task_id=’start’)

end = DummyOperator(task_id=’end’)

logging.error(‘trying to bq_query: ‘)
logging.error(‘table name: ‘+table_name)
sql = “”” SELECT * FROM `omid.test1` “””

bq_query = BigQueryOperator(
task_id=’bq_query’,
bql=sql,
destination_dataset_table=table_name,
bigquery_conn_id=’bigquery_default’,
use_legacy_sql=False,
write_disposition=’WRITE_TRUNCATE’,
create_disposition=’CREATE_IF_NEEDED’,
dag=dag
)

start >> bq_query >> end

Working example of loading data into BigQuery table from google cloud storage ( GCS )

import datetime
import os
import logging

from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
from airflow.operators import dummy_operator
#from airflow.operators import BashOperator

# Import operator from plugins
from airflow.contrib.operators import gcs_to_gcs
from airflow.utils import trigger_rule

# Output file for job.
output_file = os.path.join(
models.Variable.get(‘gcs_bucket’), ‘android_reviews_file_transfer’,
datetime.datetime.now().strftime(‘%Y%m%d-%H%M%S’)) + os.sep
# Path to GCS buckets. no need to add gs://
DST_BUCKET = (‘pubsite_prod_rev/reviews’)
DST_BUCKET_UTF8 = (‘pubsite_prod_rev_ingestion/reviews_utf8’)
# source bucekt is not in out project, and there permission issues with IT. using bash to copy the files to DST_BUCKET
# and then encoding the files to UTF8 to load to BQ (UTF8 is not supported yet. will change in the future)

#SRC_BUCKET = (‘pubsite_prod_rev’)

yesterday = datetime.datetime.combine(
datetime.datetime.today() – datetime.timedelta(1),
datetime.datetime.min.time())

default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
‘start_date’: yesterday,
# To email on failure or retry set ’email’ arg to your email and enable
# emailing here.
’email_on_failure’: False,
’email_on_retry’: False,
# If a task fails, retry it once after waiting at least 5 minutes
‘retries’: 1,
‘retry_delay’: datetime.timedelta(minutes=5),
‘project_id’: models.Variable.get(‘gcp_project’)
}

with models.DAG(
‘android_reviews_load_to_bigQuery’,
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
#load from local bucket o GCS table of android
load_to_bq_from_gcs = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id=’load_to_bq_from_gcs’,
source_objects=’*’,
skip_leading_rows=1,
write_disposition=’WRITE_TRUNCATE’, #overwrite?
bucket=DST_BUCKET_UTF8,
destination_project_dataset_table=’DATA.Replica_android_review’
)

# Define DAG dependencies.
#create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

load_to_bq_from_gcs

Another Example for BigQueryToCloudStorageOperator, the only thing you need to pay attention to is that the parameter destination_cloud_storage_uris must be a list, i.e:

destination_cloud_storage_uris1 = ‘gs://data_lake_modeling_us/qv_revenue_snapshot/’ +’dt=’ + today_date+ ‘/qv_revenue-*.avro’

Otherwise your might be getting weird errors such as “Required parameter is missing”. you can have a look at the full example on our git: https://github.com/omidvd79/Big_Data_Demystified/blob/master/airflow/examples/table_snapshots_example.py

How to pause Airflow execution GCP composer ?

You can pause the execution by clicking on the “ON” switch in the Airflow UI to switch it to “OFF”. 

How to stop a running DATA in Airflow / GCP Composer ?

To stop a running DAG in Airflow UI you should go to “Browse” then “DAG Runs”, select the DAG you want to stop and click “With selected:” and then “Delete”. This will delete this DAG run but if you have this DAG scheduled to run periodically, it will start again in the next scheduled time. 

How to delete a DAG from GCP composer ( air flow) ?

example from the cli :

gcloud beta composer environments storage dags delete  –environment airflow-cluster-name –location gs://us-central1-airflow-cluster-xxxxxxx-bucket/dags/  myDag.py

In case you want to permanently delete the DAG, you can follow first one of the above steps and then delete the DAG file from the DAG folder[*]. The Airflow UI may need a couple of minutes to update after this action. 

[*] https://cloud.google.com/composer/docs/how-to/using/managing-dags#deleting_a_dag

how to schedule an AirFlow job to run hourly?

Change line from:     
default_args=default_dag_args) as dag:
to:
default_args=default_dag_args, schedule_interval=’@hourly’) as dag:

GCP Composer & Airflow FAQ

How do I upgrade the cluster?

I understand that you would like to increase the number of nodes for a composer environment. In order to do so, you can use the console, going to your environment, to the “ENVIRONMENT CONFIGURATION” tab, and clicking in edit. Here you can select the new value for the number of nodes. You can also use gcloud commands [1]. 

How to size the right amount of resources for GCP Composer & Airflow?

I understand that you would like to know how to choose the right size for your Composer cluster. This depend on what kind of tasks you intend to perform. As a general rule, for a lot of small DAGs, you need more machines, but small ones. For few DAGs but heavy ones, you need fewer machines but big ones. There is no exact way to tell, you need to make an estimation, and then you can check the CPU and RAM usage to adjust the number. 

What is the correct way to migrate from an old GCP Composer airflow cluster to a new one ?

I understand that you would like to migrate all the DAGs and configuration to a new cluster. To copy the DAGs, you should go to the bucket where they are stored and copy them to the bucket that the new environment is using. To copy the configuration, you need to do it manually. Go to the environment in the console, where you can see all the configuration (such as PyPi packages, environment variables etc.) and copy them to the new environment. 

Triggers rules? Conditional example of airflow tasks?

You need to use trigger rules , as each task has a trigger by default to success. I wrote an as simple example , you can check it out in github:

https://github.com/omidvd79/Big_Data_Demystified/blob/master/airflow/examples/conditional_example.py

When should you use this case? for example you a dataproc cluster or EMR cluster, and you have an ETL. say you want to scale out BEFORE the ETL, and scale in AFTER the ETL. a Considered a failure in the ETL, conditional would be helpful, in order to run the scale in regardless of the success or failure of the ETL.

from airflow.utils.trigger_rule import TriggerRule
import datetime as dt
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG(
    dag_id='conditional_example',
    schedule_interval='@once',
    start_date=dt.datetime(2019, 2, 28)
)

task_start = DummyOperator(
        task_id='task_start',       
        dag=dag)

conditional_task = DummyOperator(
        task_id='conditional_task',        
        dag=dag)

task_failure = DummyOperator(
        task_id='task_failure',       
        trigger_rule=TriggerRule.ALL_FAILED,
        dag=dag)
        
task_follow_failure = DummyOperator(
        task_id='task_follow_failure',       
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

task_success = DummyOperator(
        task_id='task_success',        
        trigger_rule=TriggerRule.ALL_SUCCESS,        
        dag=dag)

cleanup_task = DummyOperator(
        task_id='cleanup_task',
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag=dag)


conditional_task.set_upstream(task_start)
task_failure.set_upstream(conditional_task)
task_follow_failure.set_upstream(task_failure)
task_success.set_upstream(conditional_task)
cleanup_task.set_upstream(task_failure)
cleanup_task.set_upstream(task_success)

When should I use airflow branching? how to use airflow branching? airflow branching simple example?

Say you have a task/ETL that needs to run every weekday, and not on the weekend? what would do ? for this reason, i wrote a simple branching example based on week of day.

https://github.com/omidvd79/Big_Data_Demystified/blob/master/airflow/examples/branching_example.py

from airflow.utils.trigger_rule import TriggerRule
import datetime as dt
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG(
    dag_id='branching_example',
    schedule_interval='@once',
    start_date=dt.datetime(2019, 2, 28)
)


import datetime
datetime.datetime.today()
#Return the day of the week as an integer, where Monday is 0 and Sunday is 6.
my_date = datetime.datetime.today().weekday()


def weekday_or_weekend_branch():
    if my_date == 5:  # only Saturday we rest
        return 'weekend_task'
    else:
        return 'weekday_task'
    

weekday_task = DummyOperator(task_id='weekday_task', dag=dag)
weekend_task = DummyOperator(task_id='weekend_task', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=weekday_or_weekend_branch,
    dag=dag,
)

branch_task >> weekday_task 
branch_task >> weekend_task

How to create , destroy , scale GCP dataproc cluster via Airflow?

This example is also committed to our big data demystified github.

Few things you should know , when you scale , you might want to use preemptible_workers to save costs. the minimum is 0 workers. so feel free to play with the amount of workers. even consider scaling in and out based your ETL jobs.

Furthermore, I used a 1024 GB disk size – this is for performance purposes. test this as well.

As for the bucket, it is important, if you are planning to use transient cluster ( created and destroyed in a short interval after completion of ETL jobs).

import datetime
import os
import logging

from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
#from airflow.operators import dummy_operator
from airflow.contrib.operators.dataproc_operator import DataprocClusterScaleOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator
from airflow.contrib.operators.dataproc_operator import DataProcSparkSqlOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator



from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import BashOperator

# Import operator from plugins
from airflow.contrib.operators import gcs_to_gcs


from airflow.utils import trigger_rule

# Output file for job.
 
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

my_cluster_name='omid-test4'
my_region='us-central1'
my_bucket='omid-eu-west-2' #avoid cross region, this only for testing purposes.
my_instance='n1-highmem-4'
my_disk_size=1024 # for performance purposes.
my_zone='' #default
my_idle_delete_ttl=70000 #  time in seconds to auto delete in case of no destroy_cluster


with models.DAG(
        'dataproc_create_and_destroy_poc',
        schedule_interval='@once', 
        default_args=default_dag_args) as dag:
            
     create_dataproc_cluster = DataprocClusterCreateOperator(
    	task_id='create_dataproc_cluster',
    	cluster_name=my_cluster_name,
    	region=my_region,
    	num_workers=2,
    	storage_bucket=my_bucket,
    	#init_actions_uris='zeppelin',
    	master_machine_type=my_instance,
    	master_disk_size=my_disk_size,
    	worker_machine_type=my_instance,
    	worker_disk_size=my_disk_size,
        num_preemptible_workers=0, #use scale out/in operator
        zone=my_zone,
        idle_delete_ttl=my_idle_delete_ttl,
     	dag=dag)
    
     
     dataproc_scale_out = DataprocClusterScaleOperator(
        task_id='dataproc_scale_out',
        cluster_name=my_cluster_name,
        region=my_region,
        num_workers=2,
        num_preemptible_workers=0,
        graceful_decommission_timeout='1h',
        dag=dag)
      
     dummy_ETL = DummyOperator(task_id='dummy_ETL', dag=dag)
     
     dataproc_scale_in = DataprocClusterScaleOperator(
        task_id='dataproc_scale_in',
        cluster_name=my_cluster_name,
        region=my_region,
        num_workers=2,
        num_preemptible_workers=0,
        graceful_decommission_timeout='1h',
        dag=dag)
    
     delete_dataproc_cluster = DataprocClusterDeleteOperator(
    	task_id='delete_dataproc_cluster',
    	cluster_name=my_cluster_name,
    	region=my_region,
     	dag=dag)
 	
	
create_dataproc_cluster >> dataproc_scale_out  >> dummy_ETL >> dataproc_scale_in >> delete_dataproc_cluster

Airflow example of gcs_delete_operator GoogleCloudStorageDeleteOperator – module not found? workaround?

This operator is comming in the next airflow version, 1.10.4. The best would be waiting for 1.10.4 to be released to use it. If you cannot wait, a workaround can be copying the code of this operator [*] in your DAG and use it without importing. Be aware that this may fail as the Hook may not be well configured to accept the delete operator until version 1.10.4 and if this doesn’t work, the solution will be waiting for the release.  Once it will be supported i will updated here a working example

____ 
[*] https://github.com/apache/airflow/blob/master/airflow/contrib/operators/gcs_delete_operator.py

Workaround : Airflow example of gcs_delete_operator with Bash Operator and GSutil

as a workaround i can recommend a combination of Bash Operation

from airflow.models import DAG
import datetime as dt
from airflow.operators import BashOperator

dag = DAG(
    dag_id='GCS_Delete_via_Bash_Example',
    schedule_interval='@once',
    start_date=dt.datetime(2019, 2, 28)
)

GCS_Delete_via_Bash_Example = BashOperator(
        task_id='GCS_Delete_via_Bash_Example', 
        bash_command='gsutil rm -r gs://my_bucket/*',      
        dag=dag)
        
GCS_Delete_via_Bash_Example

This above GCS delete bucket is also available in Our big data demystified GitHub:

Running R Scripts via Airflow GCP Composer?

Composer doesn’t support installing arbitrary binary packages on the worker pods so it is not possible to install R. However, You could create a Docker image that has R installed and launch it via KubernetesPodOperator[*]. 

For future reference, there is also an open pull-request[**] to add an ROperator for running R scripts in Airflow but it hasn’t been merged yet. 

[*]: https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator
[**]: https://github.com/apache/airflow/pull/3115

How to launch , create, scale, and destory GCP Dataproc cluster via Airflow?

import datetime
import os
import logging

from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
#from airflow.operators import dummy_operator
from airflow.contrib.operators.dataproc_operator import DataprocClusterScaleOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator
from airflow.contrib.operators.dataproc_operator import DataProcSparkSqlOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator



from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import BashOperator

# Import operator from plugins
from airflow.contrib.operators import gcs_to_gcs


from airflow.utils import trigger_rule

# Output file for job.
 
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

The example of create cluster via Airflow is also committed to our Big Data Demystified GitHub

If you need more airflow examples, Official Airflow examples on GitHub:

https://github.com/apache/airflow/tree/master/airflow/contrib/example_dags

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s