Previously I posted about how to write a custom Spark UDF in Python and Scala. Since then I have received lots of questions and comments around two things:
- How to pass additional parameters to a UDF
- How to reuse a UDF outside of a Spark context
Both requests can be satisfied with functional programming ideas. Below is an example of a UDF that converts scores (between 0 and 100) into ordinal categories. It takes one parameter: a list of tuples defining the boundary conditions for the different categories.
Step 1: Define Function
Below we define the score_to_category function, which accepts the boundary conditions as its input parameter. The function doesn't convert anything itself; instead it returns another function (note the final `return _score_to_category`) that takes a particular score and returns the appropriate category.
```python
def score_to_category(boundaries):
    """
    Converts a numeric score into an ordinal category.

    :param boundaries: list of tuples specifying the lower limit and category
                       name, e.g. [(0, 'D'), (30, 'C'), (50, 'B'), (80, 'A')]
    :return: a function that accepts a score as its argument
    """
    # Sort boundaries from highest to lowest so the first match wins
    sorted_boundaries = sorted(boundaries, key=lambda x: x[0], reverse=True)

    def _score_to_category(score):
        """
        Converts a score to its ordinal category.

        :param score: numeric score between 0 and 100
        :return: the category whose boundary is the highest one not exceeding score
        """
        assert 0 <= score <= 100
        for (boundary, category) in sorted_boundaries:
            if score >= boundary:
                return category

    return _score_to_category


# Test the function. Demonstrates using it in normal Python code.
boundaries = [(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]
converter = score_to_category(boundaries)
assert converter(10) == 'F'
assert converter(50) == 'D'
assert converter(51) == 'D'
assert converter(100) == 'A'
```
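As a design note, a closure is not the only way to bake parameters into a function: `functools.partial` can freeze the boundaries argument of an ordinary two-argument function. Here is a minimal sketch of that alternative (`score_to_category_v2` is a hypothetical variant written just for this illustration, not code used elsewhere in this post):

```python
from functools import partial

def score_to_category_v2(boundaries, score):
    """Same conversion as above, written as a plain two-argument function."""
    assert 0 <= score <= 100
    for (boundary, category) in sorted(boundaries, key=lambda x: x[0], reverse=True):
        if score >= boundary:
            return category

# partial() freezes the first argument, yielding a one-argument function
# just like the closure returned by score_to_category.
converter_v2 = partial(score_to_category_v2,
                       [(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')])
assert converter_v2(51) == 'D'
```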
Also, notice there is nothing Spark-specific about the function above, so we can use it in any Python script; the asserts at the end of the snippet demonstrate exactly that usage.
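For instance, the same closure can score plain Python records with no SparkContext anywhere in sight (the names and scores below are made up for illustration):

```python
# Reusing the converter on plain Python data, outside any Spark context.
grade = score_to_category([(0, 'Fail'), (50, 'Pass')])
records = [('Ann', 42), ('Bob', 88)]  # hypothetical (name, score) pairs
graded = [(name, grade(score)) for name, score in records]
assert graded == [('Ann', 'Fail'), ('Bob', 'Pass')]
```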
Step 2: Define UDF
To demonstrate that we can pass different boundary conditions and get different results, two UDFs are defined below. The first produces fine-grained letter categories; the second converts scores into a broad Pass/Fail category.
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Fine-grained letter categories
boundaries = [(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]
udfScoreToFineCategories = udf(score_to_category(boundaries), StringType())

# Broad Pass/Fail categories
boundaries = [(0, 'Fail'), (50, 'Pass')]
udfScoreToBroadCategories = udf(score_to_category(boundaries), StringType())
```
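If you also want to call the parameterized function from SQL rather than the DataFrame API, the same closure can be registered by name. A minimal sketch using the Spark 1.x `registerFunction` API, assuming a `sqlContext` like the one created in Step 3 below (the function name `scoreToFine` and the `scores` table are just examples):

```python
# Register the closure as a SQL function (Spark 1.x style).
sqlContext.registerFunction(
    "scoreToFine",
    score_to_category([(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]),
    StringType())

# After df.registerTempTable("scores"), it can be used like:
# sqlContext.sql("SELECT student, scoreToFine(score) AS fine FROM scores").show()
```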
Step 3: Testing it on a Spark DataFrame
Now let's test the UDFs on some dummy data.
```python
# Generate random data and convert it into a Spark DataFrame

# Generate data
import itertools
import random

students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)

data = []
for (student, subject) in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))

# Create schema object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("student", StringType(), nullable=False),
    StructField("subject", StringType(), nullable=False),
    StructField("score", IntegerType(), nullable=False)
])

# Create DataFrame
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)

# Apply UDFs
(df
 .withColumn("fine_category", udfScoreToFineCategories("score"))
 .withColumn("broad_category", udfScoreToBroadCategories("score"))
).show(10)
```
Running the above code gives the following output:
| student | subject | score | fine_category | broad_category |
|---|---|---|---|---|
| John | Math | 13 | F | Fail |
| John | Sci | 85 | B | Pass |
| John | Geography | 77 | B | Pass |
| John | History | 25 | F | Fail |
| Mike | Math | 50 | D | Pass |
| Mike | Sci | 45 | F | Fail |
| Mike | Geography | 65 | C | Pass |
| Mike | History | 79 | B | Pass |
| Matt | Math | 9 | F | Fail |
| Matt | Sci | 2 | F | Fail |