Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions backend/internal/service/openai_account_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ func (r schedulerTestOpenAIAccountRepo) ListSchedulableUngroupedByPlatform(ctx c
return r.ListSchedulableByPlatform(ctx, platform)
}

// ListByGroup is a test stub: it ignores both the context and the group ID and
// hands back every account the stub holds, never reporting an error.
func (r schedulerTestOpenAIAccountRepo) ListByGroup(_ context.Context, _ int64) ([]Account, error) {
	all := r.accounts
	return all, nil
}

// ListByPlatform is a test stub that returns the stub's accounts whose Platform
// field equals platform. The returned slice is nil when nothing matches, and
// the error is always nil.
func (r schedulerTestOpenAIAccountRepo) ListByPlatform(_ context.Context, platform string) ([]Account, error) {
	var matched []Account
	for i := range r.accounts {
		if r.accounts[i].Platform != platform {
			continue
		}
		matched = append(matched, r.accounts[i])
	}
	return matched, nil
}

// ClearRateLimit is a no-op test stub: it touches no state and always succeeds.
func (r schedulerTestOpenAIAccountRepo) ClearRateLimit(_ context.Context, _ int64) error {
	return nil
}

type schedulerTestConcurrencyCache struct {
ConcurrencyCache
loadBatchErr error
Expand Down
209 changes: 209 additions & 0 deletions backend/internal/service/openai_gateway_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
codexCLIVersion = "0.125.0"
// Codex 限额快照仅用于后台展示/诊断,不需要每个成功请求都立即落库。
openAICodexSnapshotPersistMinInterval = 30 * time.Second
openAICodexRecoverySnapshotMaxSkew = 2 * time.Minute
)

// OpenAI allowed headers whitelist (for non-passthrough).
Expand Down Expand Up @@ -1369,6 +1370,12 @@ func (s *OpenAIGatewayService) selectAccountForModelWithExclusions(ctx context.C
selected, compactBlocked := s.selectBestAccount(ctx, groupID, accounts, requestedModel, excludedIDs, requireCompact)

if selected == nil {
if recovered := s.recoverOpenAIRateLimitedAccountBeforeNoAvailable(ctx, groupID, requestedModel, excludedIDs, requireCompact); recovered != nil {
if sessionHash != "" {
_ = s.setStickySessionAccountID(ctx, groupID, sessionHash, recovered.ID, openaiStickySessionTTL)
}
return recovered, nil
}
return nil, noAvailableOpenAISelectionError(requestedModel, compactBlocked)
}

Expand Down Expand Up @@ -1506,6 +1513,199 @@ func (s *OpenAIGatewayService) selectBestAccount(ctx context.Context, groupID *i
return selected, compactBlocked
}

// recoverOpenAIRateLimitedAccountBeforeNoAvailable tries to bring a rate-limited
// OpenAI account back into service instead of reporting that no account is
// available.
//
// It works in two passes:
//
//  1. Collect candidates from listOpenAIAccountsForRateLimitRecovery, dropping
//     anything in excludedIDs, anything rejected by
//     isRecoverableOpenAIRateLimitedAccount, and (only when the group needs the
//     upstream check) anything restricted by the upstream channel.
//  2. Walk the candidates in the order produced by
//     sortAccountsByPriorityAndLastUsed. For each one, clear its rate limit,
//     re-read it by ID, and re-validate the fresh copy. The first account that
//     passes is returned.
//
// It returns nil when the receiver or its repository is nil, when listing
// fails, or when no candidate survives. Note that a candidate's rate limit is
// cleared before the fresh copy is validated, so a candidate rejected in pass 2
// is not restored to its rate-limited state here.
func (s *OpenAIGatewayService) recoverOpenAIRateLimitedAccountBeforeNoAvailable(ctx context.Context, groupID *int64, requestedModel string, excludedIDs map[int64]struct{}, requireCompact bool) *Account {
	if s == nil || s.accountRepo == nil {
		return nil
	}

	pool, listErr := s.listOpenAIAccountsForRateLimitRecovery(ctx, groupID)
	if listErr != nil {
		slog.Warn("openai_rate_limit_recovery_list_failed", "group_id", derefGroupID(groupID), "error", listErr)
		return nil
	}
	if len(pool) == 0 {
		return nil
	}

	checkUpstream := s.needsUpstreamChannelRestrictionCheck(ctx, groupID)
	// groupID is only dereferenced when checkUpstream is true; the && keeps the
	// dereference behind that guard.
	restrictedUpstream := func(acc *Account) bool {
		return checkUpstream && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, acc, requestedModel, requireCompact)
	}

	candidates := make([]*Account, 0, len(pool))
	for i := range pool {
		acc := &pool[i]
		_, excluded := excludedIDs[acc.ID]
		switch {
		case excluded:
			// Caller asked us to skip this account.
		case !isRecoverableOpenAIRateLimitedAccount(acc, requestedModel, requireCompact):
			// Not a rate-limited account we are willing to revive.
		case restrictedUpstream(acc):
			// Upstream channel does not allow this model on this account.
		default:
			candidates = append(candidates, acc)
		}
	}
	if len(candidates) == 0 {
		return nil
	}

	sortAccountsByPriorityAndLastUsed(candidates, false)
	for _, candidate := range candidates {
		accountID := candidate.ID

		if clearErr := s.clearOpenAIRateLimitForRecovery(ctx, accountID); clearErr != nil {
			slog.Warn("openai_rate_limit_recovery_clear_failed", "account_id", accountID, "error", clearErr)
			continue
		}

		// Re-read the account so eligibility is judged on its state after the
		// rate limit was cleared.
		fresh, getErr := s.accountRepo.GetByID(ctx, accountID)
		if getErr != nil || fresh == nil {
			slog.Warn("openai_rate_limit_recovery_hydrate_failed", "account_id", accountID, "error", getErr)
			continue
		}
		if !isOpenAIAccountEligibleForRequest(fresh, requestedModel, requireCompact) || restrictedUpstream(fresh) {
			continue
		}

		slog.Info("openai_rate_limit_recovery_selected", "account_id", fresh.ID, "group_id", derefGroupID(groupID), "model", requestedModel)
		return fresh
	}

	return nil
}

