Skip to content

Commit

Permalink
Macpie/optimizations (#1019)
Browse files Browse the repository at this point in the history
* Use cache instead of db to retrieve all devices from org

* Lower MAX_CREDENTIAL_COUNT to 3

* Remove calls to router_device_routing replay

* Fix so we don't warn every time an ack copy comes in

* Fix tests

* - Decreased default pool to 1000
- Increase Event pool to 1000

* Add pool stats

* Add in_use and free
  • Loading branch information
macpie authored Nov 8, 2023
1 parent 0225e32 commit dfb4604
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 48 deletions.
4 changes: 3 additions & 1 deletion include/metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-define(METRICS_VM_ETS_MEMORY, "router_vm_ets_memory").
-define(METRICS_DEVICE_TOTAL, "router_device_total_gauge").
-define(METRICS_DEVICE_RUNNING, "router_device_running_gauge").
-define(METRICS_CONSOLE_POOL, "router_console_pool_gauge").

-define(METRICS, [
{?METRICS_ROUTING_PACKET, prometheus_histogram, [type, status, reason, downlink],
Expand All @@ -19,5 +20,6 @@
{?METRICS_VM_PROC_Q, prometheus_gauge, [name], "Router process queue"},
{?METRICS_VM_ETS_MEMORY, prometheus_gauge, [name], "Router ets memory"},
{?METRICS_DEVICE_TOTAL, prometheus_gauge, [], "Device total gauge"},
{?METRICS_DEVICE_RUNNING, prometheus_gauge, [], "Device running gauge"}
{?METRICS_DEVICE_RUNNING, prometheus_gauge, [], "Device running gauge"},
{?METRICS_CONSOLE_POOL, prometheus_gauge, [name, type], "Console pool gauge"}
]).
4 changes: 2 additions & 2 deletions src/apis/router_console_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -536,10 +536,10 @@ init(Args) ->
erlang:process_flag(trap_exit, true),
lager:info("~p init with ~p", [?SERVER, Args]),
ok = hackney_pool:start_pool(?DEFAULT_POOL, [
{timeout, timer:seconds(60)}, {max_connections, 5000}
{timeout, timer:seconds(60)}, {max_connections, 1000}
]),
ok = hackney_pool:start_pool(?EVENT_POOL, [
{timeout, timer:seconds(60)}, {max_connections, 100}
{timeout, timer:seconds(60)}, {max_connections, 1000}
]),
DownlinkEndpoint = maps:get(downlink_endpoint, Args),
Endpoint = maps:get(endpoint, Args),
Expand Down
23 changes: 10 additions & 13 deletions src/apis/router_console_ws_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,12 @@ handle_info(
{ws_message, <<"organization:all">>, <<"organization:all:zeroed:dc_balance">>, #{
<<"id">> := OrgID, <<"dc_balance">> := Balance
}},
#state{db = DB, cf = CF} = State
State
) ->
case router_console_dc_tracker:add_unfunded(OrgID) of
true ->
lager:info("org ~p has reached a balance of ~p, disabling", [OrgID, Balance]),
DeviceIDs = get_device_ids_for_org(DB, CF, OrgID),
DeviceIDs = get_device_ids_for_org(OrgID),
catch router_ics_eui_worker:remove(DeviceIDs),
catch router_ics_skf_worker:remove_device_ids(DeviceIDs);
false ->
Expand All @@ -209,12 +209,11 @@ handle_info(
<<"dc_balance_nonce">> := Nonce,
<<"dc_balance">> := Balance
}},
#state{db = DB, cf = CF} = State
State
) ->
lager:info("got an org balance refill for ~p of ~p (~p)", [OrgID, Balance, Nonce]),
ok = router_console_dc_tracker:refill(OrgID, Nonce, Balance),

DeviceIDs = get_device_ids_for_org(DB, CF, OrgID),
DeviceIDs = get_device_ids_for_org(OrgID),
catch router_ics_eui_worker:add(DeviceIDs),
catch router_ics_skf_worker:add_device_ids(DeviceIDs),
{noreply, State};
Expand Down Expand Up @@ -357,16 +356,14 @@ start_ws(WSEndpoint, Token) ->
Pid.

-spec get_device_ids_for_org(
DB :: rocksdb:db_handle(),
CF :: rocksdb:cf_handle(),
OrgID :: binary()
) -> [binary()].
get_device_ids_for_org(DB, CF, OrgID) ->
Devices = router_device:get(DB, CF, fun(Device) ->
OrgID == maps:get(organization_id, router_device:metadata(Device), undefiend)
end),

[router_device:id(D) || D <- Devices].
get_device_ids_for_org(OrgID) ->
Devices = router_device_cache:get(),
[
router_device:id(D)
|| D <- Devices, OrgID == maps:get(organization_id, router_device:metadata(D), undefined)
].

