architecture, AWS, AWS EMR, cost reduction, meetup, Performance, performance tuning, Spark

Spark performance tuning demystified

I think, perhaps, a good place to start with this is with an older article that runs down all of these parts together [1].

To start with, we want to be sure we know what resources are available to us, example cluster :

– 1 c4.8xlarge master
– 4 c4.8xlarge instances
– each with 36 CPU cores
– and each with 60 GB RAM.

First, The master will not count towards the calculation since it should be managing and not running jobs like a worker would.

Before doing anything else, we need to look at YARN, which allocates resources to the cluster. There are some things to consider:

1) To figure out what resources YARN has available, there is a chart for emr-5.11.0 showing that [2].

2) By default (for c4.8xlarge in the chart), YARN has 53248 MB (52 GB) max and 32 MB min available to it.

3) The max is supposed to be cleanly divisible by the minimum (1664 in this case), since memory incremented by the minimum number.

4) YARN is basically set for each instance it is running on (Total RAM to allocate to YARN to use).

5) Be sure to leave at least 1-2GB RAM and 1 vCPU for each instance’s O/S and other applications to run too. The default amount of RAM (52 GB out of 60 GB RAM on the instance) seems to cover this, but this will lave us with 35 (36-1) vCPUs per instance off the top.

7) If a Spark container runs over the amount of memory YARN has allocated, YARN will kill the container and retry to run it again.

When it comes to executors it may be worth considering the following:

1) This is set for the entire cluster. it doesn’t have to use all resources

2) Likely, you are going to want more than 1 executor per node, since parallelism is usually the goal. This can be experimented with to manually size a job, again it doesn’t even have to use all resources.

3) If we were to use the article [1] as an example, we may want to try an do 3 executors per node to start with (Not too few or too many), so –num-executors 11 [3 executors per node x 4 nodes = 12 executors, -1 for application master running = 11].
 
There are some big things to know about executor cores too, of course:

1) Although it doesn’t explicitly say it, I have noticed problems working with other customers, when total executors exceeds the instance’s vCPU / core count.

2) It is suggested to put aside at least 1 executor core per node to ensure there are resources left over to run O/S, App Master and the like.

3) Executor cores are being assigned per executor (discussed just above). So this isn’t total per node or for the cluster, but rather depending on the resources available divided by the number of executors wanted per node.

4) In the article’s example this works out to –executor-cores= 11 [36 cores/ vCPU per instance – 1 for AM/overhead = 35 cores. 35 cores/ 3 executors per node = 11 executor cores]

Finally, looking at the executor-memory, we also want to be sure of few things:

1) This is assigned like the executor cores (finally, something sized similarly). This is per executor and thus is not the total amount of memory on a node, or set in YARN, but rather depending on the resources available divided by the number of executors again (Like executor-cores)

2) Sometimes, some off-heap memory and other things can make an executor/container larger than the memory set. Leave room so YARN doesn’t kill it due to using more memory than is provided.

3) In the article’s example this works out to –executor-memory 16G (16384 MB) [52 GB provided to YARN / 3 executors = 17 GB RAM max. 17 GB-1 GB for extra overhead = 16GB (16384 MB)]
 
So, putting the above together as an example, we could use 3 executors per node with: –num-executors 11 –executor-cores 11 –executor-memory 16G

That’s the first part. As for the rest, let’s cover the cluster mode(s) [4] and the driver.

1) When we use –deploy-mode client, we tell YARN to deploy to an ‘external node’, but since this is being run from the master, it is straining the master for the entire cluster. Client mode is best if you have spark-shell on another instance in the same network and can submit a job from there to the cluster but use the instance’s resources for the driver (taking the load off the cluster) or when using it interactively to test since the application master is on the master instance and its logs will be there).

2) We probably want to use YARN as our deployment mode [5]. In particular, we should be looking at yarn-cluster. This means each container’s App master is deployed in the cluster and not just on one device (That is also managing the whole cluster). The resources running are spread out, but the logs are too for the application masters). There’s another good explanation on this available [6].
 
3) Remember, if you over-subscribe the driver memory and cores, there’s less for the rest of the cluster to use for actual work. Most often, the driver memory is set to 1-2GB; 20GB would take more than half the RAM and 4 cores would take most of the cores in our earlier example. I t may be worth leaving these as their defaults (not using the options) for now and then conservatively raising them if the logs inform you to do so.
 