// recoverOpenAIRateLimitedSelectionBeforeNoAvailable wraps
// recoverOpenAIRateLimitedAccountBeforeNoAvailable for callers that need an
// AccountSelectionResult.
//
// When an account is recovered it is bound to sessionHash (if non-empty) as the
// sticky-session account; a failure to store that binding is deliberately
// ignored. The method then tries to take a concurrency slot on the account:
//
//   - slot acquired: the result carries Acquired=true and the slot's ReleaseFunc;
//   - otherwise (error, nil result, or not acquired): the result carries a
//     WaitPlan built from cfg.FallbackWaitTimeout and cfg.FallbackMaxWaiting.
//
// The boolean is false only when no account could be recovered, in which case
// the result is nil.
func (s *OpenAIGatewayService) recoverOpenAIRateLimitedSelectionBeforeNoAvailable(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, requireCompact bool, cfg config.GatewaySchedulingConfig) (*AccountSelectionResult, bool) {
	recovered := s.recoverOpenAIRateLimitedAccountBeforeNoAvailable(ctx, groupID, requestedModel, excludedIDs, requireCompact)
	if recovered == nil {
		return nil, false
	}

	if sessionHash != "" {
		// Best effort: a failed sticky-session write must not block the request.
		_ = s.setStickySessionAccountID(ctx, groupID, sessionHash, recovered.ID, openaiStickySessionTTL)
	}

	if slot, acquireErr := s.tryAcquireAccountSlot(ctx, recovered.ID, recovered.Concurrency); acquireErr == nil && slot != nil && slot.Acquired {
		return &AccountSelectionResult{
			Account:     recovered,
			Acquired:    true,
			ReleaseFunc: slot.ReleaseFunc,
		}, true
	}

	// No slot right now: let the caller queue on this account instead.
	plan := &AccountWaitPlan{
		AccountID:      recovered.ID,
		MaxConcurrency: recovered.Concurrency,
		Timeout:        cfg.FallbackWaitTimeout,
		MaxWaiting:     cfg.FallbackMaxWaiting,
	}
	return &AccountSelectionResult{
		Account:  recovered,
		WaitPlan: plan,
	}, true
}

