Skip to content

Commit

Permalink
Made job only begin listening once when constructed.
Browse files Browse the repository at this point in the history
  • Loading branch information
KristofferStrube committed Jul 8, 2024
1 parent dafe260 commit 516f63b
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,31 @@

public async Task CreatePongWorker()
{
SlimWorker slimWorker = await SlimWorker.CreateAsync(
jSRuntime: JSRuntime,
assembly: typeof(AssemblyPongWorker).Assembly.GetName().Name!,
["Argument1", "Argument2"]
);

EventListener<MessageEvent> eventListener = default!;
eventListener = await EventListener<MessageEvent>.CreateAsync(JSRuntime, async e =>
{
object? data = await e.Data.GetValueAsync();
switch (data)
{
case "ready":
Log("We are sending a ping!");
await slimWorker.PostMessageAsync("ping");
break;
case "pong":
Log("We received a pong!");
await slimWorker.RemoveOnMessageEventListenerAsync(eventListener);
await eventListener.DisposeAsync();
await slimWorker.DisposeAsync();
break;
}
});
await slimWorker.AddOnMessageEventListenerAsync(eventListener);
SlimWorker slimWorker = await SlimWorker.CreateAsync(
jSRuntime: JSRuntime,
assembly: typeof(AssemblyPongWorker).Assembly.GetName().Name!,
["Argument1", "Argument2"]
);

EventListener<MessageEvent> eventListener = default!;
eventListener = await EventListener<MessageEvent>.CreateAsync(JSRuntime, async e =>
{
object? data = await e.Data.GetValueAsync();
switch (data)
{
case "ready":
Log("We are sending a ping!");
await slimWorker.PostMessageAsync("ping");
break;
case "pong":
Log("We received a pong!");
await slimWorker.RemoveOnMessageEventListenerAsync(eventListener);
await eventListener.DisposeAsync();
await slimWorker.DisposeAsync();
break;
}
});
await slimWorker.AddOnMessageEventListenerAsync(eventListener);
}

private void Log(string message)
Expand Down
13 changes: 11 additions & 2 deletions src/KristofferStrube.Blazor.WebWorkers/IJob.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Collections.Concurrent;
using KristofferStrube.Blazor.DOM;
using KristofferStrube.Blazor.Window;
using System.Collections.Concurrent;

namespace KristofferStrube.Blazor.WebWorkers;

Expand All @@ -8,7 +10,14 @@ namespace KristofferStrube.Blazor.WebWorkers;
public interface IJob<TInput, TOutput>
{
/// <summary>
/// How the job will send execute the job on the worker.
/// Initializes the job so that it is ready to be executed.
/// </summary>
/// <param name="worker">The worker that the job should be runned on.</param>
/// <param name="pendingTasks">The dictionary that manages which executions finishes.</param>
public abstract static Task<EventListener<MessageEvent>> InitializeAsync(Worker worker, ConcurrentDictionary<string, TaskCompletionSource<TOutput>> pendingTasks);

/// <summary>
/// Sends the <paramref name="input"/> to the <paramref name="worker"/>.
/// </summary>
/// <typeparam name="TJob">The type of the job.</typeparam>
/// <param name="input">The input that the job should be executed with.</param>
Expand Down
18 changes: 17 additions & 1 deletion src/KristofferStrube.Blazor.WebWorkers/JobWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ namespace KristofferStrube.Blazor.WebWorkers;
/// <typeparam name="TInput"></typeparam>
/// <typeparam name="TOutput"></typeparam>
/// <typeparam name="TJob"></typeparam>
public class JobWorker<TInput, TOutput, TJob> : Worker where TJob : IJob<TInput, TOutput>
public class JobWorker<TInput, TOutput, TJob> : Worker, IAsyncDisposable where TJob : IJob<TInput, TOutput>
{
private readonly ConcurrentDictionary<string, TaskCompletionSource<TOutput>> pendingTasks = new();
private EventListener<MessageEvent>? messageListener;

/// <summary>
/// Creates a <see cref="JobWorker{TInput, TOutput, TJob}"/> that can execute some specific <typeparamref name="TJob"/> on a worker thread.
Expand All @@ -39,6 +40,8 @@ public class JobWorker<TInput, TOutput, TJob> : Worker where TJob : IJob<TInput,
{
await worker.RemoveOnMessageEventListenerAsync(readyListener);
await readyListener.DisposeAsync();

worker.messageListener = await TJob.InitializeAsync(worker, worker.pendingTasks);
tcs.SetResult(worker);
});
await worker.AddOnMessageEventListenerAsync(readyListener);
Expand All @@ -60,4 +63,17 @@ public async Task<TOutput> ExecuteAsync(TInput input)
{
return await TJob.ExecuteAsync<TJob>(input, this, pendingTasks);
}

/// <summary>
/// Diposes listener for events and the worker itself.
/// </summary>
public new async ValueTask DisposeAsync()
{
if (messageListener is not null)
{
await RemoveOnMessageEventListenerAsync(messageListener);
await messageListener.DisposeAsync();
}
await base.DisposeAsync();
}
}
25 changes: 13 additions & 12 deletions src/KristofferStrube.Blazor.WebWorkers/JsonJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,12 @@ public TOutput ExecuteWithoutUsingWorker(TInput input)
return outputSerializedAndDeserialized;
}

