Tuesday, February 21, 2017

Enable Back Pressure To Make Your Spark Streaming Application Production Ready

In order for a spark streaming application to run stably and efficiently in production, its batch processing time should be close to the batch interval but consistently below it. If the batch processing time is always higher than the batch interval, the scheduling delay keeps increasing and never recovers, and the spark streaming application becomes unstable. On the other hand, if the batch processing time is always much lower than the batch interval, cluster resources are wasted.
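
To see why a processing time that stays above the interval never recovers, here is a toy model in Java. It is a sketch under simplifying assumptions, not Spark's scheduler: batch i is generated every interval, each batch takes a fixed processing time, and batches run strictly one after another. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SchedulingDelaySim {
    /**
     * Toy model: batch i is generated at i * intervalMs, every batch takes processingMs,
     * and batches run one after another. Returns the scheduling delay of each batch,
     * i.e. how long it waited between being generated and starting to run.
     */
    static List<Long> schedulingDelays(long intervalMs, long processingMs, int batches) {
        List<Long> delays = new ArrayList<>();
        long busyUntil = 0; // time at which the previous batch finishes
        for (int i = 0; i < batches; i++) {
            long generatedAt = i * intervalMs;
            long startedAt = Math.max(generatedAt, busyUntil);
            delays.add(startedAt - generatedAt);
            busyUntil = startedAt + processingMs;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Processing (6 s) slower than the 5 s interval: the delay grows by 1 s every batch.
        System.out.println(schedulingDelays(5000, 6000, 5)); // [0, 1000, 2000, 3000, 4000]
        // Processing (4 s) faster than the interval: no batch ever waits.
        System.out.println(schedulingDelays(5000, 4000, 5)); // [0, 0, 0, 0, 0]
    }
}
```

With a 5-second interval, a 6-second processing time adds one more second of scheduling delay with every batch, while a 4-second processing time never queues anything.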

When using the direct API for Kafka and Spark streaming integration, we have an easy way to control the max flow rate: a configuration called spark.streaming.kafka.maxRatePerPartition. According to the documentation, it is
"the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API".
The spark.streaming.kafka.maxRatePerPartition configuration is especially crucial to prevent the streaming application from being overloaded in two scenarios:

  1. It prevents the first micro-batch from being overwhelmed when there is a large number of unprocessed messages in the Kafka topic initially and we set auto.offset.reset in the Kafka parameters to smallest.
  2. It prevents micro-batches from being overwhelmed when there is a sudden surge of messages from the Kafka producers.
Please note this configuration only takes effect when using the direct API. For receiver-based Kafka-Spark integration, spark.streaming.receiver.maxRate provides similar max rate control. Since the direct API is the recommended integration option, we will not spend time on the spark.streaming.receiver.* configurations in this blog post.
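
The effect of the cap in the first scenario can be sketched as follows. This is a simplified illustration of the idea, not Spark's DirectKafkaInputDStream code: partitions are plain integers, offsets are longs, and untilOffsets is a hypothetical helper. Each partition contributes at most maxRatePerPartition × batch seconds messages to a micro-batch, and never more than it actually has.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MaxRateCap {
    /**
     * For each partition, returns the offset up to which the next micro-batch reads:
     * at most maxRatePerPartition * batchSeconds messages past the current offset,
     * and never beyond the latest available offset.
     */
    static Map<Integer, Long> untilOffsets(Map<Integer, Long> fromOffsets,
                                           Map<Integer, Long> latestOffsets,
                                           long maxRatePerPartition,
                                           double batchSeconds) {
        long maxPerPartition = (long) (maxRatePerPartition * batchSeconds);
        Map<Integer, Long> until = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : fromOffsets.entrySet()) {
            long latest = latestOffsets.get(e.getKey());
            until.put(e.getKey(), Math.min(e.getValue() + maxPerPartition, latest));
        }
        return until;
    }

    public static void main(String[] args) {
        Map<Integer, Long> from = new LinkedHashMap<>();
        from.put(0, 0L);
        from.put(1, 0L);
        Map<Integer, Long> latest = new LinkedHashMap<>();
        latest.put(0, 1_000_000L); // big backlog, e.g. auto.offset.reset=smallest on an old topic
        latest.put(1, 300L);       // small backlog
        // 1000 messages/s per partition with 5 s batches: at most 5000 messages per partition
        System.out.println(untilOffsets(from, latest, 1000, 5.0)); // {0=5000, 1=300}
    }
}
```

Without the cap, the first micro-batch in this example would try to read all one million messages from partition 0 at once.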

However, setting the max rate per Kafka partition statically has its own drawback. For a long-running spark streaming application that runs in production for months, things change over time, and so does the optimal max rate per Kafka partition. Sometimes message characteristics such as message size change over time, causing the processing time for the same number of messages to vary. Sometimes a multi-tenant cluster becomes busy during the daytime when other big data applications such as Impala/Hive/MR jobs compete for shared system resources such as CPU, memory, network and disk IO.

Backpressure comes to the rescue! Backpressure was a highly demanded feature that allows the ingestion rate to be set dynamically and automatically, based on previous micro-batch processing time. Such a feedback loop makes it possible to adapt to the fluctuating nature of the streaming application.

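Spark Streaming backpressure was introduced in Spark 1.5. To enable it, set spark.streaming.backpressure.enabled to true in the application's Spark configuration, for example with sparkConf.set("spark.streaming.backpressure.enabled", "true") or with --conf spark.streaming.backpressure.enabled=true on spark-submit.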

What about the first micro-batch rate? Since no previous micro-batch processing time is available, there is no basis to estimate the optimal rate the application should use. Sifting through the spark documentation, you will find a configuration called "spark.streaming.backpressure.initialRate" that seems to control the initial rate when backpressure is enabled. This is a common misconception! That configuration only applies to the receiver-based integration approach and is not used by the direct API. As Cody Koeninger explained in one of his mailing list answers:

... that configuration was added well after the integration of the direct stream with the backpressure code, and was added only to the receiver code, which the direct stream doesn't share since it isn't a receiver. Not making excuses about it being confusing, just explaining how things ended up that way :(  So yeah, maxRatePerPartition is the closest thing you have on the direct stream side to being able to limit before the backpressure estimator has something to work with.

As a matter of fact, there is an open JIRA: SPARK-18580: Use spark.streaming.backpressure.initialRate in DirectKafkaInputDStream about this issue.

So the recommended approach is to use spark.streaming.kafka.maxRatePerPartition to control the initial rate before the backpressure feedback loop takes effect. I usually recommend setting spark.streaming.kafka.maxRatePerPartition to 150% ~ 200% of the optimal estimated rate and letting the backpressure algorithm take care of the rest. Please note that spark.streaming.kafka.maxRatePerPartition still serves as an upper bound that the backpressure algorithm will not exceed.
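
How the static cap and the backpressure estimate interact can be sketched like this. It is modelled on a simplified reading of the Spark 1.6 direct Kafka stream, where the estimated rate (for the whole stream) is split evenly across partitions and capped by maxRatePerPartition, and the static cap alone is used until an estimate exists. Later Spark versions distribute the rate across partitions differently, so treat this as an illustration; effectiveRatePerPartition is a hypothetical name.

```java
final class EffectiveRate {
    /**
     * estimatedRate: backpressure estimate for the whole stream in records/s, or <= 0 if none yet.
     * maxRatePerPartition: the static cap; 0 means "not set" (no cap).
     * Returns the per-partition rate used to size the next batch (here, 0 means unlimited).
     */
    static long effectiveRatePerPartition(double estimatedRate, int numPartitions,
                                          long maxRatePerPartition) {
        if (estimatedRate <= 0) {
            return maxRatePerPartition; // no feedback yet: only the static cap applies
        }
        long evenShare = (long) (estimatedRate / numPartitions);
        return maxRatePerPartition > 0 ? Math.min(maxRatePerPartition, evenShare) : evenShare;
    }

    public static void main(String[] args) {
        // Suppose ~1000 records/s per partition is the estimated optimal rate,
        // so maxRatePerPartition is set to 150% of it.
        long maxRatePerPartition = 1500;
        System.out.println(effectiveRatePerPartition(-1, 4, maxRatePerPartition));    // 1500
        System.out.println(effectiveRatePerPartition(3200, 4, maxRatePerPartition));  // 800
        System.out.println(effectiveRatePerPartition(40000, 4, maxRatePerPartition)); // 1500
    }
}
```

The three calls correspond to the first batches (static cap only), a slowdown where backpressure lowers the rate, and a burst where the cap still wins.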

Currently, the PID rate estimator is the only available rate estimator. According to the documentation, a few parameters control its behavior:

  • spark.streaming.backpressure.pid.proportional (default: 1.0) can be 0 or greater.
  • spark.streaming.backpressure.pid.integral (default: 0.2) can be 0 or greater.
  • spark.streaming.backpressure.pid.derived (default: 0.0) can be 0 or greater.
  • spark.streaming.backpressure.pid.minRate (default: 100) must be greater than 0.
Usually, the only parameter I would tune is spark.streaming.backpressure.pid.minRate, whose default is 100. In certain use cases, a minimum rate of 100 messages per second is still too high and needs to be adjusted down.
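
To get a feel for what these parameters do, here is a simplified Java transcription of the update rule in Spark's PIDRateEstimator, based on a reading of the Spark source (trace logging dropped, class name hypothetical). Verify it against the source of your Spark version before relying on the numbers. The estimated rate is for the whole stream, in records per second, and never goes below minRate.

```java
import java.util.OptionalDouble;

final class PidRateEstimatorSketch {
    private final long batchIntervalMillis;
    private final double proportional; // spark.streaming.backpressure.pid.proportional
    private final double integral;     // spark.streaming.backpressure.pid.integral
    private final double derivative;   // spark.streaming.backpressure.pid.derived
    private final double minRate;      // spark.streaming.backpressure.pid.minRate

    private boolean firstRun = true;
    private long latestTime = -1L;
    private double latestRate = -1D;
    private double latestError = -1D;

    PidRateEstimatorSketch(long batchIntervalMillis, double proportional, double integral,
                           double derivative, double minRate) {
        this.batchIntervalMillis = batchIntervalMillis;
        this.proportional = proportional;
        this.integral = integral;
        this.derivative = derivative;
        this.minRate = minRate;
    }

    /**
     * Called once per completed batch; all times are in milliseconds. Returns the new rate,
     * or empty when estimation is skipped (Spark logs "Rate estimation skipped" in that case).
     */
    synchronized OptionalDouble compute(long time, long numElements,
                                        long processingDelay, long schedulingDelay) {
        if (time <= latestTime || numElements <= 0 || processingDelay <= 0) {
            return OptionalDouble.empty();
        }
        double delaySinceUpdate = (time - latestTime) / 1000.0;               // seconds
        double processingRate = numElements / (double) processingDelay * 1000; // records/s
        // Proportional term: gap between the last rate and what the batch actually achieved.
        double error = latestRate - processingRate;
        // Integral term: scheduling delay means records piled up from earlier batches.
        double historicalError = schedulingDelay * processingRate / batchIntervalMillis;
        // Derivative term: how fast the error is changing.
        double dError = (error - latestError) / delaySinceUpdate;
        double newRate = Math.max(minRate, latestRate - proportional * error
                - integral * historicalError - derivative * dError);
        latestTime = time;
        if (firstRun) {
            // The first completed batch only seeds the estimator with its processing rate.
            latestRate = processingRate;
            latestError = 0D;
            firstRun = false;
            return OptionalDouble.empty();
        }
        latestRate = newRate;
        latestError = error;
        return OptionalDouble.of(newRate); // Spark logs "New rate = ..." here
    }

    public static void main(String[] args) {
        // 5 s batches with the default gains (1.0, 0.2, 0.0) and minRate 100.
        PidRateEstimatorSketch est = new PidRateEstimatorSketch(5000, 1.0, 0.2, 0.0, 100);
        System.out.println(est.compute(5000, 10000, 4000, 0));     // OptionalDouble.empty
        System.out.println(est.compute(10000, 10000, 5000, 0));    // OptionalDouble[2000.0]
        System.out.println(est.compute(15000, 10000, 5000, 1000)); // OptionalDouble[1920.0]
    }
}
```

In the example, the first batch seeds the estimator at 2500 records/s. The second batch only achieves 2000 records/s, so the proportional term pulls the rate down to 2000. In the third batch, a 1-second scheduling delay adds an integral correction of 0.2 × 400 = 80, giving 1920.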

It is important to point out that all following micro-batches are scheduled based on the existing rate (initially set by spark.streaming.kafka.maxRatePerPartition) until the first micro-batch is completed. Consider the Spark UI graph in Cody Koeninger's spark mailing list post: when the scheduling delay is 31 seconds, the first 7 micro-batches, with a 5-second interval, still use the ingestion rate of 20 records per batch. Only the 8th micro-batch is affected by backpressure, which changes the ingestion rate to 5 records. To avoid such a long delay before backpressure takes effect, I recommend setting spark.streaming.kafka.maxRatePerPartition to 150% ~ 200% of the optimal estimated rate.

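To observe the backpressure behavior, set the log level of org.apache.spark.streaming.scheduler.rate.PIDRateEstimator to TRACE, for example by adding log4j.logger.org.apache.spark.streaming.scheduler.rate.PIDRateEstimator=TRACE to log4j.properties.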

When the PID rate estimator starts computing the rate, you should see messages similar to the following:

TRACE PIDRateEstimator:

time = [time], # records = [numElements], processing time = [processingDelay], scheduling delay = [schedulingDelay]

When there is not enough information for the estimator to calculate the rate, you should see a message similar to this:

TRACE PIDRateEstimator: Rate estimation skipped

When backpressure sets a new rate, you should see the following message:

TRACE PIDRateEstimator: New rate = [newRate]

In summary, enabling backpressure is an important technique for making your spark streaming application production ready. It sets the message ingestion rate dynamically and automatically based on previous batch performance, making your spark streaming application stable and efficient without the pitfalls of a statically capped max rate.

Friday, February 10, 2017

The Gotcha Of Using Spark Dependency in Cloudera Maven Repository (Mac User Only)

One night I set up a basic word count spark application in the IntelliJ IDE on my MacBook. Usually I would specify the spark core dependency from Maven Central in the maven pom.xml.

But that night I decided to use the spark artifacts from Cloudera's maven repository. It seemed like a good idea because ultimately my spark application was going to be deployed on a CDH cluster. Even though the CDH spark distribution is mostly identical to the upstream open source Apache Spark project, it contains patches and other tweaks so that it works well with other Hadoop components included in the Cloudera CDH distribution. I am a big believer that the build environment should be as identical as possible to the runtime environment. For details about how to set up the Cloudera Maven repository, please refer to Cloudera's documentation.

My spark core dependency therefore pointed to the CDH build of spark core in Cloudera's repository.

Then I ran into trouble. Within the IDE, the maven compile and package goals ran fine, but running the application failed with the following exception:

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:317)
at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:219)
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)
... 41 more

When I used spark-submit to submit the packaged jar file to a running CDH cluster deployed in AWS (RedHat 7 as the OS), the word count application ran fine. The problem only happened when I tried to run the spark application locally on my MacBook. WEIRD!

After some digging, I found the real cause. The fact that the snappyjava library cannot be found is actually caused by the snappy-java version used by spark in CDH. If you check the spark 1.6.0-cdh5.9 maven dependency, the snappy-java version is older than 1.0.5, and that version contains a bug, which is described here: https://github.com/xerial/snappy-java/issues/6. It ONLY affects Mac OS. It is related to the Java call System.mapLibraryName(). If you call System.mapLibraryName("snappyjava"), the JVM builds a single OS-specific file name from it: libsnappyjava.so on Linux and snappyjava.dll on Windows. Mac OS supports multiple extensions, but mapLibraryName can only return one by design: Java 6 uses .jnilib, while Java 7+ uses .dylib instead. The affected snappy-java version only packages the file libsnappyjava.jnilib, hence the error. This problem does not exist if you use open source spark, since according to the spark GitHub 1.6 branch it depends on a newer snappy-java version. I chose the 1.6 branch to check because that's the same spark version as what CDH 5.9 includes.
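
A quick way to see which file name the JVM looks for on your machine is to call System.mapLibraryName directly. This small program only uses the JDK; the names in the comments are the usual JDK behavior, and the PACKAGED_MAC_LIB constant reflects the snappy-java issue described above. The class name is hypothetical.

```java
import java.util.Locale;

final class MapLibraryNameDemo {
    // The only macOS library file packaged by the affected snappy-java versions (per the issue)
    static final String PACKAGED_MAC_LIB = "libsnappyjava.jnilib";

    /** The single platform-specific file name the JVM derives from a library name. */
    static String mappedName(String libName) {
        return System.mapLibraryName(libName);
    }

    public static void main(String[] args) {
        String os = System.getProperty("os.name");
        String mapped = mappedName("snappyjava");
        // Usually libsnappyjava.so on Linux, snappyjava.dll on Windows,
        // and libsnappyjava.dylib on macOS with Java 7+.
        System.out.println(os + " -> " + mapped);
        if (os.toLowerCase(Locale.ROOT).contains("mac")) {
            System.out.println("Matches the packaged file? " + mapped.equals(PACKAGED_MAC_LIB));
        }
    }
}
```

On a Mac with Java 7+, the second line prints false, which is exactly the mismatch behind the UnsatisfiedLinkError.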

How do we get around this issue? Of course, switching to the spark dependency in Maven Central would work, but making the build and runtime environments identical is in general recommended.

Looking at the implementation at https://github.com/xerial/snappy-java/blob/master/src/main/java/org/xerial/snappy/SnappyLoader.java, I found that it checks the system property "org.xerial.snappy.lib.name" before calling mapLibraryName. Thus the easiest solution is to add -Dorg.xerial.snappy.lib.name=libsnappyjava.jnilib to the VM options of the IDE Run configuration.
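
If you would rather not touch every IDE Run configuration, the same property can presumably be set in code. This is a sketch under an assumption: that snappy-java reads org.xerial.snappy.lib.name when it first loads its native library, so the call must run before anything triggers Snappy (for example before the SparkContext is created). applyIfMac is a hypothetical helper; it leaves Linux and Windows untouched and does not override a value passed with -D.

```java
import java.util.Locale;

final class SnappyLibNameWorkaround {
    // Property that snappy-java's SnappyLoader consults before falling back to mapLibraryName
    static final String LIB_NAME_KEY = "org.xerial.snappy.lib.name";
    // The only macOS library file bundled by the affected snappy-java versions
    static final String MAC_LIB_FILE = "libsnappyjava.jnilib";

    /**
     * Sets the property only when running on macOS and only if it was not already set
     * (for example with -D). Returns true if this call set it.
     */
    static boolean applyIfMac(String osName) {
        boolean isMac = osName != null && osName.toLowerCase(Locale.ROOT).contains("mac");
        if (isMac && System.getProperty(LIB_NAME_KEY) == null) {
            System.setProperty(LIB_NAME_KEY, MAC_LIB_FILE);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumption: must run before anything loads snappy-java, e.g. before the SparkContext.
        boolean applied = applyIfMac(System.getProperty("os.name"));
        System.out.println(LIB_NAME_KEY + " set by this call: " + applied);
        // ... then build the SparkConf / SparkContext and run the job as usual.
    }
}
```

Because the helper is a no-op outside macOS, the same jar can still be submitted to the Linux cluster unchanged.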

Let's summarize. You only need the above solution if you meet ALL of the conditions below:
  1. You are using a Mac, not Windows or Linux, for development.
  2. You are running the spark application locally, for example from an IDE Run configuration.
  3. You use spark artifacts from Cloudera's maven repository, not from the Maven Central repository.
  4. The spark artifacts you use depend on a snappy-java version older than 1.0.5. If you use a CDH 5.x release (the latest is CDH 5.10 at the time of writing), you fall into that category.
The good news is that Cloudera has decided to release Spark 2.x as a separate parcel. For how to install the Spark 2 parcel on a CDH cluster, please refer to Cloudera's official online documentation. The snappy-java version included in the Spark 2.0 parcel does not have this bug, so if you build against the Spark 2 artifacts in Cloudera's Maven repository, you won't run into this issue.

I hope this post saves time for anyone who runs into the same issue.

Thursday, February 02, 2017

How to Shutdown a Spark Streaming Job Gracefully

A Spark Streaming application is long-running by definition. But how can we shut it down gracefully, allowing in-flight messages to be processed properly before the application stops?

Many blog posts suggest doing it through a JVM shutdown hook that stops the StreamingContext; see, for example, https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully.

However, this approach does not work in newer Spark versions (after Spark 1.4); it can cause a deadlock.

There are currently two ways to stop the spark streaming job gracefully. The first way is to set the spark.streaming.stopGracefullyOnShutdown parameter to true (default is false). This parameter was introduced in Spark to solve the graceful shutdown issue. Developers do not need to call ssc.stop() in their code anymore; instead, they send a SIGTERM signal to the driver. In practice, we need to do the following:

  1. Use the Spark UI to find out on which node the driver process is running. In yarn cluster deploy mode, the driver process and the AM run in the same container.
  2. Log in to that node, run ps -ef | grep java | grep ApplicationMaster and find the pid. Note that your grep string might be different depending on your application, environment, etc.
  3. Run kill -SIGTERM <AM-PID> to send SIGTERM to the process.

After the Spark driver receives the SIGTERM signal, you should see messages like these in the log:

17/02/02 01:31:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
17/02/02 01:31:35 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
17/02/02 01:31:45 INFO streaming.StreamingContext: StreamingContext stopped successfully
17/02/02 01:31:45 INFO spark.SparkContext: Invoking stop() from shutdown hook
17/02/02 01:31:45 INFO spark.SparkContext: Successfully stopped SparkContext
17/02/02 01:31:45 INFO util.ShutdownHookManager: Shutdown hook called

There is a catch, though. By default, the spark.yarn.maxAppAttempts parameter uses the value of yarn.resourcemanager.am.max-attempts in YARN, which defaults to 2. Thus after the first AM is stopped by your kill command, YARN will automatically launch another AM/driver, and you have to kill the second one as well. You can set --conf spark.yarn.maxAppAttempts=1 in spark-submit, but you have to ask yourself whether you really want to give your driver no second chance after a failure.

You CANNOT use yarn application -kill <applicationid> to kill the job. This command does send a SIGTERM signal to the container, but then almost immediately sends a SIGKILL signal. The interval between SIGTERM and SIGKILL is set by yarn.nodemanager.sleep-delay-before-sigkill.ms (default 250 ms). Of course you can increase this number, but somehow even after I changed it to 60000 (1 minute), it still did not work. The application containers were killed almost immediately and the log file contained only the following lines:

17/02/02 12:12:27 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
17/02/02 12:12:27 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook

So for now, I do not recommend using the yarn application -kill <applicationid> command to send SIGTERM.

The second solution is to inform the spark streaming application that it should shut down gracefully by some means other than a SIGTERM signal. One way is to place a marker file on HDFS that the spark streaming application checks periodically. If the marker file exists, ssc.stop(true, true) is called. The first "true" means the underlying SparkContext should be stopped as well. The second "true" means it is a graceful shutdown, allowing in-flight messages to be completed.

It is crucial that you do not call ssc.stop(true, true) within your micro-batch code. Think about it: if you call ssc.stop(true, true) within your micro-batch code, it will wait until all in-flight messages are processed, including the current micro-batch. But the current micro-batch cannot finish until ssc.stop(true, true) returns. That is a deadlock. Instead, you should check the marker file and call ssc.stop(true, true) in a different thread. I put a simple example on GitHub, in which I do the checking and call ssc.stop() on the main thread after ssc.start(). You can find the source code here: https://github.com/lanjiang/streamingstopgraceful. Of course, using an HDFS marker file is just one way. Other alternatives are using a separate thread to listen on a socket, starting a RESTful service, etc.
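
The shape of the marker-file loop can be sketched with the JDK alone. This is not the code in the GitHub repository above; it is a minimal sketch of the same idea: the driver's main thread, never a micro-batch, waits in short slices and requests a graceful stop once the marker exists. StreamingHandle is a hypothetical interface mirroring the two StreamingContext methods the loop needs (awaitTerminationOrTimeout and stop(stopSparkContext, stopGracefully)), and a local file checked with java.nio stands in for the HDFS marker; against HDFS you would use Hadoop's FileSystem.exists instead.

```java
import java.nio.file.Files;
import java.nio.file.Path;

final class MarkerFileShutdown {
    /** Hypothetical stand-in for the two StreamingContext methods the loop needs. */
    interface StreamingHandle {
        boolean awaitTerminationOrTimeout(long timeoutMs);
        void stop(boolean stopSparkContext, boolean stopGracefully);
    }

    /**
     * Runs on the driver's main thread after start(), never inside a micro-batch.
     * Wakes up every checkIntervalMs; once the marker exists, requests a graceful stop.
     * Returns true if the marker triggered the stop, false if the context ended on its own.
     */
    static boolean awaitMarkerAndStop(StreamingHandle ssc, Path marker, long checkIntervalMs) {
        while (true) {
            if (ssc.awaitTerminationOrTimeout(checkIntervalMs)) {
                return false; // stopped or failed for some other reason
            }
            if (Files.exists(marker)) {
                // Stop the SparkContext too, and let in-flight batches finish first.
                ssc.stop(true, true);
                return true;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path marker = Files.createTempFile("stop-streaming", ".marker"); // pretend someone created it
        StreamingHandle fake = new StreamingHandle() {
            public boolean awaitTerminationOrTimeout(long timeoutMs) { return false; }
            public void stop(boolean sc, boolean graceful) {
                System.out.println("stop(" + sc + ", " + graceful + ")");
            }
        };
        System.out.println(awaitMarkerAndStop(fake, marker, 1000));
        Files.delete(marker);
    }
}
```

In a real driver you would call awaitMarkerAndStop right after ssc.start(), passing a small adapter around the StreamingContext, and create the marker (for example with hdfs dfs -touchz) when you want the job to stop.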

I hope a future Spark release will take care of this issue more elegantly. For example, the Spark UI could have a button to stop the spark streaming job gracefully, so that we do not have to resort to custom code or mess around with pids and SIGTERM signals.