Feed aggregator

dotCloud to deliver Keynote at MidwestPHP in March

DotCloud Blog - Wed, 01/23/2013 - 19:56

Team dotCloud's Jérôme Petazzoni to brave the Midwest chills to deliver keynote address at MidwestPHP conference.

MidwestPHP

MidwestPHP is a two-day conference in the heart of Minnesota featuring 40+ sessions by industry leaders, covering topics ranging from PHP basics for newbies to advanced PHP concepts, frameworks, databases, third-party tools and components, and web development.

Keynote: Deploying PHP on PaaS - How & Why

Here's the abstract:

PHP has been around for 17+ years. Back in the day, it was FTP, phpMyAdmin, and sometimes CVS or SVN. Things have changed: Git and Mercurial have risen; everybody wants a “staging” or “pre-production” environment; replication, scaling, and load balancers are no longer only for expensive, high-end projects; and developers want to mix & match languages, which led to the rise of “polyglot” platforms. We’ll see how Platform-as-a-Service has become a very solid offering for PHP hosting; the numerous advantages and productivity improvements it provides; and how to embrace it in the most efficient way.

Learn about dotCloud's PHP and PHP Worker services via dotCloud documentation.

Other Awesome Talks

You can see the complete list of sessions here; we are especially looking forward to several of them.

Ticket Discount!

Use the code DOTCLOUD to save $30 (15%) on your ticket to MidwestPHP Conference. Register Now!


Categories: Companies

Your Guide to Cloudera @ Strata + Hadoop World This Week

Cloudera - Mon, 10/22/2012 - 14:00

Cloudera is co-presenting the sold-out Strata Conference + Hadoop World in New York this week, and if you’re an attendee, you have a great week ahead!

Here’s a quick guide to where you can find Clouderans during the conference. There are of course many other great activities planned as well that are not covered here.

Keynotes

Cloudera CEO Mike Olson (@mikeolson) takes the stage on Wednesday to kick off the plenary sessions, and Apache Hadoop Co-founder and Apache Software Foundation chair Doug Cutting (@cutting) is part of the Thursday morning plenary sessions. (See abstracts here.) Arrive early to make sure you have a seat; you won’t want to miss their vision for the future of Hadoop.

Presentations & Tutorials

As we have previously blogged, experts on the Cloudera team have put together some compelling and informative tutorials and presentations. They include:

  • Given Enough Monkeys – Some Thoughts on Randomness
  • Large Scale ETL with Hadoop
  • HDFS – What is New and Future
  • High Availability for the HDFS Namenode: Phase 2
  • Upcoming Enterprise Features in Apache HBase 0.96
  • Data Science on Hadoop: What’s There and What’s Missing
  • Designing Scalable Network Architectures for Fast Moving Big Data
  • Taming the Elephant – Learn How Monsanto Manages Their Hadoop Cluster to Enable Genome/Sequence Processing
  • An Introduction to Hadoop
  • Testing Hadoop Applications
  • Using HBase
  • Building a Large-Scale Data Collection System Using Flume NG

Free Books/Meet the Authors

The week is a great one for Hadoop book fans – book authors Tom White (@tom_e_white), Lars George (@larsgeorge), Eric Sammer (@esammer), Amandeep Khurana (@amansk), and Nick Dimiduk (@xefyr) will be available in the Cloudera booth to sign your copies and answer your questions. If you don’t have a copy yet, we’ll be giving away 100 copies of each book on Wednesday and Thursday during specific times – check out the book giveaway schedule here.

Meetups

Make the most of your evenings and attend one of the many meetups taking place. It’s a great way to network and meet others that are working on similar projects and challenges. Check the schedule here.

Find Cloudera at Booth #100

Visit our booth to check out our demos, learn what’s new, engage with our team and get a free t-shirt or book. If you can’t attend, at least follow us on Twitter and join the conversation. #strataconf

We look forward to seeing you in New York City!

Categories: Companies

Sneak Peek into Skybox Imaging’s Cloudera-powered Satellite System

Cloudera - Fri, 10/19/2012 - 22:00

This is a guest post by Oliver Guinan, VP Ground Software, at Skybox Imaging. Oliver is a 15-year veteran of the internet industry and is responsible for all ground system design, architecture and implementation at Skybox.

One of the great promises of the big data movement is using networks of ubiquitous sensors to deliver insights about the world around us. Skybox Imaging is attempting to do just that for millions of locations across our planet.

Skybox is developing a low cost imaging satellite system and web-accessible big data processing platform that will capture video or images of any location on Earth within a couple of days. The low cost nature of the satellite opens the possibility of deploying tens of satellites which, when integrated together, have the potential to image any spot on Earth within an hour.

Skybox satellites are designed to capture light in the harsh environment of outer space. Each satellite captures multiple images of a given spot on Earth. Once the images are transferred from the satellite to the ground, the data needs to be processed and combined to form a single image, similar to those seen within online mapping portals.

With any sensor network, capturing raw data is only the beginning of the story. We at Skybox are building a system to ingest and process the raw data, allowing data scientists and end users to ask arbitrary questions of the data, then publish the answers in an accessible way and at a scale that grows with the number of satellites in orbit. We selected Cloudera to support this deployment.

Processing raw imagery is a complex computer vision task that involves many pixel-level calculations over multiple images. Image Scientists create algorithms in C and C++ to efficiently perform these calculations. Hadoop prefers MapReduce jobs written in Java, so we have developed a proprietary framework called BusBoy to wrap the native algorithms into a standard Hadoop job. This allows our Hadoop engineers to develop efficient storage and publication solutions while our Image Scientists focus on developing better image processing algorithms.
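BusBoy itself isn’t public, so the snippet below is only a rough sketch of the general pattern just described – a standard Hadoop Mapper that hands each record to native C/C++ code through JNI. All class, method, and library names here are hypothetical.

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: a Mapper that delegates the per-record work to a
// native (C/C++) routine exposed through JNI.
public class NativeImageMapper
    extends Mapper<Text, BytesWritable, Text, BytesWritable> {

  static {
    // Load the shared library that contains the C/C++ algorithm.
    System.loadLibrary("imagealgos");
  }

  // Declared in Java, implemented in C/C++ and bound via JNI.
  private native byte[] processTile(byte[] rawPixels);

  @Override
  protected void map(Text tileId, BytesWritable rawImage, Context context)
      throws IOException, InterruptedException {
    // Hand the raw pixels to the native code and emit the processed tile.
    byte[] processed = processTile(rawImage.copyBytes());
    context.write(tileId, new BytesWritable(processed));
  }
}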

Developing against CDH and using Puppet to manage our deployed extensions and configurations allows Skybox to develop our architecture on our in-house cluster. Once the solution is robust, we then have the option to deploy our solution at scale using Amazon’s EC2 hardware or other scalable computation and storage platforms. We have tested a large number of hardware configurations to validate our scalability assumptions and to determine the right balance between CPU, memory, disk, and network resources. This information informs the purchasing process for our next in-house cluster.

Making all data available on spinning disk allows data scientists to efficiently ask any question of the data. Traditional systems tend to archive older data to tape based systems. This makes speculative examination of the data prohibitively expensive. The Hadoop ecosystem of large scale compute and storage coupled with Apache Oozie‘s ability to chain complex processing jobs together that publish results to accessible, structured storage in Apache Hive and Apache HBase is allowing Skybox to create a sensor network that takes the pulse of the planet 24×7.

About Skybox Imaging

Skybox Imaging is a commercial, remote sensing start-up revolutionizing access to information that describes daily activity on our planet. Founded in 2009 and backed by leading venture firms, the company is designing, manufacturing, and operating the world’s first coordinated constellation of high-resolution microsatellites. With its constellation, Skybox will deliver timely, global imagery and video as well as an analytics platform capable of creating new sources of value from such data. Skybox is headquartered in Mountain View, California, and was named to MIT Technology Review’s “Top 50 Most Innovative Companies” for 2012. For more information, visit www.skyboximaging.com or follow Skybox Imaging on Twitter.

Categories: Companies

Apache Hadoop 2.0.2-alpha Released

Cloudera - Fri, 10/19/2012 - 15:00

Earlier this month the Apache Hadoop PMC released Apache Hadoop 2.0.2-alpha, which fixes over 600 issues since the previous release in the 2.0 series, 2.0.1-alpha, back in July. This is a tremendous rate of development, of which all contributors to the project should feel proud.

Some of the more noteworthy changes in this release include:

  • HDFS HA supports automatic failover using ZooKeeper (HDFS-3042).
  • The FUSE-DFS module now supports secure HDFS clusters (HDFS-3568).
  • The (non-standard) Kerberos over SSL has been replaced with SPNEGO for image transfers and for secure HDFS web access in general (HDFS-2617).
  • SASL encryption can be enabled for block data transfers in HDFS (HDFS-3637), and the MapReduce shuffle can be encrypted using HTTPS (MAPREDUCE-4417). There is also HTTPS support for the web UIs (HADOOP-8581).
  • A new type of Hadoop Metric, a quantile metric, has been added to provide latency histograms for various HDFS metrics (HDFS-3650).
  • The Capacity Scheduler now supports delay scheduling (YARN-80).
  • There are various performance improvements including support for fadvise in the shuffle handler (MAPREDUCE-3289) and datanode (HDFS-3697)
  • YARN is now a subproject of Hadoop (YARN-1). The separation will make it easier for folks who want to write YARN applications that are independent of MapReduce. (See Harsh Chouraria’s “MR2 and YARN Briefly Explained” post for more on the relationship between YARN and MapReduce.)

Try It Out!

You can download the release from an Apache mirror. Alternatively, you can try CDH 4.1, since it includes most of the changes from Apache Hadoop 2.0.2-alpha. Note that MR2 in CDH 4.1 is still experimental—in line with the Apache release—however MR1 in CDH 4.1 is stable and fully supported in production.

A Note on Release Numbering

Historically the numbering of Apache Hadoop releases has been somewhat confusing, but things have improved since the Hadoop community voted to adopt 1.x for the current stable branch (renamed from the 0.20.x series) and the 2.x branch for the new line of development (previously 0.23.x), which is still currently unstable as mentioned above.

Some confusion lingers in that there is still an 0.23 branch which is still producing releases (Robert Evans is the release manager). However this branch is a special case: it is an earlier version of the branch-2 line that Yahoo! is using to stabilize YARN for their own use, with plans to move to a 2.x sometime next year. The Yahoo! Hadoop team are also backporting fixes in the 2.x branch to the 0.23 branch as needed, and of course all changes that go into 0.23 go into trunk and 2.x first, so all the valuable stabilization work they are doing will benefit future 2.x releases.  From a feature point of view, the biggest difference between 0.23 and 2.x is that 0.23 lacks HDFS High Availability.

Acknowledgements

I would like to thank the many people from many different organizations who contributed to this release—from the smallest bug report to the largest feature, all contributions are appreciated. Also, thanks to Arun C Murthy who acted as release manager for this release.

Categories: Companies

What’s New in CDH4.1 Hue

Cloudera - Thu, 10/18/2012 - 21:24

Hue is a Web-based interface that makes it easier to use Apache Hadoop. Hue 2.1 (included in CDH4.1) provides a new application on top of Apache Oozie (a workflow scheduler system for Apache Hadoop) for creating workflows and scheduling them repetitively. For example, Hue makes it easy to group a set of MapReduce jobs and Hive scripts and run them every day of the week.

In this post, we’re going to focus on the Workflow component of the new application.

Workflow Editor

Workflows consist of one or multiple actions that can be executed sequentially or in parallel. Each action will run a program that can be configured with parameters (e.g. output=${OUTPUT} instead of hardcoding a directory path) in order to be easily reusable.

The current types of programs are:

  • MapReduce
  • Pig
  • Hive
  • Sqoop
  • Java
  • Shell
  • Ssh
  • Streaming jobs
  • DistCp

The application comes with a set of examples:


Workflows can be shared with other users and cloned. Forks are supported and enable actions to run at the same time. The Workflow Editor lets you compose your workflow.

Let’s take the Sequential Java (aka TeraSort) example and add a Hive action, HiveGen, that will generate some random data. TeraGen is a MapReduce job that does the same thing, and both actions will run in parallel. Finally, the TeraSort action will read both outputs and sort them together. You can see how this would look in Hue in the screenshot below.

Workflow Dashboard

Our TeraGen workflow can then be submitted and controlled in the Dashboard. Parameter values (e.g. ${OUTPUT}, the output path of the TeraSort action) are prompted for when you click the submit button.

Jobs can be filtered/killed/restarted and detailed information (progress, logs) is available within the application and in the Job Browser Application.

Individual management of a workflow can be done on its specific page. We can see the active actions in orange below:

Summary

Before CDH4.1, Oozie users had to deal with XML files and command line programs. Now, this new application allows users to build, monitor and control their workflows within a single Web application. Moreover, the Hue File Browser (for listing and uploading workflows) and Job Browser (for accessing fine grained details of the jobs) are leveraged.

The next version of the Oozie application will focus on improving the general experience, increasing the number of supported Oozie workflows and prettifying the Editor.

In the meantime, feel free to report feedback and wishes to hue-user!

Categories: Companies

What’s New in CDH4.1 Pig

Cloudera - Thu, 10/18/2012 - 15:00

Apache Pig is a platform for analyzing large data sets that provides a high-level language called Pig Latin. Pig users can write complex data analysis programs in an intuitive and compact manner using Pig Latin.

Among many other enhancements, CDH4.1, the newest release of Cloudera’s open-source Hadoop distro, upgrades Pig from version 0.9 to version 0.10. This post provides a summary of the top seven new features introduced in CDH4.1 Pig.

Boolean Data Type

Pig Latin is continuously evolving. As with other actively developed programming languages, more data types are being added to Pig. CDH4.1 adds the boolean type. The boolean type is internally mapped to the Java Boolean class, and the boolean constants ‘TRUE’ and ‘FALSE’ are case-insensitive. Here are some example uses of boolean type:

a = LOAD 'a.txt' AS (a0:boolean, a1:(a10:boolean), a2);
b = FOREACH a GENERATE a0, a1, (boolean)a2;
c = FILTER b BY a2 == TRUE;

Note that if you have UDFs that implement the LoadCaster and StoreCaster interfaces in releases prior to CDH4.1, they will have to be modified to implement new methods called bytesToBoolean() and toBytes(), which were added to LoadCaster and StoreCaster respectively.
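For illustration, here is a minimal sketch of what those two methods might look like on a custom caster that reads and writes UTF-8 text. The signatures follow the LoadCaster/StoreCaster pattern described above; check the Pig 0.10 javadoc for the exact interfaces before relying on this.

import java.io.IOException;

// A sketch of only the two new methods; a real caster implements the full
// LoadCaster and StoreCaster interfaces.
public class MyTextCaster /* implements LoadCaster, StoreCaster */ {

  // LoadCaster side: turn raw bytes from the loader into a Boolean.
  public Boolean bytesToBoolean(byte[] b) throws IOException {
    if (b == null) {
      return null;
    }
    return Boolean.parseBoolean(new String(b, "UTF-8"));
  }

  // StoreCaster side: turn a Boolean back into bytes for the storer.
  public byte[] toBytes(Boolean b) throws IOException {
    return b == null ? null : b.toString().getBytes("UTF-8");
  }
}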

Nested FOREACH and CROSS

Pig now supports CROSS and FOREACH within a FOREACH statement in addition to already supported operators such as DISTINCT, FILTER, LIMIT, and ORDER BY. A nested FOREACH is particularly useful when you want to iterate through nested relations grouped by the same key. For example, if you want to compute the Cartesian product of two nested relations co-grouped by the same key and do some processing on them, you can achieve this in a concise manner as follows:

a = LOAD 'a.txt' AS (a0, a1);
b = LOAD 'b.txt' AS (b0, b1);
c = COGROUP a BY a0, b BY b0;
d = FOREACH c {
    d0 = CROSS a, b;
    d1 = FOREACH d0 GENERATE a1 + b1;
    GENERATE d1;
}
dump d;

Note that only two levels of nesting are supported.

Ruby UDFs

Scripting UDFs are not new to Pig. In fact, Python and JavaScript have been supported since CDH3. But in CDH4.1, another popular scripting language is added: Ruby. Just as Python UDFs interact with Pig via Jython, Ruby UDFs do the same via JRuby internally.

To register Ruby UDFs in Pig scripts, the REGISTER command is used as follows:

REGISTER 'myfuncs.rb' USING jruby AS myfuncs;

Similar to Python UDFs, there are two ways of defining the return type of UDFs: defining it in Pig syntax with an ‘outputSchema’ decorator if the return type is static, or defining a ‘schemaFunction’ if the return type is dynamic. For more details on Ruby UDF decorators, please refer to the Pig 0.10 documentation.

LIMIT / SPLIT by Expression

Prior to CHD4.1, only constants were allowed in the expression of LIMIT and SPLIT. In CDH4.1, the expression may include scalar variables as well. Here is an example of getting the top 10% of records from a relation using LIMIT:

a = LOAD '1.txt' AS val;
b = GROUP a ALL;
c = FOREACH b GENERATE COUNT(a) AS sum;
d = ORDER a BY val;
e = LIMIT d c.sum/10;

It is worth noting that only scalar variables can be used in the expression; column references cannot be used. In the above example, ‘c.sum’ is implicitly cast to a scalar since ‘c’ is a relation that contains a single long type record. A statement like e = LIMIT d val is not valid.

Default SPLIT Destination

The SPLIT operator is used to partition records of a relation into multiple relations. Prior to CDH4.1, records that did not meet any condition in the expressions were simply discarded. But in CDH4.1 it is possible to define the default destination via OTHERWISE as follows:

SPLIT a INTO x IF $0>10, y IF $0>5, z OTHERWISE;

Note that this feature introduces a new keyword – OTHERWISE, and this may break existing Pig scripts if they use it as an alias.

Syntactical Sugar for TOTUPLE, TOBAG, and TOMAP

Syntactic sugar for TOTUPLE, TOBAG, and TOMAP has been added. Now Pig automatically converts ‘( )’, ‘{ }’, and ‘[ ]’ to tuple, bag, and map respectively.

b = FOREACH a GENERATE (x, y); /* ((x,y))     */
c = FOREACH a GENERATE {x, y}; /* ({(x),(y)}) */
d = FOREACH a GENERATE [x, y]; /* ([x#y])     */

AvroStorage Improvements

AvroStorage now supports path globbing. Since Hadoop’s path globbing is used internally, the syntax is the same as that of Hadoop:

a = LOAD '/{foo,bar}/*.avro' USING AvroStorage();

AvroStorage can also load and store an Avro record type that has self-referencing fields. The problem with self-referencing fields is that it is not possible to convert them to Pig schema because recursive records are by definition infinite. As a workaround, Pig now converts them to bytearrays when detecting recursive records; therefore, it is possible to load and store an Avro record type that has self-referencing fields. Here is an example:

a = LOAD '1.avro' USING AvroStorage('no_schema_check');

STORE a INTO '2' USING AvroStorage('no_schema_check', 'schema', '
   { "type" : "record",
     "name" : "recursive_record",
     "fields" : [ { "name" : "value",
                    "type" : "int" },
                  { "name" : "next",
                    "type" : [ "null", "recursive_record" ] } ]
   }
');

STORE a INTO '3' USING AvroStorage('no_schema_check', 'same', '1.avro');

Note that a new option ‘no_schema_check’ is passed to both the load function and store function. This is necessary because by mapping recursive records to bytearrays, discrepancies between Avro and Pig schemas are introduced. Therefore, we must disable schema check or the load and store will fail with an exception during job compilation. For the store function, there are two ways to specify output schemas. Either it can be specified via the ‘schema’ option with a JSON string, or the schema of an existing Avro file can be used via the ‘same’ option.

There are more new features coming in a future release including but not limited to a date/time data type, the rank function, and a cube operator. These will let you write even more powerful and concise Pig scripts.

Please try out the CDH4.1 Pig today. It can be downloaded from the Downloads page.

Categories: Companies

Axemblr’s Java Client for the Cloudera Manager API

Cloudera - Wed, 10/17/2012 - 19:00

Axemblr, purveyors of a cloud-agnostic MapReduce Web Service, have recently announced the availability of an Apache-licensed Java Client for the Cloudera Manager API.

The task at hand, according to Axemblr, is to “deploy Hadoop on Cloud with as little user interaction as possible. We have the code to provision the hosts but we still need to install and configure Hadoop on all nodes and make it so the user has a nice experience doing it.” And voila, the answer is Cloudera Manager, with the process made easy via the REST API introduced in Release 4.0.

Thus, says Axemblr: “In the pursuit of our greatest desire (second only to coffee early in the morning), we ended up writing a Java client for Cloudera Manager’s API. Thus we achieved to automate a CDH3 Hadoop installation on Amazon EC2 and Rackspace Cloud. We also decided to open source the client so other people can play along.”

Congrats to Axemblr for building this client and bringing the code to the community, and let a thousand others bloom! (And by the way – if there is anyone else writing similar wrappers or plugins out there, please let us know in comments.)

Categories: Companies

Analyzing Twitter Data with Hadoop, Part 2: Gathering Data with Flume

Cloudera - Tue, 10/16/2012 - 15:00

This is the second article in a series about analyzing Twitter data using some of the components of the Hadoop ecosystem available in CDH, Cloudera’s open-source distribution of Hadoop and related projects. In the first article, you learned how to pull CDH components together into a single cohesive application, but to really appreciate the flexibility of each of these components, we need to dive deeper.

Every story has a beginning, and every data pipeline has a source. So, to build Hadoop applications, we need to get data from a source into HDFS.

Apache Flume is one way to bring data into HDFS using CDH. The Apache Flume website describes Flume as “a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.” At the most basic level, Flume enables applications to collect data from its origin and send it to a resting location, such as HDFS. At a slightly more detailed level, Flume achieves this goal by defining dataflows consisting of three primary structures: sources, channels and sinks. The pieces of data that flow through Flume are called events, and the processes that run the dataflow are called agents.

In the Twitter example, we used Flume to collect data from the Twitter Streaming API, and forward it to HDFS. Looking closer at the Flume pipeline from that example, we come away with a system like this:

In the rest of this post, we’ll take an in-depth look at the pieces of Flume that are used to build dataflows, and specifically, how they were used in the example.

Sources

A source is just what it sounds like: the part of Flume that connects to a source of data, and starts the data along its journey through a Flume dataflow. A source processes events and moves them along by sending them into a channel. Sources operate by gathering discrete pieces of data, translating the data into individual events, and then using the channel to process events one at a time, or as a batch.

Sources come in two flavors: event-driven or pollable. The difference between event-driven and pollable sources is how events are generated and processed. Event-driven sources typically receive events through mechanisms like callbacks or RPC calls. Pollable sources, in contrast, operate by polling for events every so often in a loop. Another good way to frame this differentiation is as a push-versus-pull model, where event-driven sources have events pushed to them, and pollable sources pull events from a generator.
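The TwitterSource we examine next is event-driven. For contrast, here is a minimal sketch of the pollable flavor; the fetchNextRecord() helper is hypothetical and stands in for whatever system you would poll.

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

// Flume calls process() in a loop; returning BACKOFF tells the framework
// to wait a little while before polling again.
public class ExamplePollableSource extends AbstractSource
    implements Configurable, PollableSource {

  @Override
  public void configure(Context context) {
    // Read any configuration parameters here.
  }

  @Override
  public Status process() throws EventDeliveryException {
    byte[] record = fetchNextRecord(); // hypothetical: pull one record
    if (record == null) {
      return Status.BACKOFF;           // nothing available right now
    }
    Event event = EventBuilder.withBody(record);
    getChannelProcessor().processEvent(event);
    return Status.READY;
  }

  private byte[] fetchNextRecord() {
    return null; // placeholder for real polling logic
  }
}

Because the framework drives the polling loop, a pollable source usually does not need to spawn its own thread, unlike the event-driven source examined below.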

Examining the TwitterSource

In our Twitter analysis example, we built a custom source called TwitterSource. To understand how sources operate more thoroughly, let’s look at how the TwitterSource was built. We can start with a very generic piece of boilerplate code:

/**
 * A template for a custom, configurable Flume source
 */
public class BoilerplateCustomFlumeSource extends AbstractSource
    implements EventDrivenSource, Configurable {

  /**
   * The initialization method for the Source. The context contains all the
   * Flume configuration info, and can be used to retrieve any configuration
   * values necessary to set up the Source.
   */
  @Override
  public void configure(Context context) {
    // Get config params with context.get* methods
    // Example: stringParam = context.getString("stringParamName")
  }

  /**
   * Start any dependent systems and begin processing events.
   */
  @Override
  public void start() {
    // For an event-driven source, the start method should spawn
    // a thread that will receive events and forward them to the
    // channel
    super.start();
  }

  /**
   * Stop processing events and shut any dependent systems down.
   */
  @Override
  public void stop() {
    super.stop();
  }
}
  

With this code, we have a configurable source that we can plug into Flume, although at this stage, it won’t do anything.

The start() method contains the bulk of the source’s logic. In the TwitterSource, the twitter4j library is used to get access to the Twitter Streaming API, using this block of code:

// The StatusListener is a twitter4j API, which can be added to a Twitter
// stream, and will execute callback methods every time a message comes in
// through the stream.
StatusListener listener = new StatusListener() {
  // The onStatus method is a callback executed when a new tweet comes in.
  public void onStatus(Status status) {
    Map<String, String> headers = new HashMap<String, String>();
    // The EventBuilder is used to build an event using the headers and
    // the raw JSON of a tweet
    headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
    Event event = EventBuilder.withBody(
        DataObjectFactory.getRawJSON(status).getBytes(), headers);

    try {
      getChannelProcessor().processEvent(event);
    } catch (ChannelException e) {
      // If we catch a channel exception, it’s likely that the memory channel
      // does not have a high enough capacity for our rate of throughput, and
      // we tried to put too many events in the channel. Error handling or
      // retry logic would go here.
      throw e;
    }
  }
       
  // This listener will ignore everything except for new tweets
  public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
  public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
  public void onScrubGeo(long userId, long upToStatusId) {}
  public void onException(Exception ex) {}
};

The StatusListener implements a set of callbacks that will be called when receiving a new tweet, represented by a Status object. There are other callbacks available but for the purposes of this source, we’re only concerned with new tweets. As can be seen in the TwitterSource, the StatusListener is created and registered in the start() method.

Looking a bit closer, we can pick out the code that actually builds an event out of a tweet:

headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
Event event = EventBuilder.withBody(
      DataObjectFactory.getRawJSON(status).getBytes(), headers);

The EventBuilder interface takes a byte array and an optional set of headers, and creates an event. The source processes events as they come in and passes them along to the channel:

channel.processEvent(event);

In order to connect to the Twitter APIs, we need access to some application-specific secrets. In the TwitterSource, these are variables like the consumerKey and consumerSecret, which are used to set up the Twitter stream:

twitterStream.setOAuthConsumer(consumerKey, consumerSecret);

So, where did the consumerKey and consumerSecret get defined? For this source, these variables are configuration parameters. Taking a look at the configure() method, we can see where the variables are defined:

consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);

The context object contains all the configuration parameters for the source, which can be pulled out and stored in instance variables using a variety of get accessors.

With this code in place, the custom source will be able to process tweets as events. The next step is to define where those events should go and how they should get there.

Configuring the Flume Agent

Before we discuss how to actually configure a Flume agent, we need to know what a configuration looks like. For the Twitter analysis example, we used this configuration:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = [required]
TwitterAgent.sources.Twitter.consumerSecret = [required]
TwitterAgent.sources.Twitter.accessToken = [required]
TwitterAgent.sources.Twitter.accessTokenSecret = [required]
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

Each object that is defined will be referenced by these names throughout the rest of the configuration. Most Flume configuration entries will follow a format very similar to the configuration of log4j appenders. An entry will look like [agent_name].[object_type].[object_name].[parameter_name], where [object_type] is one of sources, channels, or sinks.

Channels

Channels act as a pathway between the sources and sinks. Events are added to channels by sources, and later removed from the channels by sinks. Flume dataflows can actually support multiple channels, which enables more complicated dataflows, such as fanning out for replication purposes.

In the case of the Twitter example, we’ve defined a memory channel:

TwitterAgent.channels.MemChannel.type = memory

Memory channels use an in-memory queue to store events until they’re ready to be written to a sink. Memory channels are useful for dataflows that have a high throughput; however, since events are stored in memory in the channel, they may be lost if the agent experiences a failure. If the risk of data loss is not tolerable, this situation can be remedied using a different type of channel – i.e., with one that provides stronger guarantees of data durability like a FileChannel.

Sinks

The final piece of the Flume dataflow is the sink. Sinks take events and send them to a resting location or forward them on to another agent. In the Twitter example, we utilized an HDFS sink, which writes events to a configured location in HDFS.

The HDFS sink configuration we used does a number of things: First, it defines the size of the files with the rollCount parameter, so each file will end up containing 10,000 tweets. It also retains the original data format, by setting the fileType to DataStream and setting writeFormat to Text. This is done instead of storing the data as a SequenceFile or some other format. The most interesting piece, however, is the file path:

TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/

The file path, as defined, uses some wildcards to specify that the files will end up in a series of directories for the year, month, day and hour during which the events occur. For example, an event that comes in at 9/20/2012 3:00PM will end up in HDFS at hdfs://hadoop1:8020/user/flume/tweets/2012/09/20/15/.

Where does the timestamp information come from? If you’ll recall, we added a header to each event in the TwitterSource:

headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));

This timestamp header is used by Flume to determine the timestamp of the event, and is used to resolve the full path where the event should end up.

Starting the Agent

Now that we understand the configuration of our source, channel and sink, we need to start up the agent to get the dataflow running. Before we actually start the agent, we need to set the agent to have the appropriate name as defined in the configuration.

The file /etc/default/flume-ng-agent contains an environment variable called FLUME_AGENT_NAME. In a production system, for simplicity, the FLUME_AGENT_NAME will typically be set to the hostname of the machine on which the agent is running. However, in this case, we set it to TwitterAgent, and we’re ready to start up the process.

We can start the process by executing

$ /etc/init.d/flume-ng-agent start

Once it’s going, we should start to see files showing up in our /user/flume/tweets directory:

natty@hadoop1:~/source/cdh-twitter-example$ hadoop fs -ls /user/flume/tweets/2012/09/20/05
  Found 2 items
  -rw-r--r--   3 flume hadoop   255070 2012-09-20 05:30 /user/flume/tweets/2012/09/20/05/FlumeData.1348143893253
  -rw-r--r--   3 flume hadoop   538616 2012-09-20 05:39 /user/flume/tweets/2012/09/20/05/FlumeData.1348143893254.tmp

As more events are processed, Flume writes to files in the appropriate directory. A temporary file, suffixed with .tmp, is the file currently being written to. That .tmp suffix is removed when Flume determines that the file contains enough events or enough time has passed to roll the file. Those thresholds are determined in the configuration of the HDFS sink, as we saw above, by the rollCount and rollInterval parameters, respectively.

Conclusion

In this article, you’ve seen how to develop a custom source and process events from Twitter. The same approach can be used to build custom sources for other types of data. Also, we’ve looked at how to configure a basic, complete dataflow for Flume, to bring data into HDFS as it is generated. A distributed filesystem isn’t particularly useful unless you can get data into it, and Flume provides an efficient, reliable way to achieve just that.

Jon Natkins (@nattybnatkins) is a Software Engineer at Cloudera

Categories: Companies

HBase at ApacheCon Europe 2012

Cloudera - Mon, 10/15/2012 - 23:46

Apache HBase will have a notable profile at ApacheCon Europe next month. Clouderan and HBase committer Lars George has two sessions on the schedule:

  • HBase Sizing and Schema Design
    Abstract: This talk will guide the HBase novice to consider important details when designing HBase backed storage systems. Examples of schemas are given and discussed, as well as rules of thumb that will help to avoid common traps. With the right knowledge of how HBase works internally, it will be much easier to come to terms with performance implications of different data schemas.
  • HBase Status Quo
    Abstract: This talk focuses on what happened to HBase since version 0.90. The idea is to introduce and discuss all the major changes in 0.92, 0.94, and trunk, aka 0.96. This spans from coprocessors and security, to distributed log splitting in 0.92, to prefix compression and lazy seek optimizations in 0.94 and so on. But also more subtle – yet often equally important – features like WALPlayer, or the handling of checksums, are presented as they improve operations and performance. The goal is to show the audience the large strides HBase and its community have taken towards a 1.0 release.

HBase user Christian Gügi of Sentric in Zurich, who is also an organizer of the Swiss Big Data User Group, has a session as well:

  • Operating HBase: Things You Need to Know
    Abstract: If you’re running HBase in production, you have to be aware of many things. In this talk we will share our experience in running and operating an HBase production cluster for a customer. To avoid common pitfalls, we’ll discuss problems and challenges we’ve faced as well as practical solutions (real-world techniques) for repair. Even though HBase provides internal tools for diagnosing issues and for repair, running a healthy cluster can still be challenging for an administrator. We’ll cover some background on these tools as well as on HBase internals such as compaction, region splits and their distribution. 

Finally, Steve Watt of HP offers some learning from his company’s experiences with respect to HBase:

  • Taking the Guesswork Out of Your Hadoop Infrastructure
    Abstract: Apache Hadoop is clearly one of the fastest growing big data platforms to store and analyze arbitrarily structured data in search of business insights. However, applicable commodity infrastructures have advanced greatly in the last number of years and there is not a lot of information to assist the community in optimally designing and configuring Hadoop Infrastructure based on specific requirements. For example, how many disks and controllers should you use? Should you buy processors with 4 or 6 cores? Do you need a 1GbE or 10GbE Network? Should you use SATA or MDL SAS? Small or Large Form Factor Disks? How much memory do you need? How do you characterize your Hadoop workloads to figure out whether you are I/O, CPU, Network or Memory bound? In this talk we’ll discuss the lessons learned and outcomes from the work HP has done to optimally design and configure infrastructure for both MapReduce and HBase.

So, if you happen to be in Sinsheim, Germany, during the first week of November, you could do worse with your free time than brush up on your HBase knowledge at ApacheCon!

 

Categories: Companies

Meet the Engineer: Todd Lipcon

Cloudera - Fri, 10/12/2012 - 15:00

Todd

In this installment of “Meet the Engineers”, meet Todd Lipcon (@tlipcon), PMC member/committer for the Hadoop, HBase, and Thrift projects.

What do you do at Cloudera, and in which Apache project are you involved?

I’m a software engineer in Cloudera’s Platform Engineering group, more specifically on the HDFS team. In my time at Cloudera I’ve also done a significant amount of work on other components of CDH including Hue, HBase, and MapReduce. I spend most of my time developing new features for these open source components – recently I’ve been primarily focused on designing and implementing High Availability for the HDFS NameNode. I’m also a performance nut, and have spent a lot of time over the last several years improving the performance of the Hadoop stack.

Why do you enjoy your job?

Getting paid to work on open source is pretty great. Almost all of my time goes to working with the open source community, whether it’s developing new code, reviewing patches from new contributors, or speaking at events like Hadoop user groups and conferences. From an engineering perspective, Hadoop is quite interesting and challenging to work on. It runs at enormous scale and in critical production workloads at some of the world’s biggest companies, so you really have to think through all the corner cases to make a robust design and implementation. (If you’re interested in learning more about the Cloudera work environment, I recently wrote a more lengthy post on this.)

Systems programming is particularly interesting to me since it encourages a “full-stack” perspective. To make Hadoop efficient, you have to really understand all the layers of the system, from the Linux kernel to the JVM to TCP/IP up through distributed consensus protocols. Jumping between these layers to solve bugs or improve performance keeps every day fresh and interesting.

What is your favorite thing about Hadoop?

From an engineer’s perspective, I like working on Hadoop because it’s very challenging. There are very few open source systems that operate at this kind of scale or are making this big of an impact. From a user’s perspective, I think Hadoop is exciting because it levels the playing field between technical powerhouses like Google who have built this kind of technology in-house and more traditional enterprises. I imagine that working on Hadoop today is very much like what it was like to work on Linux in the mid to late 90s.

What is your advice for someone who is interested in participating in any open source project for the first time?

Walk before you run. One mistake I’ve seen new contributors make is that they try to start off with a huge chunk of work at the core of the system. Instead, learn your way around the source code by doing small improvements, bug fixes, etc. Then, when you want to propose a larger change, the rest of the community will feel more comfortable accepting it. One great way to build karma in the community is to look at recently failing unit tests, file bugs, and fix them up.

At what age did you become interested in programming, and why?

I started out with Apple Basic and LOGO on an Apple IIc when I was 5 or 6 years old, probably because there weren’t that many exciting games to play on the machine, and drawing spirographs and making “guess-the-number” games was pretty cool. We even had some kind of adapter to hook up to our TV and display color! I progressed from there through various other beginner languages until Perl and C++ when I was 14 or so. By that point, I’d say I was interested because it was more challenging than working at a grocery store and paid a bit better too!

Look for our next “Meet the Engineer” profile in a week or two. See you then!

Categories: Companies

New Additions to the HBase Team

Cloudera - Fri, 10/12/2012 - 03:21

StumbleUpon (SU) and Cloudera have signed a technology collaboration agreement. Cloudera will support the SU clusters, and in exchange, Cloudera will have access to a variety of production deploys on which to study and try out beta software.

As part of the agreement, the StumbleUpon HBase+Hadoop team — Jean-Daniel Cryans, Elliott Clark and I — have joined Cloudera. From our new perch up in the Cloudera San Francisco office — 10 blocks north and 11 floors up — we will continue as first-level support for SU clusters, tending and optimizing them as we have always done. The rest of our time will be spent helping develop Apache HBase as the newest additions to Cloudera’s HBase team.

We do not foresee this transition disrupting our roles as contributors to HBase. If anything, we look forward to contributing even more than in the past.

As we see it, our job at SU was effectively done. We had put in place a stable, scalable data store used both for low latency serving of the SU frontend, and by bigger, backend batch clusters used by scientists and analysts running all kinds of processing and reporting MapReduce jobs. The front-end clusters are set up so they replicate to the batch and backup clusters across datacenters. All clusters are multi-tenant serving a variety of schemas, features and a wide breadth of access patterns. As the SU dataset  and processing demands continue to grow, all they need do to scale their data store is add boxes.

While we once furiously made HBase customizations to facilitate new SU features, there is less of that of late and most of our SU-specific work has long since been pushed upstream. We reached a state whereby software updates to the SU HBase+Hadoop stack, apart from the odd bug fix, came whenever the HBase project put up a new release candidate for the community to try. At SU we tested the release candidate by rolling it up through the SU clusters from dev, through batch, and eventually out to the front-end-serving cluster if all proved stable.

We therefore were spending most of our time working in open source on the HBase project helping releases along, with the remainder spent operating our HBase+Hadoop clusters and educating folks a bit about how best to use the datastore (and Hadoop). It became apparent after a while that if we could hand off operations, there was little to tie us to SU in particular.

Cloudera has a wide variety of customer HBase installs. It also already has a strong HBase team who, to tell you the truth, were barely keeping up with growth in the customer base. We wanted to give them a helping hand.  We also wanted to be able to take on some larger features and fix-ups — snapshots and backups, to mention but a few — projects that take more than a single developer and a weekend to finish. Being part of a larger team, we will be able to do this. At Cloudera, we would also be better positioned to improve HBase integration with the rest of the Hadoop stack. Finally, Cloudera is in the support business. We could set up an arrangement whereby we could continue to look after SU.

This move strikes us as a natural progression – “coming home,” as Mike Olson calls it. We are super-psyched to be joining Cloudera even if we are going to miss our old haunt, the super supportive SU, which generously sponsored and pioneered HBase these last three years.

– Michael Stack

Categories: Companies

Set Up a Hadoop/HBase Cluster on EC2 in (About) an Hour

Cloudera - Thu, 10/11/2012 - 17:00

Today we bring you one user’s experience using Apache Whirr to spin up a CDH cluster in the cloud. This post was originally published here by George London (@rogueleaderr) based on his personal experiences; he has graciously allowed us to bring it to you here as well in a condensed form. (Note: the configuration described here is intended for learning/testing purposes only.)

I’m going to walk you through a (relatively) simple set of steps that will get you up and running MapReduce programs on a cloud-based, six-node distributed Hadoop/HBase cluster as fast as possible. This is all based on what I’ve picked up on my own, so if you know of better/faster methods, please let me know in comments!

We’re going to be running our cluster on Amazon EC2, and launching the cluster using Apache Whirr and configuring it using Cloudera Manager Free Edition.  Then we’ll run some basic programs I’ve posted on Github that will parse data and load it into Apache HBase.

All together, this tutorial will take a bit over one hour and cost about $10 in server costs.

Step 1: Get the Cluster Running

I’m going to assume you already have an Amazon Web Services account (because it’s awesome, and the basic tier is free.) If you don’t, go get one. Amazon’s directions for getting started are pretty clear, or you can easily find a guide with Google. We won’t actually be interacting with the Amazon management console much, but you will need two pieces of information, your AWS Access Key ID and your AWS Secret Access Key.

To find these, go to https://portal.aws.amazon.com/gp/aws/securityCredentials. You can write these down, or better yet add them to your shell startup script by doing:

$ echo "export AWS_ACCESS_KEY_ID=" > ~/.bashrc
$ echo "export AWS_SECRET_ACCESS_KEY="your_key_here>" > ~/.bashrc
$ exec $SHELL 

You will also need a security certificate and private key that will let you use the command-line tools to interact with AWS. From the AWS Management Console go to Account > Security Credentials > Access Credentials, select the “X.509 Certificates” tab and click on Create a new Certificate. Download and save this somewhere safe (e.g. ~/.ec2)

Then do:

$ export EC2_PRIVATE_KEY=~/.ec2/.pem
$ export EC2_CERT=~/.ssh/.pem

Finally, you’ll need a different key to log into your servers using SSH. To create that, do:

$ mkdir ~/.ec2
$ ec2-add-keypair --region us-east-1 hadoop | sed 1d > ~/.ec2/hadoop
$ chmod 600 ~/.ec2/hadoop
(to lock down the permissions on the key so that SSH will agree to use it)
 

You have the option of manually creating a bunch of EC2 nodes, but that’s a pain. Instead, we’re going to use Whirr, which is specifically designed to allow push-button setup of clusters in the cloud.

To use Whirr, we are going to need to create one node manually, which we are going to use as our “control center.” I’m assuming you have the EC2 command-line tools installed (if not, go here and follow directions).

We’re going to create an instance running Ubuntu 10.04 (it’s old, but all of the tools we need run on it in stable fashion), and launch it in the USA-East region. You can find AMIs for other Ubuntu versions and regions here.

So, do:

$ ec2-run-instances ami-1db20274 -k "hadoop"

This creates an EC2 instance using a minimal Ubuntu image, with the SSH key “hadoop” that we created a moment ago. The command will produce a bunch of information about your instance. Look for the “instance id” that starts with i-, then do:

$ ec2-describe-instance [i-whatever]

This will tell you the public DNS name of your new instance (it will start with ec2-). Now we’re going to remotely log in to that server.

$ ssh -i ~/.ec2/hadoop ubuntu@ec2-54-242-56-86.compute-1.amazonaws.com

Now we’re in! This server is only going to run two programs, Whirr and the Cloudera Manager. First we’ll install Whirr.  Find a mirror at (http://www.apache.org/dyn/closer.cgi/whirr/), then download to your home directory using wget:

$ wget http://www.motorlogy.com/apache/whirr/whirr-0.8.0/whirr-0.8.0.tar.gz

Untar and unzip:

$ tar -xvf whirr-0.8.0.tar.gz
$ cd whirr-0.8.0

Whirr will launch clusters for you in EC2 according to a “properties” file you pass it. It’s actually quite powerful: it allows a lot of customization (and can be used with non-Amazon cloud providers), and it can even set up complicated servers using Chef scripts. But for our purposes, we’ll keep it simple.

Create a file called hadoop.properties:

$ nano hadoop.properties

And give it these contents:

whirr.cluster-name=whirrly
whirr.instance-templates=6 noop
whirr.provider=aws-ec2
whirr.identity=
whirr.credential=
whirr.cluster-user=huser
whirr.private-key-file=${sys:user.home}/.ssh/id_rsa
whirr.public-key-file=${sys:user.home}/.ssh/id_rsa.pub
whirr.env.repo=cdh4
whirr.hardware-id=m1.large
whirr.image-id=us-east-1/ami-1db20274
whirr.location-id=us-east-1

This will launch a cluster of six unconfigured “large” EC2 instances. (Whirr refused to create small or medium instances for me. Please let me know in comments if you know how to do that.)

Before we can use Whirr, we’re going to need to install Java, so do:

$ sudo apt-get update
$ sudo apt-get install openjdk-6-jre-headless

Next we need to create the SSH key that will let our control node log in to our cluster.

$ ssh-keygen -t rsa -P ''

And hit [enter] at the prompt.

Now we’re ready to launch!

$ bin/whirr launch-cluster --config hadoop.properties

This will produce a bunch of output and end with commands to SSH into your servers.

We’re going to need these IPs for the next step, so copy and paste these lines into a new file:

$ nano hosts.txt

Then use this bit of regular expression magic to create a file with just the IP’s:

$ sed -rn "\|.*@(.*)'| s/.*@(.*)'/\1/p" hosts.txt >> ips.txt

Step 2: Configure the Cluster

From your Control Node, download Cloudera Manager; we will install the Free Edition, which can be used for up to 50 nodes:

$ wget http://archive.cloudera.com/cm4/installer/latest/cloudera-manager-installer.bin

Then install it:

$ sudo chmod +x cloudera-manager-installer.bin
$ sudo ./cloudera-manager-installer.bin
 

This will pop up an extreme green install wizard; just hit “yes” to everything.

Cloudera Manager works poorly with textual browsers like Lynx. (It has an API, but we won’t cover that here.) Luckily, we can access the web interface from our laptop by looking up the public DNS address we used to log in to our control node, and appending “:7180” to the end in our web browser.

First, you need to tell Amazon to open that port. The manager also needs a pretty ridiculously long list of open ports to work, so we’re just going to tell Amazon to open all TCP ports. That’s not great for security, so you can add the individual ports if you care enough (lists here):

$ ec2-authorize default -P tcp -p 0-65535 -o "jclouds#whirrly"
$ ec2-authorize default -P tcp -p 7180 -o 0.0.0.0/0
$ ec2-authorize default -P udp -p 0-65535 -o "jclouds#whirrly"
$ ec2-authorize default -P icmp -t -1:-1 -o "jclouds#whirrly"

Then fire up Chrome and visit http://ec2-.compute-1.amazonaws.com:7180/ .

Log in with the default credentials user: “admin” pass: “admin”

Click “just install the free edition”, “continue”, then “proceed” in tiny text at the bottom right of the registration screen.

Now go back to that ips.txt file we created in the last part and copy the list of IPs. Paste them into the box on the next screen, click “search”, then click “install CDH on selected hosts.”

Next, the manager needs credentials that’ll allow it to log into the nodes in the cluster to set them up. You need to give it an SSH key, but that key is on the server and can’t be directly accessed from your laptop. So you need to copy it to your laptop.

 $ scp -r -i ~/.ec2/hadoop ubuntu@ec2-54-242-62-52.compute-1.amazonaws.com:/home/ubuntu/.ssh ~/Downloads/hadoop_tutorial
 

(“scp” is a program that securely copies files through ssh, and the –r flag will copy a directory.)

Now you can give the manager the username “huser”, and the SSH keys you just downloaded:

Click “start installation,” then “ok” to log in with no passphrase. Now wait for a while as CDH is installed on each node.

Next, Cloudera Manager will inspect the hosts and issue some warnings, but just click “continue.” Then it will ask you which services you want to start – choose “custom” and then select ZooKeeper, HDFS, HBase, and MapReduce.

Click “continue” on the “review configuration changes” page, then wait as the manager starts your services.

Click “continue” a couple more times when prompted, and now you’ve got a functioning cluster.

Step 3: Do Something

To use your cluster, you need to SSH login to one of the nodes. Pop open the “hosts.txt” file we made earlier, grab any of the lines, and paste it into the terminal.

$ ssh -i /home/ubuntu/.ssh/id_rsa -o "UserKnownHostsFile /dev/null" \
 -o StrictHostKeyChecking=no huser@75.101.233.156

If you already know how to use Hadoop and HBase, then you’re all done. Your cluster is good to go. If you don’t, here’s a brief overview:

The basic Hadoop workflow is to run a “job” that reads some data from HDFS, “maps” some function onto that data to process it, “reduces” the results back to a single set of data, and then stores the results back to HDFS. You can also use HBase as the input and/or output to your job.
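To make that cycle concrete, here is a minimal word-count job written against the MapReduce API. It is not part of this tutorial’s repo – just an illustrative sketch of a job that reads text from HDFS, maps each line to (word, 1) pairs, reduces them to totals, and writes the results back to HDFS.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Map: read a line of text, emit (word, 1) for each token.
  public static class TokenMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      for (String token : line.toString().split("\\s+")) {
        if (!token.isEmpty()) {
          word.set(token);
          context.write(word, ONE);
        }
      }
    }
  }

  // Reduce: sum the counts for each word and write the total back out.
  public static class SumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable c : counts) {
        sum += c.get();
      }
      context.write(word, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenMapper.class);
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input dir in HDFS
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output dir in HDFS
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}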

You can interact with HDFS directly from the terminal through commands starting with “hadoop fs”. In CDH, Cloudera’s open-source Hadoop distro, you need to be logged in as the “hdfs” user to manipulate HDFS, so let’s log in as hdfs, create a user directory for ourselves, then create an input directory to store data.

$ sudo su - hdfs
$ hadoop fs -mkdir -p /user/hdfs/input

You can list the contents of HDFS by typing:

$ hadoop fs -ls -R /user

To run a program using MapReduce, you have two options. You can either:

  • Write a program in Java using the MapReduce API and package it as a JAR
  • Use Hadoop Streaming, which allows you to write your mapper and reducer scripts in whatever language you want and transmit data between stages by reading/writing to stdout.

If you’re used to scripting languages like Python or Ruby and just want to crank through some data, Hadoop Streaming is great (especially since you can add more nodes to overcome the relative CPU slowness of a higher level language). But interacting programmatically with HBase is a lot easier through Java. (Interacting with HBase is tricky but not impossible with Python. There is a package called “Happybase” which lets you interact “pythonically” with HBase; the problem is that you have to run a special service called Thrift on each server to translate the Python instructions into Java, or else transmit all of your requests over the wire to a server on one node, which I assume will heavily degrade performance. Cloudera Manager will not set up Thrift for you, though you could do it by hand or using Whirr+Chef.) So I’ll provide a quick example of Hadoop streaming and then a more extended HBase example using Java.

Now, grab my example code repo off Github. We’ll need git.

(If you’re still logged in as hdfs, do “exit” back to “huser” since hdfs doesn’t have sudo privileges by default.)

$ sudo apt-get install -y git-core
$ sudo su - hdfs
$ git clone https://github.com/rogueleaderr/Hadoop_Tutorial.git
$ cd Hadoop_Tutorial/hadoop_tutorial

Cloudera Manager won't tell the nodes where to find the configuration files they need to run (i.e. it won't "set the classpath"), so let's do that now:

$ export HADOOP_CLASSPATH=/etc/hbase/conf.cloudera.hbase1/:/etc/hadoop/conf.cloudera.mapreduce1/:/etc/hadoop/conf.cloudera.hdfs1/

Hadoop Streaming

Michael Noll has a good tutorial on Hadoop Streaming with Python here. I've stolen the code and put it in the Github repo for you. To get going, load some sample data into HDFS:

$ hadoop fs -copyFromLocal data/sample_rdf.nt input/sample_rdf.nt
$ hadoop fs -ls -R
(to see that the data was copied)

Now let’s Hadoop:

$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar \
-file python/mapper.py -mapper python/mapper.py \
-file python/reducer.py -reducer python/reducer.py \
-input /user/hdfs/input/sample_rdf.nt -output /user/hdfs/output/1

That’s a big honking statement, but what it’s doing is telling Hadoop (which Cloudera Manager installs in /usr/lib/hadoop-0.20-mapreduce) to execute the “streaming” jar, to use the mapper and reducer “mapper.py” and “reducer.py”, passing those actual script files along to all of the nodes, telling it to operate on the sample_rdf.nt file, and to store the output in the (automatically created) output/1/ folder.

Let that run for a few minutes, then confirm that it worked by looking at the data:

$ hadoop fs -cat /user/hdfs/output/1/part-00000

That's Hadoop Streaming in a nutshell. You can execute whatever code you want for your mappers/reducers (e.g. Ruby, or even shell commands like "cat"). If you want to use non-standard-library Python packages – e.g. "rdflib" for actually parsing the RDF – you need to zip the packages and pass those files to Hadoop Streaming using -file [package.zip].
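
As a rough sketch of that last point (the archive name and its contents are assumptions, not something this tutorial ships): because Python can import modules directly from a zip archive on sys.path, a mapper that needs a bundled, pure-Python package can add the shipped zip to its path before importing it.

#!/usr/bin/env python
# mapper sketch: use a third-party package shipped alongside the job with "-file rdflib.zip"
import sys

# Hadoop Streaming drops files passed via -file into the task's working directory,
# and Python's zipimport machinery can import pure-Python packages straight from a zip.
sys.path.insert(0, "rdflib.zip")  # example archive name -- an assumption
import rdflib                     # resolvable from the zip if the package sits at its top level

for line in sys.stdin:
    # parse each RDF line with rdflib here and emit tab-separated key/value pairs
    pass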

Hadoop/HBase API

If you want to program directly against Hadoop and HBase, you'll do that in Java. The necessary Java code can be pretty intimidating and verbose, but it's fairly straightforward once you get the hang of it.

The Github repo we downloaded in Step 3 contains some example code that should just run if you've followed this guide carefully, and you can incrementally modify that code for your own purposes. (The basic code is adapted from the code examples in Lars George's HBase: The Definitive Guide. The full original code can be found here. That code has its own license, but my marginal changes are released into the public domain.)

All you need to run the code is Maven. Grab that:

(If you’re logged in as user “hdfs”, type “exit” until you get back to huser. Or give hdfs sudo privileges with “visudo” if you know how.)

$ sudo apt-get install maven2
$ sudo su - hdfs
$ cd Hadoop_Tutorial/hadoop_tutorial

When you run Hadoop jobs from the command line, Hadoop is literally shipping your code over the wire to each of the nodes to be run locally. So you need to wrap your code up into a JAR file that contains your code and all the dependencies. (There are other ways to bundle or transmit your code but I think fully self-contained “fat jars” are the easiest. You can make these using the “shade” plugin which is included in the example project. )

Build the jar file by typing:

$ export JAVA_HOME=/usr/lib/jvm/j2sdk1.6-oracle/
 (to tell maven where java lives)
 $ mvn package
 

That will take an irritatingly long time (possibly 20+ minutes) as Maven downloads all the dependencies, but it requires no supervision.

(If you’re curious, you can look at the code with a text editor at /home/users/hdfs/Hadoop_Tutorial/hadoop_tutorial/src/main/java/com/tumblr/rogueleaderr/hadoop_tutorial/HBaseMapReduceExample.java). There’s a lot going on there, but I’ve tried to make it clearer via comments.

Now we can actually run our job:

$  cd /var/lib/hdfs/Hadoop_Tutorial/hadoop_tutorial
$ hadoop jar target/uber-hadoop_tutorial-0.0.1-SNAPSHOT.jar com.tumblr.rogueleaderr.hadoop_tutorial.HBaseMapReduceExample

If you get a bunch of connection errors, make sure your classpath is set correctly by doing:

$ export HADOOP_CLASSPATH=/etc/hbase/conf.cloudera.hbase1/:/etc/hadoop/conf.cloudera.mapreduce1/:/etc/hadoop/conf.cloudera.hdfs1/

Confirm that it worked by opening up the HBase command-line shell:

$ hbase shell
hbase(main):001:0> scan "parsed_lines"

If you see a whole bunch of lines of data, then – congratulations! You’ve just parsed RDF data using a six-node Hadoop Cluster, and stored the results in HBase!

Next Steps

If you’re planning on doing serious work with Hadoop and HBase, just buy the books:

The official tutorials for Whirr, Hadoop, and HBase are okay, but pretty intimidating for beginners.

Beyond that, you should be able to Google up some good tutorials.

Categories: Companies

Videos: Get Started with Hadoop Using Cloudera Enterprise

Cloudera - Thu, 10/11/2012 - 13:00

Our video animation factory has been busy lately. The embedded player below contains our two latest ones stitched together:

Get Started with Hadoop Using Cloudera Enterprise, Part 1 

To be a proactive industry leader, you must be able to consume critical data as it is produced, analyze it quickly and act on your results. Before Hadoop there was no scalable, cost-effective way to do this with Big Data.

Learn how Cloudera Enterprise helps you to efficiently store all of your data, avoiding archiving and data loss, and provides the opportunity to gain detailed insights into aspects of your business never before possible.

Get Started with Hadoop Using Cloudera Enterprise, Part 2

To run Hadoop effectively, you need to pair it with several other software projects. Cloudera Enterprise offers a simple click-through setup wizard for deployment and configuration of Hadoop as well as these projects, which will save weeks of developer hours that can now be spent taking advantage of the Big Data capabilities Hadoop offers.

Once Hadoop is up and running, it is also a large and complicated system to monitor. This is why Cloudera built Cloudera Manager, included as part of Cloudera Enterprise. Cloudera Manager helps you easily monitor things like hosts, services, system resources and security. If anything goes wrong in the Hadoop environment, Cloudera Manager sends an alert and provides easy-to-use diagnostic tools to get to the root of the issue. If the issue is not easily identifiable, a simple click will send a Cloudera Hadoop expert all the information needed to quickly diagnose the problem and get your system back to normal.

» Learn more about Cloudera Enterprise
» Download Cloudera Manager Free Edition

Contact Cloudera to learn more:
» Contact Cloudera
» Get Private Training Quote

 
Categories: Companies

Data Science: The New Heart of Healthcare

Cloudera - Wed, 10/10/2012 - 22:40

We at Cloudera are tremendously excited by the power of data to effect large-scale change in the healthcare industry. Many of the projects that our data science team worked on in the past year originated as data-intensive problems in healthcare, such as analyzing adverse drug events and constructing case-control studies. Last summer, we announced that our Chief Scientist Jeff Hammerbacher would be collaborating with the Mt. Sinai School of Medicine to leverage large-scale data analysis with Apache Hadoop for the treatment and prevention of disease. And next week, it will be my great pleasure to host a panel of data scientists and researchers at the Strata Rx Conference (register with discount code SHARON for 25% off) to discuss the meaningful use of natural language processing in clinical care.

Of course, the cost-effective storage and analysis of massive quantities of text is one of Hadoop’s strengths, and Jimmy Lin’s book on text processing is an excellent way to learn how to think in MapReduce. But a close study of how the applications of natural language processing technology in healthcare have evolved over the last few years is instructive for anyone who wants to understand how to use data science in order to tackle seemingly intractable problems.

Lesson 1: Choose the Right Problem

A couple of months ago, Mark Madsen tweeted that many organizations are using an “underpants gnomes” strategy for data science. The plan goes something like this:

  1. Collect a lot of dirty, unstructured data.
  2. Hire a data scientist.
  3. Profit!

In general, I am wary of people who come to me bearing databases and asking for "insights" into their data. The right way to approach data science is to start with a problem that has a bottom-line impact on your business, and then work backward from the problem toward the analysis and the data needed to solve it. Insights don't happen in a vacuum; they come with the hard work of analyzing data and building models to solve real problems.

Sometimes, the link between the business problem and the application of data science will be very clear, as in the case of correctly identifying fraudulent credit card transactions. In other cases, there can be multiple steps that separate the business problem from the data science application. For example, a rental service like Netflix is primarily interested in growing and retaining its subscribers. They could have performed an analysis that demonstrated a correlation between the number of movies in a customer's queue and the probability that the customer will renew his subscription. This analysis might have then motivated Netflix to create a movie recommendation system that helps customers discover movies that they will love. If the users who add recommended movies to their queues are then more likely to renew their subscriptions, then the project has succeeded.

In the case of natural language processing and healthcare, the right problem turned out to be computer-assisted coding (CAC). In healthcare, coding refers to the process of converting the narrative description of the treatments a patient received, including doctors’ notes, lab tests, and medical images, into a series of alphanumeric codes that are used for billing purposes. Medical coding is both very important (if the treatments a patient receives aren’t coded, the insurance company won’t pay for them) and very expensive (medical coders need a lot of training and skill to do the job well). To make matters worse, the coding standards are becoming more complex: the current ICD-9 standard has around 24,000 possible codes, while the next-generation ICD-10 standard will expand to 155,000 codes. Finding ways to use natural language processing to help coders be more efficient is a great problem for a data scientist to tackle: it has a quantifiable impact on the bottom line and there is a strong potential for data analysis and modeling to make a meaningful difference.

Lesson 2: Build a Minimum Viable Data Product

The minimum viable product strategy is also a good way of developing data products: the first model that we use for a problem does not need to crunch massive quantities of data or leverage the most advanced machine learning algorithms. Our primary objective is to create the simplest system that will provide enough utility for its users that they are willing to use it and start providing feedback that we can use to make the product better. At first, this feedback may be explicitly communicated to the data scientists working on the system, who may incorporate the feedback by tuning the system by hand. But if we design the system well, we can use automated and implicit sources of feedback to make improvements in a more scalable fashion. 

The first systems for performing computer-assisted coding were similar to the first spam classifiers: they relied almost exclusively on a static set of rules in order to make coding decisions. They also primarily targeted medical coding applications for outpatient treatments, instead of the more complex coding required for inpatient treatments. These early systems weren't particularly great, but they were useful enough to gather feedback from the medical coders about when they failed to identify a code or included one that was not relevant. As more data was gathered, the static rules could be augmented with statistical models that were capable of adjusting to new information and improving over time.

Lesson 3: The One Who Has the Best Data Wins

Data is like any other kind of capital – it flows to where it is wanted, and it stays where it is well-treated. Good algorithms and good people are critical for any data science project, but there is absolutely no substitute for high-quality data that you can use as inputs for your models. As your models improve, they get used more often to make decisions, receive even more feedback, and are used in a wider variety of situations, which leads to a virtuous cycle and the kind of network effects that we see in winner-take-all markets.

Many of the computer-assisted coding products that are available today are web-based and/or integrated with electronic health record (EHR) systems, which allows them to collect feedback data quickly and reliably as well as take advantage of more information about the patient to improve the automated coding. It also becomes possible to use the feedback from many different medical coders across different healthcare institutions in order to make improvements in the underlying models more quickly.

Data as Platform

For many problems that can be tackled using machine learning, the choice of input features is the most important part of the overall process. Data scientists bridge the gap between messy, unstructured data and the structured inputs required by our algorithms. At scale, the skills required to generate input features are similar to the ones needed to build ETL pipelines for data warehousing applications. You might say that ETL is the wax-on, wax-off of data science.

"I don't see how this is going to help me win the All Valley Kaggle competition."

One of the reasons that automatic medical coding is such a great problem for data scientists to take on is that solving it well doesn’t just save money and time, it also provides the structured information that we need as inputs for other problems, including the adverse drug event and case-control projects that we have worked on here at Cloudera. We hope that you can join us at Strata Rx next week to join the conversation around how to effect change in healthcare via the effective, meaningful use of data.

Categories: Companies

What is Hadoop Metrics2?

Cloudera - Wed, 10/10/2012 - 00:15

Metrics are collections of information about Hadoop daemons, events and measurements; for example, data nodes collect metrics such as the number of blocks replicated, number of read requests from clients, and so on. For that reason, metrics are an invaluable resource for monitoring Hadoop services and an indispensable tool for debugging system problems. 

This blog post focuses on the features and use of the Metrics2 system for Hadoop, which allows multiple metrics output plugins to be used in parallel, supports dynamic reconfiguration of metrics plugins, provides metrics filtering, and allows all metrics to be exported via JMX.

Metrics vs. MapReduce Counters

When speaking about metrics, a question about their relationship to MapReduce counters usually arises. The difference can be described in two ways. First, Hadoop daemons and services are generally the scope for metrics, whereas MapReduce applications are the scope for MapReduce counters (which are collected for MapReduce tasks and aggregated for the whole job). Second, whereas Hadoop administrators are the main audience for metrics, MapReduce users are the audience for MapReduce counters.

Contexts and Prefixes

For organizational purposes, metrics are grouped into named contexts – e.g., jvm for Java virtual machine metrics or dfs for distributed file system metrics. There are different sets of contexts supported by Hadoop-1 and Hadoop-2; the table below highlights the ones supported for each of them.

Branch-1:
- jvm
- rpc
- rpcdetailed
- metricssystem
- mapred
- dfs
- ugi

Branch-2:
- yarn
- jvm
- rpc
- rpcdetailed
- metricssystem
- mapred
- dfs
- ugi

A Hadoop daemon collects metrics in several contexts. For example, data nodes collect metrics for the “dfs”, “rpc” and “jvm” contexts. The daemons that collect different metrics in Hadoop (for Hadoop-1 and Hadoop-2) are listed below:

Branch-1 daemons/prefixes:
- namenode
- datanode
- jobtracker
- tasktracker
- maptask
- reducetask

Branch-2 daemons/prefixes:
- namenode
- secondarynamenode
- datanode
- resourcemanager
- nodemanager
- mrappmaster
- maptask
- reducetask

System Design

The Metrics2 framework is designed to collect and dispatch per-process metrics to monitor the overall status of the Hadoop system. Producers register the metrics sources with the metrics system, while consumers register the sinks. The framework marshals metrics from sources to sinks based on (per source/sink) configuration options. This design is depicted below.

 

Here is an example class implementing the MetricsSource:

class MyComponentSource implements MetricsSource {
  @Override
  public void getMetrics(MetricsCollector collector, boolean all) {
    collector.addRecord("MyComponentSource")
             .setContext("MyContext")
             .addGauge(info("MyMetric", "My metric description"), 42);
  }
}

The “MyMetric” in the listing above could be, for example, the number of open connections for a specific server.

Here is an example class implementing the MetricsSink:

public class MyComponentSink implements MetricsSink {
  public void putMetrics(MetricsRecord record) {
    System.out.print(record);
  }
  public void init(SubsetConfiguration conf) {}
  public void flush() {}
}

To use the Metrics2 framework, the system needs to be initialized and the sources and sinks registered. Here is an example initialization:

MetricsSystem ms = DefaultMetricsSystem.initialize("datanode");
ms.register("source1", "source1 description", new MyComponentSource());
ms.register("sink2", "sink2 description", new MyComponentSink());

Configuration and Filtering

The Metrics2 framework uses PropertiesConfiguration from the Apache Commons Configuration library.

Sinks are specified in a configuration file (e.g., “hadoop-metrics2-test.properties”), as:

test.sink.mysink0.class=com.example.hadoop.metrics.MySink

The configuration syntax is:

[prefix].[source|sink|jmx].[instance].[option]

In the previous example, test is the prefix and mysink0 is an instance name. DefaultMetricsSystem will try to load hadoop-metrics2-[prefix].properties first and, if that is not found, fall back to the default hadoop-metrics2.properties in the class path. Note that [instance] is an arbitrary name used to uniquely identify a particular sink instance. The asterisk (*) can be used to specify default options.

Here is an example with inline comments to identify the different configuration sections:

# syntax: [prefix].[source|sink].[instance].[options]
# Here we define a file sink with the instance name “foo”
*.sink.foo.class=org.apache.hadoop.metrics2.sink.FileSink
# Now we specify the filename for every prefix/daemon that is used for
# dumping metrics to this file. Notice each of the following lines is
# associated with one of those prefixes.
namenode.sink.foo.filename=/tmp/namenode-metrics.out
secondarynamenode.sink.foo.filename=/tmp/secondarynamenode-metrics.out
datanode.sink.foo.filename=/tmp/datanode-metrics.out
resourcemanager.sink.foo.filename=/tmp/resourcemanager-metrics.out
nodemanager.sink.foo.filename=/tmp/nodemanager-metrics.out
maptask.sink.foo.filename=/tmp/maptask-metrics.out
reducetask.sink.foo.filename=/tmp/reducetask-metrics.out
mrappmaster.sink.foo.filename=/tmp/mrappmaster-metrics.out
# We here define another file sink with a different instance name “bar”
*.sink.bar.class=org.apache.hadoop.metrics2.sink.FileSink
# The following line specifies the filename for the nodemanager daemon
# associated with this instance. Note that the nodemanager metrics are
# dumped into two different files. Typically you’ll use a different sink type
# (e.g. ganglia), but here having two file sinks for the same daemon can be
# only useful when different filtering strategies are applied to each.
nodemanager.sink.bar.filename=/tmp/nodemanager-metrics-bar.out

Here is an example set of NodeManager metrics that are dumped into the NodeManager sink file:

1349542623843 jvm.JvmMetrics: Context=jvm, ProcessName=NodeManager, SessionId=null, Hostname=ubuntu, MemNonHeapUsedM=11.877365, MemNonHeapCommittedM=18.25, MemHeapUsedM=2.9463196, MemHeapCommittedM=30.5, GcCountCopy=5, GcTimeMillisCopy=28, GcCountMarkSweepCompact=0, GcTimeMillisMarkSweepCompact=0, GcCount=5, GcTimeMillis=28, ThreadsNew=0, ThreadsRunnable=6, ThreadsBlocked=0, ThreadsWaiting=23, ThreadsTimedWaiting=2, ThreadsTerminated=0, LogFatal=0, LogError=0, LogWarn=0, LogInfo=0
1349542623843 yarn.NodeManagerMetrics: Context=yarn, Hostname=ubuntu, AvailableGB=8
1349542623843 ugi.UgiMetrics: Context=ugi, Hostname=ubuntu
1349542623843 mapred.ShuffleMetrics: Context=mapred, Hostname=ubuntu
1349542623844 rpc.rpc: port=42440, Context=rpc, Hostname=ubuntu, NumOpenConnections=0, CallQueueLength=0
1349542623844 rpcdetailed.rpcdetailed: port=42440, Context=rpcdetailed, Hostname=ubuntu
1349542623844 metricssystem.MetricsSystem: Context=metricssystem, Hostname=ubuntu, NumActiveSources=6, NumAllSources=6, NumActiveSinks=1, NumAllSinks=0, SnapshotNumOps=6, SnapshotAvgTime=0.16666666666666669

Each line starts with a timestamp, followed by the record name, and then a name=value pair for each metric in that record.
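
If you want to post-process those sink files outside of Hadoop, a few lines of Python are enough to split each line into its timestamp, record name, and metric name/value pairs. This is a rough sketch, not part of the Metrics2 framework; the file path is just the one configured in the example above.

# sketch: parse Metrics2 FileSink output lines into (timestamp, record, {name: value})
def parse_metrics_line(line):
    timestamp, rest = line.split(" ", 1)
    record, fields = rest.split(":", 1)
    values = {}
    for pair in fields.split(","):
        if "=" in pair:
            name, value = pair.split("=", 1)
            values[name.strip()] = value.strip()
    return int(timestamp), record.strip(), values

with open("/tmp/nodemanager-metrics.out") as f:
    for line in f:
        print(parse_metrics_line(line.strip()))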

Filtering

By default, filtering can be done by source, context, record and metrics. More discussion of different filtering strategies can be found in the Javadoc and wiki.

Example:

mrappmaster.sink.foo.context=jvm
# Define the classname used for filtering
*.source.filter.class=org.apache.hadoop.metrics2.filter.GlobFilter
*.record.filter.class=${*.source.filter.class}
*.metric.filter.class=${*.source.filter.class}
# Filter in any source whose name starts with Jvm
nodemanager.*.source.filter.include=Jvm*
# Filter out records whose names match foo* in the source named "rpc"
nodemanager.source.rpc.record.filter.exclude=foo*
# Filter out the MemHeapUsedM metric for the sink instance "foo" only
nodemanager.sink.foo.metric.filter.exclude=MemHeapUsedM

Conclusion

The Metrics2 system for Hadoop provides a gold mine of real-time and historical data that helps you monitor and debug problems associated with Hadoop services and jobs.

Ahmed Radwan is a software engineer at Cloudera, where he contributes to various platform tools and open-source projects.

 
Categories: Companies

MR2 and YARN Briefly Explained

Cloudera - Mon, 10/08/2012 - 16:00

From CDH4 onward, the Apache Hadoop component introduces two new terms for Hadoop users to wonder about: MR2 and YARN. Unfortunately, the two are mixed up so often that many people are confused about them. Do they mean the same thing, or not?

This post aims to clarify these two terms.

What is YARN?

YARN stands for “Yet-Another-Resource-Negotiator”. It is a new framework that facilitates writing arbitrary distributed processing frameworks and applications.

YARN provides the daemons and APIs necessary to develop generic distributed applications of any kind, handles and schedules resource requests (such as memory and CPU) from such applications, and supervises their execution.

YARN’s execution model is more generic than the earlier MapReduce implementation. YARN can run applications that do not follow the MapReduce model, unlike the original Apache Hadoop MapReduce (also called MR1).

What is MR2?

With the advent of YARN, there is no longer a single JobTracker to run jobs and TaskTrackers to run the jobs' tasks. The old MR1 framework was rewritten to run within a submitted application on top of YARN. This application was christened MR2, or MapReduce version 2. It is the familiar MapReduce execution underneath, except that each job now controls its own destiny via its own ApplicationMaster taking care of execution flow (such as scheduling tasks, handling speculative execution and failures, etc.). It is a more isolated and scalable model than the MR1 system, where a singular JobTracker does all the resource management, scheduling and task monitoring work.

MR2 and a new proof-of-concept application called the DistributedShell are the first two applications using the YARN API in CDH4.

Summary

YARN is a generic platform for any form of distributed application to run on, while MR2 is one such distributed application that runs the MapReduce framework on top of YARN. For a more extensive post on this topic, click here.

Categories: Companies

Applying Parallel Prediction to Big Data

Cloudera - Fri, 10/05/2012 - 15:00

This guest post is provided by Dan McClary, Principal Product Manager for Big Data and Hadoop at Oracle.

One of the constants in discussions around Big Data is the desire for richer analytics and models. However, for those who don’t have a deep background in statistics or machine learning, it can be difficult to know not only just what techniques to apply, but on what data to apply them. Moreover, how can we leverage the power of Apache Hadoop to effectively operationalize the model-building process? In this post we’re going to take a look at a simple approach for applying well-known machine learning approaches to our big datasets. We’ll use Pig and Hadoop to quickly parallelize a standalone machine-learning program written in Jython.

Playing Weatherman

I’d like to predict the weather. Heck, we all would – there’s personal and business value in knowing the likelihood of sun, rain, or snow. Do I need an umbrella? Can I sell more umbrellas? Better yet, groups like the National Climatic Data Center offer public access to weather data stretching back to the 1930s. I’ve got a question I want to answer and some big data with which to do it. On first reaction, because I want to do machine learning on data stored in HDFS, I might be tempted to reach for a massively scalable machine learning library like Mahout.

For the problem at hand, that may be overkill and we can get it solved in an easier way, without understanding Mahout. Something becomes apparent on thinking about the problem: I don’t want my climate model for San Francisco to include the weather data from Providence, RI. Weather is a local problem and we want to model it locally. Therefore what we need is many models across different subsets of data. For the purpose of example, I’d like to model the weather on a state-by-state basis. But if I have to build 50 models sequentially, tomorrow’s weather will have happened before I’ve got a national forecast. Fortunately, this is an area where Pig shines.

Parallel Pipelines with Pig

We want to build models by state, but unfortunately NCDC’s global surface temperature data doesn’t come tagged by state. Instead, what we have is day-over-day data organized by station ID number. NCDC provides a map from station ID to locations, but we’ll need to join it to our weather data. However, the number of stations is much, much smaller than the number of temperature observations — in fact, it will fit into a modest amount of memory.

Pig’s JOIN operator allows us to specify join behavior when we understand our bags are of uneven sizes. In this case, we can use the “replicated” directive with the JOIN operator to force all but the first bag to reside in memory.

Pig’s GROUP operator allows us to quickly organize our dataset by a feature (e.g., state) resulting in an outer bag for which each group is a training set. Once the data’s organized, dispatching our model-building code is as simple as using Pig’s FOREACH operator. That accomplished, we’re going to need some model-building code. Pig alone isn’t suited to this task, but its ability to leverage scripting languages in the JVM makes it easy to tap into a wealth of machine learning methods.

Using Jython and Weka for Model Building

Decision trees are one of the fundamental techniques in machine learning and are applicable to a wide set of industry problems (e.g., advertisement targeting, quality evaluation). I’d like to build a simple C4.5 tree for each state’s weather. While I won’t get into the details of C4.5 here, the concept is simple: we want to organize our tree so that as we travel from the root to the leaves our decisions are ordered by the amount of information that can be gleaned from that feature. For example, if one of our features is latitude, then the model for California might place that decision near the top of the tree. Why? Because northern and southern California have very different climates and latitude tells us more about weather outcome across California than, say, the wind speed.

As I mentioned above, Pig isn't suitable for constructing a C4.5 tree. Besides, C4.5 has been around since the 1990s; someone has surely open-sourced an implementation of it. In fact, the Weka machine-learning library contains a very good Java implementation of C4.5 called J48. Since Pig can register JARs for use as UDFs, it should be easy enough for us to route our FOREACH call into Weka.

However, I’m in a hurry, and may want to try out a bunch of different modeling techniques. I don’t want to write and package Java code if I can just write Python instead. Fortunately, Pig has support for Jython UDFs built in. All we need to do is make sure Jython is on the Pig classpath and make sure our code knows where to find our UDF. It looks a little like this:

REGISTER weka.jar;
REGISTER 'c45_udf.py' USING jython AS c45;
  

And we’ll send the bags of training data to Jython like this:

  models = FOREACH training_groups GENERATE c45.build_instances(group, training_data);
  

But what does the Jython code do to leverage Weka? The code needs to accomplish a few things:

  • Import the necessary Weka classes
  • Define an output schema so that Pig understands how the resulting tuples are structured
  • Transform the input data into a Weka dataset and build a C4.5 tree
  • Turn the model into data that can be used to test or make predictions in the future and return it to Pig

Importing the classes and defining the output schema are simple:

      import sys
      sys.path += ["/usr/lib/jython/Lib","/usr/lib/jython/Lib/site-packages"]
      import weka.core.Instances as Instances
      import weka.core.Instance as Instance
      import weka.core.FastVector as FastVector
      import weka.core.Attribute as Attribute
      import weka.classifiers.trees.J48 as J48

      @outputSchema("state:chararray, model:chararray")
      def build_instances(state,dataset): 

That output decorator tells Pig what to expect in the return tuple.

The mechanics of transforming the input data into Weka vectors and training a model are less easily summarized, so you can find a code sample here:

REGISTER weka.jar;
REGISTER 'c45_udf.py' USING jython AS c45;
rmf /user/oracle/weather/models
weather_obs = LOAD '/user/oracle/weather/cleaned_history'
    using PigStorage('\u0001') as
    (usaf:int, wban:int, year:int, month:int, day:int, temp:float,
     dewp:float, weather:chararray);                 

stations = LOAD '/user/oracle/weather/stations' USING PigStorage() as
(stn:int, wban:int, country:chararray, state:chararray, lat:float,
 lon:float);
observations = JOIN weather_obs BY usaf, stations BY stn using 'replicated';
training_data = FOREACH observations
                  GENERATE state,lat, lon, day,temp,dewp,weather;
training_groups = GROUP training_data BY state;
models = FOREACH training_groups
                  GENERATE c45.build_instances(group, training_data);
STORE models INTO '/user/oracle/weather/models' USING PigStorage();
        

…or read about integrating Weka and Jython here.

Once we’ve done this, we end up with trees like this one for California:

        lon <= -124.160004
          |   dewp <= 49.299999
          |   |   temp <= 50.599998
          |   |   |   dewp <= 45.700001
          |   |   |   |   temp <= 42.299999
          |   |   |   |   |   day <= 6: Fog (2.0)
          |   |   |   |   |   day > 6: Sunny (18.0/2.0)
          |   |   |   |   temp > 42.299999
          |   |   |   |   |   dewp <= 44.299999
          |   |   |   |   |   |   temp <= 42.599998: Rain (2.0)
          |   |   |   |   |   |   temp > 42.599998: Sunny (156.0/38.0)
          |   |   |   |   |   dewp > 44.299999
          |   |   |   |   |   |   temp <= 50.299999
          |   |   |   |   |   |   |   dewp <= 44.599998: Rain (10.0)
          |   |   |   |   |   |   |   dewp > 44.599998
          |   |   |   |   |   |   |   |   day <= 18: Sunny (8.0)
          |   |   |   |   |   |   |   |   day > 18: Rain (4.0)
          |   |   |   |   |   |   temp > 50.299999: Sunny (4.0)

          ...
            

While my code includes some half-baked packaging of the model, consider serialization an exercise left to the reader.

Takeaway

This model snippet doesn’t tell me much, other than perhaps that it’s always sunny in California. But what we have here is more than just a fun example of how to play with public data using Pig and Python; rather it’s a simple methodology for applying existing modeling approaches to Big Data. By adding in your own aggregation logic and modeling code, you can get up and running with analytics on Hadoop with very little effort.

 

Categories: Companies

Data Science: Hot or Not?

Cloudera - Thu, 10/04/2012 - 15:00

You may have noticed that Harvard Business Review is calling data science “the sexiest job of the 21st century.” So our answer to the question is: Hot. Definitely hot. 

If you need an explanation, watch the “Definition of a Data Scientist” talk embedded below from Cloudera data science director Josh Wills, which was hosted by Cloudera partner Lilien LLC recently in Portland, Ore. The key take-away is, you don’t literally have to be a “scientist,” just someone with the curiosity of one.

Categories: Companies

CDH4.1 Now Released!

Cloudera - Tue, 10/02/2012 - 06:56

Update time!  As a reminder, Cloudera releases major versions of CDH, our 100% open source and enterprise-ready distribution of Hadoop and related projects, annually and then updates to CDH every three months.  Updates primarily comprise bug fixes but we will also add enhancements.  We only include fixes or enhancements in updates that maintain compatibility, improve system stability and still allow customers and users to skip updates as they see fit.

We’re pleased to announce the availability of CDH4.1.  We’ve seen excellent adoption of CDH4.0 since it went GA at the end of June and a number of exciting use cases have moved to production.  CDH4.1 is an update that has a number of fixes but also a number of useful enhancements.  Among them:

  • Quorum based storage – Quorum-based Storage for HDFS provides the ability for HDFS to store its own NameNode edit logs, allowing you to run a highly available NameNode without external storage or custom fencing.
  • Hive security and concurrency – we’ve fixed some long standing issues with running Hive.  With CDH4.1, it is now possible to run a shared Hive instance where users submit queries using Kerberos authentication.  In addition this new Hive server supports multiple users submitting queries at the same time.
  • Support for DataFu – the LinkedIn data science team was kind enough to open source their library of Pig UDFs that make it easier to perform common jobs like sessionization or set operations.  Big thanks to the LinkedIn team!!!
  • Oozie workflow builder – since we added Oozie to CDH more than two years ago, we have often had requests to make it easier to develop Oozie workflows.  The newly enhanced job designer in Hue enables users to use a visual tool to build and run Oozie workflows.
  • FlumeNG improvements – since its release, FlumeNG has become the backbone for some exciting data collection projects, in some cases collecting as much as 20TB of new event data per day.  In CDH4.1 we added an HBase sink and metrics for monitoring, as well as a number of performance improvements.
  • Various performance improvements – CDH4.1 users should experience a boost in their MapReduce performance from CDH4.0.
  • Various security improvements – CDH4.1 enables users to configure the system to encrypt data in flight during the shuffle phase.  CDH now also applies Hadoop security to users who access the filesystem via a FUSE mount.

CDH4.1 is available on all of the usual platforms and form factors.  You can install it via Cloudera Manager or learn how to install the packages manually here.

Categories: Companies

Apache Hadoop Wins Duke’s Choice Award, is a Java Ecosystem “MVP”

Cloudera - Fri, 09/28/2012 - 16:33

For those of you new to it, the Duke’s Choice Awards program was initiated by Sun Microsystems in 2002 in an effort to “celebrate extreme innovation in the world of Java technology” – in essence, it’s the “MVP” of the Java ecosystem. Since it acquired Sun in 2009, Oracle has continued the tradition of bestowing the award, and in fact has made the process more community-oriented by accepting nominations from the public and involving Java User Groups in the judging effort.

For the 2012 awards, I’m happy to report that Apache Hadoop is among the awardees - which also include the United Nations High Commission for Refugees, Liquid Robotics, and Java cloud company Jelastic Inc., among others.

As Doug Cutting, the Hadoop project’s founder, current ASF chairman, and Cloudera’s chief architect, explains in the Java Magazine writeup about the award, “Java is the primary language of the Hadoop ecosystem…and Hadoop is the de facto standard operating system for big data. So, as the big data trend spreads, Java spreads too.”

Update (10/2/2012): And here’s documentary evidence – a group photo of the winners! That’s Clouderan committer Eli Collins in the back row, second from right.

So, a big congrats to Apache Hadoop for getting the recognition it deserves from the Java community!

Categories: Companies