-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathProblem#6.txt
54 lines (39 loc) · 2.12 KB
/
Problem#6.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
Instructions
Get the total number of orders for each customer where customer_state = 'TX'
Data Description
retail_db data is available in HDFS at /public/retail_db
retail_db data information:
Source directories: /public/retail_db/orders and /public/retail_db/customers
Source Columns - orders - order_id, order_date, order_customer_id,
order_status
Source Columns - customers - customer_id, customer_fname,
customer_lname, customer_state (8th column) and many more
delimiter: (",")
Output Requirements
Output Fields: customer_fname, customer_lname, order_count
File Format: text
Delimiter: Tab character (\t)
End of Problem
#using DF APIs
# Counts orders per Texas customer with the DataFrame API and writes one
# tab-delimited text file with customer_fname, customer_lname, order_count.
# Expects an active SparkSession bound to `spark` (e.g. the pyspark shell).
# `count` is needed by the aggregation below; the remaining names are kept
# from the original import list (note that `sum` shadows the Python builtin).
from pyspark.sql.functions import substring, sum, desc, concat, dense_rank, concat_ws, count

# NOTE(review): the problem statement lists the data under /public/retail_db,
# while these paths use /public/data/retail_db - confirm for the target cluster.
# No header option is set, so columns get the default names _c0, _c1, ...
customers = spark.read.format('csv').option("sep", ",").load("/public/data/retail_db/customers/")
orders = spark.read.format('csv').option("sep", ",").load("/public/data/retail_db/orders/")
customers.printSchema()

#filter and selectExpr
# Column positions follow the problem statement: id, fname, lname are the
# first three customer columns and customer_state is the 8th (_c7).
cf = customers.selectExpr(
    "_c0 as customer_id",
    "_c1 as customer_fname",
    "_c2 as customer_lname",
    "_c7 as customer_state",
).where("customer_state = 'TX'")
of = orders.selectExpr("_c0 as order_id", "_c2 as order_customer_id")

#aggr orders on customers
oa = of.groupBy('order_customer_id').agg(count('order_id').alias('order_count'))

#join with customer
# The left join keeps TX customers that have no orders; their order_count is
# NULL, and concat_ws silently skips NULL arguments, which would drop the
# third field. Coalesce to 0 so every row has all three output fields.
output = cf.join(oa, oa.order_customer_id == cf.customer_id, "left").selectExpr(
    "concat_ws('\t', customer_fname, customer_lname, "
    "cast(coalesce(order_count, 0) as string)) as result"
)
output.coalesce(1).write.mode('overwrite').format('text').save('/public/data/output/sol6/')

#validate
import os
os.system("hdfs dfs -ls /public/data/output/sol6/")
# The part-file name contains a run-specific UUID, so match it with a glob
# instead of hard-coding it; coalesce(1) above yields a single part file.
os.system("hdfs dfs -cat /public/data/output/sol6/part-* | more")
#using SparkSQL
# Same result as the DataFrame API variant, expressed in Spark SQL.
# Relies on the `cf` (TX customers) and `of` (orders) DataFrames built earlier
# in this script and on an active SparkSession bound to `spark`.
cf.createOrReplaceTempView("cf")
of.createOrReplaceTempView("of")

# Per-customer order counts.
oa = spark.sql(
    "select order_customer_id, count(order_id) as order_count "
    "from of group by order_customer_id"
)
oa.createOrReplaceTempView("oa")

# The left join keeps customers without orders; their order_count is NULL and
# concat_ws skips NULL arguments, so coalesce to 0 to keep all three fields.
# NOTE(review): `output` is only defined here, never written or displayed -
# add a write/show step if this variant should produce the final file.
output = spark.sql(
    "select concat_ws('\t', customer_fname, customer_lname, "
    "cast(coalesce(order_count, 0) as string)) as result "
    "from cf left join oa on oa.order_customer_id=cf.customer_id"
)