Apache DataFu™

Apache DataFu Pig - Guide


Finding the most recent update of a given record — the dedup (de-duplication) macro

A common scenario in data stored in HDFS (the Hadoop Distributed File System) is multiple rows representing updates to the same logical record. For example, in a table representing accounts, a row might be written every time customer data is updated, with each update receiving a newer timestamp. Let’s consider the following simplified example.

Raw customers’ data, with more than one row per customer

We can see that although most customers appear only once, julia and quentin have 2 and 3 rows, respectively. How can we get just the most recent record for each customer? For this we can use the dedup macro, as below:

REGISTER datafu-pig-1.6.1.jar;

IMPORT 'datafu/dedup.pig';

data = LOAD 'customers.csv' USING PigStorage(',') AS (id: int, name: chararray, purchases: int, date_updated: chararray);

dedup_data = dedup(data, 'id', 'date_updated');

STORE dedup_data INTO 'dedup_out';

Our result will be as expected — each customer only appears once, as you can see below:

“Deduplicated” data, with only the most recent record for each customer
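To make the macro's effect concrete, the following Python sketch mimics it on a few in-memory rows. It is not how DataFu implements dedup. The function name `dedup_latest` and the sample rows are hypothetical, and the sketch assumes that `date_updated` strings sort chronologically (for example, ISO `YYYY-MM-DD` dates).

```python
def dedup_latest(rows, key_fields, order_field):
    """Keep, for each key, the row with the largest order_field value."""
    latest = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        # Replace the stored row only if this one is more recent.
        if key not in latest or row[order_field] > latest[key][order_field]:
            latest[key] = row
    return list(latest.values())


# Hypothetical rows, not the table shown in this guide.
rows = [
    {"id": 1, "name": "quentin", "purchases": 3, "date_updated": "2016-03-01"},
    {"id": 1, "name": "quentin", "purchases": 5, "date_updated": "2016-05-09"},
    {"id": 2, "name": "julia", "purchases": 1, "date_updated": "2016-01-15"},
]

deduped = dedup_latest(rows, ["id"], "date_updated")
for row in deduped:
    print(row)
```

Here the two arguments after the rows play the roles of the `'id'` and `'date_updated'` arguments passed to the macro.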

One nice thing about this macro is that you can use more than one field to dedup the data. For example, if we wanted to use both the id and name fields, we would change this line:

dedup_data = dedup(data, 'id', 'date_updated');

to this:

dedup_data = dedup(data, '(id, name)', 'date_updated');
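The effect of a composite key can be sketched in Python by grouping on a tuple of fields. This illustrates the semantics only. The helper `latest_per_key` and the rows are hypothetical (the same id appears under two spellings of a name), and the date strings are again assumed to sort chronologically.

```python
# Hypothetical rows: id 7 appears under two different names.
rows = [
    {"id": 7, "name": "ann", "date_updated": "2016-02-01"},
    {"id": 7, "name": "ann", "date_updated": "2016-04-01"},
    {"id": 7, "name": "anne", "date_updated": "2016-03-01"},
]


def latest_per_key(rows, key_fields):
    """Return {key tuple: most recent row}, ordering by date_updated."""
    latest = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        if key not in latest or row["date_updated"] > latest[key]["date_updated"]:
            latest[key] = row
    return latest


by_id = latest_per_key(rows, ["id"])
by_id_and_name = latest_per_key(rows, ["id", "name"])

print(len(by_id))           # one row survives when keyed on id alone
print(len(by_id_and_name))  # two rows survive when keyed on (id, name)
```

With the wider key, records that share an id but differ in name are treated as distinct, so each keeps its own most recent row.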

Comparing expected and actual results for regression tests — the diff_macro

After making changes in an application’s logic, we are often interested in the effect they have on our output. One common use case is a refactor, where we don’t expect our output to change at all. Another is a surgical change which should only affect a very small subset of records. To make such regression tests on actual data easy, we use the diff_macro, which is based on DataFu’s TupleDiff UDF.

Let’s look at a table which is exactly like dedup_out, but with four changes.

  1. We will remove record 1, quentin
  2. We will change date_updated for record 2, julia
  3. We will change purchases and date_updated for record 4, alice
  4. We will add a new row, record 8, amanda

We’ll run the following Pig script, using DataFu’s diff_macro:

REGISTER datafu-pig-1.6.1.jar;

IMPORT 'datafu/diff_macros.pig';

data = LOAD 'dedup_out.csv' USING PigStorage(',') AS (id: int, name: chararray, purchases: int, date_updated: chararray);

changed = LOAD 'dedup_out_changed.csv' USING PigStorage(',') AS (id: int, name: chararray, purchases: int, date_updated: chararray);

diffs = diff_macro(data,changed,id,'');

DUMP diffs;

The results look like this:

Let’s take a moment to look at these results. All rows have the same general structure. Rows that start with missing indicate records that were in the old relation (data), but aren’t in the new one (changed). Conversely, rows that start with added indicate records that are in the new relation, but not in the old one. In both cases the label is followed by the relevant tuple from the relation that contains the record.

The rows that start with changed are more interesting. The word changed is followed by a list of the fields which have changed values in the new table. For the row with id 2, this is the date_updated field. For the row with id 4, this is the purchases and date_updated fields.
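The three kinds of rows described above can be sketched in Python as follows. This is an illustration of the comparison logic under stated assumptions, not the macro's implementation, and it does not reproduce the exact tuple layout that DUMP prints. The function `diff_rows` and all field values are hypothetical; only the pattern of changes (one removal, two modifications, one addition) follows the example in the text.

```python
def diff_rows(old, new, key, fields):
    """Compare two lists of dict rows that share a primary key.

    Yields ("missing", row), ("added", row) or
    ("changed", [field names], old_row, new_row).
    """
    old_by_key = {r[key]: r for r in old}
    new_by_key = {r[key]: r for r in new}
    out = []
    for k in sorted(old_by_key.keys() | new_by_key.keys()):
        if k not in new_by_key:
            out.append(("missing", old_by_key[k]))
        elif k not in old_by_key:
            out.append(("added", new_by_key[k]))
        else:
            changed = [f for f in fields if old_by_key[k][f] != new_by_key[k][f]]
            if changed:  # identical records produce no diff row
                out.append(("changed", changed, old_by_key[k], new_by_key[k]))
    return out


# Hypothetical values; record 3 is left unchanged on purpose.
old = [
    {"id": 1, "name": "quentin", "purchases": 12, "date_updated": "2016-03-10"},
    {"id": 2, "name": "julia", "purchases": 7, "date_updated": "2016-03-12"},
    {"id": 3, "name": "bob", "purchases": 2, "date_updated": "2016-03-13"},
    {"id": 4, "name": "alice", "purchases": 9, "date_updated": "2016-03-14"},
]
new = [
    {"id": 2, "name": "julia", "purchases": 7, "date_updated": "2016-04-01"},
    {"id": 3, "name": "bob", "purchases": 2, "date_updated": "2016-03-13"},
    {"id": 4, "name": "alice", "purchases": 10, "date_updated": "2016-04-02"},
    {"id": 8, "name": "amanda", "purchases": 1, "date_updated": "2016-04-03"},
]

diffs = diff_rows(old, new, "id", ["name", "purchases", "date_updated"])
for d in diffs:
    print(d)
```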

One field we might want to ignore is date_updated. If the only difference between two versions of a record is when it was last updated, we might want to skip that record for a more concise diff. To do so, we change the following line in our original Pig script from this:

diffs = diff_macro(data,changed,id,'');

to become this:

diffs = diff_macro(data,changed,id,'date_updated');

If we run our changed Pig script, we’ll get the following result.

The row for julia no longer appears in our diff, because only date_updated has changed, but the row for alice still appears, because its purchases field has also changed.
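The filtering effect of an ignored field can be sketched in Python. The helper `changed_fields` and the before/after values are hypothetical, and this is only a model of the behaviour described here, not DataFu's code.

```python
def changed_fields(old_row, new_row, ignored=()):
    """List fields whose values differ, skipping any field named in `ignored`."""
    return [f for f in old_row if f not in ignored and old_row[f] != new_row[f]]


# Hypothetical before/after values for two records.
julia_old = {"id": 2, "name": "julia", "purchases": 7, "date_updated": "2016-03-12"}
julia_new = {"id": 2, "name": "julia", "purchases": 7, "date_updated": "2016-04-01"}
alice_old = {"id": 4, "name": "alice", "purchases": 9, "date_updated": "2016-03-14"}
alice_new = {"id": 4, "name": "alice", "purchases": 10, "date_updated": "2016-04-02"}

print(changed_fields(julia_old, julia_new))
print(changed_fields(julia_old, julia_new, ignored={"date_updated"}))
print(changed_fields(alice_old, alice_new, ignored={"date_updated"}))
```

A record whose list of changed fields is empty once the ignored field is excluded has nothing left to report, which is why it drops out of the diff.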

One implementation detail is important to know: the macro uses a replicated join so that it can run quickly on very large tables, which means the sample table must fit in memory.
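The memory requirement follows from how a replicated (map-side) join works in general: the smaller input is loaded whole into an in-memory lookup table, and the larger input is streamed past it. The Python sketch below shows that general idea as an inner join. It is an assumption-laden illustration, not the macro's actual join, and the function name and rows are hypothetical.

```python
def replicated_join(large_rows, small_rows, key):
    """Hash join: index the small side in memory, stream the large side."""
    small_by_key = {}
    for row in small_rows:  # the whole small side is held in memory
        small_by_key.setdefault(row[key], []).append(row)
    for row in large_rows:  # the large side is read one row at a time
        for match in small_by_key.get(row[key], []):
            yield row, match


# Hypothetical inputs: `large` could be any iterator, `small` must fit in RAM.
large = iter([
    {"id": 1, "purchases": 12},
    {"id": 2, "purchases": 7},
    {"id": 3, "purchases": 2},
])
small = [
    {"id": 2, "name": "julia"},
    {"id": 3, "name": "bob"},
]

pairs = list(replicated_join(large, small, "id"))
for left, right in pairs:
    print(left, right)
```

Because no shuffle or sort of the large side is needed, this style of join is fast, but it fails if the in-memory side grows beyond the available memory.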