Skip to content

Commit

Permalink
Merge pull request #95 from richardschneider/configure-await
Browse files Browse the repository at this point in the history
avoid SynchronizationContext
  • Loading branch information
richardschneider authored May 3, 2019
2 parents 9da564d + 8427aab commit 5bae954
Show file tree
Hide file tree
Showing 27 changed files with 271 additions and 215 deletions.
6 changes: 3 additions & 3 deletions src/BlockExchange/Bitswap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
{
// There is a race condition between getting the remote identity and
// the remote sending the first wantlist.
await connection.IdentityEstablished.Task;
await connection.IdentityEstablished.Task.ConfigureAwait(false);

// Fire and forget.
var _ = SendWantListAsync(connection.RemotePeer);
Expand Down Expand Up @@ -334,9 +334,9 @@ async Task SendWantListAsync(Peer peer, IEnumerable<WantedBlock> wants, bool ful
{
try
{
using (var stream = await Swarm.DialAsync(peer, protocol.ToString()))
using (var stream = await Swarm.DialAsync(peer, protocol.ToString()).ConfigureAwait(false))
{
await protocol.SendWantsAsync(stream, wants, full: full);
await protocol.SendWantsAsync(stream, wants, full: full).ConfigureAwait(false);
}
return;
}
Expand Down
20 changes: 10 additions & 10 deletions src/BlockExchange/Bitswap1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ public override string ToString()
/// <inheritdoc />
public async Task ProcessMessageAsync(PeerConnection connection, Stream stream, CancellationToken cancel = default(CancellationToken))
{
var request = await ProtoBufHelper.ReadMessageAsync<Message>(stream, cancel);
var request = await ProtoBufHelper.ReadMessageAsync<Message>(stream, cancel).ConfigureAwait(false);

// There is a race condition between getting the remote identity and
// the remote sending the first wantlist.
await connection.IdentityEstablished.Task;
await connection.IdentityEstablished.Task.ConfigureAwait(false);

log.Debug($"got message from {connection.RemotePeer}");

Expand Down Expand Up @@ -82,7 +82,7 @@ public override string ToString()
{
++Bitswap.BlocksReceived;
Bitswap.DataReceived += (ulong)sentBlock.Length;
await Bitswap.BlockService.PutAsync(sentBlock, pin: false);
await Bitswap.BlockService.PutAsync(sentBlock, pin: false).ConfigureAwait(false);
// TODO: Detect if duplicate and update stats
}
}
Expand All @@ -94,19 +94,19 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
try
{
IDataBlock block;
if (null != await Bitswap.BlockService.StatAsync(cid, cancel))
if (null != await Bitswap.BlockService.StatAsync(cid, cancel).ConfigureAwait(false))
{
block = await Bitswap.BlockService.GetAsync(cid, cancel);
block = await Bitswap.BlockService.GetAsync(cid, cancel).ConfigureAwait(false);
}
else
{
block = await Bitswap.Want(cid, remotePeer.Id, cancel);
block = await Bitswap.Want(cid, remotePeer.Id, cancel).ConfigureAwait(false);
}

// Send block to remote.
using (var stream = await Bitswap.Swarm.DialAsync(remotePeer, this.ToString()))
using (var stream = await Bitswap.Swarm.DialAsync(remotePeer, this.ToString()).ConfigureAwait(false))
{
await SendAsync(stream, block, cancel);
await SendAsync(stream, block, cancel).ConfigureAwait(false);
}

}
Expand Down Expand Up @@ -142,7 +142,7 @@ public async Task SendWantsAsync(
};

ProtoBuf.Serializer.SerializeWithLengthPrefix<Message>(stream, message, PrefixStyle.Base128);
await stream.FlushAsync(cancel);
await stream.FlushAsync(cancel).ConfigureAwait(false);
}

internal async Task SendAsync(
Expand All @@ -164,7 +164,7 @@ internal async Task SendAsync(
};

ProtoBuf.Serializer.SerializeWithLengthPrefix<Message>(stream, message, PrefixStyle.Base128);
await stream.FlushAsync(cancel);
await stream.FlushAsync(cancel).ConfigureAwait(false);
}

[ProtoContract]
Expand Down
20 changes: 10 additions & 10 deletions src/BlockExchange/Bitswap11.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public override string ToString()
{
// There is a race condition between getting the remote identity and
// the remote sending the first wantlist.
await connection.IdentityEstablished.Task;
await connection.IdentityEstablished.Task.ConfigureAwait(false);

while (true)
{
var request = await ProtoBufHelper.ReadMessageAsync<Message>(stream, cancel);
var request = await ProtoBufHelper.ReadMessageAsync<Message>(stream, cancel).ConfigureAwait(false);

// Process want list
if (request.wantlist != null && request.wantlist.entries != null)
Expand Down Expand Up @@ -89,7 +89,7 @@ await Bitswap.BlockService.PutAsync(
data: sentBlock.data,
contentType: contentType,
multiHash: multiHash,
pin: false);
pin: false).ConfigureAwait(false);
// TODO: Detect if duplicate and update stats
}
}
Expand All @@ -103,19 +103,19 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
try
{
IDataBlock block;
if (null != await Bitswap.BlockService.StatAsync(cid, cancel))
if (null != await Bitswap.BlockService.StatAsync(cid, cancel).ConfigureAwait(false))
{
block = await Bitswap.BlockService.GetAsync(cid, cancel);
block = await Bitswap.BlockService.GetAsync(cid, cancel).ConfigureAwait(false);
}
else
{
block = await Bitswap.Want(cid, remotePeer.Id, cancel);
block = await Bitswap.Want(cid, remotePeer.Id, cancel).ConfigureAwait(false);
}

// Send block to remote.
using (var stream = await Bitswap.Swarm.DialAsync(remotePeer, this.ToString()))
using (var stream = await Bitswap.Swarm.DialAsync(remotePeer, this.ToString()).ConfigureAwait(false))
{
await SendAsync(stream, block, cancel);
await SendAsync(stream, block, cancel).ConfigureAwait(false);
}

}
Expand Down Expand Up @@ -156,7 +156,7 @@ public async Task SendWantsAsync(
};

ProtoBuf.Serializer.SerializeWithLengthPrefix<Message>(stream, message, PrefixStyle.Base128);
await stream.FlushAsync(cancel);
await stream.FlushAsync(cancel).ConfigureAwait(false);
}

