Skip to content

Commit

Permalink
More locking and less async in AniDBUDPConnectionHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
da3dsoul committed Mar 2, 2025
1 parent ad4054f commit fa6fa2e
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 105 deletions.
53 changes: 26 additions & 27 deletions Shoko.Server/Providers/AniDB/AniDBRateLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Shoko.Plugin.Abstractions;
using Shoko.Plugin.Abstractions.Events;
using Shoko.Server.Settings;

using ISettingsProvider = Shoko.Server.Settings.ISettingsProvider;

#nullable enable
Expand All @@ -28,7 +27,7 @@ public abstract class AniDBRateLimiter

private readonly Func<IServerSettings, AnidbRateLimitSettings> _settingsSelector;

private int? _shortDelay = null;
private int? _shortDelay;

// From AniDB's wiki about UDP rate limiting:
// Short Term:
Expand All @@ -44,7 +43,7 @@ private int ShortDelay
}
}

private int? _longDelay = null;
private int? _longDelay;

// From AniDB's wiki about UDP rate limiting:
// Long Term:
Expand All @@ -60,7 +59,7 @@ private int LongDelay
}
}

private long? _shortPeriod = null;
private long? _shortPeriod;

// Switch to longer delay after a short period
private long ShortPeriod
Expand All @@ -73,7 +72,7 @@ private long ShortPeriod
}
}

private long? _resetPeriod = null;
private long? _resetPeriod;

