Commit 10a6693

fix: guard from potential deadlock with requests in flight (#6484)
* fix(watchdog): guard from potential deadlock with requests in flight

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Improve locking when loading models

Signed-off-by: Ettore Di Giacinto <[email protected]>

---------

Signed-off-by: Ettore Di Giacinto <[email protected]>
1 parent f0245fa · commit 10a6693

File tree

2 files changed: +36 -14 lines


pkg/model/loader.go

Lines changed: 8 additions & 3 deletions
@@ -149,17 +149,18 @@ func (ml *ModelLoader) ListLoadedModels() []*Model {
 }
 
 func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string, string, string) (*Model, error)) (*Model, error) {
+    ml.mu.Lock()
+    defer ml.mu.Unlock()
+
     // Check if we already have a loaded model
-    if model := ml.CheckIsLoaded(modelID); model != nil {
+    if model := ml.checkIsLoaded(modelID); model != nil {
         return model, nil
     }
 
     // Load the model and keep it in memory for later use
     modelFile := filepath.Join(ml.ModelPath, modelName)
     log.Debug().Msgf("Loading model in memory from file: %s", modelFile)
 
-    ml.mu.Lock()
-    defer ml.mu.Unlock()
     model, err := loader(modelID, modelName, modelFile)
     if err != nil {
         return nil, fmt.Errorf("failed to load model with internal loader: %s", err)
@@ -184,6 +185,10 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error {
 func (ml *ModelLoader) CheckIsLoaded(s string) *Model {
     ml.mu.Lock()
     defer ml.mu.Unlock()
+    return ml.checkIsLoaded(s)
+}
+
+func (ml *ModelLoader) checkIsLoaded(s string) *Model {
     m, ok := ml.models[s]
     if !ok {
         return nil
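
The loader change is the standard exported/unexported mutex split: LoadModel now takes ml.mu up front and calls the unexported checkIsLoaded, while the exported CheckIsLoaded keeps locking for external callers. Because Go's sync.Mutex is not reentrant, calling the exported method from under the lock would deadlock; delegating to a lock-free internal variant lets LoadModel hold the mutex across the whole check-then-load sequence. A minimal standalone sketch of the pattern — Registry, Get, and GetOrCreate are illustrative names, not from this codebase:

package main

import (
    "fmt"
    "sync"
)

// Registry is a hypothetical stand-in for ModelLoader.
type Registry struct {
    mu    sync.Mutex
    items map[string]string
}

// Get is the exported entry point: it takes the lock itself.
func (r *Registry) Get(key string) (string, bool) {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.get(key)
}

// get assumes r.mu is already held, so callers that already
// hold the lock can reuse the logic without locking twice.
func (r *Registry) get(key string) (string, bool) {
    v, ok := r.items[key]
    return v, ok
}

// GetOrCreate holds the lock across the whole check-then-create
// sequence, making it atomic with respect to other callers.
func (r *Registry) GetOrCreate(key string, create func() string) string {
    r.mu.Lock()
    defer r.mu.Unlock()
    if v, ok := r.get(key); ok { // calling Get here would deadlock
        return v
    }
    v := create()
    r.items[key] = v
    return v
}

func main() {
    r := &Registry{items: map[string]string{}}
    fmt.Println(r.GetOrCreate("model-a", func() string { return "loaded" }))
}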

pkg/model/watchdog.go

Lines changed: 28 additions & 11 deletions
@@ -44,6 +44,7 @@ func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy
         busyCheck:       busy,
         idleCheck:       idle,
         addressModelMap: make(map[string]string),
+        stop:            make(chan bool, 1),
     }
 }
 
@@ -104,18 +105,18 @@ func (wd *WatchDog) Run() {
 
 func (wd *WatchDog) checkIdle() {
     wd.Lock()
-    defer wd.Unlock()
     log.Debug().Msg("[WatchDog] Watchdog checks for idle connections")
+
+    // Collect models to shutdown while holding the lock
+    var modelsToShutdown []string
     for address, t := range wd.idleTime {
         log.Debug().Msgf("[WatchDog] %s: idle connection", address)
         if time.Since(t) > wd.idletimeout {
             log.Warn().Msgf("[WatchDog] Address %s is idle for too long, killing it", address)
             model, ok := wd.addressModelMap[address]
             if ok {
-                if err := wd.pm.ShutdownModel(model); err != nil {
-                    log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
-                }
-                log.Debug().Msgf("[WatchDog] model shut down: %s", address)
+                modelsToShutdown = append(modelsToShutdown, model)
+                // Clean up the maps while we have the lock
                 delete(wd.idleTime, address)
                 delete(wd.addressModelMap, address)
                 delete(wd.addressMap, address)
@@ -125,25 +126,32 @@ func (wd *WatchDog) checkIdle() {
             }
         }
     }
+    wd.Unlock()
+
+    // Now shutdown models without holding the watchdog lock to prevent deadlock
+    for _, model := range modelsToShutdown {
+        if err := wd.pm.ShutdownModel(model); err != nil {
+            log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
+        }
+        log.Debug().Msgf("[WatchDog] model shut down: %s", model)
+    }
 }
 
 func (wd *WatchDog) checkBusy() {
     wd.Lock()
-    defer wd.Unlock()
     log.Debug().Msg("[WatchDog] Watchdog checks for busy connections")
 
+    // Collect models to shutdown while holding the lock
+    var modelsToShutdown []string
     for address, t := range wd.timetable {
         log.Debug().Msgf("[WatchDog] %s: active connection", address)
 
         if time.Since(t) > wd.timeout {
-
             model, ok := wd.addressModelMap[address]
             if ok {
                 log.Warn().Msgf("[WatchDog] Model %s is busy for too long, killing it", model)
-                if err := wd.pm.ShutdownModel(model); err != nil {
-                    log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
-                }
-                log.Debug().Msgf("[WatchDog] model shut down: %s", address)
+                modelsToShutdown = append(modelsToShutdown, model)
+                // Clean up the maps while we have the lock
                 delete(wd.timetable, address)
                 delete(wd.addressModelMap, address)
                 delete(wd.addressMap, address)
@@ -153,4 +161,13 @@ func (wd *WatchDog) checkBusy() {
             }
         }
     }
+    wd.Unlock()
+
+    // Now shutdown models without holding the watchdog lock to prevent deadlock
+    for _, model := range modelsToShutdown {
+        if err := wd.pm.ShutdownModel(model); err != nil {
+            log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
+        }
+        log.Debug().Msgf("[WatchDog] model shut down: %s", model)
+    }
 }
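
The watchdog change applies a collect-then-act pattern: while holding its lock, checkIdle and checkBusy only record which models timed out and clean up the bookkeeping maps; the lock is released before ShutdownModel runs, since that call can block on in-flight requests that themselves need the watchdog lock. A minimal standalone sketch of the pattern — Watch, lastSeen, and check are hypothetical names, not the actual WatchDog API:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Watch is a hypothetical stand-in for the WatchDog.
type Watch struct {
    sync.Mutex
    lastSeen map[string]time.Time // model -> last activity
    timeout  time.Duration
}

// shutdown stands in for pm.ShutdownModel: it may block or take
// other locks, so it must never run while w's mutex is held.
func (w *Watch) shutdown(model string) error {
    fmt.Println("shutting down:", model)
    return nil
}

func (w *Watch) check() {
    w.Lock()
    // Phase 1: under the lock, only decide and update bookkeeping.
    var toStop []string
    for model, t := range w.lastSeen {
        if time.Since(t) > w.timeout {
            toStop = append(toStop, model)
            delete(w.lastSeen, model)
        }
    }
    w.Unlock()

    // Phase 2: do the blocking calls with no lock held, so a
    // shutdown that waits on an in-flight request cannot deadlock
    // against that request waiting for w's mutex.
    for _, model := range toStop {
        if err := w.shutdown(model); err != nil {
            fmt.Println("error shutting down", model, ":", err)
        }
    }
}

func main() {
    w := &Watch{
        lastSeen: map[string]time.Time{"llama": time.Now().Add(-time.Hour)},
        timeout:  time.Minute,
    }
    w.check()
}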