internal async Task SendAsync(
Expand All @@ -181,7 +181,7 @@ internal async Task SendAsync(
};

ProtoBuf.Serializer.SerializeWithLengthPrefix<Message>(stream, message, PrefixStyle.Base128);
await stream.FlushAsync(cancel);
await stream.FlushAsync(cancel).ConfigureAwait(false);
}

/// <summary>
Expand Down
13 changes: 7 additions & 6 deletions src/CoreApi/BitswapApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@ public BitswapApi(IpfsEngine ipfs)

public async Task<IDataBlock> GetAsync(Cid id, CancellationToken cancel = default(CancellationToken))
{
var bs = await ipfs.BitswapService;
var peer = await ipfs.LocalPeer;
return await bs.Want(id, peer.Id, cancel);
var bs = await ipfs.BitswapService.ConfigureAwait(false);
var peer = await ipfs.LocalPeer.ConfigureAwait(false);
return await bs.Want(id, peer.Id, cancel).ConfigureAwait(false);
}

public async Task UnwantAsync(Cid id, CancellationToken cancel = default(CancellationToken))
{
(await ipfs.BitswapService).Unwant(id);
(await ipfs.BitswapService.ConfigureAwait(false)).Unwant(id);
}

public async Task<IEnumerable<Cid>> WantsAsync(MultiHash peer = null, CancellationToken cancel = default(CancellationToken))
{
if (peer == null)
{
peer = (await ipfs.LocalPeer).Id;
peer = (await ipfs.LocalPeer.ConfigureAwait(false)).Id;
}
return (await ipfs.BitswapService).PeerWants(peer);
var bs = await ipfs.BitswapService.ConfigureAwait(false);
return bs.PeerWants(peer);
}
}
}
38 changes: 19 additions & 19 deletions src/CoreApi/BlockApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ FileStore<Cid, DataBlock> Store
KeyToName = (key) => new MultiHash(key.FromBase32()),
Serialize = async (stream, cid, block, cancel) =>
{
await stream.WriteAsync(block.DataBytes, 0, block.DataBytes.Length, cancel);
await stream.WriteAsync(block.DataBytes, 0, block.DataBytes.Length, cancel).ConfigureAwait(false);
},
Deserialize = async (stream, cid, cancel) =>
{
Expand All @@ -78,7 +78,7 @@ FileStore<Cid, DataBlock> Store
block.DataBytes = new byte[block.Size];
for (int i = 0, n; i < block.Size; i += n)
{
n = await stream.ReadAsync(block.DataBytes, i, (int)block.Size - i, cancel);
n = await stream.ReadAsync(block.DataBytes, i, (int)block.Size - i, cancel).ConfigureAwait(false);
}
return block;
}
Expand Down Expand Up @@ -109,7 +109,7 @@ FileStore<Cid, DataBlock> Store
}

