DynamoDB: An Inside Look Into NoSQL – Part 6

In Part 5, we spoke about data versioning, the 2 reconciliation strategies and how vector clocks are used for the same. In this article, we will talk about Handling Failures.

Handling Failures – Hinted Handoff

Even under the simplest of failure conditions, DynamoDB would experience reduced durability and availability if the traditional form of quorum approach is used. In order to overcome this it uses a sloppy quorum; all READ and WRITE operations are performed on the first N healthy nodes from the preference list, which may not be the first N nodes encountered by traversing the consistent hashing ring.

Partitioning & Replication of Keys in Dynamo (Credit)

Consider the above figure: If A is temporarily not reachable during a WRITE operation, then the replica that would have lived on A will be sent to D to maintain the desired availability and durability guarantees. The replica sent to D will have a hint in its metadata which tells who the intended recipient was (in our case, it is A). The hinted replicas are stored in a separate local database which is scanned periodically to detect if A has recovered. Upon detection, D will send the replica to A and may delete the object from its local store without decreasing the total number of replicas. Using hinted handoff, DynamoDB ensures that READ and WRITE are successful even during temporary node or network failures.

Highly available storage systems must handle failures of an entire data center. DynamoDB is configured such that each object is replicated across data centers. In terms of implementation detail, the preference list of a key is constructed such that the storage nodes are spread across multiple data centers and these centers are connected via high speed network links.

Handling Permanent Failures – Replica Synchronization

Hinted Handoff is used to handle transient failures. What if the hinted replica itself becomes unavailable? To handle such situations, DynamoDB implements an anti-entropy protocol to keep the replicas synchronized. A Merkle Tree is used for the purpose of inconsistency detection and minimizing the amount of data transferred.

According to Wikipedia, a “Merkle tree is a tree in which every non-leaf node is labelled with the hash of the labels of its children nodes.”. Parent nodes higher in the tree are hashes of their respective children. The principal advantage of a Merkle tree is that each branch of the tree can be checked independently without requiring nodes to download the entire tree or the entire data set. Moreover, Merkle trees help in reducing the amount of data that needs to be transferred while checking for inconsistencies among replicas. For instance, if the hash values of the root of two trees are equal, then the values of the leaf nodes in the tree are equal and the nodes require no synchronization. If not, it implies that the values of some replicas are different. In such cases, the nodes may exchange the hash values of children and the process continues until it reaches the leaves of the trees, at which point the hosts can identify the keys that are “out of sync”.

DynamoDB uses Merkle trees for anti-entropy as follows: Each node maintains a separate Merkle tree for each key range (the set of keys covered by a virtual node) it hosts. This allows nodes to compare whether the keys within a key range are up-to-date. In this scheme, two nodes exchange the root of the Merkle tree corresponding to the key ranges that they host in common. Subsequently, using the tree traversal scheme described above the nodes determine if they have any differences and perform the appropriate synchronization action. The disadvantage with this scheme is that many key ranges change when a node joins or leaves the system thereby requiring the tree(s) to be recalculated.

In the next and the final part of this series, we will discuss Membership and Failure Detection.

Article authored by Vijay Olety.

X-Post from CloudAcademy.

Posted in Technical | Tagged , , , , , , | Leave a comment

DynamoDB: An Inside Look Into NoSQL – Part 5

In Part 4, we talked about partitioning and replication in detail. We introduced consistent hashing, virtual nodes and the concept of coordinator nodes and preference list. In this article, we will discuss Data Versioning.

Data Versioning

Eventual consistency, introduced by DynamoDB, allows for the updates to be pushed to all storage nodes asynchronously. A put operation returns before the update is pushed to all replicas, which results in scenarios where a subsequent get operation may return a value that does not reflect the latest changes. Depending on network partitions and server outages, not all replicas might have the latest updates even after an extended period of time.

But there are certain requirements that do not need latest updates and can still tolerate certain inconsistencies. One such requirement is “Add To Cart” where put operation should always succeed and a get operation returning an old object is still tolerable. If a user makes a change to an earlier version of the shopping cart object, that change is still meaningful and should be preserved at all costs. However, the currently unavailable latest state of the shopping cart can have its own version of updates which should also be preserved. It is evident that data versioning has to be implemented to handle such scenarios.

In order to achieve such guarantees, DynamoDB treats the result of every modification as a new and immutable version of data. This allows for multiple versions of an object to be present in the system at the same time. If the newer version subsumes the earlier one, then the system can automatically determine the authoritative version (syntactic reconciliation). However, in some cases the versions conflict and the client has to manually perform the reconciliation (semantic reconciliation).

