Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add high-level boundary management interface
Browse files Browse the repository at this point in the history
I rewrote the router's boundary management part to implement dynamic management
from a high-level box interface. This also includes a number of changes I made
in the process of rewriting some messy parts, such as the Outbound tree
bottom-top starter.
beryll1um committed Oct 17, 2024
1 parent ea70360 commit f25cb7c
Showing 13 changed files with 503 additions and 352 deletions.
18 changes: 16 additions & 2 deletions adapter/router.go
Original file line number Diff line number Diff line change
@@ -18,14 +18,28 @@ import (
)

type Router interface {
Service
AddOutbound(outbound Outbound) error
AddInbound(inbound Inbound) error

RemoveOutbound(tag string) error
RemoveInbound(tag string) error

PreStarter

StartOutbounds() error

Service

StartInbounds() error

PostStarter

Cleanup() error

DefaultOutbound(network string) (Outbound, error)
Outbounds() []Outbound
Outbound(tag string) (Outbound, bool)
DefaultOutbound(network string) (Outbound, error)
Inbound(tag string) (Inbound, bool)

FakeIPStore() FakeIPStore

225 changes: 101 additions & 124 deletions box.go
Original file line number Diff line number Diff line change
@@ -29,16 +29,16 @@ import (
var _ adapter.Service = (*Box)(nil)

type Box struct {
createdAt time.Time
router adapter.Router
inbounds []adapter.Inbound
outbounds []adapter.Outbound
logFactory log.Factory
logger log.ContextLogger
preServices1 map[string]adapter.Service
preServices2 map[string]adapter.Service
postServices map[string]adapter.Service
done chan struct{}
createdAt time.Time
router adapter.Router
logFactory log.Factory
logger log.ContextLogger
preServices1 map[string]adapter.Service
preServices2 map[string]adapter.Service
postServices map[string]adapter.Service
platformInterface platform.Interface
ctx context.Context
done chan struct{}
}

type Options struct {
@@ -97,57 +97,6 @@ func New(options Options) (*Box, error) {
if err != nil {
return nil, E.Cause(err, "parse route options")
}
inbounds := make([]adapter.Inbound, 0, len(options.Inbounds))
outbounds := make([]adapter.Outbound, 0, len(options.Outbounds))
for i, inboundOptions := range options.Inbounds {
var in adapter.Inbound
var tag string
if inboundOptions.Tag != "" {
tag = inboundOptions.Tag
} else {
tag = F.ToString(i)
}
in, err = inbound.New(
ctx,
router,
logFactory.NewLogger(F.ToString("inbound/", inboundOptions.Type, "[", tag, "]")),
tag,
inboundOptions,
options.PlatformInterface,
)
if err != nil {
return nil, E.Cause(err, "parse inbound[", i, "]")
}
inbounds = append(inbounds, in)
}
for i, outboundOptions := range options.Outbounds {
var out adapter.Outbound
var tag string
if outboundOptions.Tag != "" {
tag = outboundOptions.Tag
} else {
tag = F.ToString(i)
}
out, err = outbound.New(
ctx,
router,
logFactory.NewLogger(F.ToString("outbound/", outboundOptions.Type, "[", tag, "]")),
tag,
outboundOptions)
if err != nil {
return nil, E.Cause(err, "parse outbound[", i, "]")
}
outbounds = append(outbounds, out)
}
err = router.Initialize(inbounds, outbounds, func() adapter.Outbound {
out, oErr := outbound.New(ctx, router, logFactory.NewLogger("outbound/direct"), "direct", option.Outbound{Type: "direct", Tag: "default"})
common.Must(oErr)
outbounds = append(outbounds, out)
return out
})
if err != nil {
return nil, err
}
if options.PlatformInterface != nil {
err = options.PlatformInterface.Initialize(ctx, router)
if err != nil {
@@ -183,18 +132,35 @@ func New(options Options) (*Box, error) {
router.SetV2RayServer(v2rayServer)
preServices2["v2ray api"] = v2rayServer
}
return &Box{
router: router,
inbounds: inbounds,
outbounds: outbounds,
createdAt: createdAt,
logFactory: logFactory,
logger: logFactory.Logger(),
preServices1: preServices1,
preServices2: preServices2,
postServices: postServices,
done: make(chan struct{}),
}, nil
box := &Box{
router: router,
createdAt: createdAt,
logFactory: logFactory,
logger: logFactory.Logger(),
preServices1: preServices1,
preServices2: preServices2,
postServices: postServices,
platformInterface: options.PlatformInterface,
ctx: ctx,
done: make(chan struct{}),
}
for i, outOpts := range options.Outbounds {
if outOpts.Tag == "" {
outOpts.Tag = F.ToString(i)
}
if err := box.AddOutbound(outOpts); err != nil {
return nil, E.Cause(err, "create outbound")
}
}
for i, inOpts := range options.Inbounds {
if inOpts.Tag == "" {
inOpts.Tag = F.ToString(i)
}
if err := box.AddInbound(inOpts); err != nil {
return nil, E.Cause(err, "create inbound")
}
}
return box, nil
}

func (s *Box) PreStart() error {
@@ -263,12 +229,10 @@ func (s *Box) preStart() error {
}
}
}
err = s.router.PreStart()
if err != nil {
if err := s.router.PreStart(); err != nil {
return E.Cause(err, "pre-start router")
}
err = s.startOutbounds()
if err != nil {
if err := s.router.StartOutbounds(); err != nil {
return err
}
return s.router.Start()
@@ -291,20 +255,10 @@ func (s *Box) start() error {
return E.Cause(err, "start ", serviceName)
}
}
for i, in := range s.inbounds {
var tag string
if in.Tag() == "" {
tag = F.ToString(i)
} else {
tag = in.Tag()
}
err = in.Start()
if err != nil {
return E.Cause(err, "initialize inbound/", in.Type(), "[", tag, "]")
}
if err := s.router.StartInbounds(); err != nil {
return E.Cause(err, "start inbounds")
}
err = s.postStart()
if err != nil {
if err = s.postStart(); err != nil {
return err
}
return s.router.Cleanup()
@@ -317,26 +271,8 @@ func (s *Box) postStart() error {
return E.Cause(err, "start ", serviceName)
}
}
// TODO: reorganize ALL start order
for _, out := range s.outbounds {
if lateOutbound, isLateOutbound := out.(adapter.PostStarter); isLateOutbound {
err := lateOutbound.PostStart()
if err != nil {
return E.Cause(err, "post-start outbound/", out.Tag())
}
}
}
err := s.router.PostStart()
if err != nil {
return err
}
for _, in := range s.inbounds {
if lateInbound, isLateInbound := in.(adapter.PostStarter); isLateInbound {
err = lateInbound.PostStart()
if err != nil {
return E.Cause(err, "post-start inbound/", in.Tag())
}
}
if err := s.router.PostStart(); err != nil {
return E.Cause(err, "post-start")
}
return nil
}
@@ -357,20 +293,6 @@ func (s *Box) Close() error {
})
monitor.Finish()
}
for i, in := range s.inbounds {
monitor.Start("close inbound/", in.Type(), "[", i, "]")
errors = E.Append(errors, in.Close(), func(err error) error {
return E.Cause(err, "close inbound/", in.Type(), "[", i, "]")
})
monitor.Finish()
}
for i, out := range s.outbounds {
monitor.Start("close outbound/", out.Type(), "[", i, "]")
errors = E.Append(errors, common.Close(out), func(err error) error {
return E.Cause(err, "close outbound/", out.Type(), "[", i, "]")
})
monitor.Finish()
}
monitor.Start("close router")
if err := common.Close(s.router); err != nil {
errors = E.Append(errors, err, func(err error) error {
@@ -403,3 +325,58 @@ func (s *Box) Close() error {
func (s *Box) Router() adapter.Router {
return s.router
}

func (s *Box) AddOutbound(option option.Outbound) error {
if option.Tag == "" {
return E.New("empty tag")
}
out, err := outbound.New(
s.ctx,
s.router,
s.logFactory.NewLogger(F.ToString("outbound/", option.Type, "[", option.Tag, "]")),
option.Tag,
option,
)
if err != nil {
return E.Cause(err, "parse addited outbound")
}
if err := s.router.AddOutbound(out); err != nil {
return E.Cause(err, "outbound/", option.Type, "[", option.Tag, "]")
}
return nil
}

func (s *Box) AddInbound(option option.Inbound) error {
if option.Tag == "" {
return E.New("empty tag")
}
in, err := inbound.New(
s.ctx,
s.router,
s.logFactory.NewLogger(F.ToString("inbound/", option.Type, "[", option.Tag, "]")),
option.Tag,
option,
s.platformInterface,
)
if err != nil {
return E.Cause(err, "parse addited inbound")
}
if err := s.router.AddInbound(in); err != nil {
return E.Cause(err, "inbound/", option.Type, "[", option.Tag, "]")
}
return nil
}

func (s *Box) RemoveOutbound(tag string) error {
if err := s.router.RemoveOutbound(tag); err != nil {
return E.Cause(err, "outbound[", tag, "]")
}
return nil
}

func (s *Box) RemoveInbound(tag string) error {
if err := s.router.RemoveInbound(tag); err != nil {
return E.Cause(err, "inbound[", tag, "]")
}
return nil
}
85 changes: 0 additions & 85 deletions box_outbound.go

This file was deleted.

6 changes: 3 additions & 3 deletions common/process/searcher.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ import (
)

type Searcher interface {
FindProcessInfo(ctx context.Context, network string, source netip.AddrPort, destination netip.AddrPort) (*Info, error)
FindProcessInfo(ctx context.Context, network string, source netip.AddrPort) (*Info, error)
}

var ErrNotFound = E.New("process not found")
@@ -29,8 +29,8 @@ type Info struct {
UserId int32
}

func FindProcessInfo(searcher Searcher, ctx context.Context, network string, source netip.AddrPort, destination netip.AddrPort) (*Info, error) {
info, err := searcher.FindProcessInfo(ctx, network, source, destination)
func FindProcessInfo(searcher Searcher, ctx context.Context, network string, source netip.AddrPort) (*Info, error) {
info, err := searcher.FindProcessInfo(ctx, network, source)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions common/process/searcher_linux.go
Original file line number Diff line number Diff line change
@@ -19,8 +19,8 @@ func NewSearcher(config Config) (Searcher, error) {
return &linuxSearcher{config.Logger}, nil
}

func (s *linuxSearcher) FindProcessInfo(ctx context.Context, network string, source netip.AddrPort, destination netip.AddrPort) (*Info, error) {
inode, uid, err := resolveSocketByNetlink(network, source, destination)
func (s *linuxSearcher) FindProcessInfo(ctx context.Context, network string, source netip.AddrPort) (*Info, error) {
inode, uid, err := resolveSocketByNetlink(network, source)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion common/process/searcher_linux_shared.go
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ const (
pathProc = "/proc"
)

func resolveSocketByNetlink(network string, source netip.AddrPort, destination netip.AddrPort) (inode, uid uint32, err error) {
func resolveSocketByNetlink(network string, source netip.AddrPort) (inode, uid uint32, err error) {
var family uint8
var protocol uint8

2 changes: 1 addition & 1 deletion experimental/libbox/config.go
Original file line number Diff line number Diff line change
@@ -93,7 +93,7 @@ func (s *platformInterfaceStub) ReadWIFIState() adapter.WIFIState {
return adapter.WIFIState{}
}

func (s *platformInterfaceStub) FindProcessInfo(ctx context.Context, network string, source netip.AddrPort, destination netip.AddrPort) (*process.Info, error) {
func (s *platformInterfaceStub) FindProcessInfo(ctx context.Context, network string, source netip.AddrPort) (*process.Info, error) {
return nil, os.ErrInvalid
}

2 changes: 1 addition & 1 deletion experimental/libbox/internal/procfs/procfs.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ func init() {
}
}

func ResolveSocketByProcSearch(network string, source, _ netip.AddrPort) int32 {
func ResolveSocketByProcSearch(network string, source netip.AddrPort) int32 {
if netIndexOfLocal < 0 || netIndexOfUid < 0 {
return -1
}
2 changes: 1 addition & 1 deletion experimental/libbox/platform.go
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ type PlatformInterface interface {
OpenTun(options TunOptions) (int32, error)
WriteLog(message string)
UseProcFS() bool
FindConnectionOwner(ipProtocol int32, sourceAddress string, sourcePort int32, destinationAddress string, destinationPort int32) (int32, error)
FindConnectionOwner(ipProtocol int32, sourceAddress string, sourcePort int32) (int32, error)
PackageNameByUid(uid int32) (string, error)
UIDByPackageName(packageName string) (int32, error)
UsePlatformDefaultInterfaceMonitor() bool
6 changes: 3 additions & 3 deletions experimental/libbox/service.go
Original file line number Diff line number Diff line change
@@ -202,10 +202,10 @@ func (w *platformInterfaceWrapper) ReadWIFIState() adapter.WIFIState {
return (adapter.WIFIState)(*wifiState)
}

func (w *platformInterfaceWrapper) FindProcessInfo(ctx context.Context, network string, source netip.AddrPort, destination netip.AddrPort) (*process.Info, error) {
func (w *platformInterfaceWrapper) FindProcessInfo(ctx context.Context, network string, source netip.AddrPort) (*process.Info, error) {
var uid int32
if w.useProcFS {
uid = procfs.ResolveSocketByProcSearch(network, source, destination)
uid = procfs.ResolveSocketByProcSearch(network, source)
if uid == -1 {
return nil, E.New("procfs: not found")
}
@@ -220,7 +220,7 @@ func (w *platformInterfaceWrapper) FindProcessInfo(ctx context.Context, network
return nil, E.New("unknown network: ", network)
}
var err error
uid, err = w.iif.FindConnectionOwner(ipProtocol, source.Addr().String(), int32(source.Port()), destination.Addr().String(), int32(destination.Port()))
uid, err = w.iif.FindConnectionOwner(ipProtocol, source.Addr().String(), int32(source.Port()))
if err != nil {
return nil, err
}
6 changes: 6 additions & 0 deletions inbound/default.go
Original file line number Diff line number Diff line change
@@ -115,6 +115,12 @@ func (a *myInboundAdapter) Start() error {

func (a *myInboundAdapter) Close() error {
a.inShutdown.Store(true)
if a.tcpListener != nil {
a.logger.Info("tcp server closed at ", a.tcpListener.Addr())
}
if a.udpConn != nil {
a.logger.Info("udp server closed at ", a.udpConn.LocalAddr())
}
var err error
if a.systemProxy != nil && a.systemProxy.IsEnabled() {
err = a.systemProxy.Disable()
437 changes: 308 additions & 129 deletions route/router.go

Large diffs are not rendered by default.

60 changes: 60 additions & 0 deletions route/router_outbound_starter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package route

import (
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing-box/common/taskmonitor"
E "github.com/sagernet/sing/common/exceptions"
)

type OutboundStarter struct {
outboundByTag map[string]adapter.Outbound
startedTags map[string]struct{}
monitor *taskmonitor.Monitor
}

func (s *OutboundStarter) Start(tag string, pathIncludesTags map[string]struct{}) error {
adapter := s.outboundByTag[tag]
if adapter == nil {
return E.New("dependency[", tag, "] is not found")
}

// The outbound may have been started by another subtree in the previous,
// we don't need to start it again.
if _, ok := s.startedTags[tag]; ok {
return nil
}

// If we detected the repetition of the tags in scope of tree evaluation,
// the circular dependency is found, as it grows from bottom to top.
if _, ok := pathIncludesTags[tag]; ok {
return E.New("circular dependency related with outbound/", adapter.Type(), "[", tag, "]")
}

// This required to be done only if that outbound isn't already started,
// because some dependencies may come to the same root,
// but they aren't circular.
pathIncludesTags[tag] = struct{}{}

// Next, we are recursively starting all dependencies of the current
// outbound and repeating the cycle.
for _, dependencyTag := range adapter.Dependencies() {
if err := s.Start(dependencyTag, pathIncludesTags); err != nil {
return err
}
}

// Anyway, it will be finished soon, nothing will happen if I'll include
// Startable interface typecasting too.
s.monitor.Start("initialize outbound/", adapter.Type(), "[", tag, "]")
defer s.monitor.Finish()

// After the evaluation of entire tree let's begin to start all
// the outbounds!
if startable, isStartable := adapter.(interface{ Start() error }); isStartable {
if err := startable.Start(); err != nil {
return E.Cause(err, "initialize outbound/", adapter.Type(), "[", tag, "]")
}
}

return nil
}

0 comments on commit f25cb7c

Please sign in to comment.