From 03b2c34cdcca10d5240370474ca7e5ce4a15a9b2 Mon Sep 17 00:00:00 2001 From: Marc Worrell Date: Wed, 9 Oct 2024 13:57:54 +0200 Subject: [PATCH] Add runtime callback for 'pingreq' messages (#24) * Add runtime callback for 'pingreq' messages * CI: remove OTP 24, add OTP 27 --- .github/workflows/test.yml | 2 +- src/mqtt_sessions_process.erl | 6 ++++-- src/mqtt_sessions_runtime.erl | 7 +++++++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 53bb00d..47d2573 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,7 +18,7 @@ jobs: strategy: matrix: - otp_version: [24, 25, 26] + otp_version: [25, 26, 27] os: [ubuntu-latest] container: diff --git a/src/mqtt_sessions_process.erl b/src/mqtt_sessions_process.erl index 60e1e70..f5b5f64 100644 --- a/src/mqtt_sessions_process.erl +++ b/src/mqtt_sessions_process.erl @@ -416,8 +416,10 @@ handle_incoming(#{ type := subscribe } = Msg, _Options, State) -> handle_incoming(#{ type := unsubscribe } = Msg, _Options, State) -> packet_unsubscribe(Msg, State); -handle_incoming(#{ type := pingreq }, _Options, State) -> - State1 = reply_or_drop(#{ type => pingresp }, State), +handle_incoming(#{ type := pingreq } = Msg, _Options, State) -> + #state{ runtime = Runtime, user_context = UserContext } = State, + {ok, UserContext1} = Runtime:ping(UserContext), + State1 = reply_or_drop(#{ type => pingresp }, State#state{ user_context = UserContext1 }), {ok, State1}; handle_incoming(#{ type := pingresp }, _Options, State) -> {ok, State}; diff --git a/src/mqtt_sessions_runtime.erl b/src/mqtt_sessions_runtime.erl index 3f95857..c92ef58 100644 --- a/src/mqtt_sessions_runtime.erl +++ b/src/mqtt_sessions_runtime.erl @@ -25,6 +25,7 @@ new_user_context/3, connect/4, control_message/3, + ping/1, reauth/2, is_allowed/4, is_valid_message/3 @@ -152,6 +153,12 @@ connect(_Packet, _IsSessionPresent, _Options, UserContext) -> reauth(#{ type := auth }, _UserContext) -> {error, notsupported}. +%% @doc Called on a pingreq message from the remote. Used this to keep processes or track connection status. +-spec ping(UserContext) -> {ok, UserContext1} when + UserContext :: user_context(), + UserContext1 :: user_context(). +ping(UserContext) -> + {ok, UserContext}. -spec is_allowed( publish | subscribe, topic(), mqtt_packet_map:mqtt_packet(), user_context()) -> boolean(). is_allowed(publish, _Topic, _Packet, _UserContext) ->