Skip to content

Commit

Permalink
Merge branch 'main' into refactor/rename
Browse files Browse the repository at this point in the history
  • Loading branch information
antazoey authored Dec 11, 2024
2 parents e9bdddb + fbdbe52 commit f38394e
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 32 deletions.
32 changes: 24 additions & 8 deletions src/ape/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@
from inspect import getframeinfo, stack
from pathlib import Path
from types import CodeType, TracebackType
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

import click
from eth_typing import Hash32, HexStr
from eth_utils import humanize_hash, to_hex
from rich import print as rich_print

from ape.logging import LogLevel, logger

if TYPE_CHECKING:
from eth_typing import HexStr
from ethpm_types.abi import ConstructorABI, ErrorABI, MethodABI
from ethpm_types.contract_type import ContractType

Expand Down Expand Up @@ -521,9 +520,11 @@ class BlockNotFoundError(ProviderError):

def __init__(self, block_id: "BlockID", reason: Optional[str] = None):
if isinstance(block_id, bytes):
block_id_str = to_hex(block_id)
block_id_str = block_id.hex()
if not block_id_str.startswith("0x"):
block_id_str = f"0x{block_id_str}"
else:
block_id_str = HexStr(str(block_id))
block_id_str: "HexStr" = f"{block_id}" # type: ignore

