# eg4_joins.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# Start (or reuse) the Spark session that every DataFrame below is built from.
spark = SparkSession.builder.appName("eg_joins").getOrCreate()

# Everything after session creation runs under try/finally so the session is
# stopped even when building a DataFrame or running the join raises.
try:
    # People. `graduate_program` is the column matched against
    # graduateProgram.id by the join below; `spark_status` is a list of
    # integers that line up with the ids in sparkStatus.
    person = spark.createDataFrame(
        [
            (0, "Bill Chambers", 0, [100]),
            (1, "Matei Zaharia", 1, [500, 250, 100]),
            (2, "Michael Armbrust", 1, [250, 100]),
        ]
    ).toDF("id", "name", "graduate_program", "spark_status")

    # Graduate programs, keyed by `id`.
    graduateProgram = spark.createDataFrame(
        [
            (0, "Masters", "School of Information", "UC Berkeley"),
            (2, "Masters", "EECS", "UC Berkeley"),
            (1, "Ph.D.", "EECS", "UC Berkeley"),
        ]
    ).toDF("id", "degree", "department", "school")

    # Status lookup table, keyed by `id`.
    sparkStatus = spark.createDataFrame(
        [
            (500, "Vice President"),
            (250, "PMC Member"),
            (100, "Contributor"),
        ]
    ).toDF("id", "status")

    # Register each DataFrame as a temp view under its own name so it can
    # also be queried through Spark SQL.
    person.createOrReplaceTempView("person")
    graduateProgram.createOrReplaceTempView("graduateProgram")
    sparkStatus.createOrReplaceTempView("sparkStatus")

    # Join condition written as a SQL expression string.
    # Column-object alternative:
    # joinExpression = person.graduate_program == graduateProgram.id
    joinExpression = expr("graduate_program = id")

    # Both inputs have an `id` column, so person's is renamed to `personId`
    # first; after that the unqualified `id` in the expression can only refer
    # to graduateProgram's column. The inner join result is printed.
    (
        person.withColumnRenamed("id", "personId")
        .join(graduateProgram, joinExpression, "inner")
        .show()
    )

    # Other variants kept for experimentation (disabled):
    # person.join(graduateProgram, joinExpression, "inner").explain(extended=True)
    # person.join(graduateProgram, joinExpression, "outer").show()
    # (
    #     person.withColumnRenamed("id", "personId")
    #     .join(sparkStatus, expr("array_contains(spark_status,id)")).show()
    # )
finally:
    # Always shut the session down, on success or failure.
    spark.stop()