Blog

architecture, Big Query, cost reduction, GCP Big Data Demystified, superQuery

80% Cost Reduction in Google Cloud BigQuery | Tips and Tricks | Big Query Demystified | GCP Big Data Demystified #2

The second in a series of GCP Big Data Demystified lectures. In this lecture I share how I saved 80% of investing.com's monthly BigQuery bill. Lecture slides:

Videos from the meetup:

Link to the previous lecture: GCP Big Data Demystified #1

——————————————————————————————————————————

I put a lot of thought into these blogs so that I can share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

To learn more about superQuery:

NoSQL

RedisGears

RedisGears uses an embedded Python interpreter to let users run Python-based map-reduce queries on Redis. It also serves as an integration mechanism for combining the functionality of Redis modules such as RedisGraph, RediSearch, and RedisAI.
You are invited to listen and understand how RedisGears works behind the scenes, and how to achieve capabilities that would be very hard to achieve without Gears.
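
To make that concrete, here is a minimal sketch of what a RedisGears map-reduce script can look like (a word count over string values; the sentence:* key prefix and the pipeline itself are illustrative assumptions, not taken from the talk):

# Submit with: redis-cli RG.PYEXECUTE "$(cat wordcount.py)"
# GB (GearsBuilder) is provided by the RedisGears embedded Python interpreter.

gb = GB('KeysReader')             # read keys and their values from Redis
gb.map(lambda x: x['value'])      # keep only each key's string value
gb.flatmap(lambda v: v.split())   # map: split every value into words
gb.countby(lambda w: w)           # reduce: count occurrences of each word
gb.run('sentence:*')              # execute once over keys matching the prefix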

——————————————————————————————————————————

I put a lot of thought into these blogs so that I can share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

analytics

Google Analytics dataset demystified | BigQuery ga_sessions

The first thing you need to know is that I am referring to Google Analytics 360: with one click, the data is exported automatically to BigQuery. Notice that the ga_sessions table is sharded; be sure to understand the difference between a sharded table and a partitioned table.

Documentation and sample data can be found here; below are some basic examples:

SELECT *
FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170101`

ga_sessions query over a date range using the _table_suffix variable:

SELECT
  *
FROM
  `bigquery-public-data.google_analytics_sample.ga_sessions_*`
WHERE
  _table_suffix BETWEEN '20160801'
  AND '20170801'

ga_sessions query with a dynamic date range (last two weeks):

SELECT
  *
FROM
  `bigquery-public-data.google_analytics_sample.ga_sessions_*`
WHERE
  _table_suffix BETWEEN FORMAT_DATE('%Y%m%d',DATE_SUB(CURRENT_DATE(), INTERVAL 15 DAY))
  AND FORMAT_DATE('%Y%m%d',DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))

The schema of the ga_sessions table is rather complex, as it contains nested data structures for the hits. The hits records contain all the custom dimensions the user defines in Google Analytics.

Analyzing ga_sessions and extracting hit records and custom dimensions looks along these lines:

SELECT
  trafficSource.campaignCode AS Campaign_Code,
  (SELECT DISTINCT value FROM h.customDimensions WHERE index = 4) AS cd_4
FROM
  `MyProject.MyDataSEt.ga_sessions_*` t, t.hits h

I highly recommend the following article; it is an excellent standard SQL BigQuery cookbook by Johan van de Werken, with step-by-step explanations and examples.

——————————————————————————————————————————

I put a lot of thought into these blogs so that I can share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

BI

Tableau Demystified | Quick introduction in 10 minutes


A 10-minute video demonstration: an introduction to Tableau.

——————————————————————————————————————————

I put a lot of thought into these blogs so that I can share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/



airflow

Similarweb API data pipeline | Airflow & cost reduction

The documentation of the Similarweb API is available here:

API integration documentation.

Sample API call for desktop visits. Notice the domain, API key, start, end, country, and granularity parameters. This returns the desktop visits for the given domain and country.

curl --location --request GET 'https://api.similarweb.com/v1/website/bbc.com/traffic-and-engagement/visits?api_key={{similarweb_api_key}}&start_date=2017-11&end_date=2018-01&country=gb&granularity=monthly&main_domain_only=false&format=json'

For investing.com (notice that the maximum date range is two months back; this blog was written in 2020-01, so 2019-11 is the latest available):

curl --location --request GET 'https://api.similarweb.com/v1/website/investing.com/traffic-and-engagement/visits?api_key={{similarweb_api_key}}&start_date=2019-11&end_date=2019-11&country=gb&granularity=monthly&main_domain_only=false&format=json'
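
Since the latest available month lags about two months behind, the start_date/end_date parameters can be computed instead of hard-coded. A small sketch using only the standard library (the helper name and the two-month lag are assumptions based on the note above):

from datetime import date

def latest_available_month(lag_months=2, today=None):
    # Return the most recent month the API can serve, formatted as 'YYYY-MM'.
    today = today or date.today()
    month_index = (today.year * 12 + today.month - 1) - lag_months
    return '{:04d}-{:02d}'.format(month_index // 12, month_index % 12 + 1)

# e.g. in 2020-01 this yields '2019-11', matching the example above
start_date = end_date = latest_available_month()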

So…

What we need is:

  1. a list of domains, country codes, dates, and APIs
  2. an Airflow script to loop over all possible permutations of API calls
  3. a list of competitors to compare traffic against.

The challenges with this specific Airflow DAG are:

  1. Using dynamic operators: controlling how many iterations run in parallel by using dynamic task IDs.
  2. Continuing after the loop of Airflow operators by controlling the downstream order of execution, e.g. inside the loop: start.set_downstream(bash_*).
  3. The number of operators in this DAG is a bit large. Consider what the trigger for load_to_bg_GET_DESKTOP_TRAFFIC should be, perhaps one_success (assume some of the API calls will fail)? A trigger rule example is committed in our git, and sketched right after this list.
  4. Consider moving bash_gsutil_mv_files_to_ingestion further downstream to cut the number of operators in half. I left it there simply because I wanted to test Airflow's behaviour with two operators in the same loop.
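
A minimal sketch of the one_success idea from point 3 (it mirrors the pattern used in the GCS-listing example further down, where the wait gate fires as soon as a single upstream branch succeeds):

from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# Let the gate before load_to_bg_GET_DESKTOP_TRAFFIC fire once at least one
# API-call branch has succeeded, so a single failed curl does not block the load.
wait = DummyOperator(task_id='wait', trigger_rule=TriggerRule.ONE_SUCCESS)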

The full example is also committed in our git:

import datetime
from datetime import timedelta

# Airflow imports used by the DAG below.
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import gcs_to_bq


def daterange(start_date, end_date):
    # Yield every date from start_date (inclusive) to end_date (exclusive).
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)


# start & end date = the delta period (how many days back to pull).
delta = -2
start_date = datetime.date.today() + datetime.timedelta(delta)
end_date = datetime.date.today()

 
# default_dag_args, DST_BUCKET and dst_table are defined elsewhere in the DAG file.
with models.DAG('similar_web_api_pipeline', schedule_interval=None, default_args=default_dag_args) as dag:

    start = DummyOperator(task_id='start')
    wait = DummyOperator(task_id='wait')

    for single_date in daterange(start_date, end_date):
        # One API call per date; the date builds a unique file name and task_id.
        bash_cmd = """curl --location --request GET 'https://api.similarweb.com/v1/website/big-data-demystified.ninja/traffic-and-engagement/visits?api_key=myApiKey123456789&start_date=2019-11&end_date=2019-11&country=gb&granularity=monthly&main_domain_only=false&format=json' > /tmp/file_""" + single_date.strftime("%Y%m%d") + '.json'
        bash_api_call_GET_DESKTOP_TRAFFIC = BashOperator(
            task_id='bash_api_call_GET_DESKTOP_TRAFFIC' + single_date.strftime("%Y%m%d"),
            bash_command=bash_cmd)

        # Move the downloaded file to the ingestion bucket.
        bash_cmd2 = """gsutil mv /tmp/file_""" + single_date.strftime("%Y%m%d") + '.json gs://data_lake/similar_web_desktop_traffic/'
        bash_gsutil_mv_files_to_ingestion = BashOperator(
            task_id='bash_gsutil_mv_files_to_ingestion' + single_date.strftime("%Y%m%d"),
            bash_command=bash_cmd2)

        start.set_downstream(bash_api_call_GET_DESKTOP_TRAFFIC)
        bash_api_call_GET_DESKTOP_TRAFFIC.set_downstream(bash_gsutil_mv_files_to_ingestion)
        bash_gsutil_mv_files_to_ingestion.set_downstream(wait)

    load_to_bg_GET_DESKTOP_TRAFFIC = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='load_to_bg_GET_DESKTOP_TRAFFIC',
        source_objects=['*'],
        write_disposition='WRITE_TRUNCATE',  # overwrite the destination table
        create_disposition='CREATE_IF_NEEDED',
        bucket=DST_BUCKET,
        destination_project_dataset_table=dst_table,
        autodetect=True)

    end = DummyOperator(task_id='end')

    wait >> load_to_bg_GET_DESKTOP_TRAFFIC >> end

Here is the cost reduction bit:

  1. Use BigQuery to load the data via an external table, then use CTAS or another Airflow operator to convert the table from row format to columnar (see the sketch after this list).
  2. Alternatively, you can load the JSON directly into BigQuery as a native table.
  3. Further, change the process into a delta load, i.e. let it run each month and fetch only the last available month.
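
For point 1, here is a hedged sketch of what the CTAS step could look like as an Airflow task. The table names are placeholders, and the import path is the Airflow 1.x contrib one; this is not the exact operator used in production:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Rewrite the raw / external table as a native, columnar BigQuery table.
# Table names are placeholders for illustration only.
ctas_to_columnar = BigQueryOperator(
    task_id='ctas_to_columnar',
    sql="""CREATE OR REPLACE TABLE `MyProject.DWH.similar_web_desktop`
           AS SELECT * FROM `MyProject.DATA_LAKE.similar_web_desktop_external`""",
    use_legacy_sql=False)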

If you are wondering how to parse the JSON once BigQuery has identified the schema via auto-detection, below is an example. Notice the different parsing methods for an array of structs, a struct of structs, and a plain struct. Also notice how I avoided UNNEST, since I know the API call only returns one row.

CREATE OR REPLACE VIEW
  `MyProject.DATA_LAKE.similar_web_desktop_transformed` AS
SELECT
  avg_visits,
  date,
  device,
  r.country AS country,
  r.domain AS domain,
  r.granularity AS granularity
FROM (
  SELECT
    CAST(visits[OFFSET(0)].visits AS INT64) AS avg_visits,  # array of structs
    visits[OFFSET(0)].date AS date,                         # array of structs
    meta.device AS device,                                  # struct
    meta.request AS r                                       # struct of structs
    # meta.status AS status
  FROM
    `MyProject.DATA_LAKE.similar_web_desktop` )

  
 

Let's say you want to offload the domain list to an external data source, like a database. How would you do that? Well, one way is to keep it in Airflow's own database, i.e. using XCom. But then you would end up maintaining a large database for Airflow.

I came up with another way using GCS: keep a file in GCS whose name holds the desired value, and then read it from Airflow as in the example below. Notice that this is just a toy example to illustrate a concept that worked for me and saved the cost of larger database instances. Also notice the dynamic name I give each bash task_id; it is built a bit differently because Airflow only allows alphanumeric task IDs. The full example is also committed in our GitHub.

# default_dag_args is defined elsewhere in the DAG file.
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from google.cloud import storage

client = storage.Client()
i = 0

with models.DAG('loop_over_gcs_bucket_files_example', schedule_interval=None, default_args=default_dag_args) as dag:

    start = DummyOperator(task_id='start')
    wait = DummyOperator(task_id='wait', trigger_rule=TriggerRule.ONE_SUCCESS)

    for blob in client.list_blobs('data_lake', prefix='similar_web_desktop_traffic'):
        # Task IDs must only contain alphanumeric characters, so a counter is
        # used to build a unique task_id instead of the blob name.
        bash_cmd = "echo " + str(blob.name)
        i = i + 1
        bash_operator = BashOperator(task_id='bash_operator' + str(i), bash_command=bash_cmd)
        start.set_downstream(bash_operator)
        bash_operator.set_downstream(wait)

    end = DummyOperator(task_id='end')

wait >> end

——————————————————————————————————————————

I put a lot of thought into these blogs so that I can share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

Data Science, meetup

If you’re using statistics there is a chance you are wrong

“if you’re using statistics there is a chance you are wrong”

You will get a different perspective on the use of numbers and statistics.
You will learn how to judge results and not treat a numeric result as the truth just because the numbers say so.

——————————————————————————————————————————

I put a lot of thought into these blogs so that I can share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/