// listOpenAIAccountsForRateLimitRecovery loads the accounts that rate-limit
// recovery is allowed to consider for the given group scope.
//
//   - Simple run mode: whatever ListByPlatform returns for the OpenAI platform,
//     passed through untouched (including its error).
//   - groupID set: the group's accounts, narrowed to OpenAI accounts.
//   - groupID nil: OpenAI-platform accounts that have no AccountGroups entries.
//
// A nil receiver or nil repository yields (nil, nil).
func (s *OpenAIGatewayService) listOpenAIAccountsForRateLimitRecovery(ctx context.Context, groupID *int64) ([]Account, error) {
	if s == nil || s.accountRepo == nil {
		return nil, nil
	}

	simpleMode := s.cfg != nil && s.cfg.RunMode == config.RunModeSimple
	switch {
	case simpleMode:
		return s.accountRepo.ListByPlatform(ctx, PlatformOpenAI)
	case groupID != nil:
		grouped, err := s.accountRepo.ListByGroup(ctx, *groupID)
		if err != nil {
			return nil, err
		}
		return filterOpenAIAccounts(grouped), nil
	}

	platformAccounts, err := s.accountRepo.ListByPlatform(ctx, PlatformOpenAI)
	if err != nil {
		return nil, err
	}
	ungrouped := make([]Account, 0, len(platformAccounts))
	for i := range platformAccounts {
		if len(platformAccounts[i].AccountGroups) != 0 {
			continue
		}
		ungrouped = append(ungrouped, platformAccounts[i])
	}
	return ungrouped, nil
}

// filterOpenAIAccounts returns a new slice holding, in their original order,
// the accounts for which IsOpenAI reports true. The input slice is not
// modified, and the result is non-nil even when empty.
func filterOpenAIAccounts(accounts []Account) []Account {
	kept := make([]Account, 0, len(accounts))
	for i := range accounts {
		if !accounts[i].IsOpenAI() {
			continue
		}
		kept = append(kept, accounts[i])
	}
	return kept
}

// clearOpenAIRateLimitForRecovery asks the account repository to clear the rate
// limit recorded for accountID and returns the repository's error. With a nil
// receiver or nil repository it does nothing and reports success.
func (s *OpenAIGatewayService) clearOpenAIRateLimitForRecovery(ctx context.Context, accountID int64) error {
	if s == nil || s.accountRepo == nil {
		return nil
	}
	return s.accountRepo.ClearRateLimit(ctx, accountID)
}

// isRecoverableOpenAIRateLimitedAccount reports whether account is a candidate
// for having its rate limit cleared early.
//
// The account must be a non-nil, active, schedulable OpenAI account whose
// RateLimitResetAt is still in the future, i.e. it is rate limited right now.
// It must not be blocked for any other reason checked here: auto-pause after
// expiry, an overload window, a temporary unschedulable window, exceeded quota
// on API-key/Bedrock accounts, an unsupported requestedModel, or missing
// compact support when requireCompact is set. Finally,
// openAICodexSnapshotShowsNonExhaustedWindows must return true for it.
func isRecoverableOpenAIRateLimitedAccount(account *Account, requestedModel string, requireCompact bool) bool {
	if account == nil {
		return false
	}
	if !(account.IsOpenAI() && account.IsActive() && account.Schedulable) {
		return false
	}

	now := time.Now()
	// stillPending reports whether deadline is set and has not been reached.
	stillPending := func(deadline *time.Time) bool {
		return deadline != nil && now.Before(*deadline)
	}

	switch {
	case !stillPending(account.RateLimitResetAt):
		// Not rate limited at the moment, so there is nothing to recover.
		return false
	case account.AutoPauseOnExpired && account.ExpiresAt != nil && !now.Before(*account.ExpiresAt):
		return false
	case stillPending(account.OverloadUntil):
		return false
	case stillPending(account.TempUnschedulableUntil):
		return false
	case account.IsAPIKeyOrBedrock() && account.IsQuotaExceeded():
		return false
	case requestedModel != "" && !account.IsModelSupported(requestedModel):
		return false
	case requireCompact && openAICompactSupportTier(account) == 0:
		return false
	}

	return openAICodexSnapshotShowsNonExhaustedWindows(account)
}

