Skip to content

Commit

Permalink
clean up code
Browse files Browse the repository at this point in the history
  • Loading branch information
csgoh committed Jan 1, 2024
1 parent d234277 commit 00b6ccd
Showing 1 changed file with 45 additions and 108 deletions.
153 changes: 45 additions & 108 deletions src/processpiper/text2diagram.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@
import os
from PIL import Image
from rich.console import Console
from unittest import result


def parse_and_generate_code(input_str, png_output_file):
def _parse_and_generate_code(input_str, png_output_file):
"""
Parse input string and generate code to create a diagram
"""

# lines = input_str.strip().split("\n")
lines = [
line.strip()
for line in input_str.strip().split("\n")
if not line.startswith("#") and line.strip()
]

process_map_title = parse_title(lines)
process_map_width = int(parse_width(lines))
process_map_title = _parse_title(lines)
process_map_width = int(_parse_width(lines))

if not lines:
raise ValueError(
"No business process definition found. Please add pool(s), lane(s) and element(s)."
)

colour_theme = parse_colour_theme(lines)
colour_theme = _parse_colour_theme(lines)

colour_theme_code = f', colour_theme="{colour_theme}"'
process_map_width_code = f", width={process_map_width}"
Expand All @@ -36,33 +34,22 @@ def parse_and_generate_code(input_str, png_output_file):
f'with ProcessMap("{process_map_title}"{colour_theme_code if colour_theme is not None else ""}{process_map_width_code if process_map_width > 0 else ""}) as my_process_map:',
]

# if colour_theme is None and process_map_width is None:
# code_lines.append(f'with ProcessMap("{process_map_title}") as my_process_map:')
# elif colour_theme is None:
# code_lines.append(f'with ProcessMap("{process_map_title}", width="{process_map_width}") as my_process_map:')
# else:
# code_lines.append(f'with ProcessMap("{process_map_title}", colour_theme="{colour_theme}") as my_process_map:')

# if colour_theme is None
# elif process_map_width is None:
# f'with ProcessMap("{process_map_title}", colour_theme="{colour_theme}") as my_process_map:',
# else f'with ProcessMap("{process_map_title}", colour_theme="{colour_theme}", width="{process_map_width}) as my_process_map:',
# ]

indent = ""
pool_id = 1
lane_id = 0
pool_found = False
while lines:
line = lines.pop(0).strip()
if line.startswith("pool:"):
pool_found, pool_id = parse_pool(code_lines, pool_id, line)
pool_found, pool_id = _parse_pool(code_lines, pool_id, line)

elif line.startswith("lane:"):
indent, lane_id = parse_lane(code_lines, pool_id, lane_id, pool_found, line)
parse_element(lines, code_lines, indent, lane_id)
indent, lane_id = _parse_lane(
code_lines, pool_id, lane_id, pool_found, line
)
_parse_element(lines, code_lines, indent, lane_id)

parse_connection(input_str, code_lines)
_parse_connection(input_str, code_lines)

footer = input_str.strip().split("\n")[-1].split("footer:")
indent = " " * 4
Expand All @@ -75,7 +62,7 @@ def parse_and_generate_code(input_str, png_output_file):
return "\n".join(code_lines)


def parse_element(lines, code_lines, indent, lane_id):
def _parse_element(lines, code_lines, indent, lane_id):
"""
This function parses an element from a list of lines and adds it to a code block with the
appropriate indentation.
Expand All @@ -93,13 +80,13 @@ def parse_element(lines, code_lines, indent, lane_id):
# push lane_element back to lines
lines.insert(0, lane_element)
break
element_type, element_name, element_var = parse_lane_element(lane_element)
element_type, element_name, element_var = _parse_lane_element(lane_element)
code_lines.append(
f'{indent}{element_var} = lane{lane_id}.add_element("{element_name}", {element_type})'
)


def parse_lane(code_lines, pool_id, lane_id, pool_found, line):
def _parse_lane(code_lines, pool_id, lane_id, pool_found, line):
"""
The function parses a lane from a code line and adds it to a process map.
"""
Expand All @@ -120,7 +107,7 @@ def parse_lane(code_lines, pool_id, lane_id, pool_found, line):
return indent, lane_id


def parse_pool(code_lines, pool_id, line):
def _parse_pool(code_lines, pool_id, line):
"""
This function parses a line of code to extract a pool name and adds it to a list of code lines with
a specific format.
Expand All @@ -135,70 +122,29 @@ def parse_pool(code_lines, pool_id, line):
return pool_found, pool_id


# def parse_connection(input_str, code_lines):
# """
# The function parses a string input containing connection information and generates code lines to
# establish those connections in Python.
# """
# console = Console()
# connections_list = [
# line.split("->") for line in input_str.strip().split("\n") if "->" in line
# ]
# console.print(f">{connections_list=}")

# for connection in connections_list:
# console.print("\t" * 1 + f">{connection=}")
# for i in range(len(connection) - 1):
# element_name, label = get_element_name_and_label(connection[i].strip())
# target_element_name, target_label = get_element_name_and_label(
# connection[i + 1].strip()
# )
# console.print("\t" * 2 + f">{element_name=}, {label=}")
# console.print("\t" * 2 + f">{target_element_name=}, {target_label=}")
# if target_label:
# code_lines.append(
# f' {element_name}.connect({target_element_name}, "{target_label}")'
# )
# else:
# code_lines.append(
# f" {element_name}.connect({target_element_name})"
# )


