Tuesday, February 21, 2017

Enable Back Pressure To Make Your Spark Streaming Application Production Ready

In order for a spark streaming application to run stably and efficiently in production, its batch process time should be close to the batch interval time, but consistently below it. If the batch process time is always higher than the batch interval, the schedule delay keeps increasing and cannot recover. As a result, the spark streaming application becomes unstable. On the other hand, if the batch process time is always much lower than the batch interval, it is a waste of cluster resource. 

When using the direct API for Kafka and Spark streaming integration, we have an easy way to control the max flow rate -- a configuration called spark.streaming.kafka.maxRatePerPartition. According to the documentation, it is
the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API
The spark.streaming.kafka.maxRatePerPartition configuration is especially crucial to prevent the streaming application from overloading in two scenarios

  1. It prevents the first micro-batch from being overwhelmed when there are a large number of unprocessed messages in the Kafka topic initially and we set the auto.offset.reset in Kafka parameters to smallest. 
  2. It prevents micro-batches from being overwhelmed when there is a sudden surge of messages from the Kafka producers
Please note this configuration only takes effect when using direct API. For receiver-based Kafka-Spark integration, spark.streaming.receiver.maxRate is used to achieve the similar message max rate control. Since direct API is the recommended integration option, we will not spend time on spark.streaming.receiver.* configurations in this blog post.

However, setting the max rate per Kafka partition statically has its own drawback. For a long-running spark streaming application that runs in production for months, things change over time, so is the optimal max rate per Kafka partition. Sometimes the message characteristics such as message size change over time, causing the processing time of the same number of messages varies. Sometimes a multi-tenant cluster becomes busy during the daytime when other big data applications such as Impala/Hive/MR jobs compete for shared system resources such as CPU/Memory/Network/Disk IO. 

The backpressure comes to the rescue! Backpressure was a highly demanded feature that allows the ingestion rate to be set dynamically and automatically, basing on previous micro-batch processing time. Such feedback loop makes it possible to adapt to the fluctuation nature of the streaming application. 

Spark Streaming back pressure was introduced in Spark 1.5. In order to enable backpressure, we can add the below code in spark streaming


What about the first micro-batch rate? Since there is no previous micro-batch processing time available, there is no basis to estimate what is the optimal rate the application should use. Sifting through the spark documentation, there is a configuration called "spark.streaming.backpressure.initialRate" seems to control the initial rate when backpressure is enabled. This is a common misconception! That configuration only applies for received-based integration approach and is not used by the direct API. As Cody Koeninger explained in one of his mailing list answers

