Skip to content

Commit 0678e12

Browse files
committed
feat: server supports for auto update
1 parent e259d1d commit 0678e12

File tree

8 files changed

+140
-60
lines changed

8 files changed

+140
-60
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ build: buildgo-server buildgo-worker buildcontainer-server buildcontainer-worker
2828
.PHONY: buildgo-%
2929
buildgo-%:
3030
@echo "Building dist/transcoderd-$*"
31-
@CGO_ENABLED=0 go build -ldflags "-X main.ApplicationName=transcoderd-$* -X main.Version=${PROJECT_VERSION} -X main.Commit=${GIT_COMMIT_SHA} -X main.Date=${BUILD_DATE}" -o dist/transcoderd-$* $*/main.go
31+
@CGO_ENABLED=0 go build -ldflags "-X main.ApplicationName=transcoderd-$* -X transcoder/version.Version=${PROJECT_VERSION} -X transcoder/version.Commit=${GIT_COMMIT_SHA} -X transcoder/version.Date=${BUILD_DATE}" -o dist/transcoderd-$* $*/main.go
3232

3333
.PHONY: publish
3434
publish: publishcontainer-server publishcontainer-worker ## Publish all artifacts

server/main.go

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,28 @@ import (
1919
"transcoder/server/repository"
2020
"transcoder/server/scheduler"
2121
"transcoder/server/web"
22+
"transcoder/update"
23+
"transcoder/version"
2224
)
2325

2426
type CmdLineOpts struct {
25-
Database repository.SQLServerConfig `mapstructure:"database"`
26-
Web web.WebServerConfig `mapstructure:"web"`
27-
Scheduler scheduler.SchedulerConfig `mapstructure:"scheduler"`
27+
Database repository.SQLServerConfig `mapstructure:"database"`
28+
Web web.WebServerConfig `mapstructure:"web"`
29+
Scheduler scheduler.SchedulerConfig `mapstructure:"scheduler"`
30+
NoUpdateMode bool `mapstructure:"noUpdateMode"`
31+
NoUpdates bool `mapstructure:"noUpdates"`
2832
}
2933

3034
var (
31-
opts CmdLineOpts
32-
ApplicationFileName string
35+
ApplicationName = "transcoderd-server"
36+
showVersion = false
37+
opts CmdLineOpts
3338
)
3439

