Rank Operator in Pig 0.11

In one of my earlier posts, I talked about implementing the RANK operator for extracting the top N records. A lot has changed since then, and I thought it would be worthwhile to catch up. Hive 0.7 or greater now has a “row_sequence” function that assigns a sequential position to each input record in the order the records arrive.

Similarly, the upcoming release of Apache Pig (0.11) has a handy new “RANK” operator. The basic syntax is:

ranked = RANK input [BY col [ASC|DESC] [, col [ASC|DESC] ...] [DENSE]];
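When the BY clause is omitted, RANK simply prepends a sequential number to each record in the order the records arrive, much like Hive’s row_sequence. The Python sketch below models only that behavior, treating a relation as a list of tuples; row_number is a hypothetical helper for illustration, not part of Pig.

```python
from typing import List, Sequence


def row_number(records: Sequence[tuple]) -> List[tuple]:
    """Model `RANK input;` with no BY clause (hypothetical helper):
    prepend 1, 2, 3, ... to each record in arrival order.
    No sorting happens, so equal values still get distinct numbers."""
    return [(position,) + tuple(record) for position, record in enumerate(records, start=1)]


students = [("A", 4.0), ("B", 3.5), ("C", 3.5), ("D", 3.0)]
for row in row_number(students):
    print(row)
```

Because no sort key is involved, students B and C receive different numbers (2 and 3) even though their GPAs tie.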

Here are some things to keep in mind about the “RANK” operator:
1. Ties are assigned the same rank position: The example below shows how ties are handled. Table 1 lists a sample student dataset, and in Table 2 the dataset is ranked by GPA in descending order. Students B and C have the same score, so both are assigned rank position 2. Note also that student D is assigned rank position 4 instead of 3, because three students are ranked ahead of D.

Table 1: Student Dataset
StudentID  GPA
A          4.0
B          3.5
C          3.5
D          3.0

Table 2: Students ranked by GPA (descending)
ranked = rank student by GPA desc;
rank_student  StudentID  GPA
1             A          4.0
2             B          3.5
2             C          3.5
4             D          3.0

2. Use the keyword “DENSE” to avoid gaps in rank positions: As shown in Table 2, by default the “RANK” operator leaves gaps between rank positions when there are ties. This behavior can be changed with the “DENSE” keyword, which advances the rank by exactly one for each new distinct value. Below is sample output using the DENSE keyword.

ranked = rank student by GPA desc DENSE;
rank_student  StudentID  GPA
1             A          4.0
2             B          3.5
2             C          3.5
3             D          3.0

3. It didn’t work inside the FOREACH operator: Suppose we also have a residential address for each of the above students, and we want to rank students within each zip code of their home address. A natural instinct (shown below) would be to group the dataset by zipcode and then apply the rank operator inside a nested “foreach” block.
students_by_zipcode = group students by zipcode;
ranked = foreach students_by_zipcode {
    ranked_students = rank students by GPA desc;
    generate flatten(ranked_students);
};
However, the above code threw an error indicating that the rank operator can’t be nested within the foreach operator. Since the official docs for Pig 0.11 are not yet available, I am not sure whether this is a bug or an intended limitation of the RANK operator, and I will keep an eye on it. For now, I hope this is helpful.
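The tie handling in points 1 and 2 can be sketched in plain Python. This models the semantics shown in the tables, not Pig’s actual implementation; rank_by and its dense flag are hypothetical names chosen to mirror the DENSE keyword.

```python
from typing import Any, Callable, List, Sequence


def rank_by(records: Sequence[tuple], key: Callable[[tuple], Any],
            descending: bool = False, dense: bool = False) -> List[tuple]:
    """Prepend a rank to each record after sorting by `key` (hypothetical helper).

    dense=False: competition ranking (1, 2, 2, 4); tied records share a rank
                 and the next distinct value jumps to its row position.
    dense=True:  dense ranking (1, 2, 2, 3); each new distinct value
                 advances the rank by exactly one.
    """
    # Python's sort is stable, so tied records keep their input order.
    ordered = sorted(records, key=key, reverse=descending)
    ranked: List[tuple] = []
    previous_value: Any = object()  # sentinel that compares unequal to any real key
    current_rank = 0
    for position, record in enumerate(ordered, start=1):
        value = key(record)
        if value != previous_value:
            # New distinct value: dense bumps by one, default jumps to the row position.
            current_rank = current_rank + 1 if dense else position
            previous_value = value
        ranked.append((current_rank,) + tuple(record))
    return ranked


students = [("A", 4.0), ("B", 3.5), ("C", 3.5), ("D", 3.0)]
by_gpa = rank_by(students, key=lambda r: r[1], descending=True)
by_gpa_dense = rank_by(students, key=lambda r: r[1], descending=True, dense=True)
for row in by_gpa:
    print(row)
for row in by_gpa_dense:
    print(row)
```

With the Table 1 data, the default mode yields ranks 1, 2, 2, 4 (as in Table 2) and dense=True yields 1, 2, 2, 3 (as in the DENSE output). The only difference between the two modes is how far the rank advances when a new distinct value appears.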

Update: If you need to use the rank operator within a foreach block, check out my next post on how to write Python UDFs in Pig, where I show different ways to implement a rank UDF.
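That follow-up post is not reproduced here. As a hypothetical sketch of the idea, the Python below emulates what the nested foreach in point 3 attempts: group records by zip code, then rank the bag of each group by GPA. rank_bag, rank_within_groups, and the sample zip codes are illustrative assumptions, and a real Pig Python UDF would typically also declare an output schema, which is omitted here.

```python
from itertools import groupby
from typing import Any, Callable, Dict, List, Sequence


def rank_bag(bag: Sequence[tuple], key: Callable[[tuple], Any],
             descending: bool = False) -> List[tuple]:
    """Competition-rank one bag of tuples: ties share a rank, gaps follow."""
    ordered = sorted(bag, key=key, reverse=descending)
    ranked: List[tuple] = []
    previous_value: Any = object()  # sentinel unequal to any real key
    current_rank = 0
    for position, record in enumerate(ordered, start=1):
        value = key(record)
        if value != previous_value:
            current_rank, previous_value = position, value
        ranked.append((current_rank,) + tuple(record))
    return ranked


def rank_within_groups(records: Sequence[tuple], group_key: Callable[[tuple], Any],
                       sort_key: Callable[[tuple], Any],
                       descending: bool = False) -> Dict[Any, List[tuple]]:
    """Emulate GROUP BY group_key followed by a RANK of each group's bag."""
    result: Dict[Any, List[tuple]] = {}
    # itertools.groupby only merges adjacent items, so sort by the group key first.
    for group, members in groupby(sorted(records, key=group_key), key=group_key):
        result[group] = rank_bag(list(members), sort_key, descending)
    return result


# Hypothetical sample data: (StudentID, GPA, zipcode)
students = [
    ("A", 4.0, "94105"), ("B", 3.5, "94105"),
    ("C", 3.5, "10001"), ("D", 3.0, "10001"), ("E", 3.0, "10001"),
]
by_zip = rank_within_groups(students, group_key=lambda r: r[2],
                            sort_key=lambda r: r[1], descending=True)
for zipcode, rows in by_zip.items():
    print(zipcode, rows)
```

Ranks restart at 1 in each zip code, and ties within a group (D and E here) share a rank, matching the behavior of RANK described in point 1.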

About Ritesh Agrawal

I am an applied researcher who enjoys anything related to statistics, large data analysis, data mining, machine learning and data visualization.
This entry was posted in Data Mining.

8 Responses to Rank Operator in Pig 0.11

  1. Pingback: On Writing Python UDF for Pig: A perspective | Memento

  2. Karthik says:

    Hey Ritesh,

    I love your blog. This is a very informative post. When i try this Rank operator even on a not so large dataset say 300m rows,

    c = load 'foo' as (col1:chararray, count:int);
    c1 = rank c by count desc;
    store c1 into 'foo_ranked';

    I get a Java heap space error:
    ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space

    org.apache.pig.backend.executionengine.ExecException: ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher.getErrorMessages(Launcher.java:217)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher.getStats(Launcher.java:149)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.launchPig(MapReduceLauncher.java:400)
    at org.apache.pig.PigServer.launchPlan(PigServer.java:1264)
    at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1249)
    at org.apache.pig.PigServer.execute(PigServer.java:1239)
    at org.apache.pig.PigServer.executeBatch(PigServer.java:333)
    at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:137)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:198)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:170)
    at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:84)
    at org.apache.pig.Main.run(Main.java:604)
    at org.apache.pig.Main.main(Main.java:157)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

    However, if I order the dataset outside the rank operator, like this:

    c = load 'foo' as (col1:chararray, count:int);
    c1 = ORDER c by count desc;
    c2 = rank c1;
    c3 = ORDER c2 by rank_c1 asc;
    store c3 into 'foo_ranked';

    it works, but what I get is essentially an ordered dataset whose rank is nothing but a row number. So two rows with the same value for count get different ranks.

    Sorry for using this as Stack Overflow 🙂 Hope you are able to help me.

  3. Karthik says:

    Hi Ritesh, thanks for the prompt reply. I am using Apache Pig version 0.11.1 (r1459641).

    Is there any other mechanism/UDF available to rank a sorted dataset based on a column?

    • Hi Karthik,

      I don’t have a rank UDF handy, but you can easily write a streaming script that simulates the rank operator. Below is an attempt that I haven’t tested:

      === Pig Code ===
      define RankMapper `python RankMapper.py` ship('RankMapper.py');
      data = load 'data';
      sorted = order data by some_col;
      ranked_data = stream sorted through RankMapper as (rank_position:int, ...); -- replace ... with the rest of the original columns

      ==== Python RankMapper.py =====

      #!/usr/bin/env python
      # Simulates RANK (ties share a rank, gaps follow) over input that is
      # ALREADY sorted by the score column. Assumes all records reach a single
      # process (e.g., one reducer); with several partitions, ranks restart in each.
      import sys

      delimiter = "\t"
      score_idx = 3  # change this to the index of the column that contains the score

      last_score = None
      next_rank = 1      # 1-based row position of the current record
      rank_position = 0  # rank assigned to the current record

      for line in sys.stdin:
          line = line.rstrip("\n")
          if not line.strip():
              continue  # skip blank lines
          tokens = line.split(delimiter)
          cur_score = float(tokens[score_idx])
          # A new score starts a new rank equal to its row position; ties keep the previous rank.
          if cur_score != last_score:
              rank_position = next_rank
              last_score = cur_score
          next_rank += 1
          print(delimiter.join([str(rank_position)] + tokens))
  4. Abhishek says:

    Hi Karthik,

    How can I dedupe records and pick the first one for each rank?


  5. dave says:

    If ties are a problem for you, just pass multiple columns to the RANK operator so the extra column breaks the tie. I ran into this problem while assigning ntiles.

    ranked = rank student by GPA desc, StudentID;

  6. Jim Murphy says:

    Hey Ritesh,

    Great post. I am curious whether you have seen anything more on using rank inside a foreach.


