How to convert a Colab notebook into a Python script and schedule it in Apache Airflow?

Author: Omid Vahdaty 2.2.2020

Using Colab from Google is a wonderful way to get started on data research, but once you want to go to production you need to schedule the notebook daily via Apache Airflow, and you need to convert the packages to make it fully Python compatible. Simply exporting your Colab notebook (via Colab –> File –> Download .py) into a Python script.py will result in the unsupported lines being automatically commented out.

First, confirm you are working in a plain vanilla Python environment, not something like Colab or Datalab.
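One quick way to sanity check this: the built-in get_ipython() function only exists inside IPython-based runtimes like Colab and Datalab, so a tiny sketch like the one below will tell you where you are.

# Sanity check sketch: get_ipython() is defined only inside IPython-based
# runtimes (Jupyter, Colab, Datalab); a plain Python interpreter raises NameError.
try:
    get_ipython()
    print("Running inside a notebook runtime")
except NameError:
    print("Plain vanilla Python environment")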

I used this documentation to write a full example. I started with the Python prerequisites:

pip install pandas google-colab google-cloud-bigquery
pip install google-cloud-bigquery-storage
pip install pyarrow

All the unsupported Python code from Colab will be commented out. I started going over those lines one by one. The first challenge was to query BigQuery and put the result in a dataframe; the example below is also committed in our Big Data Demystified git. Notice that the dataframe will be stored on the Airflow machine, so you might need some tuning to handle big result sets from BigQuery (see the paging sketch after the script). Example of the Python script.py:

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)
# Download query results. The query below is just a placeholder; replace it
# with your own query.
query_string = """select * from t"""
dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())
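If the result set is too big for a single dataframe, one possible tweak (assuming a reasonably recent google-cloud-bigquery; the page size below is only an illustration) is to pull the results page by page instead:

# Hedged sketch: iterate over the result pages as smaller dataframes
# instead of materializing one big dataframe in memory.
rows = bqclient.query(query_string).result(page_size=50000)
for chunk in rows.to_dataframe_iterable():
    # each chunk is a pandas DataFrame holding one page of results
    print(len(chunk))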

How to schedule a Python script via Apache Airflow?

  1. You may put your script.py in your Apache Airflow dags folder. It appears Airflow does not mind reading a normal Python script and simply ignores any Python file that does not return a DAG object. Alternatively, you may put the script on your CI/CD server and run it remotely over SSH.
  2. I used the Airflow PythonOperator example to write the first version of the DAG. That created a problem: the DAG file expected modules that had not been installed yet via pip install, and since imports are evaluated when the DAG file is parsed, the DAG failed to load. (A workaround is sketched after the example DAG below.)
  3. I used BashOperators in the end; just be sure to add a BashOperator to your DAG script that installs the missing packages. Assume the machine will die on you, this is the cloud after all :). Notice: this is a hack, not a recommended approach.
  4. The only problem I could not solve yet is the need to hard code the full path to the Python script.py (a possible workaround is also sketched after the example DAG below).
  5. The example DAG is committed in our git:

from __future__ import print_function

import datetime

from airflow import models
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # No automatic retries; if you raise 'retries', tasks wait 5 minutes
    # between attempts.
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with DAG(dag_id='monitor_dag', schedule_interval=None, default_args=default_dag_args) as dag:
    # Install pip itself on the worker.
    bash_prerequisites_install_cmd = """sudo apt install -y python-pip"""
    bash_prerequisites_install = BashOperator(
        task_id='bash_prerequisites_install',
        bash_command=bash_prerequisites_install_cmd)

    # Install the packages the exported Colab script needs.
    bash_pip_install_cmd = """sudo pip install pandas google-colab google-cloud-bigquery google-cloud-bigquery-storage pyarrow pyTelegramBotAPI"""
    bash_pip_install = BashOperator(
        task_id='bash_pip_install',
        bash_command=bash_pip_install_cmd)

    # Run the exported Colab script; note the hard-coded full path (item 4 above).
    bash_colab_export_script_cmd = """python /home/omid/gs_dags/script.py"""
    bash_colab_export_scriptTask = BashOperator(
        task_id='bash_colab_export_script',
        bash_command=bash_colab_export_script_cmd)

    bash_prerequisites_install >> bash_pip_install >> bash_colab_export_scriptTask
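For items 2 and 4 above, the lines below sketch two possible tweaks to the same DAG file: deriving the script path from the DAG file location instead of hard coding it, and keeping the heavy imports inside a PythonOperator callable so the DAG still parses before the pip install task has run. This is only a sketch under those assumptions (script.py sitting next to the DAG file, an illustrative task id and inlined query), not the committed example.

import os

from airflow.operators.python_operator import PythonOperator

# Item 4 (sketch): build the script path from the DAG file location instead of
# hard coding /home/omid/gs_dags. Assumes script.py sits next to this DAG file.
DAG_DIR = os.path.dirname(os.path.abspath(__file__))
bash_colab_export_script_cmd = "python " + os.path.join(DAG_DIR, "script.py")


# Item 2 (sketch): keep the heavy imports inside the callable so the scheduler
# can parse the DAG file even before the pip install task has run.
def run_colab_export():
    import google.auth
    from google.cloud import bigquery

    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    bqclient = bigquery.Client(credentials=credentials, project=project_id)
    dataframe = bqclient.query("select * from t").result().to_dataframe()
    print(dataframe.head())


colab_export_python = PythonOperator(
    task_id='colab_export_python',
    python_callable=run_colab_export,
    dag=dag,  # attaches to the monitor_dag defined above
)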

——————————————————————————————————————————


To join our meetups:
https://www.meetup.com/Big-Data-Demystified/

I put a lot of thought into these blogs, so I could share the information in a clear and useful way.
If you have any comments, thoughts, questions, or you need someone to consult with,

feel free to contact me via LinkedIn:
