datafu.pig.linkanalysis
Class PageRank
java.lang.Object
   org.apache.pig.EvalFunc<T>
       org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
           datafu.pig.linkanalysis.PageRank
All Implemented Interfaces:
    org.apache.pig.Accumulator<org.apache.pig.data.DataBag>

public class PageRank
extends org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
A UDF which implements PageRank.
 
 
  
 This is not a distributed implementation.  Each graph is held in memory while the algorithm runs, with edges optionally
 spilled to disk to conserve memory.  The UDF can be used to distribute the execution of PageRank across many
 reasonably sized graphs, but it does not distribute the execution of PageRank for any individual graph.  Each graph is
 identified by an integer-valued topic ID.
 
 
 
 If a graph is too large to fit in memory, then an alternative method must be used, such as an iterative approach that runs
 many MapReduce jobs in sequence to complete the PageRank iterations.
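 The iterative alternative can be illustrated with a minimal Java sketch. It is not part of datafu: each call to the
 hypothetical iteration() method stands in for one MapReduce job, with a map phase that emits each source's rank share
 per edge and a reduce phase that sums the shares received by each node. It assumes edge weights are normalized by each
 source's total outgoing weight and ignores dangling nodes for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each call to iteration() stands in for one MapReduce job.
// Not part of datafu; dangling nodes are ignored for brevity.
class IterativePageRankSketch {

    /** One outgoing edge: destination node ID and weight. */
    record Edge(int dest, double weight) {}

    static Map<Integer, Double> iteration(Map<Integer, List<Edge>> graph,
                                          Map<Integer, Double> ranks, double alpha) {
        // Map phase: every source emits (dest, share of its current rank).
        List<Map.Entry<Integer, Double>> emitted = new ArrayList<>();
        for (Map.Entry<Integer, List<Edge>> entry : graph.entrySet()) {
            double totalWeight = entry.getValue().stream().mapToDouble(Edge::weight).sum();
            double rank = ranks.get(entry.getKey());
            for (Edge e : entry.getValue()) {
                // Assumption: weights are normalized by the source's total weight.
                emitted.add(Map.entry(e.dest(), rank * e.weight() / totalWeight));
            }
        }
        // Shuffle + reduce phase: sum the contributions received by each node.
        Map<Integer, Double> received = new HashMap<>();
        for (Map.Entry<Integer, Double> kv : emitted) {
            received.merge(kv.getKey(), kv.getValue(), Double::sum);
        }
        // Apply the random-jump term; nodes that received nothing still get it.
        int n = ranks.size();
        Map<Integer, Double> next = new HashMap<>();
        for (Integer node : ranks.keySet()) {
            next.put(node, (1.0 - alpha) / n + alpha * received.getOrDefault(node, 0.0));
        }
        return next;
    }

    public static void main(String[] args) {
        Map<Integer, List<Edge>> graph = Map.of(
                1, List.of(new Edge(2, 1.0)),
                2, List.of(new Edge(3, 1.0)),
                3, List.of(new Edge(1, 1.0), new Edge(2, 1.0)));
        Map<Integer, Double> ranks = new HashMap<>();
        for (int node : List.of(1, 2, 3)) {
            ranks.put(node, 1.0 / 3);
        }
        // In a real pipeline, each job's output would be written to storage
        // and read back as the next job's input.
        for (int job = 0; job < 30; job++) {
            ranks = iteration(graph, ranks, 0.85);
        }
        System.out.println(ranks);
    }
}
```

 Between jobs, only the per-node ranks are carried forward. This is what lets the approach scale past memory, at the cost
 of one full job per iteration.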
 
 
 
 Each graph is represented as a bag of (source,edges) tuples.  The 'source' is an integer ID representing the source node.
 The 'edges' are the outgoing edges from the source node, represented as a bag of (dest,weight) tuples.  The 'dest' is an
 integer ID representing the destination node.  The 'weight' is a double indicating how heavily the edge is weighted.
 For a standard PageRank implementation, use a weight of 1.0 for every edge.
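 The following plain-Java model makes the input shape concrete for one graph. The record names (Edge, SourceEdges) are
 hypothetical stand-ins for Pig tuples and bags. Counting a node that appears only as a destination (node 30 below) as
 part of the graph is an assumption made for illustration.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java model of one PageRank input graph; Pig bags and
// tuples are replaced by Java lists and records for illustration only.
class GraphInputSketch {

    /** One (dest,weight) tuple from a source node's 'edges' bag. */
    record Edge(int dest, double weight) {}

    /** One (source,edges) tuple from the input bag. */
    record SourceEdges(int source, List<Edge> edges) {}

    /** Collects every node ID that appears as a source or as a destination. */
    static Set<Integer> nodeIds(List<SourceEdges> graph) {
        Set<Integer> ids = new LinkedHashSet<>();
        for (SourceEdges s : graph) {
            ids.add(s.source());
            for (Edge e : s.edges()) {
                ids.add(e.dest());
            }
        }
        return ids;
    }

    /** Total number of edges across all sources. */
    static int edgeCount(List<SourceEdges> graph) {
        return graph.stream().mapToInt(s -> s.edges().size()).sum();
    }

    public static void main(String[] args) {
        // Standard (unweighted) PageRank: every edge gets weight 1.0.
        // Node 30 only appears as a destination, i.e. it has no outgoing edges.
        List<SourceEdges> graph = List.of(
                new SourceEdges(10, List.of(new Edge(20, 1.0), new Edge(30, 1.0))),
                new SourceEdges(20, List.of(new Edge(30, 1.0))));
        System.out.println("nodes=" + nodeIds(graph) + ", edges=" + edgeCount(graph));
    }
}
```

 Running main prints nodes=[10, 20, 30], edges=3.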
 
 
 
 The output of the UDF is a bag of (source,rank) pairs, where 'rank' is the PageRank value for that source in the graph.
 
 
 
 There are several configurable options for this UDF, among them:
 
 
 
 - alpha: Controls the PageRank alpha value.  The default is 0.85.  A higher value reduces the "random jump"
   factor and causes the rank to be influenced more by edges.
 - max_iters: The maximum number of iterations to run.  The default is 150.
 - dangling_nodes: How to handle "dangling nodes", i.e. nodes with no outgoing edges.  When "true", this is equivalent
   to forcing each dangling node to have an outgoing edge to every other node in the graph.  The default is "false".
 - tolerance: A threshold that stops iteration early.  It is compared against the total change in rank summed over all
   nodes in the graph.  As the ranks settle on their final values the total change decreases, and iteration stops once
   it falls below this threshold.  The default is 1e-16.
 - max_nodes_and_edges: A safeguard against running out of memory.  As a graph is loaded, if the sum of its nodes and
   edges exceeds this value, loading stops.  The UDF does not fail, but PageRank is not run on this graph; instead, null
   is returned as the result.  The default is 100M.
 - spill_to_edge_disk_storage: Used to conserve memory.  When "true", edge data is written to a temporary file on disk
   instead of being held in memory once the number of edges exceeds a threshold (see max_edges_in_memory).  The nodes
   are still held in memory, however.  Each iteration of PageRank then streams through the edges stored on disk.
   The default is "false".
 - max_edges_in_memory: When spilling edges to disk is enabled, this is the edge-count threshold that triggers spilling.
   The default is 30M.
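 The following minimal Java sketch shows how alpha, max_iters, tolerance, dangling_nodes and max_nodes_and_edges can
 interact in a power-iteration loop. It is not datafu's implementation, and the class and method names are hypothetical.
 It makes several assumptions:
 - Nodes are renumbered 0..n-1.
 - Edge weights are normalized by each source's total outgoing weight.
 - With dangling_nodes enabled, a dangling node's rank is spread over all n nodes (itself included). This is a common
   simplification of the "edge to every other node" rule.
 Spilling edges to disk is not modeled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of in-memory weighted PageRank (power iteration).
// Not datafu's implementation; it only shows how the documented options can interact.
class PageRankSketch {

    /** One outgoing edge: destination node index and weight. */
    record Edge(int dest, double weight) {}

    /**
     * Nodes are assumed to be numbered 0..n-1. Returns null, instead of failing,
     * when nodes + edges exceed maxNodesAndEdges.
     */
    static double[] pageRank(int n, Map<Integer, List<Edge>> outEdges, double alpha,
                             int maxIters, double tolerance, boolean danglingNodes,
                             long maxNodesAndEdges) {
        long edgeCount = outEdges.values().stream().mapToLong(List::size).sum();
        if (n + edgeCount > maxNodesAndEdges) {
            return null;
        }
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);
        for (int iter = 0; iter < maxIters; iter++) {
            double[] incoming = new double[n];
            double danglingMass = 0.0;
            for (int src = 0; src < n; src++) {
                List<Edge> edges = outEdges.getOrDefault(src, List.of());
                double totalWeight = edges.stream().mapToDouble(Edge::weight).sum();
                if (totalWeight == 0.0) {
                    danglingMass += rank[src]; // dangling node: no outgoing weight
                    continue;
                }
                for (Edge e : edges) {
                    // Assumption: weights are normalized by the source's total weight.
                    incoming[e.dest()] += rank[src] * e.weight() / totalWeight;
                }
            }
            // Simplification: dangling rank is spread over all n nodes (itself included).
            double danglingShare = danglingNodes ? danglingMass / n : 0.0;
            double totalChange = 0.0;
            for (int v = 0; v < n; v++) {
                double updated = (1.0 - alpha) / n + alpha * (incoming[v] + danglingShare);
                totalChange += Math.abs(updated - rank[v]);
                rank[v] = updated;
            }
            if (totalChange < tolerance) {
                break; // ranks have settled; stop early
            }
        }
        return rank;
    }

    public static void main(String[] args) {
        // Node 2 has no outgoing edges, so it is a dangling node.
        Map<Integer, List<Edge>> graph = Map.of(
                0, List.of(new Edge(1, 1.0), new Edge(2, 1.0)),
                1, List.of(new Edge(2, 1.0)));
        double[] ranks = pageRank(3, graph, 0.85, 150, 1e-16, true, 100_000_000L);
        System.out.println(Arrays.toString(ranks));
    }
}
```

 With dangling_nodes disabled, this sketch simply drops the rank held by dangling nodes, so the ranks no longer sum to 1.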
 
 Parameters are configured by passing them to the UDF constructor as a sequence of name/value pairs.  For example, the
 definition below sets alpha to 0.87 and enables dangling-node handling.  All arguments must be strings, including numeric
 and boolean values.
 
 
 
 
 define PageRank datafu.pig.linkanalysis.PageRank('alpha','0.87','dangling_nodes','true');
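 The constructor takes a flat sequence of strings, so the arguments are read two at a time as name/value pairs. The
 following hypothetical Java sketch parses them using the documented option names and defaults, reading "100M" and "30M"
 as millions. Rejecting unknown names and odd-length argument lists are assumptions, not documented behavior.

```java
// Hypothetical sketch of parsing constructor arguments given as name/value string pairs.
// Defaults follow the documented values; error handling is an assumption.
class PageRankOptionsSketch {
    double alpha = 0.85;
    int maxIters = 150;
    boolean danglingNodes = false;
    double tolerance = 1e-16;
    long maxNodesAndEdges = 100_000_000L;
    boolean spillToEdgeDiskStorage = false;
    long maxEdgesInMemory = 30_000_000L;

    static PageRankOptionsSketch parse(String... parameters) {
        if (parameters.length % 2 != 0) {
            throw new IllegalArgumentException("Expected name/value pairs");
        }
        PageRankOptionsSketch options = new PageRankOptionsSketch();
        for (int i = 0; i < parameters.length; i += 2) {
            String name = parameters[i];
            String value = parameters[i + 1];
            switch (name) {
                case "alpha" -> options.alpha = Double.parseDouble(value);
                case "max_iters" -> options.maxIters = Integer.parseInt(value);
                case "dangling_nodes" -> options.danglingNodes = Boolean.parseBoolean(value);
                case "tolerance" -> options.tolerance = Double.parseDouble(value);
                case "max_nodes_and_edges" -> options.maxNodesAndEdges = Long.parseLong(value);
                case "spill_to_edge_disk_storage" ->
                        options.spillToEdgeDiskStorage = Boolean.parseBoolean(value);
                case "max_edges_in_memory" -> options.maxEdgesInMemory = Long.parseLong(value);
                default -> throw new IllegalArgumentException("Unknown parameter: " + name);
            }
        }
        return options;
    }

    public static void main(String[] args) {
        // Same arguments as the define statement above.
        PageRankOptionsSketch options = parse("alpha", "0.87", "dangling_nodes", "true");
        System.out.println("alpha=" + options.alpha + ", dangling_nodes=" + options.danglingNodes
                + ", max_iters=" + options.maxIters);
    }
}
```

 Running main prints alpha=0.87, dangling_nodes=true, max_iters=150. Options that are not passed keep their defaults.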
 
 
 
 
 
 Full example:
 
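 define PageRank datafu.pig.linkanalysis.PageRank();

 -- Input: one row per edge, tagged with the topic (graph) it belongs to.
 topic_edges = LOAD 'input_edges' as (topic:INT,source:INT,dest:INT,weight:DOUBLE);

 -- Collect each source node's outgoing (dest,weight) edges within a topic.
 topic_edges_grouped = GROUP topic_edges BY (topic, source);
 topic_edges_grouped = FOREACH topic_edges_grouped GENERATE
    group.topic as topic,
    group.source as source,
    topic_edges.(dest,weight) as edges;

 -- Gather all (source,edges) tuples of a topic into one bag, i.e. one graph.
 topic_edges_grouped_by_topic = GROUP topic_edges_grouped BY topic;

 -- Run PageRank once per topic and flatten the resulting (source,rank) bag.
 topic_ranks = FOREACH topic_edges_grouped_by_topic GENERATE
    group as topic,
    FLATTEN(PageRank(topic_edges_grouped.(source,edges))) as (source,rank);
 topic_ranks = FOREACH topic_ranks GENERATE
    topic, source, rank;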
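 The two GROUP steps in the script turn flat edge rows into one graph per topic. As an illustration only, the following
 Java code performs the same reshaping, with hypothetical record names and in-memory maps standing in for Pig relations.
 Each inner map corresponds to the bag of (source,edges) tuples that one PageRank call receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the script's two GROUP steps using plain Java collections.
class TopicGroupingSketch {

    /** One input row: (topic, source, dest, weight). */
    record Row(int topic, int source, int dest, double weight) {}

    /** One (dest,weight) tuple. */
    record Edge(int dest, double weight) {}

    /** topic -> (source -> outgoing edges); TreeMaps keep the output ordered. */
    static Map<Integer, Map<Integer, List<Edge>>> groupByTopicAndSource(List<Row> rows) {
        Map<Integer, Map<Integer, List<Edge>>> graphs = new TreeMap<>();
        for (Row r : rows) {
            graphs.computeIfAbsent(r.topic(), t -> new TreeMap<>())
                  .computeIfAbsent(r.source(), s -> new ArrayList<>())
                  .add(new Edge(r.dest(), r.weight()));
        }
        return graphs;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row(1, 10, 20, 1.0), new Row(1, 10, 30, 1.0), new Row(1, 20, 30, 1.0),
                new Row(2, 5, 6, 1.0));
        // Each topic's graph would be handed to a separate PageRank call.
        groupByTopicAndSource(rows).forEach((topic, graph) ->
                System.out.println("topic " + topic + ": " + graph));
    }
}
```

 Each topic's graph is independent, so Pig can evaluate the PageRank calls for different topics in different tasks. This
 is how the UDF distributes work across multiple graphs.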
 
 
 
 
 
| Fields inherited from class org.apache.pig.EvalFunc | 
| log, pigLogger, reporter, returnType | 
 
 
Method Summary

| Modifier and Type | Method |
|---|---|
| void | accumulate(org.apache.pig.data.Tuple t) |
| void | cleanup() |
| org.apache.pig.data.DataBag | getValue() |
| org.apache.pig.impl.logicalLayer.schema.Schema | outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input) |
 
| Methods inherited from class org.apache.pig.AccumulatorEvalFunc | 
| exec | 
 
| Methods inherited from class org.apache.pig.EvalFunc | 
| finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn | 
 
| Methods inherited from class java.lang.Object | 
| clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait | 
 
PageRank
public PageRank()
PageRank
public PageRank(java.lang.String... parameters)
accumulate
public void accumulate(org.apache.pig.data.Tuple t)
                throws java.io.IOException
- Specified by: accumulate in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
- Specified by: accumulate in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
- Throws: java.io.IOException
 
getValue
public org.apache.pig.data.DataBag getValue()
- Specified by: getValue in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
- Specified by: getValue in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
 
cleanup
public void cleanup()
- Specified by: cleanup in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
- Specified by: cleanup in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
 
outputSchema
public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
- Overrides: outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
 
Matthew Hayes, Sam Shah