Skip to content

Commit

Permalink
Merge pull request #1776 from atsign-foundation/gkc/alpn
Browse files Browse the repository at this point in the history
  • Loading branch information
gkc authored Feb 8, 2024
2 parents 096c136 + 51a35c7 commit f45904c
Show file tree
Hide file tree
Showing 69 changed files with 657 additions and 617 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,47 @@ import 'package:at_server_spec/at_server_spec.dart';
import 'package:at_utils/at_logger.dart';

/// Base class for common socket operations
abstract class BaseConnection extends AtConnection {
late final Socket? _socket;
abstract class BaseSocketConnection<T extends Socket> extends AtConnection {
final T _socket;
@override
late AtConnectionMetaData metaData;
late AtSignLogger logger;

BaseConnection(Socket? socket) {
BaseSocketConnection(this._socket) {
logger = AtSignLogger(runtimeType.toString());
socket?.setOption(SocketOption.tcpNoDelay, true);
_socket = socket;
}

@override
AtConnectionMetaData getMetaData() {
return metaData;
_socket.setOption(SocketOption.tcpNoDelay, true);
}

@override
Future<void> close() async {
try {
var address = getSocket().remoteAddress;
var port = getSocket().remotePort;
await _socket!.close();
var address = underlying.remoteAddress;
var port = underlying.remotePort;
await _socket.close();
logger.finer('$address:$port Disconnected');
getMetaData().isClosed = true;
metaData.isClosed = true;
} on Exception {
getMetaData().isStale = true;
metaData.isStale = true;
// Ignore exception on a connection close
} on Error {
getMetaData().isStale = true;
metaData.isStale = true;
// Ignore error on a connection close
}
}

@override
Socket getSocket() {
return _socket!;
}
T get underlying => _socket;

@override
void write(String data) {
if (isInValid()) {
throw ConnectionInvalidException('Connection is invalid');
}
try {
getSocket().write(data);
getMetaData().lastAccessed = DateTime.now().toUtc();
underlying.write(data);
metaData.lastAccessed = DateTime.now().toUtc();
} on Exception catch (e) {
getMetaData().isStale = true;
metaData.isStale = true;
logger.severe(e.toString());
throw AtIOException(e.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import 'dart:io';
import 'package:at_server_spec/at_server_spec.dart';

abstract class AtConnectionFactory<T extends AtConnection> {
T createConnection(Socket socket, {String? sessionId});
T createSocketConnection(Socket socket, {String? sessionId});
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import 'dart:io';

import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart';
import 'package:at_secondary/src/server/at_secondary_config.dart';
import 'package:at_server_spec/at_server_spec.dart';
Expand Down Expand Up @@ -30,12 +28,10 @@ class DummyInboundConnection implements InboundConnection {
}

@override
AtConnectionMetaData getMetaData() {
return metadata;
}
AtConnectionMetaData get metaData => metadata;

@override
Socket getSocket() {
dynamic get underlying {
throw UnimplementedError(
"DummyInboundConnection.getSocket is not implemented");
}
Expand All @@ -59,9 +55,6 @@ class DummyInboundConnection implements InboundConnection {

bool isStream = false;

@override
Socket? receiverSocket;

@override
bool isRequestAllowed() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import 'package:at_server_spec/at_server_spec.dart';

import 'dummy_inbound_connection.dart';

class InboundConnectionImpl extends BaseConnection
class InboundConnectionImpl<T extends Socket> extends BaseSocketConnection
implements InboundConnection {
@override
bool? isMonitor = false;
Expand Down Expand Up @@ -55,7 +55,7 @@ class InboundConnectionImpl extends BaseConnection
/// A list of timestamps representing the times when requests were made.
late final Queue<int> requestTimestampQueue;

InboundConnectionImpl(Socket? socket, String? sessionId, {this.owningPool})
InboundConnectionImpl(T socket, String? sessionId, {this.owningPool})
: super(socket) {
metaData = InboundConnectionMetadata()
..sessionID = sessionId
Expand Down Expand Up @@ -99,9 +99,9 @@ class InboundConnectionImpl extends BaseConnection
// Without the above check, we were getting runtime errors on the next check
// since DummyInboundConnection.getSocket throws a "not implemented" error

if (getSocket().remoteAddress.address ==
connection.getSocket().remoteAddress.address &&
getSocket().remotePort == connection.getSocket().remotePort) {
if (underlying.remoteAddress.address ==
connection.underlying.remoteAddress.address &&
underlying.remotePort == connection.underlying.remotePort) {
return true;
}

Expand All @@ -111,7 +111,7 @@ class InboundConnectionImpl extends BaseConnection
/// Returning true indicates to the caller that this connection **can** be closed if needed
@override
bool isInValid() {
if (getMetaData().isClosed || getMetaData().isStale) {
if (metaData.isClosed || metaData.isStale) {
return true;
}

Expand All @@ -134,7 +134,7 @@ class InboundConnectionImpl extends BaseConnection
// We're past the low water mark. Let's use some fancier logic to mark connections invalid increasingly aggressively.
double idleTimeReductionFactor =
1 - (numConnectionsOverLwm / (poolMaxConnections - lowWaterMark));
if (!getMetaData().isAuthenticated && !getMetaData().isPolAuthenticated) {
if (!metaData.isAuthenticated && !metaData.isPolAuthenticated) {
// For **unauthenticated** connections, we deem invalid if idle time is greater than
// ((maxIdleTime - minIdleTime) * (1 - numConnectionsOverLwm / (maxConnections - connectionsLowWaterMark))) + minIdleTime
//
Expand Down Expand Up @@ -182,9 +182,9 @@ class InboundConnectionImpl extends BaseConnection

/// Get the idle time of the inbound connection since last write operation
int _getIdleTimeMillis() {
var lastAccessedTime = getMetaData().lastAccessed;
var lastAccessedTime = metaData.lastAccessed;
// if lastAccessedTime is not set, use created time
lastAccessedTime ??= getMetaData().created;
lastAccessedTime ??= metaData.created;
var currentTime = DateTime.now().toUtc();
return currentTime.difference(lastAccessedTime!).inMilliseconds;
}
Expand All @@ -193,7 +193,7 @@ class InboundConnectionImpl extends BaseConnection
/// false otherwise
bool _idleForLongerThanMax() {
var idleTimeMillis = _getIdleTimeMillis();
if (getMetaData().isAuthenticated || getMetaData().isPolAuthenticated) {
if (metaData.isAuthenticated || metaData.isPolAuthenticated) {
return idleTimeMillis > authenticatedMaxAllowableIdleTimeMillis;
} else {
return idleTimeMillis > unauthenticatedMaxAllowableIdleTimeMillis;
Expand All @@ -207,9 +207,6 @@ class InboundConnectionImpl extends BaseConnection
listener.listen(callback, streamCallBack);
}

@override
Socket? receiverSocket;

bool? isStream;

@override
Expand All @@ -219,22 +216,22 @@ class InboundConnectionImpl extends BaseConnection
// (Note however that, at time of writing, outbound_connection_impl also calls socket.destroy)

// Some defensive code just in case we accidentally call close multiple times
if (getMetaData().isClosed) {
if (metaData.isClosed) {
return;
}

try {
var address = getSocket().remoteAddress;
var port = getSocket().remotePort;
getSocket().destroy();
var address = underlying.remoteAddress;
var port = underlying.remotePort;
underlying.destroy();
logger.finer(logger.getAtConnectionLogMessage(
getMetaData(), '$address:$port Disconnected'));
getMetaData().isClosed = true;
metaData, '$address:$port Disconnected'));
metaData.isClosed = true;
} on Exception {
getMetaData().isStale = true;
metaData.isStale = true;
// Ignore exception on a connection close
} on Error {
getMetaData().isStale = true;
metaData.isStale = true;
// Ignore error on a connection close
}
}
Expand All @@ -244,7 +241,7 @@ class InboundConnectionImpl extends BaseConnection
super.write(data);
if (metaData is InboundConnectionMetadata) {
logger.info(logger.getAtConnectionLogMessage(
metaData, 'SENT: ${BaseConnection.truncateForLogging(data)}'));
metaData, 'SENT: ${BaseSocketConnection.truncateForLogging(data)}'));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class InboundConnectionManager implements AtConnectionFactory {
/// @param sessionId - current sessionId
/// Throws a [InboundConnectionLimitException] if pool doesn't have capacity
@override
InboundConnection createConnection(Socket? socket, {String? sessionId}) {
InboundConnection createSocketConnection(Socket socket, {String? sessionId}) {
if (!_isInitialized) {
init(defaultPoolSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,31 @@ class InboundMessageListener {
void listen(callback, streamCallBack) {
onStreamCallBack = streamCallBack;
onBufferEndCallBack = callback;
connection.getSocket().listen(_messageHandler,
connection.underlying.listen(_messageHandler,
onDone: _finishedHandler, onError: _errorHandler);
connection
.getSocket()
.done
.onError((error, stackTrace) => (_errorHandler(error)));
connection.getMetaData().isListening = true;
connection.metaData.isListening = true;
}

/// Handles messages on the inbound client's connection and calls the verb executor
/// Closes the inbound connection in case of any error.
Future<void> _messageHandler(data) async {
//ignore the data read if the connection is stale or closed
if (connection.getMetaData().isStale || connection.getMetaData().isClosed) {
if (connection.metaData.isStale || connection.metaData.isClosed) {
//clear buffer as data is redundant
_buffer.clear();
return;
}
// If connection is invalid, throws ConnectionInvalidException and closes the connection
if (connection.isInValid()) {
_buffer.clear();
logger.info(logger.getAtConnectionLogMessage(connection.getMetaData(),
logger.info(logger.getAtConnectionLogMessage(connection.metaData,
'Inbound connection is invalid. Closing the connection'));
await GlobalExceptionHandler.getInstance().handle(
ConnectionInvalidException('Connection is invalid'),
atConnection: connection);
return;
}
if (connection.getMetaData().isStream) {
if (connection.metaData.isStream) {
await onStreamCallBack(data, connection);
return;
}
Expand All @@ -76,8 +72,8 @@ class InboundMessageListener {
//decode only when end of buffer is reached
var command = utf8.decode(_buffer.getData());
command = command.trim();
logger.info(logger.getAtConnectionLogMessage(connection.getMetaData(),
'RCVD: ${BaseConnection.truncateForLogging(command)}'));
logger.info(logger.getAtConnectionLogMessage(connection.metaData,
'RCVD: ${BaseSocketConnection.truncateForLogging(command)}'));
// if command is '@exit', close the connection.
if (command == '@exit') {
await _finishedHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class OutboundClient {

String? toHost;
String? toPort;
OutboundConnection? outboundConnection;
OutboundSocketConnection? outboundConnection;
bool isConnectionCreated = false;
bool isHandShakeDone = false;
DateTime lastUsed = DateTime.now();
Expand Down Expand Up @@ -374,13 +374,13 @@ class OutboundClient {
}

abstract class OutboundConnectionFactory {
Future<OutboundConnection> createOutboundConnection(
Future<OutboundSocketConnection> createOutboundConnection(
String host, int port, String toAtSign);
}

class DefaultOutboundConnectionFactory implements OutboundConnectionFactory {
@override
Future<OutboundConnection> createOutboundConnection(
Future<OutboundSocketConnection> createOutboundConnection(
String host, int port, String toAtSign) async {
AtSecurityContextImpl securityContext = AtSecurityContextImpl();
SecurityContext secConConnect = SecurityContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ import 'dart:io';
import 'package:at_secondary/src/connection/base_connection.dart';
import 'package:at_server_spec/at_server_spec.dart';

// Represent an OutboundConnection to another user's secondary server.
abstract class OutboundConnection extends BaseConnection {
OutboundConnection(Socket? socket) : super(socket);
// Represent an OutboundConnection to another atServer
abstract class OutboundSocketConnection<T extends Socket>
extends BaseSocketConnection {
OutboundSocketConnection(T socket) : super(socket);
}

/// Metadata information for [OutboundConnection]
/// Metadata information for [OutboundSocketConnection]
class OutboundConnectionMetadata extends AtConnectionMetaData {
String? toAtSign;
bool isHandShakeSuccess = false;
Expand Down
Loading

0 comments on commit f45904c

Please sign in to comment.