This repository has been archived by the owner on Oct 9, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
SessionHelpers.cs
144 lines (105 loc) · 4.36 KB
/
SessionHelpers.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
using FrooxEngine;
using System.Collections.Concurrent;
namespace Outflow;
/// <summary>
/// Helper methods for interacting with a <see cref="Session"/>'s StreamMessage queue.
/// </summary>
public static class SessionHelpers
{
    /// <summary>
    /// Maps each session to the wake-up event and the pending-message queue used by its
    /// StreamMessage processing loop.
    /// </summary>
    public static ConcurrentDictionary<Session, (AutoResetEvent streamSendEvent, ConcurrentQueue<SyncMessage> streamMessagesToSend)> SessionStreamQueue = new();

    /// <summary>
    /// Enqueues a StreamMessage into that session's StreamMessage queue and signals the
    /// session's processing loop.
    /// </summary>
    /// <param name="session">The session to enqueue a StreamMessage for</param>
    /// <param name="msg">The message to enqueue</param>
    /// <returns>
    /// Whether the message was a stream. This is <c>true</c> for any StreamMessage, even when
    /// no queue is registered for <paramref name="session"/>.
    /// </returns>
    public static bool EnqueueStreamForTransmission(Session session, SyncMessage msg)
    {
        if (msg is not StreamMessage)
        {
            return false;
        }

        // NOTE(review): when no queue is registered for the session, the message is reported as
        // a stream but is neither queued nor disposed here - confirm the caller accounts for that.
        if (SessionStreamQueue.TryGetValue(session, out var data))
        {
            data.streamMessagesToSend.Enqueue(msg);
            data.streamSendEvent.Set();
#if DEBUG
            // Only log messages that were actually handed to the processing loop.
            Outflow.Debug($"Encoded StreamMessage with state #{msg.SenderStateVersion}");
#endif
        }

        return true;
    }

    /// <summary>
    /// The processing loop for processing StreamMessages exclusively. Blocks on
    /// <paramref name="ev"/>, transmits every queued message that has at least one target, and
    /// exits once it is woken while the session is disposed.
    /// </summary>
    /// <param name="session">The session to operate on</param>
    /// <param name="ev">The event to wait on; disposed by this method when the loop exits</param>
    /// <param name="streamMessagesToSend">The queue to operate on</param>
    /// <exception cref="InvalidOperationException">
    /// The <c>TotalSentStreams</c> property or its setter could not be found on <see cref="Session"/>.
    /// </exception>
    public static void StreamLoop(Session session, AutoResetEvent ev, ConcurrentQueue<SyncMessage> streamMessagesToSend)
    {
        // Cache a delegate for the TotalSentStreams setter; it is looked up with nonPublic: true
        // so a non-public setter can still be bound. Fail with a clear message instead of a
        // NullReferenceException if the property or its setter is missing.
        var setter = typeof(Session)
            .GetProperty(nameof(Session.TotalSentStreams))
            ?.GetSetMethod(true)
            ?? throw new InvalidOperationException(
                $"Could not find a setter for {nameof(Session)}.{nameof(Session.TotalSentStreams)}");
        var setStreamCount = (Action<int>)setter.CreateDelegate(typeof(Action<int>), session);

        Outflow.Msg("StreamMessage processing successfully initiated!");

        while (true)
        {
            ev.WaitOne();

            if (session.IsDisposed) // Break on disposal
            {
                break;
            }

            while (streamMessagesToSend.TryDequeue(out SyncMessage result))
            {
                try
                {
                    // Messages without targets are dropped without being transmitted or counted.
                    if (result.Targets.Count > 0)
                    {
                        setStreamCount(session.TotalSentStreams + 1);
                        session.NetworkManager.TransmitData(result.Encode());
#if DEBUG
                        Outflow.Debug($"Successfully processed StreamMessage #{result.SenderStateVersion}");
#endif
                    }
                }
                finally
                {
                    // Dispose the message even if encoding or transmission throws.
                    result.Dispose();
                }
            }
        }

        // Dispose whatever was still queued when the session went away so those messages are
        // not left undisposed.
        while (streamMessagesToSend.TryDequeue(out SyncMessage leftover))
        {
            leftover.Dispose();
        }

        ev.Dispose();
        Outflow.Msg($"StreamMessage processing loop terminated for {session.World.Name}");
    }

    /// <summary>
    /// Removes a StreamMessage queue from the dictionary for a given session.
    /// Does nothing if the session has no registered queue.
    /// </summary>
    /// <param name="session">The session to remove a queue from</param>
    public static void RemoveStreamQueue(this Session session)
    {
        // A single TryRemove is atomic; a separate ContainsKey check adds a second lookup and
        // can race with other threads.
        if (SessionStreamQueue.TryRemove(session, out _))
        {
#if DEBUG
            Outflow.Debug("Removed StreamMessage queue");
#endif
        }
    }

    /// <summary>
    /// Adds a StreamMessage queue to the dictionary for a given session.
    /// If the session already has an entry, the existing entry is kept.
    /// </summary>
    /// <param name="session">The session to add the queue for</param>
    /// <param name="ev">The event the queue processor will wait on</param>
    /// <param name="queue">The queue the processor will operate with</param>
    public static void AddStreamQueue(this Session session, AutoResetEvent ev, ConcurrentQueue<SyncMessage> queue)
    {
        SessionStreamQueue.TryAdd(session, (ev, queue));
#if DEBUG
        Outflow.Debug("Added StreamMessage queue");
#endif
    }

    /// <summary>
    /// Only called when the Session is disposed, disposes of the StreamMessage processor by
    /// unregistering the session's queue and waking its processing loop.
    /// </summary>
    /// <param name="session">The session to operate on</param>
    public static void DisposeStreamMessageProcessor(Session session)
    {
        // Unregister first so no further messages can be queued for this session, then wake the
        // loop so it can observe the disposed session and exit.
        if (SessionStreamQueue.TryRemove(session, out var data))
        {
            data.streamSendEvent.Set();
#if DEBUG
            Outflow.Debug("Removed StreamMessage queue");
#endif
        }

        Outflow.Msg("Properly shut down StreamMessage processing thread");
    }
}