forked from databrickslabs/mosaic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path02. Data Ingestion.py
83 lines (57 loc) · 2.39 KB
/
02. Data Ingestion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Databricks notebook source
# MAGIC %md ## Data Ingestion
# COMMAND ----------
# MAGIC %pip install databricks_mosaic
# COMMAND ----------
from pyspark.sql.functions import *
import mosaic as mos
# Mosaic configuration: ESRI as the geometry API, H3 as the grid index system.
# These are applied before enable_mosaic, presumably so Mosaic picks them up
# when it is enabled — confirm against the Mosaic docs if reordering.
_mosaic_conf = {
    "spark.databricks.labs.mosaic.geometry.api": "ESRI",
    "spark.databricks.labs.mosaic.index.system": "H3",
}
for _conf_key, _conf_value in _mosaic_conf.items():
    spark.conf.set(_conf_key, _conf_value)
mos.enable_mosaic(spark, dbutils)
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC We begin by loading data from a table. Here we use captured `AIS` (Automatic Identification System) data.
# MAGIC
# MAGIC - MMSI: unique 9-digit identification code of the ship - numeric
# MAGIC - VesselName: name of the ship - string
# MAGIC - CallSign: unique callsign of the ship - string
# MAGIC - BaseDateTime: timestamp of the AIS message - datetime
# MAGIC - LAT: latitude of the ship (in degree: [-90 ; 90], negative value represents South, 91 indicates ‘not available’) - numeric
# MAGIC - LON: longitude of the ship (in degree: [-180 ; 180], negative value represents West, 181 indicates ‘not available’) - numeric
# MAGIC - SOG: speed over ground, in knots - numeric
# MAGIC - Status: status of the ship - string
# COMMAND ----------
# Load the raw AIS messages as a DataFrame (same as spark.read.table).
cargos = spark.table("ship2ship.AIS")
display(cargos)
# COMMAND ----------
# MAGIC %md ## Data Transformation
# COMMAND ----------
# MAGIC %md ### Indexing
# MAGIC To facilitate downstream analytics, we can also create a quick point index at a chosen H3 resolution.
# MAGIC Here we use resolution `9`, whose hexagons have an average edge length of ~174 metres.
# COMMAND ----------
# Enrich each AIS message with:
#   - point_geom: a point geometry built from (LON, LAT)
#   - ix:         the H3 cell id of that point at resolution 9
#   - sog_kmph:   speed over ground converted from knots to km/h, 2 decimals
# NOTE(review): the schema above says LAT=91 / LON=181 mean "not available";
# such rows are not filtered here, so they still get a point and a cell id — confirm
# whether they should be excluded or nulled before indexing.
H3_RESOLUTION = 9
KNOTS_TO_KMPH = 1.852  # 1 knot = 1.852 km/h

cargos_indexed = cargos.withColumn("point_geom", mos.st_point("LON", "LAT"))
cargos_indexed = cargos_indexed.withColumn(
    "ix", mos.grid_pointascellid("point_geom", resolution=lit(H3_RESOLUTION))
)
# `round` is pyspark.sql.functions.round (brought in by the star import at the
# top of the notebook, shadowing the builtin) — the builtin cannot round a Column.
cargos_indexed = cargos_indexed.withColumn(
    "sog_kmph", round(col("sog") * KNOTS_TO_KMPH, 2)
)
display(cargos_indexed)
# COMMAND ----------
# MAGIC %md ## Exporting
# MAGIC We can then write the transformed output to a new table.
# COMMAND ----------
# Convert the geometry column to WKB with st_aswkb before persisting (presumably
# so the table holds a plain binary column), then overwrite the target table.
cargos_to_save = cargos_indexed.withColumn("point_geom", mos.st_aswkb("point_geom"))
cargos_to_save.write.saveAsTable("ship2ship.cargos_indexed", mode="overwrite")
# COMMAND ----------
# DBTITLE 1,We can optimise our table to colocate data and make querying faster
# MAGIC %sql OPTIMIZE ship2ship.cargos_indexed ZORDER by (ix, BaseDateTime)
# COMMAND ----------
# MAGIC %md ## Visualisation
# MAGIC Finally, we can perform a quick visual inspection of the data.
# COMMAND ----------
# MAGIC %%mosaic_kepler
# MAGIC ship2ship.cargos_indexed "ix" "h3" 10_000