Wednesday, December 14, 2016

Namenode Handler Configuration Best Practice

HDFS has two configuration parameters, dfs.namenode.handler.count and dfs.namenode.service.handler.count. This blog tries to explain what they are and what values we should set them to. Setting them too low causes degraded performance at the HDFS layer. Even worse, it can put the namenode or datanodes into bad health status, or cause a namenode HA pair to fail over constantly. In the namenode log, you might see messages like the one below

INFO org.apache.hadoop.ipc.Server: IPC Server handler xx on 8022 caught an exception java.nio.channels.ClosedChannelException

You might also see a large RPC call queue length when you monitor your namenode through Cloudera Manager; on a healthy cluster, the RPC queue length should stay at or near 0.



The namenode is an RPC server and requires a thread pool to handle incoming RPC calls. The number of threads in the pool is controlled by dfs.namenode.handler.count. If dfs.namenode.servicerpc-address is configured (which is recommended), the namenode starts an extra RPC server to handle non-client RPC calls, such as those from the datanode daemons themselves. That extra RPC server's thread count is controlled by dfs.namenode.service.handler.count. In that case, the threads controlled by dfs.namenode.handler.count only handle client RPC calls, such as those from your MapReduce jobs, HDFS CLI commands, etc.

The use of dfs.namenode.service.handler.count was not documented clearly in earlier HDFS releases, hence the JIRA https://issues.apache.org/jira/browse/HDFS-8443

So what values should we set for dfs.namenode.handler.count and dfs.namenode.service.handler.count?

Both dfs.namenode.handler.count and dfs.namenode.service.handler.count should be set to the same value: ln(number of datanodes) * 20. For example, if your cluster has 100 datanodes, these parameters should be set to 92. By default, they are both 10. If you use Cloudera Manager to do the installation, Cloudera Manager should set this for you automatically based on the number of datanodes in your cluster. But if you add nodes to your cluster after the initial installation, this value should be increased accordingly, something that lots of Hadoop administrators miss after expanding their Hadoop cluster.
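
If you want to sanity-check the formula yourself, a throwaway snippet like the one below does the arithmetic (natural logarithm, not log base 10). The class and method names are only illustrative.

public class HandlerCountEstimator {

    // Recommended value for both dfs.namenode.handler.count and
    // dfs.namenode.service.handler.count, given the cluster size.
    static long recommendedHandlerCount(int numDatanodes) {
        return Math.round(Math.log(numDatanodes) * 20);
    }

    public static void main(String[] args) {
        System.out.println(recommendedHandlerCount(100)); // prints 92
        System.out.println(recommendedHandlerCount(400)); // prints 120, after a cluster expansion
    }
}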

Tuesday, November 08, 2016

Hive on Spark Field Guide

Hive on Spark has become more and more popular. Hive has been in the Hadoop ecosystem for a long time and is the most popular SQL-on-Hadoop option (though not necessarily the best one); it is especially useful in ETL use cases. However, when running on the MapReduce execution engine, it inherits the biggest drawback of the MapReduce framework: slow performance. Hive on Spark allows end users to switch the execution engine from MapReduce to Spark without rewriting their Hive scripts.

Because Hive on Spark is quite new, there are still some usability/manageability issues. This blog is going to explore some problems I ran into in the field and provide solutions and workarounds.

There is an important distinction between Hive on Spark and Hive on MapReduce. This usually confuses people who use Hive on Spark for the first time. With Hive on Spark, assuming your Spark application runs in YARN, there is only one long-running YARN application per Hive user session. It is shared among all the queries submitted within the same Hive session. When the first query is executed in a Hive session (through Hue, beeline, or any other HiveServer2 client), a Hive on Spark YARN application is launched. Hive on Spark uses yarn-cluster mode, so the Spark driver runs inside the AM. When the query finishes, Hive does not terminate this Spark application. Instead, the Spark application keeps running and is reused by subsequent queries submitted in the same Hive session, until the session is closed. Each Hive query translates to at least one Spark job within that Spark application. By contrast, when running Hive on MapReduce, each Hive query translates to a chain of one or more MapReduce jobs, depending on the query's complexity. As soon as the Hive query is done, all of its MapReduce jobs are finished and nothing is reused by the next Hive query.
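
The session-per-application behavior is easy to reproduce from any HiveServer2 client. The sketch below uses the Hive JDBC driver; it assumes an unsecured cluster, and the hostname and table names are placeholders you would adjust for your own environment.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveOnSparkSessionDemo {
    public static void main(String[] args) throws Exception {
        // One JDBC connection == one Hive session == (at most) one
        // Hive on Spark YARN application for that session.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:hive2://hs2-host.example.com:10000/default", "hive", "");
             Statement stmt = conn.createStatement()) {

            stmt.execute("set hive.execution.engine=spark");

            // First query: the Hive on Spark YARN application is launched here.
            try (ResultSet rs = stmt.executeQuery(
                    "select count(*) from sample_table_a")) {
                while (rs.next()) System.out.println(rs.getLong(1));
            }

            // Second query: reuses the same YARN application as a new Spark job.
            try (ResultSet rs = stmt.executeQuery(
                    "select count(*) from sample_table_b")) {
                while (rs.next()) System.out.println(rs.getLong(1));
            }
        } // Closing the connection ends the Hive session and its YARN application.
    }
}

Both queries run as Spark jobs inside the same YARN application; only closing the connection (i.e., the Hive session) ends that application.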

Below I started a beeline shell. Within the same beeline shell, I executed two different Hive queries. As you can see from the Cloudera Manager YARN application page, there is only one application running, and it shows 10% complete. But don't be fooled: both queries have already completed. This Hive on Spark application keeps running until you quit your beeline shell.






If you go to the Spark History Server UI, you should find your application in the "incomplete applications" list. Click the incomplete application link, and you should see two Spark jobs in the Completed Jobs list.




As soon as you exit your beeline session, the Hive on Spark application is done. The duration shown in the page below is how long the Hive session was kept open, not the duration of the queries that were executed.





Now here come the questions I am asked by customers in the field.

How can I find the queries that are submitted by other Hive users on the Cloudera Manager YARN application page?

When we run Hive on MR, the Hive query string can be found easily through the CM YARN application page. For Hive on Spark, this is not possible: one Hive on Spark application corresponds to many Hive queries. A workaround is to use the HiveServer2 Web UI, which shows all active sessions as well as active and past queries.

Why do Hive on Spark applications stay alive forever, occupying almost all cluster resources so that no more jobs can run? What shall we do?

A Hive on Spark application is long-running by design, as explained at the beginning of this blog. The purpose of this design is that, with the AM and executors already launched, subsequent Hive queries can reuse them and run much faster.

However, if a user opens a beeline shell, submits a query, and leaves the shell open, the Hive on Spark application will keep holding YARN resources. For Hue users it is even worse. Because Hue is a web tool, it currently does not have an elegant way to close the Hive on Spark session on HiveServer2. With CDH 5.7, which ships Hue 3.9, you will notice that the Hive on Spark session is kept open even after you log out of Hue. The end result is that the Hive on Spark YARN application runs forever, unnecessarily occupying YARN resources, even after users have long since logged out of Hue and closed their browsers. Very soon YARN runs out of CPU and memory and no more jobs can be submitted. The only way I have found to close the Hive on Spark session manually through Hue in CDH 5.7 is to execute "set hive.execution.engine=mr" in the Hive editor in Hue, which closes the Hive on Spark session immediately. Of course, you can then execute "set hive.execution.engine=spark" to switch back to the Spark engine.

The Hue that comes with CDH 5.8 tries to address this issue. It adds a "close session" feature, which allows you to close the Hive session manually. However, in my own testing this feature is still a little unstable. Sometimes the close-session operation works fine and terminates the Hive on Spark YARN application for you; sometimes it fails with the error "Failed to close session, session handle may already be closed or timed out" in Hue, and the Hive on Spark application in YARN keeps running forever.

There are two things you can do to help alleviate the issues.

1. Enable dynamic executor allocation for Hive on Spark. This allows Spark to add and remove executors dynamically based on the workload. In Cloudera Manager, spark.dynamicAllocation.initialExecutors and spark.dynamicAllocation.minExecutors both default to 1, which means every idle Hive on Spark session holds 2 containers: one for the ApplicationMaster/Spark driver and one for the minimum executor. The executor usually takes more YARN resources than the AM; for example, we usually recommend 4-6 cores for an executor container, while the AM container only takes 1 core. To reduce the waste, it might be wise to reduce the initial and minimum executor numbers from 1 to 0, so that while a Hive on Spark session is idle, only the AM container holds YARN resources. Of course, this adds a delay to each query because new executors have to be launched first, so you have to decide which configuration fits your scenario better (see the sketch after item 2 below).



2. HiveServer2 has a configuration called hive.server2.idle.session.timeout. After the timeout, the Hive on Spark session is closed and all of its YARN resources are released. By default this value is set to 12 hours. We can reduce it to a smaller interval so that idle sessions are closed automatically once the timeout has passed. Please note that by default the idle time does not start counting while a query is still running; the clock only starts once the last query result has been returned, because hive.server2.idle.session.timeout_check_operation is set to true by default.
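
Below is a minimal sketch of how these knobs come together, again using the Hive JDBC driver against an unsecured cluster with placeholder host and table names. The dynamic allocation settings are normally configured cluster-wide in Cloudera Manager; the per-session set statements shown here are just an illustration and assume your cluster does not restrict these properties. The idle-session timeout is a HiveServer2 server-side setting and has to be changed in the HiveServer2 configuration, not from the client.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HiveOnSparkTuningDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:hive2://hs2-host.example.com:10000/default", "hive", "");
             Statement stmt = conn.createStatement()) {

            stmt.execute("set hive.execution.engine=spark");

            // Ask the upcoming Hive on Spark application to start with no
            // executors and give all of them back when idle, so that only the
            // AM container stays allocated between queries.
            stmt.execute("set spark.dynamicAllocation.enabled=true");
            stmt.execute("set spark.dynamicAllocation.initialExecutors=0");
            stmt.execute("set spark.dynamicAllocation.minExecutors=0");

            // The first query after these settings launches the Spark
            // application with the overrides applied; executors are then
            // added on demand and released when idle.
            stmt.execute("select count(*) from sample_table_a");
        }
    }
}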




The combination of the above two methods can help preserve YARN resources and support more users sharing the Hadoop cluster.



Thursday, August 25, 2016

Common Pitfalls When Writing Spark Applications (Part 1: Logging Side Effect)

As a Big Data Solutions Architect, I have done quite a few Spark application reviews for my customers and have had the opportunity to witness some common pitfalls in Spark applications first-hand. I decided to write a blog series on the subject to help the Spark community avoid these costly mistakes.

Spark 101: operations can be divided into transformations and actions. All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action is invoked. This design enables Spark to optimize the RDD pipeline and run more efficiently. 
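
As a concrete illustration of this laziness, here is a small self-contained sketch (assuming Java 8 lambdas; the HDFS path is a placeholder): nothing is read from HDFS until count() is called.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyEvaluationDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("lazy-evaluation-demo");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Transformations: nothing is read or computed yet. Spark only
        // records the lineage (textFile -> filter -> map).
        JavaRDD<String> lines = sc.textFile("hdfs:///tmp/sample_input");
        JavaRDD<String> errors = lines.filter(l -> l.contains("ERROR"));
        JavaRDD<Integer> sizes = errors.map(String::length);

        // Action: only now does Spark read the file from HDFS and run the
        // whole lineage to produce a result.
        long n = sizes.count();
        System.out.println("number of error lines: " + n);

        sc.stop();
    }
}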

During one of my Spark application review engagements, I noticed that the development team had done a great deal of logging through the log4j API, and many of the logging statements contained rdd.count() calls, something like the line below:

logger.info("contract_devices:"+ contract_devices.count())

contract_devices is an RDD with a long lineage.

Even if the logger level is set to WARN or ERROR, the count action on the RDD is still called. This is the logging side effect we need to watch out for, because it triggers every transformation up to this RDD to be executed, including the read from HDFS at the very beginning. There were many such log statements throughout the code, literally calling the count() action on every RDD in the long lineage. One can imagine how inefficient this becomes when the whole lineage is executed again and again.

Instead, we should have written code like below:

if (logger.isInfoEnabled()) {
    logger.info("contract_devices:"+ contract_devices.count());
}

Because of the check done by logger.isInfoEnabled(), the count action on the RDD will not be triggered when the logger level is set above INFO (e.g. WARN or ERROR).

Tuesday, August 23, 2016

Kafka and Load Balancer

I was reviewing a Kafka and Spark Streaming application architecture for a client. The client proposed the architecture below on the Kafka producer side.

Kafka Producer --> F5 --> Kafka Broker cluster

The Kafka broker cluster is composed of 3 nodes and is hidden from the Kafka producer behind the F5 load balancer; the producer cannot connect to the Kafka brokers directly without going through the F5. I immediately pointed out that such an architecture does not work.

There are two steps in total when a Kafka producer sends messages to a Kafka broker.

The first step is to retrieve the metadata information. During this step, we use the configuration metadata.broker.list to pass in a list of bootstrap brokers. This list does not need to include ALL brokers in the Kafka cluster, because any broker in the cluster can serve the metadata request. We usually recommend listing at least 3 brokers to achieve HA. It is OK to go through a load balancer during this metadata retrieval step.

However, once the Kafka producer has the metadata, in the second step it connects to the brokers directly, without the F5 sitting in the middle. The producer is a smart client: for example, it uses the partition key to determine the destination partition of each message. By default, a hashing-based partitioner maps the key to a partition id, and you can plug in a custom partitioner too. Hiding the whole Kafka broker cluster behind the load balancer defeats this design.
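
To make the two steps concrete, here is a minimal producer sketch using the newer Java client (which calls the bootstrap list bootstrap.servers rather than metadata.broker.list); the broker hostnames, topic, key, and value are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DirectProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Bootstrap list: only used to fetch cluster metadata. It does not
        // need to contain every broker, but listing 3 gives you HA.
        props.put("bootstrap.servers",
                  "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // After metadata is fetched, the producer hashes the key to pick a
            // partition and connects to that partition's leader broker
            // directly; there is no way to route this hop through an F5.
            producer.send(new ProducerRecord<>("events", "device-42", "hello"));
        }
    }
}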

What if the event producer side has to go through a load balancer to reach the Kafka brokers? One possible solution is to build a RESTful service that acts as the Kafka producer. The event generators post events to the RESTful service endpoint, which sits behind a load balancer and can scale out based on the volume of events. The RESTful service then sends the messages to the Kafka brokers directly, without a load balancer in the middle. If you don't feel like writing your own RESTful service as a Kafka producer client, you can use this open source project: https://github.com/confluentinc/kafka-rest. However, building such a RESTful service is not very hard if you decide to DIY.
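
If you do go the DIY route, the gateway can be as simple as the sketch below: a tiny HTTP endpoint (using the JDK's built-in HttpServer purely for illustration; a real service would use a proper web framework) that forwards each posted event body to Kafka. The hostnames, port, and topic are placeholders.

import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import java.util.Properties;
import java.util.Scanner;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventGatewayDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                  "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Event generators POST to this endpoint through the load balancer;
        // the gateway itself talks to the Kafka brokers directly.
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/events", exchange -> {
            try (Scanner sc = new Scanner(exchange.getRequestBody(), "UTF-8").useDelimiter("\\A")) {
                String body = sc.hasNext() ? sc.next() : "";
                producer.send(new ProducerRecord<String, String>("events", body));
            }
            exchange.sendResponseHeaders(204, -1); // 204 No Content, empty response
            exchange.close();
        });
        server.start();
    }
}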