Skip to content

Commit

Permalink
Merge pull request #94 from richardschneider/random-failures
Browse files Browse the repository at this point in the history
Random failures
  • Loading branch information
richardschneider authored May 2, 2019
2 parents 67bddd9 + 4917c09 commit 9da564d
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 50 deletions.
6 changes: 5 additions & 1 deletion src/CoreApi/SwarmApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
using System.Linq;
using System.Collections.Concurrent;
using PeerTalk;
using Common.Logging;

namespace Ipfs.Engine.CoreApi
{
class SwarmApi : ISwarmApi
{
static ILog log = LogManager.GetLogger(typeof(SwarmApi));
IpfsEngine ipfs;

static MultiAddress[] defaultFilters = new MultiAddress[]
Expand Down Expand Up @@ -48,7 +50,9 @@ public SwarmApi(IpfsEngine ipfs)
public async Task ConnectAsync(MultiAddress address, CancellationToken cancel = default(CancellationToken))
{
var swarm = await ipfs.SwarmService;
await swarm.ConnectAsync(address, cancel);
log.Debug($"Connecting to {address}");
var peer = await swarm.ConnectAsync(address, cancel);
log.Debug($"Connected to {peer.ConnectedAddress}");
}

public async Task DisconnectAsync(MultiAddress address, CancellationToken cancel = default(CancellationToken))
Expand Down
106 changes: 65 additions & 41 deletions src/IpfsEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ void Init()
LocalPeer = new AsyncLazy<Peer>(async () =>
{
log.Debug("Building local peer");
var keyChain = await KeyChain();
var keyChain = await KeyChain().ConfigureAwait(false);
log.Debug("Getting key info about self");
var self = await keyChain.FindKeyByNameAsync("self");
var self = await keyChain.FindKeyByNameAsync("self").ConfigureAwait(false);
var localPeer = new Peer
{
Id = self.Id,
PublicKey = await keyChain.GetPublicKeyAsync("self"),
PublicKey = await keyChain.GetPublicKeyAsync("self").ConfigureAwait(false),
ProtocolVersion = "ipfs/0.1.0"
};
var version = typeof(IpfsEngine).GetTypeInfo().Assembly.GetName().Version;
Expand All @@ -149,9 +149,9 @@ void Init()
}
}
}
var peer = await LocalPeer;
var keyChain = await KeyChain();
var self = await keyChain.GetPrivateKeyAsync("self");
var peer = await LocalPeer.ConfigureAwait(false);
var keyChain = await KeyChain().ConfigureAwait(false);
var self = await keyChain.GetPrivateKeyAsync("self").ConfigureAwait(false);
var swarm = new Swarm
{
LocalPeer = peer,
Expand All @@ -160,6 +160,8 @@ void Init()
? null
: new Psk1Protector { Key = Options.Swarm.PrivateNetworkKey }
};
if (Options.Swarm.PrivateNetworkKey != null)
log.Debug($"Private network {Options.Swarm.PrivateNetworkKey.Fingerprint().ToHexString()}");

log.Debug("Built swarm service");
return swarm;
Expand All @@ -169,7 +171,7 @@ void Init()
log.Debug("Building bitswap service");
var bitswap = new BlockExchange.Bitswap
{
Swarm = await SwarmService,
Swarm = await SwarmService.ConfigureAwait(false),
BlockService = Block
};
log.Debug("Built bitswap service");
Expand All @@ -180,7 +182,7 @@ void Init()
log.Debug("Building DHT service");
var dht = new PeerTalk.Routing.Dht1
{
Swarm = await SwarmService
Swarm = await SwarmService.ConfigureAwait(false)
};
dht.Swarm.Router = dht;
log.Debug("Built DHT service");
Expand Down Expand Up @@ -266,13 +268,13 @@ void Init()
}
}

await keyChain.SetPassphraseAsync(passphrase, cancel);
await keyChain.SetPassphraseAsync(passphrase, cancel).ConfigureAwait(false);

