On Writing Python UDF for Pig: A perspective

When writing Python UDFs for Pig, one is faced with multiple options. In this post, I show three different approaches to writing Python UDFs for Pig. To keep things concrete, let's take the example of a students dataset containing the following fields: name, GPA score, and residential zipcode. Let's assume our task is to rank students by their GPA score within each zipcode. Pig 0.11 comes with a handy rank operator, but it doesn't work inside the foreach operator (see this blog post for details). So let's write an enumerate_bag operator as shown here, but using different approaches.
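
For concreteness, a hypothetical students.dat might look like the lines below (comma-separated, one record per line). The names, GPA scores, and zipcodes are made up purely for illustration:

alice,3.9,94107
bob,3.5,94107
carol,3.7,10001
dave,3.2,10001
erin,3.7,94107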

Approach 1: Using @outputSchema decorator
This is the most commonly used approach when writing Python UDFs. The idea is to bind a fixed schema to the value returned by the UDF so that Apache Pig knows how to handle it. In this approach we use the @outputSchema decorator and define the output schema just above the Python UDF (as shown in the code below).

#myudf.py
@outputSchema("record: {(rank:int, name:chararray, gpa:double, zipcode:chararray)}")
def enumerate_bag(input):
    # prepend the enumeration index (the rank, starting at 0) to each tuple in the bag
    output = []
    for rank, item in enumerate(input):
        output.append(tuple([rank] + list(item)))
    return output

-- the Pig script
register 'myudf.py' using jython as myudf;
students = load 'students.dat' using PigStorage(',') as (name:chararray, gpa:double, zipcode:chararray);
students_by_zipcode = group students by zipcode;
result = foreach students_by_zipcode {
           sorted = order students by gpa desc;
           ranked = myudf.enumerate_bag(sorted);
           generate flatten(ranked);
        };
dump result;
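
With the hypothetical sample data above, dump result would print something along these lines (the order of the zipcode groups may vary; note that Python's enumerate starts at 0, so the top student in each zipcode gets rank 0):

(0,carol,3.7,10001)
(1,dave,3.2,10001)
(0,alice,3.9,94107)
(1,erin,3.7,94107)
(2,bob,3.5,94107)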

The problem with the above approach, however, is that it ties the UDF to a particular dataset. In the example above, our enumerate_bag UDF is bound to the students dataset and can't be used anywhere else. This is restrictive, especially for UDFs that are generic in nature and useful beyond a particular situation. This brings us to our second approach.

Approach 2: Binding schema within Pig
Using the @outputSchema decorator ties the UDF to a particular dataset. To overcome this, the second approach is to bind the schema to the returned value within the Pig script, where we have much more information about the dataset and its schema. Without the @outputSchema decorator, Pig interprets the UDF output as type bytearray. Within the Pig script, we then cast this bytearray to the desired form, as shown below.

#myudf.py
def enumerate_bag(input):
    output = []
    for rank, item in enumerate(input):
        output.append(tuple([rank] + list(item)))
    return output

-- the Pig script
register 'myudf.py' using jython as myudf;
students = load 'students.dat' using PigStorage(',') as (name:chararray, gpa:double, zipcode:chararray);
students_by_zipcode = group students by zipcode;
result1 = foreach students_by_zipcode {
           sorted = order students by gpa desc;
           ranked = myudf.enumerate_bag(sorted);
           generate ranked as record: {(rank:int, name:chararray, gpa:double, zipcode:chararray)};
        };
-- note: we need this additional statement to explode the bag.
result2 = foreach result1 generate flatten(record);
dump result2;
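
Since this version of myudf.py carries no Pig-specific decorator, the ranking logic can be sanity-checked with plain Python (or Jython) before registering it with Pig. Below is a minimal sketch, assuming myudf.py sits in the current directory; the file name check_myudf.py and the sample rows are invented for illustration.

#check_myudf.py -- quick local check of enumerate_bag, no Pig required
from myudf import enumerate_bag

# inside Pig, the UDF receives the sorted bag as a sequence of
# (name, gpa, zipcode) tuples; these rows are made up for illustration
sorted_bag = [("alice", 3.9, "94107"), ("erin", 3.7, "94107"), ("bob", 3.5, "94107")]

for row in enumerate_bag(sorted_bag):
    print(row)

# prints:
# (0, 'alice', 3.9, '94107')
# (1, 'erin', 3.7, '94107')
# (2, 'bob', 3.5, '94107')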

Although the above approach makes our enumerate_bag UDF generic enough to be used outside the given task, there are two things I don't like about it. First, we need an extra statement to flatten out the results (notice result1 and result2). Second, this approach can't be used within a macro, since we won't know the schema beforehand. This leads us to our third approach: the @outputSchemaFunction decorator.

Approach 3: Using @outputSchemaFunction decorator
This is by far the most complex solution, but also the one that makes our UDF truly generic and offers the greatest flexibility. The idea is to use a Python function that returns the schema for the value returned by the UDF at runtime.

As shown in the code below, we have an enumerate_bag UDF that returns the rank position along with the other fields in the dataset, and an enumerateBagSchema function that returns the output schema. The input to enumerateBagSchema is a Schema object, which in turn contains one or more FieldSchema objects. We inform Pig about the enumerateBagSchema function by using the @outputSchemaFunction decorator (placed just above the UDF). In addition, we also have to mark enumerateBagSchema itself with the @schemaFunction decorator.

#myudf.py
import org.apache.pig.data.DataType as DataType
import org.apache.pig.impl.logicalLayer.schema.SchemaUtil as SchemaUtil

@outputSchemaFunction("enumerateBagSchema")
def enumerate_bag(input):
    output = []
    for rank, item in enumerate(input):
        output.append(tuple([rank] + list(item)))
    return output

# define the schema for enumerate_bag UDF
@schemaFunction("enumerateBagSchema")
def enumerateBagSchema(input):
    # create rank position field
    fields = ['rank_position']
    dt = [DataType.INTEGER]

    #The input schema is a bag of tuples. Get the tuple from bag
    for i in input.getField(0).schema.getFields():
        # iterate over all the fields within the tuple
        for j in i.schema.getFields():
            fields.append(j.alias)
            dt.append(j.type)

    # return new schema
    return SchemaUtil.newBagSchema(fields, dt)

-- the Pig script
register 'myudf.py' using jython as myudf;
students = load 'students.dat' using PigStorage(',') as (name:chararray, gpa:double, zipcode:chararray);
students_by_zipcode = group students by zipcode;
result = foreach students_by_zipcode {
           sorted = order students by gpa desc;
           ranked = myudf.enumerate_bag(sorted);
           generate flatten(ranked);
        };
dump result;
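
The schema function can also be exercised outside a full Pig run. The sketch below is only a rough illustration and rests on a few assumptions: it runs under Jython with pig.jar on the classpath, it installs no-op stand-ins for the decorators (Pig normally injects outputSchemaFunction and schemaFunction when it registers the script, so a bare import would otherwise fail), and the schema string mirrors the students relation used above. The file name check_schema.py is hypothetical.

#check_schema.py -- rough Jython-only check of enumerateBagSchema (needs pig.jar)
import __builtin__

# Pig injects the decorators at registration time; outside Pig we substitute
# no-op stand-ins so that myudf.py can be imported at all
def _noop_decorator(name):
    def wrap(func):
        return func
    return wrap

__builtin__.outputSchemaFunction = _noop_decorator
__builtin__.schemaFunction = _noop_decorator

import org.apache.pig.impl.util.Utils as Utils
from myudf import enumerateBagSchema

# the Schema that Pig would hand to the schema function: one bag-of-tuples argument
input_schema = Utils.getSchemaFromString("b: {t: (name:chararray, gpa:double, zipcode:chararray)}")
print(enumerateBagSchema(input_schema))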

Conclusion
We have several different choices for writing Python UDFs. If you don't plan to use the UDF within a Pig macro, then the second approach is worth considering: it keeps the UDF generic and is also much faster to code. The challenge, however, is that if the UDF changes, the schema definition has to be updated in every Pig script that uses it. For instance, if we move the newly added rank column from the first to the last position in the above UDF, we have to remember to do the same in every Pig script that uses the UDF. The third approach avoids this duplication, since all the schema information lives in one place (the UDF file itself). Thus,

  1. Use the first approach if your UDF is very specific to a particular dataset.
  2. Use the second approach if you plan to use the UDF in just a few Pig scripts and you will be the only one using it.
  3. Use the third approach if you want a generic UDF that will be used across many projects and by many people.

16 thoughts on "On Writing Python UDF for Pig: A perspective"

  1. I have tried the third approach, but I am facing the below error in Pig:

    ” @outuptSchemaFunction(“outBagSchema”)
    NameError: name ‘outuptSchemaFunction’ is not defined”

  2. Hi Ritesh,

    When I run script :
    register ‘myudf.py’ using jython as myudf;

    I am getting an error that the myudf.py file does not exist.

    If I run the script below, I am not getting any error.
    register myudf.py using jython as myudf;

    but when I use this UDF in subsequent commands, I am getting the following error:

    Could not resolve myudf.enumerate_bag using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]

    Could you please help on this.

    Thanks,
    Nitin

    1. Hi Nitin
      Use the below syntax when calling UDF functions in your Pig script:

      ranked = myfuncs.enumerate_bag();

      Regards,
      Ashu

  3. Hi Nitin,

    My python script returns: sys.version.split(‘ ‘)[0]

    If I run the script directly as: python myscript.py
    I get Output: 2.6.6

    If I use this script as UDF in pig script, as : REGISTER ‘myscript.py’ USING jython as pyudf
    I get Output : 2.5.3 .

    Could you please help on this?
