Skip to content

Commit a12e35b

Browse files
committed
Observe subscribers
1 parent d1e3009 commit a12e35b

File tree

9 files changed

+41
-29
lines changed

9 files changed

+41
-29
lines changed

source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public void SetUp()
2727
stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server));
2828
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
2929
var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits);
30-
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, NoIdentityObserver.Instance, Substitute.For<ILog>());
30+
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, NoSubscribersObserver.Instance, Substitute.For<ILog>());
3131
}
3232

3333
// TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation

source/Halibut.Tests/Transport/SecureClientFixture.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt()
6767
var connection = Substitute.For<IConnection>();
6868
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
6969
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
70-
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, NoIdentityObserver.Instance, log));
70+
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, NoSubscribersObserver.Instance, log));
7171

7272
await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None);
7373
}
@@ -96,7 +96,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger)
9696
{
9797
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
9898
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
99-
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, NoIdentityObserver.Instance, logger);
99+
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, NoSubscribersObserver.Instance, logger);
100100
}
101101
}
102102
}

source/Halibut.Tests/Transport/SecureListenerFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public async Task SecureListenerDoesNotCreateHundredsOfIoEventsPerSecondOnWindow
7474
timeoutsAndLimits,
7575
new StreamFactory(),
7676
NoOpConnectionsObserver.Instance,
77-
NoIdentityObserver.Instance
77+
NoSubscribersObserver.Instance
7878
);
7979

8080
var idleAverage = CollectCounterValues(opsPerSec)

source/Halibut/HalibutRuntime.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class HalibutRuntime : IHalibutRuntime
4545
readonly IConnectionsObserver connectionsObserver;
4646
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
4747
readonly IControlMessageObserver controlMessageObserver;
48-
readonly IIdentityObserver identityObserver;
48+
readonly ISubscribersObserver subscribersObserver;
4949

5050
internal HalibutRuntime(
5151
IServiceFactory serviceFactory,
@@ -61,7 +61,7 @@ internal HalibutRuntime(
6161
IRpcObserver rpcObserver,
6262
IConnectionsObserver connectionsObserver,
6363
IControlMessageObserver controlMessageObserver,
64-
IIdentityObserver identityObserver)
64+
ISubscribersObserver subscribersObserver)
6565
{
6666
this.serverCertificate = serverCertificate;
6767
this.trustProvider = trustProvider;
@@ -76,7 +76,7 @@ internal HalibutRuntime(
7676
TimeoutsAndLimits = halibutTimeoutsAndLimits;
7777
this.connectionsObserver = connectionsObserver;
7878
this.controlMessageObserver = controlMessageObserver;
79-
this.identityObserver = identityObserver;
79+
this.subscribersObserver = subscribersObserver;
8080

8181
connectionManager = new ConnectionManagerAsync();
8282
this.tcpConnectionFactory = new TcpConnectionFactory(serverCertificate, TimeoutsAndLimits, streamFactory);
@@ -109,7 +109,7 @@ public int Listen(int port)
109109

110110
ExchangeProtocolBuilder ExchangeProtocolBuilder()
111111
{
112-
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, identityObserver, log);
112+
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, subscribersObserver, log);
113113
}
114114

115115
public int Listen(IPEndPoint endpoint)
@@ -126,7 +126,7 @@ public int Listen(IPEndPoint endpoint)
126126
TimeoutsAndLimits,
127127
streamFactory,
128128
connectionsObserver,
129-
identityObserver);
129+
subscribersObserver);
130130

