Skip to content

Commit

Permalink
- fixed mypy errors for Python >= 3.12;
Browse files Browse the repository at this point in the history
  • Loading branch information
jaltmayerpizzorno committed Oct 4, 2024
1 parent 996dc90 commit a9f5869
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 73 deletions.
8 changes: 4 additions & 4 deletions src/slipcover/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ def _mark_branch(self, from_line: int, to_line: int) -> List[ast.stmt]:
# Using a constant Expr allows the compiler to optimize this to a NOP
mark = ast.Expr(ast.Constant(None))
for node in ast.walk(mark):
node.lineno = node.end_lineno = encode_branch(from_line, to_line)
node.lineno = node.end_lineno = encode_branch(from_line, to_line) # type: ignore[attr-defined]
# Leaving the columns unitialized can lead to invalid positions despite
# our use of ast.fix_missing_locations
node.col_offset = node.end_col_offset = -1
node.col_offset = node.end_col_offset = -1 # type: ignore[attr-defined]
else:
mark = ast.Assign([ast.Name(BRANCH_NAME, ast.Store())],
ast.Tuple([ast.Constant(from_line), ast.Constant(to_line)], ast.Load()))
Expand Down Expand Up @@ -132,7 +132,7 @@ def visit_Match(self, node: ast.Match) -> ast.Match:
elif isinstance(node, try_type) and name == 'handlers':
# each 'except' continues either in 'finally', or after the 'try'
for h in field:
h.next_node = node.finalbody[0] if node.finalbody else node.next_node # type: ignore[attr-defined]
h.next_node = node.finalbody[0] if node.finalbody else node.next_node # type: ignore[attr-defined,union-attr]
elif isinstance(field, list):
# if a field is a list, each item but the last one continues with the next item
prev = None
Expand All @@ -152,7 +152,7 @@ def visit_Match(self, node: ast.Match) -> ast.Match:
elif node.finalbody:
prev.next_node = node.finalbody[0] # type: ignore[attr-defined]
else:
prev.next_node = node.next_node # type: ignore[attr-defined]
prev.next_node = node.next_node # type: ignore[attr-defined, union-attr]
else:
prev.next_node = node.next_node # type: ignore[attr-defined]

Expand Down
4 changes: 2 additions & 2 deletions src/slipcover/bytecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def branch2offset(arg: int) -> int:
op_PRECALL = dis.opmap["PRECALL"]
op_CALL = dis.opmap["CALL"]
op_CACHE = dis.opmap["CACHE"]
is_EXTENDED_ARG.append(dis._all_opmap["EXTENDED_ARG_QUICK"])
is_EXTENDED_ARG.append(dis._all_opmap["EXTENDED_ARG_QUICK"]) # type: ignore[attr-defined]
else:
op_RESUME = None
op_PUSH_NULL = None
Expand Down Expand Up @@ -63,7 +63,7 @@ def opcode_arg(opcode: int, arg: int, min_ext : int = 0) -> List[int]:
)
bytecode.extend([opcode, arg & 0xFF])
if sys.version_info >= (3,11):
bytecode.extend([op_CACHE, 0] * dis._inline_cache_entries[opcode])
bytecode.extend([op_CACHE, 0] * dis._inline_cache_entries[opcode]) # type: ignore[attr-defined]
return bytecode


