Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: server events web hook #1435

Draft
wants to merge 14 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:at_secondary/src/connection/inbound/inbound_message_listener.dar
import 'package:at_secondary/src/server/at_secondary_config.dart';
import 'package:at_secondary/src/server/server_context.dart';
import 'package:at_secondary/src/server/at_secondary_impl.dart';
import 'package:at_secondary/src/telemetry/at_server_telemetry.dart';
import 'package:at_secondary/src/utils/logging_util.dart';
import 'package:at_server_spec/at_server_spec.dart';

Expand Down Expand Up @@ -43,6 +44,7 @@ class InboundConnectionImpl extends BaseConnection

late double lowWaterMarkRatio;
late bool progressivelyReduceAllowableInboundIdleTime;
AtServerTelemetryService? telemetry;

/// The maximum number of requests allowed within the specified time frame.
@override
Expand All @@ -55,7 +57,8 @@ class InboundConnectionImpl extends BaseConnection
/// A list of timestamps representing the times when requests were made.
late final Queue<int> requestTimestampQueue;

InboundConnectionImpl(Socket? socket, String? sessionId, {this.owningPool})
InboundConnectionImpl(Socket? socket, String? sessionId,
{this.owningPool, this.telemetry})
: super(socket) {
metaData = InboundConnectionMetadata()
..sessionID = sessionId
Expand Down Expand Up @@ -203,7 +206,7 @@ class InboundConnectionImpl extends BaseConnection
@override
void acceptRequests(Function(String, InboundConnection) callback,
Function(List<int>, InboundConnection) streamCallBack) {
var listener = InboundMessageListener(this);
var listener = InboundMessageListener(this, telemetry: telemetry);
listener.listen(callback, streamCallBack);
}

Expand All @@ -230,6 +233,10 @@ class InboundConnectionImpl extends BaseConnection
logger.finer(logger.getAtConnectionLogMessage(
getMetaData(), '$address:$port Disconnected'));
getMetaData().isClosed = true;
telemetry?.interaction(
eventType: AtServerTelemetryEventType.disconnect,
from: client,
to: server);
} on Exception {
getMetaData().isStale = true;
// Ignore exception on a connection close
Expand All @@ -239,13 +246,45 @@ class InboundConnectionImpl extends BaseConnection
}
}

String get serverAtSign => AtSecondaryServerImpl.getInstance().currentAtSign!;

InboundConnectionMetadata? get inboundMetadata {
if (metaData is InboundConnectionMetadata) {
return metaData as InboundConnectionMetadata;
} else {
return null;
}
}

String get client {
if (inboundMetadata!.from == true) {
return '${inboundMetadata!.fromAtSign!}:server';
} else {
return '$serverAtSign:client:${inboundMetadata?.sessionID?.hashCode}';
}
}

String? _server;
String get server {
_server ??= '$serverAtSign:server';
return _server!;
}

@override
void write(String data) {
super.write(data);
if (metaData is InboundConnectionMetadata) {
logger.info(logger.getAtConnectionLogMessage(
metaData, 'SENT: ${BaseConnection.truncateForLogging(data)}'));
}
if (data == '@') {
// response to initial connection
return;
}
AtServerTelemetryEventType eventType = data.startsWith('error:')
? AtServerTelemetryEventType.errorResponse
: AtServerTelemetryEventType.response;
telemetry?.interaction(eventType: eventType, from: server, to: client);
}

