@withinboredom
Member

@withinboredom withinboredom commented Aug 7, 2025

This allows extensions to register as external workers. For example, someone could create an in-process, multi-threaded queue that can be used as a Symfony Messenger transport:

package simple_messenger

import "C"
import (
	"bytes"
	"io"
	"net/http"
	"unsafe"

	"github.com/dunglas/frankenphp"
)

type messenger struct {
	messages chan *string
}

func (m *messenger) Name() string {
	return "m#Messenger"
}

func (m *messenger) FileName() string {
	return "vendor/somewhere/MessengerWorker.php"
}

func (m *messenger) GetMinThreads() int {
	return 4
}

func (m *messenger) ThreadActivatedNotification(threadId int)   {}
func (m *messenger) ThreadDrainNotification(threadId int)       {}
func (m *messenger) ThreadDeactivatedNotification(threadId int) {}
func (m *messenger) Env() frankenphp.PreparedEnv {
	return frankenphp.PreparedEnv{}
}

func (m *messenger) ProvideRequest() *frankenphp.WorkerRequest {
	msg := <-m.messages
	body := bytes.NewBufferString(*msg)
	return &frankenphp.WorkerRequest{
		Request: &http.Request{
			Body: io.NopCloser(body),
		},
	}
}

var m = &messenger{
	messages: make(chan *string),
}

// export_php:function sendMessage(string $message): void
func sendMessage(msgStr *C.zend_string) { // pointer type, so the unsafe.Pointer conversion below compiles
	msg := frankenphp.GoString(unsafe.Pointer(msgStr))
	m.messages <- &msg
}

func init() {
	frankenphp.RegisterExternalWorker(m)
}

The external worker may then send "requests" to the registered PHP file, which can package them up as whatever object is required. Any response is sent back to the extension if a ResponseWriter is provided; otherwise it is sent to stdout.

GetMinThreads() int
ThreadActivatedNotification(threadId int)
ThreadDrainNotification(threadId int)
ThreadDeactivatedNotification(threadId int)
Contributor

Do you have a specific use case for these notifications? Otherwise it would allow moving this code to the ext module, which removes a lot of coupling.

If tasks are sent via request, then the workers or even frankenphp don't really need to know about 'external workers', they just need to handle the requests.

It would even be cleaner to expose hooks for thread start/stop

Contributor

Something like these worker options:

func WithWorkerOnThreadActivation(hook func(threadId int)) WorkerOption

func WithWorkerOnThreadDrain(hook func(threadId int)) WorkerOption

func WithWorkerOnThreadDeactivation(hook func(threadId int)) WorkerOption
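
For illustration, the three option signatures above could be backed by the usual functional-options pattern. This is only a sketch: the `workerOpt` struct, its fields, and `applyOptions` are hypothetical names invented here, and the real FrankenPHP option type may look different (for example, it may return an error).

```go
package main

import "fmt"

// workerOpt is a hypothetical options struct used only for this sketch.
type workerOpt struct {
	onThreadActivation   []func(threadId int)
	onThreadDrain        []func(threadId int)
	onThreadDeactivation []func(threadId int)
}

// WorkerOption mutates a workerOpt (functional-options pattern).
type WorkerOption func(*workerOpt)

func WithWorkerOnThreadActivation(hook func(threadId int)) WorkerOption {
	return func(o *workerOpt) { o.onThreadActivation = append(o.onThreadActivation, hook) }
}

func WithWorkerOnThreadDrain(hook func(threadId int)) WorkerOption {
	return func(o *workerOpt) { o.onThreadDrain = append(o.onThreadDrain, hook) }
}

func WithWorkerOnThreadDeactivation(hook func(threadId int)) WorkerOption {
	return func(o *workerOpt) { o.onThreadDeactivation = append(o.onThreadDeactivation, hook) }
}

// applyOptions is a hypothetical helper that collects all registered hooks.
func applyOptions(opts ...WorkerOption) *workerOpt {
	o := &workerOpt{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	active := 0
	o := applyOptions(
		WithWorkerOnThreadActivation(func(int) { active++ }),
		WithWorkerOnThreadDeactivation(func(int) { active-- }),
	)
	// Simulate two threads starting and one stopping.
	for _, id := range []int{0, 1} {
		for _, hook := range o.onThreadActivation {
			hook(id)
		}
	}
	for _, hook := range o.onThreadDeactivation {
		hook(0)
	}
	fmt.Println("active threads:", active)
}
```

With hooks registered this way, the worker code only iterates over callbacks and does not need to know whether an "external worker" is behind them, which is the decoupling suggested in this thread.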

Member Author

> Do you have a specific use case for these notifications? Otherwise it would allow moving this code to the ext module, which removes a lot of coupling.

It can be quite handy to know how many worker threads you have running (especially with autoscaling). That being said, it isn’t necessary. Good call about the decoupling.

Member

Maybe we can provide a default implementation as an embeddable struct, but this can be done in a follow-up PR.
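
A rough sketch of what such an embeddable default could look like, combined with the thread-counting use case mentioned earlier in this thread. The `threadHooks` interface is a local stand-in that only lists the four methods quoted above, and `DefaultThreadHooks` and `ActiveThreads` are hypothetical names, not part of FrankenPHP.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// threadHooks is a local stand-in listing the methods quoted in this thread.
type threadHooks interface {
	GetMinThreads() int
	ThreadActivatedNotification(threadId int)
	ThreadDrainNotification(threadId int)
	ThreadDeactivatedNotification(threadId int)
}

// DefaultThreadHooks is a hypothetical embeddable default implementation.
// It counts active threads and otherwise does nothing.
type DefaultThreadHooks struct {
	active atomic.Int64
}

func (d *DefaultThreadHooks) GetMinThreads() int                { return 1 }
func (d *DefaultThreadHooks) ThreadActivatedNotification(int)   { d.active.Add(1) }
func (d *DefaultThreadHooks) ThreadDrainNotification(int)       {}
func (d *DefaultThreadHooks) ThreadDeactivatedNotification(int) { d.active.Add(-1) }

// ActiveThreads reports how many threads are currently activated.
func (d *DefaultThreadHooks) ActiveThreads() int64 { return d.active.Load() }

// messenger embeds the defaults and overrides only what it needs.
type messenger struct {
	DefaultThreadHooks
}

func (m *messenger) GetMinThreads() int { return 4 }

func main() {
	m := &messenger{}
	var h threadHooks = m
	h.ThreadActivatedNotification(0)
	h.ThreadActivatedNotification(1)
	h.ThreadDeactivatedNotification(0)
	fmt.Println(h.GetMinThreads(), m.ActiveThreads())
}
```

Extension authors would then implement only the methods they care about, and the three empty notification methods from the example in the PR description would disappear.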

@dunglas
Member

dunglas commented Sep 12, 2025

I'm playing with this and would like to show something using it at API Platform Con. It's almost working, but I have a weird issue: the request sent to the worker by ProvideRequest always seems to be empty PHP-side.

I have something like this:

func (w *worker) ProvideRequest() *frankenphp.WorkerRequest {
	msg := <-w.messages
	caddy.Log().Info("Provided request", zap.String("request", string(msg)))

	u, _ := url.Parse("https://example.com")
	u.Query().Add("request", string(msg))

	return &frankenphp.WorkerRequest{
		Request: &http.Request{
			URL:  u,
			Body: io.NopCloser(bytes.NewReader(msg)),
		},
	}
}

and both $_GET and php://input are always empty, inside frankenphp_handle_request().

Do you have an idea of what's going on @withinboredom?

Another thing I noticed while playing with this: it would be nice to be able to pass PHP values directly to the worker script. In my case, I can compute PHP arrays directly on the Go side, but here I have to encode the data as JSON Go-side and decode it PHP-side. This seems unnecessary. We could think of a lower-level API that allows passing extra parameters to the closure in frankenphp_handle_request, for instance. WDYT?

@@ -0,0 +1,108 @@
package frankenphp
Member

customworker.go?

// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more
// total threads). Don't be greedy.
type WorkerExtension interface {
Member

Suggested change
- type WorkerExtension interface {
+ type CustomWorker interface {

?

rq = externalWorker.ProvideRequest()
}()

if rq == nil || rq.Request == nil {
Member

There is a race condition here: the goroutine may not have set rq yet.
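
One common way to avoid this kind of race, sketched under assumptions: hand the result back over a channel instead of assigning to a shared variable. The `request` type, `provide`, and `fetch` are hypothetical stand-ins for `WorkerRequest`, `externalWorker.ProvideRequest()`, and the surrounding code, and this is not necessarily how the PR resolved it.

```go
package main

import "fmt"

// request is a local stand-in for the PR's WorkerRequest type.
type request struct{ payload string }

// provide is a hypothetical stand-in for externalWorker.ProvideRequest().
func provide() *request { return &request{payload: "hello"} }

// fetch runs the provider in a goroutine and returns its result through a
// channel. The receive happens after the send, so the caller can never read
// the value before the goroutine has produced it.
func fetch(provider func() *request) *request {
	result := make(chan *request, 1)
	go func() { result <- provider() }()
	return <-result
}

func main() {
	rq := fetch(provide)
	if rq == nil {
		fmt.Println("no request")
		return
	}
	fmt.Println(rq.payload)
}
```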


// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
go func() {
Member

Maybe we could start startExternalWorkerPipe() using the go keyword on the caller side to simplify the code?

func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
go func() {
defer func() {
if r := recover(); r != nil {
Member

Shouldn't we let the program panic instead? To me, such errors are unexpected and shouldn't be handled on our side.

var rq *WorkerRequest
func() {
defer func() {
if r := recover(); r != nil {
Member

same here

@withinboredom
Member Author

How are you creating your body @dunglas? Here's the gist of what I'm doing to create requests: https://github.com/bottledcode/durable-php/pull/160/files#diff-67975c8380131293fe23decd46264638b7225b03f08297dbd46d1cd581b09ed7R150-R222

If you're looking for an example to show, I have durable php I could show off -- no more Doctrine, no more databases... just regular php code that magically survives requests.

> it would be nice to be able to pass PHP values directly to the worker script. In my case, I can compute PHP arrays directly from the Go-side, but here I've to encode the data in JSON Go-side, and decode it PHP-side.

I'm handling this by providing a global object that is magically given the context of the current situation. It's not ideal, but I didn't feel like fighting this (yet! I generally follow the principle of 'get it working' before I focus on 'get it working well' and then finally move on to 'get it working fast'). Basically, you just do Worker::getCurrent()->queryState($someId) to get an array, then pass the array to Serde to get the original object. I haven't tested yet whether that is faster than serializing to JSON. I have also been working on Crell/Serde#85 to try using protobufs, which could possibly be faster than JSON or arrays, since decoding just walks an array of bytes and doesn't need any parsing.

@dunglas
Member

dunglas commented Sep 12, 2025

Actually, the URL is correctly passed to the worker; that was my mistake. However, the body is empty. Here is my code:

func (w *worker) ProvideRequest() *frankenphp.WorkerRequest {
	msg := <-w.messages

	p := url.Values{}
	p.Set("request", string(msg))

	u := url.URL{RawQuery: p.Encode()} // This works

	req, err := http.NewRequest("POST", u.String(), bytes.NewReader(msg)) // The body is empty
	if err != nil {
		panic(err)
	}

	return &frankenphp.WorkerRequest{
		Request: req,
	}
}
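
A note on the `$_GET` half of the problem: one likely reason the earlier attempt with `u.Query().Add(...)` produced an empty query string is that `url.URL.Query()` parses `RawQuery` and returns a fresh `url.Values`, so mutating the result does not change the URL. The sketch below contrasts the two approaches; `lostQuery` and `keptQuery` are names made up for this example.

```go
package main

import (
	"fmt"
	"net/url"
)

// lostQuery mutates the copy returned by Query(); the URL itself is unchanged.
func lostQuery(msg string) string {
	u, _ := url.Parse("https://example.com")
	u.Query().Add("request", msg) // only modifies a temporary url.Values
	return u.String()
}

// keptQuery writes the encoded values back into RawQuery.
func keptQuery(msg string) string {
	u, _ := url.Parse("https://example.com")
	q := u.Query()
	q.Set("request", msg)
	u.RawQuery = q.Encode()
	return u.String()
}

func main() {
	fmt.Println(lostQuery("ping")) // https://example.com
	fmt.Println(keptQuery("ping")) // https://example.com?request=ping
}
```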

@withinboredom
Member Author

Are you sure that msg actually has bytes after calling NewReader(msg)?

@dunglas
Member

dunglas commented Sep 12, 2025

Yes:

	b, _ := io.ReadAll(req.Body)
	caddy.Log().Info("body", zap.String("request", string(b)))

It logs the correct string, even in startExternalWorkerPipe(), but for some reason I haven't figured out yet, php://input is empty.

@withinboredom
Member Author

I'd check in worker.go that the body is still set when it sends the request to PHP. Otherwise, I can't imagine where it would go between Go and PHP.

@dunglas
Member

dunglas commented Sep 12, 2025

In waitForWorkerRequest(), the body is still available.

@dunglas
Member

dunglas commented Sep 12, 2025

Another question: is there an easy way to execute some code when we get the response from PHP? Using a custom implementation of http.ResponseWriter and a chan waiting for the done chan to be closed doesn't look convenient at all.

@withinboredom
Member Author

This is my worker script for this branch: https://github.com/bottledcode/durable-php/blob/61bd04de5367f9729a84a56eb1184a50585b8308/src/Glue/worker.php and it eventually calls this to get the body: https://github.com/bottledcode/durable-php/blob/61bd04de5367f9729a84a56eb1184a50585b8308/src/Glue/Glue.php#L106

I'll have to run it to verify it gets a payload; I'll check it in a bit.

@withinboredom
Member Author

> is there an easy way to execute some code when we get the response from PHP?

https://github.com/bottledcode/durable-php/blob/d31dc1e9c8cca3543e793ffe98a88529e7bba419/cli/glue/response_writer.go#L32-L48

This is pretty much the minimalist ResponseWriter. We could make this quite a bit simpler.
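
For readers who don't want to follow the link, a minimal in-memory `http.ResponseWriter` might look roughly like the sketch below. This is not the code from the linked repository; `bufferWriter` and `newBufferWriter` are hypothetical names, and it only buffers output without signalling completion.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// bufferWriter is a hypothetical minimal http.ResponseWriter that keeps
// headers, status code, and body in memory.
type bufferWriter struct {
	header http.Header
	status int
	body   bytes.Buffer
}

func newBufferWriter() *bufferWriter {
	return &bufferWriter{header: make(http.Header)}
}

func (w *bufferWriter) Header() http.Header { return w.header }

// WriteHeader records the first status code it is given.
func (w *bufferWriter) WriteHeader(status int) {
	if w.status == 0 {
		w.status = status
	}
}

// Write defaults the status to 200, like net/http does, then buffers p.
func (w *bufferWriter) Write(p []byte) (int, error) {
	if w.status == 0 {
		w.status = http.StatusOK
	}
	return w.body.Write(p)
}

// Compile-time check that bufferWriter satisfies the interface.
var _ http.ResponseWriter = (*bufferWriter)(nil)

func main() {
	w := newBufferWriter()
	w.Header().Set("Content-Type", "text/plain")
	fmt.Fprint(w, "done")
	fmt.Println(w.status, w.body.String())
}
```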

@dunglas
Member

dunglas commented Sep 12, 2025

Ok I figured out the problem with request bodies. For the request body to be available, a ResponseWriter needs to be passed to frankenphp.WorkerRequest because of this line:

if fc.responseWriter == nil {

This works as expected:

&frankenphp.WorkerRequest{
	Request:  req,
	Response: httptest.NewRecorder(),
}
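
To make the workaround concrete, here is a self-contained sketch that pairs a request body with an `httptest.ResponseRecorder` and reads the output back afterwards. `workerRequest` is a local stand-in mirroring the two fields of `frankenphp.WorkerRequest` shown in this PR, and `echo` is a hypothetical Go handler playing the role of the PHP worker script.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// workerRequest is a local stand-in mirroring the two fields quoted in the PR.
type workerRequest struct {
	Request  *http.Request
	Response http.ResponseWriter
}

// newWorkerRequest pairs a POST request carrying msg with an in-memory
// recorder, and returns the recorder so the caller can inspect the output.
func newWorkerRequest(msg string) (*workerRequest, *httptest.ResponseRecorder, error) {
	req, err := http.NewRequest(http.MethodPost, "/", strings.NewReader(msg))
	if err != nil {
		return nil, nil, err
	}
	rec := httptest.NewRecorder()
	return &workerRequest{Request: req, Response: rec}, rec, nil
}

// echo stands in for the PHP worker: it reads the body and writes it back.
func echo(w http.ResponseWriter, r *http.Request) {
	body, _ := io.ReadAll(r.Body)
	fmt.Fprintf(w, "got %s", body)
}

func main() {
	wr, rec, err := newWorkerRequest("ping")
	if err != nil {
		panic(err)
	}
	echo(wr.Response, wr.Request)
	fmt.Println(rec.Code, rec.Body.String())
}
```

Keeping a reference to the recorder is what lets the extension read the status and body once the worker has finished handling the request.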

@dunglas
Member

dunglas commented Sep 12, 2025

Regarding the response writer, I may be missing something, but I see no easy way to be notified when the full body has been received (except playing with the done chan and some global state).

@withinboredom
Member Author

Ah :) yes. The good ole 'let's just stream a response' problem...

Basically, PHP won't write to the response writer unless and until you call flush() or the output buffer becomes full (which can be set via ini_set). The "proper" way to do this is to use ob_* at the beginning of a request, then flush the whole thing in one go at the end of the request (or let PHP do it automatically). If you try to stream it, it is likely not to go well unless you're watching the response writer for an EOF.

@withinboredom
Member Author

(In my case, I was streaming a response one line at a time to send multiple messages. So, this was a bit simpler. Echo a line and then flush.)

@AlliBalliBaba
Contributor

AlliBalliBaba commented Sep 14, 2025

If you want to get more experimental, you can also add a new type of thread and pass readonly PHP objects directly via a frankenphp_handle_task(...) function or something similar. Might also be useful in other situations.

Basically, a stripped down worker without the whole request/response cycle (threadmessagequeue threadtaskworker?). Something I wanted to do at some point, but I'm not sure about the proper way to pass zvals around.

@withinboredom
Member Author

We don't have handy-dandy (go-side constructed) objects in the generator, yet. Then you get into "types" and there are no generics ... yet. And then you may as well pass back a PSR Request object ...

@AlliBalliBaba
Contributor

AlliBalliBaba commented Sep 14, 2025

For reference this is how far I got, the worker looking like this and tasks being dispatched like that.

It also just passes a string currently, but it could instead deep-copy the zval to global memory or so. Just wasn't really sure where to go with it.

@dunglas dunglas force-pushed the add/modular-threads branch 2 times, most recently from 457c2cf to 2a7dc7f Compare September 15, 2025 14:26
Signed-off-by: Robert Landers <[email protected]>
withinboredom and others added 5 commits September 15, 2025 17:19
Signed-off-by: Robert Landers <[email protected]>
Signed-off-by: Robert Landers <[email protected]>
* Simplify

* remove unused variable

* log thread index
@dunglas dunglas force-pushed the add/modular-threads branch from 2a7dc7f to 2bfac92 Compare September 15, 2025 15:19
dunglas and others added 4 commits September 17, 2025 11:51
@withinboredom
Member Author

@dunglas: fixed a couple of segfaults for when there is no parameter or return value.

Comment on lines +44 to +47
// The request for your worker script to handle
Request *http.Request
// Response is a response writer that provides the output of the provided request, it must not be nil to access the request body
Response http.ResponseWriter
Member

We should make sure that these fields are optional (they are useless for many use cases, and we could do some optimisations when they are nil).

Could be done later too.

Co-authored-by: Kévin Dunglas <[email protected]>
@AlliBalliBaba
Contributor

Hmm I might create my task-worker PR. It's not refined yet and the timing is unfortunate, but those types of workers would probably be better suited for simple message queues since they don't include the request/response cycle.

@withinboredom
Member Author

Having the request itself is actually pretty great. For example, headers can pass metadata out of band with the payload that >90% of the time you don't need, but when you do need it, it is there. Queue messages, gRPC, jobs, etc., all have headers/metadata, like observability tokens that libraries know how to emit spans from out of the box without any changes required.
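
As a sketch of that idea, a queue message could be wrapped in an `http.Request` with its metadata carried in headers. `newMessageRequest` is a hypothetical helper, and the `traceparent` value is just an illustrative W3C Trace Context string, not something taken from this PR.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// newMessageRequest wraps a queue payload in an http.Request and carries
// out-of-band metadata (trace context, message IDs, ...) as headers.
func newMessageRequest(payload string, meta map[string]string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, "/", strings.NewReader(payload))
	if err != nil {
		return nil, err
	}
	for key, value := range meta {
		req.Header.Set(key, value)
	}
	return req, nil
}

func main() {
	req, err := newMessageRequest(`{"job":"resize"}`, map[string]string{
		// Illustrative trace context value.
		"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("traceparent"))
}
```

The payload stays untouched in the body, while tracing or routing metadata travels separately and can be ignored by workers that don't need it.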

@dunglas dunglas merged commit 52df300 into main Sep 18, 2025
69 of 71 checks passed
@dunglas dunglas deleted the add/modular-threads branch September 18, 2025 07:21
4 participants