Skip to content

Commit

Permalink
Merge pull request #1538 from atsign-foundation/refactor_commit_log
Browse files Browse the repository at this point in the history
fix: Refactor commit log keystore and unit tests
  • Loading branch information
gkc authored Oct 9, 2023
2 parents 59e75ae + 21cc674 commit bbfe95b
Show file tree
Hide file tree
Showing 7 changed files with 1,070 additions and 932 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,33 @@ import 'package:at_utf7/at_utf7.dart';
import 'package:at_utils/at_logger.dart';
import 'package:hive/hive.dart';

abstract class BaseAtCommitLog implements AtLogType<int, CommitEntry> {
Future<CommitEntry?> lastSyncedEntry() async {
// Implemented by [ClientAtCommitLog]
throw UnimplementedError();
}

Future<CommitEntry?> lastSyncedEntryWithRegex(String regex) async {
// Implemented by [ClientAtCommitLog]
throw UnimplementedError();
}

/// Returns the commit entry for a given commit sequence number
/// throws [DataStoreException] if there is an exception getting the commit entry
Future<CommitEntry?> getEntry(int? sequenceNumber) async {
// Implemented by [ClientAtCommitLog]
throw UnimplementedError();
}

Future<void> update(CommitEntry commitEntry, int commitId) async {
// Implemented by [ClientAtCommitLog]
throw UnimplementedError();
}
}