... that configuration was added well after the integration of the direct stream with the backpressure code, and was added only to the receiver code, which the direct stream doesn't share since it isn't a receiver. Not making excuses about it being confusing, just explaining how things ended up that way :(  So yeah, maxRatePerPartition is the closest thing you have on the direct stream side to being able to limit before the backpressure estimator has something to work with.

As a matter of fact, there is an open JIRA: SPARK-18580: Use spark.streaming.backpressure.initialRate in DirectKafkaInputDStream about this issue.

So the recommended approach is to use spark.streaming.kafka.maxRatePerPartition to control the initial rate before the backpressure feedback loop takes effect. I usually recommend setting spark.streaming.kafka.maxRatePerPartition to be 150% ~ 200% of the optimal estimated rate and let the backpressure algorithm to take care of the rest. Please note that spark.streaming.kafka.maxRatePerPartition still serves as the max rate that backpressure algorithm would not exceed.

Currently, PID Rate Estimator is the only available rate estimator. There are a few parameters to control its behavior, according to the documentation

  • spark.streaming.backpressure.pid.proportional (default: 1.0) can be 0 or greater.
  • spark.streaming.backpressure.pid.integral (default: 0.2) can be 0 or greater.
  • spark.streaming.backpressure.pid.derived (default: 0.0) can be 0 or greater.
  • spark.streaming.backpressure.pid.minRate (default: 100) must be greater than 0.
Usually, the only parameter I would tune is spark.streaming.backpressure.pid.minRate, since the default is 100. In certain use cases, 100 messages per second per partition is still too high and needs to be adjusted down. 

It is important to point out that all following micro-batches are scheduled based on the existing rate (initially set by spark.streamng.kafka.maxRatePerPartition), until the first micro-batch is completed. Please see the below Spark UI graph (I took it from Cody Koeninger's spark mailing list post). When the scheduling delay is 31 seconds, the first 7 micro-batches with interval 5 seconds still use the ingestion rate of 20 records per batch. It is only the 8th micro batch that is affected by the backpressure and changes the ingestion rate to be 5 records. To avoid such huge delay for backpressure to take effect, I recommend setting spark.streaming.kafka.maxRatePerPartition to be 150% ~ 200% of the optimal estimated rate.

To observe the backpressure behavior, we can set the below log4j setting


When PID Rate Estimator starts computing the rate, you should see similar messages below:

TRACE PIDRateEstimator:

time = [time], # records = [numElements], processing time = [processingDelay], scheduling delay = [schedulingDelay]

When there is no enough information for the estimator to calculate the rate, you should see a similar message below:

TRACE PIDRateEstimator: Rate estimation skipped

When there is a new rate set by the backpressure, you should see below message:

TRACE PIDRateEstimator: New rate = [newRate]

In summary, enabling backpressure is an important technique to make your spark streaming application production ready. It set the message ingestion rate dynamically and automatically based on previous batch performance, thus making your spark streaming application stable and efficient, without the pitfall of statically capped max rate. 

Friday, February 10, 2017

The Gotcha Of Using Spark Dependency in Cloudera Maven Repository (Mac User Only)

One night I setup a basic word count spark application in IntelliJ IDE on my MacBook. Usually I would specify the spark core dependency in maven pom.xml as below:

But that night I decided to use the spark artifacts from Cloudera's maven repository.  It seems to be a good idea because ultimately my spark application is going to be deployed on a CDH cluster. Even though the CDH spark distribution is mostly identical to the upstream open source Apache Spark project, it contains patches and other tweaks so that it works well with other Hadoop components included in the Cloudera CDH distribution. I am a big believer that the build environment should be as identical as possible to the the runtime environment.  For details about how to setup Cloudera Maven repository, please follow the link here. 

My spark core dependency looks like below:

Then I ran into trouble. Within the IDE, the maven compile and package goals ran fine, but running the application hit a problem. The IDE spilled out the below exception:

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:317)
at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:219)
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)
... 41 more

If I tried to use spark-submit to submit the packaged jar file to a running CDH cluster deployed in AWS (RedHat 7 as OS), the word count application ran fine. The problem only happened when I tried to run the spark application locally on my MacBook.  WEIRD!

After some digging, I found out the real cause. The snappyjava library cannot be found is actually caused the snappy-java version used by spark in CDH. If you check spark 1.6.0-cdh5.9 maven dependency, the snappy-java version is actually That version contains a bug, which is described here: https://github.com/xerial/snappy-java/issues/6. It ONLY affect MAC OS. It is related to a Java call System.mapLibraryName(). It you call System.mapLibraryName("snappyjava"), it will prepend "lib" at the beginning of string and based on OS to choose extension. This will use .so on Linux and .dll on Windows. MAC OS support multiple extensions, and mapLibraryName can only support one by design. In Java 6 .jnilib is used and Java 7+ starts to use .dylib instead. The version of snappy-java only packages file libsnappyjava.jnilib, hence the error. This problem does not exist if you use the open source spark, since the version of snappyjava is according to spark github 1.6 branch. I chose 1.6 branch to check because that's the same spark version as what CDH 5.9 includes.

How do we get around this issue? Of course, switching to the spark dependency in Maven Central would work, but making the build and runtime environments identical is in general recommended.

