From 1d59f390a9b616486b89f8eb071068a4efcbf2b2 Mon Sep 17 00:00:00 2001 From: RoryPTB <47696929+RoryPTB@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:48:26 +0200 Subject: [PATCH] feat: initial implementation 4 --- pyproject.toml | 2 + src/cap2geojson/convert.py | 104 +++++++++++++++++++++---------------- 2 files changed, 62 insertions(+), 44 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 43d33c1..f7a6ef2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,8 @@ keywords = ["WIS2.0", "CAP", "XML", "GeoJSON", "convert"] license = {file = "LICENSE"} dependencies = [ "xmltodict>=0.13.0", + "shapely>=2.0.6", + "geojson>=3.1.0", "click>=8.1.7" ] dynamic = ["version"] diff --git a/src/cap2geojson/convert.py b/src/cap2geojson/convert.py index 3979c6d..cf26213 100644 --- a/src/cap2geojson/convert.py +++ b/src/cap2geojson/convert.py @@ -1,7 +1,8 @@ +import geojson import json import logging import math -from typing import Union +from typing import Generator, Union import re import xmltodict @@ -59,11 +60,11 @@ def get_area_desc(area: Union[dict, list]) -> str: return ", ".join([a["areaDesc"] for a in area]) -def get_all_circle_coords( +def get_circle_coords( x_centre: float, y_centre: float, radius: float, n_points: int -) -> list: +) -> Generator[list, None, None]: """ - Estimate the n coordinates of a circle with a given centre and radius. + Estimate the n+1 coordinates of a circle with a given centre and radius. Args: x_centre (float): The longitude of the circle's centre. @@ -73,27 +74,18 @@ def get_all_circle_coords( the circle. Returns: - list: The n estimated coordinates of the circle. + Generator: Yield the n+1 estimated coordinates of the circle. """ + # Generate thetas for the n-gon + thetas = [i / n_points * math.tau for i in range(n_points)] + # Add theta = 0 to the end to ensure the circle is closed + thetas.append(0) - def get_circle_coord( - theta: float, x_centre: float, y_centre: float, radius: float - ) -> list: - """Calculate the x and y coordinates of a point on a circle, - given the angle theta and the circle's centre and radius.""" + for theta in thetas: x = radius * math.cos(theta) + x_centre y = radius * math.sin(theta) + y_centre # Round to 5 decimal places to prevent excessive precision - return [round(x, 5), round(y, 5)] - - # Generate thetas for the n-gon - thetas = [i / n_points * math.tau for i in range(n_points)] - circle_coords = [ - get_circle_coord(theta, x_centre, y_centre, radius) for theta in thetas - ] - # Ensure the circle is closed by adding the first coordinate to the end - circle_coords.append(circle_coords[0]) - return circle_coords + yield [round(x, 5), round(y, 5)] def ensure_counter_clockwise(coords: list) -> list: @@ -108,18 +100,18 @@ def ensure_counter_clockwise(coords: list) -> list: list: List of coordinate pairs in counter-clockwise order. """ - def signed_area(coords): - """Calculate the signed area of the polygon, to help + def clockwise(coords: list) -> bool: + """Calculate (double) the signed area of the polygon, to help determine the order of the coordinates.""" area = 0 n = len(coords) for i in range(n): x1, y1 = coords[i] x2, y2 = coords[(i + 1) % n] - area += x1 * y2 - x2 * y1 - return area / 2 + area += (x1 * y2) - (x2 * y1) + return area < 0 - if signed_area(coords) < 0: + if clockwise(coords): coords.reverse() return coords @@ -140,12 +132,12 @@ def get_polygon_coordinates(single_area: dict) -> list: radius = float(radius) x_centre, y_centre = map(float, centre.split(",")) # Estimate the circle coordinates with n=100 points - return get_all_circle_coords(x_centre, y_centre, radius, 100) + return list(get_circle_coords(x_centre, y_centre, radius, 100)) if "polygon" in single_area: # Takes form "x,y x,y x,y" but with newlines that need to be removed polygon_str = single_area["polygon"].replace("\n", "").split() - polygon_list = [list(map(float, coord.split(","))) for coord in polygon_str] + polygon_list = [list(map(float, coord.split(","))) for coord in polygon_str] # noqa return ensure_counter_clockwise(polygon_list) return [] @@ -187,31 +179,55 @@ def preprocess_alert(xml: str) -> str: return re.sub(r"<(/?)cap:(\w+)", r"<\1\2", xml) -def to_geojson(xml: str) -> dict: +def to_geojson(xml: str) -> str: """Takes the CAP alert XML and converts it to a GeoJSON. Args: xml (str): The CAP XML string. Returns: - dict: The final GeoJSON object. + str: The final GeoJSON object stringified. """ processed_xml = preprocess_alert(xml) - data = xmltodict.parse(processed_xml) - alert = data["alert"] + try: + data = xmltodict.parse(processed_xml) + except Exception as e: + logger.error(f"Error parsing XML: {e}") + raise + + alert = data.get("alert", {}) + if not alert: + logger.error("No alert object found in the XML.") + raise ValueError("No alert object found in the XML.") + alert_properties = get_properties(alert) - alert_geometry = get_geometry(alert["info"]["area"]) - - result = { - "type": "FeatureCollection", - "features": [ - { - "type": "Feature", - "properties": alert_properties, - "geometry": alert_geometry, - } - ], - } - return json.dumps(result, indent=4) + area = alert.get("info", {}).get("area") + if not area: + logger.error("No area object found in the alert.") + raise ValueError("No area object found in the alert.") + + alert_geometry = get_geometry(area) + + result = json.dumps( + { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "properties": alert_properties, + "geometry": alert_geometry, + } + ], + }, + indent=4, + ) + + try: + geojson.loads(result) + except Exception as e: + logger.error(f"Error converting to GeoJSON: {e}") + raise + + return result