message = (
"Missing latest block."
Expand Down Expand Up @@ -621,11 +622,26 @@ class UnknownSnapshotError(ChainError):
"""

def __init__(self, snapshot_id: "SnapshotID"):
snapshot_id_str: str
if isinstance(snapshot_id, bytes):
# Is block hash
snapshot_id = humanize_hash(cast(Hash32, snapshot_id))
# Is block hash. Logic borrowed from `eth_utils.humanize_hash()`.
if len(snapshot_id) <= 5:
snapshot_id_str = snapshot_id.hex()
else:
value_hex = snapshot_id.hex()
head = value_hex[:4]
tail_idx = -1 * 4
tail = value_hex[tail_idx:]
snapshot_id_str = f"{head}..{tail}"

snapshot_id_str = (
f"0x{snapshot_id_str}" if not snapshot_id_str.startswith("0x") else snapshot_id_str
)

else:
snapshot_id_str = f"{snapshot_id}" # type: ignore

super().__init__(f"Unknown snapshot ID '{snapshot_id}'.")
super().__init__(f"Unknown snapshot ID '{snapshot_id_str}'.")


class QueryEngineError(ApeException):
Expand Down
29 changes: 18 additions & 11 deletions src/ape/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from contextlib import contextmanager
from enum import IntEnum
from pathlib import Path
from typing import IO, Any, Optional, Union
from typing import IO, TYPE_CHECKING, Any, Optional, Union
from urllib.parse import urlparse, urlunparse

import click
from rich.console import Console as RichConsole
from yarl import URL

if TYPE_CHECKING:
from rich.console import Console as RichConsole


class LogLevel(IntEnum):
Expand Down Expand Up @@ -325,23 +327,28 @@ def _get_level(level: Optional[Union[str, int, LogLevel]] = None) -> str:
def sanitize_url(url: str) -> str:
"""Removes sensitive information from given URL"""

url_obj = URL(url).with_user(None).with_password(None)
parsed = urlparse(url)
new_netloc = parsed.hostname or ""
if parsed.port:
new_netloc += f":{parsed.port}"

# If there is a path, hide it but show that you are hiding it.
# Use string interpolation to prevent URL-character encoding.
return f"{url_obj.with_path('')}/{HIDDEN_MESSAGE}" if url_obj.path else f"{url}"
new_url = urlunparse(parsed._replace(netloc=new_netloc, path=""))
return f"{new_url}/{HIDDEN_MESSAGE}" if parsed.path else new_url


logger = ApeLogger.create()


class _RichConsoleFactory:
rich_console_map: dict[str, RichConsole] = {}
rich_console_map: dict[str, "RichConsole"] = {}

def get_console(self, file: Optional[IO[str]] = None, **kwargs) -> RichConsole:
def get_console(self, file: Optional[IO[str]] = None, **kwargs) -> "RichConsole":
# Configure custom file console
file_id = str(file)
if file_id not in self.rich_console_map:
# perf: delay importing from rich, as it is slow.
from rich.console import Console as RichConsole

self.rich_console_map[file_id] = RichConsole(file=file, width=100, **kwargs)

return self.rich_console_map[file_id]
Expand All @@ -350,7 +357,7 @@ def get_console(self, file: Optional[IO[str]] = None, **kwargs) -> RichConsole:
_factory = _RichConsoleFactory()


def get_rich_console(file: Optional[IO[str]] = None, **kwargs) -> RichConsole:
def get_rich_console(file: Optional[IO[str]] = None, **kwargs) -> "RichConsole":
"""
Get an Ape-configured rich console.
Expand All @@ -361,7 +368,7 @@ def get_rich_console(file: Optional[IO[str]] = None, **kwargs) -> RichConsole:
Returns:
``rich.Console``.
"""
return _factory.get_console(file)
return _factory.get_console(file, **kwargs)


__all__ = ["DEFAULT_LOG_LEVEL", "logger", "LogLevel", "ApeLogger", "get_rich_console"]
6 changes: 5 additions & 1 deletion src/ape/managers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from ape.utils.misc import log_instead_of_fail
from ape.utils.os import create_tempdir, in_tempdir
from ape.utils.rpc import RPCHeaders
from ape.utils.rpc import USER_AGENT, RPCHeaders

if TYPE_CHECKING:
from ethpm_types import PackageManifest
Expand All @@ -39,6 +39,10 @@ def __init__(self, data_folder: Optional[Path] = None, request_header: Optional[
else:
self.DATA_FOLDER = data_folder or Path.home() / ".ape"

request_header = request_header or {
"User-Agent": USER_AGENT,
"Content-Type": "application/json",
}
self.REQUEST_HEADER = request_header or {}

def __ape_extra_attributes__(self):
Expand Down
5 changes: 1 addition & 4 deletions src/ape/utils/basemodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from ape.exceptions import ApeAttributeError, ApeIndexError, ProviderNotConnectedError
from ape.logging import logger
from ape.utils.misc import log_instead_of_fail, raises_not_implemented
from ape.utils.rpc import USER_AGENT

if TYPE_CHECKING:
from pydantic.main import Model
Expand Down Expand Up @@ -173,9 +172,7 @@ def config_manager(cls) -> "ConfigManager":
The :class:`~ape.managers.config.ConfigManager`.
"""
config = import_module("ape.managers.config")
return config.ConfigManager(
request_header={"User-Agent": USER_AGENT, "Content-Type": "application/json"},
)
return config.ConfigManager()

@manager_access
def conversion_manager(cls) -> "ConversionManager":
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class TestUnknownSnapshotError:
def test_bytes(self):
snapshot_id = b"asdfasdfasdf"
err = UnknownSnapshotError(snapshot_id)
assert str(err) == "Unknown snapshot ID '6173..6466'."
assert str(err) == "Unknown snapshot ID '0x6173..6466'."

@pytest.mark.parametrize("snapshot_id", (123, "123"))
def test_not_bytes(self, snapshot_id):
Expand Down
11 changes: 10 additions & 1 deletion tests/functional/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from click.testing import CliRunner

from ape.cli import ape_cli_context
from ape.logging import LogLevel, logger
from ape.logging import LogLevel, logger, sanitize_url


@pytest.fixture
Expand Down Expand Up @@ -121,3 +121,12 @@ def test_at_level():
assert logger.level == level_to_set

assert logger.level == initial_level


@pytest.mark.parametrize(
"url", ("https://user:[email protected]/v1/API_KEY", "https://example.com/v1/API_KEY")
)
def test_sanitize_url(url):
actual = sanitize_url(url)
expected = "https://example.com/[hidden]"
assert actual == expected
10 changes: 4 additions & 6 deletions tests/functional/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,11 @@ def test_ipc_per_network(project, key):
ipc = "path/to/example.ipc"
with project.temp_config(node={"ethereum": {"sepolia": {key: ipc}}}):
node = project.network_manager.ethereum.sepolia.get_provider("node")
if key != "ipc_path":
assert node.uri == ipc
# else: uri gets to set to random HTTP from default settings,
# but we may want to change that behavior.
# TODO: 0.9 investigate not using random if ipc set.

assert node.ipc_path == Path(ipc)
if key == "uri":
assert node.uri == ipc
# else: uri ends up as a random HTTP URI from evmchains.
# TODO: Do we want to change this in 0.9?


def test_snapshot(eth_tester_provider):
Expand Down

0 comments on commit f38394e

Please sign in to comment.