@override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:io';
import 'package:at_secondary/src/connection/connection_factory.dart';
import 'package:at_secondary/src/connection/inbound/inbound_connection_impl.dart';
import 'package:at_secondary/src/telemetry/at_server_telemetry.dart';
import 'package:at_server_spec/at_server_spec.dart';
import 'package:uuid/uuid.dart';
import 'package:at_commons/at_commons.dart';
Expand Down Expand Up @@ -29,7 +30,8 @@ class InboundConnectionManager implements AtConnectionFactory {
/// @param sessionId - current sessionId
/// Throws a [InboundConnectionLimitException] if pool doesn't have capacity
@override
InboundConnection createConnection(Socket? socket, {String? sessionId}) {
InboundConnection createConnection(Socket? socket,
{String? sessionId, AtServerTelemetryService? telemetry}) {
if (!_isInitialized) {
init(defaultPoolSize);
}
Expand All @@ -38,8 +40,8 @@ class InboundConnectionManager implements AtConnectionFactory {
'max limit reached on inbound pool');
}
sessionId ??= '_${Uuid().v4()}';
var atConnection =
InboundConnectionImpl(socket, sessionId, owningPool: _pool);
var atConnection = InboundConnectionImpl(socket, sessionId,
owningPool: _pool, telemetry: telemetry);
_pool.add(atConnection);
true;
return atConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import 'dart:convert';
import 'package:at_commons/at_commons.dart';
import 'package:at_commons/at_commons.dart' as at_commons;
import 'package:at_secondary/src/connection/base_connection.dart';
import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart';
import 'package:at_secondary/src/connection/inbound/inbound_connection_pool.dart';
import 'package:at_secondary/src/exception/global_exception_handler.dart';
import 'package:at_secondary/src/server/at_secondary_impl.dart';
import 'package:at_secondary/src/telemetry/at_server_telemetry.dart';
import 'package:at_secondary/src/utils/logging_util.dart';
import 'package:at_server_spec/at_server_spec.dart';
import 'package:at_utils/at_logger.dart';
Expand All @@ -13,10 +16,35 @@ import 'package:at_utils/at_logger.dart';
/// For each incoming message [DefaultVerbExecutor()] execute is invoked
class InboundMessageListener {
InboundConnection connection;
AtServerTelemetryService? telemetry;
var logger = AtSignLogger('InboundListener');
final _buffer = at_commons.ByteBuffer(capacity: 10240000);

InboundMessageListener(this.connection);
String get serverAtSign => AtSecondaryServerImpl.getInstance().currentAtSign!;

InboundConnectionMetadata? get inboundMetadata {
if (connection.getMetaData() is InboundConnectionMetadata) {
return connection.getMetaData() as InboundConnectionMetadata;
} else {
return null;
}
}

String get client {
if (inboundMetadata?.from == true) {
return '${inboundMetadata!.fromAtSign!}:server';
} else {
return '$serverAtSign:client:${inboundMetadata?.sessionID?.hashCode}';
}
}

String? _server;
String get server {
_server ??= '$serverAtSign:server';
return _server!;
}

InboundMessageListener(this.connection, {this.telemetry});

late Function(String, InboundConnection) onBufferEndCallBack;
late Function(List<int>, InboundConnection) onStreamCallBack;
Expand Down Expand Up @@ -54,6 +82,11 @@ class InboundMessageListener {
return;
}
if (connection.getMetaData().isStream) {
telemetry?.interaction(
eventType: AtServerTelemetryEventType.stream,
from: client,
to: server,
value: data.length);
await onStreamCallBack(data, connection);
return;
}
Expand All @@ -80,21 +113,33 @@ class InboundMessageListener {
'RCVD: ${BaseConnection.truncateForLogging(command)}'));
// if command is '@exit', close the connection.
if (command == '@exit') {
telemetry?.interaction(
eventType: AtServerTelemetryEventType.request,
from: client,
to: server,
value: '@exit');
await _finishedHandler();
return;
}
_buffer.clear();
if (!command.startsWith('from:')) {
telemetry?.interaction(
eventType: AtServerTelemetryEventType.request,
from: client,
to: server,
value: getVerbFromCommand(command));
}
await onBufferEndCallBack(command, connection);
}
} catch (e, st) {
_buffer.clear();
logger.severe('exception in message handler:$e - stack trace: $st');
logger.severe('_messageHandler:$e - stack trace: $st');
}
}

/// Logs the error and closes the [InboundConnection]
Future<void> _errorHandler(error) async {
logger.severe(error.toString());
logger.severe('_errorHandler: $error');
await _closeConnection();
}

Expand All @@ -110,4 +155,13 @@ class InboundMessageListener {
// Removes the connection from the InboundConnectionPool.
InboundConnectionPool.getInstance().remove(connection);
}

