Spark SQL – Salient functions in a Nutshell

As the Spark DataFrame becomes the de-facto standard for data processing in Spark, it is a good idea to be aware of the key functions of Spark SQL that most Data Engineers/Scientists might need in their data transformation journey.

callUDF

One of the most vital functions of Spark, and one I find useful in my day-to-day usage. callUDF is used not only to call user-defined functions, but also to utilise the functions of Spark SQL that are not a part of Spark's functions object.

Example:

For example, parse_url is a part of Spark SQL but is not available in Spark's functions object. Hence it is not possible to use it directly in your DataFrame operations; you need to wrap it in the callUDF function.

val df = Seq(("http://spark.apache.org/path?query=1"), ("https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/sql/#negative")).toDF("url_col")
df.withColumn("host", callUDF("parse_url", $"url_col", lit("HOST")))
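An equivalent approach is the expr function (also part of Spark's functions object), which lets you embed any Spark SQL function as an expression string; a minimal sketch:

df.withColumn("host", expr("parse_url(url_col, 'HOST')"))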

input_file_name

This function returns the name of the file being read by the current Spark task. It is mostly used in debugging, or to find the specific files which have bad records.

Example:

df.withColumn("filename", input_file_name())
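Building on that, a quick way to see which files contributed which records (a sketch, assuming df was read from a multi-file source):

df.withColumn("filename", input_file_name())
  .groupBy("filename")
  .count()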

spark_partition_id

To get the ID of the Spark partition that a row belongs to.

df.withColumn("partition_id", spark_partition_id())
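A common use is checking for partition skew by counting rows per partition; a minimal sketch:

df.groupBy(spark_partition_id().alias("partition_id")).count()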

na

na is a Spark DataFrame API function, which is used to drop or fill null values.

Example:

df.na.drop()

drops all the rows which contain any null values.

df.na.drop(Seq("col1", "col2"))

drops all the rows which contain null values in the specified columns.

df.na.fill("literal")

fills the null values with the specified literal.
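Different columns can also be filled with different values by passing a Map; a minimal sketch with hypothetical column names:

df.na.fill(Map("salary" -> 0, "dept" -> "unknown"))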

agg

The aggregate function is used to compute one or more aggregate expressions over the grouped data.

df.groupBy("dept").agg(min("id"), count("id"), avg("salary"))
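The aggregate columns can be aliased for readable output names; a sketch using the same columns as above:

df.groupBy("dept").agg(min("id").alias("min_id"), count("id").alias("emp_count"), avg("salary").alias("avg_salary"))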

struct

The struct function is used to create a new struct column from existing columns.

Example:

One of the most common use cases of this function is to create JSON values from existing columns.

val df1 = Seq(("Arun", 1), ("Vijay", 2), ("Vinay", 3)).toDF("Name", "id")
df1.select(struct("*").alias("json"))
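To serialise the struct into an actual JSON string, it is typically combined with to_json; a minimal sketch:

df1.select(to_json(struct("*")).alias("json"))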

transform — DataFrame API

The transform function is used to chain Spark transformation functions. It helps break big Spark transformation queries into modular functions and chain them later.

Example

import org.apache.spark.sql.DataFrame

val df = Seq(1, 2, 3).toDF("id")

def incByOne(df: DataFrame): DataFrame = {
  df.withColumn("inc", df("id") + 1)
}

val incDf = df.transform(incByOne)
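The benefit shows when several such functions are chained together; a sketch with a second, hypothetical transformation:

def doubleId(df: DataFrame): DataFrame = df.withColumn("doubled", df("id") * 2)
val resultDf = df.transform(incByOne).transform(doubleId)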

transform — Spark SQL

Spark SQL also possesses a transform function to work with array elements.

Example:

To increment each element of an array by one:

spark.sql("SELECT transform(array(1, 2, 3), x -> x + 1)")

The Spark SQL transform function and the DataFrame transform function are different and exist for different purposes.

java_method — Spark SQL

A Spark SQL-specific function to invoke a Java method as part of the query, by passing the Java class name, the method name, and arguments if any.

Example

spark.sql("SELECT java_method('java.util.UUID', 'randomUUID')")

The above query calls the randomUUID method of the Java class UUID via reflection and returns a universally unique identifier.
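Since java_method is a SQL function, it can also be reached from the DataFrame API through expr; a minimal sketch, assuming a DataFrame df:

df.withColumn("uuid", expr("java_method('java.util.UUID', 'randomUUID')"))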

flatten

Flattens out columns which contain a nested array of arrays.

In case the nesting is deeper than two levels, only one level will be flattened.

Example:

df.withColumn("flattened_col", flatten($"nested_column"))
spark.sql("SELECT flatten(array(array(1, 2), array(3, 4)))")
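To illustrate the one-level behaviour, a three-level array still yields an array of arrays after a single flatten:

spark.sql("SELECT flatten(array(array(array(1, 2)), array(array(3, 4))))") // returns [[1, 2], [3, 4]]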

get_json_object

A function used to extract JSON elements with the help of JSONPath expressions.

val jsonDF = Seq((0, """{"student_id": 0, "student_name": "Arun", "Age": 12}"""), (1, """{"student_id": 1, "student_name": "Vijay", "Age": 12}""")).toDF("id", "json")
jsonDF.select($"id", get_json_object($"json", "$.student_id").alias("student_id"), get_json_object($"json", "$.student_name").alias("student_name"), get_json_object($"json", "$.Age").alias("age"))

from_json

A function used to convert a JSON string into a JSON struct. One of the common areas of use for this function is decoding JSON strings into structs from messages consumed from Kinesis/Kafka.

import org.apache.spark.sql.types._

val jsonSchema = new StructType()
  .add("student_id", IntegerType)
  .add("student_name", StringType)
  .add("Age", IntegerType)

jsonDF.select(from_json($"json", jsonSchema).alias("json_struct"))
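For the Kafka use case mentioned above, the raw value bytes are cast to string and decoded with the same schema; a minimal sketch (broker address and topic name are hypothetical):

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "students")
  .load()

val decodedDF = kafkaDF.select(from_json($"value".cast("string"), jsonSchema).alias("student"))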

str_to_map — Spark SQL

Converts a string into a map by splitting the text into key/value pairs using the specified pair delimiter and key/value delimiter.

Example:

spark.sql("SELECT str_to_map('a:1,b:2,c:3', ',', ':')")
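The query above returns the map {"a":"1","b":"2","c":"3"}; note that both keys and values come back as strings.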
