Data Engineering, GCP, python, windows

How to Install Python and PIP on Windows running on GCP?

This is a “cut the bullshit and give me what I need to get started” blog. End to end, if this is your first time, you will be up and running in about 1 hour.

The business use case: the data science team needs a server with a GUI to run Python scripts daily, and to keep changing them manually until they get the expected POC results.

The technical steps to install a GCE machine with a Windows OS and a GUI

  1. Create the GCE machine like any other machine in GCE, but change the boot disk to run “Windows Server”, with the version “Windows Server 2016 Datacenter with Desktop Experience”. In addition, set the access scope to “Allow full access to all Cloud APIs” (a scripted alternative is sketched after this list).
  2. Confirm RDP network access is allowed (port 3389).
  3. Press the down-facing arrow to the right of the RDP button to set a password for your user.
  4. Press RDP in the GCE console. If you are using Chrome, install the Chrome RDP plugin; it will simplify your RDP experience. Use the password from step 3, and there is no need for a domain. Notice you can change the screen size in the plugin’s options.
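
If you prefer scripting the VM creation instead of clicking through the console, the sketch below uses the Compute Engine API Python client to create an equivalent instance. Treat it as a minimal, unofficial sketch: the project, zone, machine type and instance name are placeholders, and it assumes google-api-python-client is installed and application default credentials are available.

# create_windows_vm.py - sketch only, adjust the placeholders to your environment.
import googleapiclient.discovery

project = "my-project"        # placeholder
zone = "us-central1-a"        # placeholder
name = "windows-python-poc"   # placeholder

compute = googleapiclient.discovery.build("compute", "v1")

config = {
    "name": name,
    "machineType": "zones/%s/machineTypes/n1-standard-2" % zone,
    "disks": [{
        "boot": True,
        "autoDelete": True,
        "initializeParams": {
            # Windows Server 2016 Datacenter with Desktop Experience
            "sourceImage": "projects/windows-cloud/global/images/family/windows-2016",
            "diskSizeGb": "50",
        },
    }],
    "networkInterfaces": [{
        "network": "global/networks/default",
        "accessConfigs": [{"type": "ONE_TO_ONE_NAT", "name": "External NAT"}],
    }],
    # Equivalent of "Allow full access to all Cloud APIs"
    "serviceAccounts": [{
        "email": "default",
        "scopes": ["https://www.googleapis.com/auth/cloud-platform"],
    }],
}

operation = compute.instances().insert(project=project, zone=zone, body=config).execute()
print("Instance creation started:", operation["name"])

You would still need to confirm RDP access on port 3389 and set the Windows password from the console, as described in steps 2 and 3.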

Technical steps to install Python and pip on the Windows Server machine

  1. Disable IE Enhanced Security. I used this manual; basically: Server Manager –> Local Server –> IE Enhanced Security Configuration –> Off.
  2. Install Python using this blog. Don’t forget to right click and “Run as administrator”. Browse to https://www.python.org/downloads/windows/ and download the latest version. Customize the installation to ensure all components are installed, including pip, and check “Add to PATH”.
  3. To ensure Python is accessible everywhere, make sure the PATH is updated using this blog. Sample path: C:\Users\Username\AppData\Local\Programs\Python\Python37
  4. If you have not installed pip, install pip using this blog, and don’t forget to add it to the PATH as well.
  5. I used this video to schedule the Python script to run daily via Task Scheduler. Make sure to mark in the task properties that the job should run even if the user is not logged in (a quick sanity-check script follows this list).
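
Before pointing Task Scheduler at the real script, it can be worth running a tiny sanity check as the same user, to confirm that the interpreter and pip being picked up are the ones you just installed. A minimal sketch, assuming Python 3 was added to PATH:

# sanity_check.py - print which Python interpreter is running and confirm pip works
import subprocess
import sys

print("Python executable:", sys.executable)
print("Python version:", sys.version)

# Call pip through the same interpreter to avoid PATH surprises.
subprocess.run([sys.executable, "-m", "pip", "--version"], check=True)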

Furthermore, some official Google Compute Engine tutorials:


——————————————————————————————————————————

I put a lot of thought into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

airflow, Data Engineering

How to convert a Colab notebook into Python and schedule it in Apache Airflow?

Using Colab from Google is a wonderful way to get started on data research, but once you want to go to production you need to schedule it daily via Apache Airflow. You also need to convert the packages to make the notebook fully Python compatible. Simply exporting your Colab notebook (via Colab –> File –> Download .py) into a python script.py will result in some lines being automatically commented out.

First, confirm you are working in a plain vanilla Python environment, nothing like Colab or Datalab.

I used this documentation to write a full example. I started with the Python prerequisites:

pip install pandas google-colab google-cloud-bigquery
pip install google-cloud-bigquery-storage
pip install pyarrow

All the Python code that is unsupported outside Colab will be commented out. I started going over those lines one by one. The first challenge was to handle querying BigQuery and putting the result in a dataframe; the example below is also committed in our Big Data Demystified git. Notice the dataframe will be stored in memory on the Airflow machine, so you might need some tuning to handle big result sets from BigQuery. Example of the python script.py:

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)
# Download query results.
query_string = """select * from t"""
dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())

How to schedule a Python script via Apache Airflow?

  1. You may put your python script.py in your Apache Airflow dags folder. It appears Airflow does not mind reading a normal Python script there and simply ignores any python file that does not return a DAG object. Alternatively, you may put it on your CI/CD server and call the script remotely after connecting via SSH to the CI/CD server.
  2. I used the Airflow PythonOperator example to write the following DAG example, but that created some problems: the DAG’s Python expected modules that had not been installed yet via pip install, and since the imports are evaluated when the DAG file is loaded, this creates an error loading the DAG (see the PythonOperator sketch after the DAG example below).
  3. I used BashOperators in the end; just be sure to add a bash operator in your DAG script to install missing packages. Assume the machine will die on you, this is the cloud after all :). Notice: this is a hack, not a recommended approach.
  4. The only problem I could not solve yet: the need for the full path to the python script.py.
  5. The example DAG is committed in our git.

from __future__ import print_function

import datetime

from airflow import models
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

today_date = datetime.datetime.now().strftime("%Y%m%d")

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting the start date to yesterday starts the DAG immediately when it is
    # detected in the DAGs folder / Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry, set the 'email' arg to your email and
    # enable emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # No retries; if you enable them, wait at least 5 minutes between attempts.
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with DAG(dag_id='monitor_dag', schedule_interval=None, default_args=default_dag_args) as dag:

    # Make sure pip itself exists on the worker (assume the machine can die on you).
    bash_prerequisites_install_cmd = """sudo apt install -y python-pip"""
    bash_prerequisites_install = BashOperator(
        task_id='bash_prerequisites_install',
        bash_command=bash_prerequisites_install_cmd)

    # Install the packages the exported Colab script needs.
    bash_pip_install_cmd = """sudo pip install pandas google-colab google-cloud-bigquery google-cloud-bigquery-storage pyarrow pyTelegramBotAPI"""
    bash_pip_install = BashOperator(
        task_id='bash_pip_install',
        bash_command=bash_pip_install_cmd)

    # Run the exported Colab script (note: the full path is still required).
    bash_colab_export_script_cmd = """python /home/omid/gs_dags/script.py"""
    bash_colab_export_scriptTask = BashOperator(
        task_id='bash_colab_export_script',
        bash_command=bash_colab_export_script_cmd)

    bash_prerequisites_install >> bash_pip_install >> bash_colab_export_scriptTask
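
For reference, here is a hedged sketch of how the PythonOperator approach could be made to work: keep the heavyweight imports inside the callable, so the DAG file still parses even before the pip packages are installed. The DAG and task names here are illustrative, not the exact code from our git:

import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator


def run_colab_export():
    # Imports live inside the callable: the DAG file parses even if these
    # packages are missing; any error surfaces only when the task runs.
    from google.cloud import bigquery

    client = bigquery.Client()
    dataframe = client.query("select * from t").result().to_dataframe()
    print(dataframe.head())


default_dag_args = {
    'start_date': datetime.datetime(2020, 1, 1),
    'retries': 0,
}

with DAG(dag_id='colab_export_python_operator',
         schedule_interval=None,
         default_args=default_dag_args) as dag:

    colab_export_python = PythonOperator(
        task_id='colab_export_python',
        python_callable=run_colab_export)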

——————————————————————————————————————————


To join our meetups:
https://www.meetup.com/Big-Data-Demystified/

I put a lot of thought into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with,

feel free to contact me via LinkedIn:

airflow, AWS, Cloud SQL, Data Engineering, GCP

Airflow Demystified | Everything you need to know about installing a DIY LocalExecutor Airflow cluster backed by MySQL Cloud SQL

Whether you are using Google’s Composer or you want to experiment with Airflow, there are many reasons to start using Airflow:

  1. First and foremost: full orchestration visibility and management.
  2. Second: it is cloud-vendor agnostic; you can use it in any cloud, like any other open source tech.
  3. Large community.
  4. A massive amount of built-in connectors.

Reasons to use a DIY Airflow cluster:

  1. High customization options, like choosing between the several types of Executors.
  2. Cost control: a GCP Composer environment starts with a minimum of 3 nodes, about 300$ monthly, compared with a DIY cluster that starts at about 5$ monthly for a SequentialExecutor Airflow server, or about 40$ for a LocalExecutor Airflow cluster backed by Cloud SQL MySQL (with 1 CPU and 4 GB RAM).
  3. Our motto in the Big Data Demystified community is: Faster, Cheaper, Simpler. It is easier to connect to the data in the DIY cluster in order to perform some custom analytical reports, such as cost per DAG (if you can’t wait for the blog about this, start reading about cost per query per user in BigQuery).

Getting started with Airflow? Want to learn Airflow by example?

Start playing with GCP Composer and get to know the basic functionality of Airflow.

We already have a good Airflow blog for you, with examples and slides.

How to install a SequentialExecutor Airflow server (standalone, no concurrency)?

We already have a basic Airflow installation blog for you.


Installing a DIY Airflow cluster in LocalExecutor mode?

Tips for a DIY cluster

  1. My first tip would be RTFM… read the Airflow docs.
  2. Generally speaking, get yourself very familiar with airflow.cfg; if you get lost in the documentation… here is a working example of an airflow.cfg configuration file (and a small script to inspect your own is sketched after this list).
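
As a quick way to see which of those airflow.cfg settings your installation actually picked up, a minimal sketch using Python’s configparser, assuming the default ~/airflow/airflow.cfg location:

# show_airflow_cfg.py - print a few key [core] settings from airflow.cfg
import configparser
import os

cfg_path = os.path.expanduser("~/airflow/airflow.cfg")  # default location, adjust if needed

# interpolation=None avoids issues with % characters in config values
cfg = configparser.ConfigParser(interpolation=None)
cfg.read(cfg_path)

for key in ("executor", "sql_alchemy_conn", "dags_folder", "parallelism"):
    print(key, "=", cfg.get("core", key, fallback="<not set>"))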


There are several things you need to consider when deploying a DIY Airflow cluster:

  1. You must have a backend DB for concurrency with the LocalExecutor.
  2. The backend DB in GCP should be Cloud SQL (a cheap instance will do), and then you can connect the Cloud SQL to BQ via federated queries. In AWS, consider using AWS Aurora for maximum flexibility in billing. To configure Cloud SQL, read our blog.
  3. If you are using GCP Composer and you want to connect to the backend DB, it is not going to be straightforward: you are going to need the SQL proxy or SQLAlchemy, and no query federation to BQ will be supported.
  4. You are going to need to create a service at the OS level to start/stop Airflow, or a basic start/stop Airflow script if you are lazy like me; just don’t forget to stop/start the FUSE mount from item 5.
  5. For security at rest and high availability of data, consider using FUSE on top of GCS or AWS S3 for the DAGs and logs folders.
  6. For security in motion, consider adding HTTPS and configuring your HTTPS certificate in your Airflow cluster.
  7. For security in access, consider using Airflow Gmail authentication or any other solution like LDAP.
  8. For basic Airflow monitoring at the ETL level, consider the cool Airflow Slack integration as explained in our blog, or email integration (a minimal failure-callback sketch follows this list). There are more options; we shall cover them in the future.
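
To give a taste of item 8, here is a minimal sketch of a failure callback that posts to a Slack incoming webhook with plain requests. The webhook URL is a placeholder and this is not the exact code from our Slack integration blog:

import requests

# Placeholder - replace with your own Slack incoming webhook URL.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"


def notify_slack_on_failure(context):
    """Airflow on_failure_callback: post the failed task details to Slack."""
    task_instance = context['task_instance']
    message = "Airflow task failed: dag={} task={} execution_date={}".format(
        task_instance.dag_id, task_instance.task_id, context['execution_date'])
    requests.post(SLACK_WEBHOOK_URL, json={"text": message})


# Wire it into a DAG via default_args, for example:
# default_dag_args = {'on_failure_callback': notify_slack_on_failure, ...}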

Some more of our blogs about Airflow

One more installation manual, for the SequentialExecutor:

Airflow SequentialExecutor Installation manual and basic commands

——————————————————————————————————————————

I put a lot of thought into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with,

feel free to contact me via LinkedIn:

Big Query, Cloud SQL, Data Engineering, GCP

How can I connect BigQuery and GCP Cloud SQL? | GCP Cloud SQL federated queries

The use case? We wanted to analyze the Airflow MySQL DB via BigQuery, so we could create a dashboard of cost per DAG.

In order to run a query from GCP BigQuery on a table in GCP Cloud SQL, you need to perform the following steps:

You must first create a new connection from BigQuery to the Cloud SQL instance [1].

In your BigQuery console:

  1. Press “+ADD DATA” –> “create connection” –> MySQL.
  2. Connection ID –> take it from the Cloud SQL instance; it is called “Connection name” and is of the following format: MyProject:us-central1:my-cloud-sql-instance-name

Once the connection with the Cloud SQL instance has been created, you can create new queries in BigQuery, retrieving data from a database in the Cloud SQL instance [2].

Bear in mind that the “connection ID” obtained in the first step is what you enter in the query in the second step.

Example of a federated query (not exactly as stated in the BQ documentation):

  SELECT * FROM EXTERNAL_QUERY("<YOUR PROJECT ID>.<YOUR LOCATION>.<YOUR CONNECTION ID>", "SELECT * FROM DATASET.TABLE")

example:

create or replace view `gap---all-sites-1245.DATA_LAKE.airflow_dag` as
SELECT * FROM EXTERNAL_QUERY("myProject.us.connection-name", "SELECT * FROM airflow.dag")

Notice the “your connection ID” is the ID you supplied while creating the external connection in BigQuery (a small Python sketch for running the query from code is below).
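
If you want to run the federated query from code rather than the BigQuery console, here is a minimal sketch with the BigQuery Python client; the connection ID, dataset and table below are placeholders:

from google.cloud import bigquery

client = bigquery.Client()  # uses application default credentials

query = """
SELECT *
FROM EXTERNAL_QUERY(
  "myProject.us.connection-name",
  "SELECT dag_id, is_paused FROM airflow.dag")
"""

for row in client.query(query).result():
    print(row)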

[1] https://cloud.google.com/bigquery/docs/cloud-sql-federated-queries#create_a_connection_resource
[2] https://cloud.google.com/bigquery/docs/cloud-sql-federated-queries#federated_query_syntax

Common errors:

Common errors: connections

“Invalid table-valued function EXTERNAL_QUERY Failed to connect to MySQL database. Error: MysqlErrorCode(2013): Lost connection to MySQL server during query at”

Common errors: Regions

The BigQuery dataset and the Cloud SQL instance must be in the same region, or the same location if the dataset is in a multi-region location such as US or EU. Check that your region is supported.

Common errors: Public IP

Cloud SQL must have a public IP

——————————————————————————————————————————

I put a lot of thought into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

airflow, AWS, Cloud SQL, Data Engineering, GCP

Airflow MySQL Integration – how to?

The default installation of Airflow comes with SQLite as the backend. This mode does not allow concurrency in your DAGs.

In this blog we will upgrade a vanilla Airflow installation to work with LocalExecutor and GCP Cloud SQL (MySQL).

  1. Stop Airflow and change the Airflow configuration file airflow.cfg to contain “LocalExecutor”. Note: SequentialExecutor is the default. 🙂

#stop server:  Get the PID of the service you want to stop 
ps -eaf | grep airflow
# Kill the process 
kill -9 {PID}

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = LocalExecutor

2. If you are using the cloud, set up AWS RDS Aurora (MySQL) or GCP Cloud SQL with minimal resources. Point SQLAlchemy to MySQL (if using MySQL):

sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow

  • Make sure the user/password are created to match your needs. Also set the password for the root user.
  • Notice the user/password are in plain text in airflow.cfg; this is merely a temporary workaround (a quick connection test is sketched below).
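
Before pointing Airflow at the new connection string, it can save time to test it directly with SQLAlchemy. A minimal sketch, assuming a MySQL driver (for example MySQL-python or mysqlclient) is installed and using placeholder credentials:

# test_sql_alchemy_conn.py - verify the connection string before putting it in airflow.cfg
from sqlalchemy import create_engine, text

# Placeholder values - use the same string you plan to set as sql_alchemy_conn.
conn_string = "mysql://airflow:airflow@10.0.0.5:3306/airflow"

engine = create_engine(conn_string)
with engine.connect() as connection:
    print(connection.execute(text("SELECT VERSION()")).fetchone())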

3. This section is the less recommended approach to set up MySQL in GCP; if you are using GCP Cloud SQL, skip to section 3.1. To set up MySQL the traditional way, create a new DB called airflow and grant the above user permissions:

CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci; 
CREATE USER 'airflow'@'34.68.35.27' IDENTIFIED BY 'airflow';
grant all on airflow.* TO 'airflow'@'34.68.35.27' IDENTIFIED BY 'airflow'; 

If you are using GCP Cloud SQL, you can use the GUI to create the DB. You can also do it from the Airflow machine by installing a MySQL client (assuming Ubuntu 18.04). Note: I recommend using both ways to connect to the MySQL DB, as one is easy and the other tests network access from the Airflow cluster to Cloud SQL:

# AWS
sudo apt install -y mysql-client-core-5.7 #AWS
sudo apt-get install python-dev libmysqlclient-dev
pip install MySQL-python


#GCP 
sudo apt install -y default-mysql-client #GCP debian
mysql -u airflow -h 31.2.3.4 -pairflow

3.1 If you are using the GCP Cloud SQL console, you can do the Airflow DB setup as follows:

  • Create the airflow user via the console (allow any host).
  • In the console, connect to the DB via Cloud Shell using the airflow user and create the airflow DB: “CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;”
  • Set the flag explicit_defaults_for_timestamp to on via the console.

If you are using GCP, you can connect via Cloud Shell with the following CLI command (remember to set the password for the airflow user):

gcloud sql connect airflow --user=airflow --quiet

4. Initialize the Airflow tables. Notice: this is done only once, when setting up the environment.

airflow initdb

Confirm there are no errors. If you are getting an “explicit_defaults_for_timestamp” error, see the end of this document for a suggested way to resolve it.

5. Start Airflow (the webserver below; in LocalExecutor mode, remember that the scheduler also needs to be running, e.g. airflow scheduler):

airflow webserver -p 8080

Common issues / errors

  • Can’t connect to MySQL / unable to connect to MySQL: check that network access is allowed and check the user/password. Test the user/password via Cloud Shell first – this bypasses any network problem for you – and then test the network via the MySQL CLI once the user/password are confirmed.

  • The global variable explicit_defaults_for_timestamp needs to be on (1) for MySQL. To set it:

  1. In the Google Cloud Platform Console, create a new GCP Console project, or open an existing project by selecting the project name. …see naming guidelines
  2. Open the instance and click Edit.
  3. Scroll down to the Flags section.
  4. To set a flag that has not been set on the instance before, click Add item, choose the flag “explicit_defaults_for_timestamp” from the drop-down menu, and set its value.
  5. Click Save to save your changes.
  6. Confirm your changes under Flags on the Overview page.

  • Notice: all DAGs will be turned off

——————————————————————————————————————————

I put a lot of thought into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with,

feel free to contact me via LinkedIn: