Skip to content

Commit

Permalink
Add drawing features for core and HMC7044
Browse files Browse the repository at this point in the history
Signed-off-by: Travis F. Collins <[email protected]>
  • Loading branch information
tfcollins committed May 13, 2024
1 parent 5e26767 commit 162ab52
Show file tree
Hide file tree
Showing 5 changed files with 453 additions and 1 deletion.
149 changes: 148 additions & 1 deletion adijif/clocks/hmc7044.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from adijif.solvers import GEKKO # type: ignore # isort: skip # noqa: I202
from adijif.solvers import GK_Intermediate # type: ignore # isort: skip # noqa: I202

from adijif.draw import Layout, Node # type: ignore # isort: skip # noqa: I202


class hmc7044(hmc7044_bf):
"""HMC7044 clock chip model.
Expand Down Expand Up @@ -162,6 +164,143 @@ def vxco_doubler(self, value: Union[int, List[int]]) -> None:
self._check_in_range(value, self.vxco_doubler_available, "vxco_doubler")
self._vxco_doubler = value

def _init_diagram(self):
"""Initialize diagram for HMC7044 alone"""
self.ic_diagram_node = None
self._diagram_output_dividers = []

# lo = Layout("HMC7044 Example")

self.ic_diagram_node = Node("HMC7044")
# lo.add_node(root)

# External
# ref_in = Node("REF_IN", ntype="input")
# lo.add_node(ref_in)

vcxo_doubler = Node("VCXO Doubler", ntype="shell")
self.ic_diagram_node.add_child(vcxo_doubler)

# Inside the IC
r2_div = Node("R2", ntype="divider")
# r2_div.value = "2"
self.ic_diagram_node.add_child(r2_div)
pfd = Node("PFD", ntype="phase-frequency-detector")
self.ic_diagram_node.add_child(pfd)
lf = Node("LF", ntype="loop-filter")
self.ic_diagram_node.add_child(lf)
vco = Node("VCO", ntype="voltage-controlled-oscillator")
vco.shape = "circle"
self.ic_diagram_node.add_child(vco)
n2 = Node("N2", ntype="divider")
self.ic_diagram_node.add_child(n2)

out_dividers = Node("Output Dividers", ntype="shell")
# ds = 4
# out_divs = []
# for i in range(ds):
# div = Node(f"D{i+1}", ntype="divider")
# out_dividers.add_child(div)
# out_divs.append(div)

self.ic_diagram_node.add_child(out_dividers)

# Connections inside the IC
# lo.add_connection({"from": ref_in, "to": r2_div, 'rate': 125000000})
self.ic_diagram_node.add_connection({"from": vcxo_doubler, "to": r2_div})
self.ic_diagram_node.add_connection(
{"from": r2_div, "to": pfd, "rate": 125000000 / 2}
)
self.ic_diagram_node.add_connection({"from": pfd, "to": lf})
self.ic_diagram_node.add_connection({"from": lf, "to": vco})
self.ic_diagram_node.add_connection({"from": vco, "to": n2})
self.ic_diagram_node.add_connection({"from": n2, "to": pfd})

self.ic_diagram_node.add_connection(
{"from": vco, "to": out_dividers, "rate": 4000000000}
)
# for div in out_divs:
# self.ic_diagram_node.add_connection({"from": out_dividers, "to": div})
# # root.add_connection({"from": vco, "to": div})

def _update_diagram(self, config: Dict):
"""Update diagram with configuration.
Args:
config (Dict): Configuration dictionary
"""
# Add output dividers
keys = config.keys()
output_dividers = self.ic_diagram_node.get_child("Output Dividers")
for key in keys:
if key.startswith("D"):
div = Node(key, ntype="divider")
output_dividers.add_child(div)
self.ic_diagram_node.add_connection(
{"from": output_dividers, "to": div}
)
else:
raise Exception(
f"Unknown key {key}. Must be of for DX where X is a number"
)

def draw(self):
"""Draw diagram in d2 language for IC alone with reference clock."""
if not self._saved_solution:
raise Exception("No solution to draw. Must call solve first.")
lo = Layout("HMC7044 Example")
lo.add_node(self.ic_diagram_node)

ref_in = Node("REF_IN", ntype="input")
lo.add_node(ref_in)
vcxo_double = self.ic_diagram_node.get_child("VCXO Doubler")
lo.add_connection(
{"from": ref_in, "to": vcxo_double, "rate": self._saved_solution["vcxo"]}
)

# Update Node values
node = self.ic_diagram_node.get_child("VCXO Doubler")
node.value = str(self._saved_solution["vcxo_doubler"])
node = self.ic_diagram_node.get_child("R2")
node.value = str(self._saved_solution["r2"])
node = self.ic_diagram_node.get_child("N2")
node.value = str(self._saved_solution["n2"])

# Update VCXO Doubler to R2
con = self.ic_diagram_node.get_connection("VCXO Doubler", "R2")
rate = self._saved_solution["vcxo_doubler"] * self._saved_solution["vcxo"]
self.ic_diagram_node.update_connection("VCXO Doubler", "R2", rate)

# Update R2 to PFD
con = self.ic_diagram_node.get_connection("R2", "PFD")
rate = (
self._saved_solution["vcxo"]
* self._saved_solution["vcxo_doubler"]
/ self._saved_solution["r2"]
)
self.ic_diagram_node.update_connection("R2", "PFD", rate)

# Update VCO
con = self.ic_diagram_node.get_connection("VCO", "Output Dividers")
self.ic_diagram_node.update_connection(
"VCO", "Output Dividers", self._saved_solution["vco"]
)

# Update diagram with dividers and rates
d = 0
output_dividers = self.ic_diagram_node.get_child("Output Dividers")

for key, val in self._saved_solution["output_clocks"].items():
clk_node = Node(key, ntype="divider")
div_value = val["divider"]
div = output_dividers.get_child(f"D{d}")
div.value = str(div_value)
d += 1
lo.add_node(clk_node)
lo.add_connection({"from": div, "to": clk_node, "rate": val["rate"]})

return lo.draw()

def get_config(self, solution: CpoSolveResult = None) -> Dict:
"""Extract configurations from solver results.
Expand Down Expand Up @@ -208,6 +347,9 @@ def get_config(self, solution: CpoSolveResult = None) -> Dict:
config["vco"] = clk * vd
config["vcxo"] = self.vcxo
config["vcxo_doubler"] = vd

self._saved_solution = config

return config

def _setup_solver_constraints(self, vcxo: int) -> None:
Expand Down Expand Up @@ -340,8 +482,10 @@ def set_requested_clocks(
# if type(self.vcxo) not in [int,float]:
# vcxo = self.vcxo['range']

self._saved_solution = None

# Add requested clocks to output constraints
for out_freq in out_freqs:
for d_n, out_freq in enumerate(out_freqs):

if self.solver == "gekko":
__d = self._d if isinstance(self._d, list) else [self._d]
Expand Down Expand Up @@ -375,5 +519,8 @@ def set_requested_clocks(
)
self.config["out_dividers"].append(od)

# Update diagram to include new divider
self._update_diagram({f"D{d_n}": od})

# Objectives
# self.model.Obj(-1*eo) # Favor even dividers
3 changes: 3 additions & 0 deletions adijif/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def __init__(
Raises:
Exception: If solver is not valid
"""
self._saved_solution = None
if hasattr(self, "_init_diagram"):
self._init_diagram()
if solver:
self.solver = solver
if self.solver == "gekko":
Expand Down
173 changes: 173 additions & 0 deletions adijif/draw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Diagraming functions for different componets."""
import os


class Node:
def __init__(self, name, ntype=None, parent=None):
self.name = name
self.parent = parent
self.ntype = ntype
self.children = []
self.connections = []
self.shape = "rectangle"
self.use_unit_conversion_for_rate = True
self._value = None

def __repr__(self):
return f"Node({self.name})"

@property
def value(self):
return f"{self.name} = {self._value}" if self._value else None

@value.setter
def value(self, value):
self._value = value

def add_connection(self, connection: dict):
if "rate" in connection and self.use_unit_conversion_for_rate:
units = ["Hz", "kHz", "MHz", "GHz"]
rate = connection["rate"]
# Convert to unit based on rate scale
for i, unit in enumerate(units):
if rate < 1000:
break
rate /= 1000
connection["rate"] = f"{rate:.2f} {unit}"

self.connections.append(connection)

def get_connection(self, from_s, to):
for conn in self.connections:
if conn["from"].name == from_s and conn["to"].name == to:
return conn
raise ValueError(f"Connection from {from_s} to {to} not found.")

def update_connection(self, from_s, to, rate):
for conn in self.connections:
if conn["from"].name == from_s and conn["to"].name == to:
units = ["Hz", "kHz", "MHz", "GHz"]
# Convert to unit based on rate scale
for i, unit in enumerate(units):
if rate < 1000:
break
rate /= 1000
conn["rate"] = f"{rate:.2f} {unit}"
return

raise ValueError(f"Connection from {from_s} to {to} not found.")

def add_child(self, child):
if not isinstance(child, list):
child = [child]
for c in child:
c.parent = self
self.children.append(c)

def get_child(self, name: str):
for child in self.children:
if child.name == name:
return child
raise ValueError(f"Child with name {name} not found.")


class Layout:

si = " "

def __init__(self, name: str):
self.name = name
self.nodes = []
self.connections = []
self.use_unit_conversion_for_rate = True
self.output_filename = "clocks.d2"
self.output_image_filename = "clocks.png"
self.layout_engine = "elk"

def add_node(self, node: Node):
self.nodes.append(node)

def add_connection(self, connection: dict):
if "rate" in connection and self.use_unit_conversion_for_rate:
units = ["Hz", "kHz", "MHz", "GHz"]
rate = connection["rate"]
# Convert to unit based on rate scale
for i, unit in enumerate(units):
if rate < 1000:
break
rate /= 1000
connection["rate"] = f"{rate:.2f} {unit}"
self.connections.append(connection)

def draw(self):
"""Draw diagram in d2 language."""

diag = "direction: right\n\n"

def get_parents_names(node):
parents = []
while node.parent:
parents.append(node.parent.name)
node = node.parent
if not parents:
return ""
return ".".join(parents[::-1]) + "."

def draw_subnodes(node, spacing=" "):
diag = " {\n"
for child in node.children:
if child.value:
diag += spacing + child.name + f": {{tooltip: {child.value} }}"
else:
diag += spacing + child.name
if child.children:
diag += draw_subnodes(child, spacing + " ")
else:
diag += "\n"
diag += spacing + child.name + ".shape: " + child.shape + "\n"
lr = len(" ")
diag += spacing[:-lr] + "}\n"
return diag

# Add all nodes
for node in self.nodes:
diag += f"{node.name}"
if node.children:
diag += draw_subnodes(node)
diag += "\n"

diag += "\n"

# # Set shapes
# for node in self.nodes:
# parents_string = get_parents_names(node)
# diag += f"{parents_string}{node.name}.shape = {node.shape}\n"

# diag += "\n"

# Add all connections
for connection in self.connections:
from_p_name = get_parents_names(connection["from"])
to_p_name = get_parents_names(connection["to"])
label = f"{connection['rate']}" if "rate" in connection else None
diag += f"{from_p_name}{connection['from'].name} -> {to_p_name}{connection['to'].name}"
diag += ": " + label if label else ""
diag += "\n"

for node in self.nodes:
for connection in node.connections:
from_p_name = get_parents_names(connection["from"])
to_p_name = get_parents_names(connection["to"])
label = f"{connection['rate']}" if "rate" in connection else ""
diag += f"{from_p_name}{connection['from'].name} -> {to_p_name}{connection['to'].name}"
diag += ": " + label if label else ""
diag += "\n"

with open(self.output_filename, "w") as f:
f.write(diag)

os.system(
f"d2 -l {self.layout_engine} {self.output_filename} {self.output_image_filename}"
)

return self.output_image_filename
Loading

0 comments on commit 162ab52

Please sign in to comment.