Reusable Spark Custom UDF

Previously I posted about how to write a custom Spark UDF in Python and Scala. Since then I have received lots of questions/comments around two things:

  1. How to pass additional parameters to the UDF.
  2. How to reuse the UDF outside of a Spark context.

Both of the above requests can be satisfied with a simple functional programming idea: a closure, i.e. a function that returns another function which remembers the parameters it was created with. Below is an example of a UDF that converts scores (between 0 and 100) into ordinal categories. It takes a parameter: a list of tuples defining the boundary conditions for the different categories.
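As a quick illustration of the pattern (the names here are purely illustrative and not part of the example that follows), the outer function captures a parameter and returns an inner function that uses it later:

def make_multiplier(factor):
    # The returned function "remembers" factor via the closure
    def _multiply(x):
        return x * factor
    return _multiply

double = make_multiplier(2)
assert double(21) == 42

The score_to_category function in Step 1 follows exactly this shape, with the boundary conditions as the captured parameter.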

Step 1: Define Function

Below we define the score_to_category function, which accepts the boundary conditions as its input parameter. The function itself doesn't do any conversion; it simply returns another function (the inner _score_to_category) that takes a particular score value and returns the appropriate category.

def score_to_category(boundaries):
    """
    Converts a numeric score into an ordinal category.
    :param boundaries: list of tuples specifying the lower limit (inclusive) and category name,
    e.g. [(0, 'D'), (30, 'C'), (50, 'B'), (80, 'A')]
    :return: a function that accepts a score as its argument
    """
    sorted_boundaries = sorted(boundaries, key=lambda x: x[0], reverse=True)

    def _score_to_category(score):
        """
        Converts a score into its ordinal category.
        :param score: numeric score between 0 and 100
        :return: the category name for the given score
        """
        assert 0 <= score <= 100
        for (boundary, category) in sorted_boundaries:
            if score >= boundary:
                return category

    return _score_to_category

# Test the function. Demonstrates using it in normal Python code
boundaries = [(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]

converter = score_to_category(boundaries)
assert converter(10) == 'F'
assert converter(50) == 'D'
assert converter(51) == 'D'
assert converter(100) == 'A'

Also, notice there is nothing about “spark” in the above function, so we can easily use it in any plain Python script. The asserts above show example usage of the function.
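For instance, the same converter can grade an ordinary list of records with no Spark involved at all (a small sketch; the records below are made up purely for illustration):

# Reuse the converter on plain Python data, outside of Spark
records = [('John', 13), ('Mike', 85), ('Matt', 45)]
to_grade = score_to_category([(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')])
graded = [(student, score, to_grade(score)) for (student, score) in records]
# graded == [('John', 13, 'F'), ('Mike', 85, 'B'), ('Matt', 45, 'F')]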

Step 2: Define UDF

To demonstrate that we can pass different boundary conditions and get different results, I have defined two different UDFs below. The first UDF produces fine-grained letter categories; the second converts scores into a broad Pass/Fail category.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

boundaries = [(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]
udfScoreToFineCategories = udf(score_to_category(boundaries), StringType())

boundaries = [(0, 'Fail'), (50, 'Pass')]
udfScoreToBroadCategories = udf(score_to_category(boundaries), StringType())

Step 3: Test it on a Spark DataFrame

Now let's test the UDFs on some dummy data.

# Generate random data and convert it into a Spark DataFrame

# Generate Data
import itertools
import random
students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)
data = []
for (student, subject) in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))

# Create Schema Object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
            StructField("student", StringType(), nullable=False),
            StructField("subject", StringType(), nullable=False),
            StructField("score", IntegerType(), nullable=False)
    ])

# Create DataFrame (sc is the SparkContext, e.g. the one provided by the pyspark shell)
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)

# Apply UDFs
(df
   .withColumn("fine_category", udfScoreToFineCategories("score"))
   .withColumn("broad_category", udfScoreToBroadCategories("score"))
 ).show(10)

Running the above code gives the following output:

+-------+---------+-----+-------------+--------------+
|student|  subject|score|fine_category|broad_category|
+-------+---------+-----+-------------+--------------+
|   John|     Math|   13|            F|          Fail|
|   John|      Sci|   85|            B|          Pass|
|   John|Geography|   77|            B|          Pass|
|   John|  History|   25|            F|          Fail|
|   Mike|     Math|   50|            F|          Fail|
|   Mike|      Sci|   45|            F|          Fail|
|   Mike|Geography|   65|            C|          Pass|
|   Mike|  History|   79|            B|          Pass|
|   Matt|     Math|    9|            F|          Fail|
|   Matt|      Sci|    2|            F|          Fail|
+-------+---------+-----+-------------+--------------+
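Finally, because the converters are ordinary Python closures, they can also be exposed to Spark SQL. Below is a minimal sketch, assuming the sqlContext and df created in Step 3; the SQL function names score_to_fine and score_to_broad are arbitrary choices. It uses registerFunction from the Spark 1.x API (in Spark 2.x the equivalent is spark.udf.register).

# Register the converters as SQL functions (names are arbitrary)
sqlContext.registerFunction(
    "score_to_fine",
    score_to_category([(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]),
    StringType())
sqlContext.registerFunction(
    "score_to_broad",
    score_to_category([(0, 'Fail'), (50, 'Pass')]),
    StringType())

# Expose the DataFrame as a temporary table and query it with the registered functions
df.registerTempTable("scores")
sqlContext.sql("""
    SELECT student, subject, score,
           score_to_fine(score) AS fine_category,
           score_to_broad(score) AS broad_category
    FROM scores
""").show(10)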