Data Versioning (Credit)

DynamoDB uses vector clocks in order to capture causality between multiple versions of an object. A vector clock is a list of (node, counter) pairs. One vector clock is associated with one version of every object. By analyzing the vector clocks, you can find out if the versions have a causal ordering or are on parallel branches. When a client wishes to perform an update, it must specify which version it is updating. This version can be got from an earlier get operation.

Version evolution of an object (Credit)

Let’s understand how vector clocks works: A client writes a new object. Node Sx handles this write and creates a vector clock [(Sx, 1)] for the object D1. If the client now updates the object and node Sx again handles the request, we get a new object D2 and its vector clock [(Sx, 2)]. The client updates the object again and this time node Sy handles the request leading to object D3 with the vector clock [(Sx, 2), (Sy, 1)]. When a different client tries to update the object after reading D2, the new vector clock entry for object D4 is [(Sx, 2), (Sz, 1)] where Sz is the node that handled the request. Now when a new write request is issued by the client, it sees that there are already D3 and D4 objects. If node Sx is handling the request, it performs the reconciliation process and the new data object D5 is created with the vector clock [(Sx, 3), (Sy, 1), (Sz, 1)].

One possible issue with vector clocks is that its size may grow rapidly if multiple servers coordinate the writes to a particular object. However, this issue is unlikely since in production only the nodes from the preference list handle the operation. It is still desirable to limit the size of vector clock. DynamoDB implements the following clock truncation scheme: A timestamp, which indicates the last time that node updated an item, is stored along with (node, counter) pair. When the number of (mode, counter) pairs reaches a threshold (say 15), the oldest pair is removed from the clock.

In the next article, we will look into how to Handle Failures.

Article authored by Vijay Olety.

X-Post from CloudAcademy.com

Posted in Technical | Tagged , , , | Leave a comment

DynamoDB: An Inside Look Into NoSQL – Part 4

In Part 3, we mentioned the various distributed techniques used while architecting NoSQL data stores. A table nicely summarized these techniques and their advantages. In this article, we will go into the details of partitioning and replication.


As mentioned earlier, the key design requirement for DynamoDB is to scale incrementally. In order to achieve this, there must be a mechanism in place that dynamically partitions the entire data over a set of storage nodes. DynamoDB employs consistent hashing for this purpose. As per the Wikipedia page, “Consistent hashing is a special kind of hashing such that when a hash table is resized and consistent hashing is used, only K/n keys need to be remapped on average, where K is the number of keys, and n is the number of slots. In contrast, in most traditional hash tables, a change in the number of array slots causes nearly all keys to be remapped.”

Consistent HashingConsistent Hashing (Credit)

Let’s understand the intuition behind it: Consistent hashing is based on mapping each object to a point on the edge of a circle. The system maps each available storage node to many pseudo-randomly distributed points on the edge of the same circle. To find where an object O should be placed, the system finds the location of that object’s key (which is MD5 hashed) on the edge of the circle; then walks around the circle until falling into the first bucket it encounters. The result is that each bucket contains all the resources located between its point and the previous bucket point. If a bucket becomes unavailable (for example because the computer it resides on is not reachable), then the angles it maps to will be removed. Requests for resources that would have mapped to each of those points now map to the next highest point. Since each bucket is associated with many pseudo-randomly distributed points, the resources that were held by that bucket will now map to many different buckets. The items that mapped to the lost bucket must be redistributed among the remaining ones, but values mapping to other buckets will still do so and do not need to be moved. A similar process occurs when a bucket is added. You can clearly see that with this approach, the additions and deletions of a storage node only affects its immediate neighbors and all the other nodes remain unaffected.

The basic implementation of consistent hashing has its limitations:

  1. Random position assignment of each node leads to non-uniform data and load distribution.
  2. Heterogeneity of the nodes is not taken into account.

To overcome these challenges, DynamoDB uses the concept of virtual nodes. A virtual node looks like a single data storage node but each node is responsible for more than one virtual node. The number of virtual nodes that a single node is responsible for depends on its processing capacity.


Every data item is replicated at N nodes (and NOT virtual nodes). Each key is assigned to a coordinator node, which is responsible for READ and WRITE operation of that particular key. The job of this coordinator node is to ensure that every data item that falls in its range is stored locally and replicated to N - 1 nodes. The list of nodes responsible for storing a certain key is called preference list. To account for node failures, preference list usually contains more than N nodes. In the case of DynamoDB, the default replication factor is 3.

