datafu.pig.linkanalysis
Class PageRank
java.lang.Object
org.apache.pig.EvalFunc<T>
org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
datafu.pig.linkanalysis.PageRank
- All Implemented Interfaces:
- org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
public class PageRank
- extends org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
A UDF which implements PageRank.
This is not a distributed implementation. Each graph is stored in memory while running the algorithm, with edges optionally
spilled to disk to conserve memory. This can be used to distribute the execution of PageRank on multiple
reasonably sized graphs. It does not distribute execution of PageRank within each individual graph. Each graph is identified
by an integer valued topic ID.
If the graph is too large to fit in memory, then an alternative method must be used, such as an iterative approach which runs
many MapReduce jobs in sequence to complete the PageRank iterations.
Each graph is represented through a bag of (source,edges) tuples. The 'source' is an integer ID representing the source node.
The 'edges' are the outgoing edges from the source node, represented as a bag of (dest,weight) tuples. The 'dest' is an
integer ID representing the destination node. The weight is a double representing how much the edge should be weighted.
For a standard PageRank implementation, just use a weight of 1.0.
The output of the UDF is a bag of (source,rank) pairs, where 'rank' is the PageRank value for that source in the graph.
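For illustration, a small graph in which node 1 links to nodes 2 and 3, and node 2 links to node 3, could be passed to the UDF as the
following bag of (source,edges) tuples (hypothetical data, written in Pig's bag/tuple notation):
{(1,{(2,1.0),(3,1.0)}),
 (2,{(3,1.0)})}
Node 3 has no outgoing edges and is therefore a dangling node; how such nodes are treated is controlled by the dangling_nodes
option described below. The output for this graph would be a bag of (source,rank) tuples, one per node.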
There are several configurable options for this UDF, among them:
- alpha: Controls the PageRank alpha value. The default is 0.85. A higher value reduces the "random jump"
factor and causes the rank to be influenced more by edges.
- max_iters: The maximum number of iterations to run. The default is 150.
- dangling_nodes: How to handle "dangling nodes", i.e. nodes with no outgoing edges. When "true" this is equivalent
to forcing a dangling node to have an outgoing edge to every other node in the graph. The default is "false".
- tolerance: A threshold below which iterations cease. It is measured from the total change in ranks across all
nodes in the graph. As the ranks settle on their final values the total change decreases, so this can be used
to stop iterations early. The default is 1e-16.
- max_nodes_and_edges: A guard against running out of memory. As a graph is loaded, if the sum of its nodes and
edges exceeds this value then loading stops. The UDF does not fail, but PageRank is not run on that graph; a null
value is returned as the result instead. The default is 100M.
- spill_to_edge_disk_storage: Used to conserve memory. When "true", edge data is written to a temporary file on disk instead
of being held in memory once the number of edges exceeds a threshold. The nodes are still held in memory, however.
Each iteration of PageRank then streams through the edges stored on disk. The default is "false".
- max_edges_in_memory: When spilling edges to disk is enabled, this is the threshold that triggers that behavior. The default is 30M.
Parameters are configured by passing them to the UDF constructor as a sequence of name/value pairs. For example, below the alpha value is set to
0.87 and dangling nodes are enabled. All arguments must be strings.
define PageRank datafu.pig.linkanalysis.PageRank('alpha','0.87','dangling_nodes','true');
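The memory-related controls can be combined in the same way. As a sketch, the following enables spilling edges to disk and lowers the
in-memory edge threshold to one million; the threshold value here is purely illustrative.
define PageRank datafu.pig.linkanalysis.PageRank('spill_to_edge_disk_storage','true','max_edges_in_memory','1000000');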
Full example:
topic_edges = LOAD 'input_edges' as (topic:INT,source:INT,dest:INT,weight:DOUBLE);

topic_edges_grouped = GROUP topic_edges by (topic, source);

topic_edges_grouped = FOREACH topic_edges_grouped GENERATE
    group.topic as topic,
    group.source as source,
    topic_edges.(dest,weight) as edges;

topic_edges_grouped_by_topic = GROUP topic_edges_grouped BY topic;

topic_ranks = FOREACH topic_edges_grouped_by_topic GENERATE
    group as topic,
    FLATTEN(PageRank(topic_edges_grouped.(source,edges))) as (source,rank);

topic_ranks = FOREACH topic_ranks GENERATE topic, source, rank;
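The resulting topic_ranks relation can then be stored like any other Pig relation; for example (the output path is hypothetical):
STORE topic_ranks INTO 'output_ranks';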
Fields inherited from class org.apache.pig.EvalFunc:
log, pigLogger, reporter, returnType

Method Summary:
void accumulate(org.apache.pig.data.Tuple t)
void cleanup()
org.apache.pig.data.DataBag getValue()
org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)

Methods inherited from class org.apache.pig.AccumulatorEvalFunc:
exec

Methods inherited from class org.apache.pig.EvalFunc:
finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn

Methods inherited from class java.lang.Object:
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
PageRank
public PageRank()
PageRank
public PageRank(java.lang.String... parameters)
accumulate
public void accumulate(org.apache.pig.data.Tuple t)
throws java.io.IOException
- Specified by:
accumulate
in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
- Specified by:
accumulate
in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
- Throws:
java.io.IOException
getValue
public org.apache.pig.data.DataBag getValue()
- Specified by:
getValue
in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
- Specified by:
getValue
in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
cleanup
public void cleanup()
- Specified by:
cleanup
in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
- Specified by:
cleanup
in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
outputSchema
public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
- Overrides:
outputSchema
in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
Authors: Matthew Hayes, Sam Shah