Looking at implementation https://github.com/xerial/snappy-java/blob/master/src/main/java/org/xerial/snappy/SnappyLoader.java, I find that it tries to find a System property "org.xerial.snappy.lib.name" before calling mapLibraryName. Thus the easiest solution is to add -Dorg.xerial.snappy.lib.name=libsnappyjava.jnilib in the IDE Run configuration

Let's summarize. You only need the above solution if you meet ALL below conditions
  1. You are using a Mac, not Windows or Linux for development
  2. You are running the spark application locally either in IDE Run configure.
  3. You use spark artifacts from Cloudera's maven repository, not from Maven Central repository.
  4. The spark artifacts you use contains dependency for snappyjava version older than 1.0.5. If you use CDH 5.x release (the latest is CDH 5.10 at the time of the writing), you fall into that category. 
The good news is that Cloudera has decided to release Spark 2.x as a separate parcel.  For how to install spark 2 parcel to a CDH cluster, please refer to official online document from Cloudera. The snappy-java version included in spark 2.0 parcel is Thus if you use spark 2.0, you won't run into this issue. Here is the spark 2 dependency in Cloudera Maven repository:

Hope this blog can save time for those who runs into the same issue.

Thursday, February 02, 2017

How to Shutdown a Spark Streaming Job Gracefully

Spark Streaming application is by definition long-running. But how do can we shut it down gracefully, allowing the inflight messages being processed properly before the application stops?

There are many blog posts suggested that we should do it through JVM shutdown hook. Please see the code below, taking from https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully.

However, this approach does not work in new Spark version (after Spark 1.4). It will cause deadlock situation.

There are currently two ways to stop the spark streaming job gracefully. The first way is to set spark.streaming.stopGracefullyOnShutdown parameter to be true (default is false). This parameter is introduced in Spark to solve the graceful shutdown issue. Developers do not need to call ssc.stop() in the codes any more. Instead, they need to send SIGTERM signal to the driver. In practice, we need to do the following:

  1. Using Spark UI to find out on which node that the driver process is running. In the yarn cluster deploy mode, the driver process and AM are running in the same container. 
  2. Login that node and do ps -ef |grep java |grep ApplicationMaster and find out the pid. Please note that your grep string might be different based on your application/environment, etc. 
  3. kill -SIGTERM <AM-PID> to send SIGTERM to the process. 

After the Spark driver received the SIGTERM signal, you should see the below messages in the log

17/02/02 01:31:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
17/02/02 01:31:35 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
17/02/02 01:31:45 INFO streaming.StreamingContext: StreamingContext stopped successfully
17/02/02 01:31:45 INFO spark.SparkContext: Invoking stop() from shutdown hook
17/02/02 01:31:45 INFO spark.SparkContext: Successfully stopped SparkContext
17/02/02 01:31:45 INFO util.ShutdownHookManager: Shutdown hook called

There is a catch though. Be default, the spark.yarn.maxAppAttempts parameter uses the default value from yarn.resourcemanager.am.max-attempts in YARN. The value default is 2. Thus after the first AM is stopped by your kill command, YARN will automatically launch another AM/driver. You have to kill the second one again. You can set --conf spark.yarn.maxAppAttempts=1 during the spark-submit, but you have to ask yourself whether you really want to give your driver no chance of failure.

You CANNOT use yarn application -kill <applicationid> to kill the job. This command does send SIGTERM signal to the container, but then almost immediately send a SIGKILL signal. The interval between SIGTERM and SIGKILL is configured by configuration yarn.nodemanager.sleep-delay-before-sigkill.ms (default 250). Of course you can increase this number,  but somehow even after I changed this to 60000 (1 minute), it still does not work. The application containers were killed almost immediately and the log file only contains below lines:

17/02/02 12:12:27 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
17/02/02 12:12:27 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook

So for now, I do not recommend using the yarn application -kill <applicationid> command to send SIGTERM.

