Skip to content

Commit

Permalink
update naming and flow chart
Browse files Browse the repository at this point in the history
  • Loading branch information
mt7180 committed Dec 17, 2023
1 parent 23f6f02 commit fab21aa
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ env
__pycache__/
.DS_Store
prefect.md
*_old.*
*_old.*
Bildschirmfoto*
29 changes: 15 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The goal is to automatically detect and process incoming data from entso-e Trans

>## Table of Contents
>- [Motivation](#motivation)
>- [Framework Choices](#framework-choices)regularly
>- [Framework Choices](#framework-choices)
> - [Why Prefect](#why-prefect)
> - [Why Pulumi](#why-pulumi)
> - [Why AWS ECS](#why-aws-ecs)
Expand Down Expand Up @@ -104,12 +104,13 @@ As promised, the data_flow workflow is a decorated python function and quite eas
# dataflow.py

@flow
def data_flow(event_message: str) -> None:
event_payload = extract_event_payload(event_message)
region_code = extract_payload_region(event_payload)
def data_flow(event_msg: str) -> None:
event_payload = extract_event_payload(event_msg)
region_code = extract_region_code(event_msg)
region = lookup_area(region_code).meaning
installed_capacity = extract_installed_capacity(region_code)
data = transform_data(event_payload, installed_capacity)
send_newsletters(data, region_code)
send_newsletters(data, region)
```

When you feed in an `event_message`, the message payload will get extracted.
Expand Down Expand Up @@ -138,12 +139,13 @@ from etl.utils import mock_event_data


@flow
def data_flow(event_message: str) -> None:
event_payload = extract_event_payload(event_message)
region_code = extract_payload_region(event_payload)
def data_flow(event_msg: str) -> None:
event_payload = extract_event_payload(event_msg)
region_code = extract_region_code(event_msg)
region = lookup_area(region_code).meaning
installed_capacity = extract_installed_capacity(region_code)
data = transform_data(event_payload, installed_capacity)
send_newsletters(data, region_code)
send_newsletters(data, region)


@task
Expand All @@ -152,7 +154,7 @@ def extract_event_payload(event_message: str) -> str:


@task
def extract_payload_region(payload_str: str) -> str:
def extract_region_code(payload_str: str) -> str:
pattern = r">([^<]+)</inBiddingZone_Domain\.mRID>"
match = re.search(pattern, payload_str)
region_code = ""
Expand Down Expand Up @@ -201,14 +203,13 @@ def transform_data(


@flow(retries=3, retry_delay_seconds=30)
def send_newsletters(data: Dict[str, Any], region_code: str) -> None:
def send_newsletters(data: Dict[str, Any], region: str) -> None:
""" example sub-flow in which the data will be sent to registered users;
no load step in our etl here, in fact it's an ets ;)
"""
# use the prefect Email Credentials Block here:
email_server_credentials = EmailServerCredentials.load("my-email-credentials")
users: List[User] = get_users()
region = lookup_area(region_code).meaning

for user in users:
msg = f"Hello {user.name}, <br> Please find our lastest update: <br><br>"
Expand All @@ -223,7 +224,7 @@ def send_newsletters(data: Dict[str, Any], region_code: str) -> None:
email_to=user.email,
)
```
It is possible to run this flow locally on your computer by feeding in some mocked data for the `event_message`. You do not necessarily need the entso-e api key for a first test run, but the newsletters data will be outdated and miss some information. All you have to prepare for this, is the "Prefect" step of the following [Prerequisites](#prerequisites-to-run-the-dataflow), you may want to set the `entsoe_api_key=""`, if you don't have one so far, and the deployment mode to `LOCAL_TEST`. In fact, you could reduce the following code to one line: `if __name__ == "__main__": data_flow(mock_event_data())`, but I like to have the different options combined here.
It is possible to run this flow locally on your computer by feeding in some mocked data for the `event_msg`. You do not necessarily need the entso-e api key for a first test run, but the newsletters data will be outdated and miss some information. All you have to prepare for this, is the "Prefect" step of the following [Prerequisites](#prerequisites-to-run-the-dataflow), you may want to set the `entsoe_api_key=""`, if you don't have one so far, and the deployment mode to `LOCAL_TEST`. In fact, you could reduce the following code to one line: `if __name__ == "__main__": data_flow(mock_event_data())`, but I like to have the different options combined here.
```python
# dataflow.py

Expand Down Expand Up @@ -331,8 +332,8 @@ When you create the ECS push work pool (you can do this directly in the Prefect
>**_ADVANCED:_** If you are already familiar with the [AWS Task Definition], you might have noticed, that not all parameters of the Task Definition Template are available in the base job template of the Prefect ECS (push) work pool. It is very easy to [adjust the job template], if you need to set a specific task definition parameter, the linked video shows how to do this in the Prefect Cloud UI. In short: put the desired parameter to the underlaying work pool json definition (advanced tab of the work pool configuration), to ingest the needed parameters AND assign it also to the desired task definition parameter down at the bottom in the job configuration section (in jinja notation!).
By the way: the following command will give you the base job template for the ecs:push work pool in the terminal: `prefect work-pool get-default-base-job-template --type ecs:push`

...TODO: update due to release of [Prefect 2.14.11](https://docs.prefect.io/latest/guides/deployment/push-work-pools/)

...TODO: update due to release of [Prefect 2.14.11](https://docs.prefect.io/latest/guides/deployment/push-work-pools/)
Recently, also a new optional ecs push work pool parameter [--provision-infra] was released:
```bash
prefect work-pool create --type ecs:push my_push_work-pool --provision-infra
Expand Down
15 changes: 8 additions & 7 deletions etl/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def extract_event_payload(event_msg: str) -> str:


@task
def extract_payload_region(payload_str: str) -> str:
def extract_region_code(payload_str: str) -> str:
pattern = r">([^<]+)</inBiddingZone_Domain\.mRID>"
match = re.search(pattern, payload_str)
region_code = ""
Expand Down Expand Up @@ -95,14 +95,14 @@ def transform_data(xml_str: str, installed_capacity_df: pd.DataFrame) -> Dict[st


@flow(retries=3, retry_delay_seconds=30)
def send_newsletters(data: Dict[str, Any], region_code: str) -> None:
def send_newsletters(data: Dict[str, Any], region: str) -> None:
"""in this example the data won't be loaded into a database,
but will be sent to registered users
"""
# use the prefect Email Credentials Block here:
email_server_credentials = EmailServerCredentials.load("my-email-credentials")
users: List[User] = get_users()
region = lookup_area(region_code).meaning
## region = lookup_area(region_code).meaning

for user in users:
line1 = f"Hello {user.name}, <br>"
Expand All @@ -122,13 +122,14 @@ def send_newsletters(data: Dict[str, Any], region_code: str) -> None:
)


@flow
@flow(log_prints=True)
def data_flow(event_msg: str) -> None:
event_payload = extract_event_payload(event_msg)
region_code = extract_payload_region(event_payload)
region_code = extract_region_code(event_msg)
region = lookup_area(region_code).meaning
installed_capacity = extract_installed_capacity(region_code)
data = transform_data(event_payload, installed_capacity)
send_newsletters(data, region_code)
send_newsletters(data, region)


if __name__ == "__main__":
Expand All @@ -142,7 +143,7 @@ def data_flow(event_msg: str) -> None:
)

### Set your preferred flow run/ deployment mode here:
deploy_mode = DeployModes.ECS_PUSH_WORK_POOL
deploy_mode = DeployModes.LOCAL_TEST

if deploy_mode == DeployModes.LOCAL_TEST:
# test flow with mocked event data
Expand Down
Binary file modified images/flow_chart.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit fab21aa

Please sign in to comment.