// Maybe create "self" key, this is the local peer's id.
var self = await keyChain.FindKeyByNameAsync("self", cancel);
var self = await keyChain.FindKeyByNameAsync("self", cancel).ConfigureAwait(false);
if (self == null)
{
self = await keyChain.CreateAsync("self", null, 0, cancel);
self = await keyChain.CreateAsync("self", null, 0, cancel).ConfigureAwait(false);
}
}
return keyChain;
Expand Down Expand Up @@ -304,7 +306,7 @@ void Init()
/// </exception>
public async Task<Cid> ResolveIpfsPathToCidAsync (string path, CancellationToken cancel = default(CancellationToken))
{
var r = await Generic.ResolveAsync(path, true, cancel);
var r = await Generic.ResolveAsync(path, true, cancel).ConfigureAwait(false);
return Cid.Decode(r.Remove(0, 6)); // strip '/ipfs/'.
}

Expand All @@ -328,45 +330,45 @@ public async Task StartAsync()
throw new Exception("IPFS engine is already started.");
}

var localPeer = await LocalPeer;
var localPeer = await LocalPeer.ConfigureAwait(false);
log.Debug("starting " + localPeer.Id);

// Everybody needs the swarm.
var swarm = await SwarmService;
var swarm = await SwarmService.ConfigureAwait(false);
stopTasks.Add(async () =>
{
await swarm.StopAsync();
await swarm.StopAsync().ConfigureAwait(false);
});
await swarm.StartAsync();
await swarm.StartAsync().ConfigureAwait(false);

// Start the primary services.
var tasks = new List<Func<Task>>
{
async () =>
{
var bitswap = await BitswapService;
stopTasks.Add(async () => await bitswap.StopAsync());
await bitswap.StartAsync();
var bitswap = await BitswapService.ConfigureAwait(false);
stopTasks.Add(async () => await bitswap.StopAsync().ConfigureAwait(false));
await bitswap.StartAsync().ConfigureAwait(false);
},
async () =>
{
var dht = await DhtService;
stopTasks.Add(async () => await dht.StopAsync());
await dht.StartAsync();
var dht = await DhtService.ConfigureAwait(false);
stopTasks.Add(async () => await dht.StopAsync().ConfigureAwait(false));
await dht.StartAsync().ConfigureAwait(false);
},
};

log.Debug("waiting for services to start");
await Task.WhenAll(tasks.Select(t => t()));
await Task.WhenAll(tasks.Select(t => t())).ConfigureAwait(false);

// Starting listening to the swarm.
var json = await Config.GetAsync("Addresses.Swarm");
var json = await Config.GetAsync("Addresses.Swarm").ConfigureAwait(false);
var numberListeners = 0;
foreach (string a in json)
{
try
{
await swarm.StartListeningAsync(a);
await swarm.StartListeningAsync(a).ConfigureAwait(false);
++numberListeners;
}
catch (Exception e)
Expand Down Expand Up @@ -408,8 +410,8 @@ public async Task StartAsync()
Addresses = await this.Bootstrap.ListAsync()
};
bootstrap.PeerDiscovered += OnPeerDiscovered;
stopTasks.Add(async () => await bootstrap.StopAsync());
await bootstrap.StartAsync();
stopTasks.Add(async () => await bootstrap.StopAsync().ConfigureAwait(false));
await bootstrap.StartAsync().ConfigureAwait(false);
},
async () =>
{
Expand All @@ -421,8 +423,8 @@ public async Task StartAsync()
MulticastService = multicast
};
mdns.PeerDiscovered += OnPeerDiscovered;
stopTasks.Add(async () => await mdns.StopAsync());
await mdns.StartAsync();
stopTasks.Add(async () => await mdns.StopAsync().ConfigureAwait(false));
await mdns.StartAsync().ConfigureAwait(false);
},
async () =>
{
Expand All @@ -434,8 +436,8 @@ public async Task StartAsync()
MulticastService = multicast
};
mdns.PeerDiscovered += OnPeerDiscovered;
stopTasks.Add(async () => await mdns.StopAsync());
await mdns.StartAsync();
stopTasks.Add(async () => await mdns.StopAsync().ConfigureAwait(false));
await mdns.StartAsync().ConfigureAwait(false);
},
async () =>
{
Expand All @@ -447,12 +449,12 @@ public async Task StartAsync()
MulticastService = multicast
};
mdns.PeerDiscovered += OnPeerDiscovered;
stopTasks.Add(async () => await mdns.StopAsync());
await mdns.StartAsync();
stopTasks.Add(async () => await mdns.StopAsync().ConfigureAwait(false));
await mdns.StartAsync().ConfigureAwait(false);
},
};
log.Debug("waiting for discovery services to start");
await Task.WhenAll(tasks.Select(t => t()));
await Task.WhenAll(tasks.Select(t => t())).ConfigureAwait(false);

