Reusable Spark Custom UDF

Previously I posted about how to write a custom Spark UDF in Python and Scala. Since then I have received lots of questions and comments around two things:

  1. How to pass additional parameters to the UDF
  2. How to reuse the UDF outside of the Spark context.

Both requests can be satisfied with a simple functional-programming idea: a function that returns a function. Below is an example of a UDF that converts scores (between 0 and 100) into ordinal categories. It takes a parameter: a list of tuples defining the boundary conditions for the different categories.

Step 1: Define Function

Below we define a score_to_category function that accepts the boundary conditions as its input parameter. The function itself doesn't convert anything; it returns another function (the inner _score_to_category) that takes a particular score value and returns the appropriate category.

def score_to_category(boundaries):
    """
    Converts a numeric score into an ordinal category.
    :param boundaries: list of tuples specifying the lower limit and category name.
    Eg. [(0, 'D'), (30, 'C'), (50, 'B'), (80, 'A')]
    :return: a function that accepts a score as its argument
    """
    # Sort boundaries from highest to lowest so the first boundary
    # the score clears is the tightest (correct) category.
    sorted_boundaries = sorted(boundaries, key=lambda x: x[0], reverse=True)

    def _score_to_category(score):
        """
        Converts a score into an ordinal category.
        :param score: numeric score between 0 and 100
        :return: the category whose lower limit is the highest one not exceeding the score
        """
        assert 0 <= score <= 100
        for (boundary, category) in sorted_boundaries:
            if score >= boundary:
                return category

    return _score_to_category

# Test the function. Demonstrates using it in plain Python code, with no Spark involved.
boundaries = [(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]

converter = score_to_category(boundaries)
assert converter(10) == 'F'
assert converter(50) == 'D'
assert converter(51) == 'D'
assert converter(100) == 'A'

Also, notice there is nothing Spark-specific in the function above, so we can use it in any Python script. The asserts above show an example of exactly that.
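For instance, if you happen to have pandas around, the same converter can be mapped over a Series. This is a minimal sketch of reuse outside Spark; pandas here is just an illustrative assumption, and any plain Python iterable works the same way:

# Reusing the exact same converter outside Spark, e.g. with pandas.
# (pandas is only an illustration; a list comprehension works too.)
import pandas as pd

scores = pd.Series([10, 50, 51, 100])
grades = scores.map(score_to_category(boundaries))
print(grades.tolist())  # ['F', 'D', 'D', 'A']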

Step 2: Define UDF

To demonstrate that we can pass different boundary conditions and get different results, below I define two different UDFs. The first maps scores to fine-grained letter grades; the second collapses them into a broad Pass/Fail category.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

boundaries = [(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]
udfScoreToFineCategories = udf(score_to_category(boundaries), StringType())

boundaries = [(0, 'Fail'), (50, 'Pass')]
udfScoreToBroadCategories = udf(score_to_category(boundaries), StringType())
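As an aside (a sketch, not part of the original steps): because the factory just returns a plain Python function, the same parameterized converter can also be registered by name for use in SQL. The name scoreToFineCategory below is hypothetical, and this assumes the Spark 1.x sqlContext created in Step 3 (Spark 2.x would use spark.udf.register instead):

# Hypothetical sketch: registering the parameterized function under a
# name so it can be called from SQL (Spark 1.x registerFunction API).
sqlContext.registerFunction(
    "scoreToFineCategory",          # hypothetical SQL function name
    score_to_category(boundaries),  # closure with boundaries baked in
    StringType())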

Step 3: Testing it on Spark DataFrame

Now let's test the UDFs on some dummy data.

# Generate random data and convert it into a Spark DataFrame

# Generate data
import itertools
import random
students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)
data = []
for (student, subject) in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))

# Create Schema Object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
            StructField("student", StringType(), nullable=False),
            StructField("subject", StringType(), nullable=False),
            StructField("score", IntegerType(), nullable=False)
    ])

# Create DataFrame (assumes a SparkContext `sc`, e.g. from the pyspark shell)
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)

# Apply UDFs
(df
   .withColumn("fine_category", udfScoreToFineCategories("score"))
   .withColumn("broad_category", udfScoreToBroadCategories("score"))
 ).show(10)

Running the above code gives the following output:

+-------+---------+-----+-------------+--------------+
|student|  subject|score|fine_category|broad_category|
+-------+---------+-----+-------------+--------------+
|   John|     Math|   13|            F|          Fail|
|   John|      Sci|   85|            B|          Pass|
|   John|Geography|   77|            B|          Pass|
|   John|  History|   25|            F|          Fail|
|   Mike|     Math|   50|            D|          Pass|
|   Mike|      Sci|   45|            F|          Fail|
|   Mike|Geography|   65|            C|          Pass|
|   Mike|  History|   79|            B|          Pass|
|   Matt|     Math|    9|            F|          Fail|
|   Matt|      Sci|    2|            F|          Fail|
+-------+---------+-----+-------------+--------------+
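
A side note for Spark 2.x users (one of the commenters below is on 2.1): the HiveContext and RDD steps above can be replaced with a SparkSession. This is a minimal sketch, assuming the data, schema, and Step 2 UDFs defined above; the UDF code itself is unchanged:

# Spark 2.x equivalent of the DataFrame creation in Step 3 (a sketch;
# the UDFs from Step 2 apply exactly as before).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, schema)

(df
   .withColumn("fine_category", udfScoreToFineCategories("score"))
   .withColumn("broad_category", udfScoreToBroadCategories("score"))
 ).show(10)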

4 thoughts on "Reusable Spark Custom UDF"

  1. Hi Ritesh,

    Thanks for your example code.
    I am currently trying to pass a list into the UDF as the second argument when applying withColumn().
    I am using Python 2.7, Spark 2.1 in Databricks.

    I tried the methods in https://stackoverflow.com/questions/37409857/passing-a-data-frame-column-and-external-list-to-udf-under-withcolumn and they didn't work out for me.
    Either it won't run, or it looks like it runs but the generated column cannot be used in downstream procedures (it causes an error).

    I was wondering if you have any suggestions for this type of usage (passing a global value, say a list, as an argument in addition to a DataFrame column).

    Thank you!

      1. Thank you for the reply! Sorry, I didn't read the code carefully enough yesterday. I used your method this morning and it worked like a charm!

        Could you please explain a bit: why does score_to_category() only take the global value when it is defined, while the inner function _score_to_category can take in the column value that score_to_category didn't ask for?
        (I tried to feed two arguments into the udf and it didn't work out.)

        This seems counter-intuitive to me and I am trying to understand whether it has something to do with how udf works in general.

        Thank you!
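
For anyone else puzzled by the same question: this is standard Python closure behavior rather than anything Spark-specific. The outer function runs once, when the UDF is built, and fixes its arguments; the inner function it returns is what Spark calls per row, so only the inner function receives the column value. A minimal Spark-free sketch:

def make_adder(n):         # runs once, at "build" time; n is captured
    def add(x):            # runs on every call; x arrives per call
        return x + n       # n comes from the enclosing scope (closure)
    return add

add_five = make_adder(5)   # analogous to score_to_category(boundaries)
assert add_five(10) == 15  # analogous to Spark calling the UDF per row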
