-
Notifications
You must be signed in to change notification settings - Fork 0
/
spark.txt
53 lines (41 loc) · 1.28 KB
/
spark.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
-spark başlatma scripti
# Launch the PySpark shell with the Kafka SQL connector package and the BigQuery connector jar.
pyspark \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
  --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
---
from pyspark.sql import SparkSession, SQLContext, Row
#şemala
spark = SparkSession.builder.appname("kdd").getorcreate()
sc = spark.sparkcontext
data_file = "hdfs:///file_name.gz"
raw_rdd = sc.textfile(data_file).cache()
raw_rdd.take(5)
csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
duration = int(r[0]),
protocol_type = r[1] )
parsed.rddd.take(5)
#dfyarat
sqlcontext = sqlcontext(sc)
df = sqlcontext.createdataframe(parsed_rdd)
connections_by_protocol = df.groupby('protocol_type').count().orderby('count',ascending=false)
connections_by_protocol.show()
df.registertemptable("connections")
attack_stats = sqlcontext.sql("""
select
...
from connections
...
""")
attack_stats.show()
%matplotlib inline
ax = attack_stats.topandas().plot.bar(x='protocol_type', subplots = true, figsize = (10,25))
### #### ###
#databricks
%python
data = spark.read.csv("/databricks../data_geo.csv", header = "true", inferschema = "true")
data.cache()
data = data.dropna()
%python
data.createorreplacetempview("data_geo")
%sql
-- The %sql magic has to be the first line of the cell; without it this statement
-- would be parsed by the notebook's default language instead of as SQL.
-- NOTE(review): single-quoted values are string literals in Spark SQL, so this
-- returns the constants 'column_name' and '2' for every row. If real columns
-- were meant, use bare or backtick-quoted identifiers — confirm intent.
select
  'column_name',
  '2'
from data_geo
%sql
-- Materialize the data_geo temp view as table "yusuf".
-- select * is kept because the column list of data_geo is not visible in these notes.
create table yusuf as
select *
from data_geo