datafu.pig.sessions
Class Sessionize

java.lang.Object
  extended by org.apache.pig.EvalFunc<T>
      extended by org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
          extended by datafu.pig.sessions.Sessionize
All Implemented Interfaces:
org.apache.pig.Accumulator<org.apache.pig.data.DataBag>

@Nondeterministic
public class Sessionize
extends org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>

Sessionizes an input stream, appending a session ID to each tuple.

This UDF takes one constructor argument, the session timeout: an idle period of this length indicates that a new session has started. It assumes that the first field of each tuple in the input bag is an ISO 8601 timestamp, and the bag must be sorted by that timestamp. It returns the input bag with a new field, session_id, appended to each tuple; session_id is a GUID identifying the session to which the request belongs.
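The rule described above can be illustrated outside Pig. The following Java sketch is hypothetical and is not the DataFu implementation. It takes timestamps that are already sorted. It starts a new session, with a fresh random UUID, on the first event and whenever the gap since the previous event exceeds the timeout. Otherwise it reuses the current session ID. It assumes the timestamps carry an explicit offset, such as 2010-01-01T10:00:00Z. It also treats only a gap strictly longer than the timeout as a session break. This page does not say whether the UDF itself uses a strict or non-strict comparison.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of the sessionization rule; not the DataFu source.
class SessionizeSketch {

    /**
     * Returns one session ID per input timestamp.
     * Assumes the timestamps are sorted ascending and carry an offset,
     * e.g. "2010-01-01T10:00:00Z".
     */
    static List<String> sessionize(List<String> sortedIsoTimestamps, Duration timeout) {
        List<String> sessionIds = new ArrayList<>();
        Instant previous = null;
        String currentSession = null;
        for (String ts : sortedIsoTimestamps) {
            Instant now = OffsetDateTime.parse(ts).toInstant();
            // New session on the first event, or when the idle gap exceeds the timeout.
            if (previous == null || Duration.between(previous, now).compareTo(timeout) > 0) {
                currentSession = UUID.randomUUID().toString();
            }
            sessionIds.add(currentSession);
            previous = now;
        }
        return sessionIds;
    }
}
```

With a 30-minute timeout, events at 10:00, 10:20 and 11:00 produce two sessions. The first two events share an ID, and the third gets a new one.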

Example:

 %declare TIME_WINDOW  30m
 
 define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');

 views = LOAD 'views.tsv' AS (visit_date:chararray, member_id:int, url:chararray);

 -- sessionize the visit stream
 grouped = GROUP views BY member_id;
 sessions = FOREACH grouped {
   visits = ORDER views BY visit_date;
   GENERATE FLATTEN(Sessionize(visits)) AS (visit_date, member_id, url, session_id);
 }

 -- count the number of distinct sessions hitting each url
 rollup = GROUP sessions BY url;
 result = FOREACH rollup {
   session_ids = sessions.session_id;
   uniq_sessions = DISTINCT session_ids;
   GENERATE group AS url, COUNT(uniq_sessions) AS session_cnt;
 }
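Counting sessions per URL means counting distinct session_id values, not rows. A single session may request the same URL several times. The plain-Java sketch below is a hypothetical in-memory analogue of the rollup step and does not use Pig. The Visit class stands in for one row of the sessions relation; its name and fields are illustrative only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory analogue of the "sessions per url" rollup.
class SessionRollupSketch {

    // Hypothetical stand-in for one row of the sessions relation.
    static final class Visit {
        final String url;
        final String sessionId;

        Visit(String url, String sessionId) {
            this.url = url;
            this.sessionId = sessionId;
        }
    }

    /** Counts distinct session IDs per URL (not the number of visit rows). */
    static Map<String, Integer> sessionsPerUrl(List<Visit> visits) {
        Map<String, Set<String>> idsByUrl = new HashMap<>();
        for (Visit v : visits) {
            // The set drops repeat visits from the same session.
            idsByUrl.computeIfAbsent(v.url, k -> new HashSet<>()).add(v.sessionId);
        }
        Map<String, Integer> counts = new HashMap<>();
        idsByUrl.forEach((url, ids) -> counts.put(url, ids.size()));
        return counts;
    }
}
```

For example, the visits (/a, s1), (/a, s1), (/a, s2) and (/b, s1) give 2 sessions for /a and 1 for /b. Counting rows would give 3 for /a.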
 
 


Field Summary
 
Fields inherited from class org.apache.pig.EvalFunc
log, pigLogger, reporter, returnType
 
Constructor Summary
Sessionize(java.lang.String timeSpec)
           
 
Method Summary
 void accumulate(org.apache.pig.data.Tuple input)
           
 void cleanup()
           
 org.apache.pig.data.DataBag getValue()
           
 org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
           
 
Methods inherited from class org.apache.pig.AccumulatorEvalFunc
exec
 
Methods inherited from class org.apache.pig.EvalFunc
finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

Sessionize

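public Sessionize(java.lang.String timeSpec)
Parameters:
timeSpec - the session timeout, for example '30m'; an idle period of this length indicates that a new session has started
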
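This page does not spell out the timeSpec syntax; the class example passes '30m'. Suppose the spec is a number followed by an h, m or s unit, possibly combined as in 1h30m. Under that assumption, one way to turn it into a duration in Java is to prefix it with PT and parse it as an ISO 8601 duration. The helper name parseTimeSpec is hypothetical. This is not necessarily how the UDF parses its argument. In particular, this sketch rejects day units such as 1d.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical helper; assumes timeSpec looks like "30m", "2h", "45s" or "1h30m".
class TimeSpecSketch {

    static Duration parseTimeSpec(String timeSpec) {
        // Prefixing "PT" turns "30m" into the ISO 8601 duration "PT30M".
        return Duration.parse("PT" + timeSpec.toUpperCase(Locale.ROOT));
    }
}
```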
Method Detail

accumulate

public void accumulate(org.apache.pig.data.Tuple input)
                throws java.io.IOException
Specified by:
accumulate in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by:
accumulate in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
Throws:
java.io.IOException

getValue

public org.apache.pig.data.DataBag getValue()
Specified by:
getValue in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by:
getValue in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>

cleanup

public void cleanup()
Specified by:
cleanup in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by:
cleanup in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
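Because Sessionize is an Accumulator, Pig may deliver one group's bag in several chunks. It calls accumulate repeatedly, then getValue once all tuples for the current key have been passed, then cleanup before the next key. The session state therefore has to survive between accumulate calls. That state is the previous timestamp and the current session ID. The Java class below is a hypothetical, Pig-free sketch that mirrors those three method names on plain lists of timestamps. It is not the DataFu source. It assumes sorted, offset-bearing ISO 8601 strings such as 2010-01-01T10:00:00Z, and it treats only a gap strictly longer than the timeout as a session break.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical, Pig-free mirror of the accumulate/getValue/cleanup lifecycle.
class AccumulatingSessionizer {
    private final Duration timeout;
    private final List<String[]> output = new ArrayList<>(); // rows of {timestamp, sessionId}
    private Instant previous;      // last event seen, kept across accumulate calls
    private String currentSession; // current session ID, kept across accumulate calls

    AccumulatingSessionizer(Duration timeout) {
        this.timeout = timeout;
    }

    // Called once per chunk of the sorted input; state carries over between calls.
    void accumulate(List<String> chunk) {
        for (String ts : chunk) {
            Instant now = OffsetDateTime.parse(ts).toInstant();
            if (previous == null || Duration.between(previous, now).compareTo(timeout) > 0) {
                currentSession = UUID.randomUUID().toString();
            }
            output.add(new String[] {ts, currentSession});
            previous = now;
        }
    }

    // Called after the last chunk; returns a copy so cleanup() cannot clear it.
    List<String[]> getValue() {
        return new ArrayList<>(output);
    }

    // Resets all state before the next group (e.g. the next member_id).
    void cleanup() {
        output.clear();
        previous = null;
        currentSession = null;
    }
}
```

Keeping previous and currentSession as fields, rather than locals, is what lets a session continue across a chunk boundary.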

outputSchema

public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Overrides:
outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>


Matthew Hayes, Sam Shah