def parse_connection(input_str, code_lines):
def _parse_connection(input_str, code_lines):
"""
The function parses a string input containing connection information and generates code lines to
establish those connections in Python.
"""
console = Console()
connections_list = [
line.split("->") for line in input_str.strip().split("\n") if "->" in line
]
# console.print(f">{connections_list=}")

for connection in connections_list:
# console.print("\t" * 1 + f">{connection=}")
for i in range(len(connection) - 1):
(
element_name,
label,
source_source_side,
source_target_side,
) = get_element_name_and_label(connection[i].strip())
) = _get_element_name_and_label(connection[i].strip())
(
target_element_name,
target_label,
source_side,
target_side,
) = get_element_name_and_label(connection[i + 1].strip())
# console.print(
# "\t" * 2
# + f">{element_name=}, {label=}, {source_source_side=}, {source_target_side=}"
# )
# console.print(
# "\t" * 2
# + f">{target_element_name=}, {target_label=}, {source_side=}, {target_side=}"
# )
) = _get_element_name_and_label(connection[i + 1].strip())
if target_label:
if source_source_side is not None and source_target_side is not None:
code_lines.append(
Expand Down Expand Up @@ -253,38 +199,36 @@ def parse_connection(input_str, code_lines):
)


def get_element_name_and_label(connection: str):
def _get_element_name_and_label(connection: str):
"""
The function extracts the element name and label from a given connection string using regular
expressions.
"""

# console = Console()
# console.print("\t" * 3 + f"#{connection=}")
# -- Find the connection label
pattern = r"(.*): (.*)"
if result := re.search(pattern, connection):
# -- Extract the element name and label
return result[1], result[2], None, None

# -- Find the connection source and target sides
direction_pattern = r"(.*)-(?:\((.*?),(.*?)\))?"
if match := re.search(direction_pattern, connection):
element_name = match[1]
# console.print("\t" * 4 + f">{element_name=}")
source_side = match[2]
# console.print("\t" * 4 + f">{source_side=}")
target_side = match[3].strip()
# console.print("\t" * 4 + f">{target_side=}")
if direction_match := re.search(direction_pattern, connection):
element_name = direction_match[1]
source_side = direction_match[2]
target_side = direction_match[3].strip()
return element_name, None, source_side, target_side

return connection.split(":")[0], None, None, None

def parse_colour_theme(lines):

def _parse_colour_theme(lines):
"""
The function extracts the color theme from a list of lines if it exists.
"""
return lines.pop(0).split(":")[1].strip() if "colourtheme" in lines[0] else None


def parse_title(lines):
def _parse_title(lines):
"""
The function extracts the title from a list of lines if the first line contains the word "title".
"""
Expand All @@ -295,26 +239,20 @@ def parse_title(lines):
return process_map_title


def parse_width(lines):
def _parse_width(lines):
"""
The function extracts the title from a list of lines if the first line contains the word "title".
"""
return lines.pop(0).split(":")[1].strip() if "width" in lines[0] else 0

if "width" in lines[0]:
process_map_width = lines.pop(0).split(":")[1].strip()
else:
process_map_width = 0

return process_map_width


def parse_lane_element(element_str):
def _parse_lane_element(element_str):
"""
The function parses a string representing a BPMN element and returns its type, name, and variable
name.
"""
"""Detect element type"""
### EventType
# --EventType--
if element_str.startswith("(start)"):
element_type = "EventType.START"
element_name = element_str[1 : element_str.index(")")].strip()
Expand Down Expand Up @@ -344,14 +282,14 @@ def parse_lane_element(element_str):
element_type = "EventType.LINK"
element_name = element_str[len("(@link") : element_str.index(")")].strip()

### ActivityType
# --ActivityType--
elif element_str.startswith("[@subprocess"):
element_type = "ActivityType.SUBPROCESS"
element_name = element_str[len("[@subprocess") : element_str.index("]")].strip()
elif element_str.startswith("["):
element_type = "ActivityType.TASK"
element_name = element_str[1 : element_str.index("]")].strip()
### GatewayType
# --GatewayType--
elif element_str.startswith("<@parallel"):
element_type = "GatewayType.PARALLEL"
element_name = element_str[len("<@parallel") : element_str.index(">")].strip()
Expand All @@ -376,6 +314,15 @@ def parse_lane_element(element_str):
return element_type, element_name, element_var


def _validate_generated_code(code: str):
if "add_lane" not in code:
raise ValueError("There is no lane defined. Please add lanes to process map.")

# If a multiline code does not contain add_element, raise error
if "add_element" not in code:
raise ValueError("There is no element defined. Please add elements to lane.")


def show_code_with_line_number(code: str):
"""
The function takes a string of code and prints it with line numbers.
Expand All @@ -386,15 +333,6 @@ def show_code_with_line_number(code: str):
console.print(f"{i+1:3} {line}")


def validate_generated_code(code: str):
if "add_lane" not in code:
raise ValueError("There is no lane defined. Please add lanes to process map.")

### If a multiline code does not contain add_element, raise error
if "add_element" not in code:
raise ValueError("There is no element defined. Please add elements to lane.")


def render(text: str, png_output_file: str = ""):
"""Render text to diagram"""
output_file_provided = True
Expand All @@ -405,10 +343,9 @@ def render(text: str, png_output_file: str = ""):
f"piper_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
)

generated_code = parse_and_generate_code(text, png_output_file)
validate_generated_code(generated_code)
generated_code = _parse_and_generate_code(text, png_output_file)
_validate_generated_code(generated_code)
# show_code_with_line_number(generated_code)
# print(generated_code)
exec(generated_code)
generated_image = Image.open(png_output_file)
generated_image.load()
Expand Down

0 comments on commit 00b6ccd

Please sign in to comment.