Skip to content

Commit

Permalink
feat: initial implementation 4
Browse files Browse the repository at this point in the history
  • Loading branch information
RoryPTB committed Sep 3, 2024
1 parent fabbd4e commit 1d59f39
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 44 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ keywords = ["WIS2.0", "CAP", "XML", "GeoJSON", "convert"]
license = {file = "LICENSE"}
dependencies = [
"xmltodict>=0.13.0",
"shapely>=2.0.6",
"geojson>=3.1.0",
"click>=8.1.7"
]
dynamic = ["version"]
Expand Down
104 changes: 60 additions & 44 deletions src/cap2geojson/convert.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import geojson
import json
import logging
import math
from typing import Union
from typing import Generator, Union
import re
import xmltodict

Expand Down Expand Up @@ -59,11 +60,11 @@ def get_area_desc(area: Union[dict, list]) -> str:
return ", ".join([a["areaDesc"] for a in area])


def get_all_circle_coords(
def get_circle_coords(
x_centre: float, y_centre: float, radius: float, n_points: int
) -> list:
) -> Generator[list, None, None]:
"""
Estimate the n coordinates of a circle with a given centre and radius.
Estimate the n+1 coordinates of a circle with a given centre and radius.
Args:
x_centre (float): The longitude of the circle's centre.
Expand All @@ -73,27 +74,18 @@ def get_all_circle_coords(
the circle.
Returns:
list: The n estimated coordinates of the circle.
Generator: Yield the n+1 estimated coordinates of the circle.
"""
# Generate thetas for the n-gon
thetas = [i / n_points * math.tau for i in range(n_points)]
# Add theta = 0 to the end to ensure the circle is closed
thetas.append(0)

def get_circle_coord(
theta: float, x_centre: float, y_centre: float, radius: float
) -> list:
"""Calculate the x and y coordinates of a point on a circle,
given the angle theta and the circle's centre and radius."""
for theta in thetas:
x = radius * math.cos(theta) + x_centre
y = radius * math.sin(theta) + y_centre
# Round to 5 decimal places to prevent excessive precision
return [round(x, 5), round(y, 5)]

# Generate thetas for the n-gon
thetas = [i / n_points * math.tau for i in range(n_points)]
circle_coords = [
get_circle_coord(theta, x_centre, y_centre, radius) for theta in thetas
]
# Ensure the circle is closed by adding the first coordinate to the end
circle_coords.append(circle_coords[0])
return circle_coords
yield [round(x, 5), round(y, 5)]


def ensure_counter_clockwise(coords: list) -> list:
Expand All @@ -108,18 +100,18 @@ def ensure_counter_clockwise(coords: list) -> list:
list: List of coordinate pairs in counter-clockwise order.
"""

def signed_area(coords):
"""Calculate the signed area of the polygon, to help
def clockwise(coords: list) -> bool:
"""Calculate (double) the signed area of the polygon, to help
determine the order of the coordinates."""
area = 0
n = len(coords)
for i in range(n):
x1, y1 = coords[i]
x2, y2 = coords[(i + 1) % n]
area += x1 * y2 - x2 * y1
return area / 2
area += (x1 * y2) - (x2 * y1)
return area < 0

if signed_area(coords) < 0:
if clockwise(coords):
coords.reverse()
return coords

Expand All @@ -140,12 +132,12 @@ def get_polygon_coordinates(single_area: dict) -> list:
radius = float(radius)
x_centre, y_centre = map(float, centre.split(","))
# Estimate the circle coordinates with n=100 points
return get_all_circle_coords(x_centre, y_centre, radius, 100)
return list(get_circle_coords(x_centre, y_centre, radius, 100))

if "polygon" in single_area:
# Takes form "x,y x,y x,y" but with newlines that need to be removed
polygon_str = single_area["polygon"].replace("\n", "").split()
polygon_list = [list(map(float, coord.split(","))) for coord in polygon_str]
polygon_list = [list(map(float, coord.split(","))) for coord in polygon_str] # noqa
return ensure_counter_clockwise(polygon_list)

return []
Expand Down Expand Up @@ -187,31 +179,55 @@ def preprocess_alert(xml: str) -> str:
return re.sub(r"<(/?)cap:(\w+)", r"<\1\2", xml)


def to_geojson(xml: str) -> dict:
def to_geojson(xml: str) -> str:
"""Takes the CAP alert XML and converts it to a GeoJSON.
Args:
xml (str): The CAP XML string.
Returns:
dict: The final GeoJSON object.
str: The final GeoJSON object stringified.
"""
processed_xml = preprocess_alert(xml)
data = xmltodict.parse(processed_xml)

alert = data["alert"]
try:
data = xmltodict.parse(processed_xml)
except Exception as e:
logger.error(f"Error parsing XML: {e}")
raise

alert = data.get("alert", {})
if not alert:
logger.error("No alert object found in the XML.")
raise ValueError("No alert object found in the XML.")

alert_properties = get_properties(alert)
alert_geometry = get_geometry(alert["info"]["area"])

result = {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": alert_properties,
"geometry": alert_geometry,
}
],
}

return json.dumps(result, indent=4)
area = alert.get("info", {}).get("area")
if not area:
logger.error("No area object found in the alert.")
raise ValueError("No area object found in the alert.")

alert_geometry = get_geometry(area)

result = json.dumps(
{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": alert_properties,
"geometry": alert_geometry,
}
],
},
indent=4,
)

try:
geojson.loads(result)
except Exception as e:
logger.error(f"Error converting to GeoJSON: {e}")
raise

return result

0 comments on commit 1d59f39

Please sign in to comment.