In the next article, we will look into Data Versioning.

Article authored by Vijay Olety.

X-Post from CloudAcademy.com

Posted in Technical | Tagged , , , , , , , | Leave a comment

DynamoDB: An Inside Look Into NoSQL – Part 3

In Part 2, we looked at Design Considerations of NoSQL and introduced the concept of eventual consistency. In this article, we will introduce the concepts and techniques used while architecting a NoSQL system.

The core distributed systems techniques employed in DynamoDB are – partitioning, replication, versioning, membership, failure handling and scaling. Phew! Did you really think that the internals will be simple? 🙂

DynamoDB Internals

DynamoDB (Credit)

The following table summarizes the list of techniques used in DynamoDB:

Problem Technique Advantage
Partioning Consistent Hashing Incremental Scalability
High Availability for Writes Vector Clocks with reconciliation during reads Version size is decoupled from update rates
Handling temporary failures Sloppy Quorum and Hinted Handoff Provides high availability & durability guarantee when some replicas are not available
Recovery from permanent failures Anti-entropy using Merkle trees Synchronizes divergent replicas in the background
Membership & Failure Detection Gossip-based membership protocol & failure detection Preserves symmetry and avoids having a centralized registry for storing membership and node liveness information

I know I have covered a lot of lingo and buzz words. If you really need to take a deep breath, now is the time! Fear not, in subsequent articles we will deep-dive into each of the above mentioned techniques. Remember, the devil is in the details!

System Interface

DynamoDB exposes two interfaces: get and put. The get(key) operation locates the object associated with the key and returns it along with a context. The put(key, context, object) operation uses key to determine the associated replicas and writes the object in those replicas. The context information is invisible to the user and contains metadata such as version. DynamoDB applies an MD5 hash on the key to generate a 128-bit identifier, which is used to determine the replicas that are responsible for serving the key.

Article authored by Vijay Olety

X-Post from CloudAcademy.com

Posted in Technical | Tagged , , , , , | Leave a comment

DynamoDB: An Inside Look Into NoSQL – Part 2

In Part 1, we introduced you to NoSQL, spoke about the CAP theorem and certain assumptions that need to be made while designing NoSQL data stores. Let’s dive deeper!

Design Considerations

Traditional commercial systems and applications perform data replication in a synchronized manner. The advantage of this approach is that data is always consistent. But the down side is that the system might itself not be available (CAP theorem). To put it simple: the data is unavailable until it is absolutely certain that it is replicated across all nodes correctly.

Alas! The Web world lives in its own perceived reality. 🙂 Systems go down and network fails regularly. Availability is the single largest factor which makes / breaks a company. It is thus imperative that we handle such scenarios. Netflix’s Chaos Monkey helps us architect our product to take into account these failures. In order to ensure availability at all costs, optimistic asynchronous replication strategies can be put in place. The drawback, however, is that it leads to conflicting changes to data which must be detected and resolved. The process of conflict resolution introduces 2 new problems: when to resolve them and who resolves them. DynamoDB introduces the novel concept of eventually consistent data store; that is all updates reach all nodes eventually.

Deciding when to perform the conflict resolution is a primary design consideration. We can perform it during the READ operation or WRITE operation. Many legacy data stores chose to do the conflict resolution during the WRITE operation. In such systems, WRITEs will be rejected if data is not replicated across all nodes. In large e-commerce companies such as Amazon, rejecting WRITEs is not an option as it leads to revenue loss and poor customer experience. Hence, DynamoDB does the complex conflict resolution during READs.

Let’s take an example to understand it better. Consider a system with 3 nodes: NODE1, NODE2 and NODE3. In a traditional system, a WRITE to NODE2 must be replicated to NODE1 and NODE3 and only then is the WRITE operation considered successful. This synchronized replication takes time to complete during which time the system is NOT available. But systems using DynamoDB have the option to defer this update in exchange for higher availability. So a WRITE to NODE2 is considered successful as long as NODE2 is able to honor that request. NODE2 eventually replicates it to NODE1 and NODE3. DynamoDB usually takes a second (or a maximum of a couple of seconds) to achieve consistency across all nodes.
Note: In case your product, like ours, needs a strongly consistent read just set the value of the attribute ConsistentRead to true.

Another very important design consideration is who performs the conflict resolution. It can either be done by the data store (DynamoDB in our case) or the application. The data store usually employs simple policies and rules such as “last WRITE wins”, which is pretty good in majority of the cases. If the application wishes to have complex rules and implement its own conflict resolution mechanisms, then it is free to do so.

