Skip to content

Commit

Permalink
Version 1.0.0 -> 2.0.0, support ReadOnlyMemory<byte>
Browse files Browse the repository at this point in the history
The method StoreWriter.WriteAsync now supports a ReadOnlyMemory<byte>
in addition to the old (byte[], int, int) signature.

BREAKING CHANGE: the virtual method StoreWriter.DoWriteAsync now expects
a ReadOnlyMemory<byte> instead of the old (byte[], int, int) signature.
  • Loading branch information
VictorNicollet committed Aug 23, 2022
1 parent 6b6e8aa commit 5c25463
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 73 deletions.
2 changes: 1 addition & 1 deletion LICENSE.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2020, LOKAD SAS
Copyright (c) 2022, LOKAD SAS
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
Expand Down
10 changes: 5 additions & 5 deletions src/Disk/DiskWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ public DiskWriter(string path)
_temp = File.Open(_tempPath, FileMode.CreateNew);
}

/// <see cref="StoreWriter.DoWriteAsync"/>
protected override Task DoWriteAsync(byte[] buffer, int offset, int count, CancellationToken cancel) =>
_temp.WriteAsync(buffer, offset, count, cancel);
/// <inheritdoc cref="StoreWriter.DoWriteAsync"/>
protected override Task DoWriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancel) =>
_temp.WriteAsync(buffer, cancel).AsTask();

/// <see cref="StoreWriter.DoCommitAsync"/>
/// <inheritdoc cref="StoreWriter.DoCommitAsync"/>
protected override Task DoCommitAsync(Hash hash, CancellationToken cancel) =>
DoOptCommitAsync(hash, null, cancel);

/// <see cref="StoreWriter.DoOptCommitAsync"/>
/// <inheritdoc cref="StoreWriter.DoOptCommitAsync"/>
protected override async Task DoOptCommitAsync(Hash hash, Func<Task> optionalWrite, CancellationToken cancel)
{
var path = DiskStorePaths.PathOfHash(_path, hash);
Expand Down
28 changes: 6 additions & 22 deletions src/Lokad.ContentAddr.csproj
Original file line number Diff line number Diff line change
@@ -1,46 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net452;netstandard2.0;netcoreapp2.1</TargetFrameworks>
<TargetFrameworks>netstandard2.1</TargetFrameworks>
<AssemblyName>Lokad.ContentAddr</AssemblyName>
<RuntimeIdentifiers>win</RuntimeIdentifiers>
<OutputType>Library</OutputType>
<Company>Lokad</Company>
<Copyright>Copyright © Lokad 2020</Copyright>
<Copyright>Copyright © Lokad 2022</Copyright>

<AssemblyVersion>1.0.0.0</AssemblyVersion>
<FileVersion>1.0.0</FileVersion>
<AssemblyVersion>2.0.0.0</AssemblyVersion>
<FileVersion>2.0.0</FileVersion>
<PackageId>Lokad.ContentAddr</PackageId>
<PackageVersion>1.0.0</PackageVersion>
<PackageVersion>2.0.0</PackageVersion>
<LangVersion>7.2</LangVersion>
<Version>1.0.0</Version>
<Version>2.0.0</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageLicenseFile>LICENSE.txt</PackageLicenseFile>
<PackageIcon>lokad.png</PackageIcon>
<PackageProjectUrl>https://github.com/Lokad/ContentAddr</PackageProjectUrl>
<Description>In-memory and on-disk content-addressable storage</Description>
</PropertyGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="System.Runtime" Version="4.3.0" />
<PackageReference Include="System.Runtime.Extensions" Version="4.3.0" />
<PackageReference Include="System.IO" Version="4.3.0" />
<PackageReference Include="System.IO.FileSystem.Primitives" Version="4.3.0" />
<PackageReference Include="System.Text.Encoding.Extensions" Version="4.3.0" />
<PackageReference Include="System.Collections" Version="4.3.0" />
<PackageReference Include="System.Threading" Version="4.3.0" />
<PackageReference Include="System.Resources.ResourceManager" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.Debug" Version="4.3.0" />
<PackageReference Include="System.Runtime.InteropServices" Version="4.3.0" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Lokad.Logging" Version="1.0.9" />
<PackageReference Include="System.Memory" Version="4.5.4" />
<PackageReference Include="System.Runtime" Version="4.3.1" />
<PackageReference Include="System.Runtime.Extensions" Version="4.3.1" />
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' != 'netstandard2.0'">
<Reference Include="System" />
</ItemGroup>
<ItemGroup>
<None Include="..\LICENSE.txt">
<Pack>True</Pack>
Expand Down
11 changes: 6 additions & 5 deletions src/Memory/MemoryStore.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -52,11 +53,11 @@ public Writer(IDictionary<Hash, byte[]> files)
_files = files;
}