/// <summary>
/// How an input is transfered to the <see cref="JobWorker{TInput, TOutput, TJob}"/> for the <see cref="JsonJob{TInput, TOutput}"/>.
/// </summary>
public static async Task<TOutput> ExecuteAsync<TJob>(TInput input, Worker worker, ConcurrentDictionary<string, TaskCompletionSource<TOutput>> pendingTasks) where TJob : IJob<TInput, TOutput>
/// <inheritdoc/>
public static async Task<EventListener<MessageEvent>> InitializeAsync(Worker worker, ConcurrentDictionary<string, TaskCompletionSource<TOutput>> pendingTasks)
{
string requestIdentifier = Guid.NewGuid().ToString();
var tcs = new TaskCompletionSource<TOutput>();
pendingTasks[requestIdentifier] = tcs;

EventListener<MessageEvent> eventListener = default!;
eventListener = await EventListener<MessageEvent>.CreateAsync(worker.JSRuntime, async e =>
{
await worker.RemoveOnMessageEventListenerAsync(eventListener);
await eventListener.DisposeAsync();

JobResponse response = await e.GetDataAsync<JobResponse>();
if (pendingTasks.Remove(response.RequestIdentifier, out TaskCompletionSource<TOutput>? successTaskCompletionSource))
{
Expand All @@ -68,6 +59,16 @@ public static async Task<TOutput> ExecuteAsync<TJob>(TInput input, Worker worker

await worker.AddOnMessageEventListenerAsync(eventListener);

return eventListener;
}

/// <inheritdoc/>
public static async Task<TOutput> ExecuteAsync<TJob>(TInput input, Worker worker, ConcurrentDictionary<string, TaskCompletionSource<TOutput>> pendingTasks) where TJob : IJob<TInput, TOutput>
{
string requestIdentifier = Guid.NewGuid().ToString();
var tcs = new TaskCompletionSource<TOutput>();
pendingTasks[requestIdentifier] = tcs;

string inputSerialized = input is string stringInput
? stringInput
: JsonSerializer.Serialize(input);
Expand Down Expand Up @@ -95,7 +96,7 @@ public async Task StartAsync()

if (inputSerialized is null) return;

TInput input = typeof(TInput) == typeof(string) && requestIdentifier is TInput stringInput
TInput input = typeof(TInput) == typeof(string) && inputSerialized is TInput stringInput
? stringInput
: JsonSerializer.Deserialize<TInput>(inputSerialized)!;

Expand Down
28 changes: 16 additions & 12 deletions src/KristofferStrube.Blazor.WebWorkers/TaskJsonJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,13 @@ public async Task<TOutput> ExecuteWithoutUsingWorker(TInput input)
return outputSerializedAndDeserialized;
}

/// <summary>
/// How an input is transfered to the <see cref="JobWorker{TInput, TOutput, TJob}"/> for the <see cref="JsonJob{TInput, TOutput}"/>.
/// </summary>
public static async Task<TOutput> ExecuteAsync<TJob>(TInput input, Worker worker, ConcurrentDictionary<string, TaskCompletionSource<TOutput>> pendingTasks) where TJob : IJob<TInput, TOutput>
{
string requestIdentifier = Guid.NewGuid().ToString();
var tcs = new TaskCompletionSource<TOutput>();
pendingTasks[requestIdentifier] = tcs;

/// <inheritdoc/>
public static async Task<EventListener<MessageEvent>> InitializeAsync(Worker worker, ConcurrentDictionary<string, TaskCompletionSource<TOutput>> pendingTasks)
{
EventListener<MessageEvent> eventListener = default!;
eventListener = await EventListener<MessageEvent>.CreateAsync(worker.JSRuntime, async e =>
{
await worker.RemoveOnMessageEventListenerAsync(eventListener);
await eventListener.DisposeAsync();

JobResponse response = await e.GetDataAsync<JobResponse>();
if (pendingTasks.Remove(response.RequestIdentifier, out TaskCompletionSource<TOutput>? successTaskCompletionSource))
{
Expand All @@ -68,6 +60,18 @@ public static async Task<TOutput> ExecuteAsync<TJob>(TInput input, Worker worker

await worker.AddOnMessageEventListenerAsync(eventListener);

return eventListener;
}

/// <summary>
/// How an input is transfered to the <see cref="JobWorker{TInput, TOutput, TJob}"/> for the <see cref="JsonJob{TInput, TOutput}"/>.
/// </summary>
public static async Task<TOutput> ExecuteAsync<TJob>(TInput input, Worker worker, ConcurrentDictionary<string, TaskCompletionSource<TOutput>> pendingTasks) where TJob : IJob<TInput, TOutput>
{
string requestIdentifier = Guid.NewGuid().ToString();
var tcs = new TaskCompletionSource<TOutput>();
pendingTasks[requestIdentifier] = tcs;

string inputSerialized = input is string stringInput
? stringInput
: JsonSerializer.Serialize(input);
Expand Down Expand Up @@ -95,7 +99,7 @@ public async Task StartAsync()

if (inputSerialized is null) return;

TInput input = typeof(TInput) == typeof(string) && requestIdentifier is TInput stringInput
TInput input = typeof(TInput) == typeof(string) && inputSerialized is TInput stringInput
? stringInput
: JsonSerializer.Deserialize<TInput>(inputSerialized)!;

Expand Down

0 comments on commit 516f63b

Please sign in to comment.