-spec update_devices(
DB :: rocksdb:db_handle(),
Expand Down
2 changes: 1 addition & 1 deletion src/device/router_device.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@

-define(QUEUE_SIZE_LIMIT, 20).
-define(MAX_32_BITS, 16#100000000).
-define(MAX_CREDENTIAL_COUNT, 25).
-define(MAX_CREDENTIAL_COUNT, 3).

-type device() :: #device_v7{}.

Expand Down
32 changes: 15 additions & 17 deletions src/device/router_device_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ handle_info(
join_cache = maps:remove(DevNonce, JoinCache)
}};
handle_info(
{frame_timeout, FCnt, PacketTime, BalanceNonce},
{frame_timeout, FCnt, _PacketTime, BalanceNonce},
#state{
db = DB,
cf = CF,
Expand Down Expand Up @@ -1100,7 +1100,6 @@ handle_info(
ok = router_device_channels_worker:frame_timeout(ChannelsWorker, UUID, BalanceNonce),
lager:debug("frame timeout for ~p / device ~p", [FCnt, lager:pr(Device0, router_device)]),
{ADREngine1, ADRAdjustment} = maybe_track_adr_packet(Device0, ADREngine0, FrameCache),
DeviceID = router_device:id(Device0),
%% NOTE: Disco-mode has a special frame_timeout well above what we're trying
%% to achieve here. We ignore those packets in metrics so they don't skew
%% our alerting.
Expand All @@ -1127,7 +1126,6 @@ handle_info(
Pid,
blockchain_state_channel_response_v1:new(true)
),
router_device_routing:clear_replay(DeviceID),
ok = router_metrics:packet_trip_observe_end(
blockchain_helium_packet_v1:packet_hash(Packet),
PubKeyBin,
Expand Down Expand Up @@ -1161,10 +1159,6 @@ handle_info(
FOpts
),
ok = maybe_send_queue_update(Device2, State),
case router_utils:mtype_to_ack(Frame#frame.mtype) of
1 -> router_device_routing:allow_replay(Packet, DeviceID, PacketTime);
_ -> router_device_routing:clear_replay(DeviceID)
end,
ok = save_and_update(DB, CF, ChannelsWorker, Device2),
lager:debug("sending downlink for fcnt: ~p, ~p", [FCnt, DownlinkPacket]),
catch blockchain_state_channel_common:send_response(
Expand Down Expand Up @@ -1192,7 +1186,6 @@ handle_info(
Pid,
blockchain_state_channel_response_v1:new(true)
),
router_device_routing:clear_replay(DeviceID),
ok = router_metrics:packet_trip_observe_end(
blockchain_helium_packet_v1:packet_hash(Packet),
PubKeyBin,
Expand Down Expand Up @@ -1556,18 +1549,23 @@ validate_frame(

%% This only applies when it's the first packet we see
%% If the frame contains ACK=1 we should clear the message from the queue and go on to the next
QueueDeviceUpdates =
case {ACK, router_device:queue(Device0)} of
MaybeUpdateDeviceQueue = fun(Ack, Device) ->
case {Ack, router_device:queue(Device)} of
%% Check if acknowledging confirmed downlink
{1, [#downlink{confirmed = true} | T]} ->
[{queue, T}, {fcntdown, router_device:fcntdown_next_val(Device0)}];
router_device:update(
[{queue, T}, {fcntdown, router_device:fcntdown_next_val(Device)}], Device
);
{1, _} ->
lager:warning("got ack when no confirmed downlinks in queue"),
[];
Device;
_ ->
[]
end,
QueueUpdatedDevice = router_device:update(QueueDeviceUpdates, Device0),
Device
end
end,

%% Bug here when multiple uplinks from diff gateways might clear more downlinks than we need to
%% This should be moved to frame timeout, so that we avoid that warning all the time

case MType of
MType when MType == ?CONFIRMED_UP orelse MType == ?UNCONFIRMED_UP ->
Expand All @@ -1591,7 +1589,7 @@ validate_frame(
Packet,
PubKeyBin,
Region,
QueueUpdatedDevice,
MaybeUpdateDeviceQueue(ACK, Device0),
OfferCache,
false
);
Expand Down Expand Up @@ -1640,7 +1638,7 @@ validate_frame(
Packet,
PubKeyBin,
Region,
QueueUpdatedDevice,
MaybeUpdateDeviceQueue(ACK, Device0),
OfferCache,
false
)
Expand Down
22 changes: 21 additions & 1 deletion src/metrics/router_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ handle_info(
ok = record_vm_stats(),
ok = record_ets(),
ok = record_queues(),
ok = record_devices()
ok = record_devices(),
ok = record_pools()
end,
[
{fullsweep_after, 0},
Expand Down Expand Up @@ -222,6 +223,25 @@ record_vm_stats() ->
),
ok.

%% @doc Record hackney connection-pool statistics (in-use and free
%% connection counts) as prometheus gauges, one time series per
%% {PoolName, Type} label pair on ?METRICS_CONSOLE_POOL.
%%
%% Best-effort: a pool that is not (yet) started would make
%% hackney_pool:get_stats/1 raise, and that must not crash the metrics
%% tick, so any exception is logged and the pool is skipped.
-spec record_pools() -> ok.
record_pools() ->
    lists:foreach(
        fun(Pool) ->
            %% Only the get_stats/1 call sits in the protected section;
            %% the `of' body is deliberately outside the catch scope.
            try hackney_pool:get_stats(Pool) of
                Stats ->
                    InUse = proplists:get_value(in_use_count, Stats, 0),
                    Free = proplists:get_value(free_count, Stats, 0),
                    _ = prometheus_gauge:set(?METRICS_CONSOLE_POOL, [Pool, in_use], InUse),
                    _ = prometheus_gauge:set(?METRICS_CONSOLE_POOL, [Pool, free], Free)
            catch
                %% Class/Reason are used in the log call below, so they
                %% must not carry the leading-underscore "unused" marker.
                Class:Reason ->
                    lager:error("failed to get stats for pool ~p ~p ~p", [Pool, Class, Reason])
            end
        end,
        %% NOTE(review): these atoms are assumed to match the pool names
        %% started by router_console_api (?DEFAULT_POOL / ?EVENT_POOL) —
        %% confirm they stay in sync if either side is renamed.
        [router_console_api_pool, router_console_api_event_pool]
    ),
    ok.

-spec record_ets() -> ok.
record_ets() ->
lists:foreach(
Expand Down
24 changes: 11 additions & 13 deletions test/router_device_worker_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,6 @@ device_inactive_update_test(Config) ->
#{} = test_utils:join_device(Config),
#{} = test_utils:join_device(Config),
#{} = test_utils:join_device(Config),
#{} = test_utils:join_device(Config),

%% Waiting for reply from router to hotspot
test_utils:wait_state_channel_message(1250),
Expand All @@ -464,8 +463,8 @@ device_inactive_update_test(Config) ->
DeviceID = ?CONSOLE_DEVICE_ID,
{ok, Device0} = router_device:get_by_id(DB, CF, DeviceID),

?assertEqual(4, length(router_device:keys(Device0))),
?assertEqual(4, length(router_device:devaddrs(Device0))),
?assertEqual(3, length(router_device:keys(Device0))),
?assertEqual(3, length(router_device:devaddrs(Device0))),

%% Deactivate the device in Console
Tab = proplists:get_value(ets, Config),
Expand Down Expand Up @@ -498,9 +497,8 @@ device_inactive_update_test(Config) ->
receive
{router_test_ics_route_service, update_skfs, Req1} ->
Updates = Req1#iot_config_route_skf_update_req_v1_pb.updates,
%% The 4 join attempts were removed
?assertEqual(4, length(Updates)),

%% The 3 join attempts were removed
?assertEqual(3, length(Updates)),
ok
after timer:seconds(2) -> ct:fail(expected_skf_update)
end,
Expand Down Expand Up @@ -731,9 +729,9 @@ drop_downlink_test(Config) ->
ok.

evict_keys_join_test(Config) ->
%% If a device get's stuck in a join loop, we will only keep the last 25
%% If a device gets stuck in a join loop, we will only keep the last 3
%% devaddrs and key sets. When there is a successful uplink after joining,
%% all the keys not used will be removed. But if there were more than 25
%% all the keys not used will be removed. But if there were more than 3
%% attempts, we cannot remove keys that have been cycled out.
#{} = test_utils:join_device(Config),

Expand All @@ -748,9 +746,9 @@ evict_keys_join_test(Config) ->
[
{keys, [
{crypto:strong_rand_bytes(16), crypto:strong_rand_bytes(16)}
|| _ <- lists:seq(1, 25)
|| _ <- lists:seq(1, 3)
]},
{devaddrs, [crypto:strong_rand_bytes(4) || _ <- lists:seq(1, 25)]}
{devaddrs, [crypto:strong_rand_bytes(4) || _ <- lists:seq(1, 3)]}
],
Device0
),
Expand All @@ -776,9 +774,9 @@ evict_keys_join_test(Config) ->
%% Expected updates
%% 1 Add
%% 1 Remove {EvictedKey, EvictedDevaddr}
%% 25 Remove {EvictedKey, RemainingDevaddr}
%% 25 Remove {RemainingKey, EvictedDevaddr}
?assertEqual(1 + 1 + 25 + 25, length(Updates)),
%% 3 Remove {EvictedKey, RemainingDevaddr}
%% 3 Remove {RemainingKey, EvictedDevaddr}
?assertEqual(1 + 1 + 3 + 3, length(Updates)),

ok = meck:unload(router_ics_skf_worker),
ok.
Expand Down

0 comments on commit dfb4604

Please sign in to comment.