// Switch to shorter delay after inactivity
private long ResetPeriod
Expand Down Expand Up @@ -141,31 +140,31 @@ private void ResetRate()
public T EnsureRate<T>(Func<T> action, bool forceShortDelay = false)
{
lock (_lock)
try
{
var delay = _requestWatch.ElapsedMilliseconds;
if (delay > ResetPeriod) ResetRate();
var currentDelay = !forceShortDelay && _activeTimeWatch.ElapsedMilliseconds > ShortPeriod ? LongDelay : ShortDelay;

if (delay > currentDelay)
try
{
_logger.LogTrace("Time since last request is {Delay} ms, not throttling", delay);
_logger.LogTrace("Sending AniDB command");
return action();
}
var delay = _requestWatch.ElapsedMilliseconds;
if (delay > ResetPeriod) ResetRate();
var currentDelay = !forceShortDelay && _activeTimeWatch.ElapsedMilliseconds > ShortPeriod ? LongDelay : ShortDelay;

// add 50ms for good measure
var waitTime = currentDelay - (int)delay + 50;
if (delay > currentDelay)
{
_logger.LogTrace("Time since last request is {Delay} ms, not throttling", delay);
_logger.LogTrace("Sending AniDB command");
return action();
}

_logger.LogTrace("Time since last request is {Delay} ms, throttling for {Time}ms", delay, waitTime);
Thread.Sleep(waitTime);
// add 50ms for good measure
var waitTime = currentDelay - (int)delay + 50;

_logger.LogTrace("Sending AniDB command");
return action();
}
finally
{
_requestWatch.Restart();
}
_logger.LogTrace("Time since last request is {Delay} ms, throttling for {Time}ms", delay, waitTime);
Thread.Sleep(waitTime);

_logger.LogTrace("Sending AniDB command");
return action();
}
finally
{
_requestWatch.Restart();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Shoko.Server.Providers.AniDB.Interfaces;

public interface IAniDBSocketHandler : IDisposable, IAsyncDisposable
{
bool IsConnected { get; }
Task<byte[]> Send(byte[] payload, CancellationToken token = new());
Task<bool> TryConnection();
byte[] Send(byte[] payload);
bool TryConnection();
}
25 changes: 9 additions & 16 deletions Shoko.Server/Providers/AniDB/UDP/AniDBSocketHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,23 @@ public AniDBSocketHandler(ILoggerFactory loggerFactory, string host, ushort serv
_clientPort = clientPort;
}

public async Task<byte[]> Send(byte[] payload, CancellationToken token = new())
public byte[] Send(byte[] payload)
{
if (!IsConnected) return [0];
// this doesn't need to be bigger than 1400, but meh, better safe than sorry
return await SendUnsafe(payload, token);
return SendUnsafe(payload);
}

private async Task<byte[]> SendUnsafe(byte[] payload, CancellationToken token)
private byte[] SendUnsafe(byte[] payload)
{
EmptyBuffer();

if (token.IsCancellationRequested) throw new TaskCanceledException();
using CancellationTokenSource sendCts = new(SendTimeoutMs);
await _aniDBSocket.SendToAsync(payload, SocketFlags.None, _remoteIpEndPoint, sendCts.Token);

if (sendCts.IsCancellationRequested) throw new OperationCanceledException();
if (token.IsCancellationRequested) throw new TaskCanceledException();
_aniDBSocket.SendTo(payload, _remoteIpEndPoint);

using CancellationTokenSource receiveCts = new(ReceiveTimeoutMs);
var result = new byte[1600];
var receivedResult = await _aniDBSocket.ReceiveFromAsync(result, SocketFlags.None, _remoteIpEndPoint, receiveCts.Token);
if (sendCts.IsCancellationRequested) throw new OperationCanceledException();
if (token.IsCancellationRequested) throw new TaskCanceledException();

var received = receivedResult.ReceivedBytes;
EndPoint endpoint = _remoteIpEndPoint;
var received = _aniDBSocket.ReceiveFrom(result, ref endpoint);

if (received > 2 && result[0] == 0 && result[1] == 0)
{
Expand Down Expand Up @@ -97,7 +89,7 @@ private void EmptyBuffer()
}
}

public async Task<bool> TryConnection()
public bool TryConnection()
{
if (IsConnected) return true;
// Don't send Expect 100 requests. These requests aren't always supported by remote internet devices, in which case can cause failure.
Expand All @@ -114,6 +106,7 @@ public async Task<bool> TryConnection()
* The local port may be hardcoded, however, an option to manually specify another port should be offered.
*/
_aniDBSocket.Bind(_localIpEndPoint);
_aniDBSocket.SendTimeout = SendTimeoutMs;
_aniDBSocket.ReceiveTimeout = ReceiveTimeoutMs;

_logger.LogInformation("Bound to local address: {Local} - Port: {ClientPort} ({Family})", _localIpEndPoint,
Expand All @@ -128,7 +121,7 @@ public async Task<bool> TryConnection()

try
{
var remoteHostEntry = await Dns.GetHostEntryAsync(_serverHost).WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
var remoteHostEntry = Dns.GetHostEntry(_serverHost);
_remoteIpEndPoint = new IPEndPoint(remoteHostEntry.AddressList[0], _serverPort);

_logger.LogInformation("Bound to remote address: {Address} : {Port}", _remoteIpEndPoint.Address,
Expand Down
126 changes: 70 additions & 56 deletions Shoko.Server/Providers/AniDB/UDP/AniDBUDPConnectionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace Shoko.Server.Providers.AniDB.UDP;

#nullable enable
public class AniDBUDPConnectionHandler : ConnectionHandler, IUDPConnectionHandler
public partial class AniDBUDPConnectionHandler : ConnectionHandler, IUDPConnectionHandler
{
/****
* From Anidb wiki:
Expand All @@ -35,7 +35,11 @@ public class AniDBUDPConnectionHandler : ConnectionHandler, IUDPConnectionHandle
private readonly IConnectivityService _connectivityService;
private AniDBSocketHandler? _socketHandler;
private readonly object _socketHandlerLock = new();
private static readonly Regex s_logMask = new("(?<=(\\bpass=|&pass=\\bs=|&s=))[^&]+", RegexOptions.Compiled | RegexOptions.IgnoreCase);
// IDK Rider said to use a GeneratedRegex attribute
private static readonly Regex s_logMask = GetLogRegex();

[GeneratedRegex("(?<=(\\bpass=|&pass=\\bs=|&s=))[^&]+", RegexOptions.IgnoreCase | RegexOptions.Compiled, "en-US")]
private static partial Regex GetLogRegex();

public event EventHandler? LoginFailed;

Expand Down Expand Up @@ -150,7 +154,7 @@ private void InitInternal()
}

_socketHandler = new AniDBSocketHandler(_loggerFactory, settings.AniDb.UDPServerAddress, settings.AniDb.UDPServerPort, settings.AniDb.ClientPort);
IsNetworkAvailable = _socketHandler.TryConnection().Result;
IsNetworkAvailable = _socketHandler.TryConnection();
}

_isLoggedOn = false;
Expand Down Expand Up @@ -214,32 +218,32 @@ private void LogoutTimerElapsed(object? sender, ElapsedEventArgs e)
/// <returns></returns>
public string Send(string command, bool needsUnicode = true)
{
lock(_socketHandlerLock)
{
// Steps:
// 1. Check Ban state and throw if Banned
// 2. Check Login State and Login if needed
// 3. Actually Call AniDB
// Steps:
// 1. Check Ban state and throw if Banned
// 2. Check Login State and Login if needed
// 3. Actually Call AniDB

// Check Ban State
// Ideally, this will never happen, as we stop the queue and attempt a graceful rollback of the command
if (IsBanned)
// Check Ban State
// Ideally, this will never happen, as we stop the queue and attempt a graceful rollback of the command
if (IsBanned)
{
throw new AniDBBannedException
{
throw new AniDBBannedException
{
BanType = UpdateType.UDPBan, BanExpires = BanTime?.AddHours(BanTimerResetLength)
};
}
// TODO Low Priority: We need to handle Login Attempt Decay, so that we can try again if it's not just a bad user/pass
// It wasn't handled before, and it's not caused serious problems
BanType = UpdateType.UDPBan, BanExpires = BanTime?.AddHours(BanTimerResetLength)
};
}
// TODO Low Priority: We need to handle Login Attempt Decay, so that we can try again if it's not just a bad user/pass
// It wasn't handled before, and it's not caused serious problems

// login doesn't use this method, so this check won't interfere with it
// if we got here, and it's invalid session, then it already failed to re-log
if (IsInvalidSession)
{
throw new NotLoggedInException();
}
// login doesn't use this method, so this check won't interfere with it
// if we got here, and it's invalid session, then it already failed to re-log
if (IsInvalidSession)
{
throw new NotLoggedInException();
}

lock (_socketHandlerLock)
{
// Check Login State
if (!Login())
{
Expand Down Expand Up @@ -296,7 +300,7 @@ private string SendInternal(string command, bool needsUnicode = true, bool isPin

var start = DateTime.Now;
Logger.LogTrace("AniDB UDP Call: (Using {Unicode}) {Command}", needsUnicode ? "Unicode" : "ASCII", MaskLog(command));
var byReceivedAdd = _socketHandler.Send(sendByteAdd).Result;
var byReceivedAdd = _socketHandler.Send(sendByteAdd);

if (byReceivedAdd.All(a => a == 0))
{
Expand Down Expand Up @@ -344,31 +348,35 @@ private void StopPinging()

public void ForceReconnection()
{
try
{
ForceLogout();
}
catch (Exception ex)
lock (_socketHandlerLock)
{
Logger.LogError(ex, "Failed to logout");
}

try
{
CloseConnections();
}
catch (Exception ex)
{
Logger.LogError(ex, "Failed to close socket");
}
try
{
ForceLogout();
}
catch (Exception ex)
{
Logger.LogError(ex, "Failed to logout");
}

try
{
InitInternal();
}
catch (Exception ex)
{
Logger.LogError(ex, "Failed to reinitialize socket");
try
{
CloseConnections();
}
catch (Exception ex)
{
Logger.LogError(ex, "Failed to close socket");
}

try
{
InitInternal();
}
catch (Exception ex)
{
Logger.LogError(ex, "Failed to reinitialize socket");
}
}
}

Expand All @@ -387,7 +395,7 @@ public void ForceLogout()
Logger.LogTrace("Logging Out");
try
{
_requestFactory.Create<RequestLogout>().Send();
lock(_socketHandlerLock) _requestFactory.Create<RequestLogout>().Send();
}
catch
{
Expand Down Expand Up @@ -437,8 +445,11 @@ public bool Login()
{
if (IsBanned) return false;
Logger.LogTrace("Failed to login to AniDB. Issuing a Logout command and retrying");
ForceLogout();
return Login(settings.AniDb.Username, settings.AniDb.Password);
lock (_socketHandlerLock)
{
ForceLogout();
return Login(settings.AniDb.Username, settings.AniDb.Password);
}
}
catch (Exception e)
{
Expand Down Expand Up @@ -551,13 +562,16 @@ public bool TestLogin(string username, string password)
return false;
}

var result = Login(username, password);
if (result)
lock (_socketHandlerLock)
{
ForceLogout();
}
var result = Login(username, password);
if (result)
{
ForceLogout();
}

return result;
return result;
}
}

public bool SetCredentials(string username, string password)
Expand Down
4 changes: 2 additions & 2 deletions Shoko.Server/Providers/AniDB/UDP/Connection/RequestLogin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ protected override UDPResponse<ResponseLogin> ParseResponse(UDPResponse<string>
}

// after response code, before "LOGIN"
var sessionID = receivedData.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Skip(1)
var sessionID = receivedData.Split(' ', StringSplitOptions.RemoveEmptyEntries).Skip(1)
.FirstOrDefault();
if (string.IsNullOrWhiteSpace(sessionID))
{
throw new UnexpectedUDPResponseException(code, receivedData, Command);
}

var imageServer = receivedData.Split(new[] { '\n' }, StringSplitOptions.RemoveEmptyEntries).LastOrDefault();
var imageServer = receivedData.Split('\n', StringSplitOptions.RemoveEmptyEntries).LastOrDefault();
return new UDPResponse<ResponseLogin>
{
Response = new ResponseLogin { SessionID = sessionID, ImageServer = imageServer }, Code = code
Expand Down

0 comments on commit fa6fa2e

Please sign in to comment.