diff --git a/go.mod b/go.mod index 9ee4f77..4b33e74 100644 --- a/go.mod +++ b/go.mod @@ -7,5 +7,5 @@ require ( github.com/oklog/run v1.1.0 github.com/pkg/errors v0.9.1 github.com/spiral/goridge/v2 v2.4.4 - go.mongodb.org/mongo-driver v1.3.3 + go.mongodb.org/mongo-driver v1.12.1 ) diff --git a/go.sum b/go.sum index fffe3ef..676a278 100644 --- a/go.sum +++ b/go.sum @@ -110,6 +110,7 @@ golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2 h1:T5DasATyLQfmbTpfEXx/IOL9vfjzW6up+ZDkmHvIf2s= golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= diff --git a/pkg/gotask/process_default.go b/pkg/gotask/process_default.go new file mode 100644 index 0000000..5cf25e8 --- /dev/null +++ b/pkg/gotask/process_default.go @@ -0,0 +1,4 @@ +//go:build !windows +// +build !windows + +package gotask diff --git a/pkg/gotask/process_windows.go b/pkg/gotask/process_windows.go new file mode 100644 index 0000000..626e955 --- /dev/null +++ b/pkg/gotask/process_windows.go @@ -0,0 +1,181 @@ +// Copyright (C) 2019-2023 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +//go:build windows +// +build windows + +package gotask + +import ( + "errors" + "os" + "syscall" + "unsafe" + + "golang.org/x/sys/windows" +) + +const ( + ERROR_INVALID_PARAMETER = syscall.Errno(87) + + STATUS_CANCELLED = uint32(0xC0000120) + + processTerminateWaitInMs = 1000 + + killChildsPassCount = 4 +) + +var ( + errFinishedProcess = errors.New("os: process already finished") +) + +// FindProcess looks for a running process by its pid +func FindProcess(pid int) (*os.Process, error) { + var h syscall.Handle + + process, err := os.FindProcess(pid) + if err != nil { + if isInvalidParameterError(err) { // NOTE: See function definition for details + return nil, nil + } + return nil, err + } + + // If we have a process, check if it is terminated + h, err = syscall.OpenProcess(syscall.SYNCHRONIZE, false, uint32(pid)) + if err == nil { + defer func() { + _ = syscall.CloseHandle(h) + }() + + ret, e2 := syscall.WaitForSingleObject(h, 0) + if e2 == nil && ret == syscall.WAIT_OBJECT_0 { + return nil, nil + } + } else { + if isInvalidParameterError(err) { // NOTE: See function definition for details + return nil, nil + } + } + + return process, nil +} + +// KillProcess kills a running OS process +func KillProcess(pid int, signal os.Signal) error { + // Signal(0) only checks if we have access to kill a process and if it is really dead + if signal == syscall.Signal(0) { + return isProcessAlive(pid) + } + + return killProcessTree(pid) +} + +func isProcessAlive(pid int) error { + var ret uint32 + + h, err := syscall.OpenProcess(syscall.SYNCHRONIZE|syscall.PROCESS_TERMINATE, false, uint32(pid)) + if err != nil { + if isInvalidParameterError(err) { // NOTE: See function definition for details + return errFinishedProcess + } + return err + } + ret, err = syscall.WaitForSingleObject(h, 0) + if err == nil && ret == syscall.WAIT_OBJECT_0 { + err = errFinishedProcess + } + + _ = syscall.CloseHandle(h) + return err +} + +func killProcessTree(pid int) error { + err := killProcess(pid) + if err != nil { + return err + } + + // We do several passes just in case the process being killed spawns a new one + for pass := 1; pass <= killChildsPassCount; pass++ { + childProcessList := getChildProcesses(pid) + if len(childProcessList) == 0 { + break + } + for _, childPid := range childProcessList { + killProcessTree(childPid) + } + } + + return nil +} + +func getChildProcesses(pid int) []int { + var pe32 windows.ProcessEntry32 + + out := make([]int, 0) + + snap, err := windows.CreateToolhelp32Snapshot(windows.TH32CS_SNAPPROCESS, uint32(0)) + if err != nil { + return out + } + + defer func() { + _ = windows.CloseHandle(snap) + }() + + pe32.Size = uint32(unsafe.Sizeof(pe32)) + err = windows.Process32First(snap, &pe32) + for err != nil { + if pe32.ParentProcessID == uint32(pid) { + // Add to list + out = append(out, int(pe32.ProcessID)) + } + + err = windows.Process32Next(snap, &pe32) + } + + return out +} + +func killProcess(pid int) error { + h, err := syscall.OpenProcess(syscall.SYNCHRONIZE|syscall.PROCESS_TERMINATE, false, uint32(pid)) + if err == nil { + err = syscall.TerminateProcess(h, STATUS_CANCELLED) + if err == nil { + _, _ = syscall.WaitForSingleObject(h, processTerminateWaitInMs) + } + + _ = syscall.CloseHandle(h) + } + + return err +} + +// NOTE: Unlike Unix, Windows tries to open the target process in order to kill it. +// +// ERROR_INVALID_PARAMETER is returned if the process does not exists. +// To mimic other OS behavior, if the process does not exist, don't return an error +func isInvalidParameterError(err error) bool { + var syscallError syscall.Errno + + if errors.As(err, &syscallError) { + if syscallError == ERROR_INVALID_PARAMETER { + return true + } + } + return false +} diff --git a/pkg/gotask/server.go b/pkg/gotask/server_default.go similarity index 99% rename from pkg/gotask/server.go rename to pkg/gotask/server_default.go index f4a1489..ab4ded4 100644 --- a/pkg/gotask/server.go +++ b/pkg/gotask/server_default.go @@ -1,3 +1,6 @@ +//go:build !windows +// +build !windows + package gotask import ( diff --git a/pkg/gotask/server_test.go b/pkg/gotask/server_test.go index 29f3e03..f7550ac 100644 --- a/pkg/gotask/server_test.go +++ b/pkg/gotask/server_test.go @@ -1,3 +1,6 @@ +//go:build !windows +// +build !windows + package gotask import ( diff --git a/pkg/gotask/server_windows.go b/pkg/gotask/server_windows.go new file mode 100644 index 0000000..bcc550b --- /dev/null +++ b/pkg/gotask/server_windows.go @@ -0,0 +1,193 @@ +//go:build windows +// +build windows + +package gotask + +import ( + "context" + "flag" + "fmt" + "net" + "net/rpc" + "os" + "os/signal" + "path" + "syscall" + "time" + + "github.com/oklog/run" + "github.com/pkg/errors" + "github.com/spiral/goridge/v2" +) + +var g run.Group + +func checkProcess(pid int, quit chan bool) { + if *standalone { + return + } + + _, err := FindProcess(int(pid)) + if err != nil { + close(quit) + return + } + + err = isProcessAlive(pid) + if err != nil { + close(quit) + } +} + +// Register a net/rpc compatible service +func Register(receiver interface{}) error { + if !flag.Parsed() { + flag.Parse() + } + if !*reflection { + return rpc.Register(receiver) + } + return generatePHP(receiver) +} + +// Set the address of socket +func SetAddress(addr string) { + *address = addr +} + +// Get the address of the socket +func GetAddress() string { + return *address +} + +// Run the sidecar, receive any fatal errors. +func Run() error { + if !flag.Parsed() { + flag.Parse() + } + + if *reflection { + return nil + } + + if *listenOnPipe { + relay := goridge.NewPipeRelay(os.Stdin, os.Stdout) + codec := goridge.NewCodecWithRelay(relay) + g.Add(func() error { + rpc.ServeCodec(codec) + return fmt.Errorf("pipe is closed") + }, func(err error) { + _ = os.Stdin.Close() + _ = os.Stdout.Close() + _ = codec.Close() + }) + } + + if *address != "" { + network, addr := parseAddr(*address) + cleanup, err := checkAddr(network, addr) + if err != nil { + return errors.Wrap(err, "cannot remove existing unix socket") + } + defer cleanup() + + ln, err := net.Listen(network, addr) + if err != nil { + return errors.Wrap(err, "unable to listen") + } + + g.Add(func() error { + for { + conn, err := ln.Accept() + if err != nil { + return err + } + go rpc.ServeCodec(goridge.NewCodec(conn)) + } + }, func(err error) { + _ = ln.Close() + }) + } + + { + var ( + termChan chan os.Signal + ppid int + pdeadChan chan bool + ticker *time.Ticker + ) + termChan = make(chan os.Signal) + signal.Notify(termChan, os.Interrupt, os.Kill) + ppid = os.Getppid() + pdeadChan = make(chan bool) + ticker = time.NewTicker(500 * time.Millisecond) + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + for { + select { + case sig := <-termChan: + return fmt.Errorf("received system call:%+v, shutting down\n", sig) + case <-pdeadChan: + return nil + case <-ticker.C: + checkProcess(ppid, pdeadChan) + case <-ctx.Done(): + return ctx.Err() + } + } + }, func(err error) { + cancel() + }) + } + + return g.Run() +} + +// Add an actor (function) to the group. Each actor must be pre-emptable by an +// interrupt function. That is, if interrupt is invoked, execute should return. +// Also, it must be safe to call interrupt even after execute has returned. +// +// The first actor (function) to return interrupts all running actors. +// The error is passed to the interrupt functions, and is returned by Run. +func Add(execute func() error, interrupt func(error)) { + g.Add(execute, interrupt) +} + +func checkAddr(network, addr string) (func(), error) { + if network != "unix" { + return func() {}, nil + } + if _, err := os.Stat(addr); !os.IsNotExist(err) { + return func() {}, os.Remove(addr) + } + if err := os.MkdirAll(path.Dir(addr), os.ModePerm); err != nil { + return func() {}, err + } + if ok, err := isWritable(path.Dir(addr)); err != nil || !ok { + return func() {}, errors.Wrap(err, "socket directory is not writable") + } + return func() { os.Remove(addr) }, nil +} + +func isWritable(path string) (isWritable bool, err error) { + info, err := os.Stat(path) + if err != nil { + return false, err + } + + if !info.IsDir() { + return false, fmt.Errorf("%s isn't a directory", path) + } + + // Check if the user bit is enabled in file permission + if info.Mode().Perm()&(1<<(uint(7))) == 0 { + return false, fmt.Errorf("write permission bit is not set on this %s for user", path) + } + + fileAttrs, ok := info.Sys().(*syscall.Win32FileAttributeData) + if !ok || fileAttrs == nil { + return false, fmt.Errorf("unexpected fileinfo sys type %T for %v", info.Sys(), path) + } + + return true, nil +} diff --git a/pkg/gotask/server_windows_test.go b/pkg/gotask/server_windows_test.go new file mode 100644 index 0000000..77575e5 --- /dev/null +++ b/pkg/gotask/server_windows_test.go @@ -0,0 +1,26 @@ +//go:build windows +// +build windows + +package gotask + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestClearAddr(t *testing.T) { + dir, _ := ioutil.TempDir("", "") + defer os.Remove(dir) + + if _, err := checkAddr("unix", dir+"/non-exist.sock"); err != nil { + t.Errorf("checkAddr should not return error for non-exist files") + } + if _, err := checkAddr("tcp", "127.0.0.1:6000"); err != nil { + t.Errorf("checkAddr should not return error for tcp ports") + } + + if _, err := checkAddr("unix", dir+"/path/to/dir/temp.sock"); err != nil { + t.Errorf("checkAddr should be able to create directory if not exist") + } +} diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php index 455e8e9..3455b40 100644 --- a/src/ConfigProvider.php +++ b/src/ConfigProvider.php @@ -9,6 +9,7 @@ * @contact guxi99@gmail.com * @license https://github.com/hyperf/hyperf/blob/master/LICENSE */ + namespace Hyperf\GoTask; use Hyperf\GoTask\Listener\CommandListener; @@ -27,8 +28,7 @@ public function __invoke(): array 'dependencies' => [ GoTask::class => GoTaskFactory::class, ], - 'commands' => [ - ], + 'commands' => [], 'processes' => [ GoTaskProcess::class, ], @@ -73,6 +73,10 @@ public function __invoke(): array public static function address(): string { + if (strtoupper(substr(PHP_OS, 0, 6)) === 'CYGWIN' || strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') { + throw new \RuntimeException('Windows platform, please manually specify the port'); + } + if (defined('BASE_PATH')) { $root = BASE_PATH . '/runtime'; } else {