/// <see cref="StoreWriter.DoWriteAsync"/>
protected override Task DoWriteAsync(byte[] buffer, int offset, int count, CancellationToken cancel) =>
_stream.WriteAsync(buffer, offset, count, cancel);
/// <inheritdoc cref="StoreWriter.DoWriteAsync"/>
protected override Task DoWriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancel) =>
_stream.WriteAsync(buffer, cancel).AsTask();

/// <see cref="StoreWriter.DoCommitAsync"/>
/// <inheritdoc cref="StoreWriter.DoCommitAsync"/>
protected override Task DoCommitAsync(Hash hash, CancellationToken cancel)
{
lock (_files)
Expand Down
9 changes: 5 additions & 4 deletions src/NullStore.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Lokad.ContentAddr
Expand All @@ -10,11 +11,11 @@ public sealed class NullStore : IWriteOnlyStore

private sealed class NullWriter : StoreWriter
{
/// <see cref="StoreWriter.DoWriteAsync"/>
protected override Task DoWriteAsync(byte[] buffer, int offset, int count, CancellationToken cancel) =>
/// <inheritdoc cref="StoreWriter.DoWriteAsync"/>
protected override Task DoWriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancel) =>
Task.FromResult(0);

/// <see cref="StoreWriter.DoCommitAsync"/>
/// <inheritdoc cref="StoreWriter.DoCommitAsync"/>
protected override Task DoCommitAsync(Hash hash, CancellationToken cancel) =>
Task.FromResult(0);
}
Expand Down
93 changes: 61 additions & 32 deletions src/StoreWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,15 @@ public WrittenBlob CommittedBlob
private Hash _hash;

/// <summary> Used to hash incoming bytes. </summary>
private readonly MD5 _hasher = MD5.Create();
private readonly IncrementalHash _hasher = IncrementalHash.CreateHash(HashAlgorithmName.MD5);

/// <summary> Clears the hasher contents and returns the computed hash. </summary>
/// <remarks>
/// The digest is written to a stack-allocated buffer of <c>Hash.Size</c> bytes, so
/// no heap allocation is needed. After this call the hasher has been reset.
/// </remarks>
/// <exception cref="InvalidOperationException">
/// The hasher could not write exactly <c>Hash.Size</c> bytes to the destination.
/// </exception>
private Hash GetHashFromHasher()
{
    Span<byte> destination = stackalloc byte[Hash.Size];

    // TryGetHashAndReset returns false (and writes nothing) when the destination
    // is too small. Ignoring that would build a Hash from whatever the stack
    // buffer happened to contain, so fail loudly instead.
    if (!_hasher.TryGetHashAndReset(destination, out var written) || written != Hash.Size)
        throw new InvalidOperationException(
            "Failed to retrieve the hash from the hasher: expected " + Hash.Size +
            " bytes, got " + written + ".");

    return Hash.FromBytes(destination);
}

/// <summary> The task returned by the first call to a commit function. </summary>
private Task<WrittenBlob> _theCommit;
Expand Down Expand Up @@ -111,13 +119,12 @@ public async Task<WrittenBlob> RealCommitAsync(CancellationToken cancel)
_syncBuffer = null;
_syncOffset = 0;

return await RealWriteAndCommitAsync(buffer, 0, count, cancel).ConfigureAwait(false);
return await RealWriteAndCommitAsync(buffer.AsMemory(0, count), cancel).ConfigureAwait(false);
}