Expand Down
136 changes: 69 additions & 67 deletions src/slipcover/slipcover.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,54 +450,55 @@ def instrument(self, co: types.CodeType, parent: Optional[types.CodeType] = None
return new_code


def deinstrument(self, co, lines: set) -> types.CodeType:
"""De-instruments a code object previously instrumented for coverage detection.
if sys.version_info < (3,12):
def deinstrument(self, co, lines: set) -> types.CodeType:
"""De-instruments a code object previously instrumented for coverage detection.
If invoked on a function, de-instruments its code.
"""
If invoked on a function, de-instruments its code.
"""

assert not self.immediate
assert not self.immediate

if isinstance(co, types.FunctionType):
co.__code__ = self.deinstrument(co.__code__, lines)
return co.__code__
if isinstance(co, types.FunctionType):
co.__code__ = self.deinstrument(co.__code__, lines)
return co.__code__

assert isinstance(co, types.CodeType)
# print(f"de-instrumenting {co.co_name}")
assert isinstance(co, types.CodeType)
# print(f"de-instrumenting {co.co_name}")

ed = bc.Editor(co)
ed = bc.Editor(co)

co_consts = co.co_consts
for i, c in enumerate(co_consts):
if isinstance(c, types.CodeType):
nc = self.deinstrument(c, lines)
if nc is not c:
ed.set_const(i, nc)
co_consts = co.co_consts
for i, c in enumerate(co_consts):
if isinstance(c, types.CodeType):
nc = self.deinstrument(c, lines)
if nc is not c:
ed.set_const(i, nc)

index = self.code2index[co]
index = self.code2index[co]

for (offset, lineno) in index:
if lineno in lines and (func := ed.get_inserted_function(offset)):
func_index, func_arg_index, *_ = func
if co_consts[func_index] == probe.signal:
probe.mark_removed(co_consts[func_arg_index])
ed.disable_inserted_function(offset)
for (offset, lineno) in index:
if lineno in lines and (func := ed.get_inserted_function(offset)):
func_index, func_arg_index, *_ = func
if co_consts[func_index] == probe.signal:
probe.mark_removed(co_consts[func_arg_index])
ed.disable_inserted_function(offset)

new_code = ed.finish()
if new_code is co:
return co
new_code = ed.finish()
if new_code is co:
return co

# no offsets changed, so the old code's index is still usable
self.code2index[new_code] = index
# no offsets changed, so the old code's index is still usable
self.code2index[new_code] = index

with self.lock:
self.replace_map[co] = new_code
with self.lock:
self.replace_map[co] = new_code

if co in self.instrumented[co.co_filename]:
self.instrumented[co.co_filename].remove(co)
self.instrumented[co.co_filename].add(new_code)
if co in self.instrumented[co.co_filename]:
self.instrumented[co.co_filename].remove(co)
self.instrumented[co.co_filename].add(new_code)

return new_code
return new_code


def _add_unseen_source_files(self, source: List[str]):
Expand Down Expand Up @@ -644,41 +645,42 @@ def register_module(self, m):
self.modules.append(m)


def deinstrument_seen(self) -> None:
with self.lock:
newly_seen = self._get_newly_seen()
if sys.version_info < (3,12):
def deinstrument_seen(self) -> None:
with self.lock:
newly_seen = self._get_newly_seen()

for file, new_set in newly_seen.items():
for co in self.instrumented[file]:
self.deinstrument(co, new_set)

self.all_seen[file].update(new_set)

# Replace references to code
if self.replace_map:
visited : set = set()

# XXX the set of function objects could be pre-computed at register_module;
# also, the same could be done for functions objects in globals()
for m in self.modules:
for f in Slipcover.find_functions(m.__dict__.values(), visited):
if f.__code__ in self.replace_map:
f.__code__ = self.replace_map[f.__code__]

globals_seen = []
for frame in sys._current_frames().values():
while frame:
if not frame.f_globals in globals_seen:
globals_seen.append(frame.f_globals)
for f in Slipcover.find_functions(frame.f_globals.values(), visited):
if f.__code__ in self.replace_map:
f.__code__ = self.replace_map[f.__code__]
for file, new_set in newly_seen.items():
for co in self.instrumented[file]:
self.deinstrument(co, new_set)

self.all_seen[file].update(new_set)

for f in Slipcover.find_functions(frame.f_locals.values(), visited):
# Replace references to code
if self.replace_map:
visited : set = set()

# XXX the set of function objects could be pre-computed at register_module;
# also, the same could be done for functions objects in globals()
for m in self.modules:
for f in Slipcover.find_functions(m.__dict__.values(), visited):
if f.__code__ in self.replace_map:
f.__code__ = self.replace_map[f.__code__]

frame = frame.f_back # type: ignore[assignment]
globals_seen = []
for frame in sys._current_frames().values():
while frame:
if not frame.f_globals in globals_seen:
globals_seen.append(frame.f_globals)
for f in Slipcover.find_functions(frame.f_globals.values(), visited):
if f.__code__ in self.replace_map:
f.__code__ = self.replace_map[f.__code__]

for f in Slipcover.find_functions(frame.f_locals.values(), visited):
if f.__code__ in self.replace_map:
f.__code__ = self.replace_map[f.__code__]

frame = frame.f_back # type: ignore[assignment]

# all references should have been replaced now... right?
self.replace_map.clear()
# all references should have been replaced now... right?
self.replace_map.clear()

0 comments on commit a9f5869

Please sign in to comment.