Big Query, cost reduction

BigQuery Cheat Sheet

How much data in a specific dataset in GCP BigQuery?

select sum(size_bytes)/(1024*1024*1024) as size_GB

how much data in BQ project?

  count(*) AS tables,
  SUM(row_count) AS total_rows,
  SUM(size_bytes) AS size_bytes
ORDER BY size_bytes DESC

Parsing Google Analytics custom dimensions

(SELECT distinct value FROM h.customDimensions where index=4) as cd_4
  `MyProject.MyDataSEt.ga_sessions_*` t, t.hits h

Notice , that here you may want to notice the costs. Relevant article to different approaches to parse google analytics .

Logo Jutomate
airflow, architecture, Big Query, cost reduction

DFP Data Transfer Files Use Case | Airflow and BigQuery 93% Cost Reduction

DFP Data Transfer Files Use Case | Airflow and BigQuery 93% Cost Reduction

Author: Omid Vahdaty 27.11.2019

DFP Data Transfer Files Use Case :

DFP data transfer files delivers impression level data files on a GCS bucket. The amount of raw data is highly correlated with your web traffic. Big data challenge!

The Big data problems with the DFP data transfer file format being delivered:

  1. Data transfer file names contain the timestamp of server timezone instead of your local time.
  2. Your time zone is in a columns called time. The column contains unsupported datetime format: ‘2019-11-12-20:15:17’ , notice the “-” between date and time.
  3. There are several file types per table: Network, and network BackFill each of which per table – impressions and clicks. i.e 4 filenames per hour in the same bucket (not in different folder)
  4. all the files of all the tables we supplied in the same bucket….


So... What is the problem with DFP data transfer files?

  1. You can’t load the data simply b/c the there is no date column to partition the data with. so using a unpartitioned table loading all the data nightly, and then repartition the data to another table. is doable – BUT VERY COSTLY.
  2. You can ‘t transform the data on GCS level. (we are talking Serious amount of TB’s).
  3. How can you orchestrate it? how will you make the process incremental? instead of each night loading EVERYTHING.

What is the Solution to load DFP data transfer files?


BigQuery Sharded table loading via Airflow Dynamic Workflow , template_fields and Loop :

  1. Load incrementally (every day ) each 24 files with same date to a different table shard of one day (similar to GA_Sessions_* table) . base the shard date on the date from the file name
  2. Use Airflow to load data to via dynamic workflow.
  3. Once you have the table, create a view on top of the sharded table with the correct dates.
  4. Transform the data incrementally.


Airflow Dynamic workflow with loop over operators and templated fields

Also committed in our Git

