diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs old mode 100755 new mode 100644 index 2ddb20e41..7d4aa787a --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -2222,9 +2222,10 @@ private bool PrepareToPushMessageToBridge(Message message, ResultProcessor } } + var profilingSession = _profilingSessionProvider?.Invoke(); + if (server != null) { - var profilingSession = _profilingSessionProvider?.Invoke(); if (profilingSession != null) { message.SetProfileStorage(ProfiledCommand.NewWithContext(profilingSession, server)); @@ -2242,9 +2243,17 @@ private bool PrepareToPushMessageToBridge(Message message, ResultProcessor Trace("Queueing on server: " + message); return true; } - Trace("No server or server unavailable - aborting: " + message); - return false; + else + { + if (profilingSession != null) + { + message.SetProfileStorage(ProfiledCommand.NewWithContext(profilingSession, server)); + } + Trace("No server or server unavailable - aborting: " + message); + return false; + } } + private ValueTask TryPushMessageToBridgeAsync(Message message, ResultProcessor processor, IResultBox resultBox, ref ServerEndPoint server) => PrepareToPushMessageToBridge(message, processor, resultBox, ref server) ? server.TryWriteAsync(message) : new ValueTask(WriteResult.NoConnectionAvailable); @@ -2750,16 +2759,22 @@ internal Task ExecuteAsyncImpl(Message message, ResultProcessor process var write = TryPushMessageToBridgeAsync(message, processor, source, ref server); if (!write.IsCompletedSuccessfully) return ExecuteAsyncImpl_Awaited(this, write, tcs, message, server); + var result = write.Result; if (tcs == null) { + if (result != WriteResult.Success && message.HasPerformance) + { + var ex = GetException(result, message, server); + message.SetExceptionAndComplete(ex, null); + } return CompletedTask.Default(null); // F+F explicitly does not get async-state } else { - var result = write.Result; if (result != WriteResult.Success) { var ex = GetException(result, message, server); + message.SetExceptionAndComplete(ex, null); ThrowFailed(tcs, ex); } return tcs.Task; @@ -2772,6 +2787,7 @@ private static async Task ExecuteAsyncImpl_Awaited(ConnectionMultiplexer @ if (result != WriteResult.Success) { var ex = @this.GetException(result, message, server); + message.SetExceptionAndComplete(ex, null); ThrowFailed(tcs, ex); } return tcs == null ? default(T) : await tcs.Task.ForAwait(); @@ -2818,42 +2834,54 @@ internal T ExecuteSyncImpl(Message message, ResultProcessor processor, Ser if (message.IsFireAndForget) { #pragma warning disable CS0618 - TryPushMessageToBridgeSync(message, processor, null, ref server); + var result = TryPushMessageToBridgeSync(message, processor, null, ref server); + if (result != WriteResult.Success && message.HasPerformance) + { + message.SetExceptionAndComplete(GetException(result, message, server), null); + } #pragma warning restore CS0618 Interlocked.Increment(ref fireAndForgets); return default(T); } else { - var source = SimpleResultBox.Get(); - - lock (source) + try { + var source = SimpleResultBox.Get(); + + lock (source) + { #pragma warning disable CS0618 - var result = TryPushMessageToBridgeSync(message, processor, source, ref server); + var result = TryPushMessageToBridgeSync(message, processor, source, ref server); #pragma warning restore CS0618 - if (result != WriteResult.Success) - { - throw GetException(result, message, server); - } + if (result != WriteResult.Success) + { + throw GetException(result, message, server); + } - if (Monitor.Wait(source, TimeoutMilliseconds)) - { - Trace("Timeley response to " + message); - } - else - { - Trace("Timeout performing " + message); - Interlocked.Increment(ref syncTimeouts); - throw ExceptionFactory.Timeout(this, null, message, server); - // very important not to return "source" to the pool here + if (Monitor.Wait(source, TimeoutMilliseconds)) + { + Trace("Timeley response to " + message); + } + else + { + Trace("Timeout performing " + message); + Interlocked.Increment(ref syncTimeouts); + throw ExceptionFactory.Timeout(this, null, message, server); + // very important not to return "source" to the pool here + } } + // snapshot these so that we can recycle the box + var val = source.GetResult(out var ex, canRecycle: true); // now that we aren't locking it... + if (ex != null) throw ex; + Trace(message + " received " + val); + return val; + } + catch (Exception ex) + { + message.SetExceptionAndComplete(ex, null); + throw; } - // snapshot these so that we can recycle the box - var val = source.GetResult(out var ex, canRecycle: true); // now that we aren't locking it... - if (ex != null) throw ex; - Trace(message + " received " + val); - return val; } } diff --git a/src/StackExchange.Redis/Interfaces/IKeysMessage.cs b/src/StackExchange.Redis/Interfaces/IKeysMessage.cs new file mode 100644 index 000000000..e7ae687d4 --- /dev/null +++ b/src/StackExchange.Redis/Interfaces/IKeysMessage.cs @@ -0,0 +1,7 @@ +namespace StackExchange.Redis +{ + internal interface IKeysMessage + { + RedisKey[] Keys { get; } + } +} diff --git a/src/StackExchange.Redis/Interfaces/IValuesMessage.cs b/src/StackExchange.Redis/Interfaces/IValuesMessage.cs new file mode 100644 index 000000000..bb5ab7418 --- /dev/null +++ b/src/StackExchange.Redis/Interfaces/IValuesMessage.cs @@ -0,0 +1,7 @@ +namespace StackExchange.Redis +{ + internal interface IValuesMessage + { + RedisValue[] Values { get; } + } +} diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index 574557bfb..5b0ca3491 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -162,7 +162,7 @@ internal void PrepareToResend(ServerEndPoint resendTo, bool isMoved) var oldPerformance = performance; - oldPerformance.SetCompleted(); + oldPerformance.SetCompleted(null); performance = null; CreatedDateTime = DateTime.UtcNow; @@ -222,6 +222,7 @@ internal void SetScriptUnavailable() public bool IsFireAndForget => (Flags & CommandFlags.FireAndForget) != 0; public bool IsInternalCall => (Flags & InternalCallFlag) != 0; + public bool HasPerformance => performance != null; public IResultBox ResultBox => resultBox; @@ -467,7 +468,7 @@ public void Complete() var currBox = Interlocked.Exchange(ref resultBox, null); // set the completion/performance data - performance?.SetCompleted(); + performance?.SetCompleted(currBox); currBox?.ActivateContinuations(); } @@ -794,10 +795,12 @@ protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, i public override string CommandAndKey => Command + " " + Channel; } - internal abstract class CommandKeyBase : Message + internal abstract class CommandKeyBase : Message, IKeysMessage { protected readonly RedisKey Key; + public virtual RedisKey[] Keys => new RedisKey[] { Key }; + protected CommandKeyBase(int db, CommandFlags flags, RedisCommand command, in RedisKey key) : base(db, flags, command) { key.AssertNotNull(); @@ -824,9 +827,12 @@ protected override void WriteImpl(PhysicalConnection physical) public override int ArgCount => 1; } - private sealed class CommandChannelValueMessage : CommandChannelBase + private sealed class CommandChannelValueMessage : CommandChannelBase, IValuesMessage { private readonly RedisValue value; + + public RedisValue[] Values => new RedisValue[] { value }; + public CommandChannelValueMessage(int db, CommandFlags flags, RedisCommand command, in RedisChannel channel, in RedisValue value) : base(db, flags, command, channel) { value.AssertNotNull(); @@ -845,6 +851,9 @@ protected override void WriteImpl(PhysicalConnection physical) private sealed class CommandKeyKeyKeyMessage : CommandKeyBase { private readonly RedisKey key1, key2; + + public override RedisKey[] Keys => new RedisKey[] { Key, key1, key2 }; + public CommandKeyKeyKeyMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key0, in RedisKey key1, in RedisKey key2) : base(db, flags, command, key0) { key1.AssertNotNull(); @@ -873,6 +882,9 @@ protected override void WriteImpl(PhysicalConnection physical) private class CommandKeyKeyMessage : CommandKeyBase { protected readonly RedisKey key1; + + public override RedisKey[] Keys => new RedisKey[] { Key, key1 }; + public CommandKeyKeyMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key0, in RedisKey key1) : base(db, flags, command, key0) { key1.AssertNotNull(); @@ -896,36 +908,37 @@ protected override void WriteImpl(PhysicalConnection physical) private sealed class CommandKeyKeysMessage : CommandKeyBase { - private readonly RedisKey[] keys; + public override RedisKey[] Keys { get; } + public CommandKeyKeysMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisKey[] keys) : base(db, flags, command, key) { for (int i = 0; i < keys.Length; i++) { keys[i].AssertNotNull(); } - this.keys = keys; + Keys = keys; } public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) { var slot = serverSelectionStrategy.HashSlot(Key); - for (int i = 0; i < keys.Length; i++) + for (int i = 0; i < Keys.Length; i++) { - slot = serverSelectionStrategy.CombineSlot(slot, keys[i]); + slot = serverSelectionStrategy.CombineSlot(slot, Keys[i]); } return slot; } protected override void WriteImpl(PhysicalConnection physical) { - physical.WriteHeader(command, keys.Length + 1); + physical.WriteHeader(command, Keys.Length + 1); physical.Write(Key); - for (int i = 0; i < keys.Length; i++) + for (int i = 0; i < Keys.Length; i++) { - physical.Write(keys[i]); + physical.Write(Keys[i]); } } - public override int ArgCount => keys.Length + 1; + public override int ArgCount => Keys.Length + 1; } private sealed class CommandKeyKeyValueMessage : CommandKeyKeyMessage @@ -960,60 +973,62 @@ protected override void WriteImpl(PhysicalConnection physical) public override int ArgCount => 1; } - private sealed class CommandValuesMessage : Message + private sealed class CommandValuesMessage : Message, IValuesMessage { - private readonly RedisValue[] values; + public RedisValue[] Values { get; } + public CommandValuesMessage(int db, CommandFlags flags, RedisCommand command, RedisValue[] values) : base(db, flags, command) { for (int i = 0; i < values.Length; i++) { values[i].AssertNotNull(); } - this.values = values; + Values = values; } protected override void WriteImpl(PhysicalConnection physical) { - physical.WriteHeader(command, values.Length); - for (int i = 0; i < values.Length; i++) + physical.WriteHeader(command, Values.Length); + for (int i = 0; i < Values.Length; i++) { - physical.WriteBulkString(values[i]); + physical.WriteBulkString(Values[i]); } } - public override int ArgCount => values.Length; + public override int ArgCount => Values.Length; } - private sealed class CommandKeysMessage : Message + private sealed class CommandKeysMessage : Message, IKeysMessage { - private readonly RedisKey[] keys; + public RedisKey[] Keys { get; } + public CommandKeysMessage(int db, CommandFlags flags, RedisCommand command, RedisKey[] keys) : base(db, flags, command) { for (int i = 0; i < keys.Length; i++) { keys[i].AssertNotNull(); } - this.keys = keys; + Keys = keys; } public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) { int slot = ServerSelectionStrategy.NoSlot; - for (int i = 0; i < keys.Length; i++) + for (int i = 0; i < Keys.Length; i++) { - slot = serverSelectionStrategy.CombineSlot(slot, keys[i]); + slot = serverSelectionStrategy.CombineSlot(slot, Keys[i]); } return slot; } protected override void WriteImpl(PhysicalConnection physical) { - physical.WriteHeader(command, keys.Length); - for (int i = 0; i < keys.Length; i++) + physical.WriteHeader(command, Keys.Length); + for (int i = 0; i < Keys.Length; i++) { - physical.Write(keys[i]); + physical.Write(Keys[i]); } } - public override int ArgCount => keys.Length; + public override int ArgCount => Keys.Length; } private sealed class CommandKeyValueMessage : CommandKeyBase @@ -1034,17 +1049,21 @@ protected override void WriteImpl(PhysicalConnection physical) public override int ArgCount => 2; } - private sealed class CommandKeyValuesKeyMessage : CommandKeyBase + private sealed class CommandKeyValuesKeyMessage : CommandKeyBase, IKeysMessage, IValuesMessage { private readonly RedisKey key1; - private readonly RedisValue[] values; + + public override RedisKey[] Keys => new RedisKey[] { Key, key1 }; + + public RedisValue[] Values { get; } + public CommandKeyValuesKeyMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key0, RedisValue[] values, in RedisKey key1) : base(db, flags, command, key0) { for (int i = 0; i < values.Length; i++) { values[i].AssertNotNull(); } - this.values = values; + Values = values; key1.AssertNotNull(); this.key1 = key1; } @@ -1057,33 +1076,34 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) protected override void WriteImpl(PhysicalConnection physical) { - physical.WriteHeader(Command, values.Length + 2); + physical.WriteHeader(Command, Values.Length + 2); physical.Write(Key); - for (int i = 0; i < values.Length; i++) physical.WriteBulkString(values[i]); + for (int i = 0; i < Values.Length; i++) physical.WriteBulkString(Values[i]); physical.Write(key1); } - public override int ArgCount => values.Length + 2; + public override int ArgCount => Values.Length + 2; } - private sealed class CommandKeyValuesMessage : CommandKeyBase + private sealed class CommandKeyValuesMessage : CommandKeyBase, IValuesMessage { - private readonly RedisValue[] values; + public RedisValue[] Values { get; } + public CommandKeyValuesMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisValue[] values) : base(db, flags, command, key) { for (int i = 0; i < values.Length; i++) { values[i].AssertNotNull(); } - this.values = values; + Values = values; } protected override void WriteImpl(PhysicalConnection physical) { - physical.WriteHeader(Command, values.Length + 1); + physical.WriteHeader(Command, Values.Length + 1); physical.Write(Key); - for (int i = 0; i < values.Length; i++) physical.WriteBulkString(values[i]); + for (int i = 0; i < Values.Length; i++) physical.WriteBulkString(Values[i]); } - public override int ArgCount => values.Length + 1; + public override int ArgCount => Values.Length + 1; } private sealed class CommandKeyValueValueMessage : CommandKeyBase @@ -1168,10 +1188,11 @@ protected override void WriteImpl(PhysicalConnection physical) public override int ArgCount => 0; } - private class CommandSlotValuesMessage : Message + private class CommandSlotValuesMessage : Message, IValuesMessage { private readonly int slot; - private readonly RedisValue[] values; + + public RedisValue[] Values { get; } public CommandSlotValuesMessage(int db, int slot, CommandFlags flags, RedisCommand command, RedisValue[] values) : base(db, flags, command) @@ -1181,7 +1202,7 @@ public CommandSlotValuesMessage(int db, int slot, CommandFlags flags, RedisComma { values[i].AssertNotNull(); } - this.values = values; + Values = values; } public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) @@ -1191,13 +1212,13 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) protected override void WriteImpl(PhysicalConnection physical) { - physical.WriteHeader(command, values.Length); - for (int i = 0; i < values.Length; i++) + physical.WriteHeader(command, Values.Length); + for (int i = 0; i < Values.Length; i++) { - physical.WriteBulkString(values[i]); + physical.WriteBulkString(Values[i]); } } - public override int ArgCount => values.Length; + public override int ArgCount => Values.Length; } private sealed class CommandValueChannelMessage : CommandChannelBase diff --git a/src/StackExchange.Redis/Profiling/IProfiledCommand.cs b/src/StackExchange.Redis/Profiling/IProfiledCommand.cs index c89b039cc..3522d5125 100644 --- a/src/StackExchange.Redis/Profiling/IProfiledCommand.cs +++ b/src/StackExchange.Redis/Profiling/IProfiledCommand.cs @@ -27,11 +27,36 @@ public interface IProfiledCommand /// string Command { get; } + /// + /// The script to execute when EVAL command is issued. + /// + string Script { get; } + + /// + /// The keys supplied on the command, if any. + /// + RedisKey[] Keys { get; } + /// /// The CommandFlags the command was submitted with. /// CommandFlags Flags { get; } + /// + /// The values returned by the command, if any. + /// + RedisValue[] Values { get; } + + /// + /// Gets a value indicating whether or not the command faulted. + /// + bool IsFaulted { get; } + + /// + /// Gets the exception that was thrown, if any. + /// + Exception Exception { get; } + /// /// /// When this command was *created*, will be approximately @@ -85,7 +110,7 @@ public interface IProfiledCommand /// /// If RetransmissionOf is not null, this property will be set to either Ask or Moved to indicate /// what sort of response triggered the retransmission. - /// + /// /// This can be useful for determining the root cause of extra commands. /// RetransmissionReasonType? RetransmissionReason { get; } diff --git a/src/StackExchange.Redis/Profiling/ProfiledCommand.cs b/src/StackExchange.Redis/Profiling/ProfiledCommand.cs index 7d6a8fcfe..7daa83c1c 100644 --- a/src/StackExchange.Redis/Profiling/ProfiledCommand.cs +++ b/src/StackExchange.Redis/Profiling/ProfiledCommand.cs @@ -17,8 +17,18 @@ internal sealed class ProfiledCommand : IProfiledCommand public string Command => Message is RedisDatabase.ExecuteMessage em ? em.Command.ToString() : Message.Command.ToString(); + public string Script => Message is RedisDatabase.ScriptEvalMessage sem ? sem.Script : null; + + public RedisKey[] Keys => Message is IKeysMessage km ? km.Keys : null; + public CommandFlags Flags => Message.Flags; + public RedisValue[] Values => Message is IValuesMessage vm ? vm.Values : null; + + public bool IsFaulted => ResultBox?.IsFaulted == true; + + public Exception Exception => ResultBox?.Exception; + public DateTime CommandCreated => MessageCreatedDateTime; public TimeSpan CreationToEnqueued => GetElapsedTime(EnqueuedTimeStamp - MessageCreatedTimeStamp); @@ -46,6 +56,7 @@ private static TimeSpan GetElapsedTime(long timestampDelta) public ProfiledCommand NextElement { get; set; } private Message Message; + private IResultBox ResultBox; private readonly ServerEndPoint Server; private readonly ProfiledCommand OriginalProfiling; @@ -99,7 +110,7 @@ private static void SetTimestamp(ref long field) Interlocked.CompareExchange(ref field, now, 0); } - public void SetCompleted() + public void SetCompleted(IResultBox resultBox) { // this method can be called multiple times, depending on how the task completed (async vs not) // so we actually have to guard against it. @@ -112,6 +123,7 @@ public void SetCompleted() { // fake a response if we completed prematurely (timeout, broken connection, etc) Interlocked.CompareExchange(ref ResponseReceivedTimeStamp, now, 0); + ResultBox = resultBox; PushToWhenFinished?.Add(this); } } diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 223a23313..ac97c5ea9 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -1933,7 +1933,7 @@ public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, RedisV key, groupName, position, - true, + true, flags); } @@ -2251,7 +2251,7 @@ public Task StreamReadGroupAsync(RedisKey key, RedisValue groupNa false, flags); } - + public Task StreamReadGroupAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue? position = null, int? count = null, bool noAck = false, CommandFlags flags = CommandFlags.None) { var actualPosition = position ?? StreamPosition.NewMessages; @@ -2815,7 +2815,7 @@ private Message GetMultiStreamReadMessage(StreamPosition[] streamPositions, int? * [7] = id1 * [8] = id2 * [9] = id3 - * + * * */ var pairCount = streamPositions.Length; @@ -3760,14 +3760,17 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) public override int ArgCount => _args.Count; } - private sealed class ScriptEvalMessage : Message, IMultiMessage + internal sealed class ScriptEvalMessage : Message, IMultiMessage, IKeysMessage, IValuesMessage { - private readonly RedisKey[] keys; - private readonly string script; - private readonly RedisValue[] values; private byte[] asciiHash; private readonly byte[] hexHash; + public string Script { get; } + + public RedisKey[] Keys { get; } + + public RedisValue[] Values { get; } + public ScriptEvalMessage(int db, CommandFlags flags, string script, RedisKey[] keys, RedisValue[] values) : this(db, flags, ResultProcessor.ScriptLoadProcessor.IsSHA1(script) ? RedisCommand.EVALSHA : RedisCommand.EVAL, script, null, keys, values) { @@ -3784,40 +3787,40 @@ public ScriptEvalMessage(int db, CommandFlags flags, byte[] hash, RedisKey[] key private ScriptEvalMessage(int db, CommandFlags flags, RedisCommand command, string script, byte[] hexHash, RedisKey[] keys, RedisValue[] values) : base(db, flags, command) { - this.script = script; + Script = script; this.hexHash = hexHash; if (keys == null) keys = Array.Empty(); if (values == null) values = Array.Empty(); for (int i = 0; i < keys.Length; i++) keys[i].AssertNotNull(); - this.keys = keys; + Keys = keys; for (int i = 0; i < values.Length; i++) values[i].AssertNotNull(); - this.values = values; + Values = values; } public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) { int slot = ServerSelectionStrategy.NoSlot; - for (int i = 0; i < keys.Length; i++) - slot = serverSelectionStrategy.CombineSlot(slot, keys[i]); + for (int i = 0; i < Keys.Length; i++) + slot = serverSelectionStrategy.CombineSlot(slot, Keys[i]); return slot; } public IEnumerable GetMessages(PhysicalConnection connection) { PhysicalBridge bridge; - if (script != null && (bridge = connection.BridgeCouldBeNull) != null + if (Script != null && (bridge = connection.BridgeCouldBeNull) != null && bridge.Multiplexer.CommandMap.IsAvailable(RedisCommand.SCRIPT) && (Flags & CommandFlags.NoScriptCache) == 0) { // a script was provided (rather than a hash); check it is known and supported - asciiHash = bridge.ServerEndPoint.GetScriptHash(script, command); + asciiHash = bridge.ServerEndPoint.GetScriptHash(Script, command); if (asciiHash == null) { - var msg = new ScriptLoadMessage(Flags, script); + var msg = new ScriptLoadMessage(Flags, Script); msg.SetInternalCall(); msg.SetSource(ResultProcessor.ScriptLoad, null); yield return msg; @@ -3830,26 +3833,26 @@ protected override void WriteImpl(PhysicalConnection physical) { if (hexHash != null) { - physical.WriteHeader(RedisCommand.EVALSHA, 2 + keys.Length + values.Length); + physical.WriteHeader(RedisCommand.EVALSHA, 2 + Keys.Length + Values.Length); physical.WriteSha1AsHex(hexHash); } else if (asciiHash != null) { - physical.WriteHeader(RedisCommand.EVALSHA, 2 + keys.Length + values.Length); + physical.WriteHeader(RedisCommand.EVALSHA, 2 + Keys.Length + Values.Length); physical.WriteBulkString((RedisValue)asciiHash); } else { - physical.WriteHeader(RedisCommand.EVAL, 2 + keys.Length + values.Length); - physical.WriteBulkString((RedisValue)script); + physical.WriteHeader(RedisCommand.EVAL, 2 + Keys.Length + Values.Length); + physical.WriteBulkString((RedisValue)Script); } - physical.WriteBulkString(keys.Length); - for (int i = 0; i < keys.Length; i++) - physical.Write(keys[i]); - for (int i = 0; i < values.Length; i++) - physical.WriteBulkString(values[i]); + physical.WriteBulkString(Keys.Length); + for (int i = 0; i < Keys.Length; i++) + physical.Write(Keys[i]); + for (int i = 0; i < Values.Length; i++) + physical.WriteBulkString(Values[i]); } - public override int ArgCount => 2 + keys.Length + values.Length; + public override int ArgCount => 2 + Keys.Length + Values.Length; } private sealed class SetScanResultProcessor : ScanResultProcessor @@ -3871,40 +3874,42 @@ protected override RedisValue[] Parse(in RawResult result, out int count) } } - private sealed class SortedSetCombineAndStoreCommandMessage : Message.CommandKeyBase // ZINTERSTORE and ZUNIONSTORE have a very unusual signature + private sealed class SortedSetCombineAndStoreCommandMessage : Message.CommandKeyBase, IKeysMessage, IValuesMessage // ZINTERSTORE and ZUNIONSTORE have a very unusual signature { - private readonly RedisKey[] keys; - private readonly RedisValue[] values; + public override RedisKey[] Keys { get; } + + public RedisValue[] Values { get; } + public SortedSetCombineAndStoreCommandMessage(int db, CommandFlags flags, RedisCommand command, RedisKey destination, RedisKey[] keys, RedisValue[] values) : base(db, flags, command, destination) { for (int i = 0; i < keys.Length; i++) keys[i].AssertNotNull(); - this.keys = keys; + Keys = keys; for (int i = 0; i < values.Length; i++) values[i].AssertNotNull(); - this.values = values; + Values = values; } public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) { int slot = base.GetHashSlot(serverSelectionStrategy); - for (int i = 0; i < keys.Length; i++) - slot = serverSelectionStrategy.CombineSlot(slot, keys[i]); + for (int i = 0; i < Keys.Length; i++) + slot = serverSelectionStrategy.CombineSlot(slot, Keys[i]); return slot; } protected override void WriteImpl(PhysicalConnection physical) { - physical.WriteHeader(Command, 2 + keys.Length + values.Length); + physical.WriteHeader(Command, 2 + Keys.Length + Values.Length); physical.Write(Key); - physical.WriteBulkString(keys.Length); - for (int i = 0; i < keys.Length; i++) - physical.Write(keys[i]); - for (int i = 0; i < values.Length; i++) - physical.WriteBulkString(values[i]); + physical.WriteBulkString(Keys.Length); + for (int i = 0; i < Keys.Length; i++) + physical.Write(Keys[i]); + for (int i = 0; i < Values.Length; i++) + physical.WriteBulkString(Values[i]); } - public override int ArgCount => 2 + keys.Length + values.Length; + public override int ArgCount => 2 + Keys.Length + Values.Length; } private sealed class SortedSetScanResultProcessor : ScanResultProcessor diff --git a/src/StackExchange.Redis/ResultBox.cs b/src/StackExchange.Redis/ResultBox.cs index 4fe34f114..3c5755aec 100644 --- a/src/StackExchange.Redis/ResultBox.cs +++ b/src/StackExchange.Redis/ResultBox.cs @@ -8,6 +8,7 @@ internal interface IResultBox { bool IsAsync { get; } bool IsFaulted { get; } + Exception Exception { get; } void SetException(Exception ex); void ActivateContinuations(); void Cancel(); @@ -24,6 +25,7 @@ internal abstract class SimpleResultBox : IResultBox bool IResultBox.IsAsync => false; bool IResultBox.IsFaulted => _exception != null; + Exception IResultBox.Exception => _exception; void IResultBox.SetException(Exception exception) => _exception = exception ?? CancelledException; void IResultBox.Cancel() => _exception = CancelledException; @@ -95,6 +97,8 @@ private TaskResultBox(object asyncState, TaskCreationOptions creationOptions) : bool IResultBox.IsFaulted => _exception != null; + Exception IResultBox.Exception => _exception; + void IResultBox.Cancel() => _exception = SimpleResultBox.CancelledException; void IResultBox.SetException(Exception ex) => _exception = ex ?? SimpleResultBox.CancelledException;