datafu.pig.linkanalysis
Class PageRank

java.lang.Object
  extended by org.apache.pig.EvalFunc<T>
      extended by org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
          extended by datafu.pig.linkanalysis.PageRank
All Implemented Interfaces:
org.apache.pig.Accumulator<org.apache.pig.data.DataBag>

public class PageRank
extends org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>

A UDF which implements PageRank.

This is not a distributed implementation. Each graph is held in memory while the algorithm runs, with edges optionally spilled to disk to conserve memory. The UDF can therefore be used to distribute the execution of PageRank across multiple reasonably sized graphs, but it does not distribute the execution of PageRank within an individual graph. Each graph is identified by an integer-valued topic ID.

If the graph is too large to fit in memory, then an alternative method must be used, such as an iterative approach that runs a sequence of MapReduce jobs to complete the PageRank iterations.

Each graph is represented as a bag of (source,edges) tuples. The 'source' is an integer ID representing the source node. The 'edges' are the outgoing edges from the source node, represented as a bag of (dest,weight) tuples. The 'dest' is an integer ID representing the destination node. The 'weight' is a double specifying how heavily the edge should be weighted. For standard PageRank, use a weight of 1.0 for every edge.

The output of the UDF is a bag of (source,rank) pairs, where 'rank' is the PageRank value for that source in the graph.
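
To make the input and output shapes concrete, the following is a minimal in-memory sketch of weighted PageRank in plain Java. It is not the code behind this UDF. The class name WeightedPageRankSketch, the method rank, and the fixed iteration count are assumptions made for illustration, and nodes with no outgoing edges are simply left alone here rather than handled as the UDF's dangling node option would handle them. The graph maps each source ID to its (dest, weight) edges, and the result maps each node ID to its rank.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical illustration only; not part of DataFu.
final class WeightedPageRankSketch {

    // graph: source node ID -> (destination node ID -> edge weight).
    // Returns: node ID -> rank after the given number of iterations.
    static Map<Integer, Double> rank(Map<Integer, Map<Integer, Double>> graph,
                                     double alpha, int iterations) {
        // Collect every node that appears as a source or a destination.
        TreeSet<Integer> nodes = new TreeSet<>(graph.keySet());
        for (Map<Integer, Double> edges : graph.values()) {
            nodes.addAll(edges.keySet());
        }
        int n = nodes.size();

        // Start from a uniform distribution over all nodes.
        Map<Integer, Double> ranks = new HashMap<>();
        for (Integer node : nodes) {
            ranks.put(node, 1.0 / n);
        }

        for (int i = 0; i < iterations; i++) {
            // Every node first receives the "random jump" share.
            Map<Integer, Double> next = new HashMap<>();
            for (Integer node : nodes) {
                next.put(node, (1.0 - alpha) / n);
            }
            // Each source splits alpha * rank among its edges in proportion to weight.
            for (Map.Entry<Integer, Map<Integer, Double>> entry : graph.entrySet()) {
                Map<Integer, Double> edges = entry.getValue();
                double totalWeight = 0.0;
                for (double w : edges.values()) {
                    totalWeight += w;
                }
                if (totalWeight <= 0.0) {
                    continue; // no usable outgoing edges for this source
                }
                double sourceRank = ranks.get(entry.getKey());
                for (Map.Entry<Integer, Double> edge : edges.entrySet()) {
                    double share = alpha * sourceRank * edge.getValue() / totalWeight;
                    next.merge(edge.getKey(), share, Double::sum);
                }
            }
            ranks = next;
        }
        return ranks;
    }

    public static void main(String[] args) {
        // A three-node cycle 1 -> 2 -> 3 -> 1 with unit weights.
        Map<Integer, Map<Integer, Double>> graph = new HashMap<>();
        graph.put(1, Map.of(2, 1.0));
        graph.put(2, Map.of(3, 1.0));
        graph.put(3, Map.of(1, 1.0));
        System.out.println(rank(graph, 0.85, 50));
    }
}
```

With every weight set to 1.0 each source divides its rank evenly among its destinations, which corresponds to the standard unweighted case described above.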

This UDF has several configurable options, such as the 'alpha' value and the handling of dangling nodes.

Parameters are configured by passing them to the UDF constructor as a sequence of name/value pairs. In the example below, the alpha value is set to 0.87 and dangling nodes are enabled. All arguments must be strings.

 define PageRank datafu.pig.linkanalysis.PageRank('alpha','0.87','dangling_nodes','true');
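
As a rough illustration of the name/value convention, a varargs constructor such as PageRank(java.lang.String... parameters) could walk its arguments two at a time. The sketch below is an assumption about how such parsing might look, not the UDF's actual code. The class ParameterPairsSketch and its parse method are hypothetical, and only the option names 'alpha' and 'dangling_nodes' come from the example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of DataFu.
final class ParameterPairsSketch {

    // Turns ("name1", "value1", "name2", "value2", ...) into an ordered map.
    static Map<String, String> parse(String... parameters) {
        if (parameters.length % 2 != 0) {
            throw new IllegalArgumentException(
                "Expected name/value pairs, got " + parameters.length + " arguments");
        }
        Map<String, String> options = new LinkedHashMap<>();
        for (int i = 0; i < parameters.length; i += 2) {
            options.put(parameters[i], parameters[i + 1]);
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> options = parse("alpha", "0.87", "dangling_nodes", "true");
        // Values arrive as strings and are converted by the consumer.
        double alpha = Double.parseDouble(options.get("alpha"));
        boolean danglingNodes = Boolean.parseBoolean(options.get("dangling_nodes"));
        System.out.println("alpha=" + alpha + ", dangling_nodes=" + danglingNodes);
    }
}
```

Because every value is a string, numeric and boolean options such as '0.87' and 'true' must be quoted in the Pig define statement.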
 
 

Full example:

 topic_edges = LOAD 'input_edges' as (topic:INT,source:INT,dest:INT,weight:DOUBLE);
 
 topic_edges_grouped = GROUP topic_edges by (topic, source) ;
 topic_edges_grouped = FOREACH topic_edges_grouped GENERATE
    group.topic as topic,
    group.source as source,
    topic_edges.(dest,weight) as edges;
 
 topic_edges_grouped_by_topic = GROUP topic_edges_grouped BY topic; 
 
 topic_ranks = FOREACH topic_edges_grouped_by_topic GENERATE
    group as topic,
    FLATTEN(PageRank(topic_edges_grouped.(source,edges))) as (source,rank);

 topic_ranks = FOREACH topic_ranks GENERATE
    topic, source, rank;
 
 
 


Field Summary
 
Fields inherited from class org.apache.pig.EvalFunc
log, pigLogger, reporter, returnType
 
Constructor Summary
PageRank()
           
PageRank(java.lang.String... parameters)
           
 
Method Summary
 void accumulate(org.apache.pig.data.Tuple t)
           
 void cleanup()
           
 org.apache.pig.data.DataBag getValue()
           
 org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
           
 
Methods inherited from class org.apache.pig.AccumulatorEvalFunc
exec
 
Methods inherited from class org.apache.pig.EvalFunc
finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

PageRank

public PageRank()

PageRank

public PageRank(java.lang.String... parameters)

Method Detail

accumulate

public void accumulate(org.apache.pig.data.Tuple t)
                throws java.io.IOException
Specified by:
accumulate in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by:
accumulate in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
Throws:
java.io.IOException

getValue

public org.apache.pig.data.DataBag getValue()
Specified by:
getValue in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by:
getValue in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>

cleanup

public void cleanup()
Specified by:
cleanup in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by:
cleanup in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>

outputSchema

public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Overrides:
outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>


Matthew Hayes, Sam Shah