Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ module github.com/basiooo/andromodem
go 1.25.0

require (
github.com/basiooo/goadb v1.0.1
github.com/go-chi/chi/v5 v5.2.2
github.com/basiooo/goadb v1.1.1
github.com/go-chi/chi/v5 v5.2.3
github.com/go-chi/cors v1.2.2
github.com/go-playground/validator/v10 v10.27.0
github.com/gorilla/websocket v1.5.3
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.10.0
github.com/stretchr/testify v1.11.1
go.uber.org/zap v1.27.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
github.com/basiooo/goadb v1.0.1 h1:zmQQFu+n/QLV2XsfeYvvmrRatOAL77k17JFPvU8hOeA=
github.com/basiooo/goadb v1.0.1/go.mod h1:pp54HJu7sBRbN2ImV1U9FikU4qh8AJ/kYNF+47+cBSQ=
github.com/basiooo/goadb v1.1.1 h1:gh8EqRCdmrkaBPiW3YvgEKXpmMkU0VNhpqz7WKMAzGg=
github.com/basiooo/goadb v1.1.1/go.mod h1:pp54HJu7sBRbN2ImV1U9FikU4qh8AJ/kYNF+47+cBSQ=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gabriel-vasile/mimetype v1.4.9 h1:5k+WDwEsD9eTLL8Tz3L0VnmVh9QxGjRmjBvAG7U/oYY=
github.com/gabriel-vasile/mimetype v1.4.9/go.mod h1:WnSQhFKJuBlRyLiKohA/2DtIlPFAbguNaG7QCHcyGok=
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE=
github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/go-chi/cors v1.2.2 h1:Jmey33TE+b+rB7fT8MUy1u0I4L+NARQlK6LhzKPSyQE=
github.com/go-chi/cors v1.2.2/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
Expand All @@ -24,8 +24,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand Down
210 changes: 210 additions & 0 deletions internal/handler/ws/mirroring_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package ws

import (
"context"
"net/http"
"sync"
"time"

"github.com/basiooo/andromodem/internal/service/mirroring_service"
"github.com/basiooo/andromodem/pkg/scrcpy"
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)

type MirroringHandler struct {
MirroringService mirroring_service.IMirroringService
upgrader websocket.Upgrader
Logger *zap.Logger

activeConns map[string]*websocket.Conn
mu sync.Mutex
}

func NewMirroringHandler(mirroringService mirroring_service.IMirroringService, logger *zap.Logger) *MirroringHandler {
return &MirroringHandler{
MirroringService: mirroringService,
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
HandshakeTimeout: 10 * time.Second,
},
Logger: logger,
activeConns: make(map[string]*websocket.Conn),
}
}

func (h *MirroringHandler) StartMirroringStream(w http.ResponseWriter, r *http.Request) {
serial := chi.URLParam(r, "serial")
var client *scrcpy.Client
var err error
conn, err := h.upgrader.Upgrade(w, r, nil)
h.mu.Lock()
if oldConn, ok := h.activeConns[serial]; ok {
h.Logger.Info("closing old websocket connection", zap.String("serial", serial))
oldConn.Close()
for {
if ok := h.MirroringService.GetClient(serial); ok == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
}
h.activeConns[serial] = conn
h.mu.Unlock()

ctx, cancel := context.WithCancel(r.Context())
defer cancel()
conn.SetCloseHandler(func(code int, text string) error {
h.Logger.Info("websocket connection closed by client",
zap.String("serial", serial),
zap.Int("code", code),
zap.String("text", text),
)
message := websocket.FormatCloseMessage(code, "")
if err := conn.WriteControl(
websocket.CloseMessage,
message,
time.Now().Add(time.Second),
); err != nil {
h.Logger.Warn("failed to write close message", zap.Error(err))
}
cancel()
return nil
})

if err != nil {
h.Logger.Error("failed to upgrade to websocket",
zap.String("serial", serial),
zap.Error(err),
)
return
}
h.Logger.Info("websocket connection established for mirroring",
zap.String("serial", serial),
)

if !h.MirroringService.IsRunning(serial) {
client, err = h.MirroringService.StartMirroring(ctx, serial)
if err != nil {
h.Logger.Error("failed to start mirroring",
zap.String("serial", serial),
zap.Error(err),
)
errorMsg := map[string]interface{}{
"type": "error",
"message": "Failed to start mirroring: " + err.Error(),
}
if err := conn.WriteJSON(errorMsg); err != nil {
h.Logger.Error("failed to send error message",
zap.String("serial", serial),
zap.Error(err),
)
}
return
}
}

successMsg := map[string]interface{}{
"type": "connected",
"message": "Mirroring stream connected",
"serial": serial,
"width": client.Width,
"height": client.Height,
}
if err := conn.WriteJSON(successMsg); err != nil {
h.Logger.Error("failed to send success message",
zap.String("serial", serial),
zap.Error(err),
)
return
}

pingTicker := time.NewTicker(30 * time.Second)
defer pingTicker.Stop()

go h.handleWebSocketControl(ctx, cancel, conn, serial)
handleVideoChunk := func(chunk []byte) {
if len(chunk) == 0 {
return
}

conn.SetWriteDeadline(time.Now().Add(5 * time.Second))

if err := conn.WriteMessage(websocket.BinaryMessage, chunk); err != nil {
h.Logger.Warn("failed to send video chunk",
zap.String("serial", serial),
zap.Int("chunk_size", len(chunk)),
zap.Error(err),
)
cancel()
return
}
}
go func() {
if err := h.MirroringService.CaptureVideoStream(serial, handleVideoChunk); err != nil {
h.Logger.Error("video stream capture ended",
zap.String("serial", serial),
zap.Error(err),
)
}
}()

for {
select {
case <-ctx.Done():
h.Logger.Info("mirroring websocket session ended",
zap.String("serial", serial),
)
h.mu.Lock()
delete(h.activeConns, serial)
h.mu.Unlock()

return
case <-pingTicker.C:
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
h.Logger.Error("failed to send ping",
zap.String("serial", serial),
zap.Error(err),
)
return
}
}
}
}

