datafu.pig.sampling
Class SimpleRandomSampleWithReplacementVote

java.lang.Object
  extended by org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
      extended by datafu.pig.sampling.SimpleRandomSampleWithReplacementVote

public class SimpleRandomSampleWithReplacementVote
extends org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>

Scalable simple random sampling with replacement (ScaSRSWR).

This UDF and SimpleRandomSampleWithReplacementElect together implement a scalable algorithm for simple random sampling with replacement (SRSWR). The algorithm is randomized and has a failure rate of less than 1.0E-4.

Let s be the desired sample size. To compute an SRSWR sample of size s, we want to select, for each output position in {0, 1, ..., s-1}, an item from the population uniformly at random. The algorithm consists of two stages: vote and election. In the vote stage, this UDF (SimpleRandomSampleWithReplacementVote) proposes items, called candidates, for each position. In the election stage, the paired UDF SimpleRandomSampleWithReplacementElect elects one candidate for each position. The algorithm succeeds if there is at least one candidate for every position.

To use this UDF pair, the user needs to provide: 1) the desired sample size, and 2) the exact population size or a good lower bound of it. The input to the vote UDF SimpleRandomSampleWithReplacementVote is a tuple that consists of a bag of items, the desired sample size (int), and the population size (long) or a good lower bound of it, where the latter two must be scalars. The output from the vote UDF is a bag of tuples, each consisting of position:int, score:double, and candidate. The input to the elect UDF SimpleRandomSampleWithReplacementElect is a tuple that contains all candidates proposed by the vote UDF for some positions. The output from the elect UDF is a bag of sampled items.

For example, the following script generates a sample of size 100000 with replacement:

 DEFINE SRSWR_VOTE  datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
 DEFINE SRSWR_ELECT datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();
 
 item       = LOAD 'input' AS (x:double); 
 summary    = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
 candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), 100000, summary.count));
 sampled    = FOREACH (GROUP candidates BY position PARALLEL 10) GENERATE FLATTEN(SRSWR_ELECT(candidates));
 
Because the election only needs to group candidates proposed for the same position, this algorithm can use many reducers to consume the candidates (see the "PARALLEL 10" clause above). If the item to sample is the entire row, use TOBAG(TOTUPLE(*)).

SRSWR is heavily used in bootstrapping. Bootstrapping can be done easily with this UDF pair. For example, the following script generates 100 bootstrap samples, computes the mean value for each sample, and then outputs the bootstrap estimates.

 summary    = FOREACH (GROUP item ALL) GENERATE AVG(item.x) AS mean, COUNT(item) AS count;
 candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), summary.count*100, summary.count));
 sampled    = FOREACH (GROUP candidates BY (position % 100) PARALLEL 10) GENERATE AVG(SRSWR_ELECT(candidates)) AS mean;
 bootstrap  = FOREACH (GROUP sampled ALL) GENERATE summary.mean AS mean, sampled.mean AS bootstrapMeans;
 
Another use of this UDF pair is to generate random pairs or tuples without computing the cross product, where each pair or tuple consists of items from different input sources. Let s be the number of random tuples we want to generate. For each input source, use the vote UDF to propose candidates. Then join the candidates from the different sources by position and, for each position, use the elect UDF to select one candidate from each source to form the pair or tuple for that position.

The algorithm is a simple extension to the work

 X. Meng, Scalable Simple Random Sampling and Stratified Sampling, ICML 2013.
 
Basically, for each output position, it performs a random sort on the population (it associates each item with a random score drawn independently from the uniform distribution and then sorts the items by score) and picks the item with the smallest score. However, a probabilistic threshold is used to avoid sorting the entire population. For example, if the population size is one billion and the random score generated for an item is 0.9, that item is very unlikely to have the smallest score, so we do not need to propose it as a candidate.
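The random sort described above, without the threshold, can be sketched in a few lines of Java. This is only an in-memory reference illustration: the class NaiveSrswr and its method are hypothetical names, not part of DataFu, and the loop scores every (item, position) pair, which is exactly the cost the threshold is meant to avoid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical illustration class; not part of DataFu. */
final class NaiveSrswr {
    /**
     * For each output position, gives every item an independent uniform
     * score and keeps the item with the smallest score.
     */
    static <T> List<T> sample(List<T> population, int s, Random rng) {
        if (population.isEmpty()) {
            throw new IllegalArgumentException("population must not be empty");
        }
        List<T> result = new ArrayList<>(s);
        for (int position = 0; position < s; position++) {
            T best = null;
            double bestScore = Double.POSITIVE_INFINITY;
            for (T item : population) {
                double score = rng.nextDouble(); // uniform in [0, 1)
                if (score < bestScore) {
                    bestScore = score;
                    best = item;
                }
            }
            result.add(best);
        }
        return result;
    }
}
```

The running time is proportional to s times the population size, so this form is only practical for small inputs.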

More precisely, let n be the population size, n1 be a good lower bound of n, s be the sample size, delta be the failure rate, and q be the threshold. For each output position, the probability of all random scores being greater than q is (1-q)^n. Thus, by the union bound over the s positions, if we throw away items with associated scores greater than q, then with probability at least 1 - s*(1-q)^n we still capture the item with the smallest score for each position. Fixing delta = s*(1-q)^n and solving for q, we get q = 1 - exp(log(delta/s)/n). Note that replacing n by n1 <= n can only decrease the failure rate, though at the cost of an increased number of candidates. The expected number of candidates is (1 - exp(log(delta/s)/n1))*s*n. When n1 equals n, this number is approximately s*log(s/delta).
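To make the formulas concrete, here is a small Java sketch that evaluates the threshold q and the expected number of candidates. The class and method names are hypothetical and chosen for illustration; the arithmetic follows the expressions in the paragraph above.

```java
/** Hypothetical helper for illustration; not part of DataFu. */
final class SrswrThreshold {
    /** q = 1 - exp(log(delta/s)/n1). */
    static double threshold(int s, long n1, double delta) {
        if (s <= 0 || n1 <= 0 || delta <= 0.0 || delta >= s) {
            throw new IllegalArgumentException("need s > 0, n1 > 0, 0 < delta < s");
        }
        // -expm1(x) equals 1 - exp(x) but keeps precision when x is near 0,
        // which is the usual case for large populations.
        return -Math.expm1(Math.log(delta / s) / n1);
    }

    /** Expected number of candidates: q * s * n, with q computed from n1. */
    static double expectedCandidates(int s, long n, long n1, double delta) {
        return threshold(s, n1, delta) * s * n;
    }
}
```

For s = 100000, n = n1 = 10^9, and delta = 1.0E-4 (the bound quoted earlier), this gives q of about 2.07E-8 and roughly 2.07 million expected candidates, in line with s*log(s/delta).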

Generating a random score for each (item, position) pair is very expensive and unnecessary. For each item, the number of positions for which it gets voted follows a binomial distribution B(s,q). We can simply draw a number from this distribution, determine the positions by sampling without replacement, and then generate random scores for those positions. This reduces the running time significantly.
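The shortcut above can be sketched for a single item as follows. This is a hedged illustration, not the UDF's actual source: VoteSketch, Vote, and drawBinomial are hypothetical names, the binomial draw uses geometric gaps between successes (a library sampler would serve equally well), the positions are chosen with Floyd's algorithm for sampling without replacement, and each score is drawn uniformly from [0, q) because a voted item's score is known to fall below the threshold.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Hypothetical sketch of the vote stage for one item; not DataFu's source. */
final class VoteSketch {
    /** One (position, score, candidate) vote. */
    static final class Vote<T> {
        final int position;
        final double score;
        final T candidate;

        Vote(int position, double score, T candidate) {
            this.position = position;
            this.score = score;
            this.candidate = candidate;
        }
    }

    /** Draws from B(s, q) by skipping geometric gaps between successes. */
    static int drawBinomial(int s, double q, Random rng) {
        if (q <= 0.0) return 0;
        if (q >= 1.0) return s;
        double logOneMinusQ = Math.log1p(-q);
        int count = 0;
        long last = -1; // index of the most recent successful trial
        while (true) {
            double u = 1.0 - rng.nextDouble(); // uniform in (0, 1]
            // Number of failed trials before the next success.
            double gap = Math.floor(Math.log(u) / logOneMinusQ);
            double next = last + 1.0 + gap; // computed as double to avoid overflow
            if (next >= s) return count;
            last = (long) next;
            count++;
        }
    }

    /** Proposes one item for a random subset of the s positions. */
    static <T> List<Vote<T>> vote(T item, int s, double q, Random rng) {
        int k = drawBinomial(s, q, rng);
        // Floyd's algorithm: a uniformly random k-subset of {0, ..., s-1}.
        Set<Integer> positions = new LinkedHashSet<>();
        for (int j = s - k; j < s; j++) {
            int t = rng.nextInt(j + 1); // uniform in [0, j]
            if (!positions.add(t)) {
                positions.add(j);
            }
        }
        List<Vote<T>> votes = new ArrayList<>(k);
        for (int position : positions) {
            // Conditioned on being below the threshold, the score is uniform on [0, q).
            votes.add(new Vote<>(position, rng.nextDouble() * q, item));
        }
        return votes;
    }
}
```

The expected work per item is proportional to s*q rather than s, which is where the saving comes from when q is tiny.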

Since for each position we only need the candidate with the smallest score, we implement a combiner to reduce the size of intermediate data in the elect UDF SimpleRandomSampleWithReplacementElect.
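The reason a combiner works here is that "keep the smallest score per position" is an associative, commutative reduction, so partial results can be merged in any order. The following Java sketch illustrates that idea in memory; ElectSketch, Scored, offer, and merge are hypothetical names and do not reflect the elect UDF's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the per-position minimum; not DataFu's source. */
final class ElectSketch {
    /** A candidate together with its random score. */
    static final class Scored<T> {
        final double score;
        final T candidate;

        Scored(double score, T candidate) {
            this.score = score;
            this.candidate = candidate;
        }
    }

    /** Folds one vote into the running per-position minimum. */
    static <T> void offer(Map<Integer, Scored<T>> best, int position,
                          double score, T candidate) {
        Scored<T> current = best.get(position);
        if (current == null || score < current.score) {
            best.put(position, new Scored<>(score, candidate));
        }
    }

    /** Merges two partial results, as a combiner would; inputs are not modified. */
    static <T> Map<Integer, Scored<T>> merge(Map<Integer, Scored<T>> a,
                                             Map<Integer, Scored<T>> b) {
        Map<Integer, Scored<T>> out = new HashMap<>(a);
        for (Map.Entry<Integer, Scored<T>> e : b.entrySet()) {
            offer(out, e.getKey(), e.getValue().score, e.getValue().candidate);
        }
        return out;
    }
}
```

Each partial map holds at most one entry per position, so the data passed between stages shrinks from all candidates to one candidate per position per partition.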

Author:
ximeng
See Also:
SimpleRandomSampleWithReplacementElect, Bootstrapping (Wikipedia)

Field Summary
static java.lang.String CANDIDATE_FIELD_NAME
           
static double FAILURE_RATE
           
static java.lang.String OUTPUT_BAG_NAME_PREFIX
           
static java.lang.String POSITION_FIELD_NAME
           
static java.lang.String SCORE_FIELD_NAME
           
 
Fields inherited from class org.apache.pig.EvalFunc
log, pigLogger, reporter, returnType
 
Constructor Summary
SimpleRandomSampleWithReplacementVote()
           
 
Method Summary
 org.apache.pig.data.DataBag exec(org.apache.pig.data.Tuple tuple)
           
 org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
           
 
Methods inherited from class org.apache.pig.EvalFunc
finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

OUTPUT_BAG_NAME_PREFIX

public static final java.lang.String OUTPUT_BAG_NAME_PREFIX
See Also:
Constant Field Values

CANDIDATE_FIELD_NAME

public static final java.lang.String CANDIDATE_FIELD_NAME
See Also:
Constant Field Values

POSITION_FIELD_NAME

public static final java.lang.String POSITION_FIELD_NAME
See Also:
Constant Field Values

SCORE_FIELD_NAME

public static final java.lang.String SCORE_FIELD_NAME
See Also:
Constant Field Values

FAILURE_RATE

public static final double FAILURE_RATE
See Also:
Constant Field Values

Constructor Detail

SimpleRandomSampleWithReplacementVote

public SimpleRandomSampleWithReplacementVote()

Method Detail

exec

public org.apache.pig.data.DataBag exec(org.apache.pig.data.Tuple tuple)
                                 throws java.io.IOException
Specified by:
exec in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
Throws:
java.io.IOException

outputSchema

public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Overrides:
outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>


Matthew Hayes, Sam Shah