architecture, Big Query

When to use BigQuery? When not to use BigQuery?

When to use BigQuery?

  1. Roughly 80% of all data use cases are a good fit for BigQuery.
  2. If you are not sure – start with BigQuery.
  3. If you are using Google Analytics or Firebase.
  4. When you have a direct connector that loads the data into BigQuery for you.
  5. When your production environment is in GCP.

When not to use BigQuery:

  1. When you are processing more than one 5 GB compressed file per hour (BigQuery's per-file load limit). Try opening a 6 GB compressed file and splitting it into smaller files… your one-hour window will be gone by the time the file uncompresses into something huge of ±20 GB.
  2. When you are hitting the BigQuery error saying your query is consuming too many resources, on a weekly basis, and there is nothing you can do about it.
  3. When you need to self-join billions of records on a regular basis.
  4. When you are using complex window functions. You are likely to get an error saying too many resources are being used by your query, and there is nothing you can do except rewrite it.

So what are the alternatives to BigQuery?

  1. Hadoop ecosystem: Dataproc / Cloudera
  2. SQream DB, a database designed to handle huge files and massive joins with a surprising amount of speed, simplicity and cost effectiveness.

Big Query

BigQuery Error: UPDATE or DELETE statement over table would affect rows in the streaming buffer, which is not supported

If you get this error, you are most likely trying to delete data that falls within the window of time the streaming insert is still using.

Streaming insert is a service that allows ingestion of events into BigQuery in real time.

Sometimes it takes up to 90 minutes to ingest the data, so try to delete data from further back in time, for example:

DELETE FROM `MyProject.MyDataset.MyTable` 
WHERE id LIKE '%BigDataDemystified%'
AND time < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 180 MINUTE)

Big Query, BQ ML, Machine Learning

K-Means via BQ ML Demystified

I took BigQuery ML for a test drive. Below are the details of a basic POC with k-means clustering. The idea was to divide the users of Investing.com into a number of groups and see how different groups of users behave in terms of basic web traffic KPIs.

If you want to learn more about k-means clustering, check out our blog about Machine Learning Essentials.

BigQuery ML k-means clustering – a step-by-step example:

Naturally, all code snippets below are committed to our Big Data Demystified GitHub.

The basic BQ ML k-means training query:

CREATE OR REPLACE MODEL
  BQ_ML.users_clusters OPTIONS (model_type='kmeans',
    num_clusters=4,
    standardize_features = TRUE) AS (
  SELECT
    SUM(PVs) AS pvs,
    SUM(sessions) AS sessions,
    COUNT(*) AS visits
  FROM
    `MyProject.DATA.Daily_Stats`
  WHERE
    date < '2019-12-01'
    AND date > '2019-11-01'
  GROUP BY
    users )
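
Optionally, once training finishes, you can sanity-check the clustering with ML.EVALUATE, which for a k-means model returns metrics such as the Davies-Bouldin index and the mean squared distance to the centroids. A minimal sketch (not part of the original walkthrough), using the model created above:

SELECT
  *
FROM
  ML.EVALUATE(MODEL BQ_ML.users_clusters)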

After the k-means model is created, prediction is used as follows:

SELECT
  CENTROID_ID,
  pvs,
  sessions,
  visits
FROM
  ML.PREDICT( MODEL BQ_ML.users_clusters,
    (
    SELECT
      SUM(PVs) AS pvs,
      SUM(sessions) AS sessions,
      COUNT(*) AS visits
    FROM
      `MyProject.DATA.Daily_Stats`
    WHERE
      date < '2019-11-01'
      AND date > '2019-10-01'
    GROUP BY
      users
    LIMIT
      100000000 ) )
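
The analysis queries below read the predictions from a table (BQ_ML.test3 in this post), so presumably the ML.PREDICT output above was first materialized into one. A minimal sketch of that step, reusing the same prediction query:

CREATE OR REPLACE TABLE
  BQ_ML.test3 AS
SELECT
  CENTROID_ID,
  pvs,
  sessions,
  visits
FROM
  ML.PREDICT( MODEL BQ_ML.users_clusters,
    (
    SELECT
      SUM(PVs) AS pvs,
      SUM(sessions) AS sessions,
      COUNT(*) AS visits
    FROM
      `MyProject.DATA.Daily_Stats`
    WHERE
      date < '2019-11-01'
      AND date > '2019-10-01'
    GROUP BY
      users ) )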

Analyze the results:

SELECT
  CENTROID_ID,
  #min(pvs) as min_pvs,
  CAST(AVG(pvs) AS int64) AS avg_pvs,
  #max(pvs) as max_pvs,
  #min(sessions) as min_sessions,
  CAST(AVG(sessions) AS int64) AS avg_sessions,
  #max(sessions) as max_sessions
  CAST(AVG(visits) AS int64) AS avg_visits
FROM
  BQ_ML.test3
GROUP BY
  CENTROID_ID

Another way to analyze the results (based on the GCP documentation):

WITH
  T AS (
  SELECT
    centroid_id,
    ARRAY_AGG(STRUCT(feature AS name,
        ROUND(numerical_value,1) AS value)
    ORDER BY
      centroid_id) AS cluster
  FROM
    ML.CENTROIDS(MODEL BQ_ML.users_clusters)
  GROUP BY
    centroid_id )
SELECT
  CONCAT('Cluster#', CAST(centroid_id AS STRING)) AS centroid,
  (
  SELECT
    value
  FROM
    UNNEST(cluster)
  WHERE
    name = 'pvs') AS pvs,
  (
  SELECT
    value
  FROM
    UNNEST(cluster)
  WHERE
    name = 'sessions') AS sessions,
  (
  SELECT
    value
  FROM
    UNNEST(cluster)
  WHERE
    name = 'visits') AS visits
FROM
  T
ORDER BY
  centroid

Review the results (note that the results shown are not real, just an example):

SELECT
  CENTROID_ID,
  pvs,
  sessions,
  visits
FROM
  BQ_ML.test3
LIMIT
  10

By the way, a very important note about BQ ML costs:

When creating a model, if you are only using the BigQuery console, Google will only display the amount of data scanned. In my case it was 53.5 GB, which sounds cheap. However, what Google doesn't display outright is how many times it reads that data – which is several times over.

If you use superQuery, the cost will be shown clearly, and it will probably surprise you.

Summary

  1. BQ ML is a very nice way to quickly get insights from data without the use of a data scientist. The recommended use case for BQ ML: benchmarking against the model written by your in-house data scientist.
  2. Use BQ ML only for exploration, not for production (recommended by Google as well).
  3. You should be mindful of the costs of using BQ ML – superQuery is a nice way to get visibility into them.

——————————————————————————————————————————

I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

avro, Big Query

AVRO and BQ example

Creating the schema from an Avro file can be done using a Python operator [1].

It will be quite similar to the process you follow in step 6 of the blog linked in [2], but instead of specifying avro.schema.url we will specify avro.schema.literal.

First, we have to extract the Avro schema from the Avro file located in GCS. I wrote the Python code in [3], which outputs the JSON of the Avro schema.

Now that we have the schema, we should create the Hive job that creates the table. I made a small test with the API [4]; the REST request should look similar to [5].

────────────────────

[1]: https://cloud.google.com/composer/docs/how-to/using/writing-dags#pythonoperator

[2]: https://big-data-demystified.ninja/2018/05/27/how-to-export-data-from-google-big-query-into-aws-s3-emr-hive/

[3]:

from google.cloud import storage
from avro.datafile import DataFileReader
from avro.io import DatumReader


bucket_name = "<Bucket Name>"
blob_name = "<Bucket Path>/<AVRO FILE NAME>"
downloaded_blob = "downloaded_blob.avro"

# Download the Avro file from GCS to a local file
client = storage.Client()
bucket = client.lookup_bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.download_to_filename(downloaded_blob)

# Read the schema embedded in the Avro file metadata and print it as JSON
with DataFileReader(open(downloaded_blob, 'rb'), DatumReader()) as avf:
    schema = avf.meta['avro.schema'].decode("utf-8")
    print(schema)  # example: {"type":"record","name":"Root","fields":[{"name":"string_field_0","type":["null","string"]}]}


[4]: https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.jobs/submit

[5]:

{
  "job": {
    "hiveJob": {
      "queryList": {
        "queries": [
          "CREATE EXTERNAL TABLE transactions \nSTORED AS AVRO LOCATION 'gs://<BUCKET NAME>/<PATH TO AVRO FOLDER>/*' TBLPROPERTIES ('avro.schema.literal'='<OUTPUT OF THE PYTHON SCRIPT>')"
        ]
      }
    },
    "placement": {
      "clusterName": "<CLUSTER NAME>"
    },
    "reference": {
      "jobId": "<JOB NAME>"
    }
  }
}

——————————————————————————————————————————

I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

airflow, architecture, Big Query, cost reduction

DFP Data Transfer Files Use Case | BigQuery 93% Cost Reduction Demystified

DFP Data Transfer Files Use Case:

DFP data transfer delivers impression-level data files to a GCS bucket. The amount of raw data is highly correlated with your web traffic. A big data challenge!

The big data problems with the DFP data transfer files as delivered:

  1. Data transfer file names contain a timestamp in the server's timezone instead of your local time.
  2. Your local time is in a column called Time. The column contains an unsupported datetime format: '2019-11-12-20:15:17'; notice the "-" between the date and the time.
  3. There are several file types: Network and Network Backfill, each delivered per table (impressions and clicks), i.e. 4 file names per hour in the same bucket (not in different folders).
  4. All the files of all the tables are supplied in the same bucket.

So… What is the problem with DFP data transfer files?

  1. You can't simply load the data, because there is no date column to partition it by. Loading all the data nightly into an unpartitioned table and then repartitioning it into another table is doable – BUT VERY COSTLY.
  2. You can't transform the data at the GCS level (we are talking about a serious amount of TBs).
  3. How can you orchestrate it? How will you make the process incremental, instead of loading EVERYTHING each night?

So what is the solution for loading DFP data transfer files?

BigQuery sharded table loading via an Airflow dynamic workflow, template_fields and a loop:

  1. Load incrementally (every day): each set of 24 files sharing the same date goes into a separate one-day table shard (similar to the GA_Sessions_* tables), with the shard date based on the date from the file name.
  2. Use Airflow to load the data via a dynamic workflow.
  3. Once you have the shards, create a view on top of the sharded table with the correct dates (see the sketch after this list).
  4. Transform the data incrementally.
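
A minimal sketch of the view from step 3, deriving the date from the shard suffix (the view name is illustrative, not from the original post):

CREATE OR REPLACE VIEW
  MyDataSet.dfp_impressions_view AS
SELECT
  PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) AS dfp_date,
  *
FROM
  `MyDataSet.table_shard_*`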

Airflow Dynamic workflow with loop over operators and templated fields

Also committed in our Git:

import datetime
import os
import logging

from airflow import DAG
from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
#from airflow.operators import dummy_operator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import BashOperator

# Import operator from plugins
from airflow.contrib.operators import gcs_to_gcs


from airflow.utils import trigger_rule

# Output file for job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'MyBucket',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to GCS buckets. no need to add gs://
DST_BUCKET = ('MyBucket')
 
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}



network_table = 'MyDataSet.table_shard_'

from datetime import timedelta, date

def daterange(start_date, end_date):
    for n in range(int ((end_date - start_date).days)):
        yield start_date + timedelta(n)


### start & end date = delta period.
## -3 days?
delta=-3 
start_date = datetime.date.today() + datetime.timedelta(delta)
end_date = datetime.date.today()

today_macro='{{ ds_nodash }}'
 
with models.DAG('BigQueryShardsLoading', schedule_interval='@once', default_args=default_dag_args) as dag:

    for single_date in daterange(start_date, end_date):
        load_to_bq_from_gcs_NetworkImpressions = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
            # each dynamic task requires its own unique task_id
            task_id='load_to_bq_from_gcs_'+single_date.strftime("%Y%m%d"),
            # templated source_objects: pick up all files whose name contains this date
            source_objects=[
                'my_file_prefix_'+single_date.strftime("%Y%m%d")+'*'
            ],
            skip_leading_rows=1,
            write_disposition='WRITE_TRUNCATE',  # overwrite the shard on re-run
            create_disposition='CREATE_IF_NEEDED',
            bucket=DST_BUCKET,
            # one destination table shard per day, e.g. MyDataSet.table_shard_20191112
            destination_project_dataset_table=network_table+single_date.strftime("%Y%m%d"),
            autodetect=True)

Notice the loop; this saves you from writing a duplicate operator per day, and lets you dynamically increase or decrease the number of days to overwrite each night:

for single_date in daterange(start_date, end_date):

Also notice the task_id – as noted in the Airflow docs, each dynamic task requires its own unique name:

task_id='load_to_bq_from_gcs_'+single_date.strftime("%Y%m%d"),

Also notice the templated field source_objects – as suggested in the Airflow docs, this saves you the need to repartition the files into different folders to distinguish between the 4 different file names:

source_objects=[
    'my_file_prefix_'+single_date.strftime("%Y%m%d")+'*'
],

Also notice the templated field destination_project_dataset_table – as suggested in the Airflow docs, this allows you to create a table per day based on the file name, i.e. a shard:

destination_project_dataset_table=network_table+single_date.strftime("%Y%m%d")

From there, parsing the date in the next layer of your DAG should be as simple as:

SELECT
  _TABLE_SUFFIX AS dfp_date,
  CAST(SUBSTR(Time, 0, 10) AS date) AS impression_date,
  CAST(SUBSTR(Time, 12, 8) AS time) AS impression_time,
  CAST(CONCAT(SUBSTR(Time, 0, 10), 'T', SUBSTR(Time, 12, 8)) AS datetime) AS impression_datetime,
  *
FROM
  `MyDataSet.table_shard_*`
LIMIT
  1
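
An alternative to the SUBSTR/CONCAT gymnastics (a sketch, not from the original post) is to parse the unsupported '2019-11-12-20:15:17' format directly with PARSE_DATETIME and an explicit format string:

SELECT
  PARSE_DATETIME('%Y-%m-%d-%H:%M:%S', Time) AS impression_datetime
FROM
  `MyDataSet.table_shard_*`
LIMIT
  1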

Just to give a clear perspective:

  1. This job used to cost us $140 per day.
  2. Now it costs about $20 per day.
  3. We have more ideas to decrease it to $10 per day (the 93% reduction in the title) using the superQuery Airflow connector. We will disclose them once we implement them.

——————————————————————————————————————————

I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/