Skip to content

Commit

Permalink
Support extractFromJson with json path expression (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
manuzhang authored and MrPowers committed May 19, 2019
1 parent ff31606 commit 70115cf
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ object transformations {
* transformations.extractFromJson("person", "personData", personSchema)
* )
*
* sourceDF.show()
* actualDF.show()
* +---+---------------------------------+----------------+
* |id |person |personData |
* +---+---------------------------------+----------------+
Expand All @@ -299,6 +299,42 @@ object transformations {
)
}

/**
* Extracts an object from a JSON field with a specified path expression
*
* {{{
* val sourceDF = spark.createDF(
* List(
* (10, """{"name": "Bart cool", "age": 25}"""),
* (20, """{"name": "Lisa frost", "age": 27}""")
* ), List(
* ("id", IntegerType, true),
* ("person", StringType, true)
* )
* )
*
* val actualDF = sourceDF.transform(
* transformations.extractFromJson("person", "name", "$.name")
* )
*
* actualDF.show()
* +---+---------------------------------+----------------+
* |id |person |name |
* +---+---------------------------------+----------------+
* |10 |{"name": "Bart cool", "age": 25} |"Bart cool" |
* |20 |{"name": "Lisa frost", "age": 27}|"Lisa frost" |
* +---+---------------------------------+----------------+
* }}}
*/
def extractFromJson(colName: String, outputColName: String, path: String)(df: DataFrame): DataFrame = {
df.withColumn(
outputColName,
get_json_object(
col(colName),
path)
)
}

def withRowAsStruct(outputColName: String = "row_as_struct")(df: DataFrame): DataFrame = {
val colNames = df.columns.map(col)
df.withColumn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,120 @@ object TransformationsTest extends TestSuite with DataFrameComparer with ColumnC
}
}

'getJsonObject - {
val bart = """
|{
| "name": "Bart cool",
| "info": {
| "age": 25,
| "gender": "male"
| }
|}
""".stripMargin

val lisa = """
|{
| "name": "Lisa frost",
| "info": {
| "age": 27,
| "gender": "female"
| }
|}
""".stripMargin

val sourceDF = spark.createDF(
List(
(10, bart),
(20, lisa)
),
List(
("id", IntegerType, true),
("person", StringType, true)
)
)

'fromOneLevelPath - {
val actualDF = sourceDF.transform(
transformations.extractFromJson(
"person",
"name",
"$.name"
)
)

val expectedDF = spark.createDF(
List(
(10, bart, "Bart cool"),
(20, lisa, "Lisa frost")
),
List(
("id", IntegerType, true),
("person", StringType, true),
("name", StringType, true)
)
)

assertSmallDataFrameEquality(
actualDF,
expectedDF
)
}

'fromTwoLevelPath - {
val actualDF = sourceDF.transform(
transformations.extractFromJson(
"person",
"age",
"$.info.age"
)
)

val expectedDF = spark.createDF(
List(
(10, bart, "25"),
(20, lisa, "27")
),
List(
("id", IntegerType, true),
("person", StringType, true),
("age", StringType, true)
)
)

assertSmallDataFrameEquality(
actualDF,
expectedDF
)
}

'fromNonExistingPath - {
val actualDF = sourceDF.transform(
transformations.extractFromJson(
"person",
"age",
"$.age"
)
)

val expectedDF = spark.createDF(
List(
(10, bart, null),
(20, lisa, null)
),
List(
("id", IntegerType, true),
("person", StringType, true),
("age", StringType, true)
)
)

assertSmallDataFrameEquality(
actualDF,
expectedDF
)
}
}

'withRowAsStruct - {

"collects an entire row intro a StructType column" - {
Expand Down

0 comments on commit 70115cf

Please sign in to comment.