From f31b54b58ab2b516ea1cf41ab323aa49dc362c9d Mon Sep 17 00:00:00 2001 From: Jim Schaad Date: Sat, 28 Mar 2020 15:55:10 -0700 Subject: [PATCH] Get testing running - Congestion control for observe Put in the needed changes to have congestion control for the observe option Get all of the test running again. --- CoAP.NET/CoAP.Std10.csproj | 3 +- CoAP.NET/CoapConfig.cs | 9 + CoAP.NET/ICoapConfig.cs | 11 +- CoAP.NET/Net/Matcher.cs | 74 +-- .../Observe/ObserveNotificationOrderer.cs | 26 +- CoAP.NET/Observe/ObserveRelation.cs | 84 +--- CoAP.NET/Observe/ObservingEndpoint.cs | 11 +- CoAP.NET/Stack/AbstractLayer.cs | 4 +- CoAP.NET/Stack/ILayer.cs | 6 +- CoAP.NET/Stack/LayerStack.cs | 7 +- CoAP.NET/Stack/ObserveLayer.cs | 204 ++++----- CoAP.NET/Stack/ReliabilityLayer.cs | 108 ++--- CoAP.Std10.sln.DotSettings | 3 + CoAP.Test/CoapClientTest.cs | 6 +- CoAP.Test/Observe/ObserveRelation.cs | 12 + CoAP.Test/Observe/ObserveTests.cs | 426 ++++++++++++++++++ 16 files changed, 658 insertions(+), 336 deletions(-) create mode 100644 CoAP.Std10.sln.DotSettings create mode 100644 CoAP.Test/Observe/ObserveRelation.cs create mode 100644 CoAP.Test/Observe/ObserveTests.cs diff --git a/CoAP.NET/CoAP.Std10.csproj b/CoAP.NET/CoAP.Std10.csproj index 60992a7..9f7cf7c 100644 --- a/CoAP.NET/CoAP.Std10.csproj +++ b/CoAP.NET/CoAP.Std10.csproj @@ -25,7 +25,8 @@ It is intented primarily for research and verification work. 1.8 - Internal cleanup work. - + - Add congestion control for observe notifications + - Restore DTLS reliability checking 1.7 - Remove the media types that had been previously marked as obsolete. - Restore the incorrect removal of reliability for DTLS on the server side. diff --git a/CoAP.NET/CoapConfig.cs b/CoAP.NET/CoapConfig.cs index e7f31d6..ad31570 100644 --- a/CoAP.NET/CoapConfig.cs +++ b/CoAP.NET/CoapConfig.cs @@ -16,6 +16,7 @@ using System.ComponentModel; using System.Globalization; using System.IO; +// ReSharper disable InconsistentNaming namespace Com.AugustCellars.CoAP { @@ -49,7 +50,9 @@ public static ICoapConfig Default } private const int Default_HttpPort = 8080; + private const double Default_AckTimeoutScale = 2D; + private const int Default_MaxMessageSize = 1024; private const int Default_BlockwiseStatusLifetime = 10 * 60 * 1000; // ms private const bool Default_UseRandomIdStart = true; @@ -124,6 +127,12 @@ public int MaxRetransmit set => SetValue("MaxRetransmit", value); } + public int NonTimeout + { + get => GetInt("NonTimeout", CoapConstants.AckTimeout); + set => SetValue("NonTimeout", value); + } + /// public int MaxMessageSize { diff --git a/CoAP.NET/ICoapConfig.cs b/CoAP.NET/ICoapConfig.cs index a4dbbf0..cb7fc2a 100644 --- a/CoAP.NET/ICoapConfig.cs +++ b/CoAP.NET/ICoapConfig.cs @@ -42,17 +42,17 @@ public interface ICoapConfig : System.ComponentModel.INotifyPropertyChanged int HttpPort { get; } /// - /// Input to computing the resend intervolt + /// Input to computing the resend interval /// int AckTimeout { get; } /// - /// Input to computing the resend intervolt + /// Input to computing the resend interval /// double AckRandomFactor { get; } /// - /// Input to cmputing the resend intervolt + /// Input to computing the resend interval /// double AckTimeoutScale { get; } @@ -61,6 +61,11 @@ public interface ICoapConfig : System.ComponentModel.INotifyPropertyChanged /// int MaxRetransmit { get; } + /// + /// Input to compute timeout interval for NON messages + /// + int NonTimeout { get; } + /// /// Size of message to start blocking at /// diff --git a/CoAP.NET/Net/Matcher.cs b/CoAP.NET/Net/Matcher.cs index b125dfe..aa6b708 100644 --- a/CoAP.NET/Net/Matcher.cs +++ b/CoAP.NET/Net/Matcher.cs @@ -34,16 +34,19 @@ class Matcher : IMatcher, IDisposable /// readonly IDictionary _exchangesByID = new ConcurrentDictionary(); + /// /// for outgoing /// readonly IDictionary _exchangesByToken = new ConcurrentDictionary(); + /// /// for blockwise /// readonly ConcurrentDictionary _ongoingExchanges = new ConcurrentDictionary(); + private Int32 _running; private Int32 _currentID; private IDeduplicator _deduplicator; @@ -56,6 +59,7 @@ public Matcher(ICoapConfig config) if (config.UseRandomIDStart) { _currentID = new Random().Next(1 << 16); } + _tokenLength = config.TokenLength; config.PropertyChanged += PropertyChanged; @@ -133,6 +137,7 @@ public void SendRequest(Exchange exchange, Request request) _random.NextBytes(token); keyToken = new Exchange.KeyToken(token); } while (_exchangesByToken.ContainsKey(keyToken)); + request.Token = token; } else { @@ -141,7 +146,7 @@ public void SendRequest(Exchange exchange, Request request) exchange.Completed += OnExchangeCompleted; - + _Log.Debug(m => m("Stored open request by {0}, {1}", keyID, keyToken)); _exchangesByID[keyID] = exchange; @@ -164,40 +169,25 @@ public void SendResponse(Exchange exchange, Response response) * exchange and the ReliabilityLayer resends this response. */ - // If this is a CON notification we now can forget all previous NON notifications - if (response.Type == MessageType.CON || response.Type == MessageType.ACK) - { - ObserveRelation relation = exchange.Relation; - if (relation != null) - { - RemoveNotificatoinsOf(relation); - } - } - // Blockwise transfers are identified by URI and remote endpoint - if (response.HasOption(OptionType.Block2)) - { + if (response.HasOption(OptionType.Block2)) { Request request = exchange.CurrentRequest; Exchange.KeyUri keyUri = new Exchange.KeyUri(request, response.Destination); // Observe notifications only send the first block, hence do not store them as ongoing - if (exchange.ResponseBlockStatus != null && !response.HasOption(OptionType.Observe)) - { + if (exchange.ResponseBlockStatus != null && !response.HasOption(OptionType.Observe)) { // Remember ongoing blockwise GET requests - if (Utils.Put(_ongoingExchanges, keyUri, exchange) == null) - { + if (Utils.Put(_ongoingExchanges, keyUri, exchange) == null) { if (_Log.IsDebugEnabled) _Log.Debug("Ongoing Block2 started late, storing " + keyUri + " for " + request); } - else - { + else { if (_Log.IsDebugEnabled) _Log.Debug("Ongoing Block2 continued, storing " + keyUri + " for " + request); } } - else - { + else { if (_Log.IsDebugEnabled) _Log.Debug("Ongoing Block2 completed, cleaning up " + keyUri + " for " + request); Exchange exc; @@ -207,15 +197,13 @@ public void SendResponse(Exchange exchange, Response response) // Insert CON and NON to match ACKs and RSTs to the exchange // Do not insert ACKs and RSTs. - if (response.Type == MessageType.CON || response.Type == MessageType.NON) - { + if (response.Type == MessageType.CON || response.Type == MessageType.NON) { Exchange.KeyID keyID = new Exchange.KeyID(response.ID, null, response.Session); _exchangesByID[keyID] = exchange; } // Only CONs and Observe keep the exchange active - if (response.Type != MessageType.CON && response.Last) - { + if (response.Type != MessageType.CON && response.Last) { exchange.Complete = true; } } @@ -223,8 +211,7 @@ public void SendResponse(Exchange exchange, Response response) /// public void SendEmptyMessage(Exchange exchange, EmptyMessage message) { - if (message.Type == MessageType.RST && exchange != null) - { + if (message.Type == MessageType.RST && exchange != null) { // We have rejected the request or response exchange.Complete = true; } @@ -264,6 +251,7 @@ public Exchange ReceiveRequest(Request request) if (_Log.IsInfoEnabled) { _Log.Info("Duplicate request: " + request); } + request.Duplicate = true; return previous; } @@ -282,6 +270,7 @@ public Exchange ReceiveRequest(Request request) if (_Log.IsInfoEnabled) { _Log.Info("Duplicate ongoing request: " + request); } + request.Duplicate = true; } else { @@ -291,9 +280,11 @@ public Exchange ReceiveRequest(Request request) if (_Log.IsDebugEnabled) { _Log.Debug("Ongoing exchange got new request, cleaning up " + keyId); } + _exchangesByID.Remove(keyId); } } + return ongoing; } else { @@ -312,6 +303,7 @@ public Exchange ReceiveRequest(Request request) if (_Log.IsDebugEnabled) { _Log.Debug("New ongoing request, storing " + keyUri + " for " + request); } + exchange.Completed += OnExchangeCompleted; _ongoingExchanges[keyUri] = exchange; return exchange; @@ -320,6 +312,7 @@ public Exchange ReceiveRequest(Request request) if (_Log.IsInfoEnabled) { _Log.Info("Duplicate initial request: " + request); } + request.Duplicate = true; return previous; } @@ -374,7 +367,7 @@ public Exchange ReceiveResponse(Response response) if (response.Type == MessageType.ACK && exchange.CurrentRequest.ID != response.ID) { // The token matches but not the MID. This is a response for an older exchange - _Log.Warn( m => m($"Possible MID reuse before lifetime end: {response.TokenString} expected MID {exchange.CurrentRequest.ID} but received {response.ID}")); + _Log.Warn(m => m($"Possible MID reuse before lifetime end: {response.TokenString} expected MID {exchange.CurrentRequest.ID} but received {response.ID}")); } return exchange; @@ -405,15 +398,13 @@ public Exchange ReceiveEmptyMessage(EmptyMessage message) // local namespace Exchange.KeyID keyID = new Exchange.KeyID(message.ID, null, null); Exchange exchange; - if (_exchangesByID.TryGetValue(keyID, out exchange)) - { + if (_exchangesByID.TryGetValue(keyID, out exchange)) { if (_Log.IsDebugEnabled) _Log.Debug("Exchange got reply: Cleaning up " + keyID); _exchangesByID.Remove(keyID); return exchange; } - else - { + else { if (_Log.IsInfoEnabled) _Log.Info("Ignoring unmatchable empty message from " + message.Source + ": " + message); return null; @@ -428,19 +419,6 @@ public void Dispose() d.Dispose(); } - private void RemoveNotificatoinsOf(ObserveRelation relation) - { - if (_Log.IsDebugEnabled) - _Log.Debug("Remove all remaining NON-notifications of observe relation"); - - foreach (Response previous in relation.ClearNotifications()) - { - // notifications are local MID namespace - Exchange.KeyID keyId = new Exchange.KeyID(previous.ID, null, null); - _exchangesByID.Remove(keyId); - } - } - private void OnExchangeCompleted(Object sender, EventArgs e) { Exchange exchange = (Exchange) sender; @@ -485,12 +463,6 @@ private void OnExchangeCompleted(Object sender, EventArgs e) Exchange exc; _ongoingExchanges.TryRemove(uriKey, out exc); } - - // Remove all remaining NON-notifications if this exchange is an observe relation - ObserveRelation relation = exchange.Relation; - if (relation != null) { - RemoveNotificatoinsOf(relation); - } } } } diff --git a/CoAP.NET/Observe/ObserveNotificationOrderer.cs b/CoAP.NET/Observe/ObserveNotificationOrderer.cs index 930c8b7..82ab79b 100644 --- a/CoAP.NET/Observe/ObserveNotificationOrderer.cs +++ b/CoAP.NET/Observe/ObserveNotificationOrderer.cs @@ -21,7 +21,8 @@ namespace Com.AugustCellars.CoAP.Observe public class ObserveNotificationOrderer { private readonly ICoapConfig _config; - private Int32 _number; + private static int _number; + private int _lastSeen; public ObserveNotificationOrderer() : this(null) @@ -36,9 +37,9 @@ public ObserveNotificationOrderer(ICoapConfig config) /// Gets a new observe option number. /// /// a new observe option number - public Int32 GetNextObserveNumber() + public int GetNextObserveNumber() { - Int32 next = Interlocked.Increment(ref _number); + int next = Interlocked.Increment(ref _number); while (next >= 1 << 24) { Interlocked.CompareExchange(ref _number, 0, next); next = Interlocked.Increment(ref _number); @@ -49,10 +50,7 @@ public Int32 GetNextObserveNumber() /// /// Gets the current notification number. /// - public Int32 Current - { - get => _number; - } + public int Current => _number; public DateTime Timestamp { get; set; } @@ -61,9 +59,9 @@ public Int32 Current /// /// /// - public Boolean IsNew(Response response) + public bool IsNew(Response response) { - Int32? obs = response.Observe; + int? obs = response.Observe; if (!obs.HasValue) { // this is a final response, e.g., error or proactive cancellation return true; @@ -75,14 +73,14 @@ public Boolean IsNew(Response response) // We use the notation from the observe draft-08. DateTime t1 = Timestamp; DateTime t2 = DateTime.Now; - Int32 v1 = Current; - Int32 v2 = obs.Value; - Int64 notifMaxAge = (_config ?? CoapConfig.Default).NotificationMaxAge; + int v1 = _lastSeen; + int v2 = obs.Value; + long notifyMaxAge = (_config ?? CoapConfig.Default).NotificationMaxAge; if ((v1 < v2) && (v2 - v1 < 1 << 23) || (v1 > v2) && (v1 - v2 > 1 << 23) - || (t2 > t1.AddMilliseconds(notifMaxAge))) { + || (t2 > t1.AddMilliseconds(notifyMaxAge))) { Timestamp = t2; - _number = v2; + _lastSeen = v2; return true; } else { diff --git a/CoAP.NET/Observe/ObserveRelation.cs b/CoAP.NET/Observe/ObserveRelation.cs index cef50cf..beb121b 100644 --- a/CoAP.NET/Observe/ObserveRelation.cs +++ b/CoAP.NET/Observe/ObserveRelation.cs @@ -14,8 +14,6 @@ using Com.AugustCellars.CoAP.Net; using Com.AugustCellars.CoAP.Server.Resources; using Com.AugustCellars.CoAP.Util; -using System.Collections.Concurrent; -using System.Collections.Generic; namespace Com.AugustCellars.CoAP.Observe { @@ -27,16 +25,8 @@ public class ObserveRelation private static readonly ILogger _Log = LogManager.GetLogger(typeof(ObserveRelation)); private readonly ICoapConfig _config; private readonly ObservingEndpoint _endpoint; - private readonly IResource _resource; - private readonly Exchange _exchange; - private readonly String _key; private DateTime _interestCheckTime = DateTime.Now; - private Int32 _interestCheckCounter = 1; - - /// - /// The notifications that have been sent, so they can be removed from the Matcher - /// - private readonly ConcurrentQueue _notifications = new ConcurrentQueue(); + private int _interestCheckCounter = 0; /// /// Constructs a new observe relation. @@ -49,39 +39,27 @@ public ObserveRelation(ICoapConfig config, ObservingEndpoint endpoint, IResource { _config = config?? throw ThrowHelper.ArgumentNull("config"); _endpoint = endpoint?? throw ThrowHelper.ArgumentNull("endpoint"); - _resource = resource?? throw ThrowHelper.ArgumentNull("resource"); - _exchange = exchange?? throw ThrowHelper.ArgumentNull("exchange"); - _key = $"{Source}#{exchange.Request.TokenString}"; + Resource = resource?? throw ThrowHelper.ArgumentNull("resource"); + Exchange = exchange?? throw ThrowHelper.ArgumentNull("exchange"); + Key = $"{Source}#{exchange.Request.TokenString}"; } /// /// Gets the resource. /// - public IResource Resource - { - get =>_resource; - } + public IResource Resource { get; } /// /// Gets the exchange. /// - public Exchange Exchange - { - get => _exchange; - } + public Exchange Exchange { get; } - public String Key - { - get => _key; - } + public string Key { get; } /// /// Gets the source endpoint of the observing endpoint. /// - public System.Net.EndPoint Source - { - get => _endpoint.EndPoint; - } + public System.Net.EndPoint Source => _endpoint.EndPoint; public Response CurrentControlNotification { get; set; } @@ -90,29 +68,28 @@ public System.Net.EndPoint Source /// /// Gets or sets a value indicating if this relation has been established. /// - public Boolean Established { get; set; } + public bool Established { get; set; } /// /// Cancel this observe relation. /// public void Cancel() - { - if (_Log.IsDebugEnabled) { - _Log.Debug("Cancel observe relation from " + _key + " with " + _resource.Path); - } + { + _Log.Debug(m => m($"Cancel observe relation from {Key} with {Resource.Path}")); + // stop ongoing retransmissions - if (_exchange.Response != null) { - _exchange.Response.Cancel(); + if (Exchange.Response != null) { + Exchange.Response.Cancel(); } Established = false; - _resource.RemoveObserveRelation(this); + Resource.RemoveObserveRelation(this); _endpoint.RemoveObserveRelation(this); - _exchange.Complete = true; + Exchange.Complete = true; } /// /// Cancel all observer relations that this server has - /// established with this's realtion's endpoint. + /// established with this relation's endpoint. /// public void CancelAll() { @@ -125,15 +102,15 @@ public void CancelAll() public void NotifyObservers() { // makes the resource process the same request again - _resource.HandleRequest(_exchange); + Resource.HandleRequest(Exchange); } /// /// Do we think that we should be doing a CON check on the resource? - /// The check is done on both a time intervolt and a number of notifications. + /// The check is done on both a time interval and a number of notifications. /// /// true if should do a CON check - public Boolean Check() + public bool Check() { bool check = false; DateTime now = DateTime.Now; @@ -145,26 +122,5 @@ public Boolean Check() } return check; } - - /// - /// Add the response to the notification list for this observation - /// - /// Response to send - public void AddNotification(Response notification) - { - _notifications.Enqueue(notification); - } - - /// - /// Enumerate through all of the queued notifications - /// - /// - public IEnumerable ClearNotifications() - { - Response resp; - while (_notifications.TryDequeue(out resp)) { - yield return resp; - } - } } } diff --git a/CoAP.NET/Observe/ObservingEndpoint.cs b/CoAP.NET/Observe/ObservingEndpoint.cs index cd925d0..00e314e 100644 --- a/CoAP.NET/Observe/ObservingEndpoint.cs +++ b/CoAP.NET/Observe/ObservingEndpoint.cs @@ -11,6 +11,7 @@ using System; using System.Collections.Generic; +using System.Linq; using Com.AugustCellars.CoAP.Util; namespace Com.AugustCellars.CoAP.Observe @@ -23,7 +24,6 @@ namespace Com.AugustCellars.CoAP.Observe /// public class ObservingEndpoint { - private readonly System.Net.EndPoint _endpoint; private readonly ICollection _relations = new SynchronizedCollection(); /// @@ -31,16 +31,13 @@ public class ObservingEndpoint /// public ObservingEndpoint(System.Net.EndPoint ep) { - _endpoint = ep; + EndPoint = ep; } /// /// Gets the of this endpoint. /// - public System.Net.EndPoint EndPoint - { - get => _endpoint; - } + public System.Net.EndPoint EndPoint { get; } /// /// Adds the specified observe relation. @@ -77,7 +74,7 @@ public ObserveRelation GetObserveRelation(Byte[] token) /// public void CancelAll() { - foreach (ObserveRelation relation in _relations) { + foreach (ObserveRelation relation in _relations.ToArray()) { relation.Cancel(); } } diff --git a/CoAP.NET/Stack/AbstractLayer.cs b/CoAP.NET/Stack/AbstractLayer.cs index 8b8d3e0..20734dd 100644 --- a/CoAP.NET/Stack/AbstractLayer.cs +++ b/CoAP.NET/Stack/AbstractLayer.cs @@ -35,9 +35,9 @@ public virtual void SendResponse(INextLayer nextLayer, Exchange exchange, Respon } /// - public virtual void SendEmptyMessage(INextLayer nextLayer, Exchange exchange, EmptyMessage message) + public virtual bool SendEmptyMessage(INextLayer nextLayer, Exchange exchange, EmptyMessage message) { - nextLayer.SendEmptyMessage(exchange, message); + return nextLayer.SendEmptyMessage(exchange, message); } /// diff --git a/CoAP.NET/Stack/ILayer.cs b/CoAP.NET/Stack/ILayer.cs index 095d8c8..3d2c86d 100644 --- a/CoAP.NET/Stack/ILayer.cs +++ b/CoAP.NET/Stack/ILayer.cs @@ -43,7 +43,8 @@ public interface ILayer /// the next layer /// the exchange associated /// the empty message to send - void SendEmptyMessage(INextLayer nextLayer, Exchange exchange, EmptyMessage message); + /// true to continue + bool SendEmptyMessage(INextLayer nextLayer, Exchange exchange, EmptyMessage message); /// /// Filters a request receiving event. /// @@ -83,7 +84,8 @@ public interface INextLayer /// /// Sends an empty message to next layer. /// - void SendEmptyMessage(Exchange exchange, EmptyMessage message); + bool SendEmptyMessage(Exchange exchange, EmptyMessage message); + /// /// Receives a request to next layer. /// diff --git a/CoAP.NET/Stack/LayerStack.cs b/CoAP.NET/Stack/LayerStack.cs index 3c9fea4..4b5e3d8 100644 --- a/CoAP.NET/Stack/LayerStack.cs +++ b/CoAP.NET/Stack/LayerStack.cs @@ -159,9 +159,10 @@ public override void SendResponse(INextLayer nextLayer, Exchange exchange, Respo exchange.Outbox.SendResponse(exchange, response); } - public override void SendEmptyMessage(INextLayer nextLayer, Exchange exchange, EmptyMessage message) + public override bool SendEmptyMessage(INextLayer nextLayer, Exchange exchange, EmptyMessage message) { exchange.Outbox.SendEmptyMessage(exchange, message); + return true; } } @@ -184,9 +185,9 @@ public void SendResponse(Exchange exchange, Response response) _entry.NextEntry.Filter.SendResponse(_entry.NextEntry.NextFilter, exchange, response); } - public void SendEmptyMessage(Exchange exchange, EmptyMessage message) + public bool SendEmptyMessage(Exchange exchange, EmptyMessage message) { - _entry.NextEntry.Filter.SendEmptyMessage(_entry.NextEntry.NextFilter, exchange, message); + return _entry.NextEntry.Filter.SendEmptyMessage(_entry.NextEntry.NextFilter, exchange, message); } public void ReceiveRequest(Exchange exchange, Request request) diff --git a/CoAP.NET/Stack/ObserveLayer.cs b/CoAP.NET/Stack/ObserveLayer.cs index bf5be20..ba30189 100644 --- a/CoAP.NET/Stack/ObserveLayer.cs +++ b/CoAP.NET/Stack/ObserveLayer.cs @@ -10,11 +10,7 @@ */ using System; -#if NETSTANDARD1_3 -using System.Threading; -#else using System.Timers; -#endif using Com.AugustCellars.CoAP.Log; using Com.AugustCellars.CoAP.Net; using Com.AugustCellars.CoAP.Observe; @@ -24,20 +20,20 @@ namespace Com.AugustCellars.CoAP.Stack public class ObserveLayer : AbstractLayer { private static readonly ILogger log = LogManager.GetLogger(typeof(ObserveLayer)); - private static readonly Object ReregistrationContextKey = "ReregistrationContext"; - private static readonly Random _Random = new Random(); + private static readonly object reregistrationContextKey = "ReregistrationContext"; + private static readonly Random random = new Random(); /// /// Additional time to wait until re-registration /// - private Int32 _backoff; + private readonly int _backOff; /// /// Constructs a new observe layer. /// public ObserveLayer(ICoapConfig config) { - _backoff = config.NotificationReregistrationBackoff; + _backOff = config.NotificationReregistrationBackoff; } /// @@ -48,18 +44,14 @@ public override void SendResponse(INextLayer nextLayer, Exchange exchange, Respo if (exchange.Request.IsAcknowledged || exchange.Request.Type == MessageType.NON) { // Transmit errors as CON if (!Code.IsSuccess(response.Code)) { - if (log.IsDebugEnabled) { - log.Debug("Response has error code " + response.Code + " and must be sent as CON"); - } + log.Debug(m => m($"Response has error code {response.Code} and must be sent as CON")); response.Type = MessageType.CON; relation.Cancel(); } else { // Make sure that every now and than a CON is mixed within - if (relation.Check()) { - if (log.IsDebugEnabled) { - log.Debug("The observe relation check requires the notification to be sent as CON"); - } + if (relation.Check()) { + log.Debug("The observe relation check requires the notification to be sent as CON"); response.Type = MessageType.CON; } else { @@ -79,7 +71,8 @@ public override void SendResponse(INextLayer nextLayer, Exchange exchange, Respo * them from the exchangesByID map */ if (response.Type == MessageType.NON) { - relation.AddNotification(response); + // relation.AddNotification(response); + PrepareTimeout(nextLayer, exchange, response); } /* @@ -100,12 +93,17 @@ public override void SendResponse(INextLayer nextLayer, Exchange exchange, Respo // synchronized lock (exchange) { Response current = relation.CurrentControlNotification; - if (current != null && IsInTransit(current)) { - if (log.IsDebugEnabled) { - log.Debug("A former notification is still in transit. Postpone " + response); + if (current != null && IsInTransit(current)) { + log.Debug(m => m($"A former notification is still in transit. Postpone {response}")); + if (relation.NextControlNotification != null && relation.NextControlNotification.Type == MessageType.CON) { + response.Type = MessageType.CON; + } + + if (response.Type == MessageType.CON) { + // use the same ID + response.ID = current.ID; } - // use the same ID - response.ID = current.ID; + relation.NextControlNotification = response; return; } @@ -126,23 +124,20 @@ public override void ReceiveResponse(INextLayer nextLayer, Exchange exchange, Re if (response.HasOption(OptionType.Observe)) { if (exchange.Request.IsCancelled) { // The request was canceled and we no longer want notifications - if (log.IsDebugEnabled) { - log.Debug("ObserveLayer rejecting notification for canceled Exchange"); - } + log.Debug("ObserveLayer rejecting notification for canceled Exchange"); EmptyMessage rst = EmptyMessage.NewRST(response); SendEmptyMessage(nextLayer, exchange, rst); // Matcher sets exchange as complete when RST is sent + + return; } - else { + + if (exchange.Request.ObserveReconnect) { PrepareReregistration(exchange, response, msg => SendRequest(nextLayer, exchange, msg)); - base.ReceiveResponse(nextLayer, exchange, response); } } - else { - // No observe option in response => always deliver - base.ReceiveResponse(nextLayer, exchange, response); - } + base.ReceiveResponse(nextLayer, exchange, response); } /// @@ -150,24 +145,23 @@ public override void ReceiveEmptyMessage(INextLayer nextLayer, Exchange exchange { // NOTE: We could also move this into the MessageObserverAdapter from // sendResponse into the method rejected(). - if (message.Type == MessageType.RST && exchange.Origin == Origin.Remote) - { + + if (message.Type == MessageType.RST && exchange.Origin == Origin.Remote) { // The response has been rejected ObserveRelation relation = exchange.Relation; - if (relation != null) - { + if (relation != null) { relation.Cancel(); } // else there was no observe relation ship and this layer ignores the rst } base.ReceiveEmptyMessage(nextLayer, exchange, message); } - private static Boolean IsInTransit(Response response) + private static bool IsInTransit(Response response) { MessageType type = response.Type; - Boolean acked = response.IsAcknowledged; - Boolean timeout = response.IsTimedOut; - Boolean result = type == MessageType.CON && !acked && !timeout; + bool acked = response.IsAcknowledged; + bool timeout = response.IsTimedOut; + bool result = ((type == MessageType.CON) || (type == MessageType.NON)) && !acked && !timeout; return result; } @@ -179,14 +173,14 @@ private void PrepareSelfReplacement(INextLayer nextLayer, Exchange exchange, Res Response next = relation.NextControlNotification; relation.CurrentControlNotification = next; // next may be null relation.NextControlNotification = null; - if (next != null) { - if (log.IsDebugEnabled) { - log.Debug("Notification has been acknowledged, send the next one"); - } + if (next != null) { + log.Debug("Notification has been acknowledged, send the next one"); + // this is not a self replacement, hence a new ID next.ID = Message.None; + // Create a new task for sending next response so that we can leave the sync-block - Executor.Start(() => SendResponse(nextLayer, exchange, next)); + Executor.Start(() => base.SendResponse(nextLayer, exchange, next)); } } }; @@ -196,13 +190,16 @@ private void PrepareSelfReplacement(INextLayer nextLayer, Exchange exchange, Res ObserveRelation relation = exchange.Relation; Response next = relation.NextControlNotification; if (next != null) { - if (log.IsDebugEnabled) { - log.Debug("The notification has timed out and there is a fresher notification for the retransmission."); - } + log.Debug("The notification has timed out and there is a fresher notification for the retransmission."); + // Cancel the original retransmission and send the fresh notification here response.IsCancelled = true; - // use the same ID - next.ID = response.ID; + + if (relation.CurrentControlNotification.Type == MessageType.CON) { + // use the same ID if continuing from CON to CON + next.ID = response.ID; + } + // Convert all notification retransmissions to CON if (next.Type != MessageType.CON) { next.Type = MessageType.CON; @@ -210,32 +207,54 @@ private void PrepareSelfReplacement(INextLayer nextLayer, Exchange exchange, Res } relation.CurrentControlNotification = next; relation.NextControlNotification = null; + // Create a new task for sending next response so that we can leave the sync-block - Executor.Start(() => SendResponse(nextLayer, exchange, next)); + Executor.Start(() => base.SendResponse(nextLayer, exchange, next)); } } }; response.TimedOut += (o, e) => { - ObserveRelation relation = exchange.Relation; - if (log.IsDebugEnabled) { - log.Debug($"Notification {relation.Exchange.Request.TokenString} timed out. Cancel all relations with source {relation.Source}"); - } + ObserveRelation relation = exchange.Relation; + log.Debug(m => m($"Notification {relation.Exchange.Request.TokenString} timed out. Cancel all relations with source {relation.Source}")); relation.CancelAll(); }; } + private void PrepareTimeout(INextLayer nextLayer, Exchange exchange, Response response) + { + log.Debug(m => m($"PrepareTimeout - for response {response}")); + response.TimedOut += (o, e) => { + lock (exchange) { + ObserveRelation relation = exchange.Relation; + log.Debug(m => m($"Notification {relation.Exchange.Request.TokenString} timed out.")); + + Response next = relation.NextControlNotification; + if (next != null) { + log.Debug("The notification has timed out and there is a fresher notification for the retransmission."); + + // don't use the same ID + // next.ID = response.ID; + next.ID = Message.None; + + relation.CurrentControlNotification = next; + relation.NextControlNotification = null; + + // Create a new task for sending next response so that we can leave the sync-block + Executor.Start(() => base.SendResponse(nextLayer, exchange, next)); + } + } + }; + } + private void PrepareReregistration(Exchange exchange, Response response, Action reregister) { - if (!exchange.Request.ObserveReconnect) return; - Int64 timeout = response.MaxAge * 1000 + _backoff + _Random.Next(2, 15) * 1000; + long timeout = response.MaxAge * 1000 + _backOff + random.Next(2, 15) * 1000; ReregistrationContext ctx = exchange.GetOrAdd( - ReregistrationContextKey, _ => new ReregistrationContext(exchange, timeout, reregister)); + reregistrationContextKey, _ => new ReregistrationContext(exchange, timeout, reregister)); - if (log.IsDebugEnabled) { - log.Debug("Scheduling re-registration in " + timeout + "ms for " + exchange.Request); - } + log.Debug(m => m("Scheduling re-registration in " + timeout + "ms for " + exchange.Request)); ctx.Restart(); } @@ -244,32 +263,21 @@ class ReregistrationContext : IDisposable { private readonly Exchange _exchange; private readonly Action _reregister; - private Timer _timer; -#if NETSTANDARD1_3 - private int _timeout; -#endif + private readonly Timer _timer; - public ReregistrationContext(Exchange exchange, Int64 timeout, Action reregister) + public ReregistrationContext(Exchange exchange, long timeout, Action reregister) { _exchange = exchange; _reregister = reregister; -#if NETSTANDARD1_3 - _timeout = (int) timeout; -#else _timer = new Timer(timeout) { AutoReset = false }; _timer.Elapsed += timer_Elapsed; -#endif } public void Start() { -#if NETSTANDARD1_3 - _timer = new Timer(timer_Elapsed, this, _timeout, Timeout.Infinite); -#else _timer.Start(); -#endif } public void Restart() @@ -280,14 +288,7 @@ public void Restart() public void Stop() { -#if NETSTANDARD1_3 - if (_timer != null) { - _timer.Dispose(); - } - _timer = null; -#else _timer.Stop(); -#endif } public void Cancel() @@ -301,58 +302,29 @@ public void Dispose() _timer.Dispose(); } -#if NETSTANDARD1_3 - static void timer_Elapsed(Object obj) - { - ReregistrationContext sender = obj as ReregistrationContext; - Request request = sender._exchange.Request; - if (!request.IsCancelled) { - Request refresh = Request.NewGet(); - refresh.SetOptions(request.GetOptions()); - // make sure Observe is set and zero - refresh.MarkObserve(); - // use same Token - refresh.Token = request.Token; - refresh.Destination = request.Destination; - refresh.CopyEventHandler(request); - if (log.IsDebugEnabled) { - log.Debug("Re-registering for " + request); - } - request.FireReregister(refresh); - sender._reregister(refresh); - } - else { - if (log.IsDebugEnabled) { - log.Debug("Dropping re-registration for canceled " + request); - } - } - } -#else - void timer_Elapsed(Object sender, ElapsedEventArgs e) + void timer_Elapsed(object sender, ElapsedEventArgs e) { Request request = _exchange.Request; if (!request.IsCancelled) { Request refresh = new Request(request.Method); + refresh.SetOptions(request.GetOptions()); // make sure Observe is set and zero refresh.MarkObserve(); + // use same Token refresh.Token = request.Token; refresh.Destination = request.Destination; - refresh.CopyEventHandler(request); - if (log.IsDebugEnabled) { - log.Debug("Re-registering for " + request); - } + refresh.CopyEventHandler(request); + log.Debug(m => m( "Re-registering for " + request)); request.FireReregister(refresh); _reregister(refresh); + } - else { - if (log.IsDebugEnabled) { - log.Debug("Dropping re-registration for canceled " + request); - } + else { + log.Debug(m => m("Dropping re-registration for canceled " + request)); } } -#endif } } } diff --git a/CoAP.NET/Stack/ReliabilityLayer.cs b/CoAP.NET/Stack/ReliabilityLayer.cs index d76a0d6..3f32129 100644 --- a/CoAP.NET/Stack/ReliabilityLayer.cs +++ b/CoAP.NET/Stack/ReliabilityLayer.cs @@ -11,12 +11,7 @@ using System; using System.ComponentModel; -#if NETSTANDARD1_3 -using System.Threading; -#else -using System.Diagnostics.Contracts; using System.Timers; -#endif using Com.AugustCellars.CoAP.Log; using Com.AugustCellars.CoAP.Net; @@ -28,7 +23,7 @@ namespace Com.AugustCellars.CoAP.Stack public class ReliabilityLayer : AbstractLayer { static readonly ILogger _Log = LogManager.GetLogger(typeof(ReliabilityLayer)); - static readonly Object _TransmissionContextKey = "TransmissionContext"; + static readonly object _TransmissionContextKey = "TransmissionContext"; private readonly Random _rand = new Random(); private readonly ICoapConfig _config; @@ -37,7 +32,7 @@ public class ReliabilityLayer : AbstractLayer private double _ackRandomFactor; private int _ackTimeout; private int _maxRetransmitCount; - + private int _nonTimeout; /// /// Constructs a new reliability layer. @@ -50,6 +45,7 @@ public ReliabilityLayer(ICoapConfig config) _ackRandomFactor = _config.AckRandomFactor; _ackTimeoutScale = _config.AckTimeoutScale; _maxRetransmitCount = config.MaxRetransmit; + _nonTimeout = config.NonTimeout; _config.PropertyChanged += PropertyChanged; } @@ -72,6 +68,14 @@ private void PropertyChanged(object obj, PropertyChangedEventArgs args) case "AckTimeoutScale": _ackTimeoutScale = _config.AckTimeoutScale; break; + + case "MaxRetransmit": + _maxRetransmitCount = _config.MaxRetransmit; + break; + + case "NonTimeout": + _nonTimeout = _config.NonTimeout; + break; } } @@ -129,6 +133,10 @@ public override void SendResponse(INextLayer nextLayer, Exchange exchange, Respo _Log.Debug(m => m("Scheduling retransmission for {0}", response)); PrepareRetransmission(exchange, response, ctx => SendResponse(nextLayer, exchange, response)); } + else if (response.Type == MessageType.NON && response.HasOption(OptionType.Observe)) { + _Log.Debug(m => m($"Scheduling timeout for {response} @ {_nonTimeout}")); + PrepareTimeout(exchange, response); + } base.SendResponse(nextLayer, exchange, response); } @@ -250,7 +258,7 @@ private void PrepareRetransmission(Exchange exchange, Message msg, Action new TransmissionContext(_config, exchange, msg, retransmit, _maxRetransmitCount)); if (ctx.FailedTransmissionCount > 0) { - ctx.CurrentTimeout = (Int32) (ctx.CurrentTimeout * _ackTimeoutScale); + ctx.CurrentTimeout = (int) (ctx.CurrentTimeout * _ackTimeoutScale); } else if (ctx.CurrentTimeout == 0) { ctx.CurrentTimeout = InitialTimeout(_ackTimeout, _ackRandomFactor); @@ -261,9 +269,23 @@ private void PrepareRetransmission(Exchange exchange, Message msg, Action(_TransmissionContextKey, _ => new TransmissionContext(_config, exchange, msg, null, 0)); + if (ctx.CurrentTimeout == 0) { + ctx.CurrentTimeout = _nonTimeout; + } + + _Log.Debug(m => m("Send request, timeout only")); + + ctx.Start(); + } + + private int InitialTimeout(int initialTimeout, double factor) + { + return (int) (initialTimeout + initialTimeout * (factor - 1D) * _rand.NextDouble()); } internal class TransmissionContext : IDisposable @@ -283,35 +305,24 @@ public TransmissionContext(ICoapConfig config, Exchange exchange, Message messag _retransmit = retransmit; _maxRetransmitCount = maxRetransmitCount; CurrentTimeout = message.AckTimeout; -#if NETSTANDARD1_3 -#else _timer = new Timer() { AutoReset = false }; _timer.Elapsed += timer_Elapsed; -#endif } - public Int32 FailedTransmissionCount { get; private set; } + public int FailedTransmissionCount { get; private set; } - public Int32 CurrentTimeout { get; set; } + public int CurrentTimeout { get; set; } public void Start() { -#if NETSTANDARD1_3 - Stop(); -#else _timer.Stop(); -#endif if (CurrentTimeout > 0) { -#if NETSTANDARD1_3 - _timer = new Timer(timer_Elapsed, this, CurrentTimeout, Timeout.Infinite); -#else _timer.Interval = CurrentTimeout; _timer.Start(); -#endif } } @@ -321,16 +332,11 @@ public void Stop() // avoid race condition of multiple responses (e.g., notifications) - if (t != null) { - t.Dispose(); - } + t?.Dispose(); } public void Cancel() { -#if NETSTANDARD1_3 - Stop(); -#else Timer t = System.Threading.Interlocked.Exchange(ref _timer, null); // avoid race condition of multiple responses (e.g., notifications) @@ -345,7 +351,6 @@ public void Cancel() catch (ObjectDisposedException) { // ignore } -#endif if (_Log.IsDebugEnabled) { _Log.Debug("Cancel retransmission for -->"); @@ -363,53 +368,15 @@ public void Dispose() Cancel(); } -#if NETSTANDARD1_3 - static void timer_Elapsed(Object obj) - { - TransmissionContext sender = (TransmissionContext)obj; - /* - * Do not retransmit a message if it has been acknowledged, - * rejected, canceled or already been retransmitted for the maximum - * number of times. - */ - Int32 failedCount = ++sender.FailedTransmissionCount; - - if (sender._message.IsAcknowledged) { - _Log.Debug(m => m("Timeout: message already acknowledged, cancel retransmission of {0}", sender._message)); - } - else if (sender._message.IsRejected) { - _Log.Debug(m => m("Timeout: message already rejected, cancel retransmission of {0}", sender._message)); - } - else if (sender._message.IsCancelled) { - _Log.Debug(m => m("Timeout: canceled (ID={0}), do not retransmit", sender._message.ID)); - } - else if (failedCount <= (sender._message.MaxRetransmit != 0 ? sender._message.MaxRetransmit : sender._maxRetransmitCount)) { - _Log.Debug(m => m("Timeout: retransmit message, failed: {0}, message: {1}", failedCount, sender._message)); - - sender._message.FireRetransmitting(); - // message might have canceled - if (!sender._message.IsCancelled) { - sender._retransmit(sender); - } - } - else { - _Log.Debug(m => m("Timeout: retransmission limit reached, exchange failed, message: {0}", sender._message)); - sender._exchange.TimedOut = true; - sender._message.IsTimedOut = true; - sender._exchange.Remove(_TransmissionContextKey); - sender.Cancel(); - } - } -#else - void timer_Elapsed(Object sender, ElapsedEventArgs e) + void timer_Elapsed(object sender, ElapsedEventArgs e) { /* * Do not retransmit a message if it has been acknowledged, * rejected, canceled or already been retransmitted for the maximum * number of times. */ - Int32 failedCount = ++FailedTransmissionCount; + int failedCount = ++FailedTransmissionCount; if (_message.IsAcknowledged) { _Log.Debug(m => m("Timeout: message already acknowledged, cancel retransmission of {0}", _message)); @@ -438,7 +405,6 @@ void timer_Elapsed(Object sender, ElapsedEventArgs e) Cancel(); } } -#endif } } } diff --git a/CoAP.Std10.sln.DotSettings b/CoAP.Std10.sln.DotSettings new file mode 100644 index 0000000..5e3a11d --- /dev/null +++ b/CoAP.Std10.sln.DotSettings @@ -0,0 +1,3 @@ + + <data /> + <data><IncludeFilters /><ExcludeFilters /></data> \ No newline at end of file diff --git a/CoAP.Test/CoapClientTest.cs b/CoAP.Test/CoapClientTest.cs index 641e7bf..cb96431 100644 --- a/CoAP.Test/CoapClientTest.cs +++ b/CoAP.Test/CoapClientTest.cs @@ -214,7 +214,6 @@ public void TestAsynchronousCall() Thread.Sleep(100); Assert.AreEqual(0, actualTest); Assert.AreEqual(0, notifyTest); - Assert.AreEqual(5, notifications); Assert.IsFalse(_failed); } @@ -278,7 +277,10 @@ private void Fail(CoapClient.FailReason reason) private void CreateServer() { - CoAPEndPoint endpoint = new CoAPEndPoint(0); + CoapConfig config = new CoapConfig(); + config.NonTimeout = 10; + + CoAPEndPoint endpoint = new CoAPEndPoint(0, config); _resource = new StorageResource(TARGET, CONTENT_1); _server = new CoapServer(); _server.Add(_resource); diff --git a/CoAP.Test/Observe/ObserveRelation.cs b/CoAP.Test/Observe/ObserveRelation.cs new file mode 100644 index 0000000..f436703 --- /dev/null +++ b/CoAP.Test/Observe/ObserveRelation.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CoAP.Test.Std10.Observe +{ + [TestClass] + public class ObserveRelation + { + } +} diff --git a/CoAP.Test/Observe/ObserveTests.cs b/CoAP.Test/Observe/ObserveTests.cs new file mode 100644 index 0000000..099b78a --- /dev/null +++ b/CoAP.Test/Observe/ObserveTests.cs @@ -0,0 +1,426 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using Com.AugustCellars.CoAP; +using Com.AugustCellars.CoAP.Log; +using Com.AugustCellars.CoAP.Net; +using Com.AugustCellars.CoAP.Server; +using Com.AugustCellars.CoAP.Server.Resources; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace CoAP.Test.Std10.Observe +{ + [TestClass] + public class ObserveTests + { + private static readonly string target1 = "target1"; + private static readonly string target2 = "target2"; + + int _serverPort; + CoapServer _server; + ObserveResource _resource; + private ObserveResource _resource2; + + string _expected; + int _notifications; + bool _failed; + private CoapConfig _config; + + + [TestInitialize] + public void SetupServer() + { + LogManager.Level = LogLevel.Debug; + CreateServer(); + } + + [TestCleanup] + public void ShutdownServer() + { + _server.Dispose(); + } + +#if false + [TestMethod] + public void TestOutOfOrder() + { + + // Check what happens with out of order delivery + + Uri uri = new Uri($"coap://localhost:{_serverPort}/{target1}"); + CoapClient client = new CoapClient(uri); + + _expected = "No Content Yet"; + CoapObserveRelation obs1 = client.Observe(response => { + Assert.AreEqual(_expected, response.ResponseText); + Assert.IsTrue(response.HasOption(OptionType.Observe)); + Assert.IsTrue(response.Observe.HasValue); + int oNumber = response.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + }); + Assert.IsFalse(obs1.Canceled); + + Thread.Sleep(100); + _resource.Changed(); + + _resource2.ObserveNo = lastObserve + 3; + + } +#endif + + [TestMethod] + public void TestCancel() + { + // Check what happens with out of order delivery + + Uri uri = new Uri($"coap://localhost:{_serverPort}/{target1}"); + CoapClient client = new CoapClient(uri); + + AutoResetEvent trigger = new AutoResetEvent(false); + Response nextResponse = null; + int lastObserve = -1; + + _expected = "No Content Yet"; + CoapObserveRelation obs1 = client.Observe(response => { + nextResponse = response; + trigger.Set(); + + }); + Assert.IsFalse(obs1.Canceled); + + Assert.IsTrue(trigger.WaitOne(1000)); + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + int oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + _resource.Changed(); + Assert.IsTrue(trigger.WaitOne(1000)); + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + _resource.Changed(); + Assert.IsTrue(trigger.WaitOne(1000)); + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + _resource.Canceled(false); + Assert.IsFalse(trigger.WaitOne(1000)); + + _resource.Changed(); + Assert.IsFalse(trigger.WaitOne(1000)); + + client = null; + } + + [TestMethod] + public void TestCancelWithNotify() + { + // Check what happens with out of order delivery + + Uri uri = new Uri($"coap://localhost:{_serverPort}/{target1}"); + CoapClient client = new CoapClient(uri); + + AutoResetEvent trigger = new AutoResetEvent(false); + Response nextResponse = null; + int lastObserve = -1; + + _expected = "No Content Yet"; + CoapObserveRelation obs1 = client.Observe(response => { + nextResponse = response; + trigger.Set(); + + }); + Assert.IsFalse(obs1.Canceled); + + Assert.IsTrue(trigger.WaitOne(1000)); + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + int oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + _resource.Changed(); + Assert.IsTrue(trigger.WaitOne(1000)); + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + _resource.Changed(); + Assert.IsTrue(trigger.WaitOne(1000)); + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + _resource.Canceled(true); + Assert.IsTrue(trigger.WaitOne(1000)); + Assert.IsTrue(!Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(null, nextResponse.ResponseText); + Assert.IsFalse(nextResponse.HasOption(OptionType.Observe)); + + _resource.Changed(); + Assert.IsFalse(trigger.WaitOne(1000)); + + client = null; + } + + [TestMethod] + public void CheckOnCount() + { + int timeout = 1000; + Uri uri = new Uri($"coap://localhost:{_serverPort}/{target1}"); + CoapClient client = new CoapClient(uri); + + AutoResetEvent trigger = new AutoResetEvent(false); + Response nextResponse = null; + int lastObserve = -1; + + _expected = "No Content Yet"; + CoapObserveRelation obs1 = client.Observe(response => { + nextResponse = response; + trigger.Set(); + + }); + Assert.IsFalse(obs1.Canceled); + + Assert.IsTrue(trigger.WaitOne(timeout)); + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + int oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + _config.NotificationCheckIntervalCount = 100; + int conCount = 0; + + for (int i = 1; i < 2100; i++) { + _expected = $"Content for {i}"; + _resource.UpdateContent(_expected); + Assert.IsTrue(trigger.WaitOne(timeout)); + + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + conCount += nextResponse.Type == MessageType.CON ? 1 : 0; + } + + Assert.IsTrue(conCount > 0); + + + } + + [TestMethod] + public void CheckOnTime() + { + Uri uri = new Uri($"coap://localhost:{_serverPort}/{target1}"); + CoapClient client = new CoapClient(uri); + + AutoResetEvent trigger = new AutoResetEvent(false); + Response nextResponse = null; + int lastObserve = -1; + + _expected = "No Content Yet"; + CoapObserveRelation obs1 = client.Observe(response => { + nextResponse = response; + trigger.Set(); + + }); + Assert.IsFalse(obs1.Canceled); + + Assert.IsTrue(trigger.WaitOne(1000)); + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + int oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + _config.NotificationCheckIntervalTime = 1000; + int count = 0; + + for (int i = 0; i < 20; i++) { + Thread.Sleep(100); + _expected = $"Content for {i}"; + _resource.UpdateContent(_expected); + Assert.IsTrue(trigger.WaitOne(1000)); + + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + Console.WriteLine($"lastObserve = {lastObserve}, type = {nextResponse.Type}"); + count += (nextResponse.Type == MessageType.CON) ? 1 : 0; + } + + Assert.IsTrue(count > 0); + } + +#if false + [TestMethod] + public void Reregister() + { + Uri uri = new Uri($"coap://localhost:{_serverPort}/{target1}"); + CoapClient client = new CoapClient(uri); + + AutoResetEvent trigger = new AutoResetEvent(false); + Response nextResponse = null; + int lastObserve = -1; + + + _expected = "No Content Yet"; + CoapObserveRelation obs1 = client.Observe(response => { + nextResponse = response; + trigger.Set(); + + }); + Assert.IsFalse(obs1.Canceled); + Assert.IsTrue(trigger.WaitOne(1000)); + + int oNumber = -1; + + for (int i = 0; i < 20; i++) { + _expected = $"Content for {i}"; + _resource.UpdateContent(_expected); + Assert.IsTrue(trigger.WaitOne(1000)); + + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + Console.WriteLine($"lastObserve = {lastObserve}, type = {nextResponse.Type}"); + } + + Request request = Request.NewGet(); + request.URI = uri; + request.MarkObserve(); + request.Token = nextResponse.Token; + + client.Send(request); + + for (int i = 0; i < 20; i++) { + _expected = $"Content for {i}"; + _resource.UpdateContent(_expected); + Assert.IsTrue(trigger.WaitOne(1000)); + + Assert.IsTrue(Code.IsSuccess(nextResponse.Code)); + Assert.AreEqual(_expected, nextResponse.ResponseText); + Assert.IsTrue(nextResponse.HasOption(OptionType.Observe)); + Assert.IsTrue(nextResponse.Observe.HasValue); + oNumber = nextResponse.Observe.Value; + Assert.IsTrue(oNumber > lastObserve); + lastObserve = oNumber; + + Console.WriteLine($"lastObserve = {lastObserve}, type = {nextResponse.Type}"); + } + + } +#endif + + + private void CreateServer() + { + _config = new CoapConfig(); + _config.NonTimeout = 10; // 10 ms + + CoAPEndPoint endpoint = new CoAPEndPoint(0, _config); + _resource = new ObserveResource(target1); + _resource2 = new ObserveResource(target2); + _server = new CoapServer(_config); + _server.Add(_resource); + _server.Add(_resource2); + + _server.AddEndPoint(endpoint); + _server.Start(); + _serverPort = ((System.Net.IPEndPoint)endpoint.LocalEndPoint).Port; + + } + + class ObserveResource : Resource + { + private string _content = "No Content Yet"; + public int ObserveNo { get; set; } + + public ObserveResource(string name) + : base(name) + { + Observable = true; + } + + protected override void DoGet(CoapExchange exchange) + { + IEnumerable queries = exchange.Request.UriQueries; + string c = _content; + + exchange.Respond(_content); + } + + protected override void DoPost(CoapExchange exchange) + { + string old = _content; + _content = exchange.Request.PayloadString; + exchange.Respond(StatusCode.Changed, old); + Changed(); + } + + public void Canceled(bool notify) + { + if (notify) { + ClearAndNotifyObserveRelations(StatusCode.BadRequest); + } + else { + ClearObserveRelations(); + } + } + + public void UpdateContent(string text) + { + _content = text; + Changed(); + } + } + + + } +}