airflow, architecture, Big Query, cost reduction

DFP Data Transfer Files Use Case | BigQuery 93% Cost Reduction demystified

DFP Data Transfer Files Use Case:

DFP data transfer delivers impression-level data files to a GCS bucket. The amount of raw data is highly correlated with your web traffic. A big data challenge!

The big data problems with the DFP data transfer files as delivered:

  1. Data transfer file names contain a timestamp in the server's time zone instead of your local time.
  2. Your local time is in a column called Time, which uses an unsupported datetime format: '2019-11-12-20:15:17' (notice the "-" between the date and the time).
  3. There are several file types: Network and NetworkBackfill, each with its own impressions and clicks files, i.e. 4 file names per hour in the same bucket (not in different folders).
  4. All the files of all the tables are delivered to the same bucket.

So… What is the problem with DFP data transfer files?

  1. You can't simply load the data, because there is no date column to partition on. Loading everything into an unpartitioned table every night and then repartitioning it into another table is doable, BUT VERY COSTLY.
  2. You can't transform the data at the GCS level (we are talking about a serious amount of TBs).
  3. How do you orchestrate it? How do you make the process incremental instead of loading EVERYTHING each night?

So what is the Solution to load DFP data transfer files?

BigQuery sharded table loading via an Airflow dynamic workflow, template_fields and a loop:

  1. Load incrementally (every day): load each day's 24 files, which share the same date, into a separate one-day table shard (similar to the GA_Sessions_* tables). Base the shard date on the date in the file name.
  2. Use Airflow to load the data via a dynamic workflow.
  3. Once you have the shards, create a view on top of the sharded table with the correct dates (see the sketch after this list).
  4. Transform the data incrementally.
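
For step 3, here is a minimal sketch of what that view could look like, created from the same DAG with the contrib BigQueryOperator. The dataset, view and shard names are the placeholders used in this post, and the operator arguments assume Airflow 1.10's contrib operators:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Hypothetical task: (re)create a date-aware view on top of the daily shards.
create_dfp_view = BigQueryOperator(
    task_id='create_dfp_view',
    use_legacy_sql=False,
    sql="""
        CREATE OR REPLACE VIEW `MyDataSet.table_shard_view` AS
        SELECT
          PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) AS dfp_date,
          *
        FROM `MyDataSet.table_shard_*`
    """)

Downstream tasks can then query the view by dfp_date instead of dealing with shard names directly.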

Airflow Dynamic workflow with loop over operators and templated fields

It is also committed in our Git:

import datetime
import os

from airflow import models
from airflow.contrib.operators import gcs_to_bq

# Path to the GCS bucket (no need to add the gs:// prefix).
DST_BUCKET = 'MyBucket'
 
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # Number of retries on task failure, and the delay between retries.
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}



network_table = 'MyDataSet.table_shard_'  # shard prefix; the date suffix is appended per day

from datetime import timedelta


def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)


# Start & end dates define the reload window (the last 3 days).
delta = -3
start_date = datetime.date.today() + datetime.timedelta(delta)
end_date = datetime.date.today()

today_macro = '{{ ds_nodash }}'
 
with models.DAG('BigQueryShardsLoading', schedule_interval='@once', default_args=default_dag_args) as dag:

    for single_date in daterange(start_date, end_date):
        load_to_bq_from_gcs_NetworkImpressions = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
            task_id='load_to_bq_from_gcs_' + single_date.strftime("%Y%m%d"),
            source_objects=[
                'my_file_prefix_' + single_date.strftime("%Y%m%d") + '*'
            ],
            skip_leading_rows=1,
            write_disposition='WRITE_TRUNCATE',  # overwrite the day's shard on each run
            create_disposition='CREATE_IF_NEEDED',
            bucket=DST_BUCKET,
            destination_project_dataset_table=network_table + single_date.strftime("%Y%m%d"),
            autodetect=True)

Notice the loop: it saves you the need to write a duplicate operator per day, and it lets you dynamically increase/decrease the number of days to overwrite each night.

for single_date in daterange(start_date, end_date):

Also notice the task_id: since the operators are generated dynamically in a loop, each task requires its own unique name, so the date is appended to it.

task_id='load_to_bq_from_gcs_'+single_date.strftime("%Y%m%d"),

Also notice source_objects, one of the operator's template_fields as noted in the Airflow docs. Using a wildcard prefix here saves you the need to repartition the files into different folders to distinguish between the 4 different file names.

source_objects=[
    'my_file_prefix_' + single_date.strftime("%Y%m%d") + '*'
],

Also notice destination_project_dataset_table, another of the operator's template_fields mentioned in the Airflow docs. It allows you to create a table per day based on the file name, i.e. a shard.

destination_project_dataset_table=network_table + single_date.strftime("%Y%m%d")

From there parsing the date in the next layer of your DAG should be as simple as:

select
  _TABLE_SUFFIX as dfp_date,
  cast(SUBSTR(Time, 0, 10) as date) as impression_date,
  cast(SUBSTR(Time, 12, 8) as time) as impression_time,
  cast(concat(SUBSTR(Time, 0, 10), 'T', SUBSTR(Time, 12, 8)) as datetime) as impression_datetime,
  *
from
  `MyDataSet.table_shard_*`
limit 1

Just to give a clear perspective on the cost reduction:

  1. This job used to cost us $140 per day.
  2. Now it costs about $20 per day.
  3. We have more ideas to decrease it to $10 using the superQuery Airflow connector; we will disclose them once we implement them.

——————————————————————————————————————————

I put a lot of thought into these blogs so that I could share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

architecture, Big Query, cost reduction, GCP Big Data Demystified, superQuery

80% Cost Reduction in Google Cloud BigQuery | Tips and Tricks | Big Query Demystified | GCP Big Data Demystified #2

The second in the series of GCP Big Data Demystified lectures. In this lecture I share how I saved 80% of the monthly BigQuery bill of investing.com. Lecture slides:

Videos from the meetup:

Link to previous lecture GCP Big Data Demystified #1

——————————————————————————————————————————

I put a lot of thought into these blogs so that I could share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

To learn more about superQuery:

Big Query, cost reduction

What is the cheapest way to parse ga_sessions in BigQuery? What is the fastest way? What is the simplest way?

Before we begin, be sure you are familiar with the Google Analytics BigQuery export schema. Notice the different RECORD types, especially the hits record.

Option 1: subquery

The simplest solution is using a subquery for each custom dimension; this query processed 3.18 GB. The problem is that if you put a view on top of it, the cost does not behave as you would expect, so be careful and test everything.

select
  fullVisitorId,
  (select value from h.customDimensions where index = 1) as UID,
  (select distinct value from h.customDimensions where index = 36) as App_Edition
FROM
  `1234.ga_sessions_20180501` t, t.hits h


Option 2: user-defined function

Use a user-defined function (UDF) with SQL; it involves some basic JavaScript coding. This would be the slowest solution, but it is unlikely to hit a resource limit.

The query below processed 3.18 GB

CREATE TEMPORARY FUNCTION getCustomDimension(cd ARRAY<STRUCT<index INT64, value STRING>>, index INT64)
RETURNS STRING
LANGUAGE js AS """
for (var i = 0; i < cd.length; i++) {
  var item = cd[i];
  if (item.index == index) {
    return item.value;
  }
}
return '';
""";

SELECT
  gs_sessions.fullVisitorId,
  getCustomDimension(hits.customDimensions, 1) AS customDimension1_uid,
  getCustomDimension(hits.customDimensions, 36) AS customDimension36_edition
FROM `1234.ga_sessions_20180501` gs_sessions,
  UNNEST(gs_sessions.hits) hits

Option 3: WITH table AS SELECT (common table expression)

This option is the cheapest: only 570 MB was processed. However, with a large number of custom dimensions you might hit an error such as "Error: Query exceeded resource limits. 86676.81298774751 CPU seconds were used, this query must use less than 55000.0 CPU seconds.", and there is nothing you can do about it. Furthermore, the solution is not elegant in terms of maintenance, and it will slow down the more custom dimensions you have.

WITH CD AS (
SELECT
with_ga_sessions.fullVisitorId,
with_ga_sessions.visitId,
with_hits.hitNumber,
with_cd.index,
with_cd.value
FROM `1234.ga_sessions_20180501` with_ga_sessions,
UNNEST(with_ga_sessions.hits) with_hits,
UNNEST(with_ga_sessions.customDimensions) with_cd
WHERE with_cd.index IN (1, 36)
)
SELECT
ga_sessions.visitId,
CD_1.value AS customDimension1_uid,
CD_36.value AS customDimension36_edition
FROM `1234.ga_sessions_20180501` ga_sessions,
UNNEST(ga_sessions.hits) hits
LEFT JOIN CD CD_1 ON ga_sessions.fullVisitorId = CD_1.fullVisitorId
AND ga_sessions.visitId = CD_1.visitId
AND hits.hitNumber = CD_1.hitNumber
AND CD_1.index = 1
LEFT JOIN CD CD_36 ON ga_sessions.fullVisitorId = CD_36.fullVisitorId
AND ga_sessions.visitId = CD_36.visitId
AND hits.hitNumber = CD_36.hitNumber
AND CD_36.index = 36

Option 4: flatten everything using Airflow or any other orchestration tool

This is the most expensive way, since each day we overwrite the entire history, because the custom dimensions may change in some use cases. If they don't, then this approach may be simple, cheap and fast. The idea is simple: convert each record into a bunch of columns and utilize the columnar power of BigQuery. Make sure the target flat table is partitioned by date. Notice that currently each table in BigQuery has a limitation of 2,000 partitions, which means about 5.5 years of daily partitions, so the table below is not good enough on its own; consider sharding the flat tables by year. The storage here is flattened, which means a lot more storage will be used in BQ compared to storing GA_sessions only (about double), but the benefit is in compute time, as the table is columnar and most queries need only a small number of columns.

CREATE TABLE `gap---all-sites-1245.myDataSet.GA_FLAT`
  ( date date,
  
 hits_type string,
 
 hits_page_pagePath string,
 hits_page_pagePathLevel1	string,
 hits_page_pagePathLevel2	string,
 hits_page_pagePathLevel3		string,
 hits_page_pagePathLevel4		string,
 
 hits_page_hostname string,
 hits_page_pageTitle string,
 hits_appInfo_appId string,
 hits_appInfo_appName string,
 hits_appInfo_screenName string,
 
 hits_eventInfo_eventCategory string,
 hits_eventInfo_eventAction string,
 hits_eventInfo_eventLabel string,
 hits_eventInfo_eventValue string,
 
    uid string,
    app_edition string,

 
 
    clientId string,
    fullVisitorId string,
    visitorId string,
    userId string,
    visitNumber string,
    visitId string,
    visitStartTime string,
    totals_bounces string,
    totals_hits string,
    totals_newVisits string,
    totals_pageviews string,
    totals_screenviews string,
    totals_sessionQualityDim string,
    totals_timeOnScreen string,
    totals_timeOnSite string,
    totals_totalTransactionRevenue string,
    totals_transactionRevenue string,
    totals_transactions string,
    totals_UniqueScreenViews string,
    totals_visits string,
    trafficSource_adContent string,
    trafficSource_adwordsClickInfo_adGroupId string,
    trafficSource_adwordsClickInfo_adNetworkType string,
    trafficSource_adwordsClickInfo_campaignId string,
    trafficSource_adwordsClickInfo_creativeId string,
    trafficSource_adwordsClickInfo_criteriaId string,
    trafficSource_adwordsClickInfo_criteriaParameters string,
    trafficSource_adwordsClickInfo_customerId string,
    trafficSource_adwordsClickInfo_gclId string,
    trafficSource_adwordsClickInfo_isVideoAd string,
    trafficSource_adwordsClickInfo_page string,
    trafficSource_adwordsClickInfo_slot string,
    trafficSource_adwordsClickInfo_targetingCriteria_boomUserlistId string,
    trafficSource_campaign string,
    trafficSource_campaignCode string,
    trafficSource_isTrueDirect string,
    trafficSource_keyword string,
    trafficSource_medium string,
    trafficSource_referralPath string,
    trafficSource_source string,
    socialEngagementType string,
    channelGrouping string,
    device_browser string,
    device_browserSize string,
    device_browserVersion string,
    device_deviceCategory string,
    device_mobileDeviceInfo string,
    device_mobileDeviceModel string,
    device_mobileInputSelector string,
    device_mobileDeviceMarketingName string,
    device_operatingSystem string,
    device_operatingSystemVersion string,
    device_isMobile string,
    device_mobileDeviceBranding string,
    device_flashVersion string,
    device_javaEnabled string,
    device_language string,
    device_screenColors string,
    device_screenResolution string,
    geoNetwork_continent string,
    geoNetwork_subContinent string,
    geoNetwork_country string,
    geoNetwork_region string,
    geoNetwork_metro string,
    geoNetwork_city string,
    geoNetwork_cityId string,
    geoNetwork_latitude string,
    geoNetwork_networkDomain string
 )
 partition by date


SELECT
PARSE_DATE('%Y%m%d', REPLACE(_TABLE_SUFFIX, "intraday_", "")) as date, 

type as hit_type,

 h.page.pagePath 	  as hits_page_pagePath,
 h.page.pagePathLevel1 as hits_page_pagePathLevel1,
 h.page.pagePathLevel2 as hits_page_pagePathLevel2,
 h.page.pagePathLevel3 as hits_page_pagePathLevel3,
 h.page.pagePathLevel4 as hits_page_pagePathLevel4,

h.page.hostname as hit_page_hostname,
h.page.pageTitle as hits_page_pageTitle,
h.appInfo.appId as hit_appInfo_appId,
h.appInfo.appName as hit_appInfo_appName,
h.appInfo.screenName as hit_appInfo_screenName,

h.eventInfo.eventCategory as hits_eventInfo_eventCategory ,
h.eventInfo.eventAction   as hits_eventInfo_eventAction ,
h.eventInfo.eventLabel    as hits_eventInfo_eventLabel ,
h.eventInfo.eventValue    as hits_eventInfo_eventValue ,

(select value from h.customDimensions where index = 1) as CD_1,
(select distinct value from h.customDimensions where index = 35) as CD_35,



clientId	,
fullVisitorId	,
visitorId	,
userId	,
visitNumber	,
visitId	,
visitStartTime	,
totals.bounces as totals_bounces,
totals.hits	as totals_hits,
totals.newVisits as totals_newVisits	,
totals.pageviews	as totals_pageviews,
totals.screenviews	as totals_screenviews,
totals.sessionQualityDim	as totals_sessionQualityDim,
totals.timeOnScreen	as totals_timeOnScreen,
totals.timeOnSite	as totals_timeOnSite,
totals.totalTransactionRevenue	as totals_totalTransactionRevenue,
totals.transactionRevenue	as totals_transactionRevenue,
totals.transactions	 as totals_transactions,
totals.UniqueScreenViews	as totals_UniqueScreenViews,
totals.visits  as totals_visits,
trafficSource.adContent as trafficSource_adContent	,
trafficSource.adwordsClickInfo.adGroupId as trafficSource_adwordsClickInfo_adGroupId,
trafficSource.adwordsClickInfo.adNetworkType as trafficSource_adwordsClickInfo_adNetworkType,
trafficSource.adwordsClickInfo.campaignId as trafficSource_adwordsClickInfo_campaignId,
trafficSource.adwordsClickInfo.creativeId as trafficSource_adwordsClickInfo_creativeId,
trafficSource.adwordsClickInfo.criteriaId as trafficSource_adwordsClickInfo_criteriaId,
trafficSource.adwordsClickInfo.criteriaParameters as trafficSource_adwordsClickInfo_criteriaParameters,
trafficSource.adwordsClickInfo.customerId as trafficSource_adwordsClickInfo_customerId,
trafficSource.adwordsClickInfo.gclId as trafficSource_adwordsClickInfo_gclId,
trafficSource.adwordsClickInfo.isVideoAd as trafficSource_adwordsClickInfo_isVideoAd,
trafficSource.adwordsClickInfo.page as trafficSource_adwordsClickInfo_page,
trafficSource.adwordsClickInfo.slot as trafficSource_adwordsClickInfo_slot,
trafficSource.adwordsClickInfo.targetingCriteria.boomUserlistId as trafficSource_adwordsClickInfo_targetingCriteria_boomUserlistId,
trafficSource.campaign	as trafficSource_campaign,
trafficSource.campaignCode as trafficSource_campaignCode	,
trafficSource.isTrueDirect as trafficSource_isTrueDirect	,
trafficSource.keyword	as trafficSource_keyword,
trafficSource.medium	as trafficSource_medium,
trafficSource.referralPath as trafficSource_referralPath	,
trafficSource.source	as trafficSource_source,
socialEngagementType	  ,
channelGrouping	,
device.browser as device_browser	,
device.browserSize	 as device_browserSize	,
device.browserVersion	 as device_browserVersion,
device.deviceCategory	 as device_deviceCategory,
device.mobileDeviceInfo	as device_mobileDeviceInfo,
device.mobileDeviceModel	as device_mobileDeviceModel,
device.mobileInputSelector	as device_mobileInputSelector,
device.mobileDeviceMarketingName	as device_mobileDeviceMarketingName,
device.operatingSystem	as device_operatingSystem,
device.operatingSystemVersion	as device_operatingSystemVersion,
device.isMobile	as device_isMobile,
device.mobileDeviceBranding	as device_mobileDeviceBranding,
device.flashVersion as device_flashVersion	,
device.javaEnabled	as device_javaEnabled,
device.language	 as device_language,
device.screenColors	as device_screenColors	,
device.screenResolution	 as device_screenResolution	, 
geoNetwork.continent	as geoNetwork_continent	,
geoNetwork.subContinent	as geoNetwork_subContinent	,
geoNetwork.country	as geoNetwork_country	,
geoNetwork.region	as geoNetwork_region	,
geoNetwork.metro	as geoNetwork_metro	,
geoNetwork.city	as geoNetwork_city	,
geoNetwork.cityId	as geoNetwork_cityId	,
geoNetwork.latitude	as geoNetwork_latitude	,
geoNetwork.longitude as geoNetwork_longitude	,
geoNetwork.networkDomain as geoNetwork_networkDomain,
geoNetwork.networkLocation as geoNetwork_networkLocation


FROM
  `gap---all-sites-1245.123492829.ga_sessions_*` t, t.hits h
  

——————————————————————————————————————————

I put a lot of thought into these blogs so that I could share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

AWS EMR, cost reduction

39 Tips to reduce costs on AWS EMR

There will come a time when you will think about ways to reduce costs on your EMR cluster. Notice that some of these tips simply make your jobs run faster, which usually implies lower costs because of better CPU utilization. If this is the case, consider the following:

  1. Conceptually, switching to a transient cluster instead of a 24x7 one is the best option. It requires some preparatory automation work, but it may be worth your time.
  2. Consider using R instances if your EMR jobs are memory intensive; remember that the overhead of YARN and the JVM is about 50% of what you have on the machine. You can confirm this via EMR Ganglia by checking the amount of available RAM while the jobs are running.
  3. If you use a larger instance type, the attached NIC will have a higher network bandwidth quota. The difference may be worthwhile in terms of time vs. cost when working with large external tables: it may be 300 MB/s instead of 100 MB/s on a smaller instance. If you have witnessed something much faster, please let me know.
  4. Consider using the "maximizeResourceAllocation" configuration; this may shorten your running time.
  5. Naturally, tuning cluster-specific / job-specific configuration may be worth your time as well, but personally I am too lazy to try this, and the risk of these per-query custom configs propagating to other jobs and slowing them down is high due to the human element. If you must tune, tune per session: set the values before your query and set them back to the defaults once you are done. You may also want to consider a cluster per job type, for example a cluster for join jobs and a cluster for aggregation jobs, or a cluster per environment (DEV, QA, STG, PROD).
  6. Consider using task nodes. They provision quickly and can be killed and resized freely with zero impact on your cluster's stability. A piece of advice: make sure the task nodes are the same size as your data nodes, otherwise you risk under-utilizing your task nodes, as the executor configurations are derived from the data nodes.
  7. Consider using Spot instances, especially for task nodes; save up to 80%!
  8. Consider using Reserved Instances for your data nodes and master node; save up to 35% on costs.
  9. Consider using several task instance groups with different sizes/configurations.
  10. Auto scaling will help you be more accurate in terms of costs; you can auto scale at a resolution of about 5 minutes (the time it takes to provision an EMR node). Auto scaling is highly useful on a 24x7 EMR cluster. Using auto scaling requires some testing, as it does not always behave exactly as you expect: in a cluster with dynamic resource allocation the resources may be ready, but the boost in performance may take its time. Auto scaling and task nodes saved me about 50%. Naturally, when you save costs using task nodes and auto scaling, you get greedy on a simple performance test, at least until the scale-in kicks in 🙂
  11. In God we trust, all others must bring data: use Ganglia to track the exact amount of resources you need (perhaps you are over-provisioning). Useful metrics include:
    1. yarn.QueueMetrics.AvailableMB
    2. yarn.QueueMetrics.AvailableVCores
    3. yarn.NodeManagerMetrics.AvailableGB
    4. yarn.NodeManagerMetrics.AvailableVCores
  12. The minimal recommended cluster size is 3 machines: 1 master and 2 data nodes. Consider the suggestions below:
    1. EMR with only one machine (a newer feature), where the master node and the data node run on the same machine.
    2. EMR with 1 master and 1 data node; if you must scale, add task nodes with auto scaling. Notice that the minimal number of machines in the task group can be zero. Also notice that this should not be used in production, as the stability of your cluster is much lower even if you are not keeping data in local tables: if your data node dies, the entire cluster becomes unusable, and this is unrecoverable.
  13. Encryption at rest and encryption in transit may be required for security reasons, but they can have a massive impact on production in terms of resources, running time, etc. Confirm that security is a must before you apply it to a transient cluster; consult your CISO about this. Notice that the encryption on S3 is hardware based, but I would still perform a simple benchmark test to see the cost/benefit ratio.
  14. If you can afford it, and it is technically valid, test your jobs on Hive, Spark and Presto. Furthermore, test different compression types and storage formats.
    1. I know for a fact, from benchmarks I performed, that in some cases Hive will be faster than Spark.
    2. I am less familiar with Presto, but I am positive there are use cases where it will be faster.
    3. From a few benchmarks I performed, you would be surprised how much impact different compression types can have on write time to S3 and on read time (if the data compresses better). I personally work with Parquet with GZIP, but this only works perfectly for my use cases.
    4. Notice that compression has an impact on CPU utilisation, so it is not clear cut which will be cheaper (Parquet/ORC with GZIP or BZIP2) nor which will be faster (Spark/Hive/Presto).
  15. Did you switch to columnar storage? If not, try this link as a reference: convert to columnar from row-based data. A PySpark sketch follows this list.
  16. Did you use partitioning? Did you use the correct partitioning for your query?
  17. If using ORC, consider using bucketing on top of partitioning.
  18. Was your data split into chunks? If so, try changing the chunk size. This is more complicated but doable; again, it could go either way, so you need to test it with your data.
  19. Applying hints on the table may reduce the time spent on data scans in some cases.
  20. If you join multiple tables, the order of the joins may impact the amount of data scanned, shortening the running time.
  21. Consider pre-aggregating data if possible as part of your transformation/cleansing process, even at the row level (using window functions, each row can carry its aggregated values).
  22. Consider pre-calculating tables with heavy GROUP BYs on the raw data, i.e. have the data already calculated on S3 and have your production/end users query that table.
  23. Have a data engineer review each query to make sure the data scan is minimized. For example:
    1. Minimize the columns in the result set; a result set of long strings may be very costly.
    2. Where possible, switch strings to ints; this will greatly reduce the storage footprint.
    3. If possible, switch from bigint to tinyint; this will save some disk space as well. Notice the list of supported data types: https://prestodb.io/docs/current/language/types.html
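
Tips 14 and 15 are easiest to evaluate empirically. Below is a minimal PySpark sketch (the paths and the input format are hypothetical) that converts row-based data to Parquet with two different codecs, so you can compare output size on S3 and job runtime for your own workload:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("convert_to_columnar").getOrCreate()

# Hypothetical row-based input (could be CSV, JSON, etc.).
raw = spark.read.json("s3://my-bucket/raw/events/")

# Write the same data as Parquet with two compression codecs; compare the resulting
# size on S3 and the job runtime. Compression trades CPU for I/O, so results vary
# per workload -- benchmark before standardising on one codec.
raw.write.mode("overwrite").option("compression", "gzip").parquet("s3://my-bucket/columnar/events_gzip/")
raw.write.mode("overwrite").option("compression", "snappy").parquet("s3://my-bucket/columnar/events_snappy/")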

 

Conclusion

As you can see, there are many ways to save costs on AWS EMR. The easiest is to use task instance groups and scale in/out based on your needs. The rest may take some time, but it will be worth it.

 

 

Need to learn more about aws big data (demystified)?



——————————————————————————————————————————

I put a lot of thought into these blogs so that I could share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/



architecture, AWS, AWS EMR, cost reduction, meetup, Performance, performance tuning, Spark

Spark performance tuning demystified

I think, perhaps, a good place to start with this is with an older article that runs down all of these parts together [1].

To start with, we want to be sure we know what resources are available to us. Example cluster:

– 1 c4.8xlarge master
– 4 c4.8xlarge instances
– each with 36 CPU cores
– and each with 60 GB RAM.

First, the master does not count towards the calculation, since it should be managing the cluster rather than running jobs like a worker would.

Before doing anything else, we need to look at YARN, which allocates resources to the cluster. There are some things to consider:

1) To figure out what resources YARN has available, there is a chart for emr-5.11.0 showing that [2].

2) By default (for c4.8xlarge in the chart), YARN has 53248 MB (52 GB) max and 32 MB min available to it.

3) The max is supposed to be cleanly divisible by the minimum (1,664 in this case), since memory is incremented in units of the minimum (a quick sanity check of these numbers appears after this list).

4) YARN memory is basically set per instance it is running on (the total RAM to allocate for YARN to use).

5) Be sure to leave at least 1-2 GB RAM and 1 vCPU for each instance's O/S and other applications to run too. The default amount of RAM (52 GB out of the 60 GB RAM on the instance) seems to cover this, but this will leave us with 35 (36-1) vCPUs per instance off the top.

7) If a Spark container runs over the amount of memory YARN has allocated, YARN will kill the container and retry to run it again.
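
As a quick sanity check of points 2)-5), here is a tiny Python sketch with the c4.8xlarge numbers (the values are the ones from the chart referenced above; nothing here is cluster-specific):

# YARN defaults for c4.8xlarge on emr-5.11.0 (from the EMR task configuration chart).
yarn_max_mb = 53248   # yarn.nodemanager.resource.memory-mb
yarn_min_mb = 32      # yarn.scheduler.minimum-allocation-mb

print(yarn_max_mb / 1024)         # 52.0 -> ~52 GB of the 60 GB is usable by YARN
print(yarn_max_mb / yarn_min_mb)  # 1664.0 -> the max divides cleanly by the minimum
print(36 - 1)                     # 35 vCPUs left after reserving 1 for the O/S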

When it comes to executors it may be worth considering the following:

1) The number of executors is set for the entire cluster; it doesn't have to use all resources.

2) Likely, you are going to want more than 1 executor per node, since parallelism is usually the goal. This can be experimented with to manually size a job; again, it doesn't have to use all resources.

3) If we were to use the article [1] as an example, we may want to try and do 3 executors per node to start with (not too few or too many), so --num-executors 11 [3 executors per node x 4 nodes = 12 executors, -1 for the application master running = 11].
 
There are some big things to know about executor cores too, of course:

1) Although it doesn't explicitly say it, I have noticed problems working with other customers when the total executor cores exceed the instance's vCPU / core count.

2) It is suggested to put aside at least 1 executor core per node to ensure there are resources left over to run O/S, App Master and the like.

3) Executor cores are assigned per executor (discussed just above). So this isn't a total per node or for the cluster, but rather the resources available divided by the number of executors wanted per node.

4) In the article's example this works out to --executor-cores 11 [36 cores/vCPUs per instance - 1 for AM/overhead = 35 cores; 35 cores / 3 executors per node ≈ 11 executor cores].

Finally, looking at the executor memory, we also want to be sure of a few things:

1) This is assigned like the executor cores (finally, something sized similarly). It is per executor and thus is not the total amount of memory on a node, or the amount set in YARN, but rather the resources available divided by the number of executors again (like executor-cores).

2) Sometimes, some off-heap memory and other things can make an executor/container larger than the memory set. Leave room so YARN doesn’t kill it due to using more memory than is provided.

3) In the article's example this works out to --executor-memory 16G (16384 MB) [52 GB provided to YARN / 3 executors ≈ 17 GB RAM max; 17 GB - 1 GB for extra overhead = 16 GB (16384 MB)].
 
So, putting the above together as an example, we could use 3 executors per node with: --num-executors 11 --executor-cores 11 --executor-memory 16G
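
The same sizing can also be set in code when building the session. A minimal PySpark sketch (the values are just the worked example above for 4 x c4.8xlarge; these builder settings only take effect if no SparkContext already exists):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("tuned_job")
         .config("spark.executor.instances", "11")   # --num-executors 11
         .config("spark.executor.cores", "11")       # --executor-cores 11
         .config("spark.executor.memory", "16g")     # --executor-memory 16G
         .getOrCreate())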

That’s the first part. As for the rest, let’s cover the cluster mode(s) [4] and the driver.

1) When we use --deploy-mode client, the driver runs on the node we submit from; since this is usually run from the master, it strains the master for the entire cluster. Client mode is best if you have spark-shell on another instance in the same network and can submit jobs from there, using that instance's resources for the driver (taking the load off the cluster), or when testing interactively, since the driver is on the submitting instance and its logs will be there.

2) We probably want to use YARN cluster mode as our deployment mode [5]. In particular, we should be looking at yarn-cluster. This means each application's Application Master is deployed inside the cluster and not just on the one device that is also managing the whole cluster. The resources used are spread out, but so are the application master logs. There's another good explanation of this available [6].
 
3) Remember, if you over-subscribe the driver memory and cores, there's less for the rest of the cluster to use for actual work. Most often, the driver memory is set to 1-2 GB; 20 GB would take more than half the RAM and 4 cores would take a sizable share of the cores in our earlier example. It may be worth leaving these at their defaults (not using the options) for now and then conservatively raising them if the logs tell you to do so.
 
I suspect you already have it, but since there are articles from all over in here, the EMR document with the Spark options for emr-5.11.0 is listed as [7].

Also, if this works and you want to tune further, you can see about increasing resources available to YARN (Like using perhaps 58 GB of RAM instead of 52 GB) and test those too.

When working within Zeppelin to change the settings, you can add them via Menu (top right) -> Interpreter -> (add under the Spark section), as noted in [8]. The settings above with the "--" in front are for spark-submit. In Zeppelin they would be the 3 equivalent options:
spark.executor.instances (--num-executors)
spark.executor.cores (--executor-cores)
spark.executor.memory (--executor-memory) (set using a G at the end for GB, like 16G)

Also, while I was going through the attachment that you sent, I saw that performance is also hit by the job doing Spark speculation. S3 being an object store, speculative execution is going to slow down the performance. [9]

So:
a. Ensure spark.speculation is set to false (which should be the default); see https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-taskschedulerimpl-speculative-execution.html and https://spark.apache.org/docs/latest/configuration.html
b. INSERT OVERWRITE / append straight to S3 is bad; it may be better to write to HDFS and run s3distcp to S3 afterwards, which will be much faster [10]. A sketch of both points follows below.
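
A minimal PySpark sketch of points (a) and (b); the paths are hypothetical, and the s3-dist-cp command at the end is run outside Spark (as an EMR step or from the master node):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("s3_friendly_writer")
         .config("spark.speculation", "false")   # (a) avoid speculative duplicate writes to S3
         .getOrCreate())

df = spark.read.parquet("s3://my-bucket/input/")           # hypothetical input
df.write.mode("overwrite").parquet("hdfs:///tmp/output/")  # (b) land the output on HDFS first

# Then copy HDFS -> S3 with s3-dist-cp, e.g. as an EMR step:
#   s3-dist-cp --src hdfs:///tmp/output/ --dest s3://my-bucket/output/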

Resources:

[1] http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

[2] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html#emr-hadoop-task-jvm

[3] http://spark.apache.org/docs/latest/submitting-applications.html

[4] http://spark.apache.org/docs/latest/cluster-overview.html

[5] http://spark.apache.org/docs/latest/running-on-yarn.html

[6] https://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html

[7] http://docs.aws.amazon.com/emr/latest/ReleaseGuide/latest/emr-spark-configure.html

[8] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-zeppelin.html#zeppelin-considerations

[9] http://agrajmangal.in/blog/big-data/spark-parquet-s3/

[10] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html

Using INSERT OVERWRITE requires listing the contents of the Amazon S3 bucket or folder. This is an expensive operation. If possible, manually prune the path instead of having Hive list and delete the existing objects. [1]

Amazon S3 is the most popular input and output source for Amazon EMR. A common mistake is to treat Amazon S3 as you would a typical file system. [2]
There are differences between Amazon S3 and a file system that you need to take into account when running your cluster.
* If an internal error occurs in Amazon S3, your application needs to handle this gracefully and re-try the operation.
* If calls to Amazon S3 take too long to return, your application may need to reduce the frequency at which it calls Amazon S3.
* Listing all the objects in an Amazon S3 bucket is an expensive call. Your application should minimize the number of times it does this.
There are several ways you can improve how your cluster interacts with Amazon S3.
* Use S3DistCp to move objects in and out of Amazon S3. S3DistCp implements error handling, retries and back-offs to match the requirements of Amazon S3. For more information, see Distributed Copy Using S3DistCp.
* Design your application with eventual consistency in mind. Use HDFS for intermediate data storage while the cluster is running and Amazon S3 only to input the initial data and output the final results.
* If your clusters will commit 200 or more transactions per second to Amazon S3, contact support to prepare your bucket for greater transactions per second and consider using the key partition strategies described in Amazon S3 Performance Tips & Tricks.
* Set the Hadoop configuration setting io.file.buffer.size to 65536. This causes Hadoop to spend less time seeking through Amazon S3 objects.
* Consider disabling Hadoop's speculative execution feature if your cluster is experiencing Amazon S3 concurrency issues. You do this through the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution configuration settings. This is also useful when you are troubleshooting a slow cluster. (A boto3 sketch of this and the buffer-size setting above follows this list.)
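
One way to apply the io.file.buffer.size and speculative-execution settings above is through EMR configuration classifications when the cluster is launched. Here is a minimal boto3 sketch; the instance types, counts, region, cluster name and IAM role names are placeholders, and the classification names assume the standard Hadoop core-site / mapred-site files:

import boto3

emr = boto3.client('emr', region_name='us-east-1')

hadoop_s3_tuning = [
    {'Classification': 'core-site',
     'Properties': {'io.file.buffer.size': '65536'}},
    {'Classification': 'mapred-site',
     'Properties': {'mapred.map.tasks.speculative.execution': 'false',
                    'mapred.reduce.tasks.speculative.execution': 'false'}},
]

emr.run_job_flow(
    Name='s3-tuned-cluster',                 # placeholder cluster name
    ReleaseLabel='emr-5.11.0',
    Configurations=hadoop_s3_tuning,
    Instances={'MasterInstanceType': 'm4.xlarge',
               'SlaveInstanceType': 'm4.xlarge',
               'InstanceCount': 3,
               'KeepJobFlowAliveWhenNoSteps': True},
    JobFlowRole='EMR_EC2_DefaultRole',       # default EMR roles, adjust to your account
    ServiceRole='EMR_DefaultRole',
)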

References:
[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-error-hive.html#emr-troubleshoot-error-hive-3
[2] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-errors-io.html#emr-troubleshoot-errors-io-1

 

Need to learn more about aws big data (demystified)?



——————————————————————————————————————————

I put a lot of thought into these blogs so that I could share the information in a clear and useful way. If you have any comments, thoughts, or questions, or if you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/