// Check the local filesystem for the block.
var block = await Store.TryGetAsync(id, cancel);
var block = await Store.TryGetAsync(id, cancel).ConfigureAwait(false);
if (block != null)
{
return block;
Expand All @@ -121,13 +121,13 @@ FileStore<Cid, DataBlock> Store
// then send the block to us via bitswap and the get task will finish.
using (var queryCancel = CancellationTokenSource.CreateLinkedTokenSource(cancel))
{
var bitswapGet = ipfs.Bitswap.GetAsync(id, queryCancel.Token);
var bitswapGet = ipfs.Bitswap.GetAsync(id, queryCancel.Token).ConfigureAwait(false);
var dht = await ipfs.DhtService;
var _ = dht.FindProvidersAsync(
id: id,
limit: 20, // TODO: remove this
cancel: queryCancel.Token,
action: (peer) => { var __ = ProviderFound(peer, queryCancel.Token); }
action: (peer) => { var __ = ProviderFound(peer, queryCancel.Token).ConfigureAwait(false); }
);

var got = await bitswapGet;
Expand All @@ -144,10 +144,10 @@ async Task ProviderFound(Peer peer, CancellationToken cancel)
return;

log.Debug($"Connecting to provider {peer.Id}");
var swarm = await ipfs.SwarmService;
var swarm = await ipfs.SwarmService.ConfigureAwait(false);
try
{
await swarm.ConnectAsync(peer, cancel);
await swarm.ConnectAsync(peer, cancel).ConfigureAwait(false);
}
catch (Exception e)
{
Expand Down Expand Up @@ -190,27 +190,27 @@ public async Task<Cid> PutAsync(
Id = cid,
Size = data.Length
};
if (await Store.ExistsAsync(cid))
if (await Store.ExistsAsync(cid).ConfigureAwait(false))
{
log.DebugFormat("Block '{0}' already present", cid);
}
else
{
await Store.PutAsync(cid, block, cancel);
await Store.PutAsync(cid, block, cancel).ConfigureAwait(false);
log.DebugFormat("Added block '{0}'", cid);
}

// Inform the Bitswap service.
(await ipfs.BitswapService).Found(block);
(await ipfs.BitswapService.ConfigureAwait(false)).Found(block);

// To pin or not.
if (pin)
{
await ipfs.Pin.AddAsync(cid, recursive: false, cancel: cancel);
await ipfs.Pin.AddAsync(cid, recursive: false, cancel: cancel).ConfigureAwait(false);
}
else
{
await ipfs.Pin.RemoveAsync(cid, recursive: false, cancel: cancel);
await ipfs.Pin.RemoveAsync(cid, recursive: false, cancel: cancel).ConfigureAwait(false);
}

return cid;
Expand All @@ -226,8 +226,8 @@ public async Task<Cid> PutAsync(
{
using (var ms = new MemoryStream())
{
await data.CopyToAsync(ms);
return await PutAsync(ms.ToArray(), contentType, multiHash, encoding, pin, cancel);
await data.CopyToAsync(ms).ConfigureAwait(false);
return await PutAsync(ms.ToArray(), contentType, multiHash, encoding, pin, cancel).ConfigureAwait(false);
}
}

Expand All @@ -237,10 +237,10 @@ public async Task<Cid> PutAsync(
{
return id;
}
if (await Store.ExistsAsync(id, cancel))
if (await Store.ExistsAsync(id, cancel).ConfigureAwait(false))
{
await Store.RemoveAsync(id, cancel);
await ipfs.Pin.RemoveAsync(id, recursive: false, cancel: cancel);
await Store.RemoveAsync(id, cancel).ConfigureAwait(false);
await ipfs.Pin.RemoveAsync(id, recursive: false, cancel: cancel).ConfigureAwait(false);
return id;
}
if (ignoreNonexistent) return null;
Expand All @@ -251,11 +251,11 @@ public async Task<Cid> PutAsync(
{
if (id.Hash.IsIdentityHash)
{
return await GetAsync(id, cancel);
return await GetAsync(id, cancel).ConfigureAwait(false);
}

IDataBlock block = null;
var length = await Store.LengthAsync(id, cancel);
var length = await Store.LengthAsync(id, cancel).ConfigureAwait(false);
if (length.HasValue)
{
block = new DataBlock
Expand Down
14 changes: 7 additions & 7 deletions src/CoreApi/BootstrapApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,21 @@ public BootstrapApi(IpfsEngine ipfs)
// Throw if missing peer ID
var _ = address.PeerId;

var addrs = (await ListAsync(cancel)).ToList();
var addrs = (await ListAsync(cancel).ConfigureAwait(false)).ToList();
if (addrs.Any(a => a == address))
return address;

addrs.Add(address);
var strings = addrs.Select(a => a.ToString());
await ipfs.Config.SetAsync("Bootstrap", JToken.FromObject(strings), cancel);
await ipfs.Config.SetAsync("Bootstrap", JToken.FromObject(strings), cancel).ConfigureAwait(false);
return address;
}

public async Task<IEnumerable<MultiAddress>> AddDefaultsAsync(CancellationToken cancel = default(CancellationToken))
{
foreach (var a in defaults)
{
await AddAsync(a, cancel);
await AddAsync(a, cancel).ConfigureAwait(false);
}

return defaults;
Expand All @@ -77,25 +77,25 @@ public BootstrapApi(IpfsEngine ipfs)
catch (KeyNotFoundException)
{
var strings = defaults.Select(a => a.ToString());
await ipfs.Config.SetAsync("Bootstrap", JToken.FromObject(strings), cancel);
await ipfs.Config.SetAsync("Bootstrap", JToken.FromObject(strings), cancel).ConfigureAwait(false);
return defaults;
}
}

public async Task RemoveAllAsync(CancellationToken cancel = default(CancellationToken))
{
await ipfs.Config.SetAsync("Bootstrap", JToken.FromObject(new string[0]), cancel);
await ipfs.Config.SetAsync("Bootstrap", JToken.FromObject(new string[0]), cancel).ConfigureAwait(false);
}

public async Task<MultiAddress> RemoveAsync(MultiAddress address, CancellationToken cancel = default(CancellationToken))
{
var addrs = (await ListAsync(cancel)).ToList();
var addrs = (await ListAsync(cancel).ConfigureAwait(false)).ToList();
if (!addrs.Any(a => a == address))
return address;

addrs.Remove(address);
var strings = addrs.Select(a => a.ToString());
await ipfs.Config.SetAsync("Bootstrap", JToken.FromObject(strings), cancel);
await ipfs.Config.SetAsync("Bootstrap", JToken.FromObject(strings), cancel).ConfigureAwait(false);
return address;
}
}
Expand Down
Loading

0 comments on commit 5bae954

Please sign in to comment.