The second solution is to somehow inform the spark streaming application that it should be shut down gracefully by means other than SIGTERM signal. One way is to place a marker file on HDFS that the spark streaming application can check periodically. If the marker file exists, scc.stop(true, true) is called. The first "true" means the underline spark context should be stopped. The second "true" means it is a graceful shutdown, allowing inflight messages to be completed.

It is crucial that you do not call ssc.stop(true, true) within your micro-batch code. Think about it: if you call scc.stop(true, true) within your micro batch code , it will wait till all in flight messages are processed, including the current micro batch. But the current micro batch cannot be done until ssc.stop(true, true) is returned. It is a deadlock situation. Instead, you should check the marker file and call ssc.stop(true, true) in a different thread. I put a simple example on github, in which I do the checking and call ssc.stop() at the main thread after ssc.start(). You can find the source code here: https://github.com/lanjiang/streamingstopgraceful. Of course, using HDFS marker file is just one way. Other alternatives are using a separate thread to listen on a socket, starting a RESTful service, etc.

I wish in the future release Spark can take care of this issue more elegantly. For example, in the Spark UI we might add a button to stop the spark streaming job gracefully, so that we do not have to resort to custom coding or mess around with pid and SIGTERM signal.

Wednesday, December 14, 2016

Namenode Handler Configuration Best Practice

HDFS has two configuration parameters, which are dfs.namenode.handler.count  and dfs.namenode.service.handler.count. The blog tries to explain what they are and what value we should set them. Setting them too low causes degraded performance at HDFS layer. Even worse, it can cause namenode or datanode in bad health status or namenode HA constantly has failover. In the namenode log, you might see message like below

INFO org.apache.hadoop.ipc.Server: IPC Server handler xx on 8022 caught an exception java.nio.channels.ClosedChannelException

Also you might see large RPC call queue length when you monitor your namenode through Cloudera Manager. RPC queue length should be 0. 

Namenode is a RPC server that requires a thread pool to handle incoming RPC calls. The number of thread in the pool is controlled by dfs.namenode.handler.count. If dfs.namenode.servicerpc-address is configured (which is also recommended), the namenode starts an extra RPC server to handle the non-client related RPC call, such as from datanodes daemon themselves. That extra RPC server's thread count is controlled by dfs.namenode.service.handler.count. In that case, threads controlled by dfs.namenode.handler.count only handle client RPC call, such as from your MapReduce jobs, HDFS cli commands, etc. 

The use of dfs.namenode.service.handler.count was not documented clearly in earlier version of HDFS releases, hence the JIRA https://issues.apache.org/jira/browse/HDFS-8443

So what is the value we should set for dfs.namenode.handler.count or dfs.namenode.service.handler.count? 

Both dfs.namenode.service.handler.count and dfs.namenode.service.handler.count should be set to the same value, which is ln(num of datanodes) * 20. For example, if your cluster has 100 datanodes, these parameters should be set to 92. By default, they are both 10. But if you use Cloudera Manager to do the installation, Cloudera Manager should set this for you automatically based on the number of datanodes you have in your cluster. But if you add nodes to your cluster after the initial installation, this value should be increased accordingly, one thing that lots of Hadoop administrators miss after expanding their Hadoop cluster. 

Tuesday, November 08, 2016

Hive on Spark Field Guide

Hive On Spark has become more and more popular. Hive has been in the Hadoop ecosystem for a long time. It is the most popular SQL on Hadoop option (not necessarily the best one). It is quite useful especially in ETL use cases. However, when running on MapReduce execution engine, it inherits the biggest drawback of MapReduce framework -- slow performance. Hive On Spark allows end user to switch the execution engine from MapReduce to Spark without rewriting your Hive scripts.

Because Hive on Spark is quite new, there are still some usability/manageability issues. This blog is going to explore some problems I ran into in the field and provide solutions and workarounds.

