EXTRACT TOP N RECORDS IN EACH GROUP IN HADOOP/HIVE

Assume you have a table with three columns: user, category and value. For each user, you want to select top N categories. To achieve this in hive, you can use the following query:

SELECT * 
FROM 
(
   SELECT *, rank(user) as row_number
   FROM (
        SELECT user, category, value
	    FROM $compTable
	    WHERE user is NOT NULL AND AND ctr > 0
	    DISTRIBUTE BY user
	    SORT BY user, value desc
   ) A
) B
WHERE row_number < 5
ORDER BY user, row_number

In the above query, I am using a custom rank function. The overall approach is as follows:

  1. divide the data by user (distribute by user)
  2. Sort each group by user and value (sort by user, value desc)
  3. Within each group, assign rank order to each record. This is achieved by custom rank function. The rank function keeps track of last user key and simply increments the counter. As soon as it sees a new user, it reset counter to zero. Since the data is already sorted by user and is in descending order of value, we know for sure that all records related to a single user will be sent to the same node and they will be grouped together and also sorted by value.
  4. Pick top 5 categories (where rank < 5). Note since our index starts with 0, we only need to categories from 0 to 4.

Below is the custom rank function:

package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;

public final class Rank extends UDF{
    private int  counter;
    private String last_key;
    public int evaluate(final String key){
	  if ( !key.equalsIgnoreCase(this.last_key) ) {
	     this.counter = 0;
	     this.last_key = key;
	  }
	  return this.counter++;
    }
}

Below is the complete sequence of commands required to get the above query working

#Compile Rank class
> javac -classpath $HIVE_HOME/lib/hive-serde-1.7.jar:$HIVE_HOME/lib/hive-exec.jar:$HADOOP_HOME/hadoop-core.jar -d /tmp/jar/ Rank.java

#Create Rank jar
> jar -cf Rank.jar /tmp/jar/Rank.class

#start hive
> hive

#Inside hive: add jar file
hive> add jar Rank.jar

#assign name to the custom function
hive> create temporary function rank as 'com.example.hive.udf.Rank';

#now we are ready to issue our query
hive> SELECT user, category, value
FROM (
	SELECT user, category, rank(user) as rank, value
	FROM $compTable
	WHERE user is NOT NULL AND AND ctr &gt; 0
	DISTRIBUTE BY user
	SORT BY user, value desc
) a
WHERE rank &lt; 5
ORDER BY user, rank;

Useful Resources:
1. Compiling UDF