I suspect you already have it, but as there are articles from all over in here, this is the EMR document with the options for emr-5.11.0.

Also, if this works and you want to tune further, you can see about increasing resources available to YARN (Like using perhaps 58 GB of RAM instead of 52 GB) and test those too.

When working withing Zeppelin to change the settings, it should allow changes by adding them in: Menu (top right) -> Interpreter -> (Add under the Spark section) as noted here [8]. The settings above with the “–” in-front are for doing a Spark submit. In Zeppelin they would be be the 3 like options you mentioned:
spark.executor.instances (–num-executors)
spark.executor.cores (–executor-cores)
spark.executor.memory (–executor-memory) (set using a G at the end for GB, like 16G )

Also while I was going through the attachment that you send I see that the performance is also hit by job doing spark speculation. S3 being a object if it does speculation then this is going to slow down the performance. [9]

So
a. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-taskschedulerimpl-speculative-execution.html(https://spark.apache.org/docs/latest/configuration.html) ensure spark.speculation os set to false (which should be default, but may allow the setting to work)
b. Insert oiverwrite + append +s3= bad, maybe writer to HDFS and do a s3distcp to s3 afterwards which will be much faster [10]

Resources:

[1] http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

[2] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html#emr-hadoop-task-jvm

[3] http://spark.apache.org/docs/latest/submitting-applications.html

[4] http://spark.apache.org/docs/latest/cluster-overview.html

[5] http://spark.apache.org/docs/latest/running-on-yarn.html

[6] https://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html

[7] http://docs.aws.amazon.com/emr/latest/ReleaseGuide/latest/emr-spark-configure.html

[8] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-zeppelin.html#zeppelin-considerations

[9] http://agrajmangal.in/blog/big-data/spark-parquet-s3/

[10] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html

Using INSERT OVERWRITE requires listing the contents of the Amazon S3 bucket or folder. This is an expensive operation. If possible, manually prune the path instead of having Hive list and delete the existing objects. [1]

Amazon S3 is the most popular input and output source for Amazon EMR. A common mistake is to treat Amazon S3 as you would a typical file system. [2]
There are differences between Amazon S3 and a file system that you need to take into account when running your cluster.
* If an internal error occurs in Amazon S3, your application needs to handle this gracefully and re-try the operation.
* If calls to Amazon S3 take too long to return, your application may need to reduce the frequency at which it calls Amazon S3.
* Listing all the objects in an Amazon S3 bucket is an expensive call. Your application should minimize the number of times it does this.
There are several ways you can improve how your cluster interacts with Amazon S3.
* Use S3DistCp to move objects in and out of Amazon S3. S3DistCp implements error handling, retries and back-offs to match the requirements of Amazon S3. For more information, see Distributed Copy Using S3DistCp.
* Design your application with eventual consistency in mind. Use HDFS for intermediate data storage while the cluster is running and Amazon S3 only to input the initial data and output the final results.
* If your clusters will commit 200 or more transactions per second to Amazon S3, contact support to prepare your bucket for greater transactions per second and consider using the key partition strategies described in Amazon S3 Performance Tips & Tricks.
* Set the Hadoop configuration setting io.file.buffer.size to 65536. This causes Hadoop to spend less time seeking through Amazon S3 objects.
* Consider disabling Hadoop’s speculative execution feature if your cluster is experiencing Amazon S3 concurrency issues. You do this through the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution configuration settings. This is also useful when you are troubleshooting a slow cluster.

References:
[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-error-hive.html#emr-troubleshoot-error-hive-3
[2] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-errors-io.html#emr-troubleshoot-errors-io-1

 

Need to learn more about aws big data (demystified)?



——————————————————————————————————————————

I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/



architecture, AWS, AWS athena, cost reduction

16 Tips to reduce costs on AWS SQL Athena

How save costs on AWS SQL Athena? Cost of using AWS SQL Athena is killing you?

consider the below

  1. Did you switch to columnar? if not try the this link as reference: convert to columnar  from raw based data.
  2. Did you use parquet or orc? one of them take less space.
  3. Did u use partitioning? did you use the correct partitioning for your query?
  4. If using ORC consider using bucketing on top of partitioning, not sure if Athena supports this. confirming this. TBD.
  5. If using highly nested data , perhaps AVRO will be your space saver, need to test this.
  6. Did you compress via gzip? or something else?  there more compressions supported, each have there own storage put print and scan put print….
  7. Was your data spliced into chunks? if so try to change chunk size. more complicated but doable, again, could go either way – need to test this will your data.
  8. Apply hints on the table may help on data scan in some cases. not sure if Athena supports this. confirming this. TBD.
  9. If using multiple tables join, order of joins, may impact scanned data
  10. Consider pre aggregating data if possible as part of your transformation/cleansing process. even if it is on each (using window table, each row will hold aggregation tables. )
  11. Consider pre calculating table with heavy group by on raw-data. i.e have the data already calculated on s3, and have your production user/ end user query that table.
  12. If your application permits this – using caching layer like elastic cache. remember Athena has it own caching as well (results are saved for 24 hours)
  13. have a data engineer review each query, to make sure data scan is minimised. for example
    1. Minimise the columns in the results set… a results set of longs strings maybe be very costly.
    2. where possible switch strings to ints, this will minimise footprint on storage greatly.
    3. if possible switch from bigint to tinyint. this will save some disk space as well. notice the list of supported data types: https://prestodb.io/docs/current/language/types.html
  14. Consider EMR with presto and task group with spots + auto scaling – you could have a tighter control on max budget that will be used, and in some cases it may be faster running on AWS EMR.
  15. Use bucketing ( each partition can be divided into smaller parts. official docs: https://docs.aws.amazon.com/athena/latest/ug/bucketing-vs-partitioning.html

16.Benchmarking different types of Hadoop / Spark / Hive / Presto storage and compressions types.

Using this blog on convert raw based data to columnar on tpch data , I created new destinations tables , and converted to different type storage and compression. For completeness I used popular ORC and parquet the following compressions types GZIP, GZIP, Deflate, LZO, Zlib. Afterwards I ran a simple preview query on each of the 5 compressions  below:

SELECT * FROM “tpch_data”.”lineitem_compresation_name” limit 10

Results on data read via Athena on cold queries (data scanned only once, after 72 hours):

  • Parquet, GZIP: (Run time: 4.8 seconds, Data scanned: 84MB)
  • Parquet, BZIP: (Run time: 6.0 seconds, Data scanned: 242MB)
  • Parquet, Deflate: (Run time: 5.81 seconds, Data scanned: 242MB)
  • Parquet, LZO: (Run time: 9 seconds, Data scanned: 15GB)
  • ORC, Zlib : (Run time: 10.1 seconds, Data scanned: 11GB)
  • Text,GZIP:  (Run time: 5.9 seconds, Data scanned: 49MB)

Results on data read via Athena on hot queries (data scanned several times):

  1. Parquet, GZIP: (Run time: 6 seconds, Data scanned: 2.5GB)
  2. Parquet, BZIP: (Run time: 6.39 seconds, Data scanned: 4.84GB)
  3. Parquet, Deflate: (Run time: 4.88 seconds, Data scanned: 5.31GB)
  4. Parquet, LZO: (Run time: 5.77 seconds, Data scanned: 8.97GB)
  5. ORC, Zlib : (Run time: 6.5 seconds, Data scanned: 5.07GB)

The results are not surprising as there are many compressions, and each can behave differently on different data types (strings, ints, floats, rows, columns) . notice the scan time is more or less the same.

I ran a columnar test on a specific int column instead of all columns in this queries, and the results were the same, as all compressions read the same amount of giga scanning this specific column. again notice the different running time.

SELECT avg( l_orderkey) FROM “tpch_data”.”lineitem_parquet”

  • Parquet, GZIP:  (Run time: 6.84 seconds, Data scanned: 3.77GB)
  • Parquet, BZIP:  (Run time: 9.03 seconds, Data scanned: 3.77GB)
  • Parquet, Deflate: (Run time: 9 seconds, Data scanned: 3.77GB)
  • Parquet, LZO: (Run time: 9.94 seconds, Data scanned: 3.77GB)
  • ORC, Zlib : (Run time: 11.27 seconds, Data scanned: 3.77GB)

Again, This test should not convince you to prefer one storage type over the other, nor to prefer one compression type over the other. Test it on you data, and understand the differences in your use case.

and don’t forget monthly AWS s3 Storage costs…. 

  1. Old data can be reduced to reduced availability.
  2. how much data is read in your queries. if your invoice of Athena is 500$, this means you are reading 100 TB of compressed data. so… can you minimised the amount of data read in your query – or can your reduce the history from 2 years to 1 year ?

Conclusion on cost reduction using AWS SQL Athena

  1. As you can see, you could be saving a 50% or more. easily on your AWS SQL Athena costs simply by changing to the correct compression.
  2. Check the running time, be sure it is a non issues for your use case.
  3. regarding the text vs parquet, be sure to understand the use-case, not always you need to extract all the rows, thus column based storage, will be more use-full.
  4. if you have any more tips to reduce costs – please contact me if you are willing to share 🙂

Resources:

https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-tips-for-amazon-athena/

https://docs.aws.amazon.com/athena/latest/ug/service-limits.html

https://docs.aws.amazon.com/athena/latest/ug/work-with-data.html

Need to learn more about aws big data?



——————————————————————————————————————————

I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/



architecture, AWS athena, AWS EMR, cost reduction

When should we use EMR and When should we use Redshift? EMR VS Redshift

Use Redshift when

  1. Traditional data warehouse
  2. When you need the data relatively hot for analytics such as BI
  3. When there is no data engineering team
  4. When your queries require joins
  5. When you need a cluster 24X7
  6. When you data type are simple, i.e not Arrays, or Structs
  7. When data has no nested jsons
  8. When you have petabyte scale database
  9. When you want analize massive amount of data (spectrum)
  10. When you need update/delete
  11. When you require and ACID DBMS

Use EMR (SparkSQL, Presto, hive) when

  1. When you need a transient cluster, for night or hourly automation 
  2. When compute elasticity is important (auto scaling on tasks)
  3. When cost is important: spot instances. 
  4. When you data scales until a few hundred TB’s
  5. When you want to decouple compute and storage (external table + task node + auto scaling). this is cloud architecture best practice.
  6. When you require more flexibility
    1. Complex partitions + dynamic partitioning + insert overwrite. click on the link for an example.
    2. Complex data type
      1. Structs
      2. Arrays <–> nested json
    3. Orchestration built in such as Oozie, although Airflow is more common.
    4. Notebook built in – mix your code with SQL via   Zeppelin

Watch this meetup video to understand in depth Big Data Architecture conciderations in AWS.

Please check below Redshift specific faq: 

Q: When would I use Amazon Redshift vs. Amazon EMR?
Q: Can Redshift Spectrum replace Amazon EMR?
Q: Can I use Redshift Spectrum to query data that I process using Amazon EMR?

— Reference : Redshift faq
https://aws.amazon.com/redshift/faqs/

Please check below EMR specific faq:

Q: What can I do with Amazon EMR?
Q: Who can use Amazon EMR?
Q: What can I do with Amazon EMR that I could not do before?
Q: What is the data processing engine behind Amazon EMR?
Q: What is Apache Spark?
Q: What is Presto?

— Reference : EMR faq
https://aws.amazon.com/emr/faqs/

** Point 2. I am listing other resources which can help to understand RDS and EMR use cases better.

— Reference :
AWS redshift related case studies > Look for case study section :
https://aws.amazon.com/redshift/getting-started/
https://pages.awscloud.com/redshift-proof-of-concept-request.html

— Reference :
AWS EMR related case studies > Look for case study section :
https://aws.amazon.com/emr/
https://pages.awscloud.com/GLOBAL_OT_emr-poc_20170530.html

** Point 3. I have tried to check some of AWS blogs which shows how EMR and RDS can be used together in specific use cases. 

— How I built a data warehouse using Amazon Redshift and AWS services in record time
https://aws.amazon.com/blogs/big-data/how-i-built-a-data-warehouse-using-amazon-redshift-and-aws-services-in-record-time/

— Build a Healthcare Data Warehouse Using Amazon EMR, Amazon Redshift, AWS Lambda, and OMOP
https://aws.amazon.com/blogs/big-data/build-a-healthcare-data-warehouse-using-amazon-emr-amazon-redshift-aws-lambda-and-omop/

— Powering Amazon Redshift Analytics with Apache Spark and Amazon Machine Learning
https://aws.amazon.com/blogs/big-data/powering-amazon-redshift-analytics-with-apache-spark-and-amazon-machine-learning/

Hope this information helps in understanding EMR and Redshift use cases better.

Need to learn more about aws big data?



——————————————————————————————————————————

I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/