try
{
_hasher.TransformFinalBlock(new byte[0], 0, 0);
_hash = new Hash(_hasher.Hash);
_hash = GetHashFromHasher();

await DoOptCommitAsync(_hash, null, cancel).ConfigureAwait(false);
}
Expand Down Expand Up @@ -157,7 +164,27 @@ public async Task<WrittenBlob> RealCommitAsync(CancellationToken cancel)
/// <see cref="WriteAsync(byte[],int,int,CancellationToken)"/> with the buffer
/// beforehand. This call is not awaited.
/// </remarks>
public void Write(byte[] buffer, int offset, int count)
public void Write(byte[] buffer, int offset, int count) =>
Write(buffer.AsSpan(offset, count));

/// <summary> Synchronous write. </summary>
/// <remarks>
/// Some outer interfaces (such as <see cref="System.IO.Stream"/>) require us to support
/// synchronous writes. We cannot just wrap the async write function with a <c>Wait()</c>
/// due to deadlock risks, so we provide this method instead. What it does is:
///
/// - every time data is received, it is appended to the <see cref="_syncBuffer"/>
/// (allocated on first use) at position <see cref="_syncOffset"/>. This is because
/// we are not allowed to keep around the buffer received as argument, so we need
/// to perform a copy. The good news is, this is actually more efficient when dealing
/// with a lot of very small writes.
///
/// - when the buffer is full, or when an async operation is performed through
/// another method, <see cref="WriteSyncBuffer"/> is called to perform a
/// <see cref="WriteAsync(byte[],int,int,CancellationToken)"/> with the buffer
/// beforehand. This call is not awaited.
/// </remarks>
public void Write(ReadOnlySpan<byte> span)
{
if (Failure != null)
throw new InvalidOperationException(
Expand All @@ -167,16 +194,15 @@ public void Write(byte[] buffer, int offset, int count)
if (CommitWasRequested)
throw new InvalidOperationException("Cannot write to a committed StoreWriter.");

while (count > 0)
while (span.Length > 0)
{
if (_syncBuffer == null) _syncBuffer = new byte[4 * 1024 * 1024];

var length = Math.Min(count, _syncBuffer.Length - _syncOffset);
Array.Copy(buffer, offset, _syncBuffer, _syncOffset, length);

var length = Math.Min(span.Length, _syncBuffer.Length - _syncOffset);
span.Slice(0, length).CopyTo(_syncBuffer.AsSpan(_syncOffset));
_syncOffset += length;
offset += length;
count -= length;
span = span.Slice(length);

if (_syncOffset == _syncBuffer.Length) WriteSyncBuffer(CancellationToken.None);
}
Expand Down Expand Up @@ -210,8 +236,6 @@ public void WriteSyncBuffer(CancellationToken cancel)
_syncOffset = 0;
_syncBuffer = null;

#pragma warning disable CS4014

// The thread-unsafe parts of this call are done by the time it returns. The
// task itself only needs to be awaited in order to know when the buffer is
// no longer in use; but since we discard the buffer, we don't care, so there
Expand All @@ -221,15 +245,17 @@ public void WriteSyncBuffer(CancellationToken cancel)
// ongoing writes, so that they are all completed by the time the final commit
// is performed.

WriteAsync(buffer, 0, count, cancel);

#pragma warning restore CS4014
_ = WriteAsync(buffer, 0, count, cancel);
}

#endregion

/// <summary> Write to the background task. </summary>
public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancel)
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancel) =>
WriteAsync(buffer.AsMemory(offset, count), cancel);

