Skip to content

Commit

Permalink
add job processor
Browse files Browse the repository at this point in the history
  • Loading branch information
nameofSEOKWONHONG committed Oct 9, 2024
1 parent 7805a3e commit 39885d5
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 28 deletions.
113 changes: 97 additions & 16 deletions src/Job/JobHandler.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using System.Collections.Concurrent;
using System.Security.Claims;

namespace eXtensionSharp.Job;

public class JobHandler<T>
public class JobHandler<T> where T : class
{
private static Lazy<JobHandler<T>> _instance = new Lazy<JobHandler<T>>(() => new JobHandler<T>());
public static JobHandler<T> Instance => _instance.Value;
Expand All @@ -14,38 +15,42 @@ private JobHandler()
}

public void Enqueue(T item) => _queue.Enqueue(item);
public T Dequeue() => _queue.TryDequeue(out var result) ? result : default;
}

public class JobProcessor<T>
{
private static Lazy<JobProcessor<T>> _instance = new Lazy<JobProcessor<T>>(() => new JobProcessor<T>());
public static JobProcessor<T> Instance => _instance.Value;
public T Dequeue()
{
if (_queue.TryDequeue(out var result))
{
return result;
}

return null;
}
}

public class JobProcessor<T> : IDisposable
where T : class
{
private JobHandler<T> _handler;
private Action<T> _action;
private bool _isRunning;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

private Thread _thread;
private JobProcessor()
public JobProcessor()
{
_thread = new Thread(Start);
_thread.IsBackground = true;
_thread.Start();
}

public void SetProcessor(JobHandler<T> jobHandler, Action<T> callback)
public void SetProcess(JobHandler<T> jobHandler, Action<T> callback)
{
_handler = jobHandler;
_action = callback;
}

private void Start()
{
_isRunning = true;

while (_isRunning)
while (!_cts.Token.IsCancellationRequested)
{
try
{
Expand All @@ -54,7 +59,7 @@ private void Start()
if (_action.xIsNotEmpty())
{
var item = _handler.Dequeue();
if (item.xIsNotEmpty())
if (item.xIsNotNull())
{
_action(item);
}
Expand All @@ -69,15 +74,91 @@ private void Start()
}
}
}

public void Stop()
{
_isRunning = false;
_cts.Cancel();
}

internal class ProcessItem
{
public JobHandler<T> JobHandler { get; set; }
public Action<T> Callback { get; set; }
}

public void Dispose()
{
_cts?.Cancel();
_cts?.Dispose();
}
}

public class JobProcessorAsync<T> : IDisposable
where T : class
{
private JobHandler<T> _handler;
private Func<T, Task> _func;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

private Task _task;

public JobProcessorAsync()
{
_task = Task.Run(Start);
}

public void SetProcess(JobHandler<T> jobHandler, Func<T, Task> callback)
{
_handler = jobHandler;
_func = callback;
}

private async Task Start()
{
while (!_cts.Token.IsCancellationRequested)
{
try
{
if (_handler.xIsNotEmpty())
{
if (_func.xIsNotEmpty())
{
var item = _handler.Dequeue();
if (item.xIsNotNull())
{
await _func(item);
}
}
}
// Instead of Thread.Sleep, we use Task.Delay to make this asynchronous
await Task.Delay(10);
}
catch (Exception e)
{
Console.WriteLine(e);
break;
}
}
}

public void Stop()
{
_cts.Cancel();
_task.Wait(); // Wait for the task to finish gracefully
}

internal class ProcessItem
{
public JobHandler<T> JobHandler { get; set; }
public Action<T> Callback { get; set; }
}

public void Dispose()
{
_cts?.Cancel();
_task?.Wait();

_cts?.Dispose();
_task?.Dispose();
}
}
2 changes: 1 addition & 1 deletion src/XIsIfExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static bool xIsEmpty<T>(this T obj)
{
return string.IsNullOrWhiteSpace(v);
}

switch (obj)
{
case ICollection { Count: 0 }:
Expand Down
46 changes: 35 additions & 11 deletions test/JobProcessorTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using System.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using eXtensionSharp.Job;
using NUnit.Framework;
Expand All @@ -10,29 +14,49 @@ public class JobProcessorTest
[Test]
public async Task job_processor_test()
{
JobProcessor<string>.Instance.SetProcessor(JobHandler<string>.Instance, item =>
using var processor = new JobProcessor<string>();
processor.SetProcess(JobHandler<string>.Instance, item =>
{
if(item.xIsNotEmpty()) TestContext.WriteLine(item);
});
JobProcessor<int>.Instance.SetProcessor(JobHandler<int>.Instance, item =>
{
if(item > 0) TestContext.WriteLine(item);
});

var texts = "hello world";
var numbers = new[] { 1, 2, 3, 4, 5 };

Parallel.ForEach(texts.ToArray(), item =>
{
JobHandler<string>.Instance.Enqueue(item.ToString());
});
Parallel.ForEach(numbers, item =>
await Task.Delay(5000);
}

[Test]
public async Task job_processor_test2()
{
var r1 = new StringBuilder();
var r2 = new StringBuilder();
using var processor = new JobProcessorAsync<string>();
processor.SetProcess(JobHandler<string>.Instance, async item =>
{
JobHandler<int>.Instance.Enqueue(item);
r1.Append(item);
using var client = new HttpClient();
var res = await client.GetAsync("http://www.google.com");
res.EnsureSuccessStatusCode();
r2.AppendLine(await res.Content.ReadAsStringAsync());
});

var texts = "hello world";

Parallel.ForEach(texts.ToArray(), item =>
{
JobHandler<string>.Instance.Enqueue(item.ToString());
});

await Task.Delay(5000);

JobProcessor<int>.Instance.Stop();

Assert.Multiple(() =>
{
Assert.That(r1.Length, Is.EqualTo(texts.Length));
Assert.That(r2.Length, Is.GreaterThan(0));
});
}
}

0 comments on commit 39885d5

Please sign in to comment.