/// Class to maintain commit logs on the secondary server for create, update and remove operations on keys
class AtCommitLog implements AtLogType<int, CommitEntry> {
@server
class AtCommitLog extends BaseAtCommitLog {
var logger = AtSignLogger('AtCommitLog');

late final List<AtChangeEventListener> _atChangeEventListener = [];
Expand Down Expand Up @@ -54,54 +79,6 @@ class AtCommitLog implements AtLogType<int, CommitEntry> {
return result;
}

/// Returns the commit entry for a given commit sequence number
/// throws [DataStoreException] if there is an exception getting the commit entry
@client
Future<CommitEntry?> getEntry(int? sequenceNumber) async {
try {
var commitEntry = await _commitLogKeyStore.get(sequenceNumber!);
return commitEntry;
} on Exception catch (e) {
throw DataStoreException('Exception getting entry:${e.toString()}');
} on HiveError catch (e) {
throw DataStoreException(
'Hive error adding to commit log:${e.toString()}');
}
}

/// Returns the list of commit entries greater than [sequenceNumber]
/// throws [DataStoreException] if there is an exception getting the commit entries
Future<List<CommitEntry>> getChanges(int? sequenceNumber, String? regex,
{int? limit}) async {
Future<List<CommitEntry>> changes;
try {
changes = _commitLogKeyStore.getChanges(sequenceNumber!,
regex: regex, limit: limit);
} on Exception catch (e) {
throw DataStoreException('Exception getting changes:${e.toString()}');
} on HiveError catch (e) {
throw DataStoreException(
'Hive error adding to commit log:${e.toString()}');
}
// ignore: unnecessary_null_comparison
if (changes == null) {
return [];
}
return changes;
}

@client
Future<void> update(CommitEntry commitEntry, int commitId) async {
try {
await _commitLogKeyStore.update(commitId, commitEntry);
} on Exception catch (e) {
throw DataStoreException('Exception updating entry:${e.toString()}');
} on HiveError catch (e) {
throw DataStoreException(
'Hive error updating entry to commit log:${e.toString()}');
}
}

/// Returns the latest committed sequence number
@server
int? lastCommittedSequenceNumber() {
Expand All @@ -114,16 +91,6 @@ class AtCommitLog implements AtLogType<int, CommitEntry> {
return await _commitLogKeyStore.lastCommittedSequenceNumberWithRegex(regex);
}

@client
Future<CommitEntry?> lastSyncedEntry() async {
return await _commitLogKeyStore.lastSyncedEntry();
}

@client
Future<CommitEntry?> lastSyncedEntryWithRegex(String regex) async {
return await _commitLogKeyStore.lastSyncedEntry(regex: regex);
}

/// Returns the first committed sequence number
@server
int? firstCommittedSequenceNumber() {
Expand All @@ -133,7 +100,6 @@ class AtCommitLog implements AtLogType<int, CommitEntry> {
/// Returns the total number of keys
/// @return - int : Returns number of keys in access log
@override
@server
int entriesCount() {
return _commitLogKeyStore.entriesCount();
}
Expand Down Expand Up @@ -229,4 +195,77 @@ class AtCommitLog implements AtLogType<int, CommitEntry> {
String toString() {
return runtimeType.toString();
}

/// Returns the list of commit entries greater than [sequenceNumber]
/// throws [DataStoreException] if there is an exception getting the commit entries
Future<List<CommitEntry>> getChanges(int? sequenceNumber, String? regex,
{int? limit}) async {
throw UnimplementedError('');
}
}

@client
class ClientAtCommitLog extends AtCommitLog {
ClientAtCommitLog(CommitLogKeyStore keyStore) : super(keyStore);

/// Returns the commit entry for a given commit sequence number
/// throws [DataStoreException] if there is an exception getting the commit entry
@override
Future<CommitEntry?> getEntry(int? sequenceNumber) async {
try {
var commitEntry = await _commitLogKeyStore.get(sequenceNumber!);
return commitEntry;
} on Exception catch (e) {
throw DataStoreException('Exception getting entry:${e.toString()}');
} on HiveError catch (e) {
throw DataStoreException(
'Hive error adding to commit log:${e.toString()}');
}
}

@override
Future<void> update(CommitEntry commitEntry, int commitId) async {
try {
await _commitLogKeyStore.update(commitId, commitEntry);
} on Exception catch (e) {
throw DataStoreException('Exception updating entry:${e.toString()}');
} on HiveError catch (e) {
throw DataStoreException(
'Hive error updating entry to commit log:${e.toString()}');
}
}

@override
Future<CommitEntry?> lastSyncedEntry() async {
return await (_commitLogKeyStore as ClientCommitLogKeyStore)
.lastSyncedEntry();
}

@override
Future<CommitEntry?> lastSyncedEntryWithRegex(String regex) async {
return await (_commitLogKeyStore as ClientCommitLogKeyStore)
.lastSyncedEntry(regex: regex);
}

/// Returns the list of commit entries greater than [sequenceNumber]
/// throws [DataStoreException] if there is an exception getting the commit entries
@override
Future<List<CommitEntry>> getChanges(int? sequenceNumber, String? regex,
{int? limit}) async {
List<CommitEntry> changes;
try {
changes = await _commitLogKeyStore.getChanges(sequenceNumber!,
regex: regex, limit: limit);
} on Exception catch (e) {
throw DataStoreException('Exception getting changes:${e.toString()}');
} on HiveError catch (e) {
throw DataStoreException(
'Hive error adding to commit log:${e.toString()}');
}
// ignore: unnecessary_null_comparison
if (changes == null) {
return [];
}
return changes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ class AtCommitLogManagerImpl implements AtCommitLogManager {
{String? commitLogPath, bool enableCommitId = true}) async {
//verify if an instance has been already created for the given instance.
if (!_commitLogMap.containsKey(atSign)) {
var commitLogKeyStore = CommitLogKeyStore(atSign);
commitLogKeyStore.enableCommitId = enableCommitId;
CommitLogKeyStore? commitLogKeyStore;
// Creating commit-log for client
if (enableCommitId) {
commitLogKeyStore = CommitLogKeyStore(atSign);
_commitLogMap[atSign] = AtCommitLog(commitLogKeyStore);
} else {
commitLogKeyStore = ClientCommitLogKeyStore(atSign);
_commitLogMap[atSign] = ClientAtCommitLog(commitLogKeyStore);
}
if (commitLogPath != null) {
await commitLogKeyStore.init(commitLogPath, isLazy: false);
}
_commitLogMap[atSign] = AtCommitLog(commitLogKeyStore);
}
return _commitLogMap[atSign];
}
Expand Down
Loading

0 comments on commit bbfe95b

Please sign in to comment.