Skip to content

Commit 7f99143

Browse files
authored
Merge pull request #104 from Project-MONAI/vchang/connection-resiliency
Expose event delegate for clients to handle connection issues
2 parents 7efbebf + cc13a7d commit 7f99143

13 files changed

+3002
-2988
lines changed

doc/dependency_decisions.yml

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
- :who: mocsharp
2626
:why: MIT (https://github.com/microsoft/vstest/raw/v17.3.0/LICENSE)
2727
:versions:
28-
- 17.3.2
28+
- 17.4.0
2929
:when: 2022-08-16 21:39:37.382080790 Z
3030
- - :approve
3131
- Microsoft.Extensions.Diagnostics.HealthChecks
@@ -221,7 +221,7 @@
221221
- :who: mocsharp
222222
:why: MIT (https://github.com/microsoft/vstest/raw/v17.3.1/LICENSE)
223223
:versions:
224-
- 17.3.2
224+
- 17.4.0
225225
:when: 2022-08-16 21:39:48.253593534 Z
226226
- - :approve
227227
- Microsoft.NETCore.Platforms
@@ -242,14 +242,14 @@
242242
- :who: mocsharp
243243
:why: MIT (https://github.com/microsoft/vstest/raw/v17.3.1/LICENSE)
244244
:versions:
245-
- 17.3.2
245+
- 17.4.0
246246
:when: 2022-08-16 21:39:49.547958989 Z
247247
- - :approve
248248
- Microsoft.TestPlatform.TestHost
249249
- :who: mocsharp
250250
:why: MIT (https://github.com/microsoft/vstest/raw/v17.3.1/LICENSE)
251251
:versions:
252-
- 17.3.2
252+
- 17.4.0
253253
:when: 2022-08-16 21:39:49.963749572 Z
254254
- - :approve
255255
- Microsoft.Win32.Primitives
@@ -944,3 +944,10 @@
944944
:versions:
945945
- 2.4.5
946946
:when: 2022-08-16 21:40:32.294717110 Z
947+
- - :approve
948+
- Polly
949+
- :who: mocsharp
950+
:why: MIT ( https://licenses.nuget.org/MIT)
951+
:versions:
952+
- 7.2.3
953+
:when: 2022-11-09 18:57:32.294717110 Z
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2021-2022 MONAI Consortium
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using RabbitMQ.Client;
18+
19+
namespace Monai.Deploy.Messaging.API
20+
{
21+
public delegate void ConnectionErrorHandler(object? sender, ConnectionErrorArgs args);
22+
23+
public class ConnectionErrorArgs
24+
{
25+
public ConnectionErrorArgs(ShutdownEventArgs eventArgs) => ShutdownEventArguments = eventArgs ?? throw new ArgumentNullException(nameof(eventArgs));
26+
27+
public ShutdownEventArgs ShutdownEventArguments { get; }
28+
}
29+
}

src/Messaging/API/IMessageBrokerSubscriberService.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ namespace Monai.Deploy.Messaging.API
2121
{
2222
public interface IMessageBrokerSubscriberService : IDisposable
2323
{
24+
/// <summary>
25+
/// Gets or sets the event delegate for client to handle connection errors.
26+
/// </summary>
27+
event ConnectionErrorHandler? OnConnectionError;
28+
2429
/// <summary>
2530
/// Gets or sets the name of the storage service.
2631
/// </summary>

src/Messaging/Tests/IServiceCollectionExtensionsTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ internal class GoodSubscriberService : IMessageBrokerSubscriberService
203203
{
204204
public string Name => throw new NotImplementedException();
205205

206+
public event ConnectionErrorHandler? OnConnectionError;
207+
206208
public void Acknowledge(MessageBase message) => throw new NotImplementedException();
207209

208210
public void Dispose() => throw new NotImplementedException();

src/Messaging/Tests/Monai.Deploy.Messaging.Tests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
</PropertyGroup>
3333

3434
<ItemGroup>
35-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
35+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
3636
<PackageReference Include="Moq" Version="4.18.2" />
3737
<PackageReference Include="System.IO.Abstractions.TestingHelpers" Version="17.2.3" />
3838
<PackageReference Include="xunit" Version="2.4.2" />

src/Plugins/RabbitMQ/Factory/CreateChannelArguments.cs

Lines changed: 0 additions & 56 deletions
This file was deleted.

src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,5 @@ public interface IRabbitMQConnectionFactory
3333
/// <param name="portNumber">Port Number</param>
3434
/// <returns>Instance of <see cref="IModel"/>.</returns>
3535
IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber);
36-
37-
/// <summary>
38-
/// Creates a new channel for RabbitMQ client.
39-
/// The connection factory maintains a single connection to the specified
40-
/// <c>hostName</c>, <c>username</c>, <c>password</c>, and <c>virtualHost</c> combination.
41-
/// </summary>
42-
/// <param name="virtualHost">Virtual host</param>
43-
/// <returns>Instance of <see cref="IModel"/>.</returns>
44-
IModel CreateChannel(CreateChannelArguments args);
4536
}
4637
}

src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs

Lines changed: 29 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
using System;
1818
using System.Collections.Concurrent;
19+
using System.Collections.Generic;
1920
using System.Globalization;
2021
using System.Linq;
2122
using System.Net.Security;
@@ -30,21 +31,18 @@ namespace Monai.Deploy.Messaging.RabbitMQ
3031
{
3132
public class RabbitMQConnectionFactory : IRabbitMQConnectionFactory, IDisposable
3233
{
33-
private readonly ConcurrentDictionary<string, Lazy<ConnectionFactory>> _connectionFactoriess = new();
34-
private readonly ConcurrentDictionary<string, Lazy<IConnection>> _connections = new();
35-
private readonly ConcurrentDictionary<string, Lazy<IModel>> _models = new();
36-
34+
private readonly ConcurrentDictionary<string, Lazy<ConnectionFactory>> _connectionFactoriess;
35+
private readonly ConcurrentDictionary<string, Lazy<IConnection>> _connections;
3736
private readonly ILogger<RabbitMQConnectionFactory> _logger;
3837
private bool _disposedValue;
3938

4039
public RabbitMQConnectionFactory(ILogger<RabbitMQConnectionFactory> logger)
4140
{
4241
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
42+
_connectionFactoriess = new ConcurrentDictionary<string, Lazy<ConnectionFactory>>();
43+
_connections = new ConcurrentDictionary<string, Lazy<IConnection>>();
4344
}
4445

45-
public IModel CreateChannel(CreateChannelArguments args) =>
46-
CreateChannel(args.HostName, args.Username, args.Password, args.VirtualHost,
47-
args.UseSSL, args.PortNumber);
4846

4947
public IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber)
5048
{
@@ -55,11 +53,6 @@ public IModel CreateChannel(string hostName, string username, string password, s
5553

5654
var key = $"{hostName}{username}{HashPassword(password)}{virtualHost}";
5755

58-
if (ConnectionIsOpen(key, out var value))
59-
{
60-
return value.Value;
61-
}
62-
6356
var connection = _connections.AddOrUpdate(key,
6457
x => CreatConnection(hostName, username, password, virtualHost, key, useSSL, portNumber),
6558
(updateKey, updateConnection) =>
@@ -68,7 +61,7 @@ public IModel CreateChannel(string hostName, string username, string password, s
6861
// - RMQ service returns before calling the next line, then IsOpen returns false
6962
// - a call is made before RMQ returns, then a new connection
7063
// is made with error with IsValueFaulted = true && IsValueCreated = false
71-
if (updateConnection.IsValueCreated)
64+
if (updateConnection.IsValueCreated && updateConnection.Value.IsOpen)
7265
{
7366
return updateConnection;
7467
}
@@ -78,42 +71,34 @@ public IModel CreateChannel(string hostName, string username, string password, s
7871
}
7972
});
8073

81-
var argsObj = new CreateChannelArguments(hostName, password, username, virtualHost, useSSL, portNumber);
82-
connection.Value.ConnectionShutdown += (connection, args) => OnShutdown(args, key, argsObj);
83-
connection.Value.CallbackException += (connection, args) => OnException(args, key, argsObj);
74+
connection.Value.ConnectionShutdown += (sender, args) => ConnectionShutdown(args, connection.Value, key);
75+
connection.Value.CallbackException += (sender, args) => OnException(args, connection.Value, key);
8476

85-
var model = _models.AddOrUpdate(key,
86-
x =>
87-
{
88-
var model = CreateModelAndAttachEvents(key, connection, argsObj);
89-
return new Lazy<IModel>(model);
90-
},
91-
(updateKey, updateModel) =>
92-
{
93-
// If connection to RMQ is lost and:
94-
// - RMQ service returns before calling the next line, then IsOpen returns false
95-
// - a call is made before RMQ returns, then a new connection
96-
// is made with error with IsValueFaulted = true && IsValueCreated = false
97-
if (updateModel.IsValueCreated)
98-
{
99-
return updateModel;
100-
}
101-
else
102-
{
103-
var model = CreateModelAndAttachEvents(key, connection, argsObj);
104-
return new Lazy<IModel>(model);
105-
}
106-
});
77+
var model = connection.Value.CreateModel();
78+
model.CallbackException += (sender, args) => OnException(args, connection.Value, key);
79+
model.ModelShutdown += (sender, args) => ConnectionShutdown(args, connection.Value, key);
10780

108-
return model.Value;
81+
return model;
10982
}
11083

111-
private IModel CreateModelAndAttachEvents(string key, Lazy<IConnection> connection, CreateChannelArguments argsObj)
84+
private void ConnectionShutdown(ShutdownEventArgs args, IConnection value, string key)
11285
{
113-
var model = connection.Value.CreateModel();
114-
model.ModelShutdown += (connection, args) => OnShutdown(args, key, argsObj);
115-
model.CallbackException += (connection, args) => OnException(args, key, argsObj);
116-
return model;
86+
_logger.ConnectionShutdown(args.ToString());
87+
88+
if (_connections.ContainsKey(key) && !value.IsOpen)
89+
{
90+
_connections.Remove(key, out _);
91+
}
92+
}
93+
94+
private void OnException(CallbackExceptionEventArgs args, IConnection value, string key)
95+
{
96+
_logger.ConnectionException(args.Exception);
97+
98+
if (_connections.ContainsKey(key) && !value.IsOpen)
99+
{
100+
_connections.Remove(key, out _);
101+
}
117102
}
118103

119104
private Lazy<IConnection> CreatConnection(string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber)
@@ -157,97 +142,6 @@ private static object HashPassword(string password)
157142
return hash.Select(x => x.ToString("x2", CultureInfo.InvariantCulture));
158143
}
159144

160-
private void OnShutdown(ShutdownEventArgs args, string key, CreateChannelArguments createChannelArguments)
161-
{
162-
_logger.ConnectionShutdown(args.ReplyText);
163-
164-
if (ConnectionIsOpen(key, out var _))
165-
{
166-
return;
167-
}
168-
169-
_logger.ConnectionReconnect();
170-
_connections.TryRemove(key, out var value);
171-
172-
if (value is not null)
173-
{
174-
value?.Value.Dispose();
175-
}
176-
177-
CreateChannel(createChannelArguments);
178-
}
179-
180-
private void OnException(CallbackExceptionEventArgs args, string key, CreateChannelArguments createChannelArguments)
181-
{
182-
_logger.ConnectionException(args.Exception);
183-
184-
if (ConnectionIsOpen(key, out var _))
185-
{
186-
return;
187-
}
188-
189-
_logger.ConnectionReconnect();
190-
CreateChannel(createChannelArguments);
191-
}
192-
193-
/// <summary>
194-
/// Checks if we have a connection and it is open on both channel/model and connection.
195-
/// </summary>
196-
/// <param name="key">Lookup Key</param>
197-
/// <param name="model">IModel</param>
198-
/// <returns>If this function returns true output param model will have the value.</returns>
199-
private bool ConnectionIsOpen(string key, out Lazy<IModel> outModel)
200-
{
201-
outModel = new Lazy<IModel>();
202-
203-
_models.TryGetValue(key, out var model);
204-
_connections.TryGetValue(key, out var connection);
205-
206-
if (model is null || connection is null)
207-
{
208-
return false;
209-
}
210-
211-
outModel = model;
212-
if (model.IsValueCreated == false || connection.IsValueCreated == false)
213-
{
214-
return false;
215-
}
216-
217-
if (connection.Value.IsOpen == false)
218-
{
219-
RemoveConnection(key);
220-
RemoveModel(key);
221-
return false;
222-
}
223-
224-
if (model.Value.IsOpen == false)
225-
{
226-
RemoveModel(key);
227-
return false;
228-
}
229-
230-
return true;
231-
}
232-
233-
private void RemoveConnection(string key)
234-
{
235-
_connections.TryRemove(key, out var conn);
236-
if (conn is not null)
237-
{
238-
conn.Value.Dispose();
239-
}
240-
}
241-
242-
private void RemoveModel(string key)
243-
{
244-
_models.TryRemove(key, out var mod);
245-
if (mod is not null)
246-
{
247-
mod.Value.Dispose();
248-
}
249-
}
250-
251145
protected virtual void Dispose(bool disposing)
252146
{
253147
if (!_disposedValue)

0 commit comments

Comments
 (0)