Airflow file sensor example | Airflow Demystified

I recently encountered an ETL job where the DAG ran perfectly and ended in success, yet the underlying resources did not behave as I expected. One of the tasks was supposed to run an external Python script. The script ended with success, which in turn made the Airflow DAG report success. However, the script was supposed to create a file in GCS, and it didn't.

I looked for a solution, and as it turns out, Airflow sensors are here to help. A sensor "senses" whether the file exists or not. The operator has some basic configuration such as path and timeout. The trick is to understand that it looks for a single file, and to decide what the correct behavior is when the file is found or, alternatively, not found. The example is also committed in our Git. Notice there are three sensor tasks: sensor_task checks a simple folder on the local Linux file system, gcs_file_sensor_yesterday is expected to succeed and will keep poking until the file appears, and gcs_file_sensor_today is expected to fail, so I added a timeout.

from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator    import DummyOperator
from airflow.contrib.sensors.gcs_sensor  import GoogleCloudStorageObjectSensor
import datetime
from datetime import date, timedelta
import airflow
default_args = {
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 1,
    "retry_delay": datetime.timedelta(hours=5),
}
# we need yesterday's and today's dates; prefix and suffix are the same in our example
today = datetime.datetime.today()
yesterday = date.today() - timedelta(days=1)

# convert the dates into YYYY-MM-DD format and build the full GCS object paths
file_prefix = "myPrefiex/"
file_suffix = "_file.csv"
file_date = today.strftime('%Y-%m-%d')
full_path_today = file_prefix + file_date + file_suffix
file_date_yesterday = yesterday.strftime('%Y-%m-%d')
full_path_yesterday = file_prefix + file_date_yesterday + file_suffix
with airflow.DAG("file_sensor_example", default_args=default_args, schedule_interval="@once") as dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    # local file system sensor: poke the /tmp/ folder every 30 seconds
    sensor_task = FileSensor(task_id="file_sensor_task", poke_interval=30, filepath="/tmp/")
    # we expect yesterday's file to exist
    gcs_file_sensor_yesterday = GoogleCloudStorageObjectSensor(task_id='gcs_file_sensor_yesterday_task', bucket='myBucketName', object=full_path_yesterday)
    # for this example we expect today's file not to exist, so keep poking until the 120-second timeout; check the docs for more options like mode and soft_fail
    gcs_file_sensor_today = GoogleCloudStorageObjectSensor(task_id='gcs_file_sensor_today_task', bucket='myBucketName', object=full_path_today, timeout=120)

start_task >> sensor_task >> gcs_file_sensor_yesterday >> gcs_file_sensor_today >> stop_task
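
The comment above mentions mode and soft_fail. As a rough sketch (not part of the committed example), here is how the "today" sensor could be made friendlier: mode='reschedule' (available in newer Airflow 1.10 releases) releases the worker slot between pokes, and soft_fail=True marks the task as skipped instead of failed when the timeout is reached. The task_id and poke_interval below are illustrative values.

from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor

# hypothetical variant of gcs_file_sensor_today; bucket and object reuse the values defined above
gcs_file_sensor_today_soft = GoogleCloudStorageObjectSensor(
    task_id='gcs_file_sensor_today_soft_task',
    bucket='myBucketName',
    object=full_path_today,
    poke_interval=30,    # check every 30 seconds
    timeout=120,         # stop poking after two minutes
    mode='reschedule',   # release the worker slot between pokes
    soft_fail=True,      # mark the task as skipped instead of failed on timeout
)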

——————————————————————————————————————————

I put a lot of thought into these blogs so I can share the information in a clear and useful way. If you have any comments, thoughts, or questions, or you need someone to consult with,

feel free to contact me via LinkedIn: