Using ternary operators is fairly common in Pig. For example, often you want to replace null values with zero:
data = FOREACH data GENERATE (val IS NOT NULL ? val : 0) as result;
Or, sometimes you want to return the first non-null value among several fields:
data = FOREACH data GENERATE (val1 IS NOT NULL ? val1 :
                             (val2 IS NOT NULL ? val2 :
                             (val3 IS NOT NULL ? val3 :
                             NULL))) as result;
The above code is hard to follow and cumbersome to write. To solve this problem, Apache DataFu provides the Coalesce UDF, which is very similar to the COALESCE function available in some SQL implementations. It simply returns the first non-null value among the arguments passed in. With Coalesce we can clean up the code above.
To replace any null value with 0:
DEFINE Coalesce datafu.pig.util.Coalesce();
data = FOREACH data GENERATE Coalesce(val,0) as result;
To return the first non-null value:
data = FOREACH data GENERATE Coalesce(val1,val2,val3) as result;
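To make the "first non-null value" rule concrete, here is a minimal Python sketch of the behavior described above. It is only a model of the semantics, not DataFu's implementation: the `coalesce` function and the sample rows are hypothetical, and Python's `None` stands in for Pig's null.

```python
from typing import Optional, TypeVar

T = TypeVar("T")


def coalesce(*values: Optional[T]) -> Optional[T]:
    """Return the first argument that is not None, or None if all are None."""
    for value in values:
        if value is not None:
            return value
    return None


# Hypothetical sample rows; None plays the role of a Pig null.
rows = [
    {"val1": None, "val2": 7, "val3": 9},
    {"val1": 0, "val2": 5, "val3": None},
    {"val1": None, "val2": None, "val3": None},
]

# Mirrors Coalesce(val1,val2,val3): first non-null value per row.
first_non_null = [coalesce(r["val1"], r["val2"], r["val3"]) for r in rows]

# Mirrors Coalesce(val,0): replace a null with zero.
null_to_zero = [coalesce(r["val1"], 0) for r in rows]
```

Note that the second row keeps its `0` for `val1`: the check is for null specifically, not for "falsy" values, which is the same distinction the `IS NOT NULL` ternary makes.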
Suppose we have three data sets:
input1 = LOAD 'input1' using PigStorage(',') AS (key:INT,val:INT);
input2 = LOAD 'input2' using PigStorage(',') AS (key:INT,val:INT);
input3 = LOAD 'input3' using PigStorage(',') AS (key:INT,val:INT);
Let's say we want to left join input1 with input2 and input3. You can do this in SQL. Unfortunately, Pig does not support outer joins on more than two relations:
-- DOES NOT WORK
joined = JOIN input1 BY key LEFT,
input2 BY key, input3 BY key;
Instead, you have to join twice, which means two MapReduce jobs:
data1 = JOIN input1 BY key LEFT, input2 BY key;
data2 = JOIN data1 BY input1::key LEFT, input3 BY key;
This is unfortunate, as left joins are very common, and for some applications it is common to need to left join multiple relations. Take a recommendation system for example: you start with a set of candidates to score and you join in multiple sets of features. Each set of features requires another join.
You can, however, effectively perform a left join by using COGROUP with multiple relations and a clever application of FLATTEN. Then only a single MapReduce job is required. But this gets pretty messy:
data1 = COGROUP input1 BY key, input2 BY key, input3 BY key;
data2 = FOREACH data1 GENERATE
  FLATTEN(input1), -- left join on this
  FLATTEN((IsEmpty(input2) ? TOBAG(TOTUPLE((int)null,(int)null)) : input2))
    AS (input2::key,input2::val),
  FLATTEN((IsEmpty(input3) ? TOBAG(TOTUPLE((int)null,(int)null)) : input3))
    AS (input3::key,input3::val);
As messy as this looks, it does work. It relies on the fact that flattening an empty bag produces no output. Therefore any key that has no records in input1 is dropped, which is what we want from a left join on input1. Since we don't want the lack of records in input2 or input3 to cause records to be removed, we replace the empty bag with a bag having just a single tuple with null values. Using these tricks we are able to simulate a left join on multiple relations.
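The trick is easier to see in a small model. The following Python sketch imitates the COGROUP and FLATTEN behavior described above on made-up data. The helper names (`cogroup`, `flatten`, `empty_bag_to_null_fields`) and the explicit `width` argument are assumptions for illustration only; Pig works from the relation schemas instead, and bags here are plain lists of tuples.

```python
from collections import defaultdict
from itertools import product


def cogroup(key_index, *relations):
    """Group relations by key: key -> one bag (list of tuples) per relation."""
    groups = defaultdict(lambda: [[] for _ in relations])
    for i, relation in enumerate(relations):
        for row in relation:
            groups[row[key_index]][i].append(row)
    return groups


def empty_bag_to_null_fields(bag, width):
    """Replace an empty bag with a bag holding a single all-null tuple."""
    return bag if bag else [(None,) * width]


def flatten(*bags):
    """Cross product of the bags; a single empty bag yields no rows at all."""
    return [sum(combo, ()) for combo in product(*bags)]


# Hypothetical (key, val) data for the three relations.
input1 = [(1, 10), (2, 20)]
input2 = [(1, 100)]
input3 = [(2, 200), (3, 300)]

groups = cogroup(0, input1, input2, input3)

naive, left_join = [], []
for key in sorted(groups):
    bag1, bag2, bag3 = groups[key]
    # Flattening the raw bags drops every key missing from any relation.
    naive.extend(flatten(bag1, bag2, bag3))
    # Padding only input2 and input3 keeps every key present in input1.
    left_join.extend(flatten(bag1,
                             empty_bag_to_null_fields(bag2, 2),
                             empty_bag_to_null_fields(bag3, 2)))
```

With this data the unpadded version produces no rows, because no key appears in all three relations. The padded version keeps keys 1 and 2 from `input1`, with nulls for the missing side, and still drops key 3, which appears only in `input3`.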
To clean up this code, DataFu provides the EmptyBagToNullFields UDF. It performs the same logic as above and makes the code much easier to write and understand:
DEFINE EmptyBagToNullFields datafu.pig.bags.EmptyBagToNullFields();
data = FOREACH (COGROUP input1 BY key, input2 BY key, input3 BY key) GENERATE
  FLATTEN(input1), -- left join on this
  FLATTEN(EmptyBagToNullFields(input2)),
  FLATTEN(EmptyBagToNullFields(input3));
While you're at it, why not create a macro:
DEFINE left_outer_join(relation1, key1, relation2, key2, relation3, key3) returns joined {
  cogrouped = COGROUP $relation1 BY $key1, $relation2 BY $key2, $relation3 BY $key3;
  $joined = FOREACH cogrouped GENERATE
    FLATTEN($relation1),
    FLATTEN(EmptyBagToNullFields($relation2)),
    FLATTEN(EmptyBagToNullFields($relation3));
}
Now you can simply apply your left join macro:
features = left_outer_join(input1, key, input2, key, input3, key);