A couple of other design considerations are as follows:

  1. Incremental Scalability: The data store should be able to scale-out 1 node at a time, with minimal or no impact on the system itself.
  2. Symmetry: All nodes in the data store are peers, i.e. all nodes are equal and share the same set of responsibilities.
  3. Decentralization: With a central authority, the most common problem faced is “single point of failure”. Decentralization helps us mitigate this and keep the system simple, more scalable and more available.
  4. Heterogeneity: Different nodes in the data store might have different configurations. Some nodes might be optimized for storage and some might be plain commodity hardware. The data store should take into account this heterogeneous mix of nodes to distribute tasks proportional to its capabilities.

In the next blog post, we will look into System Architecture.

Article authored by Vijay Olety

X-Post from CloudAcademy.com

Posted in Technical | Tagged , , , | Leave a comment

DynamoDB: An Inside Look Into NoSQL – Part 1

In our earlier posts (here and here), we introduced the Hadoop ecosystem & explained its various components using a real world example of the retail industry. We now possess a fair idea of the advantages of Big Data. NoSQL datastores are being used extensively in real-time & Big Data applications, which is why a look into its internals would help us make better design decisions in our applications.

NoSQL datastores provide a mechanism for retrieval and storage of data items that is modeled in a non-tabular manner. Simplicity of design, horizontal scalability and control over availability form the motivations for this approach. NoSQL is governed by the CAP theorem in the same way RDBMS is governed by ACID properties.

NoSQL Triangle (Credit)

From the AWS stable, DynamoDB is the perfect choice if you are looking for a NoSQL solution. DynamoDB is a “fast, fully managed NoSQL database service that makes it simple and cost-effective to store and retrieve any amount of data, and serve any level of request traffic. Its guaranteed throughput and single-digit millisecond latency make it a great fit for gaming, ad tech, mobile and many other applications.” Since it is a fully managed service, you need not worry about provisioning & managing of the underlying infrastructure. All the heavy-lifting is taken care for you.

Majority of the documentation available on the Net are how-to-get-started guides with examples of DynamoDB API usage. Let’s look at the thought process and design strategies that went into the making of DynamoDB.

“DynamoDB uses a synthesis of well known techniques to achieve scalability and availability: Data is partitioned and replicated using consistent hashing, and consistency is facilitated by object versioning. The consistency among replicas during updates is maintained by a quorum-like technique and a decentralized replica synchronization protocol. DynamoDB employs a gossip based distributed failure detection and membership protocol. Dynamo is a completely decentralized system with minimal need for manual administration. Storage nodes can be added and removed from DynamoDB without requiring any manual partitioning or redistribution.” You must be wondering – “Too much lingo for one paragraph”. Fret not, why fear when I am here 🙂 Let’s take one step at a time, shall we!

Requirements and Assumptions

This class of NoSQL storage system has the following requirements –

  • Query Model: A “key” uniquely identifies a data item. Read and write operations are performed on this data item. It must be noted that no operation spans across multiple data items. There is no need for relational schema and DynamoDB works best when a single data item is less than 1MB.
  • ACID Properties: As mentioned earlier, there is no need for relational schema and hence ACID (Atomicity, Consistency, Isolation, Durability) properties are not required. The industry and the academia acknowledge that ACID guarantees lead to poor availability. Dynamo targets applications that operate with weaker consistency if it results in high availability.
  • Efficiency: DynamoDB needs to run on commodity hardware infrastructure. Stringent SLA (Service Level Agreement) ensure that latency and throughput requirements are met for the 99.9% percentile of the distribution. But everything has a catch – the tradeoffs consist of performance, cost, availability and durability guarantees.

In subsequent articles, we will look into Design Considerations & System Architecture.

Article authored by Vijay Olety


  1. http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf

X-Post from CloudAcademy.com

Posted in Technical | Tagged , , , | Leave a comment

Hazelcast & Distributed Sequence Counter

There always comes a time in your professional career when you get stuck, and for days at a time! Every developer worth his salt has definitely gone through this phase. Another such moment occurred in my career too recently.

We are building a product which uses FIFO queues. Simple right. Each request that we get needs to be queued with an always-incrementing-by-1 sequence counter. No problem here. A single application server can have AtomicLong‘s incrementAndGet() and you can be rest assured that the counter values are always incrementing. Piece of cake. What if you have 2 application servers? Oh oh! Your immediate answer would be – use MySQL’s AUTO_INCREMENT. What if MySQL is not the right tool? In our product, we had to go ahead with DynamoDB. And it does not support auto-increment. What now?

