Apache DataFu™

Getting Started

DataFu Spark Docs

DataFu Pig Docs

DataFu Hourglass Docs


Apache Software Foundation

Apache DataFu Pig - Guide


A 'session' is a useful concept when analyzing user activity on a website. We essentially define a session as sustained user activity. By assigning events to sessions we can perform analysis on user sessions and draw useful conclusions.

For example, suppose that we have a stream of page views by user. Each page view can be represented by a member ID, a timestamp, and a URL:

pv = LOAD 'pageviews.csv' USING PigStorage(',')
     AS (memberId:int, time:long, url:chararray);

One statistic that may be useful to know is how long users tend to stay active on the website. When they visit do they tend to stick around for a long time and view many pages? Or is it typically a very brief session? Apache DataFu provides UDFs that help in this sort of analysis.


The Sessionize UDF can be used to assign unique session IDs to events within a stream. Events are passed to the UDF in time-sorted order. If two consecutive events are separated by a sufficient amount of time, then they are assigned to a different session.

Let's walk through an example. Suppose we are interested in computing some statistics on session length. What we need to do is sessionize the data and then compute the time difference between the first and last event of each session. With the session lengths computed we can then pass the values to statistics methods.

First we need to choose a threshold for the Sessionize UDF. We'll consider "10 minutes" a sufficient amount of time:

DEFINE Sessionize datafu.pig.sessions.Sessionize('10m');

We'll also define functions to compute various statistics. In this example we'll compute the median, 90th and 95th percentiles, and variance of the session lengths.

DEFINE Median datafu.pig.stats.StreamingMedian();
DEFINE Quantile datafu.pig.stats.StreamingQuantile('0.9','0.95');
DEFINE VAR datafu.pig.VAR();

Next we'll sessionize the data. We group by member and sort the events by time. Sessionize appends the session ID to each tuple. Events for a member that are within 10 minutes of each other will be assigned to the same session.

pv = FOREACH pv GENERATE time, memberId;
pv_sessionized = FOREACH (GROUP pv BY memberId) {
  ordered = ORDER pv BY time;
  GENERATE FLATTEN(Sessionize(ordered))
           AS (time,memberId,sessionId);

Now that the data is sessionized, we can compute the session lengths:

session_times =
  FOREACH (GROUP pv_sessionized BY (sessionId,memberId)) {
    GENERATE group.sessionId as sessionId,
             group.memberId as memberId,
             (MAX(pv_sessionized.time) - MIN(pv_sessionized.time))
               / 1000.0 / 60.0 as session_length;

Finally let's compute our statistics:

session_stats = FOREACH (GROUP session_times ALL) {
    AVG(session_times.session_length) as avg_session,
    SQRT(VAR(session_times.session_length)) as std_dev_session,
    Median(session_times.session_length) as median_session,
    Quantile(session_times.session_length) as quantile_session;

DUMP session_stats;

With the session statistics computed, we can now perform some interesting queries. For example, let's get the list of users who had sessions in the upper 95th percentile. These are the users who were most engaged in our website.

long_sessions = FILTER session_times BY
  session_length > session_stats.quantiles_session.quantile_0_95;

very_engaged_users = DISTINCT (FOREACH long_sessions GENERATE memberId);

DUMP very_engaged_users;

Counting Sessions

SessionCount can be used to count sessions. It works very similarly to Sessionize.

One useful application of SessionCount is in counting page views. This is a useful statistic to track for any website. However, user's sometimes hit refresh. Or they may inadvertently take another action within a short period of time that causes another page view to be recorded. These additional page views are significant and we may want to filter them out when computing the count. SessionCount can help with this.

First we'll define the UDF and specify a 10 minute threshold:

define SessionCount datafu.pig.sessions.SessionCount('10m');

We then perform the same procedure as before, sorting the events by time and passing them into the UDF. This time we get a count as output instead of a bag of sessionized events.

pv_sessionized = FOREACH (GROUP pv BY (memberId,url)) {
  ordered = ORDER pv BY time;
  GENERATE group.memberId as memberId,
           group.url as url,
           FLATTEN(SessionCount(ordered.time)) as count;

We now have the page view counts grouped by member and URL. Now we can perform one more group to get the total page views across all members and URLs.

pv_sum = FOREACH (GROUP pv_sessionized ALL)
         GENERATE SUM(pv_sessionized.count) as total_pvs;
DUMP pv_sum;