3540
func init() {
3641
//Scheduler
3742
var verbose bool
43+
pflag.BoolVar(&showVersion, "version", false, "Print version and exit")
3844
pflag.BoolVar(&verbose, "verbose", false, "Enable verbose logging")
3945
pflag.Duration("scheduler.scheduleTime", time.Minute*5, "Execute the scheduling loop every X seconds")
4046
pflag.Duration("scheduler.jobTimeout", time.Hour*24, "Requeue jobs that are running for more than X minutes")
@@ -52,6 +58,7 @@ func init() {
5258
pflag.String("database.User", "postgres", "DB User")
5359
pflag.String("database.Password", "postgres", "DB Password")
5460
pflag.String("database.Scheme", "server", "DB Scheme")
61+
update.PFlags()
5562
pflag.Usage = usage
5663

5764
//pflag.Parse()
@@ -104,18 +111,6 @@ func init() {
104111
//Fix Paths
105112
opts.Scheduler.SourcePath = filepath.Clean(opts.Scheduler.SourcePath)
106113
helper.CheckPath(opts.Scheduler.SourcePath)
107-
108-
/*
109-
scheduleTimeDuration, err := time.ParseDuration(opts.ScheduleTime)
110-
if err!=nil {
111-
log.Panic(err)
112-
}
113-
jobTimeout, err := time.ParseDuration(opts.JobTimeout)
114-
if err!=nil {
115-
log.Panic(err)
116-
}
117-
opts.Scheduler.ScheduleTime = scheduleTimeDuration
118-
opts.Scheduler.JobTimeout = jobTimeout*/
119114
}
120115

121116
func usage() {
@@ -125,6 +120,10 @@ func usage() {
125120
}
126121

127122
func main() {
123+
if showVersion {
124+
version.LogVersion()
125+
os.Exit(0)
126+
}
128127
wg := &sync.WaitGroup{}
129128
ctx, cancel := context.WithCancel(context.Background())
130129
sigs := make(chan os.Signal, 1)
@@ -133,8 +132,28 @@ func main() {
133132
shutdownHandler(ctx, sigs, cancel)
134133
wg.Done()
135134
}()
136-
//Prepare resources
137-
log.Infof("Preparing to RunWithContext...")
135+
136+
if opts.NoUpdates {
137+
version.AppLogger().Warnf("Updates are disabled, %s won't check for updates", ApplicationName)
138+
}
139+
140+
updater, err := update.NewUpdater(version.Version, ApplicationName, opts.NoUpdates, os.TempDir())
141+
if err != nil {
142+
log.Panic(err)
143+
}
144+
145+
if opts.NoUpdateMode || opts.NoUpdates {
146+
version.AppLogger().Infof("Starting server")
147+
applicationRun(wg, ctx, updater)
148+
} else {
149+
updater.Run(wg, ctx)
150+
}
151+
152+
wg.Wait()
153+
log.Info("Exit...")
154+
}
155+
156+
func applicationRun(wg *sync.WaitGroup, ctx context.Context, updater *update.Updater) {
138157
//Repository persist
139158
var repo repository.Repository
140159
repo, err := repository.NewSQLRepository(opts.Database)
@@ -155,9 +174,8 @@ func main() {
155174

156175
//WebConfig Server
157176
var webServer *web.WebServer
158-
webServer = web.NewWebServer(opts.Web, scheduler)
177+
webServer = web.NewWebServer(opts.Web, scheduler, updater)
159178
webServer.Run(wg, ctx)
160-
wg.Wait()
161179
}
162180

163181
func shutdownHandler(ctx context.Context, sigs chan os.Signal, cancel context.CancelFunc) {

server/web/web.go

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,24 @@ import (
1010
"io"
1111
"net/http"
1212
"net/url"
13+
"os"
1314
"strconv"
1415
"strings"
1516
"sync"
17+
"sync/atomic"
1618
"time"
1719
"transcoder/model"
1820
"transcoder/server/scheduler"
21+
"transcoder/update"
1922
)
2023

2124
type WebServer struct {
2225
WebServerConfig
23-
scheduler scheduler.Scheduler
24-
srv http.Server
25-
ctx context.Context
26+
scheduler scheduler.Scheduler
27+
srv http.Server
28+
ctx context.Context
29+
activeTracker *ActiveTracker
30+
updater *update.Updater
2631
}
2732

2833
func (W *WebServer) requestJob(writer http.ResponseWriter, request *http.Request) {
@@ -221,11 +226,24 @@ type WebServerConfig struct {
221226
Domain string `mapstructure:"domain", envconfig:"WEB_DOMAIN"`
222227
}
223228

224-
func NewWebServer(config WebServerConfig, scheduler scheduler.Scheduler) *WebServer {
229+
type ActiveTracker struct {
230+
activeRequests int64
231+
}
232+
233+
func (A *ActiveTracker) ActiveRequests() bool {
234+
return atomic.LoadInt64(&A.activeRequests) > 0
235+
}
236+
237+
func NewWebServer(config WebServerConfig, scheduler scheduler.Scheduler, updater *update.Updater) *WebServer {
225238
rtr := mux.NewRouter()
239+
at := &ActiveTracker{}
240+
rtr.Use(at.ActiveRequestsMiddleware())
226241
rtr.Use(LoggingMiddleware())
242+
227243
webServer := &WebServer{
228244
WebServerConfig: config,
245+
activeTracker: at,
246+
updater: updater,
229247
scheduler: scheduler,
230248
srv: http.Server{
231249
Addr: ":" + strconv.Itoa(config.Port),
@@ -241,6 +259,17 @@ func NewWebServer(config WebServerConfig, scheduler scheduler.Scheduler) *WebSer
241259
return webServer
242260
}
243261

262+
func (A *ActiveTracker) ActiveRequestsMiddleware() mux.MiddlewareFunc {
263+
return func(next http.Handler) http.Handler {
264+
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
265+
atomic.AddInt64(&A.activeRequests, 1)
266+
defer atomic.AddInt64(&A.activeRequests, -1)
267+
next.ServeHTTP(w, req)
268+
})
269+
}
270+
271+
}
272+
244273
func LoggingMiddleware() mux.MiddlewareFunc {
245274
return func(next http.Handler) http.Handler {
246275
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@@ -274,12 +303,35 @@ func (W *WebServer) Run(wg *sync.WaitGroup, ctx context.Context) {
274303
}
275304

276305
func (W *WebServer) start() {
306+
// Web server
277307
go func() {
278308
err := W.srv.ListenAndServe()
279309
if err != nil && !errors.Is(err, http.ErrServerClosed) {
280310
log.Panic(err)
281311
}
282312
}()
313+
314+
// updater
315+
go func() {
316+
for {
317+
select {
318+
case <-time.After(time.Minute * 5):
319+
log.Debug("Checking for updates")
320+
if !W.activeTracker.ActiveRequests() {
321+
release, updateRequired, err := W.updater.CheckForUpdate()
322+
if err != nil {
323+
log.Error(err)
324+
continue
325+
}
326+
if updateRequired {
327+
log.Warnf("New version available %s,exiting ...", release.TagName)
328+
os.Exit(update.UPDATE_EXIT_CODE)
329+
}
330+
}
331+
332+
}
333+
}
334+
}()
283335
}
284336

285337
func (W *WebServer) stop(ctx context.Context) {
Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/blang/semver/v4"
1010
"github.com/minio/selfupdate"
1111
log "github.com/sirupsen/logrus"
12+
"github.com/spf13/pflag"
1213
"io"
1314
"net/http"
1415
"os"
@@ -44,6 +45,10 @@ type Updater struct {
4445
noUpdates bool
4546
}
4647

48+
func PFlags() {
49+
pflag.Bool("noUpdateMode", false, "DON'T USE THIS FLAG, INTERNAL USE")
50+
pflag.Bool("noUpdates", false, "Application will not update itself")
51+
}
4752
func NewUpdater(currentVersionString string, assetName string, noUpdates bool, tmpPath string) (*Updater, error) {
4853
currentVersion, err := semver.Parse(cleanVersion(currentVersionString))
4954
if err != nil {
@@ -87,7 +92,7 @@ func (U *Updater) Run(wg *sync.WaitGroup, ctx context.Context) {
8792

8893
func (U *Updater) runApplication(ctx context.Context) {
8994
arguments := os.Args[1:]
90-
arguments = append(arguments, "--worker.noUpdateMode")
95+
arguments = append(arguments, "--noUpdateMode")
9196
ecode, err := command.NewCommand(U.binaryPath, arguments...).
9297
SetStderrFunc(func(buffer []byte, exit bool) {
9398
os.Stderr.Write(buffer)
@@ -127,7 +132,7 @@ func (U *Updater) CheckForUpdate() (*GitHubRelease, bool, error) {
127132
l.Info("Newer version available")
128133
return latestRelease, true, nil
129134
}
130-
l.Info("No new version available")
135+
l.Debug("No new version available")
131136
return nil, false, nil
132137
}
133138

version/version.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package version
2+
3+
import log "github.com/sirupsen/logrus"
4+
5+
var (
6+
Version = "v0.0.0-dev"
7+
Commit = "0000000"
8+
Date = "0000-00-00T00:00:00Z"
9+
)
10+
11+
func AppLogger() log.FieldLogger {
12+
return log.WithFields(log.Fields{
13+
"Version": Version,
14+
"Commit": Commit,
15+
"Date": Date,
16+
})
17+
}
18+
19+
func LogVersion() {
20+
AppLogger().Info("Version Info")
21+
}

worker/main.go

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,23 @@ import (
1515
"syscall"
1616
"transcoder/cmd"
1717
"transcoder/server/web"
18+
"transcoder/update"
19+
"transcoder/version"
1820
"transcoder/worker/serverclient"
1921
"transcoder/worker/task"
20-
"transcoder/worker/update"
2122
)
2223

2324
type CmdLineOpts struct {
2425
WebConfig web.WebServerConfig `mapstructure:"web"`
2526
WorkerConfig task.Config `mapstructure:"worker"`
27+
NoUpdateMode bool `mapstructure:"noUpdateMode"`
28+
NoUpdates bool `mapstructure:"noUpdates"`
2629
}
2730

2831
var (
32+
ApplicationName = "transcoderd-worker"
2933
opts CmdLineOpts
3034
showVersion bool
31-
Version = "v0.0.0-dev"
32-
Commit = "0000000"
33-
Date = "0000-00-00T00:00:00Z"
34-
ApplicationName = "transcoderd-worker"
3535
)
3636

3737
func init() {
@@ -66,8 +66,7 @@ func init() {
6666

6767
pflag.Var(&opts.WorkerConfig.StartAfter, "worker.startAfter", "Accept jobs only After HH:mm")
6868
pflag.Var(&opts.WorkerConfig.StopAfter, "worker.stopAfter", "Stop Accepting new Jobs after HH:mm")
69-
pflag.Bool("worker.noUpdateMode", false, "DON'T USE THIS FLAG, INTERNAL USE")
70-
pflag.Bool("worker.noUpdates", false, "Application will not update itself")
69+
update.PFlags()
7170
pflag.Usage = usage
7271

7372
viper.SetConfigType("yaml")
@@ -120,12 +119,7 @@ func usage() {
120119

121120
func main() {
122121
if showVersion {
123-
log.WithFields(log.Fields{
124-
"Version": Version,
125-
"Commit": Commit,
126-
"Date": Date,
127-
"AppName": ApplicationName,
128-
}).Info("Version Info")
122+
version.LogVersion()
129123
os.Exit(0)
130124
}
131125

@@ -139,24 +133,18 @@ func main() {
139133
wg.Done()
140134
}()
141135

142-
appLogger := log.WithFields(log.Fields{
143-
"Version": Version,
144-
"Commit": Commit,
145-
"Date": Date,
146-
"AppName": ApplicationName,
147-
})
148-
149-
if opts.WorkerConfig.NoUpdates {
150-
appLogger.Warnf("Updates are disabled, %s won't check for updates", ApplicationName)
136+
if opts.NoUpdates {
137+
version.AppLogger().Warnf("Updates are disabled, %s won't check for updates", ApplicationName)
151138
}
152139

153-
updater, err := update.NewUpdater(Version, ApplicationName, opts.WorkerConfig.NoUpdates, opts.WorkerConfig.TemporalPath)
140+
updater, err := update.NewUpdater(version.Version, ApplicationName, opts.NoUpdates, os.TempDir())
154141
if err != nil {
155142
log.Panic(err)
156143
}
157144

158-
if opts.WorkerConfig.NoUpdateMode || opts.WorkerConfig.NoUpdates {
159-
applicationRun(appLogger, wg, ctx, updater)
145+
if opts.NoUpdateMode || opts.NoUpdates {
146+
version.AppLogger().Info("Starting Worker")
147+
applicationRun(wg, ctx, updater)
160148
} else {
161149
updater.Run(wg, ctx)
162150
}
@@ -165,8 +153,7 @@ func main() {
165153
log.Info("Exit...")
166154
}
167155

168-
func applicationRun(appLogger log.FieldLogger, wg *sync.WaitGroup, ctx context.Context, updater *update.Updater) {
169-
appLogger.Info("Starting Worker")
156+
func applicationRun(wg *sync.WaitGroup, ctx context.Context, updater *update.Updater) {
170157
printer := task.NewConsoleWorkerPrinter()
171158
serverClient := serverclient.NewServerClient(opts.WebConfig)
172159
encodeWorker := task.NewEncodeWorker(opts.WorkerConfig, serverClient, printer)

worker/task/config.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ type FFMPEGConfig struct {
5757
}
5858

5959
type Config struct {
60-
NoUpdateMode bool `mapstructure:"noUpdateMode", envconfig:"WORKER_NOUPDATE"`
61-
NoUpdates bool `mapstructure:"noUpdates", envconfig:"WORKER_NOUPDATES"`
6260
TemporalPath string `mapstructure:"temporalPath", envconfig:"WORKER_TMP_PATH"`
6361
Name string `mapstructure:"name", envconfig:"WORKER_NAME"`
6462
Threads int `mapstructure:"threads", envconfig:"WORKER_THREADS"`

0 commit comments

Comments
 (0)