Big Data for Retail Analysis – Hive & Flume

Last week we introduced the Big Data ecosystem and showed a glimpse of the possibilities. This week we take one industry (Retail) use case and illustrate how the various tools can be orchestrated to provide insights.

Background

The last couple of decades have seen a tectonic shift in the retail industry. Hawkers and mom-and-pop stores are being sidelined by heavyweight retail hypermarkets that operate in a complex landscape involving franchisees, JVs and multi-partner vendors. In this kind of environment, try visualizing the inventory, sales and supplier info for thousands of SKUs (Stock Keeping Units) per store, multiply it by the several thousand stores across cities, states and even countries, and track it over days, months and years, and you will realize the volume of data they collect. One such retail hypermarket, let's say BigRetail, had 5 years of data, a vast semi-structured dataset, which they wanted analysed for trends and patterns.

The Problem

The 5-year dataset was 13TB in size. Traditional business intelligence (BI) tools work best in the presence of a pre-defined schema, but the BigRetail dataset was mostly logs which didn't conform to any specific schema. BigRetail took close to half a day every week to move the data into their BI systems, and they wanted to reduce this time. Queries over this dataset took hours.

The Solution

This is where Hadoop shines in all its glory! The problem is 2-fold:

Problem 1: Moving the logs into HDFS periodically

Problem 2: Performing analysis on this HDFS dataset

As we saw in the previous post, Apache Sqoop is used to move structured datasets into HDFS. Alas! How do we move semi-structured data? Fret not. Apache Flume is specially designed for collecting, aggregating, and moving large amounts of log data into HDFS. Once the dataset was inside HDFS, Hive was used to perform the analysis. Let's dig deep. Mind you – the devil is in the details.

Problem 1: How Flume solved the data transfer problem

The primary use case for Flume is as a logging system that gathers a set of log files on every machine in a cluster and aggregates them to a centralized persistent HDFS store.

Flume's typical dataflow is as follows: a Flume Agent is installed on each node of the cluster that produces log messages. These streams of log messages from every node are then sent to a Flume Collector. The collectors then aggregate the streams into larger streams which can be efficiently written to a storage tier such as HDFS.

Problem 2: Analysis using Hive

Hive uses "Schema on Read" unlike a traditional database which uses "Schema on Write". Schema on Write implies that a table's schema is enforced at data load time; if the data being loaded doesn't conform to the schema, it is rejected. This mechanism usually slows down the loading process. Schema on Read, on the other hand, doesn't verify the data when it's loaded, but rather when a query is issued. For this precise reason, once the dataset is in HDFS, moving it into the Hive-controlled namespace is usually instantaneous. Hive can also perform analysis on datasets in HDFS or local storage, but the preferred approach is to move the entire dataset into the Hive-controlled namespace (default location – /user/hive/warehouse in HDFS) to enable additional query optimizations. While reading log files, the simplest recommended approach during Hive table creation is to use a RegexSerDe. It uses regular expressions (regex) to serialize/deserialize: it deserializes the data using a regex and extracts groups as columns, and it can also serialize the row object using a format string.
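To make this concrete, here is a minimal sketch of what such a table definition might look like; the table name, columns and regex below are hypothetical and would have to match the actual log format (depending on the Hive version, the hive-contrib jar may need to be added with ADD JAR first):

CREATE TABLE raw_sales_logs (
	transaction_id STRING,
	customer_id STRING,
	sale_amount STRING,
	log_time STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
	"input.regex" = "(\\S+) (\\S+) (\\S+) (.*)",
	"output.format.string" = "%1$s %2$s %3$s %4$s"
)
STORED AS TEXTFILE;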

Caveats:

– With RegexSerDe all columns have to be strings. Use CAST (a AS INT) to convert columns to other types.

– While moving data from HDFS to Hive, DO NOT use the keyword OVERWRITE (it replaces any data already present in the Hive table)

Solution Architecture using Flume + Hive

The merchandise details, user information, time of transaction, area / city / state information, coupon codes (if any), customer data and other related details were collected and aggregated from various backend servers.

As mentioned earlier, the dataset to be analyzed was 13TB. Using the Hadoop default replication factor of 3, it would require 13TB * 3 = 39TB of storage capacity. After a couple of iterations on a smaller sample dataset and subsequent performance tuning, we finalized the below cluster configuration and capacities:

– 45 virtual instances, each with a 64-bit OS, 12 GB RAM, 4-6 CPU cores and 1TB storage

Flume configuration

The following Flume parameters were configured (sample):

flume.event.max.size.bytes uses the default value of 32KB.

flume.agent.logdir was changed to point to an appropriate HDFS directory

flume.master.servers: 3 Flume Masters – flumeMaster1, flumeMaster2, flumeMaster3

flume.master.store uses the default value – zookeeper
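For reference, a minimal sketch of how these might appear in conf/flume-site.xml, assuming the Flume 0.9.x (Flume OG) configuration format in use at the time; the HDFS path below is hypothetical:

<property>
	<name>flume.master.servers</name>
	<value>flumeMaster1,flumeMaster2,flumeMaster3</value>
</property>
<property>
	<name>flume.agent.logdir</name>
	<value>hdfs://<namenode_host>:<port>/flume/agent-logs</value>
</property>
<property>
	<name>flume.master.store</name>
	<value>zookeeper</value>
</property>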

Hive configuration

The following Hive parameters were configured (sample):

javax.jdo.option.ConnectionURL

javax.jdo.option.ConnectionDriverName: set the value to com.mysql.jdbc.Driver

javax.jdo.option.ConnectionUserName

javax.jdo.option.ConnectionPassword

By default, Hive metadata is stored in an embedded Derby database which allows only one user to issue queries at a time. This is not ideal for production purposes. Hence, Hive was configured to use MySQL in this case.
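A minimal hive-site.xml sketch of such a MySQL-backed metastore configuration; the host, database name and credentials below are purely illustrative:

<property>
	<name>javax.jdo.option.ConnectionURL</name>
	<value>jdbc:mysql://<mysql_host>:3306/hive_metastore?createDatabaseIfNotExist=true</value>
</property>
<property>
	<name>javax.jdo.option.ConnectionDriverName</name>
	<value>com.mysql.jdbc.Driver</value>
</property>
<property>
	<name>javax.jdo.option.ConnectionUserName</name>
	<value>hiveuser</value>
</property>
<property>
	<name>javax.jdo.option.ConnectionPassword</name>
	<value>hivepassword</value>
</property>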

Using the Hadoop system, log transfer time was reduced to ~3 hours weekly and querying time also was significantly improved.

Some of the schema tables that were present in the final design were – facts, products, customers, categories, locations and payments. Some sample Hive queries that were executed as part of the analysis are as follows –

Count the number of transactions

 Select count (*) from facts;

Count the number of distinct users by gender

Select gender, count (DISTINCT customer_id) from customers group by gender;

Only equality joins, inner & outer joins, semi joins and map joins are supported in Hive. Hive does not support non-equality join conditions as they are very difficult to express as a MapReduce job. More than two tables can be joined in Hive.

List the category to which the product belongs

Select products.product_name, products.product_id, categories.category_name from products JOIN categories ON (products.product_category_id = categories.category_id);

Count of the number of transactions from each location

Select locations.location_name, count (DISTINCT facts.payment_id) from facts JOIN locations ON (facts.location_id = locations.location_id) group by locations.location_name;

Interesting trends / analysis using Hive

Some of the interesting trends that were observed from this dataset using Hive were:

  • There was a healthy increase in YoY growth across all retail product categories
  • Health & Beauty Products saw the highest growth rate at 72%, closely followed by Food Products (65%) and Entertainment (57.8%)
  • Northern states spent more on Health & Beauty Products while the South spent more on Books and Food Products
  • 2 metros took the top spot for the purchase of Fashion & Apparels
  • A very interesting and out-of-the-ordinary observation was that men shop more than women! Though the difference isn’t much, it’s quite shocking 🙂 (Note: This also could be because when couples tend to shop together the man pays the bill!)

X-Post from CloudAcademy.com


Big Data: Getting Started with Hadoop, Sqoop & Hive

We live in the Data Age! The Web has been growing rapidly in size as well as scale during the last 10 years and shows no signs of slowing down. Statistics show that every passing year more data gets generated than in all the previous years combined. Moore’s law not only holds true for hardware but for the data being generated too! Without wasting time coining a new phrase for such vast amounts of data, the computing industry decided to just call it, plain and simple, Big Data.

Apache Hadoop is a framework that allows for the distributed processing of such large data sets across clusters of machines. At its core, it consists of 2 sub-projects – Hadoop MapReduce and Hadoop Distributed File System (HDFS). Hadoop MapReduce is a programming model and software framework for writing applications that rapidly process vast amounts of data in parallel on large clusters of compute nodes. HDFS is the primary storage system used by Hadoop applications. HDFS creates multiple replicas of data blocks and distributes them on compute nodes throughout a cluster to enable reliable, extremely rapid computations.

Figure 1: MapReduce Architecture

The logical question arises – How do we set up a Hadoop cluster?

Installation of Apache Hadoop 1.x

We will proceed to install Hadoop on 3 machines. One machine, the master, is the NameNode & JobTracker and the other two, the slaves, are DataNodes & TaskTrackers.

Prerequisites

  1. Linux as development and production platform (Note: Windows is only a development platform and is not recommended for production use)
  2. Java 1.6 or higher, preferably from Sun, must be installed
  3. ssh must be installed and sshd must be running
  4. From a networking standpoint, all the 3 machines must be pingable from one another

Before proceeding with the installation of Hadoop, ensure that the prerequisites are in place on all the 3 machines. Update /etc/hosts on all machines so as to enable references as master, slave1 and slave2.
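For example, /etc/hosts on each machine could contain entries along these lines (the IP addresses are hypothetical):

192.168.1.10    master
192.168.1.11    slave1
192.168.1.12    slave2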

Download and Installation

Download Hadoop 1.2.1. Installing a Hadoop cluster typically involves unpacking the software on all the machines in the cluster.
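A typical download-and-unpack sequence, assuming the Apache archive as the mirror and /opt as the install location, might be:

cd /opt
wget http://archive.apache.org/dist/hadoop/core/hadoop-1.2.1/hadoop-1.2.1.tar.gz
tar -xzvf hadoop-1.2.1.tar.gz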

Configuration Files

The below mentioned files need to be updated:

conf/hadoop-env.sh

On all machines, edit the file conf/hadoop-env.sh to define JAVA_HOME to be the root of your Java installation. The root of the Hadoop distribution is referred to as HADOOP_HOME. All machines in the cluster usually have the same HADOOP_HOME path.
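For instance, the relevant line in conf/hadoop-env.sh might look like this (the Java path is hypothetical and depends on your installation):

export JAVA_HOME=/usr/lib/jvm/java-6-sun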

conf/masters

Update this file on the master machine alone with the following line:

master

conf/slaves

Update this file on the master machine alone with the following lines:

slave1
slave2

conf/core-site.xml

Update this file on all machines:

<property>
	<name>fs.default.name</name>
	<value>hdfs://master:54310</value>
</property>

conf/mapred-site.xml

Update this file on all machines:

<property>
	<name>mapred.job.tracker</name>
	<value>master:54311</value>
</property>

conf/hdfs-site.xml

The default value of dfs.replication is 3. Since there are only 2 DataNodes in our Hadoop cluster, we update this value to 2. Update this file on all machines –

<property>
	<name>dfs.replication</name>
	<value>2</value>
</property>

After changing these configuration parameters, we have to format HDFS via the NameNode.

bin/hadoop namenode -format

We start the Hadoop daemons to run our cluster.

On the master machine,

bin/start-dfs.sh
bin/start-mapred.sh

Your Hadoop cluster is up and running!

Loading Data into HDFS

Data stored in databases and data warehouses within a corporate datacenter has to be efficiently transferred into HDFS. Apache Sqoop is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases. Sqoop uses MapReduce to import and export the data, which provides parallel operation as well as fault tolerance. The input to the import process is a database table. Sqoop will read the table row-by-row into HDFS. The output of this import process is a set of files containing a copy of the imported table.

Sqoop Architecture

Prerequisites

  1. A working Hadoop cluster

Download and Installation

Download Sqoop 1.4.4. Installing Sqoop typically involves unpacking the software on the NameNode machine. Set SQOOP_HOME and add it to PATH.

Let’s consider that MySQL is the corporate database. In order for Sqoop to work, we need to copy mysql-connector-java-<version>.jar into the SQOOP_HOME/lib directory.
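Putting that together, the setup might look like the following (the install path and connector version are illustrative):

export SQOOP_HOME=/opt/sqoop-1.4.4
export PATH=$PATH:$SQOOP_HOME/bin
cp mysql-connector-java-5.1.26.jar $SQOOP_HOME/lib/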

Import data into HDFS

As an example, a basic import of a table named CUSTOMERS in the cust database:

sqoop import --connect jdbc:mysql://db.foo.com/cust --table CUSTOMERS

On successful completion, a set of files containing a copy of the imported table is present in HDFS.
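You can verify the import by listing the target directory in HDFS; by default, Sqoop writes the files into a directory named after the table under your HDFS home directory:

$HADOOP_HOME/bin/hadoop fs -ls CUSTOMERS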

Analysis on HDFS Data

Now that data is in HDFS, it’s time to perform analysis on the data and gain valuable insights.

During the initial days, end users had to write map/reduce programs even for simple tasks like getting raw counts or averages. Hadoop lacks the expressibility of popular query languages like SQL and, as a result, users ended up spending hours (if not days) writing programs for typical analysis.

Enter Hive!

Apache Hive is a data warehouse infrastructure built on top of Hadoop for providing data summarization, query and analysis. Hive was created to make it possible for analysts with strong SQL skills (but meager Java programming skills) to run queries on the huge volumes of data to extract patterns and meaningful information. It provides an SQL-like language called HiveQL while maintaining full support for MapReduce. Any HiveQL query is divided into MapReduce tasks which run on the robust Hadoop framework.

Hive Architecture

Prerequisites

  1. A working Hadoop cluster

Download and Installation

Download Hive 0.11. Installing Hive typically involves unpacking the software on the NameNode machine. Set HIVE_HOME and add it to PATH.

In addition, you must create /tmp and /user/hive/warehouse (a.k.a. hive.metastore.warehouse.dir) in HDFS and set them to chmod g+w before you can create a table in Hive. The commands are listed below –

$HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

Another important feature of Sqoop is that it can import data directly into Hive.

sqoop import --connect jdbc:mysql://db.foo.com/cust --table CUSTOMERS --hive-import

The above command creates a new Hive table CUSTOMERS and loads it with the data from the corporate database. It’s time to gain business insights!

select count (*) from CUSTOMERS;

Apache Hadoop, along with its ecosystem, enables us to deal with Big Data in an efficient, fault-tolerant and easy manner!

Verticals such as airlines have been collecting data (e.g. flight schedules, ticketing inventory, weather information, online booking logs) which resides on disparate machines. Many of these companies do not yet have systems in place to analyze these data points at a large scale. Hadoop and its vast ecosystem of tools can be used to gain valuable insights, from understanding customer buying patterns to upselling certain privileges to increasing ancillary revenue, to stay competitive in today’s dynamic business landscape.


X-Post from CloudAcademy.com


Apache Solr 4 Cookbook – A Review

I got an opportunity to review Apache Solr 4 Cookbook.

Read my review below –

I recently read this book and I am really impressed! This book provides good understanding of Apache Solr for both developers as well as consultants.

The book starts off well with an introduction to Apache Solr, the web / app servers required, the role of ZooKeeper, why clustering your data is vital, the various directory implementations, performance-oriented caching mechanisms, a sample crawler module which, coupled with Solr, gives a complete end-to-end solution, the role of Apache Tika as an extraction toolkit and the ease of customizing Solr. From then on, the book delves into the details.

The first step is indexing. It plays a vital part in the entire search solution. Data can be in the form of .txt, .pdf or any other format. It is imperative that all such formats are easily indexable. One of the widely used tools for extracting metadata and language detection is Apache Tika. Data can also be present in a database, for which the Data Import Handler (DIH) is handy. It comes in two variants – full and delta. Every detail is nicely explained with examples which can make development faster. DIH also helps us modify the data while importing, which I felt is a pretty neat feature! One of the nicest features included in Solr 4 is the ability to update a single field in a document. I am not sure why this was not included in the earlier versions but it’s a classic case of better late than never.

The next step in the pipeline is data analysis, which is achieved through the use of analyzers and tokenizers. Various use cases include eliminating HTML and XML tags, copying the contents of one field to another and stemming words, amongst others. The detailing that has gone into explaining every concept, the examples and the associated step-by-step explanation is really helpful.

Now that the data is indexed and the data preparation is completed, it’s time to query Apache Solr! Searches can be performed on individual words or on a phrase. You can boost or elevate certain documents over others based on your requirements. Everything from simple concepts such as sorting and faceting of results to complex ones such as ignoring typos using n-grams and detecting duplicates is easy to understand and perform. Faceting, in particular, is gaining momentum as it helps in implementing the auto-suggest feature and narrowing down the search criteria. A newly introduced feature called pivot faceting was a much needed one and it vastly simplifies certain use cases related to faceting. Solr provides immense capabilities when it comes to querying and this book explains each of them in great detail taking real-world examples.

We indexed and queried the data. But as our application scales, we have to get our hands dirty and start fine-tuning the performance metrics in order to give a good user experience to our customers. This is where caches, in their various flavors and granularities, start to make sense. Cache always plays a major role in any deployment and it is necessary to monitor Solr at all times to gauge its performance. This book has done a great job in clearly explaining the various types of caches, the commit operation and its impact on searchers and how to overcome these. This topic is really important for any real-world Solr deployment and this book has not let me down!

Apache Solr 4.0 introduced the much-awaited SolrCloud feature that allows us to use distributed indexing and searching. Setting up a SolrCloud cluster along with a ZooKeeper ensemble to enable replication, fault tolerance and high availability along with disaster recovery is a piece of cake now. I really appreciate the time and effort spent on documenting and explaining how to set up two collections inside a single cluster. It was a nightmare to find information on this particular topic when we were implementing SolrCloud for one of our customers, but I rest assured that others referring to this book will save precious time. Adding / deleting nodes from a cluster is no longer a tedious task as the entire process is automated through the presence of ZooKeeper nodes. The in-depth knowledge of the author in these topics is clearly visible and is of great help to all the readers. A touch on ZooKeeper Rolling Restart, though off-topic, might enable readers to get a complete birds-eye view of the entire cluster. Certain features such as soft commit and NRT search have been explained in detail afterwards (under Real-life Situations), but I felt that at least a mention earlier would have provided a much needed continuity in that section. For the geeky readers like me, a detailed description about load balancing across shards and replicas and their customizations, if any, would have added an extra amount of spice to this well-cooked food!

As with any other tool, a Solr deployment too will run into some kind of a problem. This section details the common problems that are encountered and effective ways to overcome them. Shrinking the size of the index and allocating enough memory in advance, amongst others, are some of the solutions explained in detail and clearly documented in this book.

Lastly, as every developer would have wanted it, the real-world scenarios are described and the various Solr concepts that were explained in earlier sections are put together as part of a complete end-to-end solution.

Anyone trying Solr 4.0 must read this book in its entirety before recommending a Solr production architecture. As mentioned above, there are a few suggestions which, if incorporated in this book, would benefit readers. All in all, this book will be really helpful for developers and consultants alike!


easyXDM made easy. Really!

Today’s world is connected. In the early 1950s, the degrees of separation was 6. Today, with all the Facebooking and Twittering it is just 3. Can you imagine? Just 3. Times have changed. Phew!

With this connectedness comes the additional technical challenge of talking to other web services to make yourself relevant. It isn’t straightforward to call another service as it involves lots of security considerations such as CSRF, failing which you can kiss your ass goodbye!

Enter easyXDM, a JS library that enables developers to easily work around the limitations set in place by the Same Origin Policy. easyXDM is really high-profile, with customers such as Twitter and LinkedIn. When my work required me to talk to other web services, I jumped into easyXDM.

And that’s when the terror started. Documentation is bad. There are no easy-to-follow tutorials. Everything was as cryptic as easyXDM itself. But with lots of help from my team member, we were able to get it working like a charm. I know the pain that we faced and I don’t want you to go through the same.

In any easyXDM integration, there are always two components – Provider and Consumer. Provider provides the easyXDM services whereas Consumer consumes those services provided by the Provider.

So without any further delay, I will list down the steps required to integrate easyXDM into your application. Follow these steps and you will not have any issues.

1. easyXDM ships with name.html and easyxdm.swf. Upload these files to a CDN of your choice.

2. easyXDM also ships with index.html under the cors folder. This file HAS to be served from the Provider server if you need to work with cookies.

3. Either serve easyXDM.min.js and json2.js from CDN or include it inline.

4. Don’t know why, but set useAccessControl = false in index.html and save yourself a couple of days of headache.

5. Initialize the RPC object as shown –

     
     var REMOTE = "CDN_URL";
     function createRPCObject(providerUrl) {
          var rpc = new easyXDM.Rpc({
               local: REMOTE + "/name.html",
               swf: REMOTE + "/easyxdm.swf",
               remote: providerUrl + "/cors/index.html",
               remoteHelper: REMOTE + "/name.html"
          }, {
                  remote: {
                       request: {}
                  }
          });
          return rpc;
    }

6. Now that RPC object is initialized and ready, you can start making cross origin requests.

     rpc.request({
          url: "path_to_cross_origin_request",
    	  method: "GET or POST",
    	  data: "optional_data_to_be_sent"
     }, function(data) {
             // success callback
             var jsonData = $.parseJSON(data.data); // access your response data
        }, 
        function (data) {
             // error callback
        }
    );

It’s that simple! Go ahead, give it a try.

Points to remember

1. It can be used for compatibility with older versions of browsers too. No special changes required.

2. In index.html, the following code is present –

     if (errorMessage) {
          error(errorMessage);
     }
     else {
          success({
               data: req.responseText,
               status: req.status,
               headers: headers
          });
     }

success and error functions are the callbacks. As you can see, the response that you send is present in the data: req.responseText. And in the success callback of the rpc.request(), data contains all of the success parameters. To access your response, you have to call $.parseJSON(data.data) as shown earlier.

3. When the form method GET was used, I observed that cookies were not being sent in IE. I did not investigate it further but I think it is best to use POST.

References

1. http://easyxdm.net/


South Bollywood

You exactly know what this blog post is all about if you have read my tweet.

The growing trend in Bollywood is to “go down”. (pun intended but geographically ;-)) The stories are going down, the performances are going down and the stunts are going down. No really. Bollywood movies are becoming crap. There is absolutely no grace in the movies anymore. Well, I do know that there is “poetic license” for the movies too, and there should be, but how far it can be stretched is something to ponder about. Case in point: Bhaag Milkha Bhaag.

Why did this trend start in the first place? “North” people were bored with the “same old same old”. They wanted something different. Not as different as that of Anurag Kashyap and Ram Gopal Verma because they couldn’t understand it. Something more masala-ish… that’s all they asked. And where can you find the true masala – South India.

South Indian movies were, I repeat were, predominantly love stories. Rich girl poor guy, Rich guy poor girl, Family opposition to their love and so on. But always with a twist. And that twist was “larger than life” picturizations. See this video to understand what I mean. South Indian heroes make Rambo seem like a kindergarten kid! Their movies are simple and commercial. And people love it. It’s a classic case of – “Patient also wanted the same thing. Doctor also prescribed the same thing”. (It packs more punch when I say it in Kannada but you get the drift)

For Bollywood, it was as simple as – There is a proven formula down South. Why not “take it and go”! AK, SK and the one with an ‘R’ in between “taked it and goed” to glorious heights. AK remade a South movie but I don’t remember which one (wink wink…). If I talk about SK, then I would immediately be put on the Wanted list and I would need a Bodyguard to protect me. The one with an ‘R’ glorifies stereotypes to a whole new level. Enna rascala, mind it! Needless to say, they all found their mojo and are happy with their 100 crores. And Bollywood is basking in the glory again being the richest film industry. Credit needs to be given to the South but I think South-stereotype jokes are running around 😦 “Biting the hand that feeds u” – not good macha.

The world knows that India is the answer to the “Life, Universe and Everything”. And within India, the answer is down South! I am a proud South Indian. And I am not the way you depict us.

We are like that only, ok va!


Bhaag Milkha Bhaag – The dialogue, not the movie

The highly anticipated, much awaited Bhaag Milkha Bhaag had a grand opening. (pun intended – read the post completely, u will understand :-)) People thronged to the theaters during office hours to see the legend come alive. I too saw it first day first show at Cauvery Theater (technically first day first show but not technically technically, if you know what I mean!) The main drawing point was Farhan Akhtar’s 6 pack abs and his athletic look. Man, was I impressed!

After I saw the movie, I did some reading on Wikipedia and really, hats off to the great man! Everyone makes mistakes. To err is human… remember. Let’s leave it at that. He has moved on and so should you, rather than discussing his ‘turning back leading to his failure’. (On a side note, I saw a video which shows that he was not in the fray from the start.)

Another point that caught my attention was – his affair with a foreign girl. I could not find any mention of any affair of his on the internet. Is it so secretive that it’s not on the internet or is it that he told only the director about it or is it just bollywoodized? Can the director show “Inspired by a true story” at the start and take liberty at will? I don’t know.

This blog post is related to his affair.

But before that, a small context check – Cauvery theater is a local theater. I usually go to malls to watch movies (sigh… first world problems). This time, I wanted to watch it with the ‘not-so-sophisticated’ crowd. Watching the movie in Cauvery… man, had I missed something for a long time. The environment was awesome, lively, energetic & lovely. People shouting, whistling, screaming… phew.. it was good. That’s when I got into the mood! I waited for the right moment to join in with the crowd. And that right moment was when he goes into the room with that foreign girl. And that’s when everyone knows there will be a kissing scene. And that’s when the theater became silent. And that’s when he started kissing the foreign girl. And that’s when I shouted “Bhaag Milkha Bhaag”. And that’s when the theater came to whistles again!

And that’s when I realized I am local too! And that’s when you realize how yours truly rolls!


Play Framework – A simple cookbook

Play Framework is creating a lot of hype but it is still way behind that generated by Bitcoin. Just saying.

Anyways, let’s get to the point without beating around the bush.

The documentation of Play sucks big (no pun intended :-)). The differences between Play 1.x and 2.x are so profound that it requires a complete code change. This can be a huge thumbs-down in deciding to use Play in the first place. But, you know, LinkedIn is using it. So there must be something about Play. Innit?

Bearing this in mind, I decided to go ahead with Play come what may. There were lots of places where I got stuck for days with no help whatsoever except from yours truly :-). This (running) blog post aims to make the lives of some developers easy by giving detailed instructions on some of the issues I faced and how to overcome them.

Async Programming

Play is built on Akka which bills itself as “… toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on the JVM”. Hence, async must come natively. It does. But it isn’t very easy to understand how to implement it.

Any initial documentation search tells us to use –

F.Promise<WS.HttpResponse> remoteCall = WS.url("http://www.pristinecrap.com").getAsync();

This works perfectly as long as we use URLs directly. What if we use third party libraries? This is where the documentation tends to get murky.

Do not worry. The below code snippet worked for me –

import java.util.concurrent.Callable;

import play.libs.Akka;
import play.libs.F.Function;
import play.libs.F.Promise;
import play.mvc.Controller;
import play.mvc.Result;

public class ApplicationController extends Controller {
     public static Result asyncProcess() {
          ...
          // wrap the (possibly blocking) third-party call in a Promise
          Promise<BusinessObject> businessObject = Akka.future(new Callable<BusinessObject>() {
               public BusinessObject call() {
                    return new BusinessObject(originRequestParameters);
               }
          });
          // map the Promise to a Result once the call completes
          return async(
               businessObject.map(
                    new Function<BusinessObject, Result>() {
                         public Result apply(BusinessObject bo) {
                              return ok(page.render());
                         }
                    }
               )
          );
     }
}

I am yet to figure out how to implement async operations outside of the Controller 😦

Jobs & Global Settings

Play 1.x has support for Jobs. It was very easy to understand the concepts with annotations such as @OnApplicationStart for running bootstrap jobs, @Every and @On for running scheduled and cron jobs.

Play 2.x completely removed this concept of Jobs. Why? Don’t ask me. Nevertheless, I spent a few hours figuring out how to simulate a job. The @OnApplicationStart functionality is obtained by extending GlobalSettings and writing your logic inside the onStart(Application app) method.

import play.Application;
import play.GlobalSettings;

// register this class via "application.global=ApplicationGlobal" in conf/application.conf
public class ApplicationGlobal extends GlobalSettings {
     @Override
     public void onStart(Application app) {
          ...
     }
}

Scheduler

Ok, you explained how to implement a bootstrap job in Play 2.x. Nice! But how do we implement a scheduled job? This question really frustrated me as none of the available documentation worked.

The first implementation that you will come across is for Play 2.0 –

import play.libs.Akka;
import akka.util.Duration;
import java.util.concurrent.TimeUnit;

Akka.system().scheduler().schedule(
     Duration.create(0, TimeUnit.MILLISECONDS),
     Duration.create(30, TimeUnit.MINUTES),
     testActor,
     "tick"
);

I am not sure if the above code snippet works in Play 2.0. But in Play 2.1, it doesn’t compile. The package akka.util.Duration doesn’t even exist and it is not a one-off case. Why have they made so many changes in minor version upgrades too? Blows my mind! I spent one day just to find out how to implement a scheduled job in Play 2.1. You don’t have to go through the same mental torture –

import play.libs.Akka;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;

Akka.system().scheduler().schedule(new FiniteDuration(0, TimeUnit.SECONDS), new FiniteDuration(1, TimeUnit.MINUTES), new MyCustomJob(), Akka.system().dispatcher());

The above job runs every 1 minute and executes MyCustomJob, a class that implements Runnable.
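MyCustomJob itself is just a plain Runnable; a minimal sketch (the class name and log message are placeholders) could be:

import play.Logger;

public class MyCustomJob implements Runnable {
     @Override
     public void run() {
          // the actual scheduled work goes here
          Logger.info("MyCustomJob triggered");
     }
}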

To run a scheduled job just once at a pre-determined time, use the following –

import play.libs.Akka;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;

Akka.system().scheduler().scheduleOnce(new FiniteDuration(1, TimeUnit.MINUTES), new MyCustomJob(), Akka.system().dispatcher());

As you can see, these are just some of the issues that I have faced and their solutions. Just pray that I run into more issues 🙂


Play Framework – Good or Bad?

The latest buzz word doing the rounds is Play Framework. It is deployed at high-profile companies such as LinkedIn. Chances are that you would not have heard of Play or Akka (because I hadn’t :-))

When we started work on a new product, we were asked to evaluate Play and its feasibility for our requirements. I, being over enthusiastic, took up this task and, lo & behold, what did I find out?

But before the specifics, let’s take a look at the test environment – 1 m1.large Amazon Linux EC2 instance (AppServer), 1 m1.large Amazon Linux EC2 instance (RabbitMQ Server), 1 m1.large JMeter Master Windows EC2 instance and 1 m1.large JMeter Slave Amazon Linux EC2 instance, all in the same AZ. The test environment also contains 2 AWS DynamoDB tables with an appropriately provisioned throughput. The load for the experiments mentioned below was 500 threads running in parallel with each thread looping 100 times.

Now for the details of the various test cases –

1. A sample Play project
AppServer was able to handle 4000+ requests per second at a max of 40% CPU utilization. Initial impression – Not bad!

2. A sample Play project + 1 RabbitMQ call
AppServer was able to handle 500+ requests per second. Impression – Dramatic reduction in concurrency. Is RabbitMQ Server to blame?

3. A sample Play project + 1 AWS DynamoDB call (Callback Async)
AppServer was able to handle 300 requests per second at 100% CPU utilization.

4. A sample Play project + 2 AWS DynamoDB calls (Futures Async – 100 milliseconds sleep per thread)
AppServer was able to handle 50 requests per second at 60% CPU utilization.

5. A sample Play project + 2 AWS DynamoDB calls (Futures Async – 10 milliseconds sleep per thread)
AppServer was able to handle 75 requests per second at 100% CPU utilization.

6. A sample Play project + 2 AWS DynamoDB calls (Sync) + 1 RabbitMQ call
AppServer was able to handle 90+ requests per second at 100% CPU utilization.

Test case 6 gives a throughput of 500+ requests per second on a c1.xlarge EC2 instance.

Why is Play not living up to its hype? Is m1.large not powerful enough? (Earlier experiments suggest that m1.large is a pretty mean machine!) Are my coding skills bad? 😦 Is Scala better than Java when it comes to performance? Am I missing something very trivial in Play? Lots of questions remain. But for the time being, we are going ahead with Play and using both scale-up and scale-out to meet our requirements.

Has anyone played with Play? Was it your ideal play ground? What were your experiences?


To all my ex-crushes, ex-GFs and one not-so-special-anymore…

I write this post from the bottom of my heart.

I have had my share of crushes and GFs over the years. It is natural to love someone and there is no reason to hide it. But at the same time, you have to understand that there is a time and a place for the world to know about it. For me, the time and the place is now!

Like every student, I had a crush on my teacher. And another teacher. The best part is that both of their names start with S. So let’s call them Mrs. S from now on. (Fortunately, it leads to confusion to my beloved readers :-))

Mrs. S used to come in my dreams and I don’t know why I still remember that dream! It seems like it happened yesterday but the dream is still so fresh in my mind. I don’t know where she is and how she is.

Mrs. S was very fond of me. I was usually in the top 5. But somehow she liked me more than the student who used to get first rank. In fourth standard, I got second rank in the finals. I had gone to collect my report card with my parents and she kissed me on my left cheek. Yippie! I know where she lives and she is doing fine.

No more one-way love stories from school days apart from a couple of F.L.A.M.E.S! #youremember

We will now proceed to the most happening phase in any student life – College. Just to add more confusion to my loved readers, I shall refer to College to mean PUC and B.E.

In college for the first time in my life, I had 3 one-way GFs. But here’s the twist: the one-way was not from my side! My friend suggested to keep nicknames – Pink, White & Diagonal. I could understand Pink and White. But what’s with Diagonal? My friend explained that as mine was a rectangular love story, Diagonal was the farthest from me. Truer words were never explained in such a simple way!

I have already shared with you this incident – Why I Love YouTube!

All of these were mostly puppy love. Once I finished my golden phase of life, I had one true GF. One serious GF. It all started when I was early in the second decade of my life. (sigh… I am in my third decade now!) She (Thank God… it was a she… yaaay!) was this gorgeous, sexy-looking girl. It started as an infatuation, remained as an infatuation for a long time and then some. Slowly it graduated from that and I mustered courage again and asked her out, which she readily accepted. The love phase was awesome and like any other BF-GF, we roamed around Bangalore trying our best not to meet people we knew. I have to give credit to myself… I didn’t meet anyone I knew when I was with her. Pat on the back for yours truly! But all this was short-lived and then started the on-off-on-off thingy. It was depressing, really. It was the saddest phase in my life and I consider myself lucky as I didn’t end up in NIMHANS. There were just 2 memorable moments with her – both happened to be on the same day. (wink wink :-)) I don’t know which part of the world she is in now and I don’t intend to know either.

To all my ex-crushes, ex-GFs and one not-so-special-anymore, I really thank you for breaking up with me. My life is perfect without you!

Happy ValENDtines Day!


Solr – Pre & Post 4.0

Solr is the popular, blazing fast open source enterprise search platform from the Apache Lucene project. Its major features include powerful full-text search, hit highlighting, faceted search, near real-time indexing, dynamic clustering, database integration, rich document (e.g., Word, PDF) handling, and geospatial search. Solr is highly reliable, scalable and fault tolerant, providing distributed indexing, replication and load-balanced querying, automated failover and recovery, centralized configuration and more. Solr powers the search and navigation features of many of the world’s largest internet sites.

The major feature introductions in Solr 4.0 are SolrCloud, Near Real Time Search, Atomic Updates and Optimistic Concurrency. Before getting into the details, first let’s understand how distributed indexing & querying and master-slave index replication worked in Solr 3.6 or earlier.

Solr in AWS: Shards & Distributed Search provides a detailed explanation of what a shard is, initializing shards, indexing documents into shards and performing a distributed search. Learnings from this article are as follows –

  • Solr doesn’t have any logic for distributing indexed data over shards. It is up to you to ensure that the documents are evenly distributed across shards.
  • Performing a distributed search requires you to have knowledge of all the existing shards so that the entire data set can be queried.
  • Both distributed indexing and querying are NOT transparent processes
  • What happens when one of the shards goes down? Is the data incomplete now?

Distributed indexing & searching is a very significant feature, no doubt. But can it be handled better and in a much more transparent way?

Solr in AWS: Master Slave Replication & Mitigation Strategies offers a detailed explanation of what an index replication means, setting it up in master-slave mode and mitigation steps needed when the master fails. Learnings from this article are as follows –

  • Master is the single point of failure (SPOF). Once the master is down, no more writes to the index are possible.
  • Master has no clue of the number of slaves that are present.
  • Absence of a centralized coordination service for maintaining configuration information and providing distributed synchronization leading to inevitable race conditions.
  • Replication is done pull-style which leads to index inconsistencies due to replication lag
  • Nomination of one of the slaves as the new master requires implementation of custom scripts and / or deployment of NFS.
  • What happens when the master goes down and you do not receive any notification?

It is obvious that setting up distributed indexing & searching and master-slave replication is a complex process and requires the presence of a Solr expert.

Do not fret! Enter Solr 4.0 and its state-of-the-art capabilities.

SolrCloud

SolrCloud is the name of a set of new distributed capabilities in Solr. Passing parameters to enable these capabilities will enable you to set up a highly available, fault tolerant cluster of Solr servers. Use SolrCloud when you want high scale, fault tolerant, distributed indexing and search capabilities.

Truth be told – Setting up a sharded, replicated, highly available, fault tolerant SolrCloud cluster takes less than 15 minutes! Ok, enough of the theatrics. Let’s get our hands dirty.

A SolrCloud cluster consists of one or more shards, each having one or more replicas and coordinated by ZooKeeper to facilitate high availability and fault tolerance.


Figure 1: 2 Shard 2 Replica Architecture

ZooKeeper

ZooKeeper is used as a repository for cluster configuration and coordination – think of it as a distributed file system that contains information about all of the Solr servers. A single ZooKeeper service can handle the cluster but then it becomes a SPOF. In order to avoid such a scenario, it is recommended that a ZooKeeper Ensemble, i.e. multiple ZooKeeper servers running in concert, is in action. Every ZooKeeper server needs to know about every other ZooKeeper server in the ensemble, and a majority of servers (called a Quorum) are needed to provide service. For example, a ZooKeeper ensemble of 3 servers allows any one of them to fail, with the remaining 2 constituting a majority to continue providing service. 5 ZooKeeper servers are needed to allow for the failure of up to 2 servers at a time.

Installing & running ZooKeeper Ensemble

cd /opt
wget http://apache.techartifact.com/mirror/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
tar -xzvf zookeeper-3.4.5.tar.gz
cd zookeeper-3.4.5
cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg
	dataDir=/opt/zookeeper/data
	server.1=<ip>:<quorum_port>:<leader_election_port>
	server.2=<ip>:<quorum_port>:<leader_election_port>
	server.3=<ip>:<quorum_port>:<leader_election_port>
mkdir -p /opt/zookeeper/data
cd /opt/zookeeper/data
vim myid
	1 (or 2 or 3… depending on the server)
/opt/zookeeper-3.4.5/bin/zkServer.sh start
/opt/zookeeper-3.4.5/bin/zkServer.sh stop

Follow the above steps on all the ZooKeeper servers. Refer to Clustered (Multi-Server) Setup and Configuration Parameters to understand quorum_port, leader_election_port and the file myid. We now have a 3-node ZooKeeper Ensemble running!

Installing & running Solr 4.0

cd /opt
wget http://apache.techartifact.com/mirror/lucene/solr/4.0.0/apache-solr-4.0.0.tgz
tar -xzvf apache-solr-4.0.0.tgz 
rm -f apache-solr-4.0.0.tgz
cd /opt/apache-solr-4.0.0/example/
java -Dbootstrap_confdir=./solr/collection1/conf -Dcollection.configName=myconf -DnumShards=2 -DzkHost=<server1_ip>:<client_port>,<server2_ip>:<client_port>,<server3_ip>:<client_port> -jar start.jar
java -DzkHost=<server1_ip>:<client_port>,<server2_ip>:<client_port>,<server3_ip>:<client_port> -jar start.jar

A couple of points worth mentioning –

  • -DnumShards: the number of shards that will be present. Note that once set, this number cannot be increased or decreased without re-indexing the entire data set. (Dynamically changing the number of shards is part of the Solr roadmap!)
  • -DzkHost: a comma-separated list of ZooKeeper servers.
  • -Dbootstrap_confdir, -Dcollection.configName: these parameters are specified only when starting up the first Solr instance. This will enable the transfer of configuration files to ZooKeeper. Subsequent Solr instances need to just point to the ZooKeeper ensemble.

The above command with -DnumShards=2 specifies that it is a 2-shard cluster. The first Solr server automatically becomes shard1 and the second Solr server automatically becomes shard2. What happens when we run a third and a fourth Solr instance? Since it’s a 2-shard cluster, the third Solr server automatically becomes a replica of shard1 and the fourth Solr server becomes a replica of shard2. We now have a 4-node (2-shard, 2-replica) Solr cluster running!

An important point to remember: The first Solr server that joins shard1 is assigned the role of Leader of the shard1 sub-cluster and subsequent servers that join shard1 are its replicas. All writes happen on the Leader and the same are pushed to the replicas. If the Leader goes down, automatic failover kicks in and a replica is automatically promoted to the role of Leader.

Indexing documents into the cluster

Unlike pre Solr 4.0, the entire indexing process is transparent. Any document write operation can be sent to any shard leader / replica and the indexing module ensures that the document is sent to the correct shard.

The index process is as follows –

  • If an index document request is sent to a shard replica, the same is forwarded to the leader of that shard.
  • If an index document request is sent to a leader of a shard, then the leader first checks if the document belongs to the same shard using MurmurHash3 algorithm.
    • If yes, then the document is written to its transaction log.
    • If no, then the document is forwarded to the leader of the correct shard which in turn writes to its transaction log.

It then transfers the delta of the transaction log to all its replicas. The replicas in turn update their respective transaction logs and send an acknowledgement to the leader. Once the leader receives acknowledgements that all the replicas have received the updates, the original index request is responded to.

Performing a distributed search

Unlike pre Solr 4.0, the entire distributed search is completely transparent. You need not know the number of shards nor the number of replicas. All this information is maintained in ZooKeeper.

When a query request is sent to one of the instances, either a leader or a replica, Solr load balances the requests across replicas, using different servers to satisfy each request. SolrCloud can continue to serve results without interruption as long as at least one server hosts every shard. To return just the documents that are available in the shards that are still alive (and avoid a 503 error), add the following query parameter: shards.tolerant=true.
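For example, a distributed query can be sent to any node in the cluster; the host below is hypothetical and collection1 is the collection bootstrapped earlier:

http://<solr_host>:8983/solr/collection1/select?q=*:*&shards.tolerant=true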

Soft Commit

A hard commit calls fsync on the index files to ensure that they have been flushed to stable storage and no data loss will result from a power failure. It is an expensive operation as it involves disk IO.

A soft commit, introduced in Solr 4.0 to handle NRT search, is much faster as it only makes the index change visible to the searchers but does not call fsync. This is accomplished by writing the changes to the transaction log. Think of it as adding documents to an in-memory writeable segment. As the size of the transaction log increases over time, it is imperative that these are written to disk using hard commit periodically.
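Both commit types can be automated inside the <updateHandler> section of solrconfig.xml; a minimal sketch (the intervals are purely illustrative):

<autoSoftCommit>
	<maxTime>1000</maxTime>
</autoSoftCommit>
<autoCommit>
	<maxTime>60000</maxTime>
	<openSearcher>false</openSearcher>
</autoCommit>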

Near Real Time Search

The /get NRT handler ensures that the latest version of a document is retrieved. It is NOT designed to be used as full-text search. Instead, its responsibility is the guaranteed return of the latest version of a particular document.

When a /get request is raised by providing an id or ids (a set of ids), the search process is as follows –

  • It checks if the document is present in the transaction log
    • If yes, then the document is returned from the transaction log
    • If no, then the latest opened searcher is used to retrieve the document
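A real-time get is an ordinary HTTP request to the /get handler; the host and document ids below are hypothetical:

http://<solr_host>:8983/solr/collection1/get?id=1001
http://<solr_host>:8983/solr/collection1/get?ids=1001,1002,1003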

Atomic Updates

Atomic Updates is a new feature in Solr 4.0 that allows you to update on a field level rather than on a document level (pre Solr 4.0). This means that you can update individual fields without having to send the entire document to Solr with the update fields’ values. Internally Solr re-adds the document to the index with the updated fields. Please note that fields in your schema.xml must be stored=true to enable atomic updates.
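An atomic update is expressed as a partial document with a per-field modifier such as set, add or inc; a minimal sketch using the JSON update format (the document id and field names are hypothetical):

curl 'http://<solr_host>:8983/solr/collection1/update?commit=true' \
	-H 'Content-type:application/json' \
	-d '[{"id":"1001","price":{"set":100},"popularity":{"inc":1}}]'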

Optimistic Concurrency

Optimistic concurrency control is a Solr NoSQL feature that allows a conditional update based on the document _version_. Using optimistic concurrency normally involves a READ-MODIFY-WRITE process. Solr automatically adds a _version_ field to all documents. A client can specify a value for _version_ on any update operation to invoke optimistic concurrency control.

  • If the value of _version_ matches what currently exists in Solr, then the update operation will be successful and a new _version_ value will be set to that document.
  • If the _version_ does not match what currently exists in Solr, then a HTTP error with code 409 (Conflict) will be returned.
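A minimal sketch of such a conditional update, again using the JSON update format (the id and _version_ value are hypothetical; the version must match the value previously read for that document):

curl 'http://<solr_host>:8983/solr/collection1/update?commit=true' \
	-H 'Content-type:application/json' \
	-d '[{"id":"1001","_version_":1452345678901234567,"price":{"set":120}}]'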

FAQ

  1. How are the replicas assigned to shards?
    1. When a new Solr server is added, it is assigned to the shard with the fewest replicas, tie-breaking on the lowest shard number.
  2. Why should I specify all the ZooKeepers for the parameter –DzkHost?
    1. TCP connections are established between the Solr instance and the ZooKeeper servers. When a ZooKeeper server goes down, the corresponding TCP connection is lost. However, other existing TCP connections are still functional and hence this ensures fault tolerance of the SolrCloud cluster even when one or more ZooKeeper servers are down.
  3. What is a transaction log?
    1. It is an append-only log of write operations maintained by each node. It records all write operations performed on an index between two commits. Anytime the indexing process is interrupted, any uncommitted updates can be replayed from the transaction log.
  4. When does the old-style replication kick in?
    1. When a Solr machine is added to the cluster as a replica, it needs to get itself synchronized with the concerned shard. If more than 100 updates are present, then an old-style master-slave replication kicks off. Otherwise, transaction log is replayed to synchronize the replica.
  5. How is load balancing performed on the client side?
    1. Solr client uses LBHttpSolrServer. It is a dumb round-robin implementation. Please note that this should NOT be used for indexing.
  6. What will happen if the entire ZooKeeper ensemble goes down or quorum is not maintained?
    1. ZooKeeper periodically sends the current cluster configuration information to all the SolrCloud instances. When a search request needs to be performed, the Solr instance reads the current cluster information from its local cache and executes the query. Hence, search requests need not have the ZooKeeper ensemble running. Please bear in mind that any new instances that are added to the cluster will not be visible to the other instances.
    2. However, a write index request is a bit more complicated. An index write operation results in a new Lucene segment getting added or existing Lucene segments getting merged. This information has to be sent to ZooKeeper. Each Solr server must report to ZooKeeper which cores it has installed. Each host file is of the form host_version. It is the responsibility of each Solr host/server to match the state of the cores_N file. Meaning, each Solr server must install the cores defined for it and after successful install, write the hosts file out to ZooKeeper. Hence, an index write operation always needs ZooKeeper ensemble to be running.
  7. Can the ZooKeeper cluster be dynamically changed?
    1. ZooKeeper cluster is not easily changed dynamically but is part of their roadmap. A workaround is to do a Rolling Restart.
  8. Is there a way to find out which Solr instance is a leader and which one is a replica?
    1. Solr 4.0 Admin console shows these roles in a nice graphical manner.
  9. How much time does it take to add a new replica to a shard?
    1. For a shard leader index size of 500MB, an m1.medium EC2 cold-start replica instance takes about 30 seconds to synchronize its index with the leader and become part of the cluster.
  10. Is it possible to ensure that the SolrCloud cluster is HA even when an entire AZ goes out of action?
    1. When an entire AZ is down and the cluster is expected to be running successfully, the simplest and recommended approach is to have all leaders in one AZ and replicas in other AZs with the ZooKeeper ensemble spanning across AZs.

References

  1. http://wiki.apache.org/solr/SolrCloud
  2. http://zookeeper.apache.org/
  3. http://wiki.apache.org/solr/SolrReplication
  4. https://pristinecrap.com/2013/01/12/solr-in-aws-shards-distributed-search/
  5. http://wiki.apache.org/solr/UpdateXmlMessages
  6. http://wiki.apache.org/solr/Atomic_Updates
  7. http://yonik.com/solr/realtime-get/
  8. http://yonik.com/solr/optimistic-concurrency/