Airflow XComs example | Airflow Demystified

Getting started on Airflow Xcom | 5 Examples

Getting started with Airflow XCom is non-trivial, so I collected links to example posts and documented every use case I have personally tested here. I have published several examples:

  1. Basic push/pull example based on the official example
  2. Push and pull the same ID from several operators
  3. Push and pull from Airflow operators other than the PythonOperator
  4. Push a return code from the BashOperator to XCom
  5. Pull between different DAGs

Example 1: Airflow XCom basic example (correcting the example posted):

Some instructions below

  1. Read the official Airflow XCom docs:

2. Go over the official example and the astronomer.io examples.

3. Be sure to understand the documentation of the PythonOperator.

4. Be sure to understand: the context becomes available only when the operator is actually executed, not during DAG definition. This makes sense because, in Airflow's taxonomy, XComs are a communication mechanism between tasks at runtime: tasks talk to each other while they are running.

5. Read this post about the missing "ti".

6. push1 and puller are missing provide_context=True.

7. Fix the PythonOperator import if needed (based on the specific Airflow and Python versions you are running).

8. Fix the print calls (based on your Python version).
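Regarding points 4 and 6: the callables a PythonOperator runs are plain Python functions, so one way to sanity-check the push/pull logic before triggering a DAG is to call them with a stand-in for the `ti` object that `provide_context=True` injects. `FakeTI` below is a hypothetical helper for illustration, not part of Airflow:

```python
# FakeTI is a hypothetical stand-in for the task instance ("ti") that Airflow
# passes into the callable when provide_context=True; it lets us exercise the
# push/pull functions outside a scheduler.
class FakeTI:
    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, key=None, task_ids=None):
        return self._store.get(key)

def push(**kwargs):
    """Same shape as the DAG's push callable: push an XCom under an explicit key."""
    kwargs['ti'].xcom_push(key='value from pusher 1', value=[1, 2, 3])

ti = FakeTI()
push(ti=ti)
print(ti.xcom_pull(key='value from pusher 1'))  # [1, 2, 3]
```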

example_xcom → trigger DAG → for each PythonOperator → view log → watch the XCom section and the "task instance details"

for push1 → key: "value from pusher 1", value: "[1, 2, 3]"

for push2 → key: "return_value", value: "{'a': 'b'}"

The corrected Airflow XCom example DAG was committed here:

and displayed here:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}
dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])
value_1 = [1, 2, 3]
value_2 = {'a': 'b'}
def push(**kwargs):
    """Pushes an XCom without a specific target"""
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2
def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    if pulled_value_1 != value_1:
        raise ValueError('The two values differ: {} and {}'.format(pulled_value_1, value_1))
    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    if pulled_value_2 != value_2:
        raise ValueError('The two values differ: {} and {}'.format(pulled_value_2, value_2))
    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
    if pulled_value_1 != value_1:
        # str.format() instead of f-strings, to stay compatible with older Python versions
        raise ValueError('The two values differ: {} and {}'.format(pulled_value_1, value_1))
    if pulled_value_2 != value_2:
        # str.format() instead of f-strings, to stay compatible with older Python versions
        raise ValueError('The two values differ: {} and {}'.format(pulled_value_2, value_2))
push1 = PythonOperator(
    task_id='push', provide_context=True,  # provide_context exposes ti (the task instance) in the callable's kwargs
    dag=dag,
    python_callable=push,
)
push2 = PythonOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=push_by_returning,
)
pull = PythonOperator(
    task_id='puller', provide_context=True,  # provide_context exposes ti (the task instance) in the callable's kwargs
    dag=dag,
    python_callable=puller,
)
# set of operators, push1,push2 are upstream to pull
pull << [push1, push2]

Example 2: Airflow XCom push and pull the same ID from several operators

An example of adding optional arguments to the PythonOperator, based on this post:

op_kwargs={'new_study_id': new_study_id,'study_name': study}
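The entries in op_kwargs are simply merged into the callable's keyword arguments when Airflow runs it, so the callable can be checked by calling it directly (the function and argument names below are illustrative, not from the original DAG):

```python
def register_study(new_study_id, study_name, **kwargs):
    # new_study_id and study_name arrive via op_kwargs; everything else
    # Airflow injects (ti, execution date, ...) lands in **kwargs.
    return "study %s registered with id %s" % (study_name, new_study_id)

# Calling it directly, the way PythonOperator(op_kwargs={...}) effectively would:
print(register_study(new_study_id=7, study_name='demo'))  # study demo registered with id 7
```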

and a "dynamic" pusher, based on the task id. The idea is to demonstrate a case where the XCom push includes the operator's id, so you need to pull based on the id of the pushing operator:

def pusher_dynamic(my_task_id, **kwargs):
    print("pushing | my task id: " + str(my_task_id) + ". Notice, the task operator id is also pushed, implicitly")
    print(kwargs)
    print(kwargs['ti'])
    kwargs['ti'].xcom_push(key='value from pusher dynamic', value=int(my_task_id))
    return 'Whatever you return gets printed in the logs'

def puller_dynamic(my_task_id, **kwargs):
    ti = kwargs['ti']
    pulled_value = ti.xcom_pull(key='value from pusher dynamic', task_ids='push_' + str(my_task_id))
    print("pulled value based on pusher id: " + str(pulled_value))
i = 1
push1 = PythonOperator(task_id='push_1', provide_context=True, dag=dag, python_callable=pusher)
pull1 = PythonOperator(task_id='pull_1', provide_context=True, dag=dag, python_callable=puller)
# notice: pull_1 pulls based on the push_1 task id, b/c the pusher's task id is part of the push
i = i + 1
push2 = PythonOperator(task_id='push_2', provide_context=True, dag=dag, python_callable=pusher)
pull2 = PythonOperator(task_id='pull_2', provide_context=True, dag=dag, python_callable=puller)
# a dynamic pusher, pusher_dynamic, accepting a counter and pushing it to XCom (stored in the MySQL-backed Airflow DB)
i = i + 1
my_task_id = i
push3 = PythonOperator(task_id='push_' + str(i), provide_context=True, python_callable=pusher_dynamic, op_kwargs={'my_task_id': my_task_id}, dag=dag)
pull3 = PythonOperator(task_id='pull_' + str(i), provide_context=True, python_callable=puller_dynamic, op_kwargs={'my_task_id': my_task_id}, dag=dag)
i = i + 1
my_task_id = i
push4 = PythonOperator(task_id='push_' + str(i), provide_context=True, python_callable=pusher_dynamic, op_kwargs={'my_task_id': my_task_id}, dag=dag)
pull4 = PythonOperator(task_id='pull_' + str(i), provide_context=True, python_callable=puller_dynamic, op_kwargs={'my_task_id': my_task_id}, dag=dag)
push1 >> pull1 >> push2 >> pull2 >> push3 >> pull3 >> push4 >> pull4

Example 3: Airflow XCom push and pull from Airflow operators other than the PythonOperator

read this post.

Note that this is not advisable. All XCom pull/push actions are translated to Insert/Select statements in the Airflow DB. Over time this degrades scheduler performance and slows down the whole pipeline, either because of the high number of pull queries run or because of the large amounts of rows retrieved through full table scans instead of index-based scans. The full example is committed here:

t1 = BashOperator(
    task_id="t1",
    bash_command='echo "{{ ti.xcom_push(key="k1", value="v1") }}" "{{ti.xcom_push(key="k2", value="v2") }}"',
    dag=dag,
)
t2 = BashOperator(
    task_id="t2",
    bash_command='echo "{{ ti.xcom_pull(key="k1") }}" "{{ ti.xcom_pull(key="k2") }}"',
    dag=dag,
)
t1 >> t2

Example 4: Airflow XCom push a return code from the BashOperator to XCom:

Based on this post, all you need to do is add to the BashOperator:

xcom_push=True

and

 ; echo $?

example (also committed in git):

t3 = BashOperator(task_id='t3', xcom_push=True, bash_command="ls -la ; echo $?", dag=dag)
t4 = BashOperator(task_id="t4",bash_command='echo "{{ ti.xcom_pull(task_ids="t3") }}"',dag=dag)
t3 >> t4
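With xcom_push=True the BashOperator pushes the last line written to stdout, which is exactly why appending `; echo $?` turns the return code into the pushed value. That "last line" behavior can be mimicked without Airflow (a sketch, assuming a POSIX shell and Python 3.7+):

```python
import subprocess

# Run the same command the operator would, and keep only the last stdout line,
# which is what xcom_push=True stores as the XCom value.
out = subprocess.run("ls -la /tmp ; echo $?", shell=True,
                     capture_output=True, text=True).stdout
pushed = out.strip().splitlines()[-1]
print(pushed)  # the exit code of ls: "0" when the listing succeeds
```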

Example 5: Airflow XCom pull between different DAGs

Read this post. The example in the above post did not work for me 🙁

TBD.
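Until I have a working end-to-end example, here is a sketch of what a cross-DAG pull is supposed to look like: `xcom_pull` accepts a `dag_id` argument, and `include_prior_dates=True` lets it match runs with earlier execution dates (the `dag_id`, `task_ids`, and `key` values below are placeholders, not a tested setup):

```python
def pull_from_other_dag(**kwargs):
    # ti.xcom_pull can target another DAG via dag_id; include_prior_dates=True
    # is usually needed because the two DAGs rarely share an execution date.
    ti = kwargs['ti']
    value = ti.xcom_pull(dag_id='example_xcom',
                         task_ids='push',
                         key='value from pusher 1',
                         include_prior_dates=True)
    print("pulled from other dag: " + str(value))
    return value
```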

 

Example 6: Querying MySQL directly in Airflow using SQLAlchemy and not using XCom!

Eventually, it was so frustrating using XCom that I started checking how fast and simple it would be to query the MySQL DB directly from the DAG (using a PythonOperator).

I tried using SQLAlchemy because I assumed that since Airflow already uses it, the packages would be in place.

However, I got errors:

 

missing module MySQLdb
ImportError: this is MySQLdb version (1, 2, 4, 'beta', 4), but _mysql is version (1, 2, 5, 'final', 1)

I had to install these packages:

pip install mysql-python
pip install mysqlclient==1.4.6

To learn SQLAlchemy quickly, I used this blog for the select and this blog for the insert; one hour later the sample code below was born.

import sqlalchemy as db
engine = db.create_engine('mysql://airflow:airflow@1.2.3.4:3306/airflow')

def get_name_from_airflow_db(my_name):
    connection = engine.connect()
    metadata = db.MetaData()
    study_table = db.Table('my_table', metadata, autoload=True, autoload_with=engine)
    # Equivalent SQL: SELECT min(myCol) FROM my_table WHERE myCol2 = <my_name>
    query = db.select([db.func.min(study_table.columns.myCol)]).where(study_table.columns.myCol2 == my_name)
    ResultProxy = connection.execute(query)
    ResultSet = ResultProxy.fetchall()
    # parsing results: each row is a 1-tuple
    values = [value for (value,) in ResultSet]
    print(values[0])

def insert_new_name(my_name):
    # trying now to insert data
    try:
        conn = engine.connect()
        trans = conn.begin()
        insert_into = "INSERT INTO study(study) VALUES ('" + my_name + "');"
        conn.execute(insert_into)
        trans.commit()
        print("inserted: " + my_name)
        return "sababa"
    except:
        return "not sababa"

new_name = "MyLatestSTR"
insert_new_name(new_name)
get_name_from_airflow_db(new_name)
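One caveat with the insert above: building SQL by string concatenation breaks on names containing quotes and is open to SQL injection. A safer sketch uses bound parameters; here an in-memory SQLite engine stands in for the MySQL instance, and the table/column names are illustrative, not Airflow's schema:

```python
import sqlalchemy as db

# An in-memory SQLite engine stands in for the MySQL instance from the post.
engine = db.create_engine('sqlite://')

with engine.begin() as conn:  # begin() commits on success, rolls back on error
    conn.execute(db.text("CREATE TABLE study (study VARCHAR(64))"))
    # Bound parameters instead of string concatenation: the driver escapes the
    # value, so a name containing a quote cannot break (or inject into) the SQL.
    conn.execute(db.text("INSERT INTO study (study) VALUES (:name)"),
                 {"name": "O'Brien"})

with engine.connect() as conn:
    rows = conn.execute(db.text("SELECT study FROM study")).fetchall()
print(rows[0][0])
```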

A full example, combined with an Airflow DAG and the BranchPythonOperator (also committed to git):

import os
import logging
from datetime import timedelta, date
import datetime   
from airflow import DAG
from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import BashOperator
from airflow.contrib.operators import gcs_to_gcs
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.utils import trigger_rule
from airflow.operators import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
# for insert/select from the Airflow DB
import sqlalchemy as db
engine = db.create_engine('mysql://airflow:airflow@1.2.3.4:3306/airflow')
#####################################
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())
################################
## default args for airflow
################################
default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    'owner': 'Jutomate',    
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}
################################
## insert to airflow db  #
################################
def insert(input):
    # trying now to insert data; assumes the id column is auto-increment
    try:
        conn = engine.connect()
        trans = conn.begin()
        insert_into = "INSERT INTO table_name(value) VALUES ('" + input + "');"
        conn.execute(insert_into)
        trans.commit()
        return "inserted input!!!"
    except:
        return "unable to insert"
######################################
## select from airflow db
######################################
def is_conditional_select(input):
    connection = engine.connect()
    metadata = db.MetaData()
    table_name = db.Table('table_name', metadata, autoload=True, autoload_with=engine)
    # the table contains:
    # 1. col_id
    # 2. col_value
    # SQL: SELECT min(col_id) FROM table_name WHERE col_value = <input>
    query = db.select([db.func.min(table_name.columns.col_id)]).where(table_name.columns.col_value == input)
    ResultProxy = connection.execute(query)
    ResultSet = ResultProxy.fetchall()
    # parsing results
    v = [value for (value,) in ResultSet]
    if v[0]:
        print("input found")
        return True
    else:
        print("input not found")
        return False

#######################
## conditional logic  #
#######################
def is_conditional(input):
    if is_conditional_select(input):
        return True
    else:
        print(insert(input))
        return False

##################################
## conditional function
###################################
def conditional_func(input):
    if is_conditional(input):  # the value was already processed -> stop
        return 'stop'
    else:
        return 'continue'
with models.DAG(
        'dag_with_branching_and_xcom',
        schedule_interval=None,
        default_args=default_dag_args) as dag:
    # 'input' below is whatever value the branch decision is based on
    branching = BranchPythonOperator(task_id='branch', python_callable=conditional_func, op_kwargs={'input': input}, dag=dag)
    dummy_stop = DummyOperator(task_id='stop')
    dummy_continue = DummyOperator(task_id='continue')
    # the branch operator must be upstream of both possible outcomes
    branching >> [dummy_stop, dummy_continue]
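The branch callable just returns the task_id of the path to follow, so the routing logic can be unit-tested without a scheduler. Below, the MySQL lookup is stubbed with an in-memory set purely for illustration (this stub is an assumption, not the post's actual DB-backed check):

```python
def conditional_func_stubbed(input, seen):
    """Return the task_id the BranchPythonOperator should follow:
    'stop' if the value was already processed, 'continue' otherwise.
    The 'seen' set stands in for the MySQL dedup table."""
    if input in seen:
        return 'stop'
    seen.add(input)  # record the value, like the insert() in the DAG
    return 'continue'

seen = set()
print(conditional_func_stubbed('study_1', seen))  # continue (first time, gets recorded)
print(conditional_func_stubbed('study_1', seen))  # stop (already recorded)
```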

——————————————————————————————————————————

I put a lot of thought into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, or questions, or you need someone to consult with,

feel free to contact me via LinkedIn: