-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_path.py
190 lines (148 loc) · 5.62 KB
/
data_path.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import logging
from collections import deque
import control_unit as cu
from uarch import Signal
from isa import Opcode, MEMORY_SIZE, MIN_SIGN, MAX_SIGN
class DataPath:
def __init__(self, program: list, io_controller: "IOController") -> None:
self.control_unit: cu.ControlUnit
self._alu: ALU = ALU(self)
self._memory_size: int = MEMORY_SIZE
self._memory: list = [Opcode.NOP] * MEMORY_SIZE
for index, _ in enumerate(program):
self._memory[index] = program[index]
self._address_register: int = 0
self._buffer_register: int = 0
self._io_controller: IOController = io_controller
self._data_stack: deque[int] = deque()
self._tos: int = 0
def latch_address_tos(self, micro_instruction: list[Signal]) -> None:
if Signal.SEL_TOS_DS in micro_instruction:
self._tos = self.nos
elif Signal.SEL_TOS_MEMORY in micro_instruction:
self._tos = self.read()
elif Signal.SEL_TOS_INPUT in micro_instruction:
self._tos = self._io_controller.read(self._tos)
elif Signal.SEL_TOS_ALU in micro_instruction:
self._tos = self._alu.result(micro_instruction)
elif Signal.SEL_TOS_BR in micro_instruction:
self._tos = self._buffer_register
def latch_address_register(self, micro_instruction: list[Signal]) -> None:
if Signal.SEL_AR_TOS in micro_instruction:
self._address_register = self._tos
elif Signal.SEL_AR_PC in micro_instruction:
self._address_register = self.control_unit.program_counter
def latch_buffer_register(self) -> None:
self._buffer_register = self.nos
def ds_push(self) -> None:
self._data_stack.append(self._tos)
def ds_pop(self) -> None:
self._data_stack.pop()
def out(self) -> None:
symbol = self.nos
self._io_controller.write(self._tos, symbol)
def read(self):
return self._memory[self._address_register]
def write(self) -> None:
self._memory[self._address_register] = self.nos
@property
def zero(self) -> bool:
return self._tos == 0
@property
def data_stack(self):
return self._data_stack
@property
def tos(self) -> int:
return self._tos
@property
def nos(self) -> int:
if len(self._data_stack) == 0:
raise Exception("Empty data stack")
return self._data_stack[-1]
@property
def address_register(self):
return self._address_register
@property
def buffer_register(self):
return self._buffer_register
@property
def start_address(self):
return self._memory[0]
@property
def memory(self):
return self._memory
class ALU:
def __init__(self, dp: DataPath):
self._data_path: DataPath = dp
def result(self, micro_instruction: list[Signal]) -> int:
left = self._data_path.tos
if Signal.ALU_RIGHT_OP_ZERO in micro_instruction:
right = 0
elif Signal.ALU_RIGHT_OP_NOS in micro_instruction:
right = self._data_path.nos
else:
right = 0
if Signal.ALU_SUM in micro_instruction:
result = left + right
elif Signal.ALU_SUB in micro_instruction:
result = left - right
elif Signal.ALU_MUL in micro_instruction:
result = left * right
elif Signal.ALU_DIV in micro_instruction:
result = int(left / right)
elif Signal.ALU_MOD in micro_instruction:
result = int(left % right)
else:
result = 0
if Signal.ALU_INC in micro_instruction:
result += 1
elif Signal.ALU_DEC in micro_instruction:
result -= 1
return self._handle_overflow(result)
def _handle_overflow(self, value: int) -> int:
if value > MAX_SIGN:
value %= MAX_SIGN
elif value < MIN_SIGN:
value %= abs(MIN_SIGN)
return value
class IOUnit:
def __init__(self, input_buffer: deque[int]):
self._input_buffer: deque[int] = input_buffer
self._output_buffer: deque[int] = deque()
def read(self) -> int:
return self._input_buffer.popleft()
def write(self, value: int) -> None:
return self._output_buffer.append(value)
def get_str_output(self) -> str:
output: list[str] = []
i = 0
while i < len(self._output_buffer):
length = self._output_buffer[i]
for _ in range(length):
i += 1
output.append(chr(self._output_buffer[i]))
i += 1
return "".join(output)
def get_list_output(self) -> list[int]:
return list(self._output_buffer)
class IOController:
def __init__(self) -> None:
self._connected_units: dict[int, IOUnit] = {}
def connect(self, port: int, unit: IOUnit) -> None:
self._connected_units[port] = unit
def disconnect(self, port: int) -> None:
self._connected_units.pop(port)
def read(self, port: int) -> int:
if not self._connected_units[port]:
raise Exception(f"No device connected to port {port}")
return self._connected_units[port].read()
def write(self, port: int, value: int):
if not self._connected_units[port]:
raise Exception(f"No device connected to port {port}")
if value < 32 or value > 126:
logging.debug("Output: writing %d on port %d", value, port)
else:
logging.debug(
"Output: writing `%s` (%d) on port %d", chr(value), value, port
)
self._connected_units[port].write(value)