There is an important distinction between Hive On Spark and Hive on MapReduce.  This usually confuses people who use Hive on Spark the first time. With Hive on Spark, assuming your spark application running in YARN,  there is only one long running YARN application per user hive session. It is shared among all the queries submitted within the same Hive session. When the first query is executed in a Hive session (through Hue or beeline or any other Hiveserver2 client), a Hive on Spark YARN application is launched.  Hive on Spark uses yarn-cluster mode, and thus Spark driver is executed in AM. When the query finishes, Hive doesn't terminate this spark application. Instead, the Spark application would be kept running and used by subsequent queries submitted in the same Hive session, until the session is closed. Each Hive query is translated to at least one Spark job within that spark application.  On the contrary, when running Hive on MapReduce, each Hive query translates to a chain of MapReduce jobs, depending on the complexity. As soon as the Hive query is done, all MapReduce jobs are finished and not reused by the next Hive query.

Below I started a beeline shell. Within the same beeline shell, I executed two different Hive queries. As you can see from Cloudera Manager YARN application page, it shows there is only one application running and it shows 10% complete. But don't be fooled. Both queries have already been completed. This Hive on Spark application is going to keep running, until you quit your beeline shell.

If you go to the Spark History Server UI, you should find your application in the "incomplete application list". Click the uncompleted application link, you should see there are two spark jobs in the Completed Jobs list.

As soon as you exit your beeline session, the Hive on Spark application is done. The duration shown in the below page is how long the hive session has been kept open, not the duration of the queries that are executed.

Now here comes the questions I asked by customers in the field.

How can I find out the queries that are submitted by other hive users in Cloudera Manager YARN application page? 

When we run Hive on MR, the Hive query string can be found easily through CM YARN application page.  For Hive on Spark, it is not possible.  One Hive on Spark application corresponds to many Hive queries. A workaround is to use HiveServer 2 Web UI, which shows all the active sessions and active/past queries running.

Why Hive on Spark applications stay alive forever and they occupy almost all cluster resources and no more jobs can be run. What shall we do?

Hive on Spark application is long-running by design, as was explained at the beginning of the blog. The purpose of this design is that with AM and executor already launched,  the subsequent hive query can reuse them and run much faster.

However, if a user opens a beeline shell, submit a query and leave the shell open, the Hive on Spark application will hold YARN resources. To Hue user it is even worse. Because Hue is a web tool, currently it does not have an elegant way to close the Hive on Spark session to the HiveServer2. When you are using CDH 5.7, which comes with Hue 3.9, you would notice that the Hive on Spark session is kept open, even after you logout Hue. The end result is that the Hive on Spark YARN application is running forever, unnecessarily occupying YARN resources, even after users have long logout the Hue application and closed their browsers.  Very soon the YARN is going to run out of CPU and Memory and no more job can be submitted. The only way I found out that you can close the Hive on Spark session manually through Hue in CDH 5.7 is to execute "set hive.execution.engine=mr" in the Hive editor in Hue. This will essentially close the Hive on Spark session immediately. Of course, you can then execute "set hive.execution.engine=spark" again to switch back to spark engine.

The Hue comes with CDH 5.8 seems to try to address this issue. It adds a feature called "close session", which allows you to close the hive session manually. However, according to my own testing, this feature is still a little unstable. Sometimes the close session operation works fine, which terminates the Hive on Spark YARN application for you. But sometimes it does not work and throw error "Failed to close session, session handle may already be closed or timed out" error in Hue and the HoS session in YARN is still running forever.

There are two things you can do to help alleviate the issues.

1. We need to enable dynamic executor allocation for Hive On Spark. This allows Spark to add and remove executors dynamically to Hive jobs. This is done based on the workload. The spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.minExecutors by default are set to be 1 in Cloudera Manager.  It means by default every idle Hive on Spark session will take 2 containers. One is for the ApplicationMaster/Spark driver and the other is for the minimum executor. The executor usually takes more resources from YARN than AM.  For example, usually we recommend 4-6 CPU for executor container while AM container only takes 1 CPU. To reduce the waste, it might be wise to reduce the initial executor number and minimum executor number from 1 to 0. Thus while the Hive on Spark session is idle, only AM container hold the YARN resource. Of course this will add a delay for each query as new executors have to be launched first. Thus you have to decide which configuration fits your scenario better.