/// <summary> Write to the background task. </summary>
public async Task WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancel)
{
if (Failure != null)
throw new InvalidOperationException(
Expand All @@ -242,12 +268,12 @@ public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationT
if (_syncBuffer != null)
WriteSyncBuffer(cancel);

_hasher.TransformBlock(buffer, offset, count, buffer, offset);
_hashedBytes += count;
_hasher.AppendData(buffer.Span);
_hashedBytes += buffer.Length;

try
{
await DoWriteAsync(buffer, offset, count, cancel).ConfigureAwait(false);
await DoWriteAsync(buffer, cancel).ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -261,7 +287,10 @@ public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationT
/// by <see cref="CommitAsync"/>, but allows the <see cref="StoreWriter"/> to perform
/// optimizations during the write because it knows it will be the last.
/// </summary>
public Task<WrittenBlob> WriteAndCommitAsync(byte[] buffer, int offset, int count, CancellationToken cancel)
public Task<WrittenBlob> WriteAndCommitAsync(byte[] buffer, int offset, int count, CancellationToken cancel) =>
WriteAndCommitAsync(buffer.AsMemory(offset, count), cancel);

public Task<WrittenBlob> WriteAndCommitAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancel)
{
if (_theCommit != null)
throw new InvalidOperationException("Cannot write to a committed StoreWriter.");
Expand All @@ -271,12 +300,12 @@ public Task<WrittenBlob> WriteAndCommitAsync(byte[] buffer, int offset, int coun
"StoreWriter is in a failed state due to an earlier exception.",
Failure);

return _theCommit = RealWriteAndCommitAsync(buffer, offset, count, cancel);
return _theCommit = RealWriteAndCommitAsync(buffer, cancel);
}

/// <summary> Async implementation of <see cref="WriteAndCommitAsync"/>. </summary>
/// <remarks> Will only be called if <see cref="_theCommit"/> is null. </remarks>
private async Task<WrittenBlob> RealWriteAndCommitAsync(byte[] buffer, int offset, int count, CancellationToken cancel)
private async Task<WrittenBlob> RealWriteAndCommitAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancel)
{
// We let the protected async function decide whether the final writes need
// to be performed (if the blob does not exist yet), by providing a closure
Expand All @@ -285,7 +314,7 @@ private async Task<WrittenBlob> RealWriteAndCommitAsync(byte[] buffer, int offse

if (_syncBuffer == null)
{
writeIfNecessary = () => DoWriteAsync(buffer, offset, count, cancel);
writeIfNecessary = () => DoWriteAsync(buffer, cancel);
}
else
{
Expand All @@ -296,25 +325,25 @@ private async Task<WrittenBlob> RealWriteAndCommitAsync(byte[] buffer, int offse
var preCount = _syncOffset;

_syncBuffer = null;
_syncOffset = 0;
_syncOffset = 0;

_hasher.TransformBlock(preBuffer, 0, preCount, preBuffer, 0);
_hasher.AppendData(preBuffer.AsSpan(0, preCount));
_hashedBytes += preCount;

writeIfNecessary = () =>
{
// As long as the calls are done in sequence, the returned tasks
// themselves can be awaited in parallel.
var pre = DoWriteAsync(preBuffer, 0, preCount, cancel);
var post = DoWriteAsync(buffer, offset, count, cancel);
var pre = DoWriteAsync(preBuffer.AsMemory(0, preCount), cancel);
var post = DoWriteAsync(buffer, cancel);
return Task.WhenAll(pre, post);
};
}

_hasher.TransformFinalBlock(buffer, offset, count);
_hashedBytes += count;
_hasher.AppendData(buffer.Span);
_hashedBytes += buffer.Length;

_hash = new Hash(_hasher.Hash);
_hash = GetHashFromHasher();

try
{
Expand Down Expand Up @@ -409,7 +438,7 @@ public async Task WriteAsync(
/// The returned task should complete as soon as another call to <see cref="DoWriteAsync"/>
/// or <see cref="DoCommitAsync"/> can be performed, even if the data is not yet fully persisted.
/// </remarks>
protected abstract Task DoWriteAsync(byte[] buffer, int offset, int count, CancellationToken cancel);
protected abstract Task DoWriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancel);

/// <summary> Commit all data written so far, with the provided hash. </summary>
/// <remarks> The returned task should complete as soon as the data is safely persisted. </remarks>
Expand Down
6 changes: 2 additions & 4 deletions test/Lokad.ContentAddr.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<AssemblyName>Lokad.ContentAddr.Tests</AssemblyName>
<RuntimeIdentifiers>win</RuntimeIdentifiers>
<OutputType>Library</OutputType>
Expand All @@ -16,15 +16,13 @@
<ProjectReference Include="..\src\Lokad.ContentAddr.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<Reference Include="System" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>
Expand Down

0 comments on commit 5c25463

Please sign in to comment.