Class PageRank

  extended by org.apache.pig.EvalFunc<T>
      extended by org.apache.pig.AccumulatorEvalFunc<>
          extended by datafu.pig.linkanalysis.PageRank
All Implemented Interfaces:

public class PageRank
extends org.apache.pig.AccumulatorEvalFunc<>

A UDF which implements PageRank.

This is not a distributed implementation. Each graph is stored in memory while running the algorithm, with edges optionally spilled to disk to conserve memory. This can be used to distribute the execution of PageRank on multiple reasonably sized graphs. It does not distribute execuion of PageRank for each individual graph. Each graph is identified by an integer valued topic ID.

If the graph is too large to fit in memory than an alternative method must be used, such as an iterative approach which runs many MapReduce jobs in a sequence to complete the PageRank iterations.

Each graph is represented through a bag of (source,edges) tuples. The 'source' is an integer ID representing the source node. The 'edges' are the outgoing edges from the source node, represented as a bag of (dest,weight) tuples. The 'dest' is an integer ID representing the destination node. The weight is a double representing how much the edge should be weighted. For a standard PageRank implementation just use weight of 1.0.

The output of the UDF is a bag of (source,rank) pairs, where 'rank' is the PageRank value for that source in the graph.

There are several configurable options for this UDF, among them:

Parameters are configured by passing them in as a sequence of pairs into the UDF constructor. For example, below the alpha value is set to 0.87 and dangling nodes are enabled. All arguments must be strings.

 define PageRank datafu.pig.linkanalysis.PageRank('alpha','0.87','dangling_nodes','true');

Full example:

 topic_edges = LOAD 'input_edges' as (topic:INT,source:INT,dest:INT,weight:DOUBLE);
 topic_edges_grouped = GROUP topic_edges by (topic, source) ;
 topic_edges_grouped = FOREACH topic_edges_grouped GENERATE
    group.topic as topic,
    group.source as source,
    topic_edges.(dest,weight) as edges;
 topic_edges_grouped_by_topic = GROUP topic_edges_grouped BY topic; 
 topic_ranks = FOREACH topic_edges_grouped_by_topic GENERATE
    group as topic,
    FLATTEN(PageRank(topic_edges_grouped.(source,edges))) as (source,rank);

 topic_ranks = FOREACH topic_ranks GENERATE
    topic, source, rank;

Field Summary
Fields inherited from class org.apache.pig.EvalFunc
log, pigLogger, reporter, returnType
Constructor Summary
PageRank(java.lang.String... parameters)
Method Summary
 void accumulate( t)
 void cleanup()
 org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Methods inherited from class org.apache.pig.AccumulatorEvalFunc
Methods inherited from class org.apache.pig.EvalFunc
finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Constructor Detail


public PageRank()


public PageRank(java.lang.String... parameters)
Method Detail


public void accumulate( t)
Specified by:
accumulate in interface org.apache.pig.Accumulator<>
Specified by:
accumulate in class org.apache.pig.AccumulatorEvalFunc<>


public getValue()
Specified by:
getValue in interface org.apache.pig.Accumulator<>
Specified by:
getValue in class org.apache.pig.AccumulatorEvalFunc<>


public void cleanup()
Specified by:
cleanup in interface org.apache.pig.Accumulator<>
Specified by:
cleanup in class org.apache.pig.AccumulatorEvalFunc<>


public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
outputSchema in class org.apache.pig.EvalFunc<>

Matthew Hayes, Sam Shah