func (h *MirroringHandler) handleWebSocketControl(ctx context.Context, cancelCtx context.CancelFunc, conn *websocket.Conn, serial string) {
conn.SetReadLimit(1024)

go func() {
for {
select {
case <-ctx.Done():
return
default:
_, messageBytes, err := conn.ReadMessage()
if err != nil {
h.Logger.Info("client disconnected",
zap.String("serial", serial),
zap.Error(err),
)
cancelCtx()
return
}

if len(messageBytes) > 1024 {
h.Logger.Warn("received oversized message, ignoring",
zap.String("serial", serial),
zap.Int("size", len(messageBytes)),
)
continue
}

if len(messageBytes) > 0 {
h.MirroringService.HandleControlMessage(serial, messageBytes)
}
}
}
}()

}
7 changes: 7 additions & 0 deletions internal/handler/ws/mirroring_handler_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ws

import "net/http"

type IMirroringHandler interface {
StartMirroringStream(http.ResponseWriter, *http.Request)
}
23 changes: 17 additions & 6 deletions internal/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package router
import (
"context"

RESTHandler "github.com/basiooo/andromodem/internal/handler/rest"
"github.com/basiooo/andromodem/internal/handler/rest"
"github.com/basiooo/andromodem/internal/handler/ws"

"github.com/basiooo/andromodem/internal/handler/web"
"github.com/basiooo/andromodem/internal/service/devices_service"
"github.com/basiooo/andromodem/internal/service/messages_service"
"github.com/basiooo/andromodem/internal/service/mirroring_service"
"github.com/basiooo/andromodem/internal/service/monitoring_service"
network_service "github.com/basiooo/andromodem/internal/service/network"
"github.com/basiooo/andromodem/templates"
Expand Down Expand Up @@ -61,15 +64,18 @@ func (r *Router) GetRouters() chi.Router {
messagesService := messages_service.NewMessagesService(r.Adb, adbProcessor, r.Logger, r.Ctx)
networkService := network_service.NewNetworkService(r.Adb, adbProcessor, r.Logger, r.Ctx)
monitoringService := monitoring_service.NewMonitoringService(r.Adb, adbProcessor, networkService, r.Logger, r.Ctx)
mirroringService := mirroring_service.NewMirroringService(r.Adb, r.Logger, r.Ctx)

// Handlers
devicesEventHandler := SSEHandler.NewDevicesEventHandler(devicesService, r.Logger)
monitoringLogEventHandler := SSEHandler.NewMonitoringLogEventHandler(monitoringService, r.Logger)
devicesHandler := RESTHandler.NewDevicesHandler(devicesService, r.Logger, r.Validator)
messagesHandler := RESTHandler.NewMessagesHandler(messagesService, r.Logger, r.Validator)
networkHandler := RESTHandler.NewNetworkHandler(networkService, r.Logger, r.Validator)
monitoringHandler := RESTHandler.NewMonitoringHandler(monitoringService, r.Logger, r.Validator)
healthHandler := RESTHandler.NewHealthHandler()
devicesHandler := rest.NewDevicesHandler(devicesService, r.Logger, r.Validator)
messagesHandler := rest.NewMessagesHandler(messagesService, r.Logger, r.Validator)
networkHandler := rest.NewNetworkHandler(networkService, r.Logger, r.Validator)
monitoringHandler := rest.NewMonitoringHandler(monitoringService, r.Logger, r.Validator)
mirroringHandler := ws.NewMirroringHandler(mirroringService, r.Logger)

healthHandler := rest.NewHealthHandler()

// Frontend Handler
frontendHandler := web.NewFrontendHandler(r.Logger, templates.MainPage)
Expand Down Expand Up @@ -107,6 +113,11 @@ func (r *Router) GetRouters() chi.Router {
})
})

r.ChiRouter.Route("/ws", func(chiRouter chi.Router) {
chiRouter.Route("/devices/{serial}", func(chiRouter chi.Router) {
chiRouter.Get("/mirroring", mirroringHandler.StartMirroringStream)
})
})
r.ChiRouter.Get("/assets/*", frontendHandler.ServeAssets().ServeHTTP)
r.ChiRouter.Get("/", frontendHandler.ServeIndex)
return r.ChiRouter
Expand Down
Loading
Loading