Skip to content

Commit fb53720

Browse files
authored
Add features not yet migrated to GitHub (#28)
* Ability to use a Streaming client with no SSL or no SASL * Make topics case sensitive * Flush parameter definition buffer exception on close - Fix the timer issue - Introduce lock in the public flush to mirror how EventWriter works
1 parent 4fb0f77 commit fb53720

File tree

7 files changed

+119
-16
lines changed

7 files changed

+119
-16
lines changed

src/CsharpClient/Quix.Sdk.Streaming/Configuration/SecurityOptions.cs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System;
1+
using Quix.Sdk.Streaming.QuixApi.Portal;
2+
using System;
23

34
namespace Quix.Sdk.Streaming.Configuration
45
{
@@ -10,7 +11,7 @@ public class SecurityOptions
1011
/// <summary>
1112
/// The Sasl mechanism to use
1213
/// </summary>
13-
public SaslMechanism SaslMechanism { get; set; }
14+
public SaslMechanism? SaslMechanism { get; set; }
1415

1516
/// <summary>
1617
/// SASL username.
@@ -27,6 +28,16 @@ public class SecurityOptions
2728
/// </summary>
2829
public string SslCertificates { get; set; }
2930

31+
/// <summary>
32+
/// Use SSL
33+
/// </summary>
34+
public bool UseSsl { get; set; }
35+
36+
/// <summary>
37+
/// Use authentication
38+
/// </summary>
39+
public bool UseSasl { get; set; }
40+
3041
/// <summary>
3142
/// For deserialization when binding to Configurations like Appsettings
3243
/// </summary>
@@ -47,6 +58,12 @@ public SecurityOptions(string sslCertificates, string username, string password,
4758
this.Username = username;
4859
this.Password = password;
4960
this.SaslMechanism = saslMechanism;
61+
62+
// Assume that if we get sslCertificates it's because we will use ssl
63+
this.UseSsl = !string.IsNullOrEmpty(this.SslCertificates);
64+
65+
// Assume that if we have username, we will use Sasl
66+
this.UseSasl = !string.IsNullOrEmpty(this.Username);
5067
}
5168
}
5269
}

src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamEventsWriter.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Quix.Sdk.Process.Models.Utility;
77
using Quix.Sdk.Process.Managers;
88
using System.Linq;
9+
using Quix.Sdk.Streaming.Exceptions;
910

1011
namespace Quix.Sdk.Streaming.Models.StreamWriter
1112
{
@@ -306,7 +307,18 @@ private void ResetFlushDefinitionsTimer()
306307
private void OnFlushDefinitionsTimerEvent(object state)
307308
{
308309
if (!timerEnabled) return;
309-
this.FlushDefinitions();
310+
try
311+
{
312+
this.FlushDefinitions();
313+
}
314+
catch (StreamClosedException exception) when (this.isDisposed)
315+
{
316+
// Ignore exception because the timer flush definition may finish executing only after closure due to how close lock works in streamWriter
317+
}
318+
catch (Exception ex)
319+
{
320+
this.logger.Log(LogLevel.Error, ex, "Exception occurred while trying to flush event definition buffer.");
321+
}
310322
}
311323

312324

src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamParametersWriter.cs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Quix.Sdk.Process.Managers;
55
using Microsoft.Extensions.Logging;
66
using Quix.Sdk.Process.Models.Utility;
7+
using Quix.Sdk.Streaming.Exceptions;
78

89
namespace Quix.Sdk.Streaming.Models.StreamWriter
910
{
@@ -22,6 +23,7 @@ public class StreamParametersWriter : IDisposable
2223
private bool timerEnabled = false; // Here because every now and then reseting its due time to never doesn't work
2324
private bool isDisposed;
2425
private const int TimerInterval = 20;
26+
private readonly object flushLock = new object();
2527

2628
/// <summary>
2729
/// Initializes a new instance of <see cref="StreamParametersWriter"/>
@@ -218,8 +220,19 @@ private void Flush(bool force)
218220
{
219221
throw new ObjectDisposedException(nameof(StreamParametersWriter));
220222
}
221-
this.FlushDefinitions();
222-
this.Buffer.Flush();
223+
224+
try
225+
{
226+
lock (flushLock)
227+
{
228+
this.FlushDefinitions();
229+
this.Buffer.Flush();
230+
}
231+
}
232+
catch (Exception ex)
233+
{
234+
this.logger.LogError(ex, "Exception occurred while trying to flush parameter data buffer.");
235+
}
223236
}
224237

225238
private void ResetFlushDefinitionsTimer()
@@ -231,11 +244,15 @@ private void ResetFlushDefinitionsTimer()
231244

232245
private void OnFlushDefinitionsTimerEvent(object state)
233246
{
234-
if (!timerEnabled) return;
247+
if (!this.timerEnabled) return;
235248
try
236249
{
237250
this.FlushDefinitions();
238251
}
252+
catch (StreamClosedException exception) when (this.isDisposed)
253+
{
254+
// Ignore exception because the timer flush definition may finish executing only after closure due to how close lock works in streamWriter
255+
}
239256
catch (Exception ex)
240257
{
241258
this.logger.Log(LogLevel.Error, ex, "Exception occurred while trying to flush parameter definition buffer.");

src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Topic.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public string Id
1818
set
1919
{
2020
if (value == null) throw new ArgumentNullException(nameof(Id));
21-
this.id = value.ToLowerInvariant();
21+
this.id = value;
2222
}
2323
}
2424

src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Workspace.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@ internal enum BrokerSecurityMode
9393
/// SSL secured ACL role system.
9494
/// </summary>
9595
SaslSsl,
96+
97+
/// <summary>
98+
/// Plain Text mode
99+
/// </summary>
100+
PlainText,
101+
102+
/// <summary>
103+
/// ACL role system
104+
/// </summary>
105+
Sasl
96106
}
97107

98108
/// <summary>

src/CsharpClient/Quix.Sdk.Streaming/QuixStreamingClient.cs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
using Quix.Sdk.Streaming.Raw;
2929
using Quix.Sdk.Streaming.Utils;
3030
using Exception = System.Exception;
31+
using Quix.Sdk.Process.Configuration;
3132

3233
namespace Quix.Sdk.Streaming
3334
{
@@ -208,7 +209,7 @@ public IOutputTopic OpenOutputTopic(string topicIdOrName)
208209
private async Task<(StreamingClient client, string topicId)> ValidateTopicAndCreateClient(string topicIdOrName)
209210
{
210211
CheckToken(token);
211-
topicIdOrName = topicIdOrName.ToLowerInvariant().Trim();;
212+
topicIdOrName = topicIdOrName.Trim();
212213
var sw = Stopwatch.StartNew();
213214
var ws = await GetWorkspaceFromConfiguration(topicIdOrName);
214215
var client = await this.CreateStreamingClientForWorkspace(ws);
@@ -232,7 +233,7 @@ private async Task<string> ValidateTopicExistence(Workspace workspace, string to
232233
{
233234
this.logger.LogTrace("Checking if topic {0} is already created.", topicIdOrName);
234235
var topics = await this.GetTopics(workspace, true);
235-
var existingTopic = topics.FirstOrDefault(y => y.Id.Equals(topicIdOrName, StringComparison.InvariantCultureIgnoreCase)) ?? topics.FirstOrDefault(y=> y.Name.Equals(topicIdOrName, StringComparison.InvariantCultureIgnoreCase)); // id prio
236+
var existingTopic = topics.FirstOrDefault(y => y.Id.Equals(topicIdOrName, StringComparison.InvariantCulture)) ?? topics.FirstOrDefault(y=> y.Name.Equals(topicIdOrName, StringComparison.InvariantCulture)); // id prio
236237
var topicName = existingTopic?.Name;
237238
if (topicName == null)
238239
{
@@ -388,13 +389,36 @@ private async Task<StreamingClient> CreateStreamingClientForWorkspace(Workspace
388389
logger.LogWarning("Workspace {0} is in state {1} instead of {2}.", ws.WorkspaceId, ws.Status, WorkspaceStatus.Ready);
389390
}
390391

391-
var certPath = await GetWorkspaceCertificatePath(ws);
392-
if (!Enum.TryParse(ws.Broker.SaslMechanism.ToString(), true, out SaslMechanism parsed))
392+
var securityOptions = new SecurityOptions();
393+
394+
if (ws.Broker.SecurityMode == BrokerSecurityMode.Ssl || ws.Broker.SecurityMode == BrokerSecurityMode.SaslSsl)
395+
{
396+
securityOptions.UseSsl = true;
397+
securityOptions.SslCertificates = await GetWorkspaceCertificatePath(ws);
398+
}
399+
else
400+
{
401+
securityOptions.UseSsl = false;
402+
}
403+
404+
if (ws.Broker.SecurityMode == BrokerSecurityMode.Sasl || ws.Broker.SecurityMode == BrokerSecurityMode.SaslSsl)
405+
{
406+
if (!Enum.TryParse(ws.Broker.SaslMechanism.ToString(), true, out SaslMechanism parsed))
407+
{
408+
throw new ArgumentOutOfRangeException(nameof(ws.Broker.SaslMechanism), "Unsupported sasl mechanism " + ws.Broker.SaslMechanism);
409+
}
410+
411+
securityOptions.UseSasl = true;
412+
securityOptions.SaslMechanism = parsed;
413+
securityOptions.Username = ws.Broker.Username;
414+
securityOptions.Password = ws.Broker.Password;
415+
}
416+
else
393417
{
394-
throw new ArgumentOutOfRangeException(nameof(ws.Broker.SaslMechanism), "Unsupported sasl mechanism " + ws.Broker.SaslMechanism);
418+
securityOptions.UseSasl = false;
395419
}
396420

397-
var client = new StreamingClient(ws.Broker.Address, new SecurityOptions(certPath, ws.Broker.Username, ws.Broker.Password, parsed), brokerProperties, debug);
421+
var client = new StreamingClient(ws.Broker.Address, securityOptions, brokerProperties, debug);
398422
return wsToStreamingClientDict.GetOrAdd(ws.WorkspaceId, client);
399423
}
400424

src/CsharpClient/Quix.Sdk.Streaming/StreamingClient.cs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Quix.Sdk.Streaming.Raw;
1414
using Quix.Sdk.Streaming.Utils;
1515
using SaslMechanism = Confluent.Kafka.SaslMechanism;
16+
using Quix.Sdk.Streaming.QuixApi.Portal;
1617

1718
namespace Quix.Sdk.Streaming
1819
{
@@ -41,12 +42,34 @@ public StreamingClient(string brokerAddress, SecurityOptions securityOptions = n
4142
}
4243
else
4344
{
44-
if (!Enum.TryParse(securityOptions.SaslMechanism.ToString(), true, out SaslMechanism parsed))
45+
var securityOptionsBuilder = new SecurityOptionsBuilder();
46+
47+
if (securityOptions.UseSsl)
48+
{
49+
securityOptionsBuilder.SetSslEncryption(securityOptions.SslCertificates);
50+
}
51+
else
52+
{
53+
securityOptionsBuilder.SetNoEncryption();
54+
}
55+
56+
if (securityOptions.UseSasl)
4557
{
46-
throw new ArgumentOutOfRangeException(nameof(securityOptions.SaslMechanism), "Unsupported sasl mechanism " + securityOptions.SaslMechanism);
58+
if (!Enum.TryParse(securityOptions.SaslMechanism.ToString(), true, out Confluent.Kafka.SaslMechanism parsed))
59+
{
60+
throw new ArgumentOutOfRangeException(nameof(securityOptions.SaslMechanism), "Unsupported sasl mechanism " + securityOptions.SaslMechanism);
61+
}
62+
63+
securityOptionsBuilder.SetSaslAuthentication(securityOptions.Username, securityOptions.Password, parsed);
64+
}
65+
else
66+
{
67+
securityOptionsBuilder.SetNoAuthentication();
4768
}
48-
this.brokerProperties = new SecurityOptionsBuilder().SetSslEncryption(securityOptions.SslCertificates).SetSaslAuthentication(securityOptions.Username, securityOptions.Password, parsed).Build();
69+
70+
this.brokerProperties = securityOptionsBuilder.Build();
4971
}
72+
5073
if (properties != null)
5174
{
5275
foreach (var property in properties)

0 commit comments

Comments
 (0)