Skip to content

Commit

Permalink
Get testing running - Congestion control for observe
Browse files Browse the repository at this point in the history
Put in the needed changes to have congestion control for the observe option

Get all of the test running again.
  • Loading branch information
jimsch committed Mar 28, 2020
1 parent 0ff2418 commit f31b54b
Show file tree
Hide file tree
Showing 16 changed files with 658 additions and 336 deletions.
3 changes: 2 additions & 1 deletion CoAP.NET/CoAP.Std10.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ It is intented primarily for research and verification work.

1.8
- Internal cleanup work.

- Add congestion control for observe notifications
- Restore DTLS reliability checking
1.7
- Remove the media types that had been previously marked as obsolete.
- Restore the incorrect removal of reliability for DTLS on the server side.
Expand Down
9 changes: 9 additions & 0 deletions CoAP.NET/CoapConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.ComponentModel;
using System.Globalization;
using System.IO;
// ReSharper disable InconsistentNaming

namespace Com.AugustCellars.CoAP
{
Expand Down Expand Up @@ -49,7 +50,9 @@ public static ICoapConfig Default
}

private const int Default_HttpPort = 8080;

private const double Default_AckTimeoutScale = 2D;

private const int Default_MaxMessageSize = 1024;
private const int Default_BlockwiseStatusLifetime = 10 * 60 * 1000; // ms
private const bool Default_UseRandomIdStart = true;
Expand Down Expand Up @@ -124,6 +127,12 @@ public int MaxRetransmit
set => SetValue("MaxRetransmit", value);
}

public int NonTimeout
{
get => GetInt("NonTimeout", CoapConstants.AckTimeout);
set => SetValue("NonTimeout", value);
}

/// <inheritdoc/>
public int MaxMessageSize
{
Expand Down
11 changes: 8 additions & 3 deletions CoAP.NET/ICoapConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ public interface ICoapConfig : System.ComponentModel.INotifyPropertyChanged
int HttpPort { get; }

/// <summary>
/// Input to computing the resend intervolt
/// Input to computing the resend interval
/// </summary>
int AckTimeout { get; }

/// <summary>
/// Input to computing the resend intervolt
/// Input to computing the resend interval
/// </summary>
double AckRandomFactor { get; }

/// <summary>
/// Input to cmputing the resend intervolt
/// Input to computing the resend interval
/// </summary>
double AckTimeoutScale { get; }

Expand All @@ -61,6 +61,11 @@ public interface ICoapConfig : System.ComponentModel.INotifyPropertyChanged
/// </summary>
int MaxRetransmit { get; }

/// <summary>
/// Input to compute timeout interval for NON messages
/// </summary>
int NonTimeout { get; }

/// <summary>
/// Size of message to start blocking at
/// </summary>
Expand Down
74 changes: 23 additions & 51 deletions CoAP.NET/Net/Matcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,19 @@ class Matcher : IMatcher, IDisposable
/// </summary>
readonly IDictionary<Exchange.KeyID, Exchange> _exchangesByID
= new ConcurrentDictionary<Exchange.KeyID, Exchange>();

/// <summary>
/// for outgoing
/// </summary>
readonly IDictionary<Exchange.KeyToken, Exchange> _exchangesByToken
= new ConcurrentDictionary<Exchange.KeyToken, Exchange>();

/// <summary>
/// for blockwise
/// </summary>
readonly ConcurrentDictionary<Exchange.KeyUri, Exchange> _ongoingExchanges
= new ConcurrentDictionary<Exchange.KeyUri, Exchange>();

private Int32 _running;
private Int32 _currentID;
private IDeduplicator _deduplicator;
Expand All @@ -56,6 +59,7 @@ public Matcher(ICoapConfig config)
if (config.UseRandomIDStart) {
_currentID = new Random().Next(1 << 16);
}

_tokenLength = config.TokenLength;

config.PropertyChanged += PropertyChanged;
Expand Down Expand Up @@ -133,6 +137,7 @@ public void SendRequest(Exchange exchange, Request request)
_random.NextBytes(token);
keyToken = new Exchange.KeyToken(token);
} while (_exchangesByToken.ContainsKey(keyToken));