multicast?.Start();

Expand All @@ -475,7 +477,7 @@ public async Task StopAsync()
{
var tasks = stopTasks.ToArray();
stopTasks = new ConcurrentBag<Func<Task>>();
await Task.WhenAll(tasks.Select(t => t()));
await Task.WhenAll(tasks.Select(t => t())).ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -485,11 +487,33 @@ public async Task StopAsync()
// Many services use cancellation to stop. A cancellation may not run
// immediately, so we need to give them some.
// TODO: Would be nice to make this deterministic.
await Task.Delay(TimeSpan.FromMilliseconds(100));
await Task.Delay(TimeSpan.FromMilliseconds(100)).ConfigureAwait(false);

log.Debug("stopped");
}

/// <summary>
/// A synchronous start.
/// </summary>
/// <remarks>
/// Calls <see cref="StartAsync"/> and waits for it to complete.
/// </remarks>
public void Start()
{
StartAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}

/// <summary>
/// A synchronous stop.
/// </summary>
/// <remarks>
/// Calls <see cref="StopAsync"/> and waits for it to complete.
/// </remarks>
public void Stop()
{
StopAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}

/// <summary>
/// Manages communication with other peers.
/// </summary>
Expand Down Expand Up @@ -517,8 +541,8 @@ async void OnPeerDiscovered(object sender, PeerDiscoveredEventArgs e)
{
try
{
var swarm = await SwarmService;
var peer = await swarm.RegisterPeerAsync(e.Address);
var swarm = await SwarmService.ConfigureAwait(false);
var peer = await swarm.RegisterPeerAsync(e.Address).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand All @@ -541,13 +565,13 @@ protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
disposedValue = true;

if (disposing)
{
passphrase?.Dispose();
StopAsync().Wait();
Stop();
}

disposedValue = true;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/IpfsEngine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<PackageReference Include="Makaretu.Dns.Unicast" Version="0.8.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
<PackageReference Include="Nito.AsyncEx.Coordination" Version="5.0.0" />
<PackageReference Include="PeerTalk" Version="0.8.0" />
<PackageReference Include="PeerTalk" Version="0.9.0" />
<PackageReference Include="PeterO.Cbor" Version="3.1.0" />
<PackageReference Include="Portable.BouncyCastle" Version="1.8.5" />
<PackageReference Include="protobuf-net" Version="2.4.0" />
Expand Down
6 changes: 3 additions & 3 deletions src/SwarmOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ public class SwarmOptions
/// The low water mark for peer connections.
/// </summary>
/// <value>
/// Defaults to 16.
/// Defaults to 0.
/// </value>
/// <remarks>
/// The <see cref="PeerTalk.AutoDialer"/> is used to maintain at
/// least this number of connections.
/// <para>
/// Setting this to zero will disable the auto dial feature.
/// This is an opt-feature. The value must be positive to enable it.
/// </para>
/// </remarks>
public int MinConnections { get; set; } = 16;
public int MinConnections { get; set; } = 0;

}
}
6 changes: 4 additions & 2 deletions test/CoreApi/FileSystemApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ public async Task AddEmpty_Check_Object()
public async Task AddDuplicateWithPin()
{
var ipfs = TestFixture.Ipfs;
var options = new AddFileOptions();
options.Pin = true;
var options = new AddFileOptions
{
Pin = true
};
var node = await ipfs.FileSystem.AddTextAsync("hello world", options);
Assert.AreEqual("Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", (string)node.Id);
var pins = await ipfs.Pin.ListAsync();
Expand Down
2 changes: 1 addition & 1 deletion test/CoreApi/SwarmApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Ipfs.Engine
public class SwarmApiTest
{
IpfsEngine ipfs = TestFixture.Ipfs;
MultiAddress somewhere = "/ip4/127.0.0.1";
readonly MultiAddress somewhere = "/ip4/127.0.0.1";

[TestMethod]
public async Task Filter_Add_Remove()
Expand Down
2 changes: 1 addition & 1 deletion test/SwarmOptionsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void Defaults()
{
var options = new SwarmOptions();
Assert.IsNull(options.PrivateNetworkKey);
Assert.AreNotEqual(0, options.MinConnections);
Assert.AreEqual(0, options.MinConnections);
}

}
Expand Down

0 comments on commit 9da564d

Please sign in to comment.