Skip to content

Commit

Permalink
Add smartrequests
Browse files Browse the repository at this point in the history
  • Loading branch information
sdb9696 committed Dec 29, 2023
1 parent 1b79142 commit b755c9e
Show file tree
Hide file tree
Showing 8 changed files with 1,474 additions and 68 deletions.
135 changes: 107 additions & 28 deletions devtools/dump_devinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

import asyncclick as click

from kasa import AuthenticationException, Credentials, Discover, SmartDevice
from kasa import AuthenticationException, Credentials, Discover
from kasa.discover import DiscoveryResult
from kasa.exceptions import SmartErrorCode
from kasa.smartrequests import COMPONENT_REQUESTS, SmartRequest
from kasa.tapo.tapodevice import TapoDevice

Call = namedtuple("Call", "module method")
SmartCall = namedtuple("SmartCall", "module request should_succeed")


def scrub(res):
Expand All @@ -46,11 +49,19 @@ def scrub(res):
"oem_id",
"nickname",
"alias",
"bssid",
"channel",
]

for k, v in res.items():
if isinstance(v, collections.abc.Mapping):
res[k] = scrub(res.get(k))
elif (
isinstance(v, list)
and len(v) > 0
and isinstance(v[0], collections.abc.Mapping)
):
res[k] = [scrub(vi) for vi in v]
else:
if k in keys_to_scrub:
if k in ["latitude", "latitude_i", "longitude", "longitude_i"]:
Expand All @@ -64,6 +75,8 @@ def scrub(res):
v = base64.b64encode(b"#MASKED_NAME#").decode()
elif k in ["alias"]:
v = "#MASKED_NAME#"
elif isinstance(res[k], int):
v = 0
else:
v = re.sub(r"\w", "0", v)

Expand Down Expand Up @@ -200,33 +213,86 @@ async def get_legacy_fixture(device):
return save_filename, copy_folder, final


async def get_smart_fixture(device: SmartDevice):
async def get_smart_fixture(device: TapoDevice):
"""Get fixture for new TAPO style protocol."""
items = [
Call(module="component_nego", method="component_nego"),
Call(module="device_info", method="get_device_info"),
Call(module="device_usage", method="get_device_usage"),
Call(module="device_time", method="get_device_time"),
Call(module="energy_usage", method="get_energy_usage"),
Call(module="current_power", method="get_current_power"),
Call(module="temp_humidity_records", method="get_temp_humidity_records"),
Call(module="child_device_list", method="get_child_device_list"),
Call(
module="trigger_logs",
method={"get_trigger_logs": {"page_size": 5, "start_id": 0}},
extra_test_calls = [
SmartCall(
module="temp_humidity_records",
request=SmartRequest.get_raw_request("get_temp_humidity_records"),
should_succeed=False,
),
Call(
SmartCall(
module="child_device_list",
request=SmartRequest.get_raw_request("get_child_device_list"),
should_succeed=False,
),
SmartCall(
module="child_device_component_list",
method="get_child_device_component_list",
request=SmartRequest.get_raw_request("get_child_device_component_list"),
should_succeed=False,
),
SmartCall(
module="trigger_logs",
request=SmartRequest.get_raw_request(
"get_trigger_logs", SmartRequest.GetTriggerLogsParams(5, 0)
),
should_succeed=False,
),
]

successes = []

for test_call in items:
try:
click.echo("Testing component_nego call ..", nl=False)
component_info_response = await device._smart_query_helper(
SmartRequest.component_nego()
)
click.echo(click.style("OK", fg="green"))
successes.append(
SmartCall(
module="component_nego",
request=SmartRequest("component_nego"),
should_succeed=True,
)
)
except AuthenticationException as ex:
click.echo(
click.style(
f"Unable to query the device due to an authentication error: {ex}",
bold=True,
fg="red",
)
)
exit(1)
except Exception:
click.echo(
click.style("CRITICAL FAIL on component_nego call, exiting", fg="red")
)
exit(1)

test_calls = []
should_succeed = []

for item in component_info_response["component_nego"]["component_list"]:
component_id = item["id"]
if requests := COMPONENT_REQUESTS.get(component_id):
component_test_calls = [
SmartCall(module=component_id, request=request, should_succeed=True)
for request in requests
]
test_calls.extend(component_test_calls)
should_succeed.extend(component_test_calls)
elif component_id not in COMPONENT_REQUESTS:
click.echo(f"Skipping {component_id}..", nl=False)
click.echo(click.style("UNSUPPORTED", fg="yellow"))

test_calls.extend(extra_test_calls)

for test_call in test_calls:
click.echo(f"Testing {test_call.module}..", nl=False)
try:
click.echo(f"Testing {test_call}..", nl=False)
response = await device.protocol.query(test_call.method)
response = await device._smart_query_helper(test_call.request)
except AuthenticationException as ex:
click.echo(
click.style(
Expand All @@ -237,22 +303,38 @@ async def get_smart_fixture(device: SmartDevice):
)
exit(1)
except Exception as ex:
click.echo(click.style(f"FAIL {ex}", fg="red"))
if (
not test_call.should_succeed
and hasattr(ex, "error_code")
and ex.error_code == SmartErrorCode.UNKNOWN_METHOD_ERROR
):
click.echo(click.style("FAIL - EXPECTED", fg="green"))
else:
click.echo(click.style(f"FAIL {ex}", fg="red"))
else:
if not response:
click.echo(click.style("FAIL not suported", fg="red"))
click.echo(click.style("FAIL no response", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
if not test_call.should_succeed:
click.echo(click.style("OK - EXPECTED FAIL", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
successes.append(test_call)

requests = []
for succ in successes:
requests.append({"method": succ.method})

final_query = {"multipleRequest": {"requests": requests}}
requests.append(succ.request)

final = {}
try:
responses = await device.protocol.query(final_query)
end = len(requests)
step = 10 # Break the requests down as there seems to be a size limit
for i in range(0, end, step):
x = i
requests_step = requests[x : x + step]
responses = await device._smart_query_helper(requests_step)
for method, result in responses.items():
final[method] = result
except AuthenticationException as ex:
click.echo(
click.style(
Expand All @@ -269,9 +351,6 @@ async def get_smart_fixture(device: SmartDevice):
)
)
exit(1)
final = {}
for method, result in responses.items():
final[method] = result

# Need to recreate a DiscoverResult here because we don't want the aliases
# in the fixture, we want the actual field names as returned by the device.
Expand Down
Loading

0 comments on commit b755c9e

Please sign in to comment.