There are some elegant solutions such as Twitter’s Snowflake. It always guarantees unique, roughly sortable, increasing counters in a distributed environment. But we needed a solution that guarantees increment by 1.

As mentioned earlier, the “days at a time” started. 🙂 Thank God it wasn’t “weeks at a time”, though it seemed that way, as we soon came across Hazelcast! Hazelcast is an in-memory Open Source data grid based on Java. And its “Cross-JVM communication / shared storage” using Distributed Primitives was the perfect fit for our problem, i.e. a cluster-wide distributed sequence counter.

Let get our hands dirty, shall we? I will only touch upon IAtomicLong in this blog though we have extensively used ILock as well.

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;

String hazelcastCounter = "<name of your counter>"; // eg.: web_requests_counter
HazelcastInstance client = HazelcastClient.newHazelcastClient();
IAtomicLong counter = client.getAtomicLong(hazelcastCounter);
Long incrementedValue = counter.incrementAndGet();

It is simple, isn’t it! As a developer, it seems like all the operations are carried out locally because of the familiarity with functions such as incrementAndGet(). But under the hood lies the complexity of data synchronization across the cluster. And that, my friends, is the beauty of Hazelcast.

How do you form a cluster of AWS EC2 instances using Hazelcast? I will be back after a short commercial break. 🙂

Posted in Technical | Tagged , , , , , , | 2 Comments

Big Data for Retail Analysis – Hive & Flume

Last week we introduced Big Data ecosystem and showed a glimpse of the possibilities. This week we take one industry (Retail) use case and illustrate how the various tools can be orchestrated to provide insights.


The last couple of decades has seen a tectonic shift in the retail industry. The hawkers and mom and pop stores are being sidelined by heavy weight retail hypermarkets who operate in a complex landscape involving franchisees, JVs and multi partner vendors. In this kind of an environment, try visualizing the inventory, sales, supplier info for thousands of SKUs (Stock Keeping Units) per store and multiply it with the several thousand stores across cities, states and even countries over days, months and years and you will realize the volume of data they would be collecting. One such retail hypermarket, lets say BigRetail had 5-years of data containing vast amounts of semi-structured dataset which they wanted to be analysed for trends and patterns.

The Problem

The 5-year dataset was 13TB in size. Traditional business intelligence (BI) tools works best in the presence of a pre-defined schema. BigRetail dataset was mostly logs which didn’t conform to any specific schema. BigRetail took close to half a day to move the data into their BI systems weekly. They wanted to reduce this time. Queries over this data set took hours

The Solution

This is where Hadoop shines in all its glory! The problem is 2-fold: Problem 1: Moving the logs into HDFS periodically Problem 2: Performing analysis on this HDFS dataset As we had seen in the previous post, Apache Sqoop is used to move structured dataset into HDFS. Alas! How do we move semi-structured data? Fret not. Apache Flume is specially designed for collecting, aggregating, and moving large amounts of log data into HDFS. Once the dataset is inside HDFS, Hive was used to perform analysis. Let’s dig deep. Mind you – The devil is in the details.

Problem 1: How Flume solved the data transfer problem?

The primary use case for Flume is as a logging system that gathers a set of log files on every machine in a cluster and aggregates them to a centralized persistent HDFS store.

Flume’s typical dataflow is as follows: A Flume Agent is installed on each node of the cluster that produces log messages. These streams of log messages from every node are then sent to the Flume Collector. The collectors then aggregate the streams into larger streams which can then be efficiently written to a storage tier such as HDFS.

Problem 2: Analysis using Hive

