Skip to content

Commit

Permalink
Updated to use Messages for communicating input to JobWorker.
Browse files Browse the repository at this point in the history
  • Loading branch information
KristofferStrube committed Jun 23, 2024
1 parent 993e6c6 commit 3a40e65
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
if (!OperatingSystem.IsBrowser())
throw new PlatformNotSupportedException("Can only be run in the browser!");

new StringSumJob().Execute(args);
await new StringSumJob().Start();
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ result = await dotNetWorker.ExecuteAsync(input);</code>
</p>

@code {
string input = "Hello World!";
string input = string.Join("", Enumerable.Range(0, 1_000_000 / 8).Select(_ => Guid.NewGuid().ToString()[..8]));
int? result = null;
double time;
Worker? jsWorker;
Expand Down
26 changes: 22 additions & 4 deletions src/KristofferStrube.Blazor.WebWorkers/JobWorker.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using KristofferStrube.Blazor.WebIDL;
using KristofferStrube.Blazor.DOM;
using KristofferStrube.Blazor.WebIDL;
using KristofferStrube.Blazor.WebWorkers.Extensions;
using KristofferStrube.Blazor.Window;
using Microsoft.JSInterop;
using System.Collections.Concurrent;
using System.Text.Json;

namespace KristofferStrube.Blazor.WebWorkers;

Expand All @@ -21,12 +24,27 @@ public class JobWorker<TInput, TOutput, TJob> : Worker where TJob : IJob<TInput,
/// <param name="jSRuntime">An <see cref="IJSRuntime"/> instance.</param>
public static new async Task<JobWorker<TInput, TOutput, TJob>> CreateAsync(IJSRuntime jSRuntime)
{
string scriptUrl = "_content/KristofferStrube.Blazor.WebWorkers/KristofferStrube.Blazor.WebWorkers.JobWorker.js"
+ $"?assembly={typeof(TJob).Assembly.GetName().Name}";

await using IJSObjectReference helper = await jSRuntime.GetHelperAsync();
IJSObjectReference jSInstance = await helper.InvokeAsync<IJSObjectReference>("constructWorker",
"_content/KristofferStrube.Blazor.WebWorkers/KristofferStrube.Blazor.WebWorkers.JobWorker.js",
new WorkerOptions() { Type = WorkerType.Module });
scriptUrl, new WorkerOptions() { Type = WorkerType.Module });

JobWorker<TInput, TOutput, TJob> worker = new(jSRuntime, jSInstance, new() { DisposesJSReference = true });

var tcs = new TaskCompletionSource<JobWorker<TInput, TOutput, TJob>>();

EventListener<MessageEvent> readyListener = default!;
readyListener = await EventListener<MessageEvent>.CreateAsync(jSRuntime, async e =>
{
await worker.RemoveOnMessageEventListenerAsync(readyListener);
await readyListener.DisposeAsync();
tcs.SetResult(worker);
});
await worker.AddOnMessageEventListenerAsync(readyListener);

return new JobWorker<TInput, TOutput, TJob>(jSRuntime, jSInstance, new() { DisposesJSReference = true });
return await tcs.Task;
}

/// <inheritdoc cref="Worker(IJSRuntime, IJSObjectReference, CreationOptions)"/>
Expand Down
90 changes: 41 additions & 49 deletions src/KristofferStrube.Blazor.WebWorkers/JsonJob.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using KristofferStrube.Blazor.DOM;
using KristofferStrube.Blazor.Window;
using System.Collections.Concurrent;
using System.Runtime.InteropServices.JavaScript;
using System.Runtime.Versioning;
using System.Text.Json;
using System.Text.Json.Serialization;
Expand Down Expand Up @@ -54,54 +55,57 @@ public static async Task<TOutput> ExecuteAsync<TJob>(TInput input, Worker worker
JobResponse response = await e.GetDataAsync<JobResponse>();
if (pendingTasks.Remove(response.RequestIdentifier, out TaskCompletionSource<TOutput>? successTaskCompletionSource))
{
successTaskCompletionSource.SetResult(JsonSerializer.Deserialize<TOutput>(response.OutputSerialized)!);
if (typeof(TOutput) == typeof(string) && response.OutputSerialized is TOutput stringOutput)
{
successTaskCompletionSource.SetResult(stringOutput);
}
else
{
successTaskCompletionSource.SetResult(JsonSerializer.Deserialize<TOutput>(response.OutputSerialized)!);
}
}
});

await worker.AddOnMessageEventListenerAsync(eventListener);

string inputSerialized = input is string stringInput
? stringInput
: JsonSerializer.Serialize(input);

await worker.PostMessageAsync(new JobArguments()
{
Namespace = typeof(TJob).Assembly.GetName().Name!,
Type = typeof(TJob).Name,
RequestIdentifier = requestIdentifier,
InputSerialized = JsonSerializer.Serialize(input)
InputSerialized = inputSerialized
});

