Skip to content

Commit

Permalink
A little more file tweaking
Browse files Browse the repository at this point in the history
  • Loading branch information
daveaglick committed Apr 11, 2021
1 parent 81f5cff commit 2f97ee4
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/core/Statiq.Core/IO/Local/LocalFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Statiq.Core
// Initially based on code from Cake (http://cakebuild.net/)
internal class LocalFile : IFile
{
private const int BufferSize = 16384;
private const int BufferSize = 8192;

private readonly LocalFileProvider _fileProvider;
private readonly System.IO.FileInfo _file;
Expand Down
2 changes: 1 addition & 1 deletion src/core/Statiq.Core/Modules/IO/CopyFiles.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected override async Task<IEnumerable<IDocument>> ExecuteConfigAsync(IDocume
if (value is object)
{
// Use a semaphore to limit the write operations so we don't try to do a bunch of writes at once
SemaphoreSlim semaphore = new SemaphoreSlim(10, 10);
SemaphoreSlim semaphore = new SemaphoreSlim(20, 20);

// Create copy tasks
IEnumerable<IFile> inputFiles = context.FileSystem.GetInputFiles(value);
Expand Down
90 changes: 47 additions & 43 deletions src/core/Statiq.Core/Modules/IO/WriteFiles.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ public WriteFiles Where(Config<bool> predicate)
protected override async Task<IEnumerable<IDocument>> ExecuteContextAsync(IExecutionContext context)
{
// Use a semaphore to limit the write operations so we don't try to do a bunch of writes at once
SemaphoreSlim semaphore = new SemaphoreSlim(10, 10);
SemaphoreSlim semaphore = new SemaphoreSlim(20, 20);

// Get the output file path for each file in sequence and set up action chains
// Value = input source string(s) (for reporting a warning if not appending), write action
Dictionary<NormalizedPath, Tuple<List<string>, Func<Task>>> writesBySource = new Dictionary<NormalizedPath, Tuple<List<string>, Func<Task>>>();
foreach (IDocument input in context.Inputs)
{
await WriteFileAsync(input);
await AddWriteFileTaskAsync(input, context, writesBySource, semaphore);
}

// Display a warning for any duplicated outputs if not appending
Expand All @@ -91,57 +91,61 @@ protected override async Task<IEnumerable<IDocument>> ExecuteContextAsync(IExecu
}
}

// Run the write actions in parallel
// Run the write actions
await Task.WhenAll(writesBySource.Values.Select(x => x.Item2()));

// Return the input documents
return context.Inputs;
}

async Task WriteFileAsync(IDocument input)
private async Task AddWriteFileTaskAsync(
IDocument input,
IExecutionContext context,
Dictionary<NormalizedPath, Tuple<List<string>, Func<Task>>> writesBySource,
SemaphoreSlim semaphore)
{
if (await ShouldProcessAsync(input, context) && !input.Destination.IsNull)
{
if (await ShouldProcessAsync(input, context) && !input.Destination.IsNull)
if (writesBySource.TryGetValue(input.Destination, out Tuple<List<string>, Func<Task>> value))
{
if (writesBySource.TryGetValue(input.Destination, out Tuple<List<string>, Func<Task>> value))
{
// This output source was already seen so nest the previous write action in a new one
value.Item1.Add(input.Source.ToSafeDisplayString());
Func<Task> previousWrite = value.Item2;
value = new Tuple<List<string>, Func<Task>>(
value.Item1,
async () =>
// This output source was already seen so nest the previous write action in a new one
value.Item1.Add(input.Source.ToSafeDisplayString());
Func<Task> previousWrite = value.Item2;
value = new Tuple<List<string>, Func<Task>>(
value.Item1,
async () =>
{
await semaphore.WaitAsync();
try
{
await semaphore.WaitAsync();
try
{
// Complete the previous write, then do the next one
await previousWrite();
await WriteAsync(input, context, input.Destination);
}
finally
{
semaphore.Release();
}
});
}
else
{
value = new Tuple<List<string>, Func<Task>>(
new List<string> { input.Source.ToSafeDisplayString() },
async () =>
// Complete the previous write, then do the next one
await previousWrite();
await WriteAsync(input, context, input.Destination);
}
finally
{
await semaphore.WaitAsync();
try
{
await WriteAsync(input, context, input.Destination);
}
finally
{
semaphore.Release();
}
});
}
writesBySource[input.Destination] = value;
semaphore.Release();
}
});
}
else
{
value = new Tuple<List<string>, Func<Task>>(
new List<string> { input.Source.ToSafeDisplayString() },
async () =>
{
await semaphore.WaitAsync();
try
{
await WriteAsync(input, context, input.Destination);
}
finally
{
semaphore.Release();
}
});
}
writesBySource[input.Destination] = value;
}
}

Expand Down

0 comments on commit 2f97ee4

Please sign in to comment.