131131
listeners.DoWithExclusiveAccess(l =>
132132
{

source/Halibut/HalibutRuntimeBuilder.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class HalibutRuntimeBuilder
2727
IRpcObserver? rpcObserver;
2828
IConnectionsObserver? connectionsObserver;
2929
IControlMessageObserver? controlMessageObserver;
30-
IIdentityObserver? identityObserver;
30+
ISubscribersObserver? identityObserver;
3131

3232
public HalibutRuntimeBuilder WithConnectionsObserver(IConnectionsObserver connectionsObserver)
3333
{
@@ -126,9 +126,9 @@ public HalibutRuntimeBuilder WithRpcObserver(IRpcObserver rpcObserver)
126126
return this;
127127
}
128128

129-
public HalibutRuntimeBuilder WithIdentityObserver(IIdentityObserver identityObserver)
129+
public HalibutRuntimeBuilder WithIdentityObserver(ISubscribersObserver subscribersObserver)
130130
{
131-
this.identityObserver = identityObserver;
131+
this.identityObserver = subscribersObserver;
132132
return this;
133133
}
134134

@@ -164,7 +164,7 @@ public HalibutRuntime Build()
164164
var connectionsObserver = this.connectionsObserver ?? NoOpConnectionsObserver.Instance;
165165
var rpcObserver = this.rpcObserver ?? new NoRpcObserver();
166166
var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver();
167-
var identityObserver = this.identityObserver ?? NoIdentityObserver.Instance;
167+
var identityObserver = this.identityObserver ?? NoSubscribersObserver.Instance;
168168

169169
var halibutRuntime = new HalibutRuntime(
170170
serviceFactory,

source/Halibut/Transport/Observability/IIdentityObserver.cs renamed to source/Halibut/Transport/Observability/ISubscribersObserver.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
using System;
1516
using Halibut.Transport.Protocol;
1617

1718
namespace Halibut.Transport.Observability
1819
{
19-
public interface IIdentityObserver
20+
public interface ISubscribersObserver
2021
{
21-
void IdentityEstablished(RemoteIdentity identity);
22+
void SubscriberJoined(Uri subscriptionId);
23+
void SubscriberLeft(Uri subscriptionId);
2224
}
2325
}

source/Halibut/Transport/Observability/NoIdentityObserver.cs renamed to source/Halibut/Transport/Observability/NoSubscribersObserver.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
namespace Halibut.Transport.Observability
1919
{
20-
public class NoIdentityObserver : IIdentityObserver
20+
public class NoSubscribersObserver : ISubscribersObserver
2121
{
22-
static NoIdentityObserver? singleInstance;
23-
public static NoIdentityObserver Instance => singleInstance ??= new NoIdentityObserver();
22+
static NoSubscribersObserver? singleInstance;
23+
public static NoSubscribersObserver Instance => singleInstance ??= new NoSubscribersObserver();
24+
public void SubscriberJoined(Uri subscriptionId)
25+
{
26+
}
2427

25-
public void IdentityEstablished(RemoteIdentity identity)
28+
public void SubscriberLeft(Uri subscriptionId)
2629
{
2730
}
2831
}

source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,21 @@ public class MessageExchangeProtocol
2121
readonly IMessageExchangeStream stream;
2222
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
2323
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
24-
readonly IIdentityObserver identityObserver;
24+
readonly ISubscribersObserver subscribersObserver;
2525
readonly ILog log;
2626
bool identified;
2727
volatile bool acceptClientRequests = true;
2828

2929
public MessageExchangeProtocol(IMessageExchangeStream stream,
3030
HalibutTimeoutsAndLimits halibutTimeoutsAndLimits,
3131
IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter,
32-
IIdentityObserver identityObserver,
32+
ISubscribersObserver subscribersObserver,
3333
ILog log)
3434
{
3535
this.stream = stream;
3636
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
3737
this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter;
38-
this.identityObserver = identityObserver;
38+
this.subscribersObserver = subscribersObserver;
3939
this.log = log;
4040
}
4141

@@ -112,7 +112,6 @@ async Task ReceiveAndProcessRequestAsSubscriberAsync(IMessageExchangeStream stre
112112
public async Task ExchangeAsServerAsync(Func<RequestMessage, Task<ResponseMessage>> incomingRequestProcessor, Func<RemoteIdentity, IPendingRequestQueue> pendingRequests, CancellationToken cancellationToken)
113113
{
114114
var identity = await GetRemoteIdentityAsync(cancellationToken);
115-
identityObserver.IdentityEstablished(identity);
116115

117116
//We might need to limit the connection, so by default, we create an unlimited connection lease
118117
var limitedConnectionLease = activeTcpConnectionsLimiter.CreateUnlimitedLease();
@@ -133,9 +132,17 @@ public async Task ExchangeAsServerAsync(Func<RequestMessage, Task<ResponseMessag
133132
await ProcessClientRequestsAsync(incomingRequestProcessor, cancellationToken);
134133
break;
135134
case RemoteIdentityType.Subscriber:
136-
var pendingRequestQueue = pendingRequests(identity);
137-
await ProcessSubscriberAsync(pendingRequestQueue, cancellationToken);
138-
break;
135+
try
136+
{
137+
subscribersObserver.SubscriberJoined(identity.SubscriptionId);
138+
var pendingRequestQueue = pendingRequests(identity);
139+
await ProcessSubscriberAsync(pendingRequestQueue, cancellationToken);
140+
break;
141+
}
142+
finally
143+
{
144+
subscribersObserver.SubscriberLeft(identity.SubscriptionId);
145+
}
139146
default:
140147
log.Write(EventType.ErrorInIdentify, $"Remote with identify {identity.SubscriptionId} identified itself with an unknown identity type {identity.IdentityType}");
141148
throw new ProtocolException("Unexpected remote identity: " + identity.IdentityType);

source/Halibut/Transport/SecureListener.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class SecureListener : IAsyncDisposable
4848
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
4949
readonly IStreamFactory streamFactory;
5050
readonly IConnectionsObserver connectionsObserver;
51-
readonly IIdentityObserver identityObserver;
51+
readonly ISubscribersObserver subscribersObserver;
5252
ILog log;
5353
TcpListener listener;
5454
Thread? backgroundThread;
@@ -69,7 +69,7 @@ public SecureListener(
6969
HalibutTimeoutsAndLimits halibutTimeoutsAndLimits,
7070
IStreamFactory streamFactory,
7171
IConnectionsObserver connectionsObserver,
72-
IIdentityObserver identityObserver)
72+
ISubscribersObserver subscribersObserver)
7373
{
7474
this.endPoint = endPoint;
7575
this.serverCertificate = serverCertificate;
@@ -83,7 +83,7 @@ public SecureListener(
8383
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
8484
this.streamFactory = streamFactory;
8585
this.connectionsObserver = connectionsObserver;
86-
this.identityObserver = identityObserver;
86+
this.subscribersObserver = subscribersObserver;
8787
this.cts = new CancellationTokenSource();
8888
this.cancellationToken = cts.Token;
8989

0 commit comments

Comments
 (0)