-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpd.py
312 lines (275 loc) · 12.4 KB
/
pd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2021 ghecko - Jordan Ovrè <[email protected]>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
import sigrokdecode as srd
import re
from collections import deque
from .lists import fifo_registers2, fifo_registers1
class Ann:
    """Annotation class IDs.

    Each value is passed to ``put()`` as the annotation class index, so the
    order of the names below has to follow the order of the entries in
    ``Decoder.annotations``: Read, Write, Address, Data, Wait, VMK.
    """
    READ, WRITE, ADDRESS, DATA, WAIT, VMK = range(6)
class Operation:
    """TPM transaction type.

    The values are what remains of the first transaction byte after it is
    masked with 0x80: bit 7 set means read, bit 7 clear means write.
    """
    WRITE = 0x00
    READ = 0x80
class TransactionState:
    """Identifiers for the steps of a transaction.

    They are used as keys of the decoder's state-machine dispatch table.
    """
    (
        READ,           # first byte seen, read operation
        WRITE,          # first byte seen, write operation
        READ_ADDRESS,   # collecting the address bytes
        WAIT,           # wait states between address and data
        TRANSFER_DATA,  # collecting the data bytes
    ) = range(5)
class Transaction:
    """One TPM SPI transaction together with the sample range of each part.

    Args:
        start_sample: Absolute sample number of the first sample of the
            transaction.
        operation: Transaction type (``Operation.READ`` / ``Operation.WRITE``).
        transfer_size: Number of data bytes expected.

    Attributes:
        start_sample (int): Absolute sample number where the transaction starts.
        end_sample_op: Sample number where the operation byte ends.
        end_sample_addr: Sample number where the address ends.
        end_sample_wait: Sample number where the wait states end.
        end_sample_data: Sample number where the data ends.
        operation: Transaction type.
        address (bytearray): Register address bytes (big-endian).
        data (bytearray): Data bytes captured so far.
        size (int): Number of data bytes expected.
        wait_count (int): Number of wait states between address and data.

    The ``end_sample_*`` attributes start as ``None`` and are filled in by
    the code driving the transaction.
    """

    def __init__(self, start_sample, operation, transfer_size):
        self.operation = operation
        self.size = transfer_size
        self.wait_count = 0
        self.address = bytearray()
        self.data = bytearray()
        # Sample boundaries of the individual parts of the transaction.
        self.start_sample = start_sample
        self.end_sample_op = None
        self.end_sample_addr = None
        self.end_sample_data = None
        self.end_sample_wait = None

    def is_complete(self):
        """Return True once every address and data byte has been captured."""
        if not self.is_address_complete():
            return False
        return self.is_data_complete()

    def is_data_complete(self):
        """Return True when the number of data bytes equals ``size``."""
        return self.size == len(self.data)

    def is_address_complete(self):
        """Return True when all three address bytes have been captured."""
        return len(self.address) == 3

    def frame(self, fifo_registers):
        """Build the annotation segments of a complete transaction.

        Args:
            fifo_registers: Mapping from the low 16 bits of the address to a
                register name; addresses raising ``KeyError`` are labelled
                "Unknown".

        Returns:
            ``None`` while the transaction is incomplete. Otherwise a tuple
            of ``(start_sample, end_sample, texts)`` segments in the order
            operation, address, [wait,] data. The wait segment is present
            only when ``wait_count`` is greater than zero.
        """
        if not self.is_complete():
            return None

        try:
            reg_label = fifo_registers[int.from_bytes(self.address, "big") & 0xffff]
        except KeyError:
            reg_label = "Unknown"

        hex_data = ''.join('{:02x}'.format(byte) for byte in self.data)

        if self.operation == Operation.READ:
            op_ann = ['Read', 'Rd']
        else:
            op_ann = ['Write', 'Wr']
        addr_ann = ['Register: {}'.format(reg_label), '{}'.format(reg_label)]
        data_ann = [hex_data, hex_data, hex_data]

        segments = [
            (self.start_sample, self.end_sample_op, op_ann),
            (self.end_sample_op, self.end_sample_addr, addr_ann),
        ]
        data_start = self.end_sample_addr
        if self.wait_count > 0:
            # The wait states sit between the address and the data.
            segments.append((self.end_sample_addr, self.end_sample_wait, ['Wait', 'Wt']))
            data_start = self.end_sample_wait
        segments.append((data_start, self.end_sample_data, data_ann))
        return tuple(segments)
class Decoder(srd.Decoder):
    """Stacked decoder turning SPI data packets into TPM transactions.

    Every transaction is annotated as operation / register / [wait] / data.
    Bytes read from the register named "TPM_DATA_FIFO_0" are additionally
    scanned for the BitLocker VMK header; the 32 bytes read from that
    register after the header are reported as the VMK.
    """
    api_version = 3
    id = 'spi_tpm'
    name = 'SPI TPM'
    longname = 'SPI TPM transactions'
    desc = 'Parses TPM transactions from SPI bus with automatic VMK extraction for BitLocker.'
    license = 'gplv2+'
    inputs = ['spi']
    outputs = []
    tags = ['IC', 'TPM', 'BitLocker']
    # The Ann constants are used as indices into this tuple when annotations
    # are emitted with put().
    annotations = (
        ('Read', 'Read register operation'),
        ('Write', 'Write register operation'),
        ('Address', 'Register address'),
        ('Data', 'Data'),
        ('Wait', 'Wait'),
        ('VMK', 'Extracted BitLocker VMK'),
    )
    annotation_rows = (
        ('Transactions', 'TPM transactions', (0, 1, 2, 3, 4)),
        ('B-VMK', 'BitLocker Volume Master Key', (5,)),
    )
    options = (
        {'id': 'tpm_version', 'desc': 'TPM Version 1.2 or 2.0', 'default': '2.0',
         'values': ('2.0', '1.2')},
    )

    # Number of key bytes collected after a VMK header has been seen.
    _VMK_LENGTH = 32
    # Number of bytes kept in the sliding window used for header detection.
    _VMK_HEADER_LENGTH = 12
    # Hex representation of a VMK header; compiled once instead of per byte.
    _VMK_HEADER_RE = re.compile(r'2c000[0-6]000[1-9]000[0-1]000[0-5]200000')

    def __init__(self):
        # TPM Profile Specification for TPM 2.0 page 133-134
        self.end_wait = 0x01
        self.operation_mask = 0x80
        # Not referenced elsewhere in this class; kept for compatibility.
        self.address_mask = 0x3f
        self.state_machine = None
        self.init_state_machine()
        # Register-name table, selected in start() from the tpm_version option.
        self.fifo_registers = None
        self.reset()

    def reset(self):
        """Reset the complete decoder state.

        Clears the current transaction as well as the VMK search state, so a
        new decoding run does not continue a header or key that was only
        partially captured during a previous one.
        """
        self._reset_transaction()
        # Sliding window over the last FIFO bytes, used to detect the VMK
        # header in the transaction data.
        self.queue = deque(maxlen=self._VMK_HEADER_LENGTH)
        # s_queue mirrors `queue` with the start sample of each byte;
        # vmk_ss / vmk_es delimit the key bytes collected so far.
        self.vmk_meta = {"s_queue": deque(maxlen=self._VMK_HEADER_LENGTH),
                         "vmk_ss": 0, "vmk_es": 0}
        self.saving_vmk = False
        self.vmk = []

    def _reset_transaction(self):
        """Forget the transaction in progress; VMK state is left untouched."""
        self.ss = self.es = 0
        self.state = None
        self.current_transaction = None

    def end_current_transaction(self):
        """Finish the current transaction.

        Only per-transaction state is cleared: the VMK header and key can
        span several transactions and must survive this call.
        """
        self._reset_transaction()

    def start(self):
        """Register the annotation output and apply the tpm_version option."""
        self.out_ann = self.register(srd.OUTPUT_ANN)
        if self.options['tpm_version'] == "2.0":
            self.wait_mask = 0x00
            self.fifo_registers = fifo_registers2
        else:
            self.wait_mask = 0xFE
            self.fifo_registers = fifo_registers1

    def init_state_machine(self):
        """Build the table mapping each state to its handler."""
        self.state_machine = {
            TransactionState.READ: self._transaction_read,
            TransactionState.WRITE: self._transaction_write,
            TransactionState.READ_ADDRESS: self._transaction_read_addr,
            TransactionState.WAIT: self._transaction_wait,
            TransactionState.TRANSFER_DATA: self._transaction_data,
            None: self._return
        }

    def _return(self, mosi, miso):
        """Handler for the ``None`` state: ignore the byte."""
        return

    def transaction_state(self, mosi):
        """Return the initial state selected by the first byte of a transaction."""
        operation = mosi & self.operation_mask
        if operation == Operation.READ:
            return TransactionState.READ
        elif operation == Operation.WRITE:
            return TransactionState.WRITE
        else:
            return None

    def _transaction_wait(self, mosi, miso):
        """Count one wait state; move on to the data once MISO equals end_wait."""
        self.current_transaction.wait_count += 1
        if miso == self.end_wait:
            self.current_transaction.end_sample_wait = self.es
            self.state = TransactionState.TRANSFER_DATA

    def _begin_transaction(self, operation, mosi):
        """Create the transaction described by the first byte ``mosi``."""
        # TPM operation is defined on the 7th bit of the first byte of the transaction (1=read / 0=write)
        # transfer size is defined on bits 0 to 5 of the first byte of the transaction
        transfer_size = (mosi & 0x3f) + 1
        self.current_transaction = Transaction(self.ss, operation, transfer_size)
        self.current_transaction.end_sample_op = self.es
        self.state = TransactionState.READ_ADDRESS

    def _transaction_read(self, mosi, miso):
        """Start a read transaction from its first byte."""
        self._begin_transaction(Operation.READ, mosi)

    def _transaction_write(self, mosi, miso):
        """Start a write transaction from its first byte."""
        self._begin_transaction(Operation.WRITE, mosi)

    def _transaction_read_addr(self, mosi, miso):
        """Collect one address byte; the address is 3 bytes long."""
        transaction = self.current_transaction
        transaction.address.extend(mosi.to_bytes(1, byteorder='big'))
        if not transaction.is_address_complete():
            return
        transaction.end_sample_addr = self.es
        # The MISO value seen together with the last address byte decides
        # whether wait states precede the data.
        if miso == self.wait_mask:
            self.state = TransactionState.WAIT
        else:
            self.state = TransactionState.TRANSFER_DATA

    def _transaction_data(self, mosi, miso):
        """Collect one data byte and emit the annotations once complete."""
        transaction = self.current_transaction
        transaction.end_sample_data = self.es
        if transaction.operation == Operation.READ:
            transaction.data.extend(miso.to_bytes(1, byteorder='big'))
            self.recover_vmk(miso)
        elif transaction.operation == Operation.WRITE:
            transaction.data.extend(mosi.to_bytes(1, byteorder='big'))

        # frame() returns None until the transaction is complete.
        segments = transaction.frame(self.fifo_registers)
        if not segments:
            return

        op_class = Ann.READ if transaction.operation == Operation.READ else Ann.WRITE
        if transaction.wait_count == 0:
            classes = (op_class, Ann.ADDRESS, Ann.DATA)
        else:
            classes = (op_class, Ann.ADDRESS, Ann.WAIT, Ann.DATA)
        for ann_class, (seg_ss, seg_es, texts) in zip(classes, segments):
            self.put(seg_ss, seg_es, self.out_ann, [ann_class, texts])
        self.end_current_transaction()

    def _is_vmk_transaction(self):
        """Return True when the current transaction targets TPM_DATA_FIFO_0."""
        register = int.from_bytes(self.current_transaction.address, "big") & 0xffff
        try:
            return self.fifo_registers[register] == "TPM_DATA_FIFO_0"
        except KeyError:
            return False

    def check_vmk_header(self):
        """ Check for VMK header """
        # Cheap pre-check on the first byte before formatting the window.
        if self.queue[0] != 0x2c:
            return
        potential_header = ''.join('{:02x}'.format(x) for x in self.queue)
        if self._VMK_HEADER_RE.search(potential_header):
            self.put(self.vmk_meta["s_queue"][0], self.es, self.out_ann,
                     [Ann.VMK, ['VMK header: {}'.format(potential_header)]])
            self.saving_vmk = True

    def recover_vmk(self, miso):
        """Feed one read data byte to the VMK header / key detection.

        The VMK annotation is emitted as soon as the last key byte has been
        read, and the key buffer is cleared afterwards so a later header
        starts a fresh key.
        """
        # Check if the transaction actually got the VMK.
        # Sometimes, other TPM transactions occur while recovering the VMK.
        if not self._is_vmk_transaction():
            return

        if not self.saving_vmk:
            # Add data and its sample number to the sliding windows, then
            # check whether the window now holds a VMK header.
            self.queue.append(miso)
            self.vmk_meta["s_queue"].append(self.ss)
            self.check_vmk_header()
            return

        if not self.vmk:
            self.vmk_meta["vmk_ss"] = self.ss
        self.vmk.append(miso)
        self.vmk_meta["vmk_es"] = self.es
        if len(self.vmk) >= self._VMK_LENGTH:
            self.put(self.vmk_meta["vmk_ss"], self.vmk_meta["vmk_es"], self.out_ann,
                     [Ann.VMK, ['VMK: {}'.format(''.join('{:02x}'.format(x) for x in self.vmk))]])
            self.saving_vmk = False
            self.vmk = []

    def decode(self, ss, es, data):
        """Handle one packet from the SPI decoder.

        A 'CS-CHANGE' packet drops the transaction in progress; 'DATA'
        packets drive the state machine; every other packet type is ignored.
        """
        self.ss, self.es = ss, es
        ptype, mosi, miso = data
        if ptype == 'CS-CHANGE':
            self.end_current_transaction()
            return
        if ptype != 'DATA':
            return
        if self.state is None:
            # First byte of a transaction: pick read or write.
            self.state = self.transaction_state(mosi)
        self.state_machine[self.state](mosi, miso)