Previously I posted about how to write a custom Spark UDF in Python and Scala. Since then I have received lots of questions/comments around two things:
- How to pass additional parameters to the UDF
- How to reuse the UDF outside of a Spark context
Both of these requests can be easily satisfied using functional programming ideas. Below is an example of a UDF that converts scores (between 0 and 100) into ordinal categories. It takes one parameter: a list of tuples defining the boundary conditions for the different categories.
Step 1: Define Function
Below we define the score_to_category function, which accepts the boundary conditions as an input parameter. The function itself doesn't do any conversion; instead it returns another function (the inner _score_to_category) that takes a particular score value and returns the appropriate category.
```python
def score_to_category(boundaries):
    """
    Converts a numeric score into an ordinal category.
    :param boundaries: list of tuples specifying the lower bound and category name.
                       Eg. [(0, 'D'), (30, 'C'), (50, 'B'), (80, 'A')]
    :return: a function that accepts a score as its argument
    """
    sorted_boundaries = sorted(boundaries, key=lambda x: x[0], reverse=True)

    def _score_to_category(score):
        """
        Converts a score to an ordinal category
        :param score: numeric score between 0 and 100
        :return: category name
        """
        assert 0 <= score <= 100
        for (boundary, category) in sorted_boundaries:
            if score >= boundary:
                return category

    return _score_to_category


# Test the function. Demonstrates using the function in normal Python code
boundaries = [(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]
converter = score_to_category(boundaries)
assert converter(10) == 'F'
assert converter(50) == 'D'
assert converter(51) == 'D'
assert converter(100) == 'A'
```
Also, notice there is nothing Spark-specific in the above function, so we can use it in any Python script. The assert statements at the end of the snippet show example usage of the function.
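For instance, the converter can grade plain Python data with no Spark installed. A quick sketch (the sample scores below are made up for illustration):

```python
# Reuse the same converter in plain Python -- no Spark required
scores = [13, 85, 77, 25]  # hypothetical sample scores
grade = score_to_category([(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')])
print([grade(s) for s in scores])  # ['F', 'B', 'B', 'F']
```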
Step 2: Define UDF
To demonstrate that we can pass different boundary conditions and get different results, below I define two different UDFs. The first UDF maps scores into fine-grained letter categories; the second converts them into a Pass/Fail category.
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Fine-grained letter categories
boundaries = [(0, 'F'), (50, 'D'), (60, 'C'), (75, 'B'), (90, 'A')]
udfScoreToFineCategories = udf(score_to_category(boundaries), StringType())

# Broad Pass/Fail categories
boundaries = [(0, 'Fail'), (50, 'Pass')]
udfScoreToBroadCategories = udf(score_to_category(boundaries), StringType())
```
Step 3: Testing it on a Spark DataFrame
Now let's test the UDFs on some dummy data.
```python
# Generate random data and convert it into a Spark DataFrame

# Generate data
import itertools
import random

students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)

data = []
for (student, subject) in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))

# Create schema object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("student", StringType(), nullable=False),
    StructField("subject", StringType(), nullable=False),
    StructField("score", IntegerType(), nullable=False)
])

# Create DataFrame
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)

# Apply UDFs
(df
 .withColumn("fine_category", udfScoreToFineCategories("score"))
 .withColumn("broad_category", udfScoreToBroadCategories("score"))
).show(10)
```
Running the above code gives the following output
student | subject | score | fine_category | broad_category |
---|---|---|---|---|
John | Math | 13 | F | Fail |
John | Sci | 85 | B | Pass |
John | Geography | 77 | B | Pass |
John | History | 25 | F | Fail |
Mike | Math | 50 | F | Fail |
Mike | Sci | 45 | F | Fail |
Mike | Geography | 65 | C | Pass |
Mike | History | 79 | B | Pass |
Matt | Math | 9 | F | Fail |
Matt | Sci | 2 | F | Fail |
Hi Ritesh,
Thanks for your example codes.
I am currently trying to pass a list into the udf as the second argument when applying withColumn().
I am using Python 2.7 and Spark 2.1 in Databricks.
I tried methods in https://stackoverflow.com/questions/37409857/passing-a-data-frame-column-and-external-list-to-udf-under-withcolumn and they didn’t work out for me.
Either it won't run, or it looks like it runs but the generated column cannot be used in downstream procedures (it causes an error).
I was wondering if you have any suggestions for this type of usage (passing a global value, say a list, as an argument in addition to a DataFrame column).
Thank you!
Hi Tianran, did you check my other blog post, which has an example of how to initialize a UDF with additional parameters? I think it will solve your problem. Here is the link: https://ragrawal.wordpress.com/2017/06/17/reusable-spark-custom-udf/
Thank you for the reply! Sorry I didn't read the code carefully enough yesterday. I used your method this morning and it worked like a charm!
Could you please explain a bit on:
Why does score_to_category() only take the global value when it is defined, while the inner function _score_to_category can take in the value from the column that score_to_category didn't ask for?
(I tried to feed two arguments into the UDF and it didn't work out.)
This seems counter-intuitive to me and I am trying to understand if this has something to do with how udf works in general.
Thank you!
This has nothing to do with UDFs; it's related to how closures and lambda expressions work in Python. I will try to provide a more detailed answer later.
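In the meantime, here is a minimal plain-Python sketch of that idea (the names make_adder and add_ten are hypothetical, just for illustration): the outer function captures the extra parameter at the moment it is called, and the inner function it returns only needs the per-call value. This is why the boundaries go to score_to_category while the column value goes to _score_to_category.

```python
def make_adder(offset):
    # 'offset' is captured (closed over) when make_adder is called
    def add(value):
        # 'value' is supplied later, once per call (or once per row in a UDF)
        return value + offset
    return add

add_ten = make_adder(10)  # the extra parameter is fixed here
print(add_ten(5))         # 15 -- only the per-call value is passed now
```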