diff --git a/.gitignore b/.gitignore
index afe99af..9afb144 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,4 +6,5 @@ env
 __pycache__/
 .DS_Store
 prefect.md
-*_old.*
\ No newline at end of file
+*_old.*
+Bildschirmfoto*
\ No newline at end of file
diff --git a/README.md b/README.md
index 2131a0b..986e5fe 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ The goal is to automatically detect and process incoming data from entso-e Trans
 
 >## Table of Contents
 >- [Motivation](#motivation)
->- [Framework Choices](#framework-choices)regularly
+>- [Framework Choices](#framework-choices)
 > - [Why Prefect](#why-prefect)
 > - [Why Pulumi](#why-pulumi)
 > - [Why AWS ECS](#why-aws-ecs)
@@ -104,12 +104,13 @@ As promised, the data_flow workflow is a decorated python function and quite eas
 
 # dataflow.py
 @flow
-def data_flow(event_message: str) -> None:
-    event_payload = extract_event_payload(event_message)
-    region_code = extract_payload_region(event_payload)
+def data_flow(event_msg: str) -> None:
+    event_payload = extract_event_payload(event_msg)
+    region_code = extract_region_code(event_msg)
+    region = lookup_area(region_code).meaning
     installed_capacity = extract_installed_capacity(region_code)
     data = transform_data(event_payload, installed_capacity)
-    send_newsletters(data, region_code)
+    send_newsletters(data, region)
 ```
 
 When you feed in an `event_message`, the message payload will get extracted.
@@ -138,12 +139,13 @@ from etl.utils import mock_event_data
 
 
 @flow
-def data_flow(event_message: str) -> None:
-    event_payload = extract_event_payload(event_message)
-    region_code = extract_payload_region(event_payload)
+def data_flow(event_msg: str) -> None:
+    event_payload = extract_event_payload(event_msg)
+    region_code = extract_region_code(event_msg)
+    region = lookup_area(region_code).meaning
     installed_capacity = extract_installed_capacity(region_code)
     data = transform_data(event_payload, installed_capacity)
-    send_newsletters(data, region_code)
+    send_newsletters(data, region)
 
 
 @task
@@ -152,7 +154,7 @@ def extract_event_payload(event_message: str) -> str:
 
 
 @task
-def extract_payload_region(payload_str: str) -> str:
+def extract_region_code(payload_str: str) -> str:
     pattern = r">([^<]+)"
     match = re.search(pattern, payload_str)
     region_code = ""
@@ -201,14 +203,13 @@ def transform_data(
 
 
 @flow(retries=3, retry_delay_seconds=30)
-def send_newsletters(data: Dict[str, Any], region_code: str) -> None:
+def send_newsletters(data: Dict[str, Any], region: str) -> None:
     """ example sub-flow in which the data will be sent to registered users;
     no load step in our etl here, in fact it's an ets ;)
     """
     # use the prefect Email Credentials Block here:
     email_server_credentials = EmailServerCredentials.load("my-email-credentials")
     users: List[User] = get_users()
-    region = lookup_area(region_code).meaning
     for user in users:
         msg = f"Hello {user.name},<br>Please find our lastest update:<br><br>"
@@ -223,7 +224,7 @@ def send_newsletters(data: Dict[str, Any], region_code: str) -> None:
             email_to=user.email,
         )
 ```
-It is possible to run this flow locally on your computer by feeding in some mocked data for the `event_message`. You do not necessarily need the entso-e api key for a first test run, but the newsletters data will be outdated and miss some information. All you have to prepare for this, is the "Prefect" step of the following [Prerequisites](#prerequisites-to-run-the-dataflow), you may want to set the `entsoe_api_key=""`, if you don't have one so far, and the deployment mode to `LOCAL_TEST`. In fact, you could reduce the following code to one line: `if __name__ == "__main__": data_flow(mock_event_data())`, but I like to have the different options combined here.
+It is possible to run this flow locally on your computer by feeding in some mocked data for the `event_msg`. You do not necessarily need the entso-e api key for a first test run, but the newsletters data will be outdated and miss some information. All you have to prepare for this, is the "Prefect" step of the following [Prerequisites](#prerequisites-to-run-the-dataflow), you may want to set the `entsoe_api_key=""`, if you don't have one so far, and the deployment mode to `LOCAL_TEST`. In fact, you could reduce the following code to one line: `if __name__ == "__main__": data_flow(mock_event_data())`, but I like to have the different options combined here.
 
 ```python
 # dataflow.py
@@ -331,8 +332,8 @@ When you create the ECS push work pool (you can do this directly in the Prefect
 
 >**_ADVANCED:_** If you are already familiar with the [AWS Task Definition], you might have noticed, that not all parameters of the Task Definition Template are available in the base job template of the Prefect ECS (push) work pool. It is very easy to [adjust the job template], if you need to set a specific task definition parameter, the linked video shows how to do this in the Prefect Cloud UI. In short: put the desired parameter to the underlaying work pool json definition (advanced tab of the work pool configuration), to ingest the needed parameters AND assign it also to the desired task definition parameter down at the bottom in the job configuration section (in jinja notation!).<br>By the way: the following command will give you the base job template for the ecs:push work pool in the terminal: `prefect work-pool get-default-base-job-template --type ecs:push`
+...TODO: update due to release of [Prefect 2.14.11](https://docs.prefect.io/latest/guides/deployment/push-work-pools/)
 
-...TODO: update due to release of [Prefect 2.14.11](https://docs.prefect.io/latest/guides/deployment/push-work-pools/)
 Recently, also a new optional ecs push work pool parameter [--provision-infra] was released:
 ```bash
 prefect work-pool create --type ecs:push my_push_work-pool --provision-infra
diff --git a/etl/dataflow.py b/etl/dataflow.py
index 3cfed73..56c81e7 100644
--- a/etl/dataflow.py
+++ b/etl/dataflow.py
@@ -21,7 +21,7 @@ def extract_event_payload(event_msg: str) -> str:
 
 
 @task
-def extract_payload_region(payload_str: str) -> str:
+def extract_region_code(payload_str: str) -> str:
     pattern = r">([^<]+)"
     match = re.search(pattern, payload_str)
     region_code = ""
@@ -95,14 +95,14 @@ def transform_data(xml_str: str, installed_capacity_df: pd.DataFrame) -> Dict[st
 
 
 @flow(retries=3, retry_delay_seconds=30)
-def send_newsletters(data: Dict[str, Any], region_code: str) -> None:
+def send_newsletters(data: Dict[str, Any], region: str) -> None:
     """in this example the data won't be loaded into a database,
     but will be sent to registered users
     """
     # use the prefect Email Credentials Block here:
     email_server_credentials = EmailServerCredentials.load("my-email-credentials")
     users: List[User] = get_users()
-    region = lookup_area(region_code).meaning
+    ## region = lookup_area(region_code).meaning
     for user in users:
         line1 = f"Hello {user.name},<br>"
@@ -122,13 +122,14 @@ def send_newsletters(data: Dict[str, Any], region: str) -> None:
         )
 
 
-@flow
+@flow(log_prints=True)
 def data_flow(event_msg: str) -> None:
     event_payload = extract_event_payload(event_msg)
-    region_code = extract_payload_region(event_payload)
+    region_code = extract_region_code(event_msg)
+    region = lookup_area(region_code).meaning
     installed_capacity = extract_installed_capacity(region_code)
     data = transform_data(event_payload, installed_capacity)
-    send_newsletters(data, region_code)
+    send_newsletters(data, region)
 
 
 if __name__ == "__main__":
@@ -142,7 +143,7 @@ def data_flow(event_msg: str) -> None:
     )
 
     ### Set your preferred flow run/ deployment mode here:
-    deploy_mode = DeployModes.ECS_PUSH_WORK_POOL
+    deploy_mode = DeployModes.LOCAL_TEST
 
     if deploy_mode == DeployModes.LOCAL_TEST:
         # test flow with mocked event data
diff --git a/images/flow_chart.png b/images/flow_chart.png
index e714d1e..6eb3118 100644
Binary files a/images/flow_chart.png and b/images/flow_chart.png differ