2. The hiveserver2 has a configuration called hive.server2.idle.session.timeout. After the timeout, the Hive on Spark session is closed and all YARN resources are released. By default this value is set to be 12 hours. We can reduce it to a smaller internal and allow idle session to be closed automatically when the timeout has passed. Please note that by default this idle time does not start counting while there is an active query running. The clock only starts when the last query result is returned.  hive.server2.idle.session.timeout_check_operation by default is set to be true.

The combination of above two methods can help preserve the YARN resources and support more users sharing the Hadoop cluster resource.

Thursday, August 25, 2016

Common Pitfalls When Writing Spark Applications (Part 1: Logging Side Effect)

As a Big Data Solutions Architect, I have done quite a few spark application reviews for my customers. I have the opportunity to witness some common pitfalls when writing spark applications first-hand. I decided to write a blog series on the subject, helping the spark community to avoid these costly mistakes.

Spark 101: operations can be divided into transformations and actions. All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action is invoked. This design enables Spark to optimize the RDD pipeline and run more efficiently. 

During one of my spark application review engagement, I noticed that the development team has done a great deal of logging through log4j API. There are many rdd.count() in the logging statements. Something like below 

logger.info("contract_devices:"+ contract_devices.count())

contract_devices is an RDD with long lineage. 

Even if the logger level is set to be warn or error, the action count on the RDD is still called. This is the logging side effect we need to watch out, because it will trigger all the previous transformation up to this RDD to be executed, including reading from HDFS at the very beginning. There are many such log statements throughout the codes, literally calling count() action on every RDDs in the long lineage. One can image how inefficient this becomes because the whole lineage is executed again and again. 

Instead, we should have written code like below:

if (logger.isInfoEnabled()) {
    logger.info("contract_devices:"+ contract_devices.count());

Because of the checking done by logger.isInfoEnabled(), the action on RDD will not be triggered when logger level is set properly. 

Tuesday, August 23, 2016

Kafka and Load Balancer

I was reviewing a Kafka - Spark Streaming application architecture for a client. The client proposed the below architecture at the Kafka producer side. 

Kafka Producer --> F5 --> Kafka Broker cluster

The Kafka Broker cluster is composed of 3 nodes and is hidden from the Kafka producer behind the F5 load balancer. Producer cannot connects to the Kafka brokers directly without going through F5. I immediately pointed out that such architecture does not work. 

There are total two steps when Kafka producer sends messages to Kafka broker. 

The first step is to retrieve the metadata information. During this step, We use configuration metadata.broker.list to pass in a list of bootstrap brokers. This list does not need to include ALL brokers in the Kafka cluster. Any broker in the cluster can retrieve metadata information. We usually recommend set at least 3 brokers in the list to achieve HA. It is OK to use a load balancer during the metadata retrieval step. 

However, once Kafka producer has the metadata information, during the second step, the producer connects to the broker directly, without F5 sitting in the middle. The producer is a smart client. For example, it uses partition key to determine the destination partition of the message. By default, a hashing-based partitioner is used to determine the partition id given the key, and people can use customized partitioners too. Hiding the whole Kakfa broker cluster behinds the firewall will defeat the purpose. 

What if the event producer side has to go through a load balancer to access the Kafka brokers? One possible solution is to build a restful service acting as Kafka producer. The event generators are going to post events to the restful service end point, which is behind a load balancer and can scale out based on the volume of the events. The restful service then sends messages to Kafka brokers directly without load balancer in the middle. If you don't feel like writing your own restful service as Kafka producer client, you can use this open source project https://github.com/confluentinc/kafka-rest. However, building a restful service is not very hard if you decide to DIY.