Tuesday, February 21, 2017

Enable Back Pressure To Make Your Spark Streaming Application Production Ready

In order for a Spark Streaming application to run stably and efficiently in production, its batch processing time should be close to the batch interval, but consistently below it. If the batch processing time is always higher than the batch interval, the scheduling delay keeps increasing and never recovers, and the application becomes unstable. On the other hand, if the batch processing time is always much lower than the batch interval, cluster resources are wasted. 

When using the direct API for Kafka and Spark Streaming integration, we have an easy way to control the max flow rate -- a configuration called spark.streaming.kafka.maxRatePerPartition. According to the documentation, it is "the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API". This configuration is especially crucial for preventing the streaming application from being overloaded in two scenarios:

  1. It prevents the first micro-batch from being overwhelmed when there is a large number of unprocessed messages in the Kafka topic initially and we set auto.offset.reset in the Kafka parameters to smallest. 
  2. It prevents micro-batches from being overwhelmed when there is a sudden surge of messages from the Kafka producers.
Please note this configuration only takes effect when using the direct API. For receiver-based Kafka-Spark integration, spark.streaming.receiver.maxRate achieves similar max rate control. Since the direct API is the recommended integration option, we will not spend time on the spark.streaming.receiver.* configurations in this blog post.
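
As a sketch, the cap is set like any other Spark configuration; the value below is a placeholder, not a recommendation:

```scala
// Hypothetical value -- tune for your own workload.
// Caps each Kafka partition at 1000 messages/sec for the direct API.
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
```

With a 5-second batch interval and 10 partitions, this value would bound each micro-batch at 1000 x 5 x 10 = 50,000 records.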

However, setting the max rate per Kafka partition statically has its own drawback. For a long-running Spark Streaming application that stays in production for months, things change over time, and so does the optimal max rate per Kafka partition. Sometimes message characteristics such as message size change over time, causing the processing time for the same number of messages to vary. Sometimes a multi-tenant cluster becomes busy during the daytime, when other big data applications such as Impala/Hive/MR jobs compete for shared system resources such as CPU/memory/network/disk IO. 




Backpressure comes to the rescue! Backpressure was a highly demanded feature that allows the ingestion rate to be set dynamically and automatically, based on the previous micro-batch's processing time. Such a feedback loop makes it possible to adapt to the fluctuating nature of a streaming application. 

Spark Streaming backpressure was introduced in Spark 1.5. To enable it, add the line below to your Spark Streaming application:

sparkConf.set("spark.streaming.backpressure.enabled", "true")

What about the first micro-batch's rate? Since there is no previous micro-batch processing time available, there is no basis for estimating the optimal rate the application should use. Sifting through the Spark documentation, there is a configuration called "spark.streaming.backpressure.initialRate" that seems to control the initial rate when backpressure is enabled. This is a common misconception! That configuration only applies to the receiver-based integration approach and is not used by the direct API. As Cody Koeninger explained in one of his mailing list answers:

... that configuration was added well after the integration of the direct stream with the backpressure code, and was added only to the receiver code, which the direct stream doesn't share since it isn't a receiver. Not making excuses about it being confusing, just explaining how things ended up that way :(  So yeah, maxRatePerPartition is the closest thing you have on the direct stream side to being able to limit before the backpressure estimator has something to work with.


As a matter of fact, there is an open JIRA about this issue: SPARK-18580 ("Use spark.streaming.backpressure.initialRate in DirectKafkaInputDStream").

So the recommended approach is to use spark.streaming.kafka.maxRatePerPartition to control the initial rate before the backpressure feedback loop takes effect. I usually recommend setting spark.streaming.kafka.maxRatePerPartition to 150% ~ 200% of the estimated optimal rate and letting the backpressure algorithm take care of the rest. Please note that spark.streaming.kafka.maxRatePerPartition still serves as the max rate that the backpressure algorithm will not exceed.
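
To make the 150% ~ 200% headroom concrete, here is a small sizing sketch. The helper name and all the numbers are hypothetical, purely for illustration:

```scala
// Illustrative sizing for spark.streaming.kafka.maxRatePerPartition.
// All values are hypothetical assumptions, not measurements.
object MaxRateSizing {
  /** Per-partition cap = (total optimal records/sec / partitions) x headroom. */
  def maxRatePerPartition(optimalRecordsPerSec: Long,
                          numPartitions: Int,
                          headroom: Double): Long =
    math.ceil(optimalRecordsPerSec.toDouble / numPartitions * headroom).toLong

  def main(args: Array[String]): Unit = {
    // e.g. an estimated optimal rate of 60,000 records/sec across
    // 20 Kafka partitions, padded with 150% headroom
    println(maxRatePerPartition(60000L, 20, 1.5)) // 4500
  }
}
```

The headroom factor lets a surge through to the estimator without overwhelming the first batches, after which backpressure pulls the rate back down.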


Currently, the PID rate estimator is the only available rate estimator. There are a few parameters that control its behavior, according to the documentation:

  • spark.streaming.backpressure.pid.proportional (default: 1.0) can be 0 or greater.
  • spark.streaming.backpressure.pid.integral (default: 0.2) can be 0 or greater.
  • spark.streaming.backpressure.pid.derived (default: 0.0) can be 0 or greater.
  • spark.streaming.backpressure.pid.minRate (default: 100) must be greater than 0.
Usually, the only parameter I tune is spark.streaming.backpressure.pid.minRate, since the default is 100. In certain use cases, 100 messages per second per partition is still too high and needs to be adjusted down. 
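
To see how these parameters interact, here is a simplified sketch of the PID update, paraphrased from Spark's PIDRateEstimator (org.apache.spark.streaming.scheduler.rate); it is not the exact implementation, and the derivative term is omitted for brevity:

```scala
// Simplified sketch of the PID rate update. Constants mirror the
// documented defaults; the formula is paraphrased, not copied verbatim.
object PidSketch {
  val proportional = 1.0 // spark.streaming.backpressure.pid.proportional
  val integral     = 0.2 // spark.streaming.backpressure.pid.integral
  val minRate      = 100.0 // spark.streaming.backpressure.pid.minRate

  /** Returns the new ingestion rate (records/sec) for the next batch. */
  def computeNewRate(latestRate: Double,      // rate set for the last batch
                     numElements: Long,       // records in that batch
                     processingDelayMs: Long, // time spent processing it
                     schedulingDelayMs: Long, // time it waited to start
                     batchIntervalMs: Long): Double = {
    // How fast we actually processed, in records/sec
    val processingRate = numElements.toDouble / processingDelayMs * 1000
    // Proportional error: gap between the set rate and reality
    val error = latestRate - processingRate
    // Integral term: backlog implied by the scheduling delay
    val historicalError =
      schedulingDelayMs.toDouble * processingRate / batchIntervalMs
    val newRate = latestRate - proportional * error - integral * historicalError
    math.max(newRate, minRate) // never drop below pid.minRate
  }
}
```

For example, if the last batch was set to 1000 records/sec but only 800 records/sec were actually processed (and no scheduling delay accrued), the sketch lowers the next rate to 800; with a large backlog the raw result can go negative, which is why the minRate floor matters.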


It is important to point out that all subsequent micro-batches are scheduled based on the existing rate (initially set by spark.streaming.kafka.maxRatePerPartition) until the first micro-batch completes. Please see the Spark UI graph below (taken from Cody Koeninger's Spark mailing list post). When the scheduling delay is 31 seconds, the first 7 micro-batches with a 5-second interval still use the ingestion rate of 20 records per batch. Only the 8th micro-batch is affected by backpressure and changes the ingestion rate to 5 records. To avoid such a huge delay before backpressure takes effect, I recommend setting spark.streaming.kafka.maxRatePerPartition to 150% ~ 200% of the estimated optimal rate.


To observe the backpressure behavior, add the log4j setting below:

log4j.logger.org.apache.spark.streaming.scheduler.rate.PIDRateEstimator=TRACE

When the PID rate estimator starts computing the rate, you should see messages similar to the one below:

TRACE PIDRateEstimator:

time = [time], # records = [numElements], processing time = [processingDelay], scheduling delay = [schedulingDelay]

When there is not enough information for the estimator to calculate a rate, you should see a message similar to the one below:

TRACE PIDRateEstimator: Rate estimation skipped

When backpressure sets a new rate, you should see the message below:

TRACE PIDRateEstimator: New rate = [newRate]

In summary, enabling backpressure is an important technique to make your Spark Streaming application production ready. It sets the message ingestion rate dynamically and automatically based on previous batch performance, thus making your Spark Streaming application stable and efficient, without the pitfalls of a statically capped max rate. 
