Thursday, August 25, 2016

Common Pitfalls When Writing Spark Applications (Part 1: Logging Side Effect)

As a Big Data Solutions Architect, I have done quite a few Spark application reviews for my customers, and I have had the opportunity to witness some common pitfalls first-hand. I decided to write a blog series on the subject to help the Spark community avoid these costly mistakes.

Spark 101: operations can be divided into transformations and actions. All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action is invoked. This design enables Spark to optimize the RDD pipeline and run more efficiently. 
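Spark's laziness is analogous to Java's own Stream API, where intermediate operations are only recorded and a terminal operation forces execution. Below is a minimal sketch of that idea in plain Java (an analogy, not Spark itself): the counter shows that map() does no work until the terminal reduce() runs.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        AtomicInteger mapCalls = new AtomicInteger();

        // map() is an intermediate (lazy) operation, like a Spark transformation:
        // it is only recorded here, not executed.
        Stream<Integer> pipeline = List.of(1, 2, 3).stream()
                .map(x -> { mapCalls.incrementAndGet(); return x * 2; });

        System.out.println("map calls before terminal op: " + mapCalls.get()); // 0

        // reduce() is a terminal operation, like a Spark action: it forces
        // the whole pipeline to execute.
        int sum = pipeline.reduce(0, Integer::sum);
        System.out.println("sum = " + sum + ", map calls after: " + mapCalls.get()); // sum = 6, 3 calls
    }
}
```

In Spark the same pattern holds: transformations like map or filter build up the lineage, and only an action such as count() or saveAsTextFile() triggers computation.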

During one of my Spark application review engagements, I noticed that the development team had done a great deal of logging through the log4j API, and many of the logging statements call rdd.count(). Something like the below:

logger.info("contract_devices: " + contract_devices.count());

contract_devices is an RDD with a long lineage.

Even if the logger level is set to warn or error, the count action on the RDD is still invoked. This is the logging side effect we need to watch out for, because it triggers every transformation leading up to this RDD, including reading from HDFS at the very beginning. There are many such log statements throughout the code, literally calling the count() action on every RDD in the long lineage. One can imagine how inefficient this becomes when the whole lineage is executed again and again.

Instead, we should have written code like below:

if (logger.isInfoEnabled()) {
    logger.info("contract_devices: " + contract_devices.count());
}

Because of the check done by logger.isInfoEnabled(), the action on the RDD will not be triggered when the logger level is set properly.
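To see why the guard matters, remember that in Java the argument to a method call is evaluated before the method runs, so an unguarded logger.info("..." + rdd.count()) pays for count() even when nothing is printed. The sketch below demonstrates this with plain Java stand-ins (the expensiveCount() method and the info()/isInfoEnabled fields are hypothetical stand-ins for rdd.count() and a log4j logger at warn level):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedLoggingDemo {
    static AtomicInteger countCalls = new AtomicInteger();

    // Stand-in for rdd.count(): an expensive action we want to avoid
    // when info logging is disabled.
    static long expensiveCount() {
        countCalls.incrementAndGet();
        return 42L;
    }

    // Stand-in for logger.isInfoEnabled() with the level set to warn.
    static boolean isInfoEnabled = false;

    static void info(String msg) {
        if (isInfoEnabled) System.out.println(msg);
    }

    public static void main(String[] args) {
        // Unguarded: the string argument is built BEFORE info() runs,
        // so expensiveCount() executes even though nothing is printed.
        info("contract_devices: " + expensiveCount());
        System.out.println("calls after unguarded log: " + countCalls.get()); // 1

        // Guarded: at warn level the whole statement is skipped,
        // and expensiveCount() never runs.
        if (isInfoEnabled) {
            info("contract_devices: " + expensiveCount());
        }
        System.out.println("calls after guarded log: " + countCalls.get()); // still 1
    }
}
```

The same reasoning applies directly to the Spark case: the guard keeps count() from ever being submitted as a job when the logger level is warn or error.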

Tuesday, August 23, 2016

Kafka and Load Balancer

I was reviewing a Kafka - Spark Streaming application architecture for a client. The client proposed the architecture below at the Kafka producer side.

Kafka Producer --> F5 --> Kafka Broker cluster

The Kafka broker cluster is composed of 3 nodes and is hidden from the Kafka producer behind the F5 load balancer. The producer cannot connect to the Kafka brokers directly without going through F5. I immediately pointed out that such an architecture does not work.

There are two steps when a Kafka producer sends messages to a Kafka broker.

The first step is to retrieve the metadata information. During this step, we use configuration to pass in a list of bootstrap brokers. This list does not need to include ALL brokers in the Kafka cluster; any broker in the cluster can serve the metadata. We usually recommend setting at least 3 brokers in the list to achieve HA. It is OK to use a load balancer during the metadata retrieval step.
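The bootstrap list is just producer configuration. A minimal sketch of what that configuration looks like is below; the broker hostnames are hypothetical, and constructing the actual KafkaProducer is omitted since it requires the kafka-clients dependency.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Three brokers in the bootstrap list for HA during metadata retrieval.
        // This list does not need to contain every broker in the cluster;
        // hostnames here are made up for illustration.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // A real producer would be built as: new KafkaProducer<String, String>(props)
        // It uses bootstrap.servers ONLY to fetch cluster metadata, then talks
        // to the individual brokers directly.
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```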

However, once the Kafka producer has the metadata information, during the second step it connects to the brokers directly, without F5 sitting in the middle. The producer is a smart client. For example, it uses the partition key to determine the destination partition of the message. By default, a hashing-based partitioner is used to determine the partition id given the key, and people can use customized partitioners too. Hiding the whole Kafka broker cluster behind the load balancer defeats this purpose.
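The key-to-partition mapping can be sketched as below. This is a simplified illustration of the idea, not Kafka's actual code: Kafka's DefaultPartitioner hashes the serialized key bytes with murmur2, but the shape of the computation is the same, and it shows why the producer itself must decide which broker gets each message.

```java
public class PartitionerSketch {
    // Simplified hashing-based partitioner: mask the sign bit to get a
    // non-negative hash, then take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // The same key always maps to the same partition, which is what
        // preserves per-key ordering; a load balancer in front of the brokers
        // would make this routing decision meaningless.
        System.out.println(partitionFor("device-42", numPartitions));
        System.out.println(partitionFor("device-42", numPartitions));
    }
}
```

Because the partition determines which broker leads it, the producer has to open a direct connection to that specific broker; no intermediary can make that choice for it.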

What if the event producer side has to go through a load balancer to reach the Kafka brokers? One possible solution is to build a restful service acting as the Kafka producer. The event generators post events to the restful service endpoint, which sits behind a load balancer and can scale out based on the volume of events. The restful service then sends messages to the Kafka brokers directly, without a load balancer in the middle. If you don't feel like writing your own restful service as a Kafka producer client, you can use this open source project. However, building a restful service is not very hard if you decide to DIY.
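To show the shape of such a restful front end, here is a minimal sketch using only the JDK's built-in HttpServer. The /events path, the 202 response, and the in-memory queue standing in for a real KafkaProducer are all assumptions made for illustration; in a real service the handler would call producer.send() and the producer would connect to the brokers directly.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class RestProducerSketch {
    // Stand-in for a real KafkaProducer: in production the handler would call
    // producer.send(new ProducerRecord<>(topic, key, value)) here, and the
    // producer would talk to the brokers directly, with no load balancer between.
    static final Queue<String> outbound = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) throws Exception {
        // Port 0 picks an ephemeral port; instances behind the load balancer
        // would each listen on a fixed, configured port.
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/events", exchange -> {
            byte[] body = exchange.getRequestBody().readAllBytes();
            outbound.add(new String(body, StandardCharsets.UTF_8)); // would be producer.send(...)
            byte[] resp = "accepted".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(202, resp.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(resp);
            }
        });
        server.start(); // serves until server.stop(0) is called
        System.out.println("listening on port " + server.getAddress().getPort());
    }
}
```

The load balancer then only ever sees stateless HTTP instances of this service, which is exactly the kind of traffic it is good at spreading, while the Kafka protocol traffic stays point-to-point between the service and the brokers.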