Hive uses “Schema on Read” unlike a traditional database which uses “Schema on Write”. Schema on Write implies that a table’s schema is enforced at data load time. If the data being loaded doesn’t conform to the schema, then it is rejected. This mechanism might slow the loading process of the dataset usually, Whereas Schema on Read doesn’t verify the data when it’s loaded, but rather when a query is issued. For this precise reason, once the dataset is in HDFS moving it into Hive controlled namespace is usually instantaneous. Hive can also perform analysis on dataset in HDFS or local storage. But the preferred approach is to move the entire dataset into Hive controlled namespace (default location – hdfs://user/hive/warehouse) to enable additional query optimizations. While reading log files, the simplest recommended approach during Hive table creation is to use a RegexSerDe. It uses regular expression (regex) to serialize/deserialize. It deserializes the data using regex and extracts groups as columns. It can also serialize the row object using a format string.


– With RegexSerDe all columns have to be strings. Use CAST (a AS INT) to convert columns to other types.

– While moving data from HDFS to Hive, DO NOT use the keyword OVERWRITE

Solution Architecture using Flume + Hive

The merchandise details, user information, time of transaction, area / city / state information, coupon codes (if any) , customer data and other related details were collected and aggregated from various backend servers.

As mentioned earlier, the data-set to be analyzed was 13TB. Using the Hadoop default replication factor of 3, it would require 13TB * 3 = 39TB of storage capacity. After a couple of iterations on a smaller sample data set and subsequent performance tuning, we finalized the below cluster configuration and capacities

– 45 virtual instances, each with 64-bit OS platform, 12 GB RAM, 4-6 CPU cores and 1TB Storage

Flume configuration

Following Flume parameters were configured (sample)

flume.event.max.size.bytes uses the default value of 32KB.

flume.agent.logdir was changed to point to an appropriate HDFS directory

flume.master.servers: 3 Flume Masters – flumeMaster1, flumeMaster2, flumeMaster3

flume.master.store uses the default value – zookeeper

Hive configuration

Following Hive parameters were configured (sample)


javax.jdo.option.ConnectionDriverName: set the value to com.mysql.jdbc.Driver



By default, Hive metadata is usually stored in an embedded Derby database which allows only one user to issue queries. This is not ideal for production purposes. Hence, Hive was configured to use MySQL in this case.

Using the Hadoop system, log transfer time was reduced to ~3 hours weekly and querying time also was significantly improved.

Some of the schema tables that were present in the final design were – facts, products, customers, categories, locations and payments. Some sample Hive queries that were executed as part of the analysis are as follows –

Count the number of transactions

 Select count (*) from facts;

Count the number of distinct users by gender

Select gender, count (DISTINCT customer_id) from customers group by gender;

Only equality joins, inner & outer joins, semi joins and map joins are supported in Hive. Hive does not support join conditions that are not equality conditions as it is very difficult to express such conditions as a MapReduce job. Also, more than two tables can be joined in Hive.

List the category to which the product belongs

Select products .product_name, products .product_id, categories.category_name from products JOIN categories ON (products.product_category_id = categories.category_id);

Count of the number of transactions from each location

Select locations.location_name, count (DISTINCT facts.payment_id) from facts JOIN locations ON (facts.location_id = locations.location_id) group by locations .location_name;

Interesting trends / analysis using Hive

Some of the interesting trends that were observed from this dataset using Hive were:

  • There was a healthy increase in YoY growth across all retail product categories
  • Health & Beauty Products saw the highest growth rate at 72%, closely followed by Food Products (65 %) and Entertainment (57.8%).
  • Northern states spend more on Health & Beauty Products while the South spent more on Books and Food Products
  • 2 metros took the top spot for the purchase of Fashion & Apparels
  • A very interesting and out-of-the-ordinary observation was that men shop more than women! Though the difference isn’t much, it’s quite shocking J (Note: This also could be because when couples tend to shop together the man pays the bill!)

X-Post from CloudAcademy.com

Posted in Technical | Tagged , , , , | Leave a comment

Big Data: Getting Started with Hadoop, Sqoop & Hive

We live in the Data Age! Web has been growing rapidly in size as well as scale during the last 10 years and shows no signs of slowing down. Statistics show that every passing year more data gets generated than all the previous years combined. Moore’s law not only holds true for hardware but for data being generated too! Without wasting time for coining a new phrase for such vast amounts of data, the computing industry decided to just call it, plain and simple, Big Data.

Apache Hadoop is a framework that allows for the distributed processing of such large data sets across clusters of machines. At its core, it consists of 2 sub-projects – Hadoop MapReduce and Hadoop Distributed File System (HDFS). Hadoop MapReduce is a programming model and software framework for writing applications that rapidly process vast amounts of data in parallel on large clusters of compute nodes. HDFS is the primary storage system used by Hadoop applications. HDFS creates multiple replicas of data blocks and distributes them on compute nodes throughout a cluster to enable reliable, extremely rapid computations.

Figure 1: Map Reduce Archirecture

The logical question arises – How do we set up a Hadoop cluster?

Installation of Apache Hadoop 1.x

We will proceed to install Hadoop on 3 machines. One machine, the master, is the NameNode & JobTracker and the other two, the slaves, are DataNodes & TaskTrackers.


  1. Linux as development and production platform (Note: Windows is only a development platform. It is not recommended to use in production)
  2. Java 1.6 or higher, preferably from Sun, must be installed
  3. ssh must be installed and sshd must be running
  4. From a networking standpoint, all the 3 machines must be pingable from one another

Before proceeding with the installation of Hadoop, ensure that the prerequisites are in place on all the 3 machines. Update /etc/hosts on all machines so as to enable references as masterslave1 and slave2.

Download and Installation

Download Hadoop 1.2.1. Installing a Hadoop cluster typically involves unpacking the software on all the machines in the cluster.

Configuration Files

The below mentioned files need to be updated:


On all machines, edit the file conf/hadoop-env.sh to define JAVA_HOME to be the root of your Java installation. The root of the Hadoop distribution is referred to as HADOOP_HOME. All machines in the cluster usually have the same HADOOP_HOME path.


Update this file on master machine alone with the following line:



Update this file on master machine alone with the following lines:



Update this file on all machines:



Update this file on all machines:



The default value of dfs.replication is 3. Since there are only 2 DataNodes in our Hadoop cluster, we update this value to 2. Update this file on all machines –


After changing these configuration parameters, we have to format HDFS via the NameNode.

bin/hadoop namenode -format

We start the Hadoop daemons to run our cluster.

On the master machine,


Your Hadoop cluster is up and running!

Loading Data into HDFS

Data stored in databases and data warehouses within a corporate datacenter has to be efficiently transferred into HDFS. Apache Sqoop is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases. Sqoop uses MapReduce to import and export the data, which provides parallel operation as well as fault tolerance. The input to the import process is a database table. Sqoop will read the table row-by-row into HDFS. The output of this import process is a set of files containing a copy of the imported table.

Scoop Architecture


  1. A working Hadoop cluster

Download and Installation

Download Sqoop 1.4.4. Installing Sqoop typically involves unpacking the software on the NameNodemachine. Set SQOOP_HOME and add it to PATH.

Let’s consider that MySQL is the corporate database. In order for Sqoop to work, we need to copymysql-connector-java-<version>.jar into SQOOP_HOME/lib directory.

Import data into HDFS

As an example, a basic import of a table named CUSTOMERS in the cust database:

sqoop import --connect jdbc:mysql://db.foo.com/cust --table CUSTOMERS

On successful completion, a set of files containing a copy of the imported table is present in HDFS.

Analysis on HDFS Data

Now that data is in HDFS, it’s time to perform analysis on the data and gain valuable insights.

During the initial days, end users have to write map/reduce programs for simple tasks like getting raw counts or averages. Hadoop lacks the expressibility of popular query languages like SQL and as a result users ended up spending hours (if not days) to write programs for typical analysis.

Enter Hive!

Apache Hive is a data warehouse infrastructure built on top of Hadoop for providing data summarization, query and analysis. Hive was created to make it possible for analysts with strong SQL skills (but meager Java programming skills) to run queries on the huge volumes of data to extract patterns and meaningful information. It provides an SQL-like language called HiveQL while maintaining full support for MapReduce. Any HiveQL query is divided into MapReduce tasks which run on the robust Hadoop framework.

Hive Architecture


  1. A working Hadoop cluster

Download and Installation

Download Hive 0.11. Installing Hive typically involves unpacking the software on the NameNodemachine. Set HIVE_HOME and add it to PATH.

In addition, you must create /tmp and /user/hive/warehouse (a.k.a.hive.metastore.warehouse.dir) and set them chmod g+w in HDFS before you can create a table in Hive. The commands are listed below –

$HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

Another important feature of Sqoop is that it can import data directly into Hive.

sqoop import --connect jdbc:mysql://db.foo.com/cust --table CUSTOMERS --hive-import

The above command created a new Hive table CUSTOMERS and loads it with the data from the corporate database. It’s time to gain business insights!

select count (*) from CUSTOMERS;

Apache Hadoop, along with its ecosystem, enables us to deal with Big Data in an efficient, fault-tolerant and easy manner!

Verticals such as airlines have been collecting data (eg: flight schedules, ticketing inventory, weather information, online booking logs) which reside in disparate machines. Many of these companies do not yet have systems in place to analyze these data points at a large scale. Hadoop and its vast ecosystem of tools can be used to gain valuable insights, from understanding customer buying patterns to upselling certain privileges to increasing ancillary revenue, to stay competitive in today’s dynamic business landscape.


X-Post from CloudAcademy.com

Posted in Technical | Tagged , , , | Leave a comment

Apache Solr 4 Cookbook – A Review

I got an opportunity to review Apache Solr 4 Cookbook.

Read my review below –

I recently read this book and I am really impressed! This book provides good understanding of Apache Solr for both developers as well as consultants.

The book starts off well with an introduction to Apache Solr, the web / app servers required, the role of Zookeeper, why clustering your data is vital, the various directory implementations, performance-oriented caching mechanisms, a sample crawler module which coupled with Solr gives a complete end-to-end solution, the role of Apache Tika as a extracting toolkit and the ease of customizing Solr. From then on, the book dwells into the details.

The first step is indexing. It plays a vital part in the entire search solution. Data can be in the form on .txt, .pdf or any other format. It is imperative that all such formats are easily indexable. One of the widely used tools for extracting metadata and language detection is Apache Tika. Data can also be present in a database, for which the Data Import Handler (DIH) is handy. It comes in two variants – full and delta. Every detail is nicely explained with examples which can make the development time faster. DIH also helps us to modify the data while importing which I felt is a pretty neat feature! One of the nicest features included in Solr 4 is the ability to update single field in a document. I am not sure why this was included in the earlier versions but it’s a classic case of better late than never.

The next step in the pipeline is the data analysis which is achieved through the use of analyzers and tokenizers. Various use cases include elimination HTML and XML tags, copying the contents of one field to another and stemming words amongst others. The detailing that has gone into explaining every concept, the examples and the associated step-by-step explanation is really helpful.

Now that the data is indexed and the data preparation is completed, it’s time to query Apache Solr! Searches can be performed on individual words or on a phrase. You can boost or elevate certain documents over others based on your requirements. Simple concepts such as sorting and faceting of results to complex ones such as ignoring typos using n-grams and detecting duplicates are very simple to understand and perform. Faceting, in particular, is gaining momentum as it helps in implementing the auto-suggest feature and narrowing down the search criteria. A newly introduced feature called the pivot faceting was a much needed one and it vastly simplifies certain use cases related to faceting. Solr provides immense capabilities when it comes to querying and this book explains each of them in great detail taking real-world examples.

We indexed and queried the data. But as our application scales, we have to get our hands dirty and start fine-tuning the performance metrics in order to give a good user experience to our customers. This is where caches and its various flavors and granularities starts to make sense. Cache always plays a major role in any deployment and it is necessary to monitor Solr at all times to gauge its performance. This book can done a great job in clearly explaining the various types of caches, the commit operation and its impact on searchers and how to overcome these. This topic is really important for any Solr real-world deployment and this book has not let me down!

Apache Solr 4.0 introduced the most-awaited SolrCloud feature that allows us to use distributed indexing and searching. Setting up of SolrCloud cluster along with a Zookeeper ensemble to enable replication, fault-tolerance and high availability along with disaster recovery is a piece-of-a-cake now. I really appreciate the time and effort spent on documenting and explaining how to set up two collections inside a single cluster. It was a nightmare to find information on this particular topic when we were implementing SolrCloud for one of our customers. But I am rest assured that others referring this book will save precious time of theirs. Adding / deleting nodes from a cluster is no longer a tedious task as the entire process is automated through the presence of Zookeeper nodes. The in-depth knowledge of the author in these topics is clearly visible and is of great help to all the readers. A touch on Zookeeper Rolling Restart, though off-topic, might enable readers to get a complete birds-eye view of the entire cluster. Certain features such as soft commit and NRT search have been explained in detail afterwards (under Real-life Situations) but I felt that at least a mention earlier would have provided a much needed continuity in that section. For the geeky readers like me, a detailed description about load balancing across shards and replicas and their customizations, if any, would have added an extra amount of spice to this well-cooked food!

As with any other tool, Solr deployment too will run into some kind of a problem. This section details the common problems that are encountered and effective ways to overcome these. Shrinking the size of the index and allocating enough memory in advance amongst others are some of the solutions explained in detail and is clearly documented in this book.

Lastly, as every developer would have wanted it, the real-world scenarios are described and the various Solr concepts that were explained in earlier sections are put together as part of a complete end-to-end solution.
Any one trying Solr 4.0 must read this book in its entirety before recommending a Solr production architecture. As mentioned above, there are a few suggestions which if incorporated in this book would benefit readers. All in all, this book will be really helpful for the developers and consultants alike!

Posted in Technical | Tagged , , | Leave a comment