Thursday, February 02, 2017

How to Shut Down a Spark Streaming Job Gracefully

A Spark Streaming application is by definition long-running. But how can we shut it down gracefully, allowing in-flight messages to be processed properly before the application stops?

Many blog posts suggest doing it through a JVM shutdown hook. See the code below, taken from https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully.
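The pattern those posts describe looks roughly like the sketch below (a reconstruction, since the original snippet did not survive here; details in the linked post may differ).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("MyStreamingApp"), Seconds(10))
// ... define the streaming job on ssc ...

// Register a JVM shutdown hook that stops the context gracefully
// when the JVM receives SIGTERM.
sys.ShutdownHookThread {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}

ssc.start()
ssc.awaitTermination()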


However, this approach no longer works in newer Spark versions (Spark 1.4 and later): Spark now registers its own shutdown hooks, and calling ssc.stop() from a user shutdown hook can deadlock with them.

There are currently two ways to stop a Spark Streaming job gracefully. The first is to set the spark.streaming.stopGracefullyOnShutdown parameter to true (the default is false). This parameter was introduced in Spark 1.4 precisely to solve the graceful shutdown issue. Developers no longer need to call ssc.stop() in their code; instead, they send a SIGTERM signal to the driver (a sketch of setting the parameter follows the list below). In practice, we need to do the following:

  1. Use the Spark UI to find out which node the driver process is running on. In YARN cluster deploy mode, the driver process and the AM run in the same container. 
  2. Log in to that node and run ps -ef | grep java | grep ApplicationMaster to find the PID. Note that your grep string might differ depending on your application, environment, etc. 
  3. Run kill -SIGTERM <AM-PID> to send SIGTERM to the process. 
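
Setting the parameter itself is a one-liner. A minimal sketch (the app name is arbitrary; the same setting can also be passed with --conf spark.streaming.stopGracefullyOnShutdown=true at spark-submit time):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyStreamingApp") // arbitrary name for illustration
  // Let Spark's built-in shutdown hook stop the StreamingContext
  // gracefully when the driver receives SIGTERM (default: false).
  .set("spark.streaming.stopGracefullyOnShutdown", "true")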

After the Spark driver receives the SIGTERM signal, you should see messages like the following in the log:

17/02/02 01:31:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
17/02/02 01:31:35 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
...
17/02/02 01:31:45 INFO streaming.StreamingContext: StreamingContext stopped successfully
17/02/02 01:31:45 INFO spark.SparkContext: Invoking stop() from shutdown hook
...
17/02/02 01:31:45 INFO spark.SparkContext: Successfully stopped SparkContext
...
17/02/02 01:31:45 INFO util.ShutdownHookManager: Shutdown hook called

There is a catch, though. By default, the spark.yarn.maxAppAttempts parameter takes its value from yarn.resourcemanager.am.max-attempts in YARN, which defaults to 2. So after the first AM is stopped by your kill command, YARN will automatically launch a second AM/driver, and you have to kill that one as well. You can set --conf spark.yarn.maxAppAttempts=1 during spark-submit, but then you have to ask yourself whether you really want to give your driver no chance to recover from a genuine failure.

You CANNOT use yarn application -kill <applicationId> to stop the job gracefully. This command does send a SIGTERM signal to the container, but it sends a SIGKILL almost immediately afterwards. The interval between SIGTERM and SIGKILL is controlled by the yarn.nodemanager.sleep-delay-before-sigkill.ms configuration (default 250 ms). You can of course increase this number, but somehow even after I changed it to 60000 (1 minute), it still did not work. The application containers were killed almost immediately, and the log file contained only the lines below:

17/02/02 12:12:27 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
17/02/02 12:12:27 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook

So for now, I do not recommend using the yarn application -kill <applicationId> command to send SIGTERM.

The second solution is to inform the Spark Streaming application that it should shut down gracefully by some means other than a SIGTERM signal. One way is to place a marker file on HDFS that the application checks periodically. If the marker file exists, ssc.stop(true, true) is called. The first "true" means the underlying SparkContext should be stopped as well. The second "true" means it is a graceful shutdown, allowing in-flight messages to be processed to completion.

It is crucial that you do not call ssc.stop(true, true) from within your micro-batch code. Think about it: if you call ssc.stop(true, true) inside the micro-batch code, it will wait until all in-flight messages are processed, including the current micro batch. But the current micro batch cannot complete until ssc.stop(true, true) returns. That is a deadlock. Instead, check the marker file and call ssc.stop(true, true) from a different thread. I put a simple example on github, in which I do the checking and call ssc.stop() in the main thread after ssc.start(), as sketched below. You can find the source code here: https://github.com/lanjiang/streamingstopgraceful. Of course, an HDFS marker file is just one option. Alternatives include using a separate thread to listen on a socket, starting a RESTful service, etc.
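
A minimal sketch of that pattern (the marker-file path, batch interval, and 10-second polling timeout are arbitrary choices for illustration; the github project above contains the full example):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("MyStreamingApp"), Seconds(10))
// ... define the streaming job on ssc ...
ssc.start()

// Poll for the marker file in the main thread instead of blocking
// forever in awaitTermination().
val markerFile = new Path("/tmp/streaming_stop_marker")
val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
var stopped = false
while (!stopped) {
  // Returns true once the StreamingContext has terminated.
  stopped = ssc.awaitTerminationOrTimeout(10000)
  if (!stopped && fs.exists(markerFile)) {
    // Graceful stop: finish in-flight batches, then stop the SparkContext too.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}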

I wish a future Spark release would handle this issue more elegantly. For example, the Spark UI could offer a button to stop a Spark Streaming job gracefully, so that we do not have to resort to custom code or mess around with PIDs and SIGTERM signals.