return await tcs.Task;
}

/// <summary>
/// This method is called from the Worker project.
/// This method is called from the Worker project to start listening for events
/// </summary>
/// <remarks>
/// Throws an argument exception if the first arg can't be deserialized as the input type.
/// </remarks>
/// <param name="args">Parse the args from the program for this parameter.</param>
/// <exception cref="ArgumentException"></exception>
[SupportedOSPlatform("browser")]
public void Execute(string[] args)
public async Task Start()
{
JobArguments arguments;

try
{
arguments = JsonSerializer.Deserialize<JobArguments>(args[0])!;
}
catch
Imports.RegisterOnMessage(message =>
{
throw new ArgumentException("First argument could not be serialized as job argument.");
}
JSObject data = message.GetPropertyAsJSObject("data")!;
string? inputSerialized = data.GetPropertyAsString("inputSerialized");
string requestIdentifier = data.GetPropertyAsString("requestIdentifier")!;

if (!arguments.Type.Equals(GetType().Name))
{
return;
}
if (inputSerialized is null) return;

TInput input = typeof(TInput) == typeof(string) && requestIdentifier is TInput stringInput
? stringInput
: JsonSerializer.Deserialize<TInput>(inputSerialized)!;

TOutput output = Work(input);

TInput input = JsonSerializer.Deserialize<TInput>(arguments.InputSerialized)!;
TOutput output = Work(input);
PostOutput(output, requestIdentifier);
});

PostOutput(JsonSerializer.Serialize(output), arguments.RequestIdentifier);
while (true)
await Task.Delay(100);
}

/// <summary>
Expand All @@ -110,13 +114,19 @@ public void Execute(string[] args)
/// <param name="output">The output serialized.</param>
/// <param name="requestIdentifier">The <see cref="JobResponse.RequestIdentifier"/>.</param>
[SupportedOSPlatform("browser")]
private void PostOutput(string output, string requestIdentifier)
private void PostOutput(TOutput output, string requestIdentifier)
{
System.Runtime.InteropServices.JavaScript.JSObject outputObject = Imports.CreateObject();
JSObject outputObject = Imports.CreateObject();

outputObject.SetProperty("type", GetType().Name);
outputObject.SetProperty("requestIdentifier", requestIdentifier);
outputObject.SetProperty("outputSerialized", output);
if (output is string stringOutput)
{
outputObject.SetProperty("outputSerialized", stringOutput);
}
else
{
outputObject.SetProperty("outputSerialized", JsonSerializer.Serialize(output));
}

Imports.PostMessage(outputObject);
}
Expand All @@ -126,18 +136,6 @@ private void PostOutput(string output, string requestIdentifier)
/// </summary>
public class JobArguments
{
/// <summary>
/// The namespace of the job.
/// </summary>
[JsonPropertyName("namespace")]
public required string Namespace { get; set; }

/// <summary>
/// The type of the job. Used to ensure that the correct job is run.
/// </summary>
[JsonPropertyName("type")]
public required string Type { get; set; }

/// <summary>
/// The unique identifier for the specific request. Used to identify which task has finished when the job responds.
/// </summary>
Expand All @@ -156,12 +154,6 @@ public class JobArguments
/// </summary>
public class JobResponse
{
/// <summary>
/// The type of the job that ran.
/// </summary>
[JsonPropertyName("type")]
public required string Type { get; set; }

/// <summary>
/// The same unique identifier sent in <see cref="JobArguments.RequestIdentifier"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Save messages while dotnet is being instantiated.
let savedMessages = []
function saveMessagesWhileInstantiatingDotnet(e) {
savedMessages.push(e);
}
self.addEventListener("message", saveMessagesWhileInstantiatingDotnet);
const params = new Proxy(new URLSearchParams(self.location.search), {
get: (searchParams, prop) => searchParams.get(prop),
});
let assembly = params.assembly;

import { dotnet } from "../../_framework/dotnet.js"

Expand All @@ -24,16 +22,6 @@ instance.setModuleImports("boot.js", {
registerOnMessage: (handler) => self.addEventListener("message", handler)
});

async function runWorker(e) {
await instance.runMainAndExit(`${e.data.namespace}.wasm`, [JSON.stringify(e.data)]);
}

// No longer save messages as we switch to handle them as they come in.
self.removeEventListener("message", saveMessagesWhileInstantiatingDotnet);
// Go through events that were saved and run the
for (const savedMessage of savedMessages) {
await runWorker(savedMessage);
};
self.postMessage("ready");

// Now listen for all future messages and process them.
self.addEventListener("message", runWorker);
await instance.runMainAndExit(`${assembly}.wasm`, []);

0 comments on commit 3a40e65

Please sign in to comment.