Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
516 changes: 375 additions & 141 deletions README.md

Large diffs are not rendered by default.

508 changes: 343 additions & 165 deletions README_CN.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ remote-management:
# Disable the bundled management control panel asset download and HTTP route when true.
disable-control-panel: false

# When true, management.html is checked for updates and downloaded both in the background and on demand.
# Defaults to false, in which case only an already-present local file is served (a missing file returns 404).
auto-update-control-panel: false

# GitHub repository for the management control panel. Accepts a repository URL or releases API URL.
panel-github-repository: "https://github.com/router-for-me/Cli-Proxy-API-Management-Center"

Expand Down
4 changes: 4 additions & 0 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ func (s *Server) serveManagementControlPanel(c *gin.Context) {

if _, err := os.Stat(filePath); err != nil {
if os.IsNotExist(err) {
if !managementasset.AutoUpdateEnabled(cfg) {
c.AbortWithStatus(http.StatusNotFound)
return
}
// Synchronously ensure management.html is available with a detached context.
// Control panel bootstrap should not be canceled by client disconnects.
if !managementasset.EnsureLatestManagementHTML(context.Background(), managementasset.StaticDir(s.configFilePath), cfg.ProxyURL, cfg.RemoteManagement.PanelGitHubRepository) {
Expand Down
47 changes: 47 additions & 0 deletions internal/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,53 @@ func newTestServer(t *testing.T) *Server {
return NewServer(cfg, authManager, accessManager, configPath)
}

// TestManagementControlPanelServesLocalFileWhenAutoUpdateDisabled checks that a
// management.html already present in the static directory is served with 200 OK
// while AutoUpdateControlPanel is false.
func TestManagementControlPanelServesLocalFileWhenAutoUpdateDisabled(t *testing.T) {
	// Blank out both spellings of the writable-path variable
	// (presumably so neither can redirect the static directory — confirm against StaticDir).
	for _, name := range []string{"WRITABLE_PATH", "writable_path"} {
		t.Setenv(name, "")
	}

	assetDir := t.TempDir()
	t.Setenv("MANAGEMENT_STATIC_PATH", assetDir)

	// Seed the static directory with a recognizable control panel page.
	const body = "<html><body>local control panel</body></html>"
	assetPath := filepath.Join(assetDir, "management.html")
	if writeErr := os.WriteFile(assetPath, []byte(body), 0o644); writeErr != nil {
		t.Fatalf("failed to write management asset: %v", writeErr)
	}

	srv := newTestServer(t)
	srv.cfg.RemoteManagement.AutoUpdateControlPanel = false

	request := httptest.NewRequest(http.MethodGet, "/management.html", nil)
	recorder := httptest.NewRecorder()
	srv.engine.ServeHTTP(recorder, request)

	if status := recorder.Code; status != http.StatusOK {
		t.Fatalf("unexpected status code: got %d want %d; body=%s", status, http.StatusOK, recorder.Body.String())
	}
	if got := recorder.Body.String(); !strings.Contains(got, "local control panel") {
		t.Fatalf("unexpected body: %s", got)
	}
}

// TestManagementControlPanelMissingFileReturnsNotFoundWhenAutoUpdateDisabled checks
// that requesting /management.html yields 404 when the static directory is empty
// and AutoUpdateControlPanel is false.
func TestManagementControlPanelMissingFileReturnsNotFoundWhenAutoUpdateDisabled(t *testing.T) {
	// Blank out both spellings of the writable-path variable
	// (presumably so neither can redirect the static directory — confirm against StaticDir).
	for _, name := range []string{"WRITABLE_PATH", "writable_path"} {
		t.Setenv(name, "")
	}

	// Point the static path at a fresh, empty directory: no management.html exists.
	emptyDir := t.TempDir()
	t.Setenv("MANAGEMENT_STATIC_PATH", emptyDir)

	srv := newTestServer(t)
	srv.cfg.RemoteManagement.AutoUpdateControlPanel = false

	request := httptest.NewRequest(http.MethodGet, "/management.html", nil)
	recorder := httptest.NewRecorder()
	srv.engine.ServeHTTP(recorder, request)

	if status := recorder.Code; status != http.StatusNotFound {
		t.Fatalf("unexpected status code: got %d want %d; body=%s", status, http.StatusNotFound, recorder.Body.String())
	}
}

func TestAmpProviderModelRoutes(t *testing.T) {
testCases := []struct {
name string
Expand Down
178 changes: 139 additions & 39 deletions internal/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package bridge

import (
"context"
"strings"
"sync"
"time"

Expand All @@ -15,7 +16,9 @@ import (
"github.com/router-for-me/CLIProxyAPI/v6/internal/sessions"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// Bridge connects CLIProxyAPI to a malice-network server via gRPC.
Expand All @@ -29,6 +32,7 @@ type Bridge struct {
spiteStream listenerrpc.ListenerRPC_SpiteStreamClient
jobStream listenerrpc.ListenerRPC_JobStreamClient
sendMu sync.Mutex // serializes spiteStream.Send() calls
reconnectMu sync.Mutex // serializes bridge state recovery

registry *Registry
taskManager *TaskManager
Expand Down Expand Up @@ -83,56 +87,28 @@ func NewBridge(cfg *config.C2BridgeConfig) (*Bridge, error) {

// Start registers the listener and pipeline, opens streams, and begins processing.
func (b *Bridge) Start(ctx context.Context) error {
// Register listener.
_, err := b.rpc.RegisterListener(b.listenerContext(), &clientpb.RegisterListener{
Name: b.cfg.ListenerName,
Host: b.cfg.ListenerIP,
})
if err != nil {
if err := b.registerListener(); err != nil {
return err
}
log.Infof("[bridge] registered listener %s at %s", b.cfg.ListenerName, b.cfg.ListenerIP)

// Register pipeline as a custom (externally-managed) type.
_, err = b.rpc.RegisterPipeline(b.listenerContext(), &clientpb.Pipeline{
Name: b.cfg.PipelineName,
ListenerId: b.cfg.ListenerName,
Enable: true,
Type: "llm",
Body: &clientpb.Pipeline_Custom{
Custom: &clientpb.CustomPipeline{
Name: b.cfg.PipelineName,
ListenerId: b.cfg.ListenerName,
Host: b.cfg.ListenerIP,
},
},
})
if err != nil {
if err := b.registerPipeline(); err != nil {
return err
}
log.Infof("[bridge] registered pipeline %s", b.cfg.PipelineName)

// Open JobStream BEFORE StartPipeline — the server pushes a CtrlPipelineStart
// job and blocks until the listener responds via this stream.
b.jobStream, err = b.rpc.JobStream(b.listenerContext())
if err != nil {
if err := b.openJobStream(); err != nil {
return err
}
go b.handleJobStream()

// Start pipeline.
_, err = b.rpc.StartPipeline(b.listenerContext(), &clientpb.CtrlPipeline{
Name: b.cfg.PipelineName,
ListenerId: b.cfg.ListenerName,
})
if err != nil {
if err := b.startPipeline(); err != nil {
return err
}
log.Infof("[bridge] pipeline %s started", b.cfg.PipelineName)

// Open SpiteStream with pipeline_id metadata.
b.spiteStream, err = b.rpc.SpiteStream(b.pipelineContext())
if err != nil {
if err := b.openSpiteStream(); err != nil {
return err
}

Expand Down Expand Up @@ -169,6 +145,77 @@ func (b *Bridge) Close() error {
return nil
}

// registerListener registers the configured listener name and host with the
// server. An AlreadyExists status is treated as success, so calling it again
// for a listener the server already knows does not fail.
func (b *Bridge) registerListener() error {
	req := &clientpb.RegisterListener{
		Name: b.cfg.ListenerName,
		Host: b.cfg.ListenerIP,
	}
	if _, err := b.rpc.RegisterListener(b.listenerContext(), req); err != nil {
		// Only a genuine failure is reported; a duplicate registration is fine.
		if status.Code(err) != codes.AlreadyExists {
			return err
		}
	}
	log.Infof("[bridge] registered listener %s at %s", b.cfg.ListenerName, b.cfg.ListenerIP)
	return nil
}

// registerPipeline registers the configured pipeline as an enabled custom
// pipeline of type "llm" bound to this listener. An AlreadyExists status is
// treated as success.
func (b *Bridge) registerPipeline() error {
	custom := &clientpb.CustomPipeline{
		Name:       b.cfg.PipelineName,
		ListenerId: b.cfg.ListenerName,
		Host:       b.cfg.ListenerIP,
	}
	pipeline := &clientpb.Pipeline{
		Name:       b.cfg.PipelineName,
		ListenerId: b.cfg.ListenerName,
		Enable:     true,
		Type:       "llm",
		Body:       &clientpb.Pipeline_Custom{Custom: custom},
	}
	if _, err := b.rpc.RegisterPipeline(b.listenerContext(), pipeline); err != nil {
		// Only a genuine failure is reported; a duplicate registration is fine.
		if status.Code(err) != codes.AlreadyExists {
			return err
		}
	}
	log.Infof("[bridge] registered pipeline %s", b.cfg.PipelineName)
	return nil
}

// startPipeline issues the StartPipeline RPC for the configured pipeline and
// logs on success. Any RPC error is returned unchanged.
func (b *Bridge) startPipeline() error {
	ctrl := &clientpb.CtrlPipeline{
		Name:       b.cfg.PipelineName,
		ListenerId: b.cfg.ListenerName,
	}
	if _, err := b.rpc.StartPipeline(b.listenerContext(), ctrl); err != nil {
		return err
	}
	log.Infof("[bridge] pipeline %s started", b.cfg.PipelineName)
	return nil
}

// startPipelineAsync runs startPipeline on its own goroutine so the caller is
// not blocked by the RPC. A failure is logged unless the bridge context has
// already ended, in which case it is dropped silently.
func (b *Bridge) startPipelineAsync() {
	go func() {
		err := b.startPipeline()
		if err == nil {
			return
		}
		if b.ctx.Err() != nil {
			// Bridge is shutting down; the failure is expected noise.
			return
		}
		log.Errorf("[bridge] failed to restart pipeline %s: %v", b.cfg.PipelineName, err)
	}()
}

// openJobStream opens a JobStream using the listener context and stores it on
// the bridge. On error the previously stored stream is left untouched.
func (b *Bridge) openJobStream() error {
	js, err := b.rpc.JobStream(b.listenerContext())
	if err == nil {
		b.jobStream = js
	}
	return err
}

// openSpiteStream opens a SpiteStream using the pipeline context and stores it
// on the bridge. On error the previously stored stream is left untouched.
func (b *Bridge) openSpiteStream() error {
	ss, err := b.rpc.SpiteStream(b.pipelineContext())
	if err == nil {
		b.spiteStream = ss
	}
	return err
}

// listenerContext returns a gRPC context with listener_id metadata.
func (b *Bridge) listenerContext() context.Context {
return metadata.NewOutgoingContext(b.ctx, metadata.Pairs(
Expand Down Expand Up @@ -256,43 +303,96 @@ func (b *Bridge) notifySessionReady(sessionID string) {
}

// reconnectSpiteStream attempts to re-open the SpiteStream with exponential backoff.
func (b *Bridge) reconnectSpiteStream() {
func (b *Bridge) reconnectSpiteStream(lastErr error) {
restore := shouldRestoreBridgeState(lastErr)
for attempt := 1; ; attempt++ {
select {
case <-b.ctx.Done():
return
case <-time.After(reconnectDelay(attempt)):
}
stream, err := b.rpc.SpiteStream(b.pipelineContext())
var err error
if restore {
err = b.restoreBridgeState(true)
} else {
err = b.openSpiteStream()
if shouldRestoreBridgeState(err) {
restore = true
}
}
if err != nil {
log.Errorf("[bridge] SpiteStream reconnect attempt %d failed: %v", attempt, err)
continue
}
b.spiteStream = stream
log.Infof("[bridge] SpiteStream reconnected after %d attempts", attempt)
return
}
}

// reconnectJobStream attempts to re-open the JobStream with exponential backoff.
func (b *Bridge) reconnectJobStream() {
func (b *Bridge) reconnectJobStream(lastErr error) {
restore := shouldRestoreBridgeState(lastErr)
for attempt := 1; ; attempt++ {
select {
case <-b.ctx.Done():
return
case <-time.After(reconnectDelay(attempt)):
}
stream, err := b.rpc.JobStream(b.listenerContext())
var err error
if restore {
err = b.restoreBridgeState(false)
} else {
err = b.openJobStream()
if shouldRestoreBridgeState(err) {
restore = true
}
}
if err != nil {
log.Errorf("[bridge] JobStream reconnect attempt %d failed: %v", attempt, err)
continue
}
b.jobStream = stream
log.Infof("[bridge] JobStream reconnected after %d attempts", attempt)
return
}
}

// restoreBridgeState rebuilds the bridge's server-side state: it re-registers
// the listener and pipeline, re-opens the JobStream and, when restoreSpite is
// true, the SpiteStream, then restarts the pipeline and re-registers sessions.
// The first failing step aborts the sequence and its error is returned.
// reconnectMu ensures only one recovery runs at a time.
func (b *Bridge) restoreBridgeState(restoreSpite bool) error {
	b.reconnectMu.Lock()
	defer b.reconnectMu.Unlock()

	// Steps run strictly in this order; each depends on the one before it.
	steps := []func() error{
		b.registerListener,
		b.registerPipeline,
		b.openJobStream,
	}
	if restoreSpite {
		steps = append(steps, b.openSpiteStream)
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}

	// StartPipeline can block until JobStream receives CtrlPipelineStart,
	// so it must run outside the reconnect caller's Recv loop.
	b.startPipelineAsync()
	b.reregisterSessions()
	return nil
}

// shouldRestoreBridgeState reports whether err means the bridge's registration
// must be rebuilt. That is the case when the gRPC status code is NotFound, or
// when the error text contains "listener not found" or "pipeline not found"
// (compared case-insensitively). A nil error never requires a restore.
func shouldRestoreBridgeState(err error) bool {
	switch {
	case err == nil:
		return false
	case status.Code(err) == codes.NotFound:
		return true
	}

	// Fall back to message matching for errors that carry no NotFound code.
	lowered := strings.ToLower(err.Error())
	for _, marker := range []string{"listener not found", "pipeline not found"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}

// reconnectDelay returns a backoff duration: 2s, 4s, 6s, ..., capped at 30s.
func reconnectDelay(attempt int) time.Duration {
delay := time.Duration(attempt) * 2 * time.Second
Expand Down
Loading
Loading