Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/internal/handler/admin/account_codex_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (h *AccountHandler) importCodexSessions(ctx context.Context, req CodexSessi
Items: make([]CodexSessionImportItem, 0, len(entries)),
}

existingAccounts, err := h.listAccountsFiltered(ctx, service.PlatformOpenAI, service.AccountTypeOAuth, "", "", 0, "", "created_at", "desc")
existingAccounts, err := h.listAccountsFiltered(ctx, service.PlatformOpenAI, service.AccountTypeOAuth, "", "", 0, "", "", "", "", "created_at", "desc")
if err != nil {
return result, err
}
Expand Down
9 changes: 6 additions & 3 deletions backend/internal/handler/admin/account_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,12 @@ func (h *AccountHandler) listAllProxies(ctx context.Context) ([]service.Proxy, e
return out, nil
}

func (h *AccountHandler) listAccountsFiltered(ctx context.Context, platform, accountType, status, search string, groupID int64, privacyMode, sortBy, sortOrder string) ([]service.Account, error) {
func (h *AccountHandler) listAccountsFiltered(ctx context.Context, platform, accountType, status, search string, groupID int64, model, quotaStrategy, proxyFilter, privacyMode, sortBy, sortOrder string) ([]service.Account, error) {
page := 1
pageSize := dataPageCap
var out []service.Account
for {
items, total, err := h.adminService.ListAccounts(ctx, page, pageSize, platform, accountType, status, search, groupID, privacyMode, sortBy, sortOrder)
items, total, err := h.adminService.ListAccounts(ctx, page, pageSize, platform, accountType, status, search, groupID, model, quotaStrategy, proxyFilter, privacyMode, sortBy, sortOrder)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -410,6 +410,9 @@ func (h *AccountHandler) resolveExportAccounts(ctx context.Context, ids []int64,
platform := c.Query("platform")
accountType := c.Query("type")
status := c.Query("status")
model := strings.TrimSpace(c.Query("model"))
quotaStrategy := strings.TrimSpace(c.Query("quota_strategy"))
proxyFilter := strings.TrimSpace(c.Query("proxy_filter"))
privacyMode := strings.TrimSpace(c.Query("privacy_mode"))
search := strings.TrimSpace(c.Query("search"))
sortBy := c.DefaultQuery("sort_by", "name")
Expand All @@ -431,7 +434,7 @@ func (h *AccountHandler) resolveExportAccounts(ctx context.Context, ids []int64,
}
}

return h.listAccountsFiltered(ctx, platform, accountType, status, search, groupID, privacyMode, sortBy, sortOrder)
return h.listAccountsFiltered(ctx, platform, accountType, status, search, groupID, model, quotaStrategy, proxyFilter, privacyMode, sortBy, sortOrder)
}

func (h *AccountHandler) resolveExportProxies(ctx context.Context, accounts []service.Account) ([]service.Proxy, error) {
Expand Down
158 changes: 147 additions & 11 deletions backend/internal/handler/admin/account_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"log"
"log/slog"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -58,6 +59,7 @@ type AccountHandler struct {
sessionLimitCache service.SessionLimitCache
rpmCache service.RPMCache
tokenCacheInvalidator service.TokenCacheInvalidator
accountTestQueue *accountTestQueue
}

// NewAccountHandler creates a new admin account handler
Expand Down Expand Up @@ -90,6 +92,7 @@ func NewAccountHandler(
sessionLimitCache: sessionLimitCache,
rpmCache: rpmCache,
tokenCacheInvalidator: tokenCacheInvalidator,
accountTestQueue: newAccountTestQueue(3 * time.Second),
}
}

Expand Down Expand Up @@ -230,6 +233,9 @@ func (h *AccountHandler) List(c *gin.Context) {
accountType := c.Query("type")
status := c.Query("status")
search := c.Query("search")
model := strings.TrimSpace(c.Query("model"))
quotaStrategy := strings.TrimSpace(c.Query("quota_strategy"))
proxyFilter := strings.TrimSpace(c.Query("proxy_filter"))
privacyMode := strings.TrimSpace(c.Query("privacy_mode"))
sortBy := c.DefaultQuery("sort_by", "name")
sortOrder := c.DefaultQuery("sort_order", "asc")
Expand Down Expand Up @@ -258,7 +264,7 @@ func (h *AccountHandler) List(c *gin.Context) {
}
}

accounts, total, err := h.adminService.ListAccounts(c.Request.Context(), page, pageSize, platform, accountType, status, search, groupID, privacyMode, sortBy, sortOrder)
accounts, total, err := h.adminService.ListAccounts(c.Request.Context(), page, pageSize, platform, accountType, status, search, groupID, model, quotaStrategy, proxyFilter, privacyMode, sortBy, sortOrder)
if err != nil {
response.ErrorFrom(c, err)
return
Expand Down Expand Up @@ -730,17 +736,9 @@ func (h *AccountHandler) Test(c *gin.Context) {
// Allow empty body, model_id is optional
_ = c.ShouldBindJSON(&req)

// Use AccountTestService to test the account with SSE streaming
if err := h.accountTestService.TestAccountConnection(c, accountID, req.ModelID, req.Prompt, req.Mode); err != nil {
// Error already sent via SSE, just log
if err := h.runQueuedInteractiveAccountTest(c, accountID, req); err != nil {
return
}

if h.rateLimitService != nil {
if _, err := h.rateLimitService.RecoverAccountAfterSuccessfulTest(c.Request.Context(), accountID); err != nil {
_ = c.Error(err)
}
}
}

// RecoverState handles unified recovery of recoverable account runtime state.
Expand Down Expand Up @@ -1200,6 +1198,78 @@ func (h *AccountHandler) BatchRefresh(c *gin.Context) {
})
}

// BatchTest handles batch testing account connectivity.
// POST /api/v1/admin/accounts/batch-test
func (h *AccountHandler) BatchTest(c *gin.Context) {
if h.accountTestService == nil {
response.Error(c, http.StatusServiceUnavailable, "Account test service unavailable")
return
}

var req struct {
AccountIDs []int64 `json:"account_ids"`
}
if err := c.ShouldBindJSON(&req); err != nil {
response.BadRequest(c, "Invalid request: "+err.Error())
return
}
if len(req.AccountIDs) == 0 {
response.BadRequest(c, "account_ids is required")
return
}

ctx := c.Request.Context()
accounts, err := h.adminService.GetAccountsByIDs(ctx, req.AccountIDs)
if err != nil {
response.ErrorFrom(c, err)
return
}

foundIDs := make(map[int64]bool, len(accounts))
for _, acc := range accounts {
if acc != nil {
foundIDs[acc.ID] = true
}
}

successCount := 0
failedCount := 0
errors := make([]gin.H, 0)

for _, id := range req.AccountIDs {
if foundIDs[id] {
continue
}
failedCount++
errors = append(errors, gin.H{
"account_id": id,
"error": "account not found",
})
}

for _, account := range accounts {
if account == nil {
continue
}
if err := h.runQueuedBackgroundAccountTest(ctx, account.ID); err != nil {
failedCount++
errors = append(errors, gin.H{
"account_id": account.ID,
"error": err.Error(),
})
continue
}
successCount++
}

response.Success(c, gin.H{
"total": len(req.AccountIDs),
"success": successCount,
"failed": failedCount,
"errors": errors,
})
}

// BatchCreate handles batch creating accounts
// POST /api/v1/admin/accounts/batch
func (h *AccountHandler) BatchCreate(c *gin.Context) {
Expand Down Expand Up @@ -1691,6 +1761,12 @@ func (h *AccountHandler) ClearRateLimit(c *gin.Context) {
response.Success(c, h.buildAccountResponseWithRuntime(c.Request.Context(), account))
}

func (h *AccountHandler) GetFilterModels(c *gin.Context) {
platform := strings.TrimSpace(c.Query("platform"))
groups := service.ListAccountModelFilterGroups()
response.Success(c, service.FilterAccountModelGroupsByPlatform(groups, platform))
}

// ResetQuota handles resetting account quota usage
// POST /api/v1/admin/accounts/:id/reset-quota
func (h *AccountHandler) ResetQuota(c *gin.Context) {
Expand All @@ -1705,15 +1781,75 @@ func (h *AccountHandler) ResetQuota(c *gin.Context) {
return
}

if h.rateLimitService != nil {
if _, err := h.rateLimitService.RecoverAccountState(c.Request.Context(), accountID, service.AccountRecoveryOptions{
InvalidateToken: true,
}); err != nil {
response.ErrorFrom(c, err)
return
}
}

account, err := h.adminService.GetAccount(c.Request.Context(), accountID)
if err != nil {
response.ErrorFrom(c, err)
return
}

if account.Platform == service.PlatformOpenAI && h.accountTestService != nil {
if err := h.runQueuedBackgroundAccountTest(c.Request.Context(), accountID); err != nil {
log.Printf("[WARN] auto test after quota reset failed for account %d: %v", accountID, err)
}
account, err = h.adminService.GetAccount(c.Request.Context(), accountID)
if err != nil {
response.ErrorFrom(c, err)
return
}
}

response.Success(c, h.buildAccountResponseWithRuntime(c.Request.Context(), account))
}

func (h *AccountHandler) runQueuedInteractiveAccountTest(c *gin.Context, accountID int64, req TestAccountRequest) error {
if h.accountTestService == nil {
response.Error(c, http.StatusServiceUnavailable, "Account test service unavailable")
return errors.New("account test service unavailable")
}

return h.accountTestQueue.Run(c.Request.Context(), func() error {
if err := h.accountTestService.TestAccountConnection(c, accountID, req.ModelID, req.Prompt, req.Mode); err != nil {
return err
}
if h.rateLimitService != nil {
if _, err := h.rateLimitService.RecoverAccountAfterSuccessfulTest(c.Request.Context(), accountID); err != nil {
_ = c.Error(err)
}
}
return nil
})
}

func (h *AccountHandler) runQueuedBackgroundAccountTest(ctx context.Context, accountID int64) error {
if h.accountTestService == nil {
return errors.New("account test service unavailable")
}

return h.accountTestQueue.Run(ctx, func() error {
recorder := httptest.NewRecorder()
testCtx, _ := gin.CreateTestContext(recorder)
testCtx.Request = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/admin/accounts/%d/test", accountID), nil).WithContext(ctx)
if err := h.accountTestService.TestAccountConnection(testCtx, accountID, "", "", ""); err != nil {
return err
}
if h.rateLimitService != nil {
if _, err := h.rateLimitService.RecoverAccountAfterSuccessfulTest(ctx, accountID); err != nil {
return err
}
}
return nil
})
}

// GetTempUnschedulable handles getting temporary unschedulable status
// GET /api/v1/admin/accounts/:id/temp-unschedulable
func (h *AccountHandler) GetTempUnschedulable(c *gin.Context) {
Expand Down Expand Up @@ -2107,7 +2243,7 @@ func (h *AccountHandler) BatchRefreshTier(c *gin.Context) {
accounts := make([]*service.Account, 0)

if len(req.AccountIDs) == 0 {
allAccounts, _, err := h.adminService.ListAccounts(ctx, 1, 10000, "gemini", "oauth", "", "", 0, "", "name", "asc")
allAccounts, _, err := h.adminService.ListAccounts(ctx, 1, 10000, "gemini", "oauth", "", "", 0, "", "", "", "", "name", "asc")
if err != nil {
response.ErrorFrom(c, err)
return
Expand Down
Loading
Loading