Skip to content

Commit

Permalink
Simplify distributed lock creation
Browse files Browse the repository at this point in the history
  • Loading branch information
ddaspit committed Sep 5, 2023
1 parent d0792ff commit 464421d
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 176 deletions.
22 changes: 0 additions & 22 deletions src/SIL.Machine.AspNetCore/Services/DistributedReaderWriterLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,19 @@ public class DistributedReaderWriterLock : IDistributedReaderWriterLock
private readonly IIdGenerator _idGenerator;
private readonly string _id;

private bool _lockChecked;

public DistributedReaderWriterLock(string hostId, IRepository<RWLock> locks, IIdGenerator idGenerator, string id)
{
_hostId = hostId;
_locks = locks;
_idGenerator = idGenerator;
_id = id;
_lockChecked = false;
}

public async Task<IAsyncDisposable> ReaderLockAsync(
TimeSpan? lifetime = default,
CancellationToken cancellationToken = default
)
{
await createLockIfNotExist();
string lockId = _idGenerator.GenerateId();
if (!await TryAcquireReaderLock(lockId, lifetime, cancellationToken))
{
Expand Down Expand Up @@ -53,7 +49,6 @@ public async Task<IAsyncDisposable> WriterLockAsync(
CancellationToken cancellationToken = default
)
{
await createLockIfNotExist();
string lockId = _idGenerator.GenerateId();
if (!await TryAcquireWriterLock(lockId, lifetime, cancellationToken))
{
Expand Down Expand Up @@ -101,23 +96,6 @@ await _locks.UpdateAsync(
return new WriterLockReleaser(this, lockId);
}

private async Task createLockIfNotExist()
{
if (_lockChecked == false)
{
try
{
await _locks.InsertAsync(new RWLock { Id = _id }, CancellationToken.None);
}
catch (DuplicateKeyException)
{
// the lock is already made - no new one needs to be made
// This is done instead of checking if it exists first to prevent race conditions.
}
_lockChecked = true;
}
}

private async Task<bool> TryAcquireWriterLock(
string lockId,
TimeSpan? lifetime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,20 @@ public async Task InitAsync(CancellationToken cancellationToken = default)
await ReleaseAllReaderLocksAsync(cancellationToken);
}

public IDistributedReaderWriterLock Create(string id)
public async Task<IDistributedReaderWriterLock> CreateAsync(
string id,
CancellationToken cancellationToken = default
)
{
try
{
await _locks.InsertAsync(new RWLock { Id = id }, cancellationToken);
}
catch (DuplicateKeyException)
{
// the lock is already made - no new one needs to be made
// This is done instead of checking if it exists first to prevent race conditions.
}
return new DistributedReaderWriterLock(_serviceOptions.ServiceId, _locks, _idGenerator, id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
public interface IDistributedReaderWriterLockFactory
{
Task InitAsync(CancellationToken cancellationToken = default);
IDistributedReaderWriterLock Create(string id);
Task<IDistributedReaderWriterLock> CreateAsync(string id, CancellationToken cancellationToken = default);
Task<bool> DeleteAsync(string id, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task RunAsync(
CancellationToken cancellationToken
)
{
IDistributedReaderWriterLock rwLock = _lockFactory.Create(engineId);
IDistributedReaderWriterLock rwLock = await _lockFactory.CreateAsync(engineId, cancellationToken);

var tokenizer = new LatinWordTokenizer();
var detokenizer = new LatinWordDetokenizer();
Expand Down
14 changes: 7 additions & 7 deletions src/SIL.Machine.AspNetCore/Services/SmtTransferEngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public override async Task CreateAsync(
await base.CreateAsync(engineId, engineName, sourceLanguage, targetLanguage, cancellationToken);

SmtTransferEngineState state = _stateService.Get(engineId);
IDistributedReaderWriterLock @lock = LockFactory.Create(engineId);
IDistributedReaderWriterLock @lock = await LockFactory.CreateAsync(engineId, CancellationToken.None);
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
state.InitNew();
Expand All @@ -45,7 +45,7 @@ public override async Task DeleteAsync(string engineId, CancellationToken cancel
await base.DeleteAsync(engineId, cancellationToken);
if (_stateService.TryRemove(engineId, out SmtTransferEngineState? state))
{
IDistributedReaderWriterLock @lock = LockFactory.Create(engineId);
IDistributedReaderWriterLock @lock = await LockFactory.CreateAsync(engineId, CancellationToken.None);
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
// ensure that there is no build running before unloading
Expand All @@ -67,7 +67,7 @@ public override async Task<IReadOnlyList<TranslationResult>> TranslateAsync(
)
{
SmtTransferEngineState state = _stateService.Get(engineId);
IDistributedReaderWriterLock @lock = LockFactory.Create(engineId);
IDistributedReaderWriterLock @lock = await LockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.ReaderLockAsync(cancellationToken: cancellationToken))
{
TranslationEngine engine = await GetEngineAsync(engineId, cancellationToken);
Expand All @@ -85,7 +85,7 @@ public override async Task<WordGraph> GetWordGraphAsync(
)
{
SmtTransferEngineState state = _stateService.Get(engineId);
IDistributedReaderWriterLock @lock = LockFactory.Create(engineId);
IDistributedReaderWriterLock @lock = await LockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.ReaderLockAsync(cancellationToken: cancellationToken))
{
TranslationEngine engine = await GetEngineAsync(engineId, cancellationToken);
Expand All @@ -105,7 +105,7 @@ public override async Task TrainSegmentPairAsync(
)
{
SmtTransferEngineState state = _stateService.Get(engineId);
IDistributedReaderWriterLock @lock = LockFactory.Create(engineId);
IDistributedReaderWriterLock @lock = await LockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
TranslationEngine engine = await GetEngineAsync(engineId, cancellationToken);
Expand Down Expand Up @@ -142,7 +142,7 @@ public override async Task StartBuildAsync(
)
{
SmtTransferEngineState state = _stateService.Get(engineId);
IDistributedReaderWriterLock @lock = LockFactory.Create(engineId);
IDistributedReaderWriterLock @lock = await LockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await StartBuildInternalAsync(engineId, buildId, corpora, cancellationToken);
Expand All @@ -153,7 +153,7 @@ public override async Task StartBuildAsync(
public override async Task CancelBuildAsync(string engineId, CancellationToken cancellationToken = default)
{
SmtTransferEngineState state = _stateService.Get(engineId);
IDistributedReaderWriterLock @lock = LockFactory.Create(engineId);
IDistributedReaderWriterLock @lock = await LockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await CancelBuildInternalAsync(engineId, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ TimeSpan inactiveTimeout
{
foreach (SmtTransferEngineState state in _engineStates.Values)
{
IDistributedReaderWriterLock @lock = lockFactory.Create(state.EngineId);
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(state.EngineId);
await using (await @lock.WriterLockAsync())
{
TranslationEngine? engine = await engines.GetAsync(e => e.EngineId == state.EngineId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public virtual async Task StartBuildAsync(
CancellationToken cancellationToken = default
)
{
IDistributedReaderWriterLock @lock = LockFactory.Create(engineId);
IDistributedReaderWriterLock @lock = await LockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await StartBuildInternalAsync(engineId, buildId, corpora, cancellationToken);
Expand All @@ -69,7 +69,7 @@ public virtual async Task StartBuildAsync(

public virtual async Task CancelBuildAsync(string engineId, CancellationToken cancellationToken = default)
{
IDistributedReaderWriterLock @lock = LockFactory.Create(engineId);
IDistributedReaderWriterLock @lock = await LockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await CancelBuildInternalAsync(engineId, cancellationToken);
Expand Down
Loading

0 comments on commit 464421d

Please sign in to comment.