Thursday, April 23, 2015

Why I choose Scala for Apache Spark projects


    Apache Spark currently supports multiple programming languages, including Java, Scala, and Python. Word on the street is that Spark 1.4, expected in June, will add R language support too. Which language to choose for a Spark project is a common question on forums and mailing lists.

    The answer to the question is quite subjective. Each team has to answer the question based on its own skillset, use cases, and ultimately personal taste.  For me personally, Scala is my language of choice.

    First of all, I eliminate Java from the list. Don't get me wrong, I love Java. I have been working with Java for more than 14 years. However, when it comes to big data projects, Java is just not suitable. Compared to Python and Scala, Java is too verbose. To achieve the same goal, you have to write many more lines of code. Java 8 makes it better by introducing lambda expressions, but it is still not as terse as Python and Scala. Most importantly, Java does not have a REPL (Read-Evaluate-Print Loop) interactive shell. That's a deal breaker for me. With an interactive shell, developers and data scientists can explore and access their datasets and prototype their applications easily without a full-blown development cycle. It is a must-have tool for big data projects.

    Now it comes down to Python vs. Scala. Both have succinct syntax. Both are object-oriented and support functional programming. Both have passionate communities.

I ultimately choose Scala for the following reasons:
  1. Python is in general slower than Scala. If you have significant processing logic written in your own code, Scala will definitely offer better performance.
  2. Scala is statically typed. It looks like a dynamically typed language because it uses a sophisticated type inference mechanism. That means I still have the compiler to catch compile-time errors for me. Call me old school.
  3. Apache Spark is built on Scala, so being proficient in Scala helps you dig into the source code when something does not work as you expect. This is especially true for a young, fast-moving open source project like Spark.
  4. When the Python wrapper calls the underlying Spark code written in Scala and running on a JVM, the translation between two different environments and languages can be a source of additional bugs and issues.
  5. Last but not least, because Spark is implemented in Scala, using Scala gives you access to the latest and greatest features. Most features are first available in Scala and are then ported to Python.
    • Spark Streaming: Stream processing probably has the weakest support in Python. The Python streaming API was first introduced in Spark 1.2, and it only supported basic sources such as text files or text over a socket. A Python Kafka source was not introduced until Spark 1.3. Other sources such as Flume and Kinesis are still not available in the Python API. To make things worse, custom sources are not supported in Python either. Streaming output operations such as saveAsObjectFile() and saveAsHadoopFile() are also not available as of today. In today's enterprise big data projects, we see more and more designs that combine batch processing and stream processing to give end users a holistic view of the data (i.e., the Lambda Architecture). I see the lagging streaming support in the Python API as a major drawback.
    • Spark MLlib for Machine Learning: It is true that Python has impressive data analysis and machine learning libraries such as pandas and scikit-learn. However, these libraries are mostly suited to data that fits on a single machine. Spark MLlib is better suited to big data machine learning, where the data is stored across a cluster of nodes. Most MLlib algorithms are first implemented in Scala and then ported to Python. As of Spark 1.3.1, some ML algorithms such as Clustering (PIC, LDA, Streaming K-means), Dimensionality Reduction (SVD, PCA), and Frequent Pattern Mining (FP-growth) are still not available in Python. When the new Machine Learning Pipeline API was introduced in Spark 1.2, it was only available in Scala and Java too.
    Of course, Python still fits some use cases, especially in machine learning projects. MLlib only contains parallel ML algorithms that are suitable for running on datasets distributed across a cluster. Some classic ML algorithms are not implemented in MLlib. Equipped with Python knowledge, you can still use a single-node ML library such as scikit-learn together with the Spark core parallel processing framework to distribute the workload across the cluster. Another use case is when your dataset is small enough to fit on one machine, but you need to tune your parameters to fit your model better. You can use Spark's parallelize() call to distribute the list of parameters to the cluster, run the single-node algorithm available in scikit-learn on multiple nodes in parallel against the same dataset, and pick the best result for your model.
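Here is a minimal sketch of that parameter-tuning idea. To keep it runnable without a cluster, it makes a few assumptions that are mine and not part of any real project: a local thread pool stands in for Spark, a tiny closed-form ridge regression stands in for a scikit-learn model, and the data and the names fit_ridge, evaluate, and sweep are invented for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

# Toy dataset small enough to fit on one machine: y is roughly 2 * x.
TRAIN = [(1.0, 2.1), (2.0, 3.9), (3.0, 6.2), (4.0, 7.8)]
VALID = [(5.0, 10.1), (6.0, 11.9)]


def fit_ridge(data, lam):
    """Fit y = w * x with an L2 penalty lam (closed form, single node)."""
    sxy = sum(x * y for x, y in data)
    sxx = sum(x * x for x, _ in data)
    return sxy / (sxx + lam)


def evaluate(lam):
    """Train with one parameter value and return (validation error, lam)."""
    w = fit_ridge(TRAIN, lam)
    mse = sum((y - w * x) ** 2 for x, y in VALID) / len(VALID)
    return mse, lam


def sweep(params):
    """Evaluate every parameter independently and keep the best one.

    With Spark, the pool below would be replaced by something like
    sc.parallelize(params).map(evaluate).collect(), where sc is a
    SparkContext. The thread pool is only a local stand-in.
    """
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(evaluate, params))
    return min(results)  # smallest validation error wins


if __name__ == "__main__":
    best_mse, best_lam = sweep([0.0, 0.1, 1.0, 10.0, 100.0])
    print(f"best lambda={best_lam}, validation mse={best_mse:.4f}")
```

The point of the design is that every call to evaluate() is independent and sees the same dataset, so the work parallelizes trivially across however many workers you have.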

    In summary, Scala is my first choice of programming language for Spark projects and I will keep Python in mind when the use case fits.

Thursday, February 12, 2015

Flume or Kafka for Real-Time Event Processing

Flume and Kafka can both act as the event backbone for real-time event processing. Some of their features overlap, and there is some confusion about which one should be used in which use case. This post tries to elaborate on the pros and cons of both products and the use cases they fit best.
Flume and Kafka are actually two quite different products. Kafka is a general-purpose publish-subscribe messaging system that offers strong durability, scalability, and fault tolerance. It is not specifically designed for Hadoop. The Hadoop ecosystem is just one of its possible consumers.

Flume is a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of data from many different sources to a centralized data store, such as HDFS or HBase. It is more tightly integrated with the Hadoop ecosystem. For example, the Flume HDFS sink integrates very well with HDFS security. So its common use case is to act as a data pipeline that ingests data into Hadoop.

Compared to Flume, Kafka wins on its superb scalability and message durability.
Kafka is very scalable. One of the key benefits of Kafka is that it is very easy to add a large number of consumers without affecting performance and without downtime. That's because Kafka does not track which messages in a topic have been consumed by which consumers. It simply keeps all messages in the topic for a configurable period. It is the consumers' responsibility to do the tracking through offsets. In contrast, adding more consumers to Flume means changing the topology of the Flume pipeline design and replicating the channel to deliver the messages to a new sink. That is not really a scalable solution when you have a huge number of consumers. Also, since the Flume topology needs to be changed, it requires some downtime.
Kafka's scalability is also demonstrated by its ability to handle spikes in events. This is where Kafka truly shines, because it acts as a "shock absorber" between producers and consumers. Kafka can handle events coming from producers at a rate of 100k+ per second. Because Kafka consumers pull data from the topic, different consumers can consume the messages at different paces. Kafka also supports different consumption models. You can have one consumer processing the messages in real time and another consumer processing the messages in batch mode. In contrast, a Flume sink uses a push model. When event producers suddenly generate a flood of messages, even though the Flume channel acts somewhat as a buffer between source and sink, the sink endpoints might still be overwhelmed by the write operations.
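To make the offset and pull ideas concrete, here is a toy model in plain Python. It is not the Kafka client API: the TopicLog and Consumer classes are hypothetical names made up for this sketch, the log is just an in-memory list, and the retention period is not modeled at all.

```python
class TopicLog:
    """Toy stand-in for one topic partition: an append-only list of messages."""

    def __init__(self):
        self._messages = []

    def append(self, message):
        self._messages.append(message)

    def read(self, offset, max_messages):
        """Return up to max_messages starting at offset; the log is unchanged."""
        return self._messages[offset:offset + max_messages]


class Consumer:
    """Each consumer remembers its own offset; the log knows nothing about it."""

    def __init__(self, log, batch_size):
        self.log = log
        self.batch_size = batch_size
        self.offset = 0

    def poll(self):
        """Pull the next batch at this consumer's own pace and advance its offset."""
        batch = self.log.read(self.offset, self.batch_size)
        self.offset += len(batch)
        return batch


log = TopicLog()
for i in range(10):                   # a burst of events from producers
    log.append(f"event-{i}")

fast = Consumer(log, batch_size=1)    # near-real-time: one message per poll
slow = Consumer(log, batch_size=5)    # batch-style: five messages per poll

fast.poll()
fast.poll()
slow.poll()

late = Consumer(log, batch_size=10)   # added later, no change to the log or its peers
```

Because reading never mutates the log, the late consumer can start from offset 0 and see every retained event without the other two consumers noticing.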

Message durability is also an important consideration. Flume supports both an ephemeral memory-based channel and a durable file-based channel. Even when you use the durable file-based channel, any event stored in a channel but not yet written to a sink will be unavailable until the agent is recovered. Moreover, the file-based channel does not replicate event data to a different node. Its durability depends entirely on the storage it writes to. If message durability is crucial, it is recommended to use SAN or RAID. Kafka, by contrast, supports both synchronous and asynchronous replication, depending on your durability requirements, and it runs on commodity hard drives.

Flume does have some features that make it attractive as a data ingestion and simple event processing framework. The key benefit of Flume is that it supports many built-in sources and sinks, which you can use out of the box. If you use Kafka, you will most likely have to write your own producer and consumer. Of course, as Kafka becomes more and more popular, other frameworks are constantly adding integration support for it. For example, Apache Storm added a Kafka Spout in release 0.9.2, allowing a Storm topology to consume data from Kafka 0.8.x directly.

Kafka does not provide native support for message processing, so it most likely needs to be integrated with an event processing framework such as Apache Storm to complete the job. In contrast, Flume supports different data flow models and interceptor chaining, which makes event filtering and transformation very easy. For example, you can filter out messages you are not interested in early in the pipeline, before sending them over the network, for obvious performance reasons. However, Flume is not suitable for complex event processing, which I will address in a future post.
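The filter-then-transform idea behind interceptor chaining can be sketched in a few lines of Python. This is only an illustration of the concept: real Flume interceptors are Java classes configured on a source, and the function names, the "DEBUG" rule, and the host value below are all made up.

```python
def drop_debug(event):
    """Filtering step: return None to drop events we are not interested in."""
    return None if event["body"].startswith("DEBUG") else event


def add_host(event):
    """Transforming step: attach a header (the host name here is made up)."""
    headers = dict(event["headers"], host="web-01")
    return {"headers": headers, "body": event["body"]}


def run_chain(events, interceptors):
    """Pass each event through the interceptors in order, stopping at a drop."""
    kept = []
    for event in events:
        for intercept in interceptors:
            event = intercept(event)
            if event is None:
                break          # dropped: later interceptors never see it
        else:
            kept.append(event)  # survived the whole chain
    return kept


events = [
    {"headers": {}, "body": "DEBUG cache warm"},
    {"headers": {}, "body": "ERROR disk full"},
]
forwarded = run_chain(events, [drop_debug, add_host])
```

Putting the cheap filter first means the dropped event never reaches the transforming step or the network.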

The good news is that the latest trend is to use both together to get the best of both worlds. For example, starting with CDH 5.2, Flume can accept data from Kafka via the KafkaSource and push data to Kafka using the KafkaSink. CDH 5.3 (the latest release) also adds Kafka Channel support, which addresses the event durability issue mentioned above.

Tuesday, February 10, 2015

Ways to import unstructured data into Hadoop

There are multiple ways to import unstructured data into Hadoop, depending on your use case.
  1. Using HDFS shell commands such as put or copyFromLocal to move flat files into HDFS. For details, please see the File System Shell Guide.
  2. Using the WebHDFS REST API for application integration.
  3. Using Apache Flume. It is a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of data from many different sources to a centralized data store, such as HDFS. Even though many Flume use cases have historically involved log data collection and aggregation, Flume can be used together with Kafka to form a real-time event processing pipeline.
  4. Using Storm, a general-purpose event processing system. Within a topology composed of bolts and spouts, it can be used to ingest event-based unstructured data into Hadoop.
  5. Using Spark Streaming, which offers another way to ingest real-time unstructured data into HDFS. Its processing model is quite different from Storm's, though. While Storm processes incoming events one at a time, Spark Streaming batches up the events that arrive within a short time window before processing them. This is called mini-batch processing. Spark Streaming of course runs on top of the Spark Core computing engine, which is claimed to be 100x faster than MapReduce in memory and 10x faster on disk. :-)
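The difference between the two processing models in the last item can be illustrated with a small plain-Python sketch. It does not use the Storm or Spark Streaming APIs. The function names, the sample events, and the one-second window are assumptions chosen only for illustration.

```python
def one_at_a_time(events, handle):
    """Event-at-a-time style: the handler runs once per event."""
    for _, payload in events:
        handle([payload])


def mini_batches(events, interval, handle):
    """Group (timestamp, payload) pairs into fixed windows of `interval` seconds."""
    batches = {}
    for timestamp, payload in events:
        window = int(timestamp // interval)
        batches.setdefault(window, []).append(payload)
    for window in sorted(batches):
        handle(batches[window])  # the handler runs once per window


# (arrival time in seconds, payload)
events = [(0.1, "a"), (0.4, "b"), (1.2, "c"), (1.9, "d"), (2.5, "e")]

per_event, per_batch = [], []
one_at_a_time(events, per_event.append)
mini_batches(events, 1.0, per_batch.append)
```

With these five events, the first style invokes the handler five times, while the second invokes it three times, once per one-second window. The trade-off is latency: an event waits until its window closes before it is processed.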