Apache DataFu™ (incubating)

Apache DataFu Hourglass

Getting Started

Hourglass is Apache DataFu's framework for incrementally processing data with Hadoop MapReduce. It is designed to make computations over sliding windows more efficient.

A typical example of a sliding window is a dashboard that shows the number of visitors to every page on the site over the last thirty days. To keep this dashboard up to date, we can schedule a query that runs daily and gathers the stats for the last 30 days. However, this simple implementation would be wasteful: only one day of data has changed, but we'd be consuming and recalculating the stats for all 30. A more efficient solution is to make the query incremental: using basic arithmetic, we can update the output from the previous day by adding and subtracting input data. This enables the job to process only the new data, significantly reducing the computational resources required.
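The add-and-subtract update can be sketched in a few lines of Python. This only illustrates the arithmetic and is not Hourglass code; the function name `slide_window` and the per-page dictionaries are hypothetical.

```python
from collections import Counter


def slide_window(previous_total, newest_day, oldest_day):
    """Update windowed per-page counts without rereading the whole window.

    previous_total: counts over the previous window (e.g. the last 30 days)
    newest_day:     counts for the day that just entered the window
    oldest_day:     counts for the day that just fell out of the window
    """
    updated = Counter(previous_total)
    updated.update(newest_day)    # add the new day's counts
    updated.subtract(oldest_day)  # remove the expired day's counts
    # Drop pages whose count fell to zero so the output stays compact.
    return Counter({page: n for page, n in updated.items() if n > 0})
```

For instance, if /home had 10 views in the previous window, gained 2 today, and the day that dropped out had contributed 4, the updated count is 10 + 2 - 4 = 8. Only two days of data are touched, regardless of the window length.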

Hourglass is a framework that makes it much easier to write incremental Hadoop jobs to perform this type of computation efficiently. It provides incremental jobs that abstract away the complexity of implementing a robust incremental solution, with the appropriate hooks so that developers can supply custom logic to perform the aggregation task.

If you have not already checked out the code, see Quick Start, which also shows how to declare a dependency on Hourglass in a project.

Examples

We have provided some sample incremental jobs to demonstrate how to use Hourglass for incremental processing. These jobs consume input data whose records consist of a single id field and which is partitioned by day according to the naming convention /data/event/yyyy/MM/dd. An event like this could be used to track, say, each time a user logs into the site or views a page. Given this event data, there may be metrics we want to compute daily, such as how many times each user has logged in or viewed a page.
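To make the day-partitioned layout concrete, here is a small Python sketch that lists the directories for a date range under the yyyy/MM/dd convention. The helper `daily_paths` is hypothetical and is not part of Hourglass.

```python
from datetime import date, timedelta


def daily_paths(root, start, end):
    """Return one directory path per day from start to end, inclusive."""
    days = (end - start).days + 1
    return [
        f"{root}/{(start + timedelta(days=i)).strftime('%Y/%m/%d')}"
        for i in range(days)
    ]
```

Calling `daily_paths("/data/event", date(2013, 3, 1), date(2013, 3, 14))` yields fourteen paths, from /data/event/2013/03/01 through /data/event/2013/03/14.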

These sample jobs are packaged in a tool that can be run from the command line. The same tool also serves as a test data generator. Here we will walk through how to generate test data and run the jobs against it.

Build the main Hourglass JAR, build the test JAR, and copy the dependencies necessary for the demo to a single directory:

./gradlew :datafu-hourglass:jar :datafu-hourglass:testJar :datafu-hourglass:copyDemoDependencies

Define some variables that we'll need for the hadoop jar command. These list the JAR dependencies, as well as the JARs we just built.

export LIBJARS=$(find "datafu-hourglass/build/libs" -name '*.jar' | xargs echo | tr ' ' ',')

export LIBJARS=$LIBJARS,$(find "datafu-hourglass/build/demo_dependencies" -name '*.jar' | xargs echo | tr ' ' ',')

export HADOOP_CLASSPATH=$(echo "${LIBJARS}" | sed 's/,/:/g')

Assuming you've set up the hadoop command to run against your Hadoop cluster, you are now ready to run the jobs.

Let's define some shorthand commands to run the Hourglass JAR and dump JSON from an Avro file:

export HOURGLASS_CMD="hadoop jar datafu-hourglass/build/libs/datafu-hourglass-incubating-0.1.3-tests.jar datafu.hourglass.demo.Main"

export TO_JSON_CMD="java -jar datafu-hourglass/build/demo_dependencies/avro-tools-1.7.4.jar tojson"

Counting Events

In this example we will run a job that counts how many times each id value has appeared in an input data set. Then we will run the job again on new data, which will incrementally update the previous result.

First we'll generate some test data under the path /data/event using the generate command from our command line tool. The command below will create some random events for dates between 2013/03/01 and 2013/03/14, inclusive. Each record consists of just a single long value from the range 1-100.

$HOURGLASS_CMD generate -libjars ${LIBJARS} /data/event 2013/03/01-2013/03/14

Just to get a sense for what the data looks like, we can copy it locally and dump the first several records.

hadoop fs -copyToLocal /data/event/2013/03/01/part-00000.avro temp.avro
$TO_JSON_CMD temp.avro | head

This will produce output looking something like this:

{"id":35}
{"id":27}
{"id":78}
{"id":79}
{"id":73}
{"id":61}
{"id":94}
{"id":62}
{"id":6}
{"id":44}

Now run the countbyid command, which executes the sample CountById job. This will count the number of events for each ID value.

$HOURGLASS_CMD countbyid -libjars ${LIBJARS} /data/event /output

In the console output you will notice that it reads all fourteen days of input that are available. We can see what this produced by copying the output locally and dumping the first several records. Each record consists of an ID and a count.

rm temp.avro
hadoop fs -copyToLocal /output/20130314/part-r-00000.avro temp.avro
$TO_JSON_CMD temp.avro | head

This will produce output looking something like this:

{"key":{"member_id":1},"value":{"count":162}}
{"key":{"member_id":2},"value":{"count":136}}
{"key":{"member_id":3},"value":{"count":137}}
{"key":{"member_id":4},"value":{"count":142}}
{"key":{"member_id":5},"value":{"count":149}}
{"key":{"member_id":6},"value":{"count":145}}
{"key":{"member_id":7},"value":{"count":131}}
{"key":{"member_id":8},"value":{"count":143}}
{"key":{"member_id":9},"value":{"count":138}}
{"key":{"member_id":10},"value":{"count":160}}

Now let's generate an additional day of data, for 2013/03/15:

$HOURGLASS_CMD generate -libjars ${LIBJARS} /data/event 2013/03/15

The job is configured to consume all available input data. Because a previous output already exists, however, the job can reuse that result, so it only needs to consume the previous output and the new day of input. Let's run the incremental job again:

$HOURGLASS_CMD countbyid -libjars ${LIBJARS} /data/event /output
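Conceptually, the reuse of the previous output amounts to the following. This is a minimal Python sketch of the idea, not the CountById implementation; the function names and the in-memory event lists are assumptions made for illustration.

```python
from collections import Counter


def count_by_id(events):
    """Count how many times each id appears in a list of {"id": ...} records."""
    return Counter(event["id"] for event in events)


def merge_counts(previous_output, new_day_events):
    """Combine the previous counts with counts from only the new day of input."""
    return previous_output + count_by_id(new_day_events)
```

Merging the previous output with the new day's counts gives the same result as recounting every day from scratch, while reading far less data.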

You'll notice in console output that the job considers two alternative plans. In one version it consumes all available input data to produce the new output. In the other version it reuses the previous output and merges this with only the new input. The second version requires fewer bytes to be consumed, so the job picks this one.
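The choice between the two plans can be pictured as a simple cost comparison. The Python sketch below assumes that cost is measured purely in bytes read, as the console output suggests; the function `choose_plan`, its parameters, and the plan labels are hypothetical, and the planner in Hourglass may weigh other factors.

```python
def choose_plan(input_bytes_by_day, previous_output_bytes, days_in_previous_output):
    """Pick the plan that reads fewer bytes.

    input_bytes_by_day:      dict mapping each available day to its input size
    previous_output_bytes:   size of the previous output
    days_in_previous_output: set of days already reflected in that output
    """
    # Plan 1: ignore the previous output and read every day of input.
    full_cost = sum(input_bytes_by_day.values())

    # Plan 2: read the previous output plus only the days it does not cover.
    new_days = [d for d in input_bytes_by_day if d not in days_in_previous_output]
    incremental_cost = previous_output_bytes + sum(
        input_bytes_by_day[d] for d in new_days
    )

    if incremental_cost < full_cost:
        return "reuse_previous_output", incremental_cost
    return "reprocess_all_input", full_cost
```

With fifteen days of equally sized input and a small previous output covering the first fourteen, the second plan reads roughly one day of input plus the previous output, so it wins.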

We can download the new output and inspect the counts:

rm temp.avro
hadoop fs -copyToLocal /output/20130315/part-r-00000.avro temp.avro
$TO_JSON_CMD temp.avro | head

The implementation of the CountById job can be found here. A more detailed explanation of how the job works and how it is implemented can be found in our blog post.

Estimating Cardinality

Continuing from the previous example, suppose we wanted to know the number of distinct IDs over the past fifteen days. In this example we will run a sequence of jobs that use the HyperLogLog cardinality estimation algorithm to compute this result incrementally. The first job estimates the cardinality of IDs per day and outputs these estimates in directories partitioned by day. Each output also includes the serialized cardinality estimator so that the state can be reconstructed. The second job reads the fifteen days of intermediate data, merges the estimators together, and then produces the final estimate over the fifteen days. The advantage of this over other approaches is that when a new day arrives, only that day of input needs to be processed. Once the estimate for that day is produced, the second job can combine it with the other estimates computed previously.
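The reason per-day estimators can be combined is that two HyperLogLog sketches merge by taking the register-wise maximum. The Python sketch below is a simplified HyperLogLog written for illustration only; it is not the estimator used by the sample jobs, and the class, the SHA-1 based hash, the precision `p=12`, and the helper names are all assumptions.

```python
import hashlib
import math


class HyperLogLog:
    """Simplified HyperLogLog with 2**p registers (illustrative only)."""

    def __init__(self, p=12):
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m

    def add(self, value):
        digest = hashlib.sha1(str(value).encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")   # 64-bit hash
        index = h >> (64 - self.p)              # first p bits pick a register
        rest = h & ((1 << (64 - self.p)) - 1)   # remaining 64 - p bits
        # Rank: 1-based position of the leftmost 1-bit in the remaining bits.
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[index] = max(self.registers[index], rank)

    def merge(self, other):
        """Return a new sketch for the union: register-wise maximum."""
        if self.p != other.p:
            raise ValueError("sketches must use the same precision")
        merged = HyperLogLog(self.p)
        merged.registers = [max(a, b) for a, b in zip(self.registers, other.registers)]
        return merged

    def estimate(self):
        m = self.m
        alpha = 0.7213 / (1 + 1.079 / m)        # bias constant for m >= 128
        raw = alpha * m * m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * m and zeros > 0:
            # Small-range correction (linear counting).
            return m * math.log(m / zeros)
        return raw


def sketch_of(values, p=12):
    """Build a sketch from one day's ids."""
    sketch = HyperLogLog(p)
    for value in values:
        sketch.add(value)
    return sketch


def merge_all(sketches):
    """Merge a non-empty sequence of per-day sketches into one."""
    sketches = list(sketches)
    merged = sketches[0]
    for sketch in sketches[1:]:
        merged = merged.merge(sketch)
    return merged
```

Because merging is a register-wise maximum, combining the per-day sketches produces exactly the same registers as building a single sketch over all the raw ids. That is why only the new day needs to be read from the raw input.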

Let's start by cleaning up the output directory:

hadoop fs -rmr /output

If you have been following along from the previous example, you already have fifteen days of input data available, so we don't need to regenerate it. We can run the cardinality command to execute the two jobs. This executes the sample jobs in EstimateCardinality.

$HOURGLASS_CMD cardinality -libjars ${LIBJARS} /data/event /intermediate /output 15

You will notice in the console output that the job consumes fifteen days of input. We can then inspect the output to see the count of distinct IDs. Note that the output record consists of the count and the serialized HyperLogLog estimator, so we use grep to return just the count.

rm temp.avro
hadoop fs -copyToLocal /output/20130315/part-r-00000.avro temp.avro
$TO_JSON_CMD temp.avro | grep -E -oh "\"count\":[0-9]+"

As the IDs in the test data are generated from the range 1-100, this produces the expected output of 100. We'll add a new day of data, but this time we'll use the range 101-200.

$HOURGLASS_CMD generate -libjars ${LIBJARS} /data/event 2013/03/16 101-200

Now we'll run the job again. It automatically consumes fifteen days of data ending with the most recent data that's available.

$HOURGLASS_CMD cardinality -libjars ${LIBJARS} /data/event /intermediate /output 15

We can now inspect the output again:

rm temp.avro
hadoop fs -copyToLocal /output/20130316/part-r-00000.avro temp.avro
$TO_JSON_CMD temp.avro | grep -E -oh "\"count\":[0-9]+"

This produces the expected result of 200.

The implementation of the EstimateCardinality job can be found here. A more detailed explanation of how the job works and how it is implemented can be found in our blog post.

Next Steps

Check out Concepts for more details on developing jobs with Hourglass.