Skip to content

Commit

Permalink
changing heirarchy and removeing outboundconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
purnimavenkatasubbu committed Dec 6, 2024
1 parent 3e05187 commit 8e5aaee
Show file tree
Hide file tree
Showing 20 changed files with 528 additions and 646 deletions.
5 changes: 1 addition & 4 deletions packages/at_lookup/lib/at_lookup.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ library at_lookup;

export 'src/at_lookup.dart';
export 'src/at_lookup_impl.dart';
export 'src/connection/outbound_connection.dart';
export 'src/connection/outbound_websocket_connection.dart';
export 'src/connection/outbound_connection_impl.dart';
export 'src/exception/at_lookup_exception.dart';
export 'src/monitor_client.dart';
export 'src/cache/secondary_address_finder.dart';
export 'src/cache/cacheable_secondary_address_finder.dart';
export 'src/util/secure_socket_util.dart';
export 'src/connection/at_lookup_outbound_connection_factory.dart';
export 'src/connection/at_lookup_connection_factory.dart';
59 changes: 27 additions & 32 deletions packages/at_lookup/lib/src/at_lookup_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import 'package:at_commons/at_builders.dart';
import 'package:at_commons/at_commons.dart';
import 'package:at_lookup/at_lookup.dart';
import 'package:at_lookup/src/connection/at_connection.dart';
import 'package:at_lookup/src/connection/outbound_message_listener.dart';
import 'package:at_lookup/src/connection/at_message_listener.dart';
import 'package:at_utils/at_logger.dart';
import 'package:crypto/crypto.dart';
import 'package:crypton/crypton.dart';
Expand All @@ -20,14 +20,13 @@ class AtLookupImpl implements AtLookUp {
final logger = AtSignLogger('AtLookup');

/// Listener for reading verb responses from the remote server
late OutboundMessageListener messageListener;
late AtMessageListener messageListener;

AtConnection?
_connection; // Single variable for both socket and WebSocket connections
AtConnection? _connection; // Represents Socket or WebSocket connection

AtConnection? get connection => _connection;

late AtLookupOutboundConnectionFactory atOutboundConnectionFactory;
late AtLookupConnectionFactory atConnectionFactory;

@override
late SecondaryAddressFinder secondaryAddressFinder;
Expand All @@ -44,7 +43,7 @@ class AtLookupImpl implements AtLookUp {
String? cramSecret;

// ignore: prefer_typing_uninitialized_variables
var outboundConnectionTimeout;
var atConnectionTimeout;

late SecureSocketConfig _secureSocketConfig;

Expand All @@ -59,10 +58,9 @@ class AtLookupImpl implements AtLookUp {
SecondaryAddressFinder? secondaryAddressFinder,
SecureSocketConfig? secureSocketConfig,
Map<String, dynamic>? clientConfig,
AtLookupOutboundConnectionFactory? atOutboundConnectionFactory}) {
AtLookupConnectionFactory? atConnectionFactory}) {
// Default to secure socket factory
this.atOutboundConnectionFactory =
atOutboundConnectionFactory ?? AtLookupSecureSocketFactory();
this.atConnectionFactory = atConnectionFactory ?? AtLookupSecureSocketFactory();
_currentAtSign = atSign;
_rootDomain = rootDomain;
_rootPort = rootPort;
Expand Down Expand Up @@ -248,8 +246,7 @@ class AtLookupImpl implements AtLookUp {
var port = secondaryAddress.port;

// 2. Create a connection to the secondary server
await createOutboundConnection(
host, port.toString(), _secureSocketConfig);
await createAtConnection(host, port.toString(), _secureSocketConfig);

// 3. Listen to server response
messageListener.listen();
Expand Down Expand Up @@ -432,7 +429,7 @@ class AtLookupImpl implements AtLookUp {
await createConnection();
try {
await _pkamAuthenticationMutex.acquire();
if (!_connection!.getMetaData()!.isAuthenticated) {
if (!_connection!.metaData.isAuthenticated) {
await _sendCommand((FromVerbBuilder()
..atSign = _currentAtSign
..clientConfig = _clientConfig)
Expand All @@ -454,13 +451,13 @@ class AtLookupImpl implements AtLookUp {
var pkamResponse = await messageListener.read();
if (pkamResponse == 'data:success') {
logger.info('auth success');
_connection!.getMetaData()!.isAuthenticated = true;
_connection!.metaData.isAuthenticated = true;
} else {
throw UnAuthenticatedException(
'Failed connecting to $_currentAtSign. $pkamResponse');
}
}
return _connection!.getMetaData()!.isAuthenticated;
return _connection!.metaData.isAuthenticated;
} finally {
_pkamAuthenticationMutex.release();
}
Expand All @@ -471,7 +468,7 @@ class AtLookupImpl implements AtLookUp {
await createConnection();
try {
await _pkamAuthenticationMutex.acquire();
if (!_connection!.getMetaData()!.isAuthenticated) {
if (!_connection!.metaData.isAuthenticated) {
await _sendCommand((FromVerbBuilder()
..atSign = _currentAtSign
..clientConfig = _clientConfig)
Expand Down Expand Up @@ -501,13 +498,13 @@ class AtLookupImpl implements AtLookUp {
var pkamResponse = await messageListener.read();
if (pkamResponse == 'data:success') {
logger.info('auth success');
_connection!.getMetaData()!.isAuthenticated = true;
_connection!.metaData.isAuthenticated = true;
} else {
throw UnAuthenticatedException(
'Failed connecting to $_currentAtSign. $pkamResponse');
}
}
return _connection!.getMetaData()!.isAuthenticated;
return _connection!.metaData.isAuthenticated;
} finally {
_pkamAuthenticationMutex.release();
}
Expand All @@ -521,7 +518,7 @@ class AtLookupImpl implements AtLookUp {
try {
await _cramAuthenticationMutex.acquire();

if (!_connection!.getMetaData()!.isAuthenticated) {
if (!_connection!.metaData.isAuthenticated) {
// Use the connection and message listener dynamically
await _sendCommand((FromVerbBuilder()
..atSign = _currentAtSign
Expand All @@ -548,13 +545,13 @@ class AtLookupImpl implements AtLookUp {

if (cramResponse == 'data:success') {
logger.info('auth success');
_connection!.getMetaData()!.isAuthenticated = true;
_connection!.metaData.isAuthenticated = true;
} else {
throw UnAuthenticatedException('Auth failed');
}
}

return _connection!.getMetaData()!.isAuthenticated;
return _connection!.metaData.isAuthenticated;
} finally {
_cramAuthenticationMutex.release();
}
Expand Down Expand Up @@ -629,28 +626,26 @@ class AtLookupImpl implements AtLookUp {
}

bool _isAuthRequired() {
return !isConnectionAvailable() ||
!(_connection!.getMetaData()!.isAuthenticated);
return !isConnectionAvailable() || !(_connection!.metaData.isAuthenticated);
}

Future<bool> createOutboundConnection(
Future<bool> createAtConnection(
String host, String port, SecureSocketConfig secureSocketConfig) async {
try {
// Create the socket connection using the factory
final underlying = await atOutboundConnectionFactory.createUnderlying(
final underlying = await atConnectionFactory.createUnderlying(
host, port, secureSocketConfig);

// Create the outbound connection and listener using the factory's methods
final outboundConnection =
atOutboundConnectionFactory.createConnection(underlying);
messageListener =
atOutboundConnectionFactory.createListener(outboundConnection);
// Create at connection and listener using the factory's methods
AtConnection atConnection =
atConnectionFactory.createConnection(underlying);
messageListener = atConnectionFactory.createListener(atConnection);

_connection = outboundConnection;
_connection = atConnection;

// Set idle time if applicable
if (outboundConnectionTimeout != null) {
outboundConnection.setIdleTime(outboundConnectionTimeout);
if (atConnectionTimeout != null) {
atConnection.setIdleTime(atConnectionTimeout);
}
} on SocketException {
throw SecondaryConnectException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ class SecondaryAddressCacheEntry {
class SecondaryUrlFinder {
final String _rootDomain;
final int _rootPort;
late final AtLookupOutboundConnectionFactory _socketFactory;
late final AtLookupConnectionFactory _socketFactory;

SecondaryUrlFinder(this._rootDomain, this._rootPort,
{AtLookupOutboundConnectionFactory? socketFactory}) {
{AtLookupConnectionFactory? socketFactory}) {
_socketFactory = socketFactory ?? AtLookupSecureSocketFactory();
}

Expand Down
44 changes: 37 additions & 7 deletions packages/at_lookup/lib/src/connection/at_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,52 @@ abstract class AtConnection<T> {
/// The underlying connection
T get underlying;

/// Write a data to the underlying socket of the connection
/// Metadata for the connection
final AtConnectionMetaData metaData = AtConnectionMetaData();

/// The idle timeout in milliseconds (default: 10 minutes)
int idleTimeMillis = 600000;

AtConnection() {
metaData.created = DateTime.now().toUtc();
}

/// Writes data to the underlying socket of the connection.
/// @param - data - Data to write to the socket
/// @throws [AtIOException] for any exception during the operation
FutureOr<void> write(String data);

/// closes the underlying connection
/// Closes the underlying connection.
Future<void> close();

/// Returns true if the connection is invalid
bool isInValid();
/// Returns true if the connection is invalid.
bool isInValid() {
return _isIdle() || metaData.isClosed || metaData.isStale;
}

/// Updates the idle time for the connection (Socket or WebSocket).
void setIdleTime(int? idleTimeMillis) {
if (idleTimeMillis != null) {
this.idleTimeMillis = idleTimeMillis;
}
}

/// Checks if the connection has been idle for longer than the specified timeout.
bool _isIdle() {
return _getIdleTimeMillis() > idleTimeMillis;
}

/// Gets the connection metadata
AtConnectionMetaData? getMetaData();
/// Calculates the idle time in milliseconds.
int _getIdleTimeMillis() {
var lastAccessedTime = metaData.lastAccessed;
lastAccessedTime ??= metaData.created;
var currentTime = DateTime.now().toUtc();
return currentTime.difference(lastAccessedTime!).inMilliseconds;
}
}

abstract class AtConnectionMetaData {
/// Metadata for [AtConnection].
class AtConnectionMetaData {
bool isAuthenticated = false;
DateTime? lastAccessed;
DateTime? created;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import 'dart:io';

import 'package:at_commons/at_commons.dart';
import 'package:at_lookup/at_lookup.dart';
import 'package:at_lookup/src/connection/at_connection.dart';
import 'package:at_lookup/src/connection/at_socket_connection.dart';
import 'package:at_lookup/src/connection/at_websocket_connection.dart';

import 'at_message_listener.dart';

/// This factory is responsible for creating the underlying connection,
/// an connection wrapper, and the message listener for a
/// specific type of connection (e.g., `SecureSocket` or `WebSocket`).
abstract class AtLookupConnectionFactory<T, U> {
/// Creates the underlying connection of type [T].
Future<T> createUnderlying(
String host, String port, SecureSocketConfig secureSocketConfig);

/// Wraps the underlying connection of type [T] into an connection [U].
U createConnection(T underlying);

/// Creates an [AtMessageListener] to manage messages for the given [U] connection.
AtMessageListener createListener(U connection);
}

/// Factory class to create a secure connection over [SecureSocket].
class AtLookupSecureSocketFactory extends AtLookupConnectionFactory<
SecureSocket, AtConnection> {
/// Creates a secure socket connection to the specified [host] and [port]
/// using the given [secureSocketConfig]. Returns a [SecureSocket]
@override
Future<SecureSocket> createUnderlying(
String host, String port, SecureSocketConfig secureSocketConfig) async {
return await SecureSocketUtil.createSecureSocket(
host, port, secureSocketConfig);
}

/// Wraps the [SecureSocket] connection into an [AtConnection] instance.
@override
AtConnection createConnection(SecureSocket underlying) {
return AtSocketConnection(underlying);
}

/// Creates an [AtMessageListener] to manage messages for the secure
/// socket-based [AtConnection].
@override
AtMessageListener createListener(AtConnection connection) {
return AtMessageListener(connection);
}
}

/// Factory class to create a WebSocket-based connection.
class AtLookupWebSocketFactory extends AtLookupConnectionFactory<
WebSocket, AtConnection> {
/// Creates a WebSocket connection to the specified [host] and [port]
/// using the given [secureSocketConfig].
@override
Future<WebSocket> createUnderlying(
String host, String port, SecureSocketConfig secureSocketConfig) async {
final socket = await SecureSocketUtil.createSecureSocket(
host, port, secureSocketConfig,
isWebSocket: true);
return socket as WebSocket;
}

/// Wraps the [WebSocket] connection into an [AtConnection] instance.
@override
AtConnection createConnection(underlying) {
return AtWebSocketConnection(underlying);
}

/// Creates an [AtMessageListener] to manage messages for the
/// WebSocket-based [AtConnection].
@override
AtMessageListener createListener(
AtConnection connection) {
return AtMessageListener(connection);
}
}
Loading

0 comments on commit 8e5aaee

Please sign in to comment.