Spark SQL – Salient functions in a Nutshell
As the Spark DataFrame has become the de facto standard for data processing in Spark, it is a good idea to be aware of the key functions of Spark SQL that most Data Engineers/Scientists will need in their data transformation journey.
callUDF
One of the most vital functions of Spark, and one I find useful in my day-to-day usage. callUDF is used not only to call user-defined functions, but also to invoke Spark SQL functions that are not part of Spark's functions object.
Example:
For example, parse_url is part of Spark SQL but is not available in Spark's functions object. Hence it is not possible to use it directly in your DataFrame operations; you need to wrap it in the callUDF function.
val df = Seq(
  "http://spark.apache.org/path?query=1",
  "https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/sql/#negative"
).toDF("url_col")

df.withColumn("host", callUDF("parse_url", $"url_col", lit("HOST")))
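With the needed imports in place and the result displayed, the behaviour becomes clear (this assumes a SparkSession named spark with spark.implicits._ in scope):
import org.apache.spark.sql.functions.{callUDF, lit}

// parse_url is resolved by name at runtime, even though it is absent from the functions object
df.withColumn("host", callUDF("parse_url", $"url_col", lit("HOST"))).show(false)
// host: spark.apache.org, people.apache.org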
input_file_name
This function is used to get the file name of the current Spark task. It is mostly used in debugging, or to find the specific files that contain bad records.
Example:
df.withColumn("filename", input_file_name())
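For instance, when reading a whole directory of files, every row can be tagged with the file it came from (the path below is hypothetical):
import org.apache.spark.sql.functions.input_file_name

// hypothetical directory of part files; each row is tagged with its source file
val logs = spark.read.option("header", "true").csv("/data/logs/")
logs.withColumn("filename", input_file_name()).show(false)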
spark_partition_id
To get the ID of the Spark partition a row belongs to.
df.withColumn("partition_id", spark_partition_id())
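A small sketch that makes the partition layout visible, assuming a SparkSession named spark:
import org.apache.spark.sql.functions.spark_partition_id

// repartition into 3 partitions, then expose each row's partition id
spark.range(0, 10)
  .repartition(3)
  .withColumn("partition_id", spark_partition_id())
  .show()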
na
na is a Spark DataFrame API function which is used to drop or fill null values.
Example:
df.na.drop()
will drop all the rows that contain any null values.
df.na.drop(Seq(<cols>))
drops all the rows that contain null values in the specified columns
df.na.fill("literal")
fills the null values with the specified literal
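A minimal sketch tying the three variants together (the sample data is illustrative; assumes spark.implicits._ is in scope):
val people = Seq(
  ("Arun", Some(30)),
  (null, Some(25)),
  ("Vijay", None)
).toDF("name", "age")

people.na.drop().show()             // keeps only the row with no nulls at all
people.na.drop(Seq("name")).show()  // drops only the row whose name is null
people.na.fill("unknown").show()    // replaces nulls in string columns with "unknown"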
agg
The agg function is used to compute different aggregate expressions over different columns.
df.groupBy("dept").agg(min("id"), count("id"), avg("salary"))
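For example, with a toy employee DataFrame (names and numbers are illustrative):
import org.apache.spark.sql.functions.{min, count, avg}

val employees = Seq(
  (1, "sales", 1000.0),
  (2, "sales", 1500.0),
  (3, "hr",    1200.0)
).toDF("id", "dept", "salary")

employees.groupBy("dept")
  .agg(min("id"), count("id"), avg("salary"))
  .show()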
struct
The struct function is used to create a new struct from existing columns.
Example:
One of the most common use cases of this function is to create JSON values from existing columns.
val df1 = Seq(("Arun", 1), ("Vijay", 2), ("Vinay", 3)).toDF("name", "id")

df1.select(struct("*").alias("json"))
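To turn the struct into an actual JSON string, struct is usually combined with to_json; building on df1 above:
import org.apache.spark.sql.functions.{struct, to_json}

// pack every column into a struct, then render it as a JSON string
df1.select(to_json(struct("*")).alias("json")).show(false)
// {"name":"Arun","id":1} ...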
transform — DataFrame API
The transform function is used to chain Spark transformation functions. It helps break big Spark transformation queries into modular pieces that can be chained later.
Example
val df = Seq(1, 2, 3).toDF("id")

def incByOne(df: DataFrame): DataFrame = {
  df.withColumn("inc", df("id") + 1)
}

val incDf = df.transform(incByOne)
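Because each helper returns a DataFrame, the steps compose naturally; a sketch with a second hypothetical step added to the one above:
import org.apache.spark.sql.DataFrame

def withDouble(df: DataFrame): DataFrame = {
  df.withColumn("doubled", df("id") * 2)
}

// modular steps chained one after another
val chained = df.transform(incByOne).transform(withDouble)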
transform — Spark SQL
Spark SQL also has a transform function, which works with array elements.
Example:
To increment each element of an array by one:
spark.sql("SELECT transform(array(1, 2, 3), x -> x + 1)")
The Spark SQL transform function and the DataFrame transform function are different and exist for different purposes.
java_method — Spark SQL
A Spark SQL-specific function to invoke a Java method as part of the query, by passing the Java class name, the method name, and arguments if any.
Example
spark.sql("SELECT java_method('java.util.UUID', 'randomUUID')")
The above query calls the method randomUUID of the Java class UUID via reflection and returns a universally unique identifier.
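Arguments, when needed, are simply appended after the method name; a sketch calling a static method with one argument:
// passes 1 as an argument to the static method java.lang.String.valueOf
spark.sql("SELECT java_method('java.lang.String', 'valueOf', 1)").show()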
flatten
Flattens out columns which contain a nested array of arrays.
In case the nesting is more than two levels deep, only one level is flattened.
Example:
df.withColumn("flattened_col", flatten($"nested_column"))

spark.sql("SELECT flatten(array(array(1, 2), array(3, 4)))")
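A self-contained version of the DataFrame example, with the nested column built inline (column names are illustrative):
import org.apache.spark.sql.functions.flatten

val nested = Seq(Seq(Seq(1, 2), Seq(3, 4))).toDF("nested_column")
nested.withColumn("flattened_col", flatten($"nested_column")).show(false)
// [[1, 2], [3, 4]] -> [1, 2, 3, 4]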
get_json_object
Function used to extract JSON elements with the help of a JSONPath expression.
val jsonDF = Seq(
  (0, """{"student_id": 0, "student_name": "Arun", "Age": 12}"""),
  (1, """{"student_id": 1, "student_name": "Vijay", "Age": 12}""")
).toDF("id", "json")

jsonDF.select(
  $"id",
  get_json_object($"json", "$.student_id").alias("student_id"),
  get_json_object($"json", "$.student_name").alias("student_name"),
  get_json_object($"json", "$.Age").alias("age")
)
from_json
Function used to convert a JSON string into a JSON struct. One of the common use cases for this function is decoding JSON strings into structs from messages consumed from Kinesis/Kafka.
val jsonSchema = new StructType()
  .add("student_id", IntegerType)
  .add("student_name", StringType)
  .add("Age", IntegerType)

jsonDF.select($"id", from_json($"json", jsonSchema).alias("student"))
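A hedged sketch of the Kafka use case mentioned above (the broker address and topic name are hypothetical):
import org.apache.spark.sql.functions.from_json

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // hypothetical broker
  .option("subscribe", "students")                  // hypothetical topic
  .load()

// the Kafka value arrives as binary: cast to string, then decode with the schema
val students = kafkaDf
  .select(from_json($"value".cast("string"), jsonSchema).alias("student"))
  .select("student.*")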
str_to_map — Spark SQL
Converts a string into a map by splitting the text into key/value pairs, using the specified pair delimiter and key/value delimiter.
Example:
spark.sql("SELECT str_to_map('a:1,b:2,c:3', ',', ':')")
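The same function can also be applied to a DataFrame column through selectExpr (column name is illustrative):
val props = Seq("a:1,b:2,c:3").toDF("raw")
props.selectExpr("str_to_map(raw, ',', ':') AS kv").show(false)
// kv: a -> 1, b -> 2, c -> 3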