getVerbFromCommand(String command) {
int ix = command.indexOf(":");
if (ix == -1) {
return command;
} else {
return command.substring(0, ix);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ class OutboundClient {
messageListener = OutboundMessageListener(this);
messageListener.listen();

//1. create from request
outboundConnection!.write(AtRequestFormatter.createFromRequest(
AtSecondaryServerImpl.getInstance().currentAtSign));

//2. Receive proof
var fromResult = await messageListener.read();
if (fromResult == '') {
throw HandShakeException(
'no response received for From:$toAtSign command');
}

await checkRemotePublicKey();

// 3. Establish handshake if required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class AtSecondaryConfig {
static final List<String> _malformedKeys = [];
static const bool _shouldRemoveMalformedKeys = true;

// Telemetry web hook
static final String defaultTelemetryEventWebHook = '';

// Protected Keys
// <@atsign> is a placeholder. To be replaced with actual atsign during runtime
static final Set<String> _protectedKeys = {
Expand Down Expand Up @@ -712,6 +715,17 @@ class AtSecondaryConfig {
}
}

static String get telemetryEventWebHook {
if (_envVars.containsKey('telemetryEventWebHook')) {
return _envVars['telemetryEventWebHook']!;
}
try {
return getConfigFromYaml(['telemetry', 'eventWebHook']);
} on ElementNotFoundException {
return defaultTelemetryEventWebHook;
}
}

static Set<String> get protectedKeys {
try {
YamlList keys = getConfigFromYaml(['hive', 'protectedKeys']);
Expand Down Expand Up @@ -776,7 +790,7 @@ class AtSecondaryConfig {

//implementation for config:set. This method returns a data stream which subscribers listen to for updates
static Stream<dynamic>? subscribe(ModifiableConfigs configName) {
if (testingMode) {
if (testingMode || !configName.requireTestingMode) {
if (!_streamListeners.containsKey(configName)) {
_streamListeners[configName] = ModifiableConfigurationEntry()
..streamController = StreamController<dynamic>.broadcast()
Expand All @@ -791,7 +805,7 @@ class AtSecondaryConfig {
static void broadcastConfigChange(
ModifiableConfigs configName, var newConfigValue,
{bool isReset = false}) {
if (testingMode) {
if (testingMode || !configName.requireTestingMode) {
//if an entry for the config does not exist new entry is created
if (!_streamListeners.containsKey(configName)) {
_streamListeners[configName] = ModifiableConfigurationEntry()
Expand Down Expand Up @@ -844,6 +858,8 @@ class AtSecondaryConfig {
return false;
case ModifiableConfigs.doCacheRefreshNow:
return false;
case ModifiableConfigs.telemetryEventWebHook:
return telemetryEventWebHook;
case ModifiableConfigs.maxRequestsPerTimeFrame:
return maxEnrollRequestsAllowed;
case ModifiableConfigs.timeFrameInMills:
Expand Down Expand Up @@ -920,17 +936,25 @@ String? getStringValueFromYaml(List<String> keyParts) {
}

enum ModifiableConfigs {
inboundMaxLimit,
commitLogCompactionFrequencyMins,
accessLogCompactionFrequencyMins,
notificationKeyStoreCompactionFrequencyMins,
autoNotify,
maxNotificationRetries,
checkCertificateReload,
shouldReloadCertificates,
doCacheRefreshNow,
maxRequestsPerTimeFrame,
timeFrameInMills
inboundMaxLimit(requireTestingMode: true, isInt: true),
commitLogCompactionFrequencyMins(requireTestingMode: true, isInt: true),
accessLogCompactionFrequencyMins(requireTestingMode: true, isInt: true),
notificationKeyStoreCompactionFrequencyMins(
requireTestingMode: true, isInt: true),
autoNotify(requireTestingMode: true, isInt: false),
maxNotificationRetries(requireTestingMode: true, isInt: true),
checkCertificateReload(requireTestingMode: true, isInt: false),
shouldReloadCertificates(requireTestingMode: true, isInt: false),
doCacheRefreshNow(requireTestingMode: true, isInt: false),
telemetryEventWebHook(requireTestingMode: false, isInt: false),
maxRequestsPerTimeFrame(requireTestingMode: false, isInt: true),
timeFrameInMills(requireTestingMode: false, isInt: true),
;

final bool requireTestingMode;
final bool isInt;
const ModifiableConfigs(
{required this.requireTestingMode, required this.isInt});
}

class ModifiableConfigurationEntry {
Expand All @@ -940,5 +964,5 @@ class ModifiableConfigurationEntry {
}

class ElementNotFoundException extends AtException {
ElementNotFoundException(message) : super(message);
ElementNotFoundException(super.message);
}
Loading