Skip to content

Commit

Permalink
WIP: Micronaut scaling Pandas with multi-context
Browse files Browse the repository at this point in the history
  • Loading branch information
timfel committed Jan 17, 2025
1 parent 968ee5b commit c3bc1fc
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

@Controller
public class DataAnalysisController {
private final DataAnalysisModuleSingleContext das;
private final DataAnalysisModuleMultiContext dam;

private final DataAnalysisModule da;

DataAnalysisController(DataAnalysisModule da) {
this.da = da;
public DataAnalysisController(DataAnalysisModuleSingleContext das, DataAnalysisModuleMultiContext dam) {
this.das = das;
this.dam = dam;
}

@Get
Expand All @@ -31,10 +32,22 @@ public void index() {
}

@ExecuteOn(TaskExecutors.IO)
@Post(value = "/data_analysis", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN)
String analyzeCsv(StreamingFileUpload file,
@Post(value = "/data_analysis_single", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN)
String analyzeCsvSingle(StreamingFileUpload file,
@Part("method") String analysisMethod,
@Part("column") String columnString) {
return analyzeCsv(file, analysisMethod, columnString, das);
}

@ExecuteOn(TaskExecutors.IO)
@Post(value = "/data_analysis_multi", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN)
String analyzeCsvMulti(StreamingFileUpload file,
@Part("method") String analysisMethod,
@Part("column") String columnString) {
return analyzeCsv(file, analysisMethod, columnString, dam);
}

private String analyzeCsv(StreamingFileUpload file, String analysisMethod, String columnString, DataAnalysisModule da) {
String csv;
try {
csv = new String(file.asInputStream().readAllBytes(), "UTF-8");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package graalpy.micronaut.multithreaded;

import io.micronaut.graal.graalpy.annotations.GraalPyModule;

@GraalPyModule("data_analysis")
public interface DataAnalysisModule {
double calculateMean(String csv, int column);
double calculateMedian(String csv, int column);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package graalpy.micronaut.multithreaded;

import java.util.concurrent.ExecutionException;

import io.micronaut.context.annotation.Bean;

/**
 * {@link DataAnalysisModule} implementation that runs every call as a task on a
 * {@link PythonPool}.
 *
 * <p>Each public method submits a task to the pool and blocks the calling thread until the
 * task's result is available. Inside the task the Python module {@code data_analysis} is
 * imported through {@link PythonPool#eval(String)} and viewed as a {@link DataAnalysisModule}.
 */
@Bean
public class DataAnalysisModuleMultiContext implements DataAnalysisModule {

    private final PythonPool pool;

    /**
     * Stores the pool and submits {@code pool.getPoolSize()} tasks that import the Python module.
     *
     * <p>The returned futures are not awaited. NOTE(review): this looks like a warm-up of the
     * pool's workers; nothing here guarantees that each task runs on a distinct worker.
     *
     * @param pool the pool that executes all Python calls
     */
    public DataAnalysisModuleMultiContext(PythonPool pool) {
        this.pool = pool;
        for (int i = 0; i < pool.getPoolSize(); i++) {
            pool.submit(() -> getModule());
        }
    }

    /**
     * Imports {@code data_analysis} through the pool and exposes it as a
     * {@link DataAnalysisModule}. Only called from inside tasks submitted to the pool, because
     * {@link PythonPool#eval(String)} rejects calls from other threads.
     */
    private DataAnalysisModule getModule() {
        return pool.eval("import data_analysis; data_analysis").as(DataAnalysisModule.class);
    }

    /**
     * Converts a failure from {@code Future.get()} into the unchecked exception thrown to callers.
     *
     * <p>When the failure is an {@link InterruptedException} the current thread's interrupt flag
     * is set again before wrapping, so code further up the stack can still observe the
     * interruption.
     *
     * @param e the exception caught while waiting for a pool task
     * @return a {@link RuntimeException} whose cause is {@code e}
     */
    private static RuntimeException propagate(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return new RuntimeException(e);
    }

    /**
     * Runs {@code calculateMean} on a pool worker and waits for the result.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *         {@link ExecutionException} raised while waiting for the task
     */
    @Override
    public double calculateMean(String csv, int column) {
        try {
            return pool.submit(() -> getModule().calculateMean(csv, column)).get();
        } catch (InterruptedException | ExecutionException e) {
            throw propagate(e);
        }
    }

    /**
     * Runs {@code calculateMedian} on a pool worker and waits for the result.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *         {@link ExecutionException} raised while waiting for the task
     */
    @Override
    public double calculateMedian(String csv, int column) {
        try {
            return pool.submit(() -> getModule().calculateMedian(csv, column)).get();
        } catch (InterruptedException | ExecutionException e) {
            throw propagate(e);
        }
    }

    /**
     * Runs {@code describe} on a pool worker and waits for the result.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *         {@link ExecutionException} raised while waiting for the task
     */
    @Override
    public String describe(String csv) {
        try {
            return pool.submit(() -> getModule().describe(csv)).get();
        } catch (InterruptedException | ExecutionException e) {
            throw propagate(e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package graalpy.micronaut.multithreaded;

import io.micronaut.graal.graalpy.annotations.GraalPyModule;

/**
 * Variant of {@link DataAnalysisModule} that carries the {@code @GraalPyModule("data_analysis")}
 * annotation and declares no methods of its own.
 *
 * <p>NOTE(review): presumably the Micronaut GraalPy integration supplies the implementation by
 * binding this interface to the Python module {@code data_analysis}; confirm against the
 * annotation's documentation.
 */
@GraalPyModule("data_analysis")
public interface DataAnalysisModuleSingleContext extends DataAnalysisModule {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package graalpy.micronaut.multithreaded;

import java.io.IOException;
import java.nio.file.Path;

import org.graalvm.polyglot.Context;
import org.graalvm.python.embedding.utils.GraalPyResources;
import org.graalvm.python.embedding.utils.VirtualFileSystem;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.graal.graalpy.GraalPyContextBuilderFactory;
import jakarta.inject.Singleton;

/**
 * Replacement for the default {@link GraalPyContextBuilderFactory} that builds contexts from
 * resources extracted to {@code <user.dir>/graalpy.resources.single}.
 */
@Bean
@Singleton
@Replaces(GraalPyContextBuilderFactory.class)
public class PythonContextBuilderFactory implements GraalPyContextBuilderFactory {

    /**
     * Extracts the virtual file system resources if needed and returns a context builder rooted
     * at the extracted directory, with experimental options, native access and process creation
     * allowed and {@code python.IsolateNativeModules} enabled.
     *
     * @return a configured, not yet built, context builder
     * @throws RuntimeException if extracting the resources fails with an {@link IOException}
     */
    @Override
    public Context.Builder createBuilder() {
        var resourcesDir = Path.of(System.getProperty("user.dir"), "graalpy.resources.single");
        extractResourcesIfStale(resourcesDir);
        return GraalPyResources.contextBuilder(resourcesDir)
                .allowExperimentalOptions(true)
                .option("python.WarnExperimentalFeatures", "false")
                .option("python.IsolateNativeModules", "true")
                .allowNativeAccess(true)
                .allowCreateProcess(true);
    }

    /**
     * Extracts the virtual file system into {@code resourcesDir} when the directory is missing or
     * was last modified (at one-second resolution) before this process started.
     *
     * <p>If the platform does not report a process start time, the directory is treated as stale
     * and extraction always runs, instead of failing on an empty {@code Optional}.
     */
    private static void extractResourcesIfStale(Path resourcesDir) {
        var dir = resourcesDir.toFile();
        long processStartSeconds = ProcessHandle.current().info().startInstant()
                .map(start -> start.getEpochSecond())
                .orElse(Long.MAX_VALUE);
        // Class-level lock so concurrent callers do not extract into the same directory at once.
        synchronized (PythonContextBuilderFactory.class) {
            if (!dir.isDirectory() || dir.lastModified() / 1000 < processStartSeconds) {
                var fs = VirtualFileSystem.create();
                try {
                    GraalPyResources.extractVirtualFileSystemResources(fs, resourcesDir);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package graalpy.micronaut.multithreaded;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.Engine;
import org.graalvm.polyglot.Value;
import org.graalvm.python.embedding.utils.GraalPyResources;
import org.graalvm.python.embedding.utils.VirtualFileSystem;

/**
 * Executor service whose worker threads each own one GraalPy {@link Context}; all contexts are
 * built on a single shared {@link Engine}.
 *
 * <p>Tasks submitted to the pool can evaluate Python code in their worker's context through
 * {@link #eval(String)}. A worker creates its context when its thread starts and closes it when
 * the thread's work loop ends, so a context is never closed while tasks that were accepted
 * before {@link #shutdown()} are still queued or running on it.
 *
 * <p>NOTE(review): the shared {@link Engine} is never closed by this class.
 */
@io.micronaut.context.annotation.Context
public class PythonPool extends AbstractExecutorService {
    /** Number of contexts created by the no-argument constructor. */
    private static final int DEFAULT_POOL_SIZE = 5;

    private final Engine engine;
    /** The context owned by the current worker thread; unset on any other thread. */
    private final ThreadLocal<Context> thisContext;
    /** Contexts of live workers; whoever removes an entry is responsible for closing it. */
    private final BlockingQueue<Context> contexts;
    private final ExecutorService threadPool;
    private final int size;

    /** Creates a pool with {@value #DEFAULT_POOL_SIZE} worker threads. */
    public PythonPool() {
        this(DEFAULT_POOL_SIZE);
    }

    private PythonPool(int nContexts) {
        size = nContexts;
        engine = Engine.create();
        contexts = new LinkedBlockingQueue<>();
        thisContext = new ThreadLocal<>();
        ThreadFactory workerFactory = workerLoop -> new Thread(() -> runWorker(workerLoop));
        threadPool = Executors.newFixedThreadPool(nContexts, workerFactory);
    }

    /**
     * Creates a pool with the given number of worker threads.
     *
     * @param nContexts number of worker threads, and therefore of contexts at most
     * @return a new pool
     */
    public static PythonPool newFixedPool(int nContexts) {
        return new PythonPool(nContexts);
    }

    /** Returns the number of worker threads this pool was created with. */
    public int getPoolSize() {
        return size;
    }

    /**
     * Evaluates Python code in the context owned by the calling worker thread.
     *
     * @param code Python source to evaluate
     * @return the evaluation result
     * @throws IllegalStateException if the calling thread is not one of this pool's workers
     */
    public Value eval(String code) {
        var c = thisContext.get();
        if (c == null) {
            throw new IllegalStateException("PythonPool#eval can only be called from inside a submitted task");
        }
        return c.eval("python", code);
    }

    /**
     * Body of every worker thread: creates and registers the thread's context, runs the
     * executor's work loop, and finally releases the context.
     */
    private void runWorker(Runnable workerLoop) {
        var context = createContext(engine);
        contexts.add(context);
        thisContext.set(context);
        try {
            workerLoop.run();
        } finally {
            thisContext.remove();
            // remove() fails only if shutdownNow() already took this context; it then closes it.
            if (contexts.remove(context)) {
                context.close();
            }
        }
    }

    /**
     * Builds and initializes a Python context on {@code engine}, extracting the virtual file
     * system into {@code <user.dir>/graalpy.resources} first when that directory is missing or
     * was last modified (at one-second resolution) before this process started.
     *
     * <p>If the platform does not report a process start time, extraction always runs instead
     * of failing on an empty {@code Optional}.
     */
    private static Context createContext(Engine engine) {
        var resourcesDir = Path.of(System.getProperty("user.dir"), "graalpy.resources");
        var rf = resourcesDir.toFile();
        long processStartSeconds = ProcessHandle.current().info().startInstant()
                .map(start -> start.getEpochSecond())
                .orElse(Long.MAX_VALUE);
        // Class-level lock so that worker threads starting together extract only one at a time.
        synchronized (PythonPool.class) {
            if (!rf.isDirectory() || rf.lastModified() / 1000 < processStartSeconds) {
                var fs = VirtualFileSystem.create();
                try {
                    GraalPyResources.extractVirtualFileSystemResources(fs, resourcesDir);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        var context = GraalPyResources.contextBuilder(resourcesDir)
                .engine(engine)
                .allowExperimentalOptions(true)
                .option("python.PythonHome", "")
                .option("python.WarnExperimentalFeatures", "false")
                .option("python.IsolateNativeModules", "true")
                .allowNativeAccess(true)
                .allowCreateProcess(true)
                .build();
        context.initialize("python");
        return context;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return threadPool.awaitTermination(timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        threadPool.execute(command);
    }

    /**
     * Stops accepting new tasks. Tasks accepted earlier still run; each worker closes its own
     * context once its work loop has ended, so this method does not close contexts itself.
     */
    @Override
    public void shutdown() {
        threadPool.shutdown();
    }

    /**
     * Stops the underlying thread pool immediately and force-closes every registered context,
     * cancelling any Python code still executing in it.
     *
     * @return the tasks that never started
     */
    @Override
    public List<Runnable> shutdownNow() {
        var pending = threadPool.shutdownNow();
        // poll() hands each context to exactly one closer: this loop or the owning worker.
        for (Context c = contexts.poll(); c != null; c = contexts.poll()) {
            c.close(true);
        }
        return pending;
    }

    @Override
    public boolean isShutdown() {
        return threadPool.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return threadPool.isTerminated();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@
(e) => {
var files = [...e.dataTransfer.files]
files.forEach((file, i) => {
var url = "/data_analysis"
var xhr = new XMLHttpRequest()
var formData = new FormData()
xhr.open("POST", url, true)
xhr.addEventListener("readystatechange", function(e) {
if (xhr.readyState == 4 && xhr.status == 200) {
var url = document.getElementById("parallel").checked ? "/data_analysis_multi" : "/data_analysis_single";
var xhr = new XMLHttpRequest();
var formData = new FormData();
var begin;
xhr.addEventListener("readystatechange", e => {
if (xhr.readyState == 1) {
begin = Date.now();
} else if (xhr.readyState == 4) {
let response = document.createElement("pre");
response.innerHTML = xhr.responseText;
response.innerHTML = "" + xhr.responseText + "\n Request took: " + (Date.now() - begin);
let responses = document.getElementById("responses");
responses.appendChild(response);
responses.scrollTop = responses.scrollHeight;
}
})
});
xhr.open("POST", url, true);
var method = "describe";
if (document.getElementById("calcMean").checked) {
method = "mean";
Expand Down Expand Up @@ -61,14 +64,20 @@
<form class="my-form">
<p>Upload multiple CSV files by dragging and dropping</p>
<fieldset>
<input type="radio" id="calcMean" name="operation">
<label for="calcMean">Mean</label>
<input type="radio" id="calcMedian" name="operation">
<label for="calcMedian">Median</label>
<input type="radio" id="describe" name="operation" checked>
<label for="describe">Describe</label>
<input type="number" id="column" style="width:2em;margin-left:4em" value="3">
<label for="column">Column</label>
<div>
<input type="radio" id="calcMean" name="operation">
<label for="calcMean">Mean</label>
<input type="radio" id="calcMedian" name="operation">
<label for="calcMedian">Median</label>
<input type="radio" id="describe" name="operation" checked>
<label for="describe">Describe</label>
<input type="number" id="column" style="width:2em;margin-left:4em" value="3">
<label for="column">Column</label>
</div>
<div>
<input type="checkbox" id="parallel">
<label for="parallel">Parallelize</label>
</div>
</fieldset>
<input type="file" id="fileElem" multiple accept=".csv" onchange="handleFiles(this.files)" style="display:none">
</form>
Expand Down

0 comments on commit c3bc1fc

Please sign in to comment.