// openAICodexSnapshotShowsNonExhaustedWindows reports whether the Codex usage
// snapshot stored on account shows both the 5h and the 7d window below 100%
// used, and that snapshot was recorded within
// openAICodexRecoverySnapshotMaxSkew (before or after) of account.RateLimitedAt.
//
// It returns false when account is nil or has no Extra map, when either
// percentage key is absent, when RateLimitedAt is unset, or when no
// "codex_usage_updated_at" timestamp is available.
func openAICodexSnapshotShowsNonExhaustedWindows(account *Account) bool {
	used5h, ok5h := accountExtraFloat64(account, "codex_5h_used_percent")
	used7d, ok7d := accountExtraFloat64(account, "codex_7d_used_percent")
	if !ok5h || !ok7d || used5h >= 100 || used7d >= 100 {
		return false
	}
	// accountExtraFloat64 reports !ok for a nil account, so account is non-nil
	// from here on.
	if account.RateLimitedAt == nil {
		return false
	}
	sampledAt := account.getExtraTime("codex_usage_updated_at")
	if sampledAt.IsZero() {
		return false
	}
	// Time.Sub saturates at the minimum Duration, and negating that value by
	// hand overflows back to the same negative number, which would wrongly
	// compare as "within skew". Duration.Abs maps it to the maximum instead.
	skew := sampledAt.Sub(*account.RateLimitedAt).Abs()
	return skew <= openAICodexRecoverySnapshotMaxSkew
}

// accountExtraFloat64 looks up key in account.Extra and returns its value
// converted by parseExtraFloat64.
//
// The boolean reports only whether the key is present: it is false for a nil
// account, a nil Extra map, or a missing key, and true otherwise.
// NOTE(review): a present but unparseable value still yields ok == true with
// whatever parseExtraFloat64 returns for it — confirm callers are fine with that.
func accountExtraFloat64(account *Account, key string) (float64, bool) {
	if account == nil || account.Extra == nil {
		return 0, false
	}
	if raw, found := account.Extra[key]; found {
		return parseExtraFloat64(raw), true
	}
	return 0, false
}

// isBetterAccount 判断 candidate 是否比 current 更优。
// 规则:优先级更高(数值更小)优先;同优先级时,未使用过的优先,其次是最久未使用的。
//
Expand Down Expand Up @@ -1593,6 +1793,9 @@ func (s *OpenAIGatewayService) selectAccountWithLoadAwareness(ctx context.Contex
return nil, err
}
if len(accounts) == 0 {
if recovered, ok := s.recoverOpenAIRateLimitedSelectionBeforeNoAvailable(ctx, groupID, sessionHash, requestedModel, excludedIDs, requireCompact, cfg); ok {
return recovered, nil
}
return nil, ErrNoAvailableAccounts
}

Expand Down Expand Up @@ -1667,6 +1870,9 @@ func (s *OpenAIGatewayService) selectAccountWithLoadAwareness(ctx context.Contex
}

if len(candidates) == 0 {
if recovered, ok := s.recoverOpenAIRateLimitedSelectionBeforeNoAvailable(ctx, groupID, sessionHash, requestedModel, excludedIDs, requireCompact, cfg); ok {
return recovered, nil
}
return nil, ErrNoAvailableAccounts
}

Expand Down Expand Up @@ -1809,6 +2015,9 @@ func (s *OpenAIGatewayService) selectAccountWithLoadAwareness(ctx context.Contex
})
}

if recovered, ok := s.recoverOpenAIRateLimitedSelectionBeforeNoAvailable(ctx, groupID, sessionHash, requestedModel, excludedIDs, requireCompact, cfg); ok {
return recovered, nil
}
if requireCompact && baseCandidateCount > 0 {
return nil, ErrNoAvailableCompactAccounts
}
Expand Down
Loading
Loading