Airflow XComs example
Author: Omid Vahdaty 15.4.2020
Getting started on Airflow Xcom | 6 Examples
Learning Airflow XCom is not trivial, so here are some examples based on use cases I have personally tested:
- Basic push/pull example based on the official example.
- Airflow push and pull of the same ID from several operators.
- Push and pull from Airflow operators other than PythonOperator.
- Push a return code from a bash operator to XCom.
- Pull between different DAGs.
- Querying MySQL directly in Airflow using SQLAlchemy and not using XCom!
Example 1- Airflow XCom basic example:
Some instructions below:
- Read the airflow official XCom docs.
- Go over the official example and the astronomer.io examples.
- Be sure to understand the documentation of PythonOperator.
- Be sure to understand: context becomes available only when the Operator is actually executed, not during DAG definition. This makes sense because, in the taxonomy of Airflow, XComs are a communication mechanism between tasks at runtime: tasks talk to each other while they are running. Read this post about the missing "ti".
- In the official example, push1 and puller are missing provide_context=True.
- Fix the PythonOperator import if needed (based on the specific Airflow and Python versions you are running).
- Fix print statements (based on the Python version).
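To see why a missing provide_context breaks the pull, here is a toy simulation (not Airflow's actual code; all names are invented for illustration) of how a PythonOperator-style wrapper either passes or withholds the context kwargs, including 'ti', to the callable:

```python
# Toy simulation of how a PythonOperator-style wrapper passes (or withholds)
# the runtime context to the callable. Names are made up for illustration.

def run_callable(fn, provide_context, context):
    kwargs = dict(context) if provide_context else {}
    return fn(**kwargs)

def puller(**kwargs):
    return kwargs['ti']  # KeyError if the context was not provided

fake_ti = object()  # stand-in for the real TaskInstance

# with provide_context=True the callable sees 'ti'
assert run_callable(puller, True, {'ti': fake_ti}) is fake_ti

# without it, the familiar "missing 'ti'" failure appears
try:
    run_callable(puller, False, {'ti': fake_ti})
    raise AssertionError("expected KeyError")
except KeyError:
    pass
```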
Go over the airflow DAG "example_xcom": trigger the DAG, and for each PythonOperator view the log –> watch the XCom section & "task instance details".
For push1 –> key: "value from pusher 1", value: [1,2,3]
For push2 –> key: "return_value", value: {'a':'b'}
Corrected airflow xcom example DAG was committed here:
and displayed here:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}

def push(**kwargs):
    """Pushes an XCom without a specific target"""
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)

def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2

def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    if pulled_value_1 != value_1:
        raise ValueError('The two values differ {pulled_value_1} and {value_1}')

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    if pulled_value_2 != value_2:
        raise ValueError('The two values differ {pulled_value_2} and {value_2}')

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
    # notice the f-string prefix was dropped below to match older versions of Python
    if pulled_value_1 != value_1:
        raise ValueError('The two values differ {pulled_value_1} and {value_1}')
    if pulled_value_2 != value_2:
        raise ValueError('The two values differ {pulled_value_2} and {value_2}')

push1 = PythonOperator(
    task_id='push',
    provide_context=True,  # provide_context is needed to get the TI (task instance) parameters
    dag=dag,
    python_callable=push,
)

push2 = PythonOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=push_by_returning,
)

pull = PythonOperator(
    task_id='puller',
    provide_context=True,  # provide_context is needed to get the TI (task instance) parameters
    dag=dag,
    python_callable=puller,
)

# push1 and push2 are upstream of pull
pull << [push1, push2]
Example 2- Airflow XCom push and pull of the same ID from several operators:
Here is an example of adding optional arguments to PythonOperator, based on this post:
op_kwargs={'new_study_id': new_study_id, 'study_name': study}
and a "dynamic" pusher based on task id. The idea is to demonstrate that the pushing operator's id is sent as part of the push, so you need to pull based on the id of the operator that pushed:
def pusher_dynamic(my_task_id, **kwargs):
    print("pushing | my task id: " + str(my_task_id) + ". Notice, the task operator id is also pushed, implicitly")
    print(kwargs)
    print(kwargs['ti'])
    kwargs['ti'].xcom_push(key='value from pusher dynamic', value=int(my_task_id))
    return 'Whatever you return gets printed in the logs'

def puller_dynamic(my_task_id, **kwargs):
    ti = kwargs['ti']
    pulled_value = ti.xcom_pull(key='value from pusher dynamic', task_ids='push_' + str(my_task_id))
    print("pulled value based on pusher_id: " + str(pulled_value))

# pusher and puller (the plain callables, without op_kwargs) are defined in the full example on git
i = 1
push1 = PythonOperator(task_id='push_1', provide_context=True, dag=dag, python_callable=pusher)
# notice pull_1 pulls based on the push_1 id
pull1 = PythonOperator(task_id='pull_1', provide_context=True, dag=dag, python_callable=puller)

i = i + 1
push2 = PythonOperator(task_id='push_2', provide_context=True, dag=dag, python_callable=pusher)
pull2 = PythonOperator(task_id='pull_2', provide_context=True, dag=dag, python_callable=puller)

# a dynamic pusher, pusher_dynamic, accepting a counter and pushing it as an XCom
i = i + 1
my_task_id = i
push3 = PythonOperator(task_id='push_' + str(i), provide_context=True, python_callable=pusher_dynamic, op_kwargs={'my_task_id': my_task_id}, dag=dag)
pull3 = PythonOperator(task_id='pull_' + str(i), provide_context=True, python_callable=puller_dynamic, op_kwargs={'my_task_id': my_task_id}, dag=dag)

i = i + 1
my_task_id = i
push4 = PythonOperator(task_id='push_' + str(i), provide_context=True, python_callable=pusher_dynamic, op_kwargs={'my_task_id': my_task_id}, dag=dag)
pull4 = PythonOperator(task_id='pull_' + str(i), provide_context=True, python_callable=puller_dynamic, op_kwargs={'my_task_id': my_task_id}, dag=dag)

push1 >> pull1 >> push2 >> pull2 >> push3 >> pull3 >> push4 >> pull4
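The pull-by-pusher-id pattern above boils down to composing the pushing task's id ('push_' + id) at pull time. A stripped-down model with a plain dict standing in for the XCom store (names invented for the sketch):

```python
# Minimal stand-in for an XCom store keyed by (task_id, key)
store = {}

def xcom_push(task_id, key, value):
    store[(task_id, key)] = value

def xcom_pull(task_id, key):
    return store.get((task_id, key))

# each dynamic pusher pushes under its own task id
for i in (3, 4):
    xcom_push('push_' + str(i), 'value from pusher dynamic', i)

# the puller rebuilds the pusher's task id to find the value
assert xcom_pull('push_3', 'value from pusher dynamic') == 3
assert xcom_pull('push_4', 'value from pusher dynamic') == 4
```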
Example 3- Airflow XCom push and pull from operators other than PythonOperator:
read this post.
This is not advisable. All XCom pull/push actions are translated to INSERT/SELECT statements against the airflow DB. This will degrade scheduler performance over time and slow down the whole pipeline, because of the high number of pulls (queries) or the large number of rows retrieved. The full example is committed here:
t1 = BashOperator(
    task_id="t1",
    bash_command='echo "{{ ti.xcom_push(key="k1", value="v1") }}" "{{ ti.xcom_push(key="k2", value="v2") }}"',
    dag=dag,
)
t2 = BashOperator(
    task_id="t2",
    bash_command='echo "{{ ti.xcom_pull(key="k1") }}" "{{ ti.xcom_pull(key="k2") }}"',
    dag=dag,
)
t1 >> t2
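To make the cost concrete: each push is roughly an INSERT into the metadata DB's xcom table and each pull a SELECT from it. A simplified sqlite sketch of that row model (the real table has more columns, e.g. execution_date and timestamp):

```python
import sqlite3

# simplified stand-in for Airflow's xcom metadata table
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE xcom (dag_id TEXT, task_id TEXT, key TEXT, value TEXT)')

# every xcom_push becomes an INSERT
conn.execute("INSERT INTO xcom VALUES ('my_dag', 't1', 'k1', 'v1')")
conn.execute("INSERT INTO xcom VALUES ('my_dag', 't1', 'k2', 'v2')")

# every xcom_pull becomes a SELECT
value = conn.execute(
    "SELECT value FROM xcom WHERE dag_id='my_dag' AND task_id='t1' AND key='k1'"
).fetchone()[0]
assert value == 'v1'
```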
Example 4- Airflow XCom Push return code from bash operator to XCom:
Based on this post, all you need to do is add the following to the bash operator:
xcom_push=True
and
; echo $?
example (also committed in git):
t3 = BashOperator(task_id='t3', xcom_push=True, bash_command="ls -la ; echo $?", dag=dag)
t4 = BashOperator(task_id="t4", bash_command='echo "{{ ti.xcom_pull(task_ids="t3") }}"', dag=dag)
t3 >> t4
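Why this works: with xcom_push=True the BashOperator pushes the last line written to stdout, so appending ; echo $? makes the command's return code that last line. A quick stdlib check of the underlying shell behavior:

```python
import subprocess

# run a command followed by "; echo $?" and grab the last stdout line,
# mimicking what the BashOperator pushes to XCom
out = subprocess.run(['sh', '-c', 'true ; echo $?'],
                     capture_output=True, text=True).stdout
last_line = out.strip().splitlines()[-1]
assert last_line == '0'   # exit code of "true"

out = subprocess.run(['sh', '-c', 'false ; echo $?'],
                     capture_output=True, text=True).stdout
assert out.strip().splitlines()[-1] == '1'   # exit code of "false"
```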
Example 5: Airflow XCom pull between different DAGs
Read this post: the example in the above post did not work for me 🙁
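For reference, the mechanics of a cross-DAG pull: XComs are scoped by dag_id, and by default xcom_pull only looks in the pulling task's own DAG, so the other DAG's id has to be passed explicitly (in Airflow 1.10, TaskInstance.xcom_pull accepts a dag_id argument). A toy model of that scoping (store layout invented for the sketch):

```python
# toy XCom store scoped by dag_id, illustrating why a cross-DAG pull
# must name the other DAG explicitly
store = {
    ('dag_a', 'push', 'k'): 'from dag_a',
    ('dag_b', 'push', 'k'): 'from dag_b',
}

def xcom_pull(current_dag_id, task_ids, key, dag_id=None):
    # default scope is the pulling task's own DAG
    return store.get((dag_id or current_dag_id, task_ids, key))

assert xcom_pull('dag_a', 'push', 'k') == 'from dag_a'
# naming the other DAG switches the scope
assert xcom_pull('dag_a', 'push', 'k', dag_id='dag_b') == 'from dag_b'
```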
Example 6: Querying MySQL directly in Airflow using SQLAlchemy and not using XCom!
Eventually, it was so frustrating using XCom that I started checking how fast and simple it would be to query the MySQL DB directly from the DAG (using a PythonOperator).
I tried using SQLAlchemy because I assumed that since airflow is using it, the packages would already be set up.
However, I got errors:
missing module MySQLdb ImportError: this is MySQLdb version (1, 2, 4, 'beta', 4), but _mysql is version (1, 2, 5, 'final', 1)
I had to install these packages:
pip install mysql-python
pip install mysqlclient==1.4.6
To learn SQLAlchemy quickly, I used this blog for the select and this blog for the insert; one hour later the sample code below was born.
import sqlalchemy as db

engine = db.create_engine('mysql://airflow:airflow@1.2.3.4:3306/airflow')

def get_name_from_airflow_db(my_name):
    connection = engine.connect()
    metadata = db.MetaData()
    study_table = db.Table('my_table', metadata, autoload=True, autoload_with=engine)
    # Equivalent to 'SELECT * FROM my_table':
    # query = db.select([study_table])
    # SQL: SELECT min(myCol) FROM my_table WHERE myCol2 = my_name
    # SQLAlchemy:
    query = db.select([db.func.min(study_table.columns.myCol)]).where(study_table.columns.myCol2 == my_name)
    ResultProxy = connection.execute(query)
    ResultSet = ResultProxy.fetchall()
    # parsing results
    v = [value for value, in ResultSet]
    print(v[0])

def insert_new_name(my_name):
    # trying now to insert data
    try:
        conn = engine.connect()
        trans = conn.begin()
        insert_into = 'INSERT INTO study(study) VALUES (\'' + my_name + '\');'
        conn.execute(insert_into)
        trans.commit()
        print("inserted: " + my_name)
        return "sababa"
    except:
        return "not sababa"

new_name = "MyLatestSTR"
insert_new_name(new_name)
get_name_from_airflow_db(new_name)
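The same select-min/insert pattern can be tried without a MySQL server, using the stdlib sqlite3 module (table and column names follow the sketch above; this is an illustration, not the production setup):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE study (id INTEGER PRIMARY KEY AUTOINCREMENT, study TEXT)')

# insert_new_name equivalent
conn.execute('INSERT INTO study(study) VALUES (?)', ('MyLatestSTR',))
conn.commit()

# get_name_from_airflow_db equivalent: SELECT min(id) ... WHERE study = ?
min_id = conn.execute('SELECT min(id) FROM study WHERE study = ?',
                      ('MyLatestSTR',)).fetchone()[0]
assert min_id == 1
```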
The full example, combined with an Airflow DAG and BranchPythonOperator (also committed to git):
import os
import logging
from datetime import timedelta, date
import datetime

from airflow import DAG
from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import BashOperator
from airflow.contrib.operators import gcs_to_gcs
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.utils import trigger_rule
from airflow.operators import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator

# for insert/select from the Airflow DB
import sqlalchemy as db
engine = db.create_engine('mysql://airflow:airflow@1.2.3.4:3306/airflow')

#####################################
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

################################
## default args for airflow
################################
default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    'owner': 'Jutomate',
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

################################
## insert to airflow db
################################
def insert(input):
    # trying now to insert data
    try:
        conn = engine.connect()
        trans = conn.begin()
        # id is assumed auto-generated; only the value is inserted
        insert_into = 'INSERT INTO table_name(value) VALUES (\'' + input + '\');'
        conn.execute(insert_into)
        trans.commit()
        return "inserted input!!!"
    except:
        return "unable to insert"

######################################
## select from airflow db
######################################
def is_conditional_select(input):
    connection = engine.connect()
    metadata = db.MetaData()
    dedup_table = db.Table('table_name', metadata, autoload=True, autoload_with=engine)
    # table contains:
    # 1. col_id
    # 2. col_value
    # SQL: SELECT min(col_id) FROM table_name WHERE col_value = input
    query = db.select([db.func.min(dedup_table.columns.col_id)]).where(dedup_table.columns.col_value == input)
    ResultProxy = connection.execute(query)
    ResultSet = ResultProxy.fetchall()
    # parsing results
    v = [value for value, in ResultSet]
    if v[0]:
        print("input found")
        return True
    else:
        print("input not found")
        return False

#######################
## conditional logic
#######################
def is_conditional(input):
    if is_conditional_select(input):
        return True
    else:
        print(insert(input))
        return False

##################################
## conditional function
##################################
def conditional_func(input):
    if is_conditional(input) == True:
        return 'stop'
    else:
        return 'continue'

with models.DAG(
        'dag_with_branching_and_xcom',
        schedule_interval=None,
        default_args=default_dag_args) as dag:
    # op_kwargs should supply the actual input value to dedup on
    branching = BranchPythonOperator(task_id='branch', python_callable=conditional_func, op_kwargs={'input': input}, dag=dag)
    dummy_stop = DummyOperator(task_id='stop')
    dummy_continue = DummyOperator(task_id='continue')
    branching >> [dummy_stop, dummy_continue]
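Stripped of the DB plumbing, the branching logic is a dedup check: if the input was seen before, return 'stop'; otherwise record it and return 'continue'. A toy version with an in-memory set instead of MySQL:

```python
seen = set()  # stands in for the MySQL dedup table

def conditional_func(value):
    if value in seen:
        return 'stop'        # already processed
    seen.add(value)          # the "insert" side effect
    return 'continue'

assert conditional_func('study_1') == 'continue'  # first time: continue
assert conditional_func('study_1') == 'stop'      # second time: stop
```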
——————————————————————————————————————————
I put a lot of thought into these blogs, so I could share the information in a clear and useful way.
If you have any comments, thoughts, questions, or you need someone to consult with,
feel free to contact me via LinkedIn: