Friday, July 19, 2013

Terracotta BigMemory-Hadoop connector: A detailed hands-on tutorial

In my previous post, "How to reconcile Real-Time and Batch processing using In-Memory technology: A demo at the AFCEA Cyber Symposium Plugfest", I went over the challenges and benefits of reconciling real-time analytics with batch analytics. In it, I explained the solution we put together to create an integrated real-time analytical capability "augmented" by a batch BigData Hadoop cluster.

A critical piece of that architecture is the ability for Terracotta BigMemory to act as a fast In-Memory buffer, accessible by both the real-time world and the batch world...effectively bridging the gap between the two.
The Terracotta BigMemory-Hadoop connector is at the center of that piece, allowing Hadoop to write seamlessly to BigMemory.

For general information, please refer to existing writings about this connector.
In this post, though, I want to be "hands-on" and enable you to see it running for yourself on your own development box. I've outlined the 5 major steps to successfully install and test the Hadoop-to-BigMemory connector on your own development platform.
As a guide, I'll use the code I put together for the "AFCEA Cyber Symposium Plugfest", available on GitHub at https://github.com/lanimall/cyberplugfest

Master Step 1 - Get the software components up and running


1 - Let's download the needed components:


2 - Clone the git repository to get the cyberplugfest code: 

git clone https://github.com/lanimall/cyberplugfest
In the rest of the article, we will assume that $CYBERPLUGFEST_CODE_HOME is the root install directory for the code.

3 - Extract the hadoop connector somewhere on your development box. 


The content of the package has some simple instructions as well as a "wordcount" map reduce package.
If you want to explore and follow the default instructions and the sample word count program, that works fine. Please note, though, that I took some liberties with my setup; these are explained in this article.
In the rest of the article, we will assume that $TC_HADOOP_HOME is the root install directory of the terracotta hadoop connector.

4 - Install, configure, and start BigMemory Max 


Follow this guide at http://terracotta.org/documentation/4.0/bigmemorymax/get-started. Make sure to try the helloWorld application to check that things are set up properly.
In the rest of the article, we will assume that $TC_HOME is the root directory of BigMemory Max.

I added a sample tc-config.xml at https://github.com/lanimall/cyberplugfest/blob/master/configs/tc-config.xml.

To get bigmemory-max started with that configuration file on your local machine, run:

export CYBERPLUGFEST_CODE_HOME=<root path to cyberplugfest code cloned from github>
export TC_HOME=<root path to terracotta install>
$TC_HOME/server/bin/start-tc-server.sh -f $CYBERPLUGFEST_CODE_HOME/configs/tc-config.xml -n Server1

5 - Install Hadoop


I used the pseudo-distributed mode for development. Tuning and configuring Hadoop is outside the scope of this article, but should certainly be explored as a "go further" step. The Apache page http://hadoop.apache.org/docs/stable/single_node_setup.html is a good place to get started on that...
In the rest of the article, we will assume that $HADOOP_INSTALL is the root install directory of Apache Hadoop

6 - Add the needed terracotta libraries to the Hadoop class path


  • The hadoop connector library: bigmemory-hadoop-0.1.jar
  • The ehcache client library: ehcache-ee-<downloaded version>.jar
  • The terracotta toolkit library: terracotta-toolkit-runtime-ee-<downloaded version>.jar
Note: In step 4, I downloaded version 4.0.2 of BigMemory Max, so that's the version I'll be using here. Adjust the HADOOP_CLASSPATH below to match the version you downloaded.

Edit $HADOOP_INSTALL/conf/hadoop-env.sh and add the following towards the top (replace the default empty HADOOP_CLASSPATH= line with it)
export TC_HOME=<root path to terracotta install>
export TC_HADOOP_HOME=<root path to terracotta hadoop connector install>
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${TC_HADOOP_HOME}/jars/bigmemory-hadoop-0.1.jar:${TC_HOME}/apis/ehcache/lib/ehcache-ee-2.7.2.jar:${TC_HOME}/apis/toolkit/lib/terracotta-toolkit-runtime-ee-4.0.2.jar

7 - Start Hadoop in pseudo-distributed mode


Master Step 2 - Write the Map/Reduce job using spring-hadoop and Terracotta BigMemory output connector


Ok, at this point, you should have all the software pieces (BigMemory Max and Hadoop) ready and running in the background. Now it's time to build a map/reduce job that will output something into Terracotta BigMemory. For the "AFCEA Cyber Symposium Plugfest", which this article is based on, I decided to build a simple "Mean Calculation" map/reduce job. The idea is that the job runs on a schedule, calculates the mean of all the transactions per vendor, and outputs the calculated mean per vendor into a Terracotta BigMemory cache.

Cache "vendorAvgSpend":

Key | Value
Vendor A | Mean A
Vendor B | Mean B
... | ...
Vendor N | Mean N
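
To make the idea concrete, here is a minimal plain-Java sketch of the per-vendor mean calculation, with no Hadoop dependency at all. The "vendor,amount" record layout and the class and method names are assumptions made for illustration only; they are not the layout of the actual plugfest data, and the real logic lives in the mapper and reducer classes of the HadoopJobs project.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class VendorMeanSketch {

    /** Running sum and count for one vendor (what a reducer would accumulate). */
    private static final class Accumulator {
        double sum;
        long count;
    }

    /**
     * Computes the mean transaction amount per vendor.
     * ASSUMPTION: each line is "vendor,amount" (a hypothetical layout).
     */
    public static Map<String, Double> meanPerVendor(List<String> lines) {
        Map<String, Accumulator> byVendor = new LinkedHashMap<>();
        for (String line : lines) {
            // "Map" side: extract the (vendor, amount) pair from one record.
            String[] fields = line.split(",");
            String vendor = fields[0].trim();
            double amount = Double.parseDouble(fields[1].trim());

            // "Reduce" side: accumulate sum and count for that vendor.
            Accumulator acc = byVendor.computeIfAbsent(vendor, v -> new Accumulator());
            acc.sum += amount;
            acc.count++;
        }

        // One (vendor, mean) pair per vendor, like the cache layout above.
        Map<String, Double> means = new LinkedHashMap<>();
        for (Map.Entry<String, Accumulator> e : byVendor.entrySet()) {
            means.put(e.getKey(), e.getValue().sum / e.getValue().count);
        }
        return means;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("Vendor A,10.0", "Vendor B,4.0", "Vendor A,30.0");
        System.out.println(meanPerVendor(sample));
    }
}
```

Each entry of the returned map corresponds to one key/value pair in the "vendorAvgSpend" cache.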

And since I really like Spring (http://www.springsource.org) and wanted to extend the simple hadoop wordCount example, I decided to use Spring-Data Hadoop (http://www.springsource.org/spring-data/hadoop) to build the map reduce job for the plugfest.

There are some really good tutorials for Spring Hadoop out there, so I won't duplicate them here. One I liked for its simplicity and clarity is http://www.petrikainulainen.net/programming/apache-hadoop/creating-hadoop-mapreduce-job-with-spring-data-apache-hadoop/

Rather, I'll concentrate on the specifics of writing the output to Terracotta BigMemory.
Code available at: https://github.com/lanimall/cyberplugfest/tree/master/HadoopJobs

1 - Let's explore the application-context.xml


https://github.com/lanimall/cyberplugfest/blob/master/HadoopJobs/src/main/resources/META-INF/spring/application-context.xml

a - Specify the output cache name for the BigMemory hadoop job

In the <hdp:configuration> element, make sure to add the "bigmemory.output.cache" entry that specifies the output cache. Since our output cache is "vendorAvgSpend", it should basically be: bigmemory.output.cache=vendorAvgSpend
NOTE: I use the Maven resources plugin, so this value is actually specified in the pom.xml (in the property "hadoop.output.cache")

b - Check the difference between hadoop jobs

  • hdjob-vendoraverage=standard M/R job that outputs to HDFS
  • hdjob-vendoraverage-bm=the same M/R job that outputs to BigMemory
You'll notice 4 differences:
  1. output-format
    1. For the hadoop BigMemory job (hdjob-vendoraverage-bm), output-format value is: "org.terracotta.bigmemory.hadoop.BigmemoryOutputFormat"
    2. For hdjob-vendoraverage, it's the standard "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat"
  2. output-path
    1. It is not needed for the hadoop BigMemory job since it does not write onto HDFS...
  3. reducer
    1. For the hadoop BigMemory job, a different reducer implementation is needed (org.terracotta.pocs.cyberplugfest.VendorSalesAvgReducerBigMemory) because you need to return an object of type "BigmemoryElementWritable"
    2. For the hdjob-vendoraverage job, the reducer returns an object of type "Text".
  4. files
    1. In the hdjob-vendoraverage-bm job, you need to add the terracotta license file so the hadoop job can connect to the terracotta bigmemory (enterprise feature)

c - Specify the job to run.

Done in the <hdp:job-runner ...> tag. You can switch back and forth to see the difference...

2 - Now, let's look at the reducers


Compare:
  • hdjob-vendoraverage-bm reducer class: org.terracotta.pocs.cyberplugfest.VendorSalesAvgReducerBigMemory
  • hdjob-vendoraverage: org.terracotta.pocs.cyberplugfest.VendorSalesAvgReducer
The main difference is the reducer's output type, which must be "BigmemoryElementWritable" if you want to output the results to Terracotta BigMemory.
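
As a rough illustration of that difference, the plain-Java sketch below runs the same averaging logic and then shapes the result in two ways: as a line of text (the HDFS case) and as a key/value entry (the BigMemory case). Everything here is hypothetical: CacheEntry is only a stand-in I made up for this sketch and is NOT the connector's BigmemoryElementWritable class, whose actual constructors are not shown in this post. Refer to the two reducer classes in the repository for the real thing.

```java
import java.util.Arrays;
import java.util.List;

public class ReducerOutputSketch {

    /** HYPOTHETICAL stand-in for a cache element: a key plus a value. */
    public static final class CacheEntry {
        public final String key;
        public final double value;

        public CacheEntry(String key, double value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Shared logic: the arithmetic mean of one vendor's amounts (0.0 if empty). */
    static double mean(List<Double> amounts) {
        double sum = 0;
        for (double amount : amounts) {
            sum += amount;
        }
        return amounts.isEmpty() ? 0.0 : sum / amounts.size();
    }

    /** HDFS-style output: the result is rendered as a tab-separated text line. */
    public static String reduceToText(String vendor, List<Double> amounts) {
        return vendor + "\t" + mean(amounts);
    }

    /** Cache-style output: the result is a key/value entry destined for a cache. */
    public static CacheEntry reduceToCacheEntry(String vendor, List<Double> amounts) {
        return new CacheEntry(vendor, mean(amounts));
    }

    public static void main(String[] args) {
        List<Double> amounts = Arrays.asList(10.0, 30.0);
        System.out.println(reduceToText("Vendor A", amounts));
        CacheEntry entry = reduceToCacheEntry("Vendor A", amounts);
        System.out.println(entry.key + " -> " + entry.value);
    }
}
```

The computation is identical in both methods; only the shape of what gets emitted changes, which is why the two jobs can share a mapper and differ only in their reducer and output format.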

3 - Include the cache configuration (Ehcache.xml) in your M/R project


This file specifies the details of the vendorAvgSpend cache. Following Maven conventions, it is included (along with my other resource files) in the resources folder (https://github.com/lanimall/cyberplugfest/blob/master/HadoopJobs/src/main/resources/ehcache.xml)

In this ehcache.xml file, you'll notice our hadoop output cache (as well as several other caches that are NOT used by the hadoop jobs). The one requirement is that it must be a "distributed" cache; in other words, the data will be stored on the BigMemory Max server instance that should already be running on your development box (the "<terracotta>" and "<terracottaConfig>" tags specify that)

For more info on that, go to http://terracotta.org/documentation/4.0/bigmemorymax/get-started/server-array

Master Step 3 - Prepare the sample data


In the real demo scenario, I use Apache Flume (http://flume.apache.org) to "funnel" the generated sample data into HDFS in near real-time. But for the purpose of this test, it all works fine with some static sample data. All we need to do is import the data into our local HDFS.

Extract the sample at: $CYBERPLUGFEST_CODE_HOME/HadoopJobs/SampleTransactionsData/sample-data.zip.
It should create a "flume" folder with the following hierarchy:
  1. flume/
    1. events/
      1. 13-07-17/
        1. events.* (those are the files with the comma separated data)

Navigate to $CYBERPLUGFEST_CODE_HOME/HadoopJobs/SampleTransactionsData/
Run the hadoop shell "put" command to add all these files into HDFS:
$HADOOP_INSTALL/bin/hadoop dfs -put flume/ .
Once done, verify that the data is in HDFS by running the following (it should list a lot of event files…):
$HADOOP_INSTALL/bin/hadoop dfs -ls flume/events/13-07-17/ 

Master Step 4 - Compile and Run the hadoop job


Still referring to the Cyberplugfest code available at https://github.com/lanimall/cyberplugfest/, you'll need to simply execute a maven build to get going.

Before that though, make sure the maven properties are right for your environment (i.e. hadoop name node url, hadoop job tracker url, terracotta url, cache name, etc…). These properties are specified towards the end of the pom file, in the maven profiles I created for that event (dev profile is for my local, prod profile is to deploy in the amazon ec2 cloud)

Then, navigate to the $CYBERPLUGFEST_CODE_HOME/HadoopJobs folder and run:
mvn clean package appassembler:assemble
This should build without an issue, and create a "PlugfestHadoopApp" executable script (the maven appassembler plugin helps with that) in $CYBERPLUGFEST_CODE_HOME/HadoopJobs/target/appassembler/bin folder.

Depending on your platform (Windows or *nix), choose the right script (sh or bat) and run:
sh $CYBERPLUGFEST_CODE_HOME/HadoopJobs/target/appassembler/bin/PlugfestHadoopApp
or
%CYBERPLUGFEST_CODE_HOME%/HadoopJobs/target/appassembler/bin/PlugfestHadoopApp.bat
Your hadoop job should be running.



Master Step 5 - Verify data is written to Terracotta BigMemory


Now we'll verify that the data was written to BigMemory from the hadoop job. Simply run:
sh $CYBERPLUGFEST_CODE_HOME/HadoopJobs/target/appassembler/bin/VerifyBigmemoryData
or
%CYBERPLUGFEST_CODE_HOME%/HadoopJobs/target/appassembler/bin/VerifyBigmemoryData.bat
You should see 6 entries being printed for cache vendorAvgSpend

Final Words


Using this hadoop-to-bigmemory connector, you can truly start to think: "I can now access all my BigData insights at microsecond speed directly from within all my enterprise applications, AND confidently rely on the fact that these insights will be updated automatically whenever my hadoop jobs next run".

Hope you find this hands-on post useful.

Monday, July 1, 2013

How to reconcile Real-Time and Batch processing using In-Memory technology: A demo at the AFCEA Cyber Symposium Plugfest

As you might remember, we(*) participated in a "Plugfest"(**) earlier this year in San Diego. Here is the summary post of what we built for that occasion: http://fsanglier.blogspot.com/2013/04/my-2013-afcea-san-diego-plugfest.html.

This time around, we entered the plugfest competition as a technology provider at the AFCEA Cyber Symposium, which happened last week (June 25-27 2013) in Baltimore. We not only provided technology components and data feeds to the challengers (San Diego State University, GMU, Army PEO C3T milSuite), but also built a very cool Fraud Detection and Money Laundering demo, which was one of the plugfest use cases for this cyber event.

Our demo was centered around a fundamental "Big Data" question: How can you detect fraud on 100,000s of transactions per second in real-time (which is absolutely critical if you don't want to lose lots of $$$$ to fraud) while efficiently incorporating into that real-time process data from external systems (i.e. data warehouses or hadoop clusters)?
Or, in more general words: How do you reconcile real-time processing and batch processing when dealing with large amounts of data?

To answer this question, we put together a demo centered around Terracotta's In-Genius intelligence platform (http://terracotta.org/products/in-genius), which provides a highly scalable low-latency in-memory layer capable of "reconciling" the real-time processing needs (ultra low latency with large amounts of new transactions) with the traditional batch processing needs (100s of TB/PB processed in asynchronous background jobs), all bundled in a simple software package deployable on any commodity hardware.

Here is the solution we assembled:

[Figure: Cyber Plugfest Software Architecture. How to reconcile real-time and batch processing]

A quick look at how it all works:
  1. A custom transaction simulator generates pseudo-random fictional credit card transactions and publishes all of them onto a JMS topic (Terracotta Universal Messaging bus)
  2. Each JMS message is delivered through pub/sub messaging to both the real-time and batch tracks:
    1. The Complex Event Processing (CEP) engine which will identify fraud in real-time through the use of continuous queries.
      1. See "Real-Time fraud detection route"
    2. Apache Flume, an open source platform which will efficiently and reliably route all the messages into HDFS for further batch processing.
      1. See "Batch Processing Route"
  3. Batch Processing Route:
    1. Apache hadoop to collect and store all the transaction data in its powerful batch-optimized file system
    2. Map-Reduce jobs to compute transaction trends (simplified rolling average in this demo case) on the full transaction data for each vendor, customer, or purchase type.
    3. Output of map-reduce jobs stored in Terracotta BigMemory Max In-Memory platform.
  4. Real-Time fraud detection route:
    1. CEP fraud detection queries fetch the hadoop-calculated averages (from 3.2) from Terracotta BigMemory Max (microsecond latency under load), and correlate those with the current incoming transaction to detect anomalies (potential fraud) in real-time.
    2. Mashzone, a mashup and data aggregation tool to provide visualization on detected fraud data.
    3. For other plugfest challengers and technology providers to be able to use our data, all our data feeds were also available in REST, SOAP, and Web socket formats (which were used by ESRI, Visual Analytics, and others)
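
To illustrate the correlation step in the real-time route, here is a minimal plain-Java sketch of the kind of check a fraud detection query expresses. It is not the CEP engine's query language, and it is not the code used in the demo: the plain Map stands in for the BigMemory cache of hadoop-calculated averages, and the "3 times the vendor mean" rule is a threshold I picked purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class FraudCheckSketch {

    /** HYPOTHETICAL rule: flag amounts above this multiple of the vendor mean. */
    static final double THRESHOLD_MULTIPLE = 3.0;

    /**
     * Compares an incoming transaction amount with the batch-computed mean
     * for its vendor. The map is a stand-in for the in-memory cache.
     */
    public static boolean isSuspicious(Map<String, Double> vendorAvgSpend,
                                       String vendor, double amount) {
        Double mean = vendorAvgSpend.get(vendor);
        if (mean == null) {
            // No batch-computed average yet for this vendor: nothing to compare against.
            return false;
        }
        return amount > mean * THRESHOLD_MULTIPLE;
    }

    public static void main(String[] args) {
        Map<String, Double> vendorAvgSpend = new HashMap<>();
        vendorAvgSpend.put("Vendor A", 20.0); // as if written by the batch job

        System.out.println(isSuspicious(vendorAvgSpend, "Vendor A", 100.0));
        System.out.println(isSuspicious(vendorAvgSpend, "Vendor A", 50.0));
    }
}
```

The point of the design is that the lookup on the real-time path is a simple in-memory read, while the expensive computation of the averages happens asynchronously on the batch side.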

As I hope you can see in this post, having a scalable and powerful in-memory layer acting as the middle man between Hadoop and CEP is the key to providing true real-time analysis while still taking advantage of all the powerful computing capabilities that Hadoop has to offer.

In further posts, I'll explain in more detail all the components and code (code and configs are available on github at https://github.com/lanimall/cyberplugfest).

Notes:


(*) "we" = The SoftwareAG Government Solutions team, which I'm part of...
(**) "Plugfest" = "collaborative competitive challenge where industry vendors, academic, and government teams work towards solving a specific set of "challenges" strictly using the RI2P industrial best practices (agile, open standard, SOA, cloud, etc.) for enterprise information system development and deployment." (source: http://www.afcea.org/events/west/13/plugfest.asp)