Blog

AWS EMR

EMR Zeppelin Security

Apache Shiro is part of the EMR installation, with the options below:

  • Basic authentication (via Apache Shiro): user management (users, passwords, groups), including LDAP

https://zeppelin.apache.org/docs/0.7.3/security/shiroauthentication.html

  • Notebook permissions management: read/write/share

https://zeppelin.apache.org/docs/0.7.3/security/notebook_authorization.html

  • Data source authorization (e.g., a third-party DB):

https://zeppelin.apache.org/docs/0.7.3/security/datasource_authorization.html

 

Adding HTTPS/SSL to the EMR Zeppelin GUI (3 options)

 

Walkthrough to add https to Zeppelin:

1) Generate a PKCS12 keystore file: log into the master instance of the EMR cluster and run the following commands:

openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days 365 -out certificate.pem

openssl x509 -text -noout -in certificate.pem

openssl pkcs12 -inkey key.pem -in certificate.pem -export -out certificate.p12

openssl pkcs12 -in certificate.p12 -noout -info

Enter the public DNS name of the master node when prompted for a hostname. The above commands create a file named /home/hadoop/certificate.p12. This file is your certificate.

 

2) Change the properties below in the zeppelin-site.xml file located at /etc/zeppelin/conf.dist/zeppelin-site.xml (if it is not present, copy the /etc/zeppelin/conf.dist/zeppelin-site.xml.template file and rename it):

    <property>
      <name>zeppelin.ssl</name>
      <value>true</value>
      <description>Should SSL be used by the servers?</description>
    </property>

    <property>
      <name>zeppelin.ssl.keystore.path</name>
      <value>/home/hadoop/certificate.p12</value>
      <description>Path to keystore relative to Zeppelin configuration directory</description>
    </property>

    <property>
      <name>zeppelin.ssl.keystore.type</name>
      <value>PKCS12</value>
      <description>The format of the given keystore (e.g. JKS or PKCS12)</description>
    </property>

    <property>
      <name>zeppelin.ssl.keystore.password</name>
      <value>password</value>
      <description>Keystore password. Can be obfuscated by the Jetty Password tool</description>
    </property>

    <property>
      <name>zeppelin.server.ssl.port</name>
      <value>8445</value>
      <description>Server SSL port (used when the ssl property is set to true)</description>
    </property>

3) Restart Zeppelin:

sudo stop zeppelin

sudo start zeppelin

4) You will now be able to access Zeppelin over HTTPS on port 8445: https://<master-public-dns>:8445/#/

User management Via Shiro

Now, in order to manage groups/roles, you can create the groups/roles under the “[roles]” section in the “shiro.ini” file. For example, I could have a set of groups like:

 

    [roles]

    admin = *

    readonly = *

    poweruser = *

    scientist = *

    engineer = *

Then the “[users]” section could look like the below:

    [users]

    admin = <password>, admin

    user1 = <password>, scientist, poweruser

    user2 = <password>, engineer, poweruser

    user3 = <password>, readonly

 

 

For example, the above means that:

       

    – user “admin” is in the “admin” group;

    – user “user1” is in the “poweruser” and “scientist” groups;

    – etc.

 

For example, a notebook’s permissions could then be set as:

    Owners: admin

    Writers: scientist, engineer, poweruser

    Readers: readonly

 

Once the groups/roles are created, the authorization setup is similar to what is described in https://zeppelin.apache.org/docs/0.7.3/security/notebook_authorization.html. For instance, on a notebook’s permissions page, you can put the group name instead of the individual users.

A good read: best-practice recommendations from Hortonworks:

https://community.hortonworks.com/articles/141589/zeppelin-best-practices.html

 

Need to learn more about aws big data (demystified)?

       

——————————————————————————————————————————

I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/

AWS EMR, Hive, Performance, performance tuning, Spark

AWS S3 caching while working with Hive spark SQL and External table | LLAP

If you are looking for options to speed up queries that use subsets of the same data, and you would like to know whether there is an AWS solution that fits the requirement of caching frequently accessed data, this post covers a few options.

If you are using Hive, you may use LLAP (if you are not already). LLAP is effectively a daemon that caches metadata as well as the data itself; there is an AWS blog on enabling LLAP using a bootstrap action and then executing your queries [1]. LLAP daemons are launched under YARN management to ensure that the nodes don’t get overloaded by the daemons’ compute resources. You may specify the number of daemon instances, the memory allocation, the number of executors per instance and so forth, but the bootstrap action also has default values:

    # --instances – number of LLAP daemon instances, defaults to the number of slave nodes

    # --cache – LLAP cache size for each daemon, defaults to 20% of physical memory

    # --executors – number of executors per daemon, defaults to the number of CPU cores

    # --iothreads – number of IO threads, defaults to the number of CPU cores

    # --size – YARN container memory, defaults to 50% of available memory on a node

    # --xmx – LLAP daemon memory, defaults to 50% of container memory

    # --log-level – log level, defaults to INFO

If you are using Spark, RDD persistence is one of the mechanisms you can use to cache data in memory across operations. There are multiple levels at which you can choose to cache the data: memory only, or both memory and disk, among other options described in [2]. You can mark an RDD to be persisted using the persist() or cache() methods on it.

Tachyon(Alluxio) is basically similar. It sits between HDFS and Spark to provide in-memory file-system, like a virtual distributed storage. Integration of Alluxio in EMR is currently in dev stages. [3]

I personally have not tested the above solutions, but I am planning to, and will update this post in the future. Tested this yourself? Please contact me with your feedback.

References

[1] AWS Blog LLAP – https://aws.amazon.com/blogs/big-data/turbocharge-your-apache-hive-queries-on-amazon-emr-using-llap/

[2] RDD Persistence – https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#rdd-persistence

[3] Alluxio Docs – http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html#class-alluxiohadoopfilesystem-not-found-issues-with-sparksql-and-hive-metastore

[4] LLAP Wiki – https://cwiki.apache.org/confluence/display/Hive/LLAP#LLAP-Caching

[5] LLAP benchmark – https://www.slideshare.net/Hadoop_Summit/hadoop-query-performance-smackdown

[6] Hive LLAP benchmark vs. Impala – https://dzone.com/articles/3x-faster-interactive-query-with-apache-hive-llap

 




architecture, Data Engineering, Performance, performance tuning, Spark

How to work with maximize resource allocation and Spark dynamic allocation [ AWS EMR Spark ]

Spark resource tuning is essentially a case of fitting the number of executors we want to assign per job to the available resources in the cluster. In this case, we need to look at the EMR cluster from an “overall” perspective. Assume your Core and Task nodes total 40 (20+20) instances of type m3.xlarge.
As mentioned before, each m3.xlarge instance has 4 vCPUs and 15GiB of RAM available for use [1]. Since we are going to leave 1 vCPU per node free as per the initial issue, this gives us a total cluster vCPU resources of:
40 x 3 vCPUs = 120vCPUs

How should RAM and CPU be assigned per Spark executor in a cluster?

For m3.xlarge instance types, the maximum amount of RAM available on each node is controlled by the property “yarn.nodemanager.resource.memory-mb” in the /etc/hadoop/conf/yarn-site.xml file on the master instance node. It is set by default to 11520 (even with maximizeResourceAllocation enabled). This leaves some RAM free on each node for other tasks such as system procedures.
(40 x 11520MB RAM) / 1024 (to convert to GiB) = 450GiB RAM*

* Note: this does not mean it is OK to divide this 450GiB by 40 and set each executor’s memory to that value. The maximum is a hard maximum, and each executor requires some memoryOverhead to account for things like VM overheads, interned strings and other native overheads [2]. Typically, the actual maximum memory set by --executor-memory is 80% of this value.
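As a sanity check, the arithmetic above can be reproduced in a few lines (a minimal sketch; the variable names are my own, the figures are the m3.xlarge values quoted in the text):

```python
# Back-of-the-envelope sizing for the 40 m3.xlarge core/task nodes above.
# Figures from the text: 4 vCPUs per node, yarn.nodemanager.resource.memory-mb = 11520.
nodes = 40
vcpus_per_node = 4
yarn_mem_mb_per_node = 11520

# Leave 1 vCPU per node free for the OS and other processes.
cluster_vcpus = nodes * (vcpus_per_node - 1)           # 120 vCPUs

# Total YARN-managed memory across the cluster, converted to GiB.
cluster_mem_gib = nodes * yarn_mem_mb_per_node / 1024  # 450 GiB

# Rule of thumb from the text: ~80% of the YARN maximum is safely assignable
# via --executor-memory; the remainder covers memoryOverhead.
usable_executor_mem_gib = cluster_mem_gib * 0.8        # 360 GiB

print(cluster_vcpus, cluster_mem_gib, usable_executor_mem_gib)
```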

Examples of Spark executor RAM and CPU configuration

In this case, we can assign almost any combination that fits into these parameters. For example, for m3.xlarge instance types we could assign a safe maximum of 3 cores and 9GiB of RAM per executor on each node, giving a maximum of 40 executors. Then we could simply divide the executors equally across the number of jobs. Of course, as you increase either your node count or your instance type, the available resources and maxExecutors per job go up. As such, if you decide to create a cluster with a larger instance type, you can adjust the values below accordingly.

Using this as a simple formula, we could have many examples such as (using your previously provided settings as a basis):
1) 1 Spark-submit Job -> --executor-cores 3 --executor-memory 9g --conf spark.dynamicAllocation.maxExecutors=40 --conf spark.yarn.executor.memoryOverhead=2048

2) 2 Spark-submit Jobs -> --executor-cores 1 --executor-memory 4500m --conf spark.dynamicAllocation.maxExecutors=80 --conf spark.yarn.executor.memoryOverhead=1024
(--executor-cores is 1 here, as 3/2 = 1.5 and decimals are rounded down)

3) 3 Spark-submit Jobs -> --executor-cores 1 --executor-memory 3000m --conf spark.dynamicAllocation.maxExecutors=120 --conf spark.yarn.executor.memoryOverhead=768
etc.

Let’s double the EMR instance size and see what happens in Spark … examples provided!

On a m3.2xlarge with double the resources per node we would have 40 x 8 vCPUs and 40 x 30GiB RAM. Again, leaving 1 vCPU free per node (8-1=7 vCPUs per node) and trusting that yarn limits the max used RAM to 23GiB (23552MiB) or so, our new values would look like:
1) 1 Spark-submit Job -> --executor-cores 7 --executor-memory 19g --conf spark.dynamicAllocation.maxExecutors=40 --conf spark.yarn.executor.memoryOverhead=4096

(19456MiB + 4096MiB) * 1 overall Job = 23552MiB => A perfect fit!
23552 * 0.8 = 18841.6MiB, so we can probably round this up to 19GiB of RAM or so without issue. The majority of the memory discrepancy between 23 and 19 (4GiB) can be set in the memoryOverhead parameter.

2) 2 Spark-submit Jobs -> --executor-cores 3 --executor-memory 9g --conf spark.dynamicAllocation.maxExecutors=80 --conf spark.yarn.executor.memoryOverhead=2560

(9216 + 2560) * 2 Jobs to run = 23552MiB => A perfect fit!
(--executor-cores is 3 here, as 7/2 = 3.5 and decimals are rounded down)

3) 3 Spark-submit Jobs -> --executor-cores 2 --executor-memory 6g --conf spark.dynamicAllocation.maxExecutors=120 --conf spark.yarn.executor.memoryOverhead=1536
(--executor-cores is 2 here, as 7/3 = 2.333 and decimals are rounded down.)
etc.
(6144 + 1536) * 3 = 23040MiB => Almost a perfect fit. 512MiB is left unused.

Please note that the memory settings are not linear or related to one another. They simply must fit into the maximum size allowed by yarn. In the above case for the m3.2xlarge, I used the default 23GiB as the maximum value allowed per node.

So for the example:
“3) 3 Spark-submit Jobs -> --executor-cores 2 --executor-memory 6g --conf spark.dynamicAllocation.maxExecutors=120 --conf spark.yarn.executor.memoryOverhead=1536”
We have 3 jobs with a maximum executor memory size of 6GiB. This is 18GiB that fits into the 23GiB. Of the 5GiB remaining, we allocate that as memoryOverhead but with an equal share per job. This means that we set this value to 1536MB RAM to use the remaining RAM, except for 512MiB.

As you can see above, the calculation is that the per job is:
(executor-memory + memoryOverhead) * number of concurrent jobs = value that must be <= Yarn threshold.
(6144MB + 1536MB)*3 = 23040MiB which is 512MiB less than the Yarn threshold value of 23552MiB (23GiB).
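The fit check above can be sketched as a tiny helper (the function and parameter names are my own; the default threshold is the 23552 MiB m3.2xlarge YARN limit used in these examples):

```python
def fits_in_yarn(executor_memory_mb, memory_overhead_mb, concurrent_jobs,
                 yarn_threshold_mb=23552):
    """Check (executor-memory + memoryOverhead) * jobs <= YARN threshold per node."""
    used_mb = (executor_memory_mb + memory_overhead_mb) * concurrent_jobs
    return used_mb <= yarn_threshold_mb, yarn_threshold_mb - used_mb

# Example 3 above: three jobs of 6 GiB executors with 1536 MiB overhead each.
fits, spare_mb = fits_in_yarn(6144, 1536, 3)
print(fits, spare_mb)  # fits, with 512 MiB left unused
```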

These are rough, basic settings that are only appropriate for jobs that work best with a lower number of larger executors. Some Spark jobs run better with a higher number of smaller executors; this depends completely on the type of job you are running, and as such is unique to each Spark application run on the cluster. With these types of jobs, you would be better off further subdividing the values above so that you can fit more executors into the required number of jobs. Luckily, you can use Spark dynamicAllocation to let Spark calculate this for you and allocate spare executors where available per job [3].

There is scope for more complex Spark tunings, which you can read more information about in links [4], [5], [6] and [7] below.

Should maximizeResourceAllocation be used in all cases, and if not, what are the reasons not to use it?

Answer:

Using the dynamicAllocation settings (set to true by default on EMR 4.4 and later), we can better determine the overall resources that we have and how many executors we want to give to each job. The examples below use the 40 m3.xlarge nodes as their reference for the cluster’s total resources.

For example, leaving the cluster as-is and running only the original single job per cluster, we would naturally want to allocate all of our resources to this one job. If the job runs better using large executors, we would allocate all of our resources to it. We could do this by creating the cluster with the maximizeResourceAllocation property set to true, which automatically tunes the cluster to use the most resources available [8]. Your existing settings of “--executor-memory 9g --executor-cores 3 --conf spark.yarn.executor.memoryOverhead=2048” would be sufficient for this task, though not actually necessary if the maximizeResourceAllocation option is set. You can see the default settings that the maximizeResourceAllocation option sets in the /etc/spark/conf/spark-defaults.conf file on your master node.

Example 1 of Spark tuning: 1 x Spark job, smaller number of larger executors:
For example, using your previous spark-submit job options (tweaked), you could use something like this:

spark-submit --master yarn --executor-cores 3
--conf maximizeResourceAllocation=true
--conf "spark.executor.extraJavaOptions=$EXECUTOR_OPTS"
--conf "spark.driver.extraJavaOptions=$DRIVER_OPTS"
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=40
--conf spark.dynamicAllocation.initialExecutors=40
--conf spark.dynamicAllocation.enabled=true
--conf spark.driver.maxResultSize=10G
--conf spark.akka.frameSize=1000
--conf spark.ui.killEnabled=true
--jars "/usr/lib/spark/lib/RedshiftJDBC41-1.1.7.1007.jar,/usr/lib/spark/lib/java-dogstatsd-client-2.0.12.jar"
--packages com.databricks:spark-redshift_2.10:0.5.2
--class melidata.analysis.jobs.$JOB /home/hadoop/myJob1.jar
—–

Here, Spark dynamicAllocation will allocate the maximum available RAM for you for your executor. We will be running 40 individual executors that have 3 vCPUs each. You can read more about the Spark dynamicAllocation options in link [3] below.


However, say your job runs better with a greater number of smaller executors?

Spark tuning Example 2: 1x Job, greater number of smaller executors:
In this case you would simply set the dynamicAllocation settings in a way similar to the following, but adjust your memory and vCPU options so that more executors can be launched on each node. So for twice the executors on our 40 nodes, we need to halve the memory and vCPUs in order to make the 2 executors “fit” onto each of our nodes.

For example, for our spark job we could submit the job in a similar way to the following:
—–
spark-submit --master yarn --executor-cores 1 --executor-memory=4500m
--conf spark.yarn.executor.memoryOverhead=1024
--conf "spark.executor.extraJavaOptions=$EXECUTOR_OPTS"
--conf "spark.driver.extraJavaOptions=$DRIVER_OPTS"
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=80
--conf spark.dynamicAllocation.initialExecutors=20**
--conf spark.dynamicAllocation.enabled=true
--conf spark.driver.maxResultSize=10G
--conf spark.akka.frameSize=1000
--conf spark.ui.killEnabled=true
--jars "/usr/lib/spark/lib/RedshiftJDBC41-1.1.7.1007.jar,/usr/lib/spark/lib/java-dogstatsd-client-2.0.12.jar"
--packages com.databricks:spark-redshift_2.10:0.5.2
--class melidata.analysis.jobs.$JOB /home/hadoop/myJob1.jar
—–

This would create a minimum of 2 executors per node for the 1 job, of size 1 vCPU and 4.5GiB of memory each. The property “spark.dynamicAllocation.maxExecutors=80” can be set, as this allows the number of executors to scale up to the maximum resource allocation of the cluster. This allows for 80 * 4.5GiB = 360GiB (80% of the maximum 450GiB) and 80 * 1 vCPU, which fits within the maximum vCPUs available for the cluster.

** This value is arbitrary. The dynamicAllocation will scale this value as needs be during the course of your job.

Now say that we want to run 2 jobs of equal importance over the cluster with the same amount of resources going to both jobs.

Spark tuning Example 3: 2 x Spark jobs, one larger executor per node per job:

In this case, we could use the following settings (similar to the last one) but with the following tweaks:
—–
spark-submit --master yarn --executor-cores 1 --executor-memory=4500m
--conf spark.yarn.executor.memoryOverhead=1024
--conf "spark.executor.extraJavaOptions=$EXECUTOR_OPTS"
--conf "spark.driver.extraJavaOptions=$DRIVER_OPTS"
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=40
--conf spark.dynamicAllocation.initialExecutors=20
--conf spark.dynamicAllocation.enabled=true
--conf spark.driver.maxResultSize=10G
--conf spark.akka.frameSize=1000
--conf spark.ui.killEnabled=true
--jars "/usr/lib/spark/lib/RedshiftJDBC41-1.1.7.1007.jar,/usr/lib/spark/lib/java-dogstatsd-client-2.0.12.jar"
--packages com.databricks:spark-redshift_2.10:0.5.2
--class melidata.analysis.jobs.$JOB /home/hadoop/myJob<1 or 2>.jar
—–

The main difference here is the setting of the value:

“--conf spark.dynamicAllocation.maxExecutors=40”.

We need to set this value to 40, as our memory limit is essentially halved when we run 2 separate jobs on each node. Hence, our total cluster resources effectively halve to 80 vCPUs and 225GiB RAM per Spark-submit job.
So to fit our 2 jobs into these new limits, we again get 1 vCPU per executor:
we need to leave 1 of the 4 cores free per node, which gives us 4-1=3 vCPUs per node; divided by 2 jobs that is 1.5, which we round down to 1 vCPU per job.
225 * 0.8 (to get 80% of settable memory) = 180GiB per job.
180GiB / 40 nodes = 4.5GiB per job per node.

By setting the value of spark.dynamicAllocation.maxExecutors to 40 for each job, it ensures that neither job encroaches into the total resources allocated for each job. By limiting the maxExecutors to 40 for each job, the maximum resources available per jobs is:
40 x 4500MiB RAM (~4.5GiB each) = 180GiB per job.
40 x 1 vCPU = 40vCPUs per job.
2 Jobs means that in total 360GiB RAM is used (80% of total RAM as expected) and 40vCPUs * 2 jobs is 80 vCPUs which again, fits in with the maximum vCPUs for the cluster.

The difference between Example 3 and Example 2 is that in Example 2, all of the resources are being used by 2 executors on each node for the 1 job. In Example 2, your job would have a maximum of 80 executors (40 nodes * 2 executors per node) running on the same 1 job. In Example 3, your 2 jobs would have 40 executors maximum (40 nodes * 1 executor per node) running on each job concurrently.

Some Notes about spark performance tuning :

Note 1: You may need to balance the instance types to the instance count in order to get a “best case” usage for your resource requirements.
There are a few points about the above settings that you may have noticed. First, the --executor-cores value was only set to 1. This is because we are only using 3 out of the 4 vCPUs per node, and 3/2 = 1.5; decimal values are not allowed in this parameter. This means that a whole vCPU core goes unused. In Example 2 above, you could simply lower the --executor-memory value to 3GiB and increase the --conf spark.dynamicAllocation.maxExecutors value to 120 to use this core.

This goes back to my previous point in my last response where I said:
“Please note that with the reduced CPU power per node, it may be worthwhile either increasing your instance count or instance type in order to maintain the speeds that you want. Increasing the instance type (say, for example, to a m3.2xlarge type) also has the benefit of being able to use more of the node’s CPU for the executors. The OS only needs a small bit of CPU free, but the Spark tunings only let us specify the CPU to use in terms of whole cores.

Hence, on a m3.xlarge instance, 1 core left free from the 4 cores is 25% of your CPU resources left free but on a m3.2xlarge, leaving 1 core free from the 8 cores is only 12.5% of your CPU resources left free, and so on.”


Note 2: combine both of your jobs into one

You could consider amalgamating both of your jobs into the one .jar file being called. While this would likely require more coding and tweaking of your application, it would allow the Spark dynamicAllocation process to effectively “load balance” the number of executors across your jobs. Instead of running your 2 jobs concurrently in two separate spark-submit calls, using one spark-submit call with settings similar to Example 1 above would allow the resources from one job to be used on the other if one job finishes earlier. Of course, you have less control over which job gets more exclusive resources, but it may mean that (using Example 3 above as an example) resources left over at the end of Job 1 could help speed up the remainder of Job 2, and vice-versa.

This may or may not be what you want to do, but it may be worth considering to try to make the most from your available resources.

Note 3: consider running each job on a separate EMR cluster

If you run the jobs on the same cluster, please note that the job logs will be grouped together. This can make debugging issues in your jobs more difficult, as there is less separation of the job logs, and it is harder to see from the CloudWatch metrics which specific job is causing an issue. If you want a greater level of log separation, consider running each job on a separate EMR cluster. This also gives you separate CloudWatch metrics per job, instead of one set of metrics for both jobs, which makes monitoring much easier since the metrics per cluster relate to only one of your jobs.




architecture, AWS EMR, meetup, Performance, performance tuning, Spark

Spark performance tuning demystified

I think, perhaps, a good place to start with this is with an older article that runs down all of these parts together [1].

To start with, we want to be sure we know what resources are available to us. Example cluster:

– 1 c4.8xlarge master
– 4 c4.8xlarge instances
– each with 36 CPU cores
– and each with 60 GB RAM.

First, the master will not count towards the calculation, since it should be managing the cluster rather than running jobs like a worker would.

Before doing anything else, we need to look at YARN, which allocates resources to the cluster. There are some things to consider:

1) To figure out what resources YARN has available, there is a chart for emr-5.11.0 showing that [2].

2) By default (for c4.8xlarge in the chart), YARN has 53248 MB (52 GB) max and 32 MB min available to it.

3) The max is supposed to be cleanly divisible by the minimum (1664 in this case), since memory is incremented in units of the minimum.

4) YARN memory is set per instance it runs on (the total RAM that YARN may allocate on that node).

5) Be sure to leave at least 1-2GB of RAM and 1 vCPU on each instance for the O/S and other applications to run too. The default amount of RAM (52 GB out of the 60 GB on the instance) seems to cover this, and reserving a core leaves us with 35 (36-1) vCPUs per instance off the top.

6) If a Spark container runs over the amount of memory YARN has allocated, YARN will kill the container and retry running it.

When it comes to executors it may be worth considering the following:

1) This is set for the entire cluster; it doesn’t have to use all resources.

2) Likely, you are going to want more than 1 executor per node, since parallelism is usually the goal. This can be experimented with to manually size a job; again, it doesn’t have to use all resources.

3) If we were to use the article [1] as an example, we may want to try 3 executors per node to start with (not too few or too many), so --num-executors 11 [3 executors per node x 4 nodes = 12 executors, -1 for the application master = 11].
 
There are some big things to know about executor cores too, of course:

1) Although it isn’t explicitly documented, I have noticed problems (working with other customers) when the total executor cores exceed the instance’s vCPU/core count.

2) It is suggested to put aside at least 1 executor core per node to ensure there are resources left over to run O/S, App Master and the like.

3) Executor cores are being assigned per executor (discussed just above). So this isn’t total per node or for the cluster, but rather depending on the resources available divided by the number of executors wanted per node.

4) In the article’s example this works out to --executor-cores 11 [36 vCPUs per instance - 1 for AM/overhead = 35 cores; 35 cores / 3 executors per node ≈ 11 executor cores]

Finally, looking at the executor memory, we also want to be sure of a few things:

1) This is assigned like the executor cores (finally, something sized similarly). It is per executor, and thus is not the total amount of memory on a node or the amount set in YARN, but rather the resources available divided by the number of executors per node (like executor-cores).

2) Sometimes off-heap memory and other overheads can make an executor/container larger than the memory set. Leave room so YARN doesn’t kill it for using more memory than is provided.

3) In the article’s example this works out to --executor-memory 16G (16384 MB) [52 GB provided to YARN / 3 executors = 17 GB RAM max; 17 GB - 1 GB for extra overhead = 16 GB (16384 MB)]
 
So, putting the above together as an example, we could use 3 executors per node with: --num-executors 11 --executor-cores 11 --executor-memory 16G
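The sizing steps above can be condensed into a small helper (a sketch; the function and parameter names are my own, and it encodes the same rules of thumb as the worked example: 1 vCPU left free per node, 1 executor deducted for the application master, and roughly 1 GB of memory headroom per executor):

```python
def size_executors(nodes, vcpus_per_node, yarn_mem_gb_per_node, executors_per_node):
    """Rough Spark executor sizing, mirroring the worked example above."""
    # Total executors across the cluster, minus 1 for the application master.
    num_executors = nodes * executors_per_node - 1
    # Leave 1 vCPU per node free, then split the rest between the executors.
    executor_cores = (vcpus_per_node - 1) // executors_per_node
    # Split the YARN memory between executors, keeping ~1 GB headroom each.
    executor_memory_gb = yarn_mem_gb_per_node // executors_per_node - 1
    return num_executors, executor_cores, executor_memory_gb

# 4 c4.8xlarge workers: 36 vCPUs each, 52 GB handed to YARN, 3 executors per node.
print(size_executors(4, 36, 52, 3))  # (11, 11, 16)
```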

That’s the first part. As for the rest, let’s cover the cluster mode(s) [4] and the driver.

1) When we use --deploy-mode client, we tell YARN to deploy to an ‘external node’; but since this is being run from the master, it strains the master for the entire cluster. Client mode is best if you have spark-shell on another instance in the same network and can submit a job from there to the cluster, using that instance’s resources for the driver (taking the load off the cluster), or when using it interactively to test (since the application master is on the master instance and its logs will be there).

2) We probably want to use YARN as our deployment mode [5]. In particular, we should be looking at yarn-cluster: this means each application’s master is deployed in the cluster, and not just on the one device that is also managing the whole cluster. The resources used are spread out, but so are the application master logs. There’s another good explanation of this available [6].
 
3) Remember, if you over-subscribe the driver memory and cores, there’s less for the rest of the cluster to use for actual work. Most often, the driver memory is set to 1-2GB; 20GB would take more than half the RAM, and 4 cores would take most of the cores in our earlier example. It may be worth leaving these at their defaults (not using the options) for now and then conservatively raising them if the logs inform you to do so.
 
I suspect you already have it, but as there are articles from all over in here, this is the EMR document with the Spark options for emr-5.11.0 [7].

Also, if this works and you want to tune further, you can see about increasing resources available to YARN (Like using perhaps 58 GB of RAM instead of 52 GB) and test those too.

When working within Zeppelin, these settings can be changed via: Menu (top right) -> Interpreter -> (add under the Spark section), as noted here [8]. The settings above with “--” in front are for a Spark submit; in Zeppelin they correspond to the 3 equivalent options:
spark.executor.instances (–num-executors)
spark.executor.cores (–executor-cores)
spark.executor.memory (–executor-memory) (set using a G at the end for GB, like 16G )

Also, while going through the job in question, I noticed that performance is also hurt by Spark speculative execution. Since S3 is an object store, speculative task attempts will slow down performance. [9]

So:
a. Ensure spark.speculation is set to false (which should be the default); see https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-taskschedulerimpl-speculative-execution.html and https://spark.apache.org/docs/latest/configuration.html.
b. INSERT OVERWRITE / append directly to S3 is slow; instead, write to HDFS and run s3distcp to S3 afterwards, which will be much faster [10].

Resources:

[1] http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

[2] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html#emr-hadoop-task-jvm

[3] http://spark.apache.org/docs/latest/submitting-applications.html

[4] http://spark.apache.org/docs/latest/cluster-overview.html

[5] http://spark.apache.org/docs/latest/running-on-yarn.html

[6] https://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html

[7] http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html

[8] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-zeppelin.html#zeppelin-considerations

[9] http://agrajmangal.in/blog/big-data/spark-parquet-s3/

[10] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html

Using INSERT OVERWRITE requires listing the contents of the Amazon S3 bucket or folder. This is an expensive operation. If possible, manually prune the path instead of having Hive list and delete the existing objects. [1]

Amazon S3 is the most popular input and output source for Amazon EMR. A common mistake is to treat Amazon S3 as you would a typical file system. [2]
There are differences between Amazon S3 and a file system that you need to take into account when running your cluster.
* If an internal error occurs in Amazon S3, your application needs to handle this gracefully and re-try the operation.
* If calls to Amazon S3 take too long to return, your application may need to reduce the frequency at which it calls Amazon S3.
* Listing all the objects in an Amazon S3 bucket is an expensive call. Your application should minimize the number of times it does this.
There are several ways you can improve how your cluster interacts with Amazon S3.
* Use S3DistCp to move objects in and out of Amazon S3. S3DistCp implements error handling, retries and back-offs to match the requirements of Amazon S3. For more information, see Distributed Copy Using S3DistCp.
* Design your application with eventual consistency in mind. Use HDFS for intermediate data storage while the cluster is running and Amazon S3 only to input the initial data and output the final results.
* If your clusters will commit 200 or more transactions per second to Amazon S3, contact support to prepare your bucket for greater transactions per second and consider using the key partition strategies described in Amazon S3 Performance Tips & Tricks.
* Set the Hadoop configuration setting io.file.buffer.size to 65536. This causes Hadoop to spend less time seeking through Amazon S3 objects.
* Consider disabling Hadoop’s speculative execution feature if your cluster is experiencing Amazon S3 concurrency issues. You do this through the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution configuration settings. This is also useful when you are troubleshooting a slow cluster.
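As a sketch, the last two tips above could be applied through an EMR configuration object at cluster launch (the classification names are the standard EMR ones; the values are examples):

[
  {
    "Classification": "core-site",
    "Properties": { "io.file.buffer.size": "65536" }
  },
  {
    "Classification": "mapred-site",
    "Properties": {
      "mapred.map.tasks.speculative.execution": "false",
      "mapred.reduce.tasks.speculative.execution": "false"
    }
  }
]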

References:
[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-error-hive.html#emr-troubleshoot-error-hive-3
[2] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-errors-io.html#emr-troubleshoot-errors-io-1

 

Need to learn more about aws big data (demystified)?



——————————————————————————————————————————

I put a lot of thoughts into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/



AWS EMR, Spark

Securing Spark JDBC + thrift connection (SSL) @ AWS EMR (demystified)

To secure the Thrift connection, you can enable SSL encryption and restart the hive-server2 and Thrift services on the EMR master instance.

Following are the steps to do so:
1. Create the self-signed certificate and add it to a keystore file using:
$ keytool -genkey -alias public-dnshostname -keyalg RSA -keystore keystore.jks -keysize 2048

Make sure the name used in the self-signed certificate matches the hostname where the Thrift server will run (use the public DNS name, since you are connecting from outside the VPC).

2. List the keystore entries to verify that the certificate was added. Note that a keystore can contain multiple such certificates:

$ keytool -list -keystore keystore.jks

3. Export this certificate from keystore.jks to a certificate file:
$ keytool -export -alias public-dnshostname -file example.com.crt -keystore keystore.jks

4. Add this certificate to the client's truststore to establish trust from the machine you are connecting from. Since you are connecting from a local instance, copy the certificate "example.com.crt" from the EMR master node to your local instance and then import it:

$ keytool -import -trustcacerts -alias public-dnshostname -file example.com.crt -keystore truststore.jks

5. Verify that the certificate exists in truststore.jks:
$ keytool -list -keystore truststore.jks

Once the certificate is imported, make the following changes in /etc/hive/conf/hive-site.xml:
+++
hive.server2.transport.mode : http
hive.server2.use.SSL : true
hive.server2.keystore.path : path/to/your/keystore/jks
hive.server2.keystore.password : "keystorepassword"
+++
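In hive-site.xml form, the four properties above would look roughly like this (the keystore path and password are placeholders to replace with your own):

+++
<property>
  <name>hive.server2.transport.mode</name>
  <value>http</value>
</property>
<property>
  <name>hive.server2.use.SSL</name>
  <value>true</value>
</property>
<property>
  <name>hive.server2.keystore.path</name>
  <value>/path/to/your/keystore.jks</value>
</property>
<property>
  <name>hive.server2.keystore.password</name>
  <value>keystorepassword</value>
</property>
+++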

Restart hive-server2 and thrift server
$ sudo stop hive-server2 && sudo start hive-server2
$ sudo -u spark /usr/lib/spark/sbin/stop-thriftserver.sh && sudo -u spark /usr/lib/spark/sbin/start-thriftserver.sh

Check whether the service started successfully, and verify that the master instance is listening on port 10001:
+++
$ sudo netstat -tulpan |grep 10001
tcp        0      0 :::10001                    :::*                        LISTEN      12494/java
+++

Once the service is started, you can make a connection using the JDBC driver as below:

jdbc:hive2://emr-dnsname:10001/default;hive.server2.transport.mode=http;ssl=true;sslTrustStore=/pathto/truststore.jks;trustStorePassword=password
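For example, the connection can be tested with Beeline from the client machine, using the same JDBC URL (the hostname, truststore path, and password below are placeholders):

# Hypothetical example; substitute your master node's public DNS name,
# truststore path, and truststore password:
beeline -u "jdbc:hive2://emr-dnsname:10001/default;hive.server2.transport.mode=http;ssl=true;sslTrustStore=/pathto/truststore.jks;trustStorePassword=password"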
