Pig has a built-in SAMPLE
operator that performs Bernoulli sampling
on a relation. Apache DataFu Pig provides additional sampling techniques for when Bernoulli sampling is not applicable.
Simple Random Sampling produces samples of a specific size,
where each item has the same probability of being chosen. DataFu has scalable implementations of this that will
generate samples of exactly the right size with very high probability (at least 99.99%).
Pig's SAMPLE
, on the other hand, produces a sample of size roughly p*n
,
where p
is the sampling probability and n
is the sample size. With SAMPLE
there are no guarantees on the size of the
generated sample.
SimpleRandomSample
implements scalable simple random sampling.
It can be used to generate a sample of size of exactly ceil(p*n)
without replacement from a population of size n
,
where p
is the sampling
probability. The output will be exactly this size with probability at least 99.99%. Sampling "without replacement"
means that no item will appear more than once.
To use it simply pass in the sampling probability into the UDF's constructor and then pass in a bag to be sampled. For example, the following will produce a 1% sample:
DEFINE SRS datafu.pig.sampling.SimpleRandomSample('0.01');
input = LOAD 'input' AS (x:double);
sampled = FOREACH (GROUP input ALL) GENERATE FLATTEN(SRS(input));
This UDF can also be used to perform stratified sampling. For example, the following takes a 1% stratified sample using a label and a proportional allocation strategy:
DEFINE SRS datafu.pig.sampling.SimpleRandomSample('0.01');
examples = LOAD 'input' AS (x:double,label:chararray);
grouped = GROUP examples BY label;
sampled = FOREACH grouped GENERATE FLATTEN(SRS(examples));
SimpleRandomSampleWithReplacementVote and SimpleRandomSampleWithReplacementElect together implements a scalable algorithm for simple random sampling with replacement. These can be used to generate a sample of a specific size, with probability at least 99.99%.
To use these UDFs, the user needs to provide the desired sample size and a good lower bound on the population size (or the exact size). For example, to generate a sample of 100,000 without replacement:
DEFINE SRSWR_VOTE
datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
DEFINE SRSWR_ELECT
datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();
item = LOAD 'input' AS (x:double);
summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
candidates = FOREACH item GENERATE
FLATTEN(SRSWR_VOTE(TOBAG(x), 100000, summary.count));
sampled = FOREACH (GROUP candidates BY position PARALLEL 10) GENERATE
FLATTEN(SRSWR_ELECT(candidates));
Here we pass in the exact size for the lower bound. Because of the way the algorithm works, we can use many reducers
to generate the final set of sampled data. This is why we use PARALLEL 10
. The parallel factor can be increased
if necessary to distribute the work more.
Sampling with replacement is used heavily in bootstrapping. For example, the following script generates 100 bootstrap samples, computes the mean value for each sample, and then outputs the bootstrap estimates.
summary = FOREACH (GROUP item ALL) GENERATE
AVG(item.x) AS mean, COUNT(item) AS count;
candidates = FOREACH item GENERATE
FLATTEN(SRSWR_VOTE(TOBAG(x), summary.count*100, summary.count));
sampled = FOREACH (GROUP candidates BY (position % 100)
PARALLEL 10) GENERATE
AVG(SRSWR_ELECT(candidates)) AS mean;
bootstrap = FOREACH (GROUP sampled ALL) GENERATE
summary.mean AS mean, sampled.mean AS bootstrapMeans;
A weighted sample is similar to a simple random sample without replacement in that it generates a sample with a specific size. The difference is that the probability of selecting each item can be different. WeightedSample provides an implementation of this.
WeightedSample
operates on a bag of items, where each item has a weight attached to it. It iteratively selects tems from the
bag until it reaches the desired output bag size. Since this samples without replacement, once an item is selected it cannot
appear again. The probability of selecting an item is given by that item's weight divided by the sum of all remaining
items' weights.
For example, suppose
that we have a bag of four items: a
, b
, c
, d
. For this bag, a
has a weight of 100 and the remaining have a weight
of 1.
input = LOAD 'input' AS (A: bag{T: tuple(name:chararray,score:int)});
/* Contains a single bag:
{(a,100),(b,1),(c,1),(d,1)}
*/
We expect a weighted sample of this bag to contain a
with very high probability.
Let's generate a sample of size 3 from this bag. To do this we pass in the bag, with 1 to indicate the weight
is at index 1, and the sample size of 3.
define WeightedSample datafu.pig.sampling.WeightedSample()
result = FOREACH input GENERATE WeightedSample(A,1,3);
This is likely to generate output like this, where a
tends to be present due to its high weight.
DUMP result;
/*
({(a,100),(c,5),(b,1)})
*/
Alternatively, if we don't pass in sample size, WeightedSample
will include all items, with the order being influenced
by the item weights.
result = FOREACH input GENERATE WeightedSample(A,1);
DUMP result;
/*
({(a,100),(c,5),(b,1),(d,1)})
*/
One simple technique for generating weights that can be used with WeightedSample
is to use DataFu's
Enumerate UDF, which can be used
to append each item's tuple with its index within the bag.
Again, suppose we have a bag with values a
, b
, c
, d
, but this time without weights.
input = LOAD 'input' AS (A: bag{T: tuple(name:chararray)});
/* Contains a single bag:
{(a),(b),(c),(d)}
*/
Using Enumerate
, we can append the index for each item and then compute a score from it.
define Enumerate datafu.pig.bags.Enumerate();
data = FOREACH data GENERATE Enumerate(A) as A;
data = FOREACH data {
A = FOREACH A GENERATE v1, 1/(double)(i+1) as score;
GENERATE A;
}
/* Produces:
({(a,1.0),(b,0.5),(c,0.3333333333333333),(d,0.25)})
*/
This bag can then be passed into WeightedSample
. This produces a simple random sample where the items in the
beginning of the bag are more likely to be selected.
A common use case for sampling is selecting a set of training examples for building a prediction model. For example, suppose that we have a recommendation system where we have tracked when items have been impressed to users and when they have clicked on them:
impressions =
LOAD '$impressions' AS (user_id:int, item_id:int, timestamp:long);
clicks =
LOAD '$accepts' AS (user_id:int, item_id:int, timestamp:long);
Using this data we would like to build a model that can predict user behavior so that we can show items
to users that they are more likely to click on. Since the data may be very large, we need to take a
sample that is easier to work with. We basically want to join on (user_id,item_id)
, sample the result
and product training data with the following format:
{(user_id:int, item_id:int, is_impressed:int, is_clicked:int}
The problem with this approach though is that the join can be very expensive if the data size is large.
Sampling reduces the data size, but it has to be applied after the join because the same (user_id,item_id)
pairs won't be selected from impressions
and clicks
.
SampleByKey solves this problem by allowing us to sample consistently across multiple relations. By this we mean that if a "key" appears in the sample output for one relation, then the same key will appear in every other relation as well. This guarantees that we can apply the join after sampling.
This is essentially Bernoulli sampling. But the "random number" in this case is derived by applying a cryptographic hash to the key, rather than by invoking a pseudo-random number generator.
Let's see how we can apply this to our example above. We want to take a 10% sample of the joined clicks and impressions. We start by defining the UDF:
DEFINE SampleByKey datafu.pig.sampling.SampleByKey('0.1');
Since we are going to be joining on (user_id,item_id)
, we need sample using this pair:
impressions = FILTER impressions BY SampleByKey(user_id,item_id);
clicks = FILTER clicks BY SampleByKey(user_id,item_id);
We can now join the impressions and clicks, with the knowledge that the same (user_id,item_id)
pairs will appear in both samples.
joined_sample = FOREACH (COGROUP impressions BY (user_id,item_id),
clicks BY (user_id,item_id)) GENERATE
group.user_id as user_id,
group.item_id as item_id,
((SIZE(impressions) > 0 ? 1 : 0)) as is_impressed,
((SIZE(clicks) > 0 ? 1 : 0)) as is_clicked;
Since we have sampled before joining the data, this should be much more efficient.
All the previous methods are based on random selection. If you wish to create a sample of a given table based on a (manually created) table of ids, you can use the sample_by_keys macro.
For example, lets assume we have a list of customers are stored on the HDFS as customers.csv, and our list for the sample are in sample.csv, which only contains customers 2, 4 and 6 from the original customers.csv.
We can use the following Pig script:
REGISTER datafu-pig-1.6.1.jar;
IMPORT 'datafu/sample_by_keys.pig';
data = LOAD 'customers.csv' USING PigStorage(',') AS (id: int, name: chararray, purchases: int, updated: chararray);
customers = LOAD 'sample.csv' AS (cust_id: int);
sampled = sample_by_keys(data, customers, id, cust_id);
STORE sampled INTO 'sample_out';
The result will be all the records from our original table for customers 2, 4 and 6.