import datetime
import os
import logging
from airflow import DAG
from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
#from airflow.operators import dummy_operator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import BashOperator
# Import operator from plugins
from airflow.contrib.operators import gcs_to_gcs
from airflow.utils import trigger_rule
# Output file for job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'MyBucket','%Y%m%d-%H%M%S')) + os.sep
# Path to GCS buckets. no need to add gs://
DST_BUCKET = ('MyBucket')
yesterday = datetime.datetime.combine( - datetime.timedelta(1),
default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
network_table							='MyDataSet.table_shard_'
from datetime import timedelta, date
def daterange(start_date, end_date):
    for n in range(int ((end_date - start_date).days)):
        yield start_date + timedelta(n)
### start & end date = delta period.
## -3 days?
start_date = + datetime.timedelta(delta)
end_date =
today_macro='{{ ds_nodash }}'
with models.DAG('BigQueryShardsLoading', schedule_interval='@once', default_args=default_dag_args) as dag:
	for single_date in daterange(start_date, end_date):
		load_to_bq_from_gcs_NetworkImpressions = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    		skip_leading_rows=1 ,
    		write_disposition='WRITE_TRUNCATE', #overwrite?

Notice the loop, this save you the need write duplicate operator per day and dynamically increase/decrease the amount of days to overwrite each night.

for single_date in daterange(start_date, end_date):

and notice template_fields – task_id as suggested in the airflow docs, each dynamic task required it own unique name


Also notice the template_fields – source_objects as suggested in the airflow docs, this save you the need to repartition the files to different folders to distinguish between the different 4 files names


also notice the template_fields – destination_project_dataset_table as suggested in the airflow docs this allows you to create table per day based on the filename. i.e a shard.


From there parsing the date in the next layer of your DAG should be as simple as:

 _TABLE_SUFFIX as dfp_date,
 cast(SUBSTR(Time, 0,10) as date) as impression_date, 
 cast (SUBSTR(Time, 12,8) as time) as impression_time,
 cast (concat(SUBSTR(Time, 0,10),'T',SUBSTR(Time, 12,8)) as datetime) impression_datetime,
  limit 1

Just to give a clear perspective

  1. This jobs costed us 140$ per day.
  2. Now it costs about 20$ per day.
  3. We have more ideas to decrease it to 10$ using SuperQuery Airflow connector. we will disclose them once we implement them.


I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way.
If you have any comments, thoughts, questions, or you need someone to consult with,

feel free to contact me via LinkedIn:

architecture, Big Query, cost reduction, GCP Big Data Demystified, superQuery

80% Cost Reduction in Google Cloud BigQuery

The second in series of lectures GCP Big Data Demystified. In this lecture I will share with how I saved 80% of BigQuery monthly billing of Lectures slides:

Videos from the meetup:

Link to previous lecture GCP Big Data Demystified #1


I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

Big Query, cost reduction

what is the cheapest ways to parse GA_sessions at big query? What is the fastest way to parse GA_sessions at big query? What is the simplest way to parse GA_sessions at big query?

Before we begin , be sure to be familiar with the Google analytics schema. Notice the different Record types, especially the hits record.

Option one: sub query

The simplest solution would be using sub query for each custom dimension, processing 3.18 GB. The problem is that if you you a view on top of it, the cost does not behave as you would expect. so be careful, and test everything.

fullVisitorId ,
(select value from h.customDimensions where index = 1) as UID,
(select distinct value from h.customDimensions where index = 36) as App_Edition
1234.ga_sessions_20180501 t, t.hits h

For example

Option 2: User Defined Function:

using used defined function with SQL. it involves baseis coding. This would be the slowest solution, but unlikely to hit a resources problem.

The query below processed 3.18 GB

CREATE TEMPORARY FUNCTION getCustomDimension(cd ARRAY>, index INT64)
for(var i = 0; i < cd.length; i++) {
var item = cd[i];
if(item.index == index) {
return item.value
return ”;

getCustomDimension(hits.customDimensions, 1) AS customDimension1_uid,
getCustomDimension(hits.customDimensions, 36) AS customDimension36_edition
FROM 1234.ga_sessions_20180501 gs_sessions,
UNNEST(gs_sessions.hits) hits

Option 3: With table as select

this option is the cheapest… 570 mb was processed, However. with a large number of custom dimensions , you might hit an error of “Error: Query exceeded resource limits. 86676.81298774751 CPU seconds were used,this query must use less than 55000.0 CPU seconds.” and there is nothing you can do about it . furthermore – the solution is not elegant in terms of maintenance, and it will slow down the more custom dimensions you have.

FROM 1234.ga_sessions_20180501 with_ga_sessions,
UNNEST(with_ga_sessions.hits) with_hits,
UNNEST(with_ga_sessions.customDimensions) with_cd
WHERE with_cd.index IN (1, 36)
CD_1.value AS customDimension1_uid,
CD_36.value AS customDimension36_edition
FROM 1234.ga_sessions_20180501 ga_sessions,
UNNEST(ga_sessions.hits) hits
LEFT JOIN CD CD_1 ON ga_sessions.fullVisitorId = CD_1.fullVisitorId
AND ga_sessions.visitId = CD_1.visitId
AND hits.hitNumber = CD_1.hitNumber
AND CD_1.index = 1
LEFT JOIN CD CD_36 ON ga_sessions.fullVisitorId = CD_36.fullVisitorId
AND ga_sessions.visitId = CD_36.visitId
AND hits.hitNumber = CD_36.hitNumber
AND CD_36.index =36

Option4: Flatten everything using AirFlow or any other orchestration tool.

This is the most expensive way, as each day we overwrite the entire history, as the custom dimension may change in some use case. if they don’t, then this approach may be be simple, cheap and fast. The idea is simple – convert each record to a bunch of columns and utilize the columnar power of bigquery. make sure the target flat table is partitioned by date. Notice that currently each table in BigQuery has a limitation of 2000 partition which means about 5.5 years. so the below is not good enough , consider sharded tables based on years. The storage here will be flattened, which means, anot more storage will be saved in BQ compared to GA_sessions storage only (about doble), but the benefit will be in compute time as the the table is columnar, and most queries require a short amount of columns.

CREATE  table `myProject.myDataSet.GA_FLAT` 
  ( date date,
 hits_type string,
 hits_page_pagePath string,
 hits_page_pagePathLevel1	string,
 hits_page_pagePathLevel2	string,
 hits_page_pagePathLevel3		string,
 hits_page_pagePathLevel4		string,
 hits_page_hostname string,
 hits_page_pageTitle string,
 hits_appInfo_appId string,
 hits_appInfo_appName string,
 hits_appInfo_screenName string,
 hits_eventInfo_eventCategory string,
 hits_eventInfo_eventAction string,
 hits_eventInfo_eventLabel string,
 hits_eventInfo_eventValue string,
    uid string,
    app_edition string,
    clientId string,
    fullVisitorId string,
    visitorId string,
    userId string,
    visitNumber string,
    visitId string,
    visitStartTime string,
    totals_bounces string,
    totals_hits string,
    totals_newVisits string,
    totals_pageviews string,
    totals_screenviews string,
    totals_sessionQualityDim string,
    totals_timeOnScreen string,
    totals_timeOnSite string,
    totals_totalTransactionRevenue string,
    totals_transactionRevenue string,
    totals_transactions string,
    totals_UniqueScreenViews string,
    totals_visits string,
    trafficSource_adContent string,
    rafficSource_adwordsClickInfo_adGroupId string,
    trafficSource_adwordsClickInfo_adNetworkType string,
    trafficSource_adwordsClickInfo_campaignId string,
    trafficSource_adwordsClickInfo_creativeId string,
    trafficSource_adwordsClickInfo_criteriaId string,
    trafficSource_adwordsClickInfo_criteriaParameters string,
    trafficSource_adwordsClickInfo_customerId string,
    trafficSource_adwordsClickInfo_gclId string,
    trafficSource_adwordsClickInfo_isVideoAd string,
    trafficSource_adwordsClickInfo_page string,
    trafficSource_adwordsClickInfo_slot string,
    trafficSource_adwordsClickInfo_targetingCriteria_boomUserlistId string,
    trafficSource_campaign string,
    trafficSource_campaignCode string,
    trafficSource_isTrueDirect string,
    trafficSource_keyword string,
    trafficSource_medium string,
    trafficSource_referralPath string,
    trafficSource_source string,
    socialEngagementType string,
    channelGrouping string,
    device_browser string,
    device_browserSize string,
    device_browserVersion string,
    device_deviceCategory string,
    device_mobileDeviceInfo string,
    device_mobileDeviceModel string,
    device_mobileInputSelector string,
    device_mobileDeviceMarketingName string,
    device_operatingSystem string,
    device_operatingSystemVersion string,
    device_isMobile string,
    device_mobileDeviceBranding string,
    device_flashVersion string,
    device_javaEnabled string,
    device_language string,
    device_screenColors string,
    device_screenResolution string,
    geoNetwork_continent string,
    geoNetwork_subContinent string,
    geoNetwork_country string,
    geoNetwork_region string,
    geoNetwork_metro string,
    geoNetwork_city string,
    geoNetwork_cityId string,
    geoNetwork_latitude string,
    geoNetwork_networkDomain string
 partition by date

PARSE_DATE('%Y%m%d', REPLACE(_TABLE_SUFFIX, "intraday_", "")) as date, 
type as hit_type, 	  as hits_page_pagePath, as hits_page_pagePathLevel1, as hits_page_pagePathLevel2, as hits_page_pagePathLevel3, as hits_page_pagePathLevel4, as hit_page_hostname, as hits_page_pageTitle,
h.appInfo.appId as hit_appInfo_appId,
h.appInfo.appName as hit_appInfo_appName,
h.appInfo.screenName as hit_appInfo_screenName,
h.eventInfo.eventCategory as hits_eventInfo_eventCategory ,
h.eventInfo.eventAction   as hits_eventInfo_eventAction ,
h.eventInfo.eventLabel    as hits_eventInfo_eventLabel ,
h.eventInfo.eventValue    as hits_eventInfo_eventValue ,
(select value from h.customDimensions where index = 1) as CD_1,
(select distinct value from h.customDimensions where index = 35) as CD_35,
clientId	,
fullVisitorId	,
visitorId	,
userId	,
visitNumber	,
visitId	,
visitStartTime	,
totals.bounces as totals_bounces,
totals.hits	as totals_hits,
totals.newVisits as totals_newVisits	,
totals.pageviews	as totals_pageviews,
totals.screenviews	as totals_screenviews,
totals.sessionQualityDim	as totals_sessionQualityDim,
totals.timeOnScreen	as totals_timeOnScreen,
totals.timeOnSite	as totals_timeOnSite,
totals.totalTransactionRevenue	as totals_totalTransactionRevenue,
totals.transactionRevenue	as totals_transactionRevenue,
totals.transactions	 as totals_transactions,
totals.UniqueScreenViews	as totals_UniqueScreenViews,
totals.visits  as totals_visits,
trafficSource.adContent as trafficSource_adContent	,
trafficSource. adwordsClickInfo.adGroupId as trafficSource_adwordsClickInfo_adGroupId	,
trafficSource. adwordsClickInfo.adNetworkType as trafficSource_adwordsClickInfo_adNetworkType	,
trafficSource. adwordsClickInfo.campaignId as trafficSource_adwordsClickInfo_campaignId	,
trafficSource. adwordsClickInfo.creativeId as trafficSource_adwordsClickInfo_creativeId	,
trafficSource. adwordsClickInfo.criteriaId as trafficSource_adwordsClickInfo_criteriaId	,
trafficSource. adwordsClickInfo.criteriaParameters as trafficSource_adwordsClickInfo_criteriaParameters,
trafficSource. adwordsClickInfo.customerId	as trafficSource_adwordsClickInfo_customerId,
trafficSource. adwordsClickInfo.gclId	as trafficSource_adwordsClickInfo_gclId,
trafficSource. adwordsClickInfo.isVideoAd	as trafficSource_adwordsClickInfo_isVideoAd,
trafficSource. as trafficSource_adwordsClickInfo_page	,
trafficSource. adwordsClickInfo.slot as trafficSource_adwordsClickInfo_slot	,
trafficSource. adwordsClickInfo.targetingCriteria.boomUserlistId	as trafficSource_adwordsClickInfo_targetingCriteria_boomUserlistId,
trafficSource.campaign	as trafficSource_campaign,
trafficSource.campaignCode as trafficSource_campaignCode	,
trafficSource.isTrueDirect as trafficSource_isTrueDirect	,
trafficSource.keyword	as trafficSource_keyword,
trafficSource.medium	as trafficSource_medium,
trafficSource.referralPath as trafficSource_referralPath	,
trafficSource.source	as trafficSource_source,
socialEngagementType	  ,
channelGrouping	,
device.browser as device_browser	,
device.browserSize	 as device_browserSize	,
device.browserVersion	 as device_browserVersion,
device.deviceCategory	 as device_deviceCategory,
device.mobileDeviceInfo	as device_mobileDeviceInfo,
device.mobileDeviceModel	as device_mobileDeviceModel,
device.mobileInputSelector	as device_mobileInputSelector,
device.mobileDeviceMarketingName	as device_mobileDeviceMarketingName,
device.operatingSystem	as device_operatingSystem,
device.operatingSystemVersion	as device_operatingSystemVersion,
device.isMobile	as device_isMobile,
device.mobileDeviceBranding	as device_mobileDeviceBranding,
device.flashVersion as device_flashVersion	,
device.javaEnabled	as device_javaEnabled,
device.language	 as device_language,
device.screenColors	as device_screenColors	,
device.screenResolution	 as device_screenResolution	, 
geoNetwork.continent	as geoNetwork_continent	,
geoNetwork.subContinent	as geoNetwork_subContinent	,	as geoNetwork_country	,
geoNetwork.region	as geoNetwork_region	,
geoNetwork.metro	as geoNetwork_metro	,	as geoNetwork_city	,
geoNetwork.cityId	as geoNetwork_cityId	,
geoNetwork.latitude	as geoNetwork_latitude	,
geoNetwork.longitude as geoNetwork_longitude	,
geoNetwork.networkDomain as geoNetwork_networkDomain,
geoNetwork.networkLocation as geoNetwork_networkLocation
  `myProject.MyDataSet.ga_sessions_*` t, t.hits h


I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

AWS EMR, cost reduction

39 Tips to reduce costs on AWS EMR

There will be a time you will think on ways to reduce costs on your EMR cluster. Notice some of the tips will simply let your ran faster, and this usually imply less costs simply b/c of CPU utilization. If this is the case ,consider the below:

  1. Conceptually , switching to the Transient cluster instead 24X7 will be the best. It requires some preparation work of the automation, but it may be worth your time.
  2. Consider using R instances if your EMR jobs are memory intensive, remember the the over head of the Yarn, JVM  is about 50% of what you have on the machine.  You can confirm this via EMR ganglia, and check the amount of available RAM while the jobs are running.
  3. If you use a larger instance type, the NIC attached will have a faster quite of network bandwidth. the difference may be worth in in term of time vs cost while working with large external tables. the difference may be 300MB/s instead of 100MB/s on a smaller instance. If you witnessed something much faster – please let me know.
  4. Consider using the configuration “Maximise resource allocation “, this may shorten your running time.
  5. Naturally tuning of cluster specific / job specific configuration may be worth your time as well. but personally i am too lazy to try this, and the risk of these per query customs config propagating to there jobs and slowing them down is high due to human element. if u must tune, use the tuning per session – run it before your query, and set back to default values after you are done. you may want to consider cluster per jobs types. example: cluster for joins jobs, and cluster for aggregations. or cluster per DEV, QA, STG, PROD
  6. Consider using Task nodes. they provisioned quickly, can be killed and resized freely with zero impact on your cluster in terms of stability. peace of advice – make sure the task node is as same size of your data node, other wise your risk of underutilising your task nodes as the executors confirmations are as same of the data node.
  7. Consider using Spot instances…. especially on task nodes. save upto 80%!
  8. Consider using Reserved instances for your Data Nodes and master nodes. save upto 35% on costs. 
  9. consider using several task groups, with different size/configs
  10. Auto scaling will help be more accurate in terms of costs, you can auto scale in a resolution of 5 mins (time it takes to provision an EMR node). Autoscaling is highly useful on 24X7 EMR cluster. using Auto scaling require some testing, as it does not behave exactly as you think all the time. In Cluster with Dynamic Resources allocation, the resources may be ready, but the boost in performance may take its time. auto scaling and task saved me about 50%. Naturally, when you save costs using tasks nodes and auto scaling, your get greey on a simple performance test, well until the auto scale in kicks in 🙂
  11. In good we trust, all the rest must bring data – use Ganglia to track the exact amount of resources your need (perhaps you are over provisioning).
    1. yarn.QueueMetrics.AvailableMB
    2. yarn.QueueMetrics.AvailableVCores
    3. yarn.NodeManagerMetrics.AvailableGB
    4. yarn.NodeManagerMetrics.AvailableVCores
  12. Minimal recommended cluster size is 3 machines, one 1 Master, 2 Data nodes. conceder the below suggestions
    1. EMR with only one machine (new feature), which is the master node, data node in the same nachine
    2. EMR with 1 master, 1 Data, and if you must scale, add Task nodes, with auto scaling. notice the minimal amount of machines in the task group can be zero. notice, this should not be used in production as the stability of your cluster is much lower, even if you are not using your data on local tables. if your data node dies, the entire cluster becomes unusable, and this is unrecoverable.
  13. Encryption at rest and Encryption in motion, may be good for security reason, but may have a massive impact on production in terms of resources, running time etc. confirm security is a must before you apply security on transient cluster. Consult your CISO for this. Notice the encryption on S3 is hardware based, but still i would perform a simple benchmark test to see the cost benefit ration.
  14. If you can afford it, and it is technically valid , please test your jobs on both Hive / Spark/ Presto. Further more, test different compression types and storage types.
    1. I know for a fact from benchmarks i performed there are some cases Hive will be faster than spark.
    2. I am less familiar the presto, but i am positive there may be useless it will be faster.
    3. from a few benchmarks I performed , your will be surprised to know that using different compression types, may have massive impact on Write time to S3 and Read time (if the data is compressed better). I personally work with Parquet of GZIP. but this only work perfectly with my useless.
    4. Notice compression has impact on CPU utilisation, so it is not a clear cut what will be cheaper (parquet/orc GZIP, BZIP) nor which will be faster (spark / hive/ presto) .
  15. did you switch to columnar? if not try the this link as reference: convert to columnar  from raw based data.
  16. did u use partitioning? did you use the correct partitioning for your query?
  17. if using ORC consider using bucketing on top of partitioning
  18. was your data spliced into chunks? if so try to change chunk size. more complicated but doable, again, could go either way – need to test this will your data.
  19. apply hints on the table may help on time spend on data scan in some cases.
  20. if using multiple tables join, order of joins, may impact scanned data, shotterning running time.
  21. consider pre aggregating data if possible as part of your transformation/cleansing process. even if it is on each (using window table, each row will hold aggregation tables. )
  22. consider pre calculating table with heavy group by on raw-data. i.e have the data already calculated on s3, and have your production user/ end user query that table.
  23. have a data engineer review each query, to make sure data scan is minimised. for example
    1. Minimise the columns in the results set… a results set of longs strings maybe be very costly.
    2. where possible switch strings to ints, this will minimise footprint on storage greatly.
    3. if possible switch from bigint to tinyint. this will save some disk space as well. notice the list of supported data types:



As you can see, There are many ways to save costs on AWS EMR. The easiest thing will be to use task groups and scale in/out based on your needs. The rest may take some time, but will be worth it.



Need to learn more about aws big data (demystified)?


I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me: