Friday, July 19, 2013

Terracotta BigMemory-Hadoop connector: A detailed hands-on tutorial

In my previous post, "How to reconcile Real-Time and Batch processing using In-Memory technology: A demo at the AFCEA Cyber Symposium Plugfest", I went over the challenges and benefits of reconciling real-time analytics with batch analytics. Along the way, I explained the solution we put together to create an integrated real-time analytical capability "augmented" by a batch BigData Hadoop cluster.

A critical piece of that architecture is the ability for Terracotta BigMemory to act as a fast in-memory buffer, accessible by both the real-time world and the batch world...effectively bridging the gap between the two.
The Terracotta BigMemory-Hadoop connector is at the center of that piece, allowing Hadoop to write seamlessly to BigMemory.

For general background, there are existing writings about this connector that cover the concepts. In this post, though, I want to be "hands-on" and enable you to see it running for yourself on your own development box. I've outlined the 5 major steps to successfully install and test the Hadoop-to-BigMemory connector on your own development platform.
I'll be using as a guide the code I put together for "AFCEA Cyber Symposium Plugfest", available on github at https://github.com/lanimall/cyberplugfest

Master Step 1 - Get the software components up and running


1 - Let's download the needed components:

Terracotta BigMemory Max, the Terracotta BigMemory-Hadoop connector package, and a stable Apache Hadoop release (we'll install and configure each of them in the steps below).

2 - Clone the git repository to get the cyberplugfest code: 

git clone https://github.com/lanimall/cyberplugfest
In the rest of the article, we will assume that $CYBERPLUGFEST_CODE_HOME is the root install directory for the code.

3 - Extract the hadoop connector somewhere on your development box. 


The package includes some simple instructions as well as a "wordcount" map/reduce sample.
If you want to explore and follow the default instructions and the sample word count program, they work fine...but please note that I took some liberties with my setup, and these will be explained in this article.
In the rest of the article, we will assume that $TC_HADOOP_HOME is the root install directory of the terracotta hadoop connector.

4 - Install, configure, and start BigMemory Max 


Follow the guide at http://terracotta.org/documentation/4.0/bigmemorymax/get-started. Make sure to try the helloWorld application to check that things are set up properly.
In the rest of the article, we will assume that $TC_HOME is the root directory of BigMemory Max.

I added a sample tc-config.xml at https://github.com/lanimall/cyberplugfest/blob/master/configs/tc-config.xml.

To get bigmemory-max started with that configuration file on your local machine, run:

export CYBERPLUGFEST_CODE_HOME=<root path to cyberplugfest code cloned from github>
export TC_HOME=<root path to terracotta install>
$TC_HOME/server/bin/start-tc-server.sh -f $CYBERPLUGFEST_CODE_HOME/configs/tc-config.xml -n Server1

5 - Install Hadoop


I used pseudo-distributed mode for development...Tuning and configuring Hadoop is outside the scope of this article, but it should certainly be explored as a "go further" step. The Apache page http://hadoop.apache.org/docs/stable/single_node_setup.html is a good place to get started on that.
In the rest of the article, we will assume that $HADOOP_INSTALL is the root install directory of Apache Hadoop.

6 - Add the needed terracotta libraries to the Hadoop class path


  • The hadoop connector library: bigmemory-hadoop-0.1.jar
  • The ehcache client library: ehcache-ee-<downloaded version>.jar
  • The terracotta toolkit library: terracotta-toolkit-runtime-ee-<downloaded version>.jar
Note: the BigMemory Max kit I downloaded (step 4) is version 4.0.2, so that's the version I'll be using here. Adjust the HADOOP_CLASSPATH below to match the version you downloaded.

Edit $HADOOP_INSTALL/conf/hadoop-env.sh and add the following towards the top (replace the default empty HADOOP_CLASSPATH= line with it)
export TC_HOME=<root path to terracotta install>
export TC_HADOOP_HOME=<root path to terracotta hadoop connector install>
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${TC_HADOOP_HOME}/jars/bigmemory-hadoop-0.1.jar:${TC_HOME}/apis/ehcache/lib/ehcache-ee-2.7.2.jar:${TC_HOME}/apis/toolkit/lib/terracotta-toolkit-runtime-ee-4.0.2.jar

7 - Start Hadoop in pseudo-distributed mode

Follow the pseudo-distributed setup from the Apache guide referenced in step 5 (format the namenode, then start the Hadoop daemons), and make sure HDFS and the JobTracker are up before moving on.

Master Step 2 - Write the Map/Reduce job using spring-hadoop and Terracotta BigMemory output connector


Ok, at this point, you should have all the software pieces (BigMemory Max and Hadoop) ready and running in the background. Now it's time to build a map/reduce job that will output something into Terracotta BigMemory. For the "AFCEA Cyber Symposium Plugfest" that this article is based on, I decided to build a simple "mean calculation" map/reduce job...the idea being that the job would run on a schedule, calculate the mean of all the transactions per vendor, and output the calculated mean per vendor into a Terracotta BigMemory cache.

Cache "vendorAvgSpend":

  Key         Value
  Vendor A    Mean A
  Vendor B    Mean B
  ...         ...
  Vendor N    Mean N

And since I really like Spring (http://www.springsource.org) and wanted to extend the simple hadoop wordCount example, I decided to use Spring-Data Hadoop (http://www.springsource.org/spring-data/hadoop) to build the map reduce job for the plugfest.

There are some really good Spring Hadoop tutorials out there, so I won't duplicate them here. One I liked for its simplicity and clarity is http://www.petrikainulainen.net/programming/apache-hadoop/creating-hadoop-mapreduce-job-with-spring-data-apache-hadoop/

Instead, I'll concentrate on the specifics of writing output to Terracotta BigMemory.
Code available at: https://github.com/lanimall/cyberplugfest/tree/master/HadoopJobs

1 - Let's explore the application-context.xml


https://github.com/lanimall/cyberplugfest/blob/master/HadoopJobs/src/main/resources/META-INF/spring/application-context.xml

a - Specify the output cache name for the BigMemory hadoop job

In the <hdp:configuration> ... </hdp:configuration> block, make sure to add the "bigmemory.output.cache" entry that specifies the output cache. Since our output cache is "vendorAvgSpend", the entry is simply: bigmemory.output.cache=vendorAvgSpend
NOTE: I use the Maven resources plugin, so this value is actually specified in the pom.xml (in the property "hadoop.output.cache").
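
If you are curious what that amounts to outside of Spring, it is just a regular Hadoop configuration property. A minimal plain-Java sketch (only the property key and the cache name come from this post; the class and printout are purely illustrative):

import org.apache.hadoop.conf.Configuration;

public class BigmemoryOutputCacheConfigSketch {
    public static void main(String[] args) {
        // Equivalent of the <hdp:configuration> entry above: tell the connector
        // which BigMemory cache the job output should be written to.
        Configuration conf = new Configuration();
        conf.set("bigmemory.output.cache", "vendorAvgSpend");
        System.out.println("Output cache: " + conf.get("bigmemory.output.cache"));
    }
}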

b - Check the difference between hadoop jobs

  • hdjob-vendoraverage=standard M/R job that outputs to HDFS
  • hdjob-vendoraverage-bm=the same M/R job that outputs to BigMemory
You'll notice 4 differences (a rough plain-Java sketch of the equivalent job wiring follows this list):
  1. output-format
    1. For the hadoop BigMemory job (hdjob-vendoraverage-bm), output-format value is: "org.terracotta.bigmemory.hadoop.BigmemoryOutputFormat"
    2. For hdjob-vendoraverage, it's the standard "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat"
  2. output-path
    1. It is not needed for the hadoop BigMemory job since it does not write onto HDFS...
  3. reducer
    1. For the Hadoop BigMemory job, a different reducer implementation is needed (org.terracotta.pocs.cyberplugfest.VendorSalesAvgReducerBigMemory) because you need to return an object of type "BigmemoryElementWritable"
    2. For the hdjob-vendoraverage job, the reducer returns an object of type "Text".
  4. files
    1. In the hdjob-vendoraverage-bm job, you need to add the Terracotta license file so the Hadoop job can connect to Terracotta BigMemory (an enterprise feature)
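
To make those differences concrete, here is a rough plain-Java sketch of what the hdjob-vendoraverage-bm wiring boils down to. The output format, the reducer class, the bigmemory.output.cache property, the cache name, and the sample input path come from this post; the driver class itself, the commented-out mapper line, and the omitted key/value types are assumptions, so check the github project for the real wiring.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.terracotta.bigmemory.hadoop.BigmemoryOutputFormat;
import org.terracotta.pocs.cyberplugfest.VendorSalesAvgReducerBigMemory;

public class VendorAverageToBigmemoryDriverSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same property as in section 1a: the BigMemory cache the connector writes into.
        conf.set("bigmemory.output.cache", "vendorAvgSpend");

        Job job = new Job(conf, "vendor-average-to-bigmemory");
        job.setJarByClass(VendorAverageToBigmemoryDriverSketch.class);
        // job.setMapperClass(...);  // the project's mapper class -- see the github repo
        job.setReducerClass(VendorSalesAvgReducerBigMemory.class);   // BigMemory-aware reducer
        job.setOutputFormatClass(BigmemoryOutputFormat.class);       // instead of TextOutputFormat

        // Difference 2: no FileOutputFormat.setOutputPath(...) -- the job writes to
        // BigMemory, not HDFS. Difference 4: the Terracotta license file still has to
        // ship with the job (the Spring "files" attribute takes care of that in the sample).
        FileInputFormat.addInputPath(job, new Path("flume/events/13-07-17")); // sample data from Master Step 3
        // Output key/value classes are omitted; the connector's bundled wordcount
        // sample shows the exact types BigmemoryOutputFormat expects.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}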

c - Specify the job to run.

Done in the <hdp:job-runner ...> tag. You can switch back and forth to see the difference...

2 - Now, let's look at the reducers


Compare:
  • hdjob-vendoraverage-bm reducer class: org.terracotta.pocs.cyberplugfest.VendorSalesAvgReducerBigMemory
  • hdjob-vendoraverage: org.terracotta.pocs.cyberplugfest.VendorSalesAvgReducer
The difference is pretty much the output type, which must be "BigmemoryElementWritable" if you want the results written to Terracotta BigMemory.
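
For reference, here is a minimal sketch of what a mean-per-vendor reducer of the plain-HDFS flavor looks like; the input value type and the arithmetic are assumptions (the real classes are in the github repo). Per the comparison above, the BigMemory variant differs mainly in declaring BigmemoryElementWritable as its output value type and wrapping the (vendor, mean) pair in it.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a VendorSalesAvgReducer-style reducer: one mean per vendor, emitted as Text.
public class VendorSalesAvgReducerSketch
        extends Reducer<Text, DoubleWritable, Text, Text> {

    @Override
    protected void reduce(Text vendor, Iterable<DoubleWritable> amounts, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        long count = 0;
        for (DoubleWritable amount : amounts) {
            sum += amount.get();
            count++;
        }
        double mean = (count == 0) ? 0.0 : sum / count;
        // Vendor name as the key, its mean spend as the value.
        context.write(vendor, new Text(Double.toString(mean)));
    }
}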

3 - Include the cache configuration (Ehcache.xml) in your M/R project


This file specifies the details of the vendorAvgSpend cache. Following the Maven conventions, it is included (along with my other resource files) in the resources folder (https://github.com/lanimall/cyberplugfest/blob/master/HadoopJobs/src/main/resources/ehcache.xml)

In this ehcache.xml file, you'll notice our Hadoop output cache (as well as several other caches that are NOT used by the Hadoop jobs). The one thing that is required is that it be a "distributed" cache - in other words, the data will be stored on the BigMemory Max server instance that should already be running on your development box (the <terracottaConfig> and <terracotta> elements in that file specify this).

For more info on that, go to http://terracotta.org/documentation/4.0/bigmemorymax/get-started/server-array

Master Step 3 - Prepare the sample data


In the real demo scenario, I use Apache Flume (http://flume.apache.org) to "funnel" the generated sample data into HDFS in near real time...But for the purpose of this test, some pre-generated sample data works fine. All we need to do is import the data into our local HDFS.

Extract the sample at: $CYBERPLUGFEST_CODE_HOME/HadoopJobs/SampleTransactionsData/sample-data.zip.
It should create a "flume" folder with the following hierarchy:
  flume/
    events/
      13-07-17/
        events.* (the files with the comma-separated data)

Navigate to $CYBERPLUGFEST_CODE_HOME/HadoopJobs/SampleTransactionsData/
Run the hadoop shell "put" command to add all these files into HDFS:
$HADOOP_INSTALL/bin/hadoop dfs -put flume/ .
Once done, verify that the data is in HDFS by running the command below (it should list a large number of event files):
$HADOOP_INSTALL/bin/hadoop dfs -ls flume/events/13-07-17/ 
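
For context, each of those events.* lines is a comma-separated transaction record, and the job's mapper essentially turns every line into a (vendor, amount) pair for the reducer shown earlier. A rough sketch, with the column positions picked purely for illustration (the real parsing logic and column layout live in the github code):

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper: parse a comma-separated transaction line and emit (vendor, amount).
// Column indexes 1 and 2 are placeholders, not the actual sample-data layout.
public class VendorSalesMapperSketch
        extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split(",");
        if (fields.length < 3) {
            return; // skip malformed lines
        }
        String vendor = fields[1].trim();
        double amount = Double.parseDouble(fields[2].trim());
        context.write(new Text(vendor), new DoubleWritable(amount));
    }
}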

Master Step 4 - Compile and Run the hadoop job


Still referring to the Cyberplugfest code available at https://github.com/lanimall/cyberplugfest/, you'll need to simply execute a maven build to get going.

Before that, though, make sure the Maven properties are right for your environment (i.e. Hadoop namenode URL, Hadoop jobtracker URL, Terracotta URL, cache name, etc...). These properties are specified towards the end of the pom file, in the Maven profiles I created for that event (the dev profile is for my local setup; the prod profile deploys to the Amazon EC2 cloud).

Then, navigate to the $CYBERPLUGFEST_CODE_HOME/HadoopJobs folder and run:
mvn clean package appassembler:assemble
This should build without an issue, and create a "PlugfestHadoopApp" executable script (the maven appassembler plugin helps with that) in $CYBERPLUGFEST_CODE_HOME/HadoopJobs/target/appassembler/bin folder.

Depending on your platform (Windows or *nix), choose the right script (shell or .bat) and run:
sh $CYBERPLUGFEST_CODE_HOME/HadoopJobs/target/appassembler/bin/PlugfestHadoopApp
or
%CYBERPLUGFEST_CODE_HOME%/HadoopJobs/target/appassembler/bin/PlugfestHadoopApp.bat
Your hadoop job should be running.



Master Step 5 - Verify data is written to Terracotta BigMemory


Now we'll verify that the data was written to BigMemory from the hadoop job. Simply run:
sh $CYBERPLUGFEST_CODE_HOME/HadoopJobs/target/appassembler/bin/VerifyBigmemoryData
or
%CYBERPLUGFEST_CODE_HOME%/HadoopJobs/target/appassembler/bin/VerifyBigmemoryData.bat
You should see 6 entries printed for the vendorAvgSpend cache.
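
Under the hood, that verification amounts to reading the cache back through the standard Ehcache client API. Here is a minimal sketch of that kind of check, assuming the same ehcache.xml from Master Step 2 is on the classpath (this is not the actual VerifyBigmemoryData source):

import java.net.URL;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;

public class VerifyVendorAvgSpendSketch {
    public static void main(String[] args) {
        // Boot an Ehcache client from the same ehcache.xml the Hadoop job uses;
        // it points the client at the local BigMemory Max server.
        URL config = VerifyVendorAvgSpendSketch.class.getResource("/ehcache.xml");
        CacheManager cacheManager = CacheManager.newInstance(config);
        try {
            Cache cache = cacheManager.getCache("vendorAvgSpend");
            for (Object key : cache.getKeys()) {
                Element element = cache.get(key);
                if (element != null) {
                    System.out.println(key + " -> " + element.getObjectValue());
                }
            }
        } finally {
            cacheManager.shutdown();
        }
    }
}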

Final Words


Using this Hadoop-to-BigMemory connector, you can truly start to think: "I can now access all my BigData insights at microsecond speed directly from within all my enterprise applications, AND confidently rely on the fact that these insights will be updated automatically whenever my Hadoop jobs next run".

Hope you find this hands-on post useful.

Monday, July 1, 2013

How to reconcile Real-Time and Batch processing using In-Memory technology: A demo at the AFCEA Cyber Symposium Plugfest

As you might remember, we(*) participated in a "Plugfest"(**) earlier this year in San Diego. Here is the summary post of what we built for that occasion: http://fsanglier.blogspot.com/2013/04/my-2013-afcea-san-diego-plugfest.html.

This time around, we entered the plugfest competition as a technology provider at the AFCEA Cyber Symposium, which happened last week (June 25-27 2013) in Baltimore. We not only provided technology components and data feeds to the challengers (San Diego State University, GMU, Army PEO C3T milSuite), but also built a very cool fraud detection and money laundering demo, which was one of the plugfest use cases for this cyber event.

Our demo was centered around a fundamental "Big Data" question: how can you detect fraud on hundreds of thousands of transactions per second in real time (which is absolutely critical if you don't want to lose lots of $$$$ to fraud) while efficiently incorporating data from external systems (i.e. data warehouses or Hadoop clusters) into that real-time process?
Or, more generally: how do you reconcile real-time processing and batch processing when dealing with large amounts of data?

To answer this question, we put together a demo centered around Terracotta's In-Genius intelligence platform (http://terracotta.org/products/in-genius), which provides a highly scalable, low-latency in-memory layer capable of "reconciling" the real-time processing needs (ultra low latency with large amounts of new transactions) with the traditional batch processing needs (100s of TBs/PBs processed in asynchronous background jobs), all bundled in a simple software package deployable on commodity hardware.

Here is the solution we assembled:

Cyber Plugfest Software Architecture
How to reconcile real-time and batch processing

A quick view at how it all works:
  1. A custom transaction simulator generates pseudo-random fictional credit card transactions and publishes them all onto a JMS topic (Terracotta Universal Messaging bus)
  2. Each JMS message is delivered through pub/sub messaging to both the real-time and the batch track:
    1. The Complex Event Processing (CEP) engine which will identify fraud in real-time through the use of continuous queries.
      1. See "Real-Time fraud detection route"
    2. Apache Flume, an open source platform which will efficiently and reliably route all the messages into HDFS for further batch processing.
      1. See "Batch Processing Route"
  3. Batch Processing Route:
    1. Apache Hadoop to collect and store all the transaction data in its powerful batch-optimized file system (HDFS)
    2. Map/Reduce jobs to compute transaction trends (a simplified rolling average in this demo) over the full transaction data for each vendor, customer, or purchase type.
    3. Output of map-reduce jobs stored in Terracotta BigMemory Max In-Memory platform.
  4. Real-Time fraud detection route:
    1. CEP fraud detection queries fetch the Hadoop-calculated averages (from 3.2) out of Terracotta BigMemory Max (microsecond latency under load) and correlate them with each incoming transaction to detect anomalies (potential fraud) in real time (a simplified sketch of this check follows the list).
    2. Mashzone, a mashup and data aggregation tool to provide visualization on detected fraud data.
    3. For other plugfest challengers and technology providers to be able to use our data, all our data feeds were also available in REST, SOAP, and Web socket formats (which were used by ESRI, Visual Analytics, and others)
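
To make the check in 4.1 concrete: it boils down to comparing each incoming transaction against the Hadoop-computed mean for that vendor, read straight out of BigMemory. A deliberately simplified Java sketch (the real demo expresses this as CEP continuous queries; the cache name comes from the demo, while the threshold, types, and method shape are illustrative):

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;

public class FraudCheckSketch {
    private static final double SUSPICIOUS_FACTOR = 5.0; // illustrative threshold

    private final Cache vendorAvgSpend;

    public FraudCheckSketch(CacheManager cacheManager) {
        // Cache populated by the Hadoop jobs in the batch route (step 3.3).
        this.vendorAvgSpend = cacheManager.getCache("vendorAvgSpend");
    }

    // Flag a transaction whose amount is far above the vendor's historical mean.
    public boolean isSuspicious(String vendor, double amount) {
        Element element = vendorAvgSpend.get(vendor);
        if (element == null) {
            return false; // no baseline for this vendor yet
        }
        double mean = Double.parseDouble(element.getObjectValue().toString());
        return amount > SUSPICIOUS_FACTOR * mean;
    }
}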

As I hope you can see in this post, having a scalable and powerful in-memory layer acting as the middle man between Hadoop and CEP is the key to providing true real-time analysis while still taking advantage of all the powerful computing capabilities that Hadoop has to offer.

In further posts, I'll explain in more detail all the components and code (code and configs are available on github at https://github.com/lanimall/cyberplugfest).

Notes:


(*) "we" = The SoftwareAG Government Solutions team, which I'm part of...
(**) "Plugfest" = "collaborative competitive challenge where industry vendors, academic, and government teams work towards solving a specific set of "challenges" strictly using the RI2P industrial best practices (agile, open standard, SOA, cloud, etc.) for enterprise information system development and deployment." (source: http://www.afcea.org/events/west/13/plugfest.asp)

Friday, May 31, 2013

Custom framework for easy multi-threading

A big part of my job at Terracotta is to demonstrate not only the various features of Terracotta BigMemory (efficiently using all the RAM available on your server within a single Java process, ease of use, high availability, consistency, multi-tenancy, etc...) but also the performance improvements (maximizing transactions/second or minimizing processing response times) you get by introducing BigMemory into your environment/application(s).
But to fully demonstrate how Terracotta BigMemory can provide and consistently guarantee microsecond speed at terabyte (TB) scale, I regularly needed (and still need) to be able to:
  1. Create a way to load Terracotta with massive amounts of data, and
  2. Create a way to "flood" Terracotta BigMemory with massive amounts of requests (gets, puts, searches)
  3. While making it easy to extend for any test use case, and 
  4. Allowing for the loading of all sorts of "specialized" business objects into BigMemory (because customers don't necessarily want to test only with generic data structures or byte arrays)
What I started to do initially was to build a different program for each new use case, using thread pool executors, queues, deques, etc...doing a lot of copy-pasting, and of course introducing some nice concurrency bugs along the way :) !! (For example, I should have known that the class "Random" is thread-safe, and that its shared internal seed becomes a contention point when hit by multiple threads = not good for a concurrent framework; see the sketch below.)
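
To illustrate that particular pitfall, here is a minimal standalone sketch (not part of JPerfTester): a shared java.util.Random makes every worker thread contend on the same internal seed, whereas ThreadLocalRandom gives each thread its own generator.

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class RandomContentionSketch {
    // Thread-safe, but every nextInt() call from every worker thread updates the
    // same internal seed -- a hidden contention point in a load-test harness.
    private static final Random SHARED_RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 8; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    long sum = 0;
                    for (int j = 0; j < 1000000; j++) {
                        // Contended version: sum += SHARED_RANDOM.nextInt(1000);
                        // Uncontended version -- one generator per thread:
                        sum += ThreadLocalRandom.current().nextInt(1000);
                    }
                    System.out.println(Thread.currentThread().getName() + " -> " + sum);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}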
So I decided to build a highly concurrent framework that I could rely on and reuse over and over without having to reinvent the wheel each time, and especially without having to wonder: is the number I'm measuring right now the actual performance of the system under test, or is it measuring - without my knowledge - the time my threads spend "locking" on top of each other?
Enter "JPerftester" (I wish I had found a more awesome name for it :) ) available on my github account at https://github.com/lanimall/JPerfTester.
I tried to architect it in a way that it's modular enough to use for Terracotta-specific use cases, but possibly for other use cases as well (for example, using multi-threading to load billions of records into a DB, or doing comparative load testing against other systems out there). Here is the structure:
  • Base -----> [maven base pom for global plugins/dependencies]
  • BaseEngine -----> [multi-threading base + generic framework objects and helpers]
  • TerracottaEhCacheTesterSuite -----> [Terracotta specific suite]
    • Base -----> [maven base pom for global Terracotta/Ehcache plugins/dependencies]
    • BaseEngine  -----> [specific framework objects for terracotta such as key-value pair operation executors etc...]
    • TesterClients  -----> [actual running examples]
      • CacheWriterTester  -----> [testing cache writing scenarios under load]
      • CustomCacheSearchTester  -----> [testing searches]
      • POJOCacheTester  -----> [testing get/puts with some actual POJOs]
      • ...there could be many more here
  • Other suite 1
  • Other suite 2
  • etc...
To build, simply go to the JPerfTester root folder and run "mvn clean install"...that should be all it takes to compile everything and package the sample TesterClients apps into tar.gz files ("dist" folders after the build phase) that you can deploy anywhere you have Java installed (and Terracotta, since these sample apps are Terracotta testers).
In further posts, I'll go over in more detail how to run these test clients in your environment, and how to create a new "Tester Client" (terracotta-specific or not) for your needs.

Monday, April 1, 2013

My 2013 AFCEA San Diego Plugfest Participation - Showing off Terracotta In-Genius

A couple of weeks ago (end of January 2013), 2 colleagues and I participated (under our company banners, Terracotta and SoftwareAG) in a government "plugfest"...and we won first place! Check out this other article that also talks about our win: http://ctovision.com/2013/02/plugfest-at-afcea-west-terracotta-wins-first-place - and if you're in a rush, jump directly to the short 3-minute video demo below.

What is that, you may ask? As explained on the AFCEA website (http://www.afcea.org/events/west/13/plugfest.asp), a plugfest is a "collaborative competitive" challenge where industry vendors, academic, and government teams work towards solving a specific set of "challenges" strictly using the RI2P industrial best practices (agile, open standard, SOA, cloud, etc.) for enterprise information system development and deployment.

The idea is to "plug" technologies together (technologies provided by the various players, not necessarily within your team) as opposed to rebuild everything from scratch. And indeed, "plugging" is almost mandatory since the scenario is only announced 24 hours before the event, giving the teams a mere 72 hours to create something based on the scenario provided. 

Overall, it's a government effort to encourage/push for more interoperability and reuse of IT components across projects and even agencies.

This particular January 2013 plugfest was about solving a Humanitarian Assistance and Disaster Relief (HADR) use case problem where technology:
  • Helps track in real-time what's happening on the ground (data streams about hazardous materials, first responders, sensors, injured civilians, etc...) and report it in an actionable, geospatially-enabled, format
  • Provides real-time decision support based on pre-defined emergency protocols
  • Correlates various "BigData" streams (sensors, social feeds, etc...) to perform real-time analytics in order to predict movements and/or identify "flash mobs" / criminal hotspots taking advantage of the confusion.
The end result of what we put together was a real-time "map" dashboard that shows everything that's happening on the ground and provides contextual highlights to help decision support.
Here is a short 3 minute video showing the nuts and bolts of that demo:


What you particularly see in the video demo:
  • Moving actors in the disaster zone (first responders, plumes of toxicity, drones, etc...). Each of these actors "broadcasts" its current geolocation (lat, long + metadata) at various time intervals using the Nirvana Universal Messaging bus (1000s of messages per second)
  • Terracotta's Complex Event Processing (CEP) engine performing continuous "geo" queries that identify in real time the distance, speed, and direction of the hazardous plumes relative to the various Red Cross shelters on the map. The CEP engine automatically generates alerts if hazardous plumes are forecasted to impact shelters...providing critical decision support to the commander in charge.
  • All events and metadata are stored in-memory, using Terracotta BigMemory for faster, micro-second access and analytics.
  • The ability to drill into the moving drones, planes and responders to see a ground view in real-time.
  • Triage-based casualty tracking and available blood supply.
  • Availability of shelters, Red Cross centers, blood banks, and other supporting organizations (DoD types).
List of what we "plugged":
Thanks to everyone who organized this event. It's been a blast to participate as part of the Terracotta team, and I'm looking forward to participating in the next "plugfest" event!