Skip to content

Commit 2bfac92

Browse files
committed
Simplify modular threads (#1874)
* Simplify * remove unused variable * log thread index
1 parent cf71f1d commit 2bfac92

File tree

2 files changed

+24
-39
lines changed

2 files changed

+24
-39
lines changed

threadFramework.go

Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -60,49 +60,34 @@ func RegisterExternalWorker(worker WorkerExtension) {
6060

6161
// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
6262
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
63-
go func() {
64-
defer func() {
65-
if r := recover(); r != nil {
66-
logger.LogAttrs(context.Background(), slog.LevelError, "external worker pipe panicked", slog.String("worker", w.name), slog.Any("panic", r))
67-
}
68-
}()
63+
for {
64+
rq := externalWorker.ProvideRequest()
6965

70-
for {
71-
var rq *WorkerRequest
72-
func() {
73-
defer func() {
74-
if r := recover(); r != nil {
75-
logger.LogAttrs(context.Background(), slog.LevelError, "ProvideRequest panicked", slog.String("worker", w.name), slog.Any("panic", r))
76-
rq = nil
77-
}
78-
}()
79-
rq = externalWorker.ProvideRequest()
80-
}()
66+
if rq == nil || rq.Request == nil {
67+
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
68+
continue
69+
}
8170

82-
if rq == nil || rq.Request == nil {
83-
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name))
84-
continue
85-
}
71+
r := rq.Request
72+
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
73+
if err != nil {
74+
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err))
75+
continue
76+
}
8677

87-
r := rq.Request
88-
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
89-
if err != nil {
90-
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err))
91-
continue
92-
}
78+
if fc, ok := fromContext(fr.Context()); ok {
79+
fc.responseWriter = rq.Response
9380

94-
if fc, ok := fromContext(fr.Context()); ok {
95-
fc.responseWriter = rq.Response
81+
// Queue the request and wait for completion if Done channel was provided
82+
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
9683

97-
// Queue the request and wait for completion if Done channel was provided
98-
w.requestChan <- fc
99-
if rq.Done != nil {
100-
go func() {
101-
<-fc.done
102-
close(rq.Done)
103-
}()
104-
}
84+
w.requestChan <- fc
85+
if rq.Done != nil {
86+
go func() {
87+
<-fc.done
88+
close(rq.Done)
89+
}()
10590
}
10691
}
107-
}()
92+
}
10893
}

worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func initWorkers(opt []workerOpt) error {
5555
// create a pipe from the external worker to the main worker
5656
// note: this is locked to the initial thread size the external worker requested
5757
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
58-
startExternalWorkerPipe(w, workerThread.externalWorker, thread)
58+
go startExternalWorkerPipe(w, workerThread.externalWorker, thread)
5959
}
6060
workersReady.Done()
6161
}()

0 commit comments

Comments
 (0)