Airflow Demystified | Airflow Job Examples | GCP Composer

I recently started integrating Airflow into one of my data pipelines. Since the learning curve is steep, each working example will be committed to GitHub and shown here. The problem with this WordPress template is that it is not flexible enough to display code properly, especially indentation. I apologize for that. You are welcome to add or send me more examples.

For now, you can either view the examples here and re-indent them in your IDE, or use the examples I committed to our Git repository:

Big Data Demystified Git

The presentation below gives a quick overview of Airflow.
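If you cannot view the slides, here is a minimal sketch of my own (not taken from the presentation) just to show the moving parts: a DAG object, operators as tasks, and the >> operator for wiring dependencies.

import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# A DAG bundles tasks together with scheduling metadata.
with DAG(dag_id='hello_airflow',
         start_date=datetime.datetime(2019, 1, 1),
         schedule_interval='@daily') as dag:

    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    # >> defines the execution order: start runs before end.
    start >> end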

Working example of running a query on BigQuery and saving the results into a new table

import datetime
import os
import logging

from airflow import DAG
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

today_date = datetime.datetime.now().strftime("%Y%m%d")

# table_name = 'omid.test_results' + '$' + today_date
table_name = 'DATA_LAKE_INGESTION_US.Daily_Stats'

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry, set the 'email' arg to your email and
    # enable emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # Number of retries on task failure and the delay between retries.
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with DAG(dag_id='Daily_Stats_Dag',
         # Continue to run DAG once per day
         schedule_interval=datetime.timedelta(days=1),
         default_args=default_dag_args) as dag:

    start = DummyOperator(task_id='start')

    end = DummyOperator(task_id='end')

    logging.error('trying to bq_query: ')
    logging.error('table name: ' + table_name)
    sql = """ SELECT * FROM `omid.test1` """

    bq_query = BigQueryOperator(
        task_id='bq_query',
        bql=sql,
        destination_dataset_table=table_name,
        bigquery_conn_id='bigquery_default',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        dag=dag
    )

    start >> bq_query >> end

Working example of loading data into a BigQuery table from Google Cloud Storage (GCS)

import datetime
import os
import logging

from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
from airflow.operators import dummy_operator
# from airflow.operators import BashOperator

# Import operator from plugins
from airflow.contrib.operators import gcs_to_gcs
from airflow.utils import trigger_rule

# Output file for job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'android_reviews_file_transfer',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Paths to GCS buckets. No need to add gs://
DST_BUCKET = ('pubsite_prod_rev/reviews')
DST_BUCKET_UTF8 = ('pubsite_prod_rev_ingestion/reviews_utf8')
# The source bucket is not in our project, and there are permission issues with IT.
# Using bash to copy the files to DST_BUCKET and then re-encoding them to UTF-8
# before loading to BQ (the source encoding is not supported yet; this may change
# in the future).

# SRC_BUCKET = ('pubsite_prod_rev')

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry, set the 'email' arg to your email and
    # enable emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes.
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'android_reviews_load_to_bigQuery',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Load from our UTF-8 bucket on GCS into the android reviews BigQuery table.
    load_to_bq_from_gcs = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='load_to_bq_from_gcs',
        source_objects=['*'],
        skip_leading_rows=1,
        write_disposition='WRITE_TRUNCATE',  # overwrite the table on each run
        bucket=DST_BUCKET_UTF8,
        destination_project_dataset_table='DATA.Replica_android_review'
    )

    # Define DAG dependencies.
    # create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

    load_to_bq_from_gcs
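The bash copy and UTF-8 re-encode step mentioned in the comments is not shown above. As a rough sketch only, assuming a BashOperator with gsutil is used for the copy (the bucket paths are placeholders taken from the comments, and the re-encoding itself is omitted), the extra task could look something like this:

from airflow.operators.bash_operator import BashOperator

# Inside the `with models.DAG(...)` block above:
copy_reviews = BashOperator(
    task_id='copy_reviews_to_our_bucket',
    # gsutil copy from the shared source bucket into our ingestion bucket.
    # Bucket paths are placeholders based on the comments; the UTF-8
    # re-encoding step itself is omitted here.
    bash_command='gsutil -m cp gs://pubsite_prod_rev/reviews/* '
                 'gs://pubsite_prod_rev_ingestion/reviews_utf8/'
)

# Copy first, then load to BigQuery.
copy_reviews >> load_to_bq_from_gcs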

Another example, for BigQueryToCloudStorageOperator: the only thing you need to pay attention to is that the parameter destination_cloud_storage_uris must be a list. For example, given a URI built like this:

destination_cloud_storage_uris1 = 'gs://data_lake_modeling_us/qv_revenue_snapshot/' + 'dt=' + today_date + '/qv_revenue-*.avro'

it must be wrapped in a list when passed to the operator. Otherwise you might get weird errors such as "Required parameter is missing". You can have a look at the full example in our Git repository: https://github.com/omidvd79/Big_Data_Demystified/blob/master/airflow/examples/table_snapshots_example.py
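For reference, a minimal sketch of how that URI could be passed to the operator (the source table name below is just a placeholder; the square brackets around the URI are the important part):

from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator

# destination_cloud_storage_uris expects a list, so wrap the single URI in [ ].
snapshot_to_gcs = BigQueryToCloudStorageOperator(
    task_id='snapshot_to_gcs',
    source_project_dataset_table='DATA_LAKE_MODELING_US.qv_revenue',  # placeholder table
    destination_cloud_storage_uris=[destination_cloud_storage_uris1],
    export_format='AVRO',
    bigquery_conn_id='bigquery_default',
    dag=dag
)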

How to delete a DAG from GCP Composer (Airflow)?

Example from the CLI (note that --location takes the environment's Compute Engine region, not the DAGs bucket path):

gcloud beta composer environments storage dags delete --environment airflow-cluster-name --location us-central1 myDag.py

How to schedule an Airflow job to run hourly?

Change the line:
default_args=default_dag_args) as dag:
to:
default_args=default_dag_args, schedule_interval='@hourly') as dag:
(If the DAG already passes a schedule_interval, such as datetime.timedelta(days=1) in the examples above, remove it so the argument is not passed twice.)
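Putting it together, the DAG declaration from the first example would then look roughly like this (same dag_id and default_dag_args as above; only the schedule changes):

with DAG(dag_id='Daily_Stats_Dag',
         # Run once at the start of every hour instead of once per day.
         schedule_interval='@hourly',
         default_args=default_dag_args) as dag:

    # ... same tasks as before (start >> bq_query >> end) ...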