request.Token = token;
}
else {
Expand All @@ -141,7 +146,7 @@ public void SendRequest(Exchange exchange, Request request)

exchange.Completed += OnExchangeCompleted;


_Log.Debug(m => m("Stored open request by {0}, {1}", keyID, keyToken));

_exchangesByID[keyID] = exchange;
Expand All @@ -164,40 +169,25 @@ public void SendResponse(Exchange exchange, Response response)
* exchange and the ReliabilityLayer resends this response.
*/

// If this is a CON notification we now can forget all previous NON notifications
if (response.Type == MessageType.CON || response.Type == MessageType.ACK)
{
ObserveRelation relation = exchange.Relation;
if (relation != null)
{
RemoveNotificatoinsOf(relation);
}
}

// Blockwise transfers are identified by URI and remote endpoint
if (response.HasOption(OptionType.Block2))
{
if (response.HasOption(OptionType.Block2)) {
Request request = exchange.CurrentRequest;

Exchange.KeyUri keyUri = new Exchange.KeyUri(request, response.Destination);

// Observe notifications only send the first block, hence do not store them as ongoing
if (exchange.ResponseBlockStatus != null && !response.HasOption(OptionType.Observe))
{
if (exchange.ResponseBlockStatus != null && !response.HasOption(OptionType.Observe)) {
// Remember ongoing blockwise GET requests
if (Utils.Put(_ongoingExchanges, keyUri, exchange) == null)
{
if (Utils.Put(_ongoingExchanges, keyUri, exchange) == null) {
if (_Log.IsDebugEnabled)
_Log.Debug("Ongoing Block2 started late, storing " + keyUri + " for " + request);
}
else
{
else {
if (_Log.IsDebugEnabled)
_Log.Debug("Ongoing Block2 continued, storing " + keyUri + " for " + request);
}
}
else
{
else {
if (_Log.IsDebugEnabled)
_Log.Debug("Ongoing Block2 completed, cleaning up " + keyUri + " for " + request);
Exchange exc;
Expand All @@ -207,24 +197,21 @@ public void SendResponse(Exchange exchange, Response response)

// Insert CON and NON to match ACKs and RSTs to the exchange
// Do not insert ACKs and RSTs.
if (response.Type == MessageType.CON || response.Type == MessageType.NON)
{
if (response.Type == MessageType.CON || response.Type == MessageType.NON) {
Exchange.KeyID keyID = new Exchange.KeyID(response.ID, null, response.Session);
_exchangesByID[keyID] = exchange;
}

// Only CONs and Observe keep the exchange active
if (response.Type != MessageType.CON && response.Last)
{
if (response.Type != MessageType.CON && response.Last) {
exchange.Complete = true;
}
}

/// <inheritdoc/>
public void SendEmptyMessage(Exchange exchange, EmptyMessage message)
{
if (message.Type == MessageType.RST && exchange != null)
{
if (message.Type == MessageType.RST && exchange != null) {
// We have rejected the request or response
exchange.Complete = true;
}
Expand Down Expand Up @@ -264,6 +251,7 @@ public Exchange ReceiveRequest(Request request)
if (_Log.IsInfoEnabled) {
_Log.Info("Duplicate request: " + request);
}

request.Duplicate = true;
return previous;
}
Expand All @@ -282,6 +270,7 @@ public Exchange ReceiveRequest(Request request)
if (_Log.IsInfoEnabled) {
_Log.Info("Duplicate ongoing request: " + request);
}

request.Duplicate = true;
}
else {
Expand All @@ -291,9 +280,11 @@ public Exchange ReceiveRequest(Request request)
if (_Log.IsDebugEnabled) {
_Log.Debug("Ongoing exchange got new request, cleaning up " + keyId);
}

_exchangesByID.Remove(keyId);
}
}

return ongoing;
}
else {
Expand All @@ -312,6 +303,7 @@ public Exchange ReceiveRequest(Request request)
if (_Log.IsDebugEnabled) {
_Log.Debug("New ongoing request, storing " + keyUri + " for " + request);
}

exchange.Completed += OnExchangeCompleted;
_ongoingExchanges[keyUri] = exchange;
return exchange;
Expand All @@ -320,6 +312,7 @@ public Exchange ReceiveRequest(Request request)
if (_Log.IsInfoEnabled) {
_Log.Info("Duplicate initial request: " + request);
}

request.Duplicate = true;
return previous;
}
Expand Down Expand Up @@ -374,7 +367,7 @@ public Exchange ReceiveResponse(Response response)

if (response.Type == MessageType.ACK && exchange.CurrentRequest.ID != response.ID) {
// The token matches but not the MID. This is a response for an older exchange
_Log.Warn( m => m($"Possible MID reuse before lifetime end: {response.TokenString} expected MID {exchange.CurrentRequest.ID} but received {response.ID}"));
_Log.Warn(m => m($"Possible MID reuse before lifetime end: {response.TokenString} expected MID {exchange.CurrentRequest.ID} but received {response.ID}"));
}

return exchange;
Expand Down Expand Up @@ -405,15 +398,13 @@ public Exchange ReceiveEmptyMessage(EmptyMessage message)
// local namespace
Exchange.KeyID keyID = new Exchange.KeyID(message.ID, null, null);
Exchange exchange;
if (_exchangesByID.TryGetValue(keyID, out exchange))
{
if (_exchangesByID.TryGetValue(keyID, out exchange)) {
if (_Log.IsDebugEnabled)
_Log.Debug("Exchange got reply: Cleaning up " + keyID);
_exchangesByID.Remove(keyID);
return exchange;
}
else
{
else {
if (_Log.IsInfoEnabled)
_Log.Info("Ignoring unmatchable empty message from " + message.Source + ": " + message);
return null;
Expand All @@ -428,19 +419,6 @@ public void Dispose()
d.Dispose();
}

private void RemoveNotificatoinsOf(ObserveRelation relation)
{
if (_Log.IsDebugEnabled)
_Log.Debug("Remove all remaining NON-notifications of observe relation");

foreach (Response previous in relation.ClearNotifications())
{
// notifications are local MID namespace
Exchange.KeyID keyId = new Exchange.KeyID(previous.ID, null, null);
_exchangesByID.Remove(keyId);
}
}

private void OnExchangeCompleted(Object sender, EventArgs e)
{
Exchange exchange = (Exchange) sender;
Expand Down Expand Up @@ -485,12 +463,6 @@ private void OnExchangeCompleted(Object sender, EventArgs e)
Exchange exc;
_ongoingExchanges.TryRemove(uriKey, out exc);
}

// Remove all remaining NON-notifications if this exchange is an observe relation
ObserveRelation relation = exchange.Relation;
if (relation != null) {
RemoveNotificatoinsOf(relation);
}
}
}
}
Expand Down
26 changes: 12 additions & 14 deletions CoAP.NET/Observe/ObserveNotificationOrderer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace Com.AugustCellars.CoAP.Observe
public class ObserveNotificationOrderer
{
private readonly ICoapConfig _config;
private Int32 _number;
private static int _number;
private int _lastSeen;

public ObserveNotificationOrderer()
: this(null)
Expand All @@ -36,9 +37,9 @@ public ObserveNotificationOrderer(ICoapConfig config)
/// Gets a new observe option number.
/// </summary>
/// <returns>a new observe option number</returns>
public Int32 GetNextObserveNumber()
public int GetNextObserveNumber()
{
Int32 next = Interlocked.Increment(ref _number);
int next = Interlocked.Increment(ref _number);
while (next >= 1 << 24) {
Interlocked.CompareExchange(ref _number, 0, next);
next = Interlocked.Increment(ref _number);
Expand All @@ -49,10 +50,7 @@ public Int32 GetNextObserveNumber()
/// <summary>
/// Gets the current notification number.
/// </summary>
public Int32 Current
{
get => _number;
}
public int Current => _number;

public DateTime Timestamp { get; set; }

Expand All @@ -61,9 +59,9 @@ public Int32 Current
/// </summary>
/// <param name="response"></param>
/// <returns></returns>
public Boolean IsNew(Response response)
public bool IsNew(Response response)
{
Int32? obs = response.Observe;
int? obs = response.Observe;
if (!obs.HasValue) {
// this is a final response, e.g., error or proactive cancellation
return true;
Expand All @@ -75,14 +73,14 @@ public Boolean IsNew(Response response)
// We use the notation from the observe draft-08.
DateTime t1 = Timestamp;
DateTime t2 = DateTime.Now;
Int32 v1 = Current;
Int32 v2 = obs.Value;
Int64 notifMaxAge = (_config ?? CoapConfig.Default).NotificationMaxAge;
int v1 = _lastSeen;
int v2 = obs.Value;
long notifyMaxAge = (_config ?? CoapConfig.Default).NotificationMaxAge;
if ((v1 < v2) && (v2 - v1 < 1 << 23)
|| (v1 > v2) && (v1 - v2 > 1 << 23)
|| (t2 > t1.AddMilliseconds(notifMaxAge))) {
|| (t2 > t1.AddMilliseconds(notifyMaxAge))) {
Timestamp = t2;
_number = v2;
_lastSeen = v2;
return true;
}
else {
Expand Down
Loading

0 comments on commit f31b54b

Please sign in to comment.