Airflow file sensor example
Author: Omid Vahdaty 14.11.2019
I recently encountered an ETL job where the DAG ran perfectly and ended in success, but the underlying resources did not behave as I expected. One of the tasks ran an external Python script; the script ended with success, and the Airflow DAG reported success. However, the Python script was supposed to create a file in GCS, and it didn't.
I looked for a solution for this, and as it turns out, the Airflow sensor is here to help.
An Airflow sensor "senses" whether the file exists or not. The operator has some basic configuration, like the path to watch and a timeout.
The trick is to understand what file it is looking for.
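For the local FileSensor, here is a minimal sketch of how the target path is resolved (the connection id, task id, and values below are illustrative assumptions, not part of the full example further down):

from airflow.contrib.sensors.file_sensor import FileSensor

# FileSensor pokes for a path on the worker's file system. The path is
# resolved against the base path of the filesystem connection given by
# fs_conn_id ("fs_default" unless overridden), so the file it actually
# looks for is <connection base path>/<filepath>.
wait_for_file = FileSensor(
    task_id="wait_for_file",    # illustrative task id
    fs_conn_id="fs_default",    # filesystem connection to resolve against
    filepath="/tmp/",           # an existing directory also satisfies the sensor
    poke_interval=30,           # seconds between checks
)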
The example is also committed in our Git repository.
Notice there are three tasks:
1. sensor_task "senses" a simple folder on the local Linux file system.
2. gcs_file_sensor_yesterday is expected to succeed and will keep poking until the file appears.
3. gcs_file_sensor_today is expected to fail, so I added a timeout.
File sensor code example:
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
import datetime
from datetime import date, timedelta
import airflow

default_args = {
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 1,
    "retry_delay": datetime.timedelta(hours=5),
}

# We need both yesterday's and today's dates; the prefix and suffix
# are the same in our example.
today = datetime.datetime.today()
yesterday = date.today() - timedelta(days=1)

file_prefix = "myPrefix/"
file_suffix = "_file.csv"

file_date = today.strftime("%Y-%m-%d")
full_path_today = file_prefix + file_date + file_suffix

file_date_yesterday = yesterday.strftime("%Y-%m-%d")
full_path_yesterday = file_prefix + file_date_yesterday + file_suffix

with airflow.DAG(
    "file_sensor_example",
    default_args=default_args,
    schedule_interval="@once",
) as dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")

    # Senses a simple folder on the local Linux file system.
    sensor_task = FileSensor(
        task_id="file_sensor_task",
        poke_interval=30,
        filepath="/tmp/",
    )

    # We expect yesterday's file to exist, so this sensor should succeed.
    gcs_file_sensor_yesterday = GoogleCloudStorageObjectSensor(
        task_id="gcs_file_sensor_yesterday_task",
        bucket="myBucketName",
        object=full_path_yesterday,
    )

    # For this example we expect today's file not to exist, so this sensor
    # keeps poking until the 120-second timeout; check the docs for more
    # options like mode and soft_fail.
    gcs_file_sensor_today = GoogleCloudStorageObjectSensor(
        task_id="gcs_file_sensor_today_task",
        bucket="myBucketName",
        object=full_path_today,
        timeout=120,
    )

    start_task >> sensor_task >> gcs_file_sensor_yesterday >> gcs_file_sensor_today >> stop_task
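The last comment in the DAG mentions mode and soft_fail. As a hedged sketch (the extra parameter values below are my assumptions, not from the committed example), the "today" sensor could be made to skip instead of fail on timeout, and to free its worker slot between pokes:

# Variant of gcs_file_sensor_today with the extra options filled in.
# soft_fail=True marks the task SKIPPED instead of FAILED on timeout;
# mode="reschedule" releases the worker slot between pokes instead of
# holding it for the whole wait (available from Airflow 1.10.2).
gcs_file_sensor_today = GoogleCloudStorageObjectSensor(
    task_id="gcs_file_sensor_today_task",
    bucket="myBucketName",
    object=full_path_today,
    timeout=120,           # give up after two minutes
    poke_interval=30,      # check every 30 seconds (assumed value)
    soft_fail=True,        # skip rather than fail when the timeout hits
    mode="reschedule",     # don't occupy a worker slot between pokes
)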
——————————————————————————————————————————
I put a lot of thought into these blogs, so I could share the information in a clear and useful way.
If you have any comments, thoughts, or questions, or you need someone to consult with,
feel free to contact me via LinkedIn: