From c1fe33def5b2c6d7d38541eb5a063585fd0c746b Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 13:29:30 -0800 Subject: [PATCH 01/18] More logs for OpenAppendStream, OpenReadStream failures --- services/storehost/storehost.go | 63 ++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index e4f0f0ae..fb79c01e 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -361,7 +361,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque req, err := common.GetOpenAppendStreamRequestHTTP(r.Header) if err != nil { - t.logger.WithField(`error`, err).Error("unable to parse all needed headers") + t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to parse all needed headers") t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) http.Error(w, err.Error(), http.StatusBadRequest) return @@ -370,7 +370,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque // setup websocket wsStream, err := t.GetWSConnector().AcceptAppendStream(w, r) if err != nil { - t.logger.WithField(`error`, err).Error("unable to upgrade websocket connection") + t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to upgrade websocket connection") t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) return } @@ -382,7 +382,7 @@ func (t *StoreHost) OpenAppendStreamHandler(w http.ResponseWriter, r *http.Reque // create thrift stream call wrapper and deligate to streaming call if err = t.OpenAppendStream(ctx, wsStream); err != nil { - t.logger.WithField(`error`, err).Error("unable to open append stream") + t.logger.WithField(common.TagErr, err).Error("OpenAppendStreamHandler: unable to open append stream") return } } @@ -392,25 +392,13 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageRequests) - if atomic.LoadInt32(&t.started) == 0 { - call.Done() - t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) - return newInternalServiceError("StoreHost not started") - } - - // If the disk available space is low, we should fail any request to write extent - if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly { - call.Done() - t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) - return newInternalServiceError("StoreHost in read-only mode") - } - // read in args passed in via the Thrift context headers args, err := getInConnArgs(ctx) if err != nil { call.Done() t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + t.logger.Error("OpenAppendStream: error parsing args") return err } @@ -419,8 +407,20 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore common.TagDst: common.FmtDst(args.destID.String()), }) - log.WithField("args", fmt.Sprintf("destType=%v mode=%v", args.destType, args.mode)). - Info("OpenAppendStream: starting inConn") + if atomic.LoadInt32(&t.started) == 0 { + call.Done() + t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + log.Error("OpenAppendStream: storehost not started") + return newInternalServiceError("StoreHost not started") + } + + // If the disk available space is low, we should fail any request to write extent + if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly { + call.Done() + t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) + log.Error("OpenAppendStream: storehost currently readonly") + return newInternalServiceError("StoreHost in read-only mode") + } in := newInConn(args, call, t.xMgr, t.m3Client, log) @@ -431,7 +431,12 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore numInConn := atomic.AddInt64(&t.numInConn, 1) t.m3Client.UpdateGauge(metrics.OpenAppendStreamScope, metrics.StorageWriteStreams, numInConn) - log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: write stream opened") + + log.WithFields(bark.Fields{ + `destType`: args.destType, + `mode`: args.mode, + `numInConn`: numInConn, + }).Info("OpenAppendStream: inConn started") select { // wait for inConn to be done @@ -439,19 +444,19 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore // .. or wait for shutdown to be triggered case <-t.shutdownC: + log.Info("OpenAppendStream: shutdown, stopping inConn") err = in.Stop() // attempt to stop connection // listen to extreme situations case <-t.disableWriteC: - log.Info("Stop write due to available disk space is extremely low") + log.Error("OpenAppendStream: writes disabled, stopping inConn") err = in.Stop() } numInConn = atomic.AddInt64(&t.numInConn, -1) t.m3Client.UpdateGauge(metrics.OpenAppendStreamScope, metrics.StorageWriteStreams, numInConn) - log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: write stream closed") + log.WithField(`numInConn`, numInConn).Info("OpenAppendStream: inConn done") - log.Info("OpenAppendStream done") return err // FIXME: tchannel does *not* currently propagate this to the remote caller } @@ -534,12 +539,6 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageRequests) - if atomic.LoadInt32(&t.started) == 0 { - call.Done() - t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures) - return newInternalServiceError("StoreHost not started") - } - // read in args passed in via the Thrift context headers args, e := getOutConnArgs(ctx) @@ -566,6 +565,13 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp common.TagCnsm: common.FmtCnsm(args.consGroupID.String()), }) + if atomic.LoadInt32(&t.started) == 0 { + call.Done() + t.m3Client.IncCounter(metrics.OpenReadStreamScope, metrics.StorageFailures) + log.Error("OpenReadStream: storehost not started") + return newInternalServiceError("StoreHost not started") + } + out := newOutConn(args, call, t.xMgr, t.m3Client, log) t.shutdownWG.Add(1) @@ -590,6 +596,7 @@ func (t *StoreHost) OpenReadStream(ctx thrift.Context, call storeStream.BStoreOp // .. or wait for shutdown to be triggered case <-t.shutdownC: + log.Info("OpenReadStream: shutdown, stopping inConn") out.Stop() // attempt to stop connection } From e363e7d528e3cd8e7d4191e7cc33c6e378d57e48 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 13:36:06 -0800 Subject: [PATCH 02/18] rename/clean-up storageMonitor -> spaceMon --- services/storehost/spaceMon.go | 215 +++++++++++++++++++++++++++ services/storehost/storagemonitor.go | 208 -------------------------- services/storehost/storehost.go | 16 +- 3 files changed, 223 insertions(+), 216 deletions(-) create mode 100644 services/storehost/spaceMon.go delete mode 100644 services/storehost/storagemonitor.go diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go new file mode 100644 index 00000000..6a126b6a --- /dev/null +++ b/services/storehost/spaceMon.go @@ -0,0 +1,215 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storehost + +import ( + "os" + "sync" + "syscall" + "time" + + "github.com/uber-common/bark" + "github.com/uber/cherami-server/common" + "github.com/uber/cherami-server/common/metrics" + "github.com/uber/cherami-server/services/storehost/load" +) + +const ( + warnThreshold = 200 * gigaBytes + alertThreshold = 100 * gigaBytes + resumeWritesThreshold = 200 * gigaBytes +) + +// interval at which to monitor space +const spaceMonInterval = 2 * time.Minute + +const ( + kiloBytes = 1024 + megaBytes = 1024 * kiloBytes + gigaBytes = 1024 * megaBytes + teraBytes = 1024 * gigaBytes +) + +// StorageMode defines the read write mode of the storage host +type StorageMode int32 + +const ( + // StorageModeReadWrite read/write + StorageModeReadWrite StorageMode = iota + // StorageModeReadOnly read only + StorageModeReadOnly +) + +type ( + // SpaceMon keep monitoring disk usage, and log/alert/trigger necessary handling + SpaceMon interface { + common.Daemon + GetMode() StorageMode + } + + // SpaceMon is an implementation of SpaceMon. + spaceMon struct { + sync.RWMutex + + storeHost *StoreHost // TODO: use this to trigger turning into read only mode + logger bark.Logger + m3Client metrics.Client + hostMetrics *load.HostMetrics + + stopCh chan struct{} + path string + mode StorageMode + } +) + +// NewSpaceMon returns an instance of SpaceMon. +func NewSpaceMon(store *StoreHost, m3Client metrics.Client, hostMetrics *load.HostMetrics, logger bark.Logger, path string) SpaceMon { + + return &spaceMon{ + storeHost: store, + logger: logger, + m3Client: m3Client, + hostMetrics: hostMetrics, + path: path, + mode: StorageModeReadWrite, + } +} + +// Start starts the monitoring +func (s *spaceMon) Start() { + + s.logger.Info("SpaceMon: started") + s.stopCh = make(chan struct{}) + go s.pump() +} + +// Stop stops the monitoring +func (s *spaceMon) Stop() { + + close(s.stopCh) + s.logger.Info("SpaceMon: stopped") +} + +// GetMode returns the read/write mode of storage +func (s *spaceMon) GetMode() StorageMode { + + s.RLock() + defer s.RUnlock() + + return s.mode +} + +func (s *spaceMon) pump() { + + var stat syscall.Statfs_t + + path := s.path + + if len(path) <= 0 { + + s.logger.Warn("SpaceMon: monitoring path is empty, trying working directory") + + cwd, err := os.Getwd() + if err != nil { + s.logger.Error("SpaceMon: os.Getwd() failed", err) + return + } + + path = cwd + } + + log := s.logger.WithField(`path`, path) + + ticker := time.NewTicker(spaceMonInterval) + defer ticker.Stop() + + for range ticker.C { + + select { + case <-s.stopCh: + return + default: + // continue below + } + + // query available/total space + err := syscall.Statfs(path, &stat) + if err != nil { + log.WithField(common.TagErr, err).Error("SpaceMon: syscall.Statfs failed", path) + continue + } + + avail := stat.Bavail * uint64(stat.Bsize) + total := stat.Blocks * uint64(stat.Bsize) + + if total <= 0 { + log.Error(`SpaceMon: total space unavailable`) + continue + } + + availMiBs, totalMiBs := avail/megaBytes, total/megaBytes + availPercent := 100.0 * float64(avail) / float64(total) + + s.hostMetrics.Set(load.HostMetricFreeDiskSpaceBytes, int64(avail)) + s.m3Client.UpdateGauge(metrics.SystemResourceScope, metrics.StorageDiskAvailableSpaceMB, int64(availMiBs)) + + xlog := log.WithFields(bark.Fields{ + `avail`: availMiBs, + `total`: totalMiBs, + `percent`: availPercent, + }) + + s.Lock() + defer s.Unlock() + + switch { + case s.mode == StorageModeReadOnly: + + // disable read-only, if above resume-writes threshold + if avail > resumeWritesThreshold { + + s.storeHost.EnableWrite() + s.mode = StorageModeReadWrite + + xlog.Info("SpaceMon: disabling read-only") + + } else { + + xlog.Warn(`SpaceMon: continuing in read-only mode`) + } + + case avail < alertThreshold: // enable read-only, if below alert-threshold + + xlog.Error("SpaceMon: available space less than alert-threshold") + + if s.mode != StorageModeReadOnly { + s.mode = StorageModeReadOnly + s.storeHost.DisableWrite() + } + + case avail < warnThreshold: // warn, if below warn-threshold + xlog.Warn("SpaceMon: available space less than warn-threshold") + + default: + xlog.Debug("SpaceMon: monitoring") + } + } +} diff --git a/services/storehost/storagemonitor.go b/services/storehost/storagemonitor.go deleted file mode 100644 index eeb85878..00000000 --- a/services/storehost/storagemonitor.go +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright (c) 2016 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package storehost - -import ( - "os" - "sync" - "syscall" - "time" - - "github.com/uber-common/bark" - "github.com/uber/cherami-server/common" - "github.com/uber/cherami-server/common/metrics" - "github.com/uber/cherami-server/services/storehost/load" -) - -// Monitoring housekeeping will happen every 2 minutes -const storageMonitoringInterval = time.Duration(2 * time.Minute) - -const ( - thresholdWarn = 75 * gigaBytes - thresholdReadOnly = 50 * gigaBytes - thresholdResumeWrites = 100 * gigaBytes -) - -const ( - kiloBytes = 1024 - megaBytes = 1024 * kiloBytes - gigaBytes = 1024 * megaBytes - teraBytes = 1024 * gigaBytes -) - -// StorageStatus defines the different storage status -type StorageStatus int32 - -// StorageMode defines the read write mode of the storage host -type StorageMode int32 - -const ( - // SMReadWrite allows both read and write - SMReadWrite = iota - // SMReadOnly allows read only - SMReadOnly -) - -type ( - // StorageMonitor keep monitoring disk usage, and log/alert/trigger necessary handling - StorageMonitor interface { - common.Daemon - GetStorageMode() StorageMode - } - - // StorageMonitor is an implementation of StorageMonitor. - storageMonitor struct { - sync.RWMutex - - storeHost *StoreHost // TODO: use this to trigger turning into read only mode - - logger bark.Logger - m3Client metrics.Client - hostMetrics *load.HostMetrics - - closeChannel chan struct{} - - monitoringTicker *time.Ticker - - monitoringPath string - mode StorageMode - } -) - -// NewStorageMonitor returns an instance of NewStorageMonitor. -func NewStorageMonitor(store *StoreHost, m3Client metrics.Client, hostMetrics *load.HostMetrics, logger bark.Logger, path string) StorageMonitor { - return &storageMonitor{ - storeHost: store, - logger: logger, - m3Client: m3Client, - hostMetrics: hostMetrics, - monitoringPath: path, - mode: SMReadWrite, - } -} - -// Start starts the monitoring -func (s *storageMonitor) Start() { - - s.closeChannel = make(chan struct{}) - - go func() { - - ticker := time.NewTicker(storageMonitoringInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - go s.checkStorage() - case <-s.closeChannel: - return - } - } - }() - - s.logger.Info("StorageMonitor: started") -} - -// Stop stops the monitoring -func (s *storageMonitor) Stop() { - close(s.closeChannel) - - s.logger.Info("StorageMonitor: stopped") -} - -// GetStorageMode returns the read/write mode of storage -func (s *storageMonitor) GetStorageMode() StorageMode { - s.RLock() - defer s.RUnlock() - - return s.mode -} - -func (s *storageMonitor) checkStorage() { - var stat syscall.Statfs_t - - path := s.monitoringPath - - if len(path) <= 0 { - s.logger.Warn("StorageMonitor: monitoring path is empty, try working directory") - wd, err := os.Getwd() - if err != nil { - s.logger.Error("StorageMonitor: os.Getwd() failed", err) - return - } - path = wd - } - - log := s.logger.WithField(`path`, path) - - err := syscall.Statfs(path, &stat) - if err != nil { - log.WithField(common.TagErr, err).Error("StorageMonitor: syscall.Statfs failed") - return - } - - avail := int64(stat.Bavail) * int64(stat.Bsize) - total := int64(stat.Blocks) * int64(stat.Bsize) - - xlog := log.WithFields(bark.Fields{ - `availMiB`: avail / megaBytes, - `totalMiB`: total / megaBytes, - }) - - if total <= 0 { - xlog.Error(`StorageMonitor: total space unavailable`) - return - } - - s.hostMetrics.Set(load.HostMetricFreeDiskSpaceBytes, avail) - s.m3Client.UpdateGauge(metrics.SystemResourceScope, metrics.StorageDiskAvailableSpaceMB, avail/megaBytes) - - s.Lock() - defer s.Unlock() - - switch { - case s.mode == SMReadOnly: - - // disable read-only, if above resume-writes threshold - if avail > thresholdResumeWrites { - xlog.Info("StorageMonitor: disabling read-only") - s.storeHost.EnableWrite() - s.mode = SMReadWrite - } else { - xlog.Warn("StorageMonitor: continuing read-only") - } - - case avail < thresholdReadOnly: // enable read-only, if below readonly-threshold - xlog.Error("StorageMonitor: available space less than readonly-threshold") - - if s.mode != SMReadOnly { - s.mode = SMReadOnly - s.storeHost.DisableWrite() - } - - case avail < thresholdWarn: // warn, if below warn-threshold - xlog.Warn("StorageMonitor: available space less than warn-threshold") - - default: - xlog.Debug("StorageMonitor: monitoring") - } -} diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index fb79c01e..2b57d0d3 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -195,8 +195,8 @@ type ( // extStatsReporter reports various stats on active extents extStatsReporter *ExtStatsReporter - // Storage Monitoring - storageMonitor StorageMonitor + // Free-space monitor + spaceMon SpaceMon // metrics aggregated at host level and reported to controller hostMetrics *load.HostMetrics @@ -283,8 +283,8 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) { t.xMgr = NewExtentManager(storeMgr, t.m3Client, t.hostMetrics, t.logger) - t.storageMonitor = NewStorageMonitor(t, t.m3Client, t.hostMetrics, t.logger, baseDir) - t.storageMonitor.Start() + t.spaceMon = NewSpaceMon(t, t.m3Client, t.hostMetrics, t.logger, baseDir) + t.spaceMon.Start() t.replMgr = NewReplicationManager(t.xMgr, t.m3Client, t.mClient, t.logger, hostID, t.GetWSConnector()) @@ -316,7 +316,7 @@ func (t *StoreHost) Stop() { t.loadReporter.Stop() t.hostIDHeartbeater.Stop() - t.storageMonitor.Stop() + t.spaceMon.Stop() t.replicationJobRunner.Stop() t.extStatsReporter.Stop() t.SCommon.Stop() @@ -415,7 +415,7 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore } // If the disk available space is low, we should fail any request to write extent - if t.storageMonitor != nil && t.storageMonitor.GetStorageMode() == SMReadOnly { + if t.spaceMon != nil && t.spaceMon.GetMode() == StorageModeReadOnly { call.Done() t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) log.Error("OpenAppendStream: storehost currently readonly") @@ -1349,14 +1349,14 @@ func (t *StoreHost) reportHostMetric(reporter common.LoadReporter, diffSecs int6 } // check and notify read-only state - if t.storageMonitor.GetStorageMode() == SMReadOnly { + if t.spaceMon.GetMode() == StorageModeReadOnly { hostMetrics.NodeState = common.Int64Ptr(controller.NODE_STATE_READONLY) } remDiskSpaceBytes := t.hostMetrics.Get(load.HostMetricFreeDiskSpaceBytes) if remDiskSpaceBytes > 0 { // the remaining disk space computation happens - // as part of the storageMonitor thread and the + // as part of the spaceMon thread and the // load reporter could be called before the storage // monitor gets a chance to do this computation. // Make sure we don't report zero values in the From ad50a7301021cc398625d4019f1fe4d171fc0109 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 13:42:51 -0800 Subject: [PATCH 03/18] ~ --- services/storehost/spaceMon.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go index 6a126b6a..4e921f43 100644 --- a/services/storehost/spaceMon.go +++ b/services/storehost/spaceMon.go @@ -38,7 +38,7 @@ const ( resumeWritesThreshold = 200 * gigaBytes ) -// interval at which to monitor space +// monitoring interval const spaceMonInterval = 2 * time.Minute const ( @@ -59,13 +59,13 @@ const ( ) type ( - // SpaceMon keep monitoring disk usage, and log/alert/trigger necessary handling + // SpaceMon monitors free-space and switches store to read-only on low-space SpaceMon interface { common.Daemon GetMode() StorageMode } - // SpaceMon is an implementation of SpaceMon. + // spaceMon implements SpaceMon interface spaceMon struct { sync.RWMutex @@ -89,7 +89,7 @@ func NewSpaceMon(store *StoreHost, m3Client metrics.Client, hostMetrics *load.Ho m3Client: m3Client, hostMetrics: hostMetrics, path: path, - mode: StorageModeReadWrite, + mode: StorageModeReadWrite, // default: read-write } } @@ -119,10 +119,9 @@ func (s *spaceMon) GetMode() StorageMode { func (s *spaceMon) pump() { - var stat syscall.Statfs_t - - path := s.path + var path = s.path + // if no path specified, use the current working directory if len(path) <= 0 { s.logger.Warn("SpaceMon: monitoring path is empty, trying working directory") @@ -141,16 +140,16 @@ func (s *spaceMon) pump() { ticker := time.NewTicker(spaceMonInterval) defer ticker.Stop() - for range ticker.C { - + for { select { + case <-ticker.C: + // continue below to check free-space, etc case <-s.stopCh: - return - default: - // continue below + return // done } // query available/total space + var stat syscall.Statfs_t err := syscall.Statfs(path, &stat) if err != nil { log.WithField(common.TagErr, err).Error("SpaceMon: syscall.Statfs failed", path) From d864041d9a14f71c6a6556e013232579eab81a90 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 13:49:41 -0800 Subject: [PATCH 04/18] fix potential race around disableWriteC --- services/storehost/spaceMon.go | 7 ++++--- services/storehost/storehost.go | 24 ++++++++++++------------ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go index 4e921f43..0d92f778 100644 --- a/services/storehost/spaceMon.go +++ b/services/storehost/spaceMon.go @@ -69,7 +69,7 @@ type ( spaceMon struct { sync.RWMutex - storeHost *StoreHost // TODO: use this to trigger turning into read only mode + storeHost *StoreHost logger bark.Logger m3Client metrics.Client hostMetrics *load.HostMetrics @@ -185,8 +185,8 @@ func (s *spaceMon) pump() { // disable read-only, if above resume-writes threshold if avail > resumeWritesThreshold { - s.storeHost.EnableWrite() s.mode = StorageModeReadWrite + s.storeHost.EnableWrite() xlog.Info("SpaceMon: disabling read-only") @@ -200,8 +200,9 @@ func (s *spaceMon) pump() { xlog.Error("SpaceMon: available space less than alert-threshold") if s.mode != StorageModeReadOnly { - s.mode = StorageModeReadOnly + s.storeHost.DisableWrite() + s.mode = StorageModeReadOnly } case avail < warnThreshold: // warn, if below warn-threshold diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index 2b57d0d3..2d1be533 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -177,7 +177,7 @@ type ( xMgr *ExtentManager // extent manager replMgr *ReplicationManager // replication manager shutdownC chan struct{} - disableWriteC chan struct{} + disableWriteC atomic.Value // chan struct{} numInConn, numOutConn int64 // number of active inConns/outConns respectively @@ -233,14 +233,13 @@ func NewStoreHost(serviceName string, sCommon common.SCommon, mClient metadata.T m3Client := metrics.NewClient(sCommon.GetMetricsReporter(), metrics.Storage) t := &StoreHost{ - SCommon: sCommon, - opts: opts, - hostMetrics: load.NewHostMetrics(), - shutdownC: make(chan struct{}), - disableWriteC: make(chan struct{}), - logger: logger, - m3Client: m3Client, - mClient: mm.NewMetadataMetricsMgr(mClient, m3Client, logger), + SCommon: sCommon, + opts: opts, + hostMetrics: load.NewHostMetrics(), + shutdownC: make(chan struct{}), + logger: logger, + m3Client: m3Client, + mClient: mm.NewMetadataMetricsMgr(mClient, m3Client, logger), } return t, []thrift.TChanServer{store.NewTChanBStoreServer(t)} @@ -303,6 +302,7 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) { t.extStatsReporter = NewExtStatsReporter(hostID, t.xMgr, t.mClient, t.logger) t.extStatsReporter.Start() + t.EnableWrite() atomic.StoreInt32(&t.started, 1) // started t.logger.WithField("options", fmt.Sprintf("Store=%v BaseDir=%v", t.opts.Store, t.opts.BaseDir)). @@ -448,7 +448,7 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore err = in.Stop() // attempt to stop connection // listen to extreme situations - case <-t.disableWriteC: + case <-t.disableWriteC.Load().(chan struct{}): log.Error("OpenAppendStream: writes disabled, stopping inConn") err = in.Stop() } @@ -1314,12 +1314,12 @@ func (t *StoreHost) Shutdown() { // DisableWrite disables all the write func (t *StoreHost) DisableWrite() { t.logger.Error("Write disabled") - close(t.disableWriteC) + close(t.disableWriteC.Load().(chan struct{})) } // EnableWrite enables write mode func (t *StoreHost) EnableWrite() { - t.disableWriteC = make(chan struct{}) + t.disableWriteC.Store(make(chan struct{})) } // RegisterWSHandler is the implementation of WSService interface From 87269bc88d8c50732b7d6ae07291864d4ce85948 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 13:55:54 -0800 Subject: [PATCH 05/18] WriteDisableNotify --- services/storehost/storehost.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index 2d1be533..adb6d3b3 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -313,6 +313,7 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) { func (t *StoreHost) Stop() { atomic.StoreInt32(&t.started, 0) // stopped + t.DisableWrite() t.loadReporter.Stop() t.hostIDHeartbeater.Stop() @@ -447,8 +448,8 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore log.Info("OpenAppendStream: shutdown, stopping inConn") err = in.Stop() // attempt to stop connection - // listen to extreme situations - case <-t.disableWriteC.Load().(chan struct{}): + // .. or for store to switch to read-only (on low space) + case <-t.WriteDisabledNotify(): log.Error("OpenAppendStream: writes disabled, stopping inConn") err = in.Stop() } @@ -1303,8 +1304,9 @@ readMsgsLoop: func (t *StoreHost) Shutdown() { atomic.StoreInt32(&t.started, 0) // shutdown - t.logger.Info("Storehost: shutting down") + t.DisableWrite() close(t.shutdownC) // 'broadcast' shutdown, by closing the shutdownC + t.logger.Info("Storehost: shutting down") // wait until all connections have been closed if !common.AwaitWaitGroup(&t.shutdownWG, 30*time.Second) { t.logger.Error("Timed out waiting for store host to shutdown") @@ -1317,6 +1319,10 @@ func (t *StoreHost) DisableWrite() { close(t.disableWriteC.Load().(chan struct{})) } +func (t *StoreHost) WriteDisabledNotify() chan struct{} { + return t.disableWriteC.Load().(chan struct{}) +} + // EnableWrite enables write mode func (t *StoreHost) EnableWrite() { t.disableWriteC.Store(make(chan struct{})) From a21c6729f334b4c134bc2ea03d7186e0e0b95c44 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 16:29:49 -0800 Subject: [PATCH 06/18] fix lint --- services/storehost/storehost.go | 1 + 1 file changed, 1 insertion(+) diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index adb6d3b3..050d5695 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -1319,6 +1319,7 @@ func (t *StoreHost) DisableWrite() { close(t.disableWriteC.Load().(chan struct{})) } +// WriteDisabledNotify returns a channel that is used to notify when writes are disabled func (t *StoreHost) WriteDisabledNotify() chan struct{} { return t.disableWriteC.Load().(chan struct{}) } From 3b8ca3314df193e0d19e306ef8c9caceb503c147 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 18:37:59 -0800 Subject: [PATCH 07/18] fix names --- services/storehost/spaceMon.go | 4 ++-- services/storehost/storehost.go | 24 ++++++++++++------------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go index 0d92f778..72504ba8 100644 --- a/services/storehost/spaceMon.go +++ b/services/storehost/spaceMon.go @@ -186,7 +186,7 @@ func (s *spaceMon) pump() { if avail > resumeWritesThreshold { s.mode = StorageModeReadWrite - s.storeHost.EnableWrite() + s.storeHost.EnableWrites() xlog.Info("SpaceMon: disabling read-only") @@ -201,7 +201,7 @@ func (s *spaceMon) pump() { if s.mode != StorageModeReadOnly { - s.storeHost.DisableWrite() + s.storeHost.DisableWrites() s.mode = StorageModeReadOnly } diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index 050d5695..5a4b7b0a 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -302,7 +302,7 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) { t.extStatsReporter = NewExtStatsReporter(hostID, t.xMgr, t.mClient, t.logger) t.extStatsReporter.Start() - t.EnableWrite() + t.EnableWrites() atomic.StoreInt32(&t.started, 1) // started t.logger.WithField("options", fmt.Sprintf("Store=%v BaseDir=%v", t.opts.Store, t.opts.BaseDir)). @@ -313,7 +313,7 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) { func (t *StoreHost) Stop() { atomic.StoreInt32(&t.started, 0) // stopped - t.DisableWrite() + t.DisableWrites() t.loadReporter.Stop() t.hostIDHeartbeater.Stop() @@ -449,7 +449,7 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore err = in.Stop() // attempt to stop connection // .. or for store to switch to read-only (on low space) - case <-t.WriteDisabledNotify(): + case <-t.NotifyWritesDisabled(): log.Error("OpenAppendStream: writes disabled, stopping inConn") err = in.Stop() } @@ -1304,7 +1304,7 @@ readMsgsLoop: func (t *StoreHost) Shutdown() { atomic.StoreInt32(&t.started, 0) // shutdown - t.DisableWrite() + t.DisableWrites() close(t.shutdownC) // 'broadcast' shutdown, by closing the shutdownC t.logger.Info("Storehost: shutting down") // wait until all connections have been closed @@ -1313,20 +1313,20 @@ func (t *StoreHost) Shutdown() { } } -// DisableWrite disables all the write -func (t *StoreHost) DisableWrite() { +// DisableWrites disables writes, switching into 'read-only' mode +func (t *StoreHost) DisableWrites() { t.logger.Error("Write disabled") close(t.disableWriteC.Load().(chan struct{})) } -// WriteDisabledNotify returns a channel that is used to notify when writes are disabled -func (t *StoreHost) WriteDisabledNotify() chan struct{} { - return t.disableWriteC.Load().(chan struct{}) +// EnableWrites enables writes again (disables read-only) +func (t *StoreHost) EnableWrites() { + t.disableWriteC.Store(make(chan struct{})) } -// EnableWrite enables write mode -func (t *StoreHost) EnableWrite() { - t.disableWriteC.Store(make(chan struct{})) +// NotifyWritesDisabled returns a channel that is used to notify when writes are disabled +func (t *StoreHost) NotifyWritesDisabled() chan struct{} { + return t.disableWriteC.Load().(chan struct{}) } // RegisterWSHandler is the implementation of WSService interface From 6b01dc9c7ce9abe95dec9606a1c2e4e827aec355 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 22:15:17 -0800 Subject: [PATCH 08/18] glide up --- glide.lock | 2 +- glide.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/glide.lock b/glide.lock index b08c77a6..592a285c 100644 --- a/glide.lock +++ b/glide.lock @@ -134,7 +134,7 @@ imports: - common/websocket - stream - name: github.com/uber/cherami-thrift - version: ce2fc989809bb711289c2420c8979ef37bcf5b61 + version: bd124e14a34390b624f60d70351b9f375ade996c subpackages: - .generated/go/admin - .generated/go/cherami diff --git a/glide.yaml b/glide.yaml index 10b46460..4f82f4d2 100644 --- a/glide.yaml +++ b/glide.yaml @@ -40,7 +40,7 @@ import: - common/websocket - stream - package: github.com/uber/cherami-thrift - version: ce2fc989809bb711289c2420c8979ef37bcf5b61 + version: bd124e14a34390b624f60d70351b9f375ade996c subpackages: - .generated/go/admin - .generated/go/cherami From c22fe7edabcbbee7dc5496de758ae3e2d3f50753 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 23:06:01 -0800 Subject: [PATCH 09/18] glide up, fix build --- clients/metadata/metadata_cassandra.go | 6 ++-- cmd/tools/cmq/meta.go | 2 +- glide.lock | 40 ++++++++++++++------------ tools/cassandra/cqlclient.go | 2 +- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/clients/metadata/metadata_cassandra.go b/clients/metadata/metadata_cassandra.go index 00605622..80860e8f 100644 --- a/clients/metadata/metadata_cassandra.go +++ b/clients/metadata/metadata_cassandra.go @@ -198,18 +198,18 @@ func parseConsistency(cfgCons string) (lowCons gocql.Consistency, midCons gocql. switch cons := strings.Split(cfgCons, ","); len(cons) { case 3: - lowCons, _ = gocql.ParseConsistency(strings.TrimSpace(cons[2])) + lowCons = gocql.ParseConsistency(strings.TrimSpace(cons[2])) fallthrough case 2: - midCons, _ = gocql.ParseConsistency(strings.TrimSpace(cons[1])) + midCons = gocql.ParseConsistency(strings.TrimSpace(cons[1])) if len(cons) == 2 { lowCons = midCons } fallthrough case 1: - highCons, _ = gocql.ParseConsistency(strings.TrimSpace(cons[0])) + highCons = gocql.ParseConsistency(strings.TrimSpace(cons[0])) } return diff --git a/cmd/tools/cmq/meta.go b/cmd/tools/cmq/meta.go index a9f2a653..db519b1a 100644 --- a/cmd/tools/cmq/meta.go +++ b/cmd/tools/cmq/meta.go @@ -44,7 +44,7 @@ func newMetadataClient(opts *opts) (*metadataClient, error) { } } - cluster.Consistency, _ = gocql.ParseConsistency(opts.Consistency) + cluster.Consistency = gocql.ParseConsistency(opts.Consistency) cluster.NumConns = numConns cluster.ProtoVersion = protocolVersion diff --git a/glide.lock b/glide.lock index 592a285c..29b248c5 100644 --- a/glide.lock +++ b/glide.lock @@ -1,12 +1,12 @@ -hash: d0c29905b07aed1278050c3471afb0f75bcaa2288b9e1394fa9eee1d52a55fa3 -updated: 2017-11-16T11:59:10.262960188-08:00 +hash: 312dfc2c60f5c9d4ac443860f677486ed9b096c4213982f10d59fe4d3c162181 +updated: 2017-11-20T23:02:41.026687027-08:00 imports: - name: github.com/apache/thrift version: b2a4d4ae21c789b689dd162deb819665567f481c subpackages: - lib/go/thrift - name: github.com/aws/aws-sdk-go - version: 011ff14652ceaed8ce090cef3f1d94ec9668bd83 + version: 8a972b4459c2f2582b06f3e2d74448987cc6e19f subpackages: - aws - aws/awserr @@ -38,9 +38,9 @@ imports: - name: github.com/benbjohnson/clock version: 7dc76406b6d3c05b5f71a86293cbcf3c4ea03b19 - name: github.com/bsm/sarama-cluster - version: c4d3d28d22e7bdea71656725bf5cd1d988454b41 + version: 5efe630369ab4ed5cc4cedeadd61b4d1b2523169 - name: github.com/cactus/go-statsd-client - version: ce77ca9ecdee1c3ffd097e32f9bb832825ccb203 + version: 91c326c3f7bd20f0226d3d1c289dd9f8ce28d33d subpackages: - statsd - name: github.com/cockroachdb/c-jemalloc @@ -54,7 +54,7 @@ imports: - name: github.com/codegangsta/cli version: 0bdeddeeb0f650497d603c4ad7b20cfe685682f6 - name: github.com/davecgh/go-spew - version: ecdeabc65495df2dec95d7c4a4c3e021903035e5 + version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 subpackages: - spew - name: github.com/dgryski/go-farm @@ -68,9 +68,9 @@ imports: - name: github.com/eapache/queue version: 44cc805cf13205b55f69e14bcb69867d1ae92f98 - name: github.com/go-ini/ini - version: 5b3e00af70a9484542169a976dcab8d03e601a17 + version: d3de07a94d22b4a0972deb4b96d790c2c0ce8333 - name: github.com/gocql/gocql - version: 843f6b1d2288839f5d00f92be11cbbebd600ba51 + version: 3e8b36f5e9e52cdeb265f385808c504a53db55fc subpackages: - internal/lru - internal/murmur @@ -93,13 +93,13 @@ imports: - name: github.com/pborman/uuid version: a97ce2ca70fa5a848076093f05e639a89ca34d06 - name: github.com/pierrec/lz4 - version: 08c27939df1bd95e881e2c2367a749964ad1fceb + version: 5a3d2245f97fc249850e7802e3c01fad02a1c316 - name: github.com/pierrec/xxHash - version: a0006b13c722f7f12368c00a3d3c2ae8a999a0c6 + version: 5a004441f897722c627870a981d02b29924215fa subpackages: - xxHash32 - name: github.com/pmezard/go-difflib - version: 792786c7400a136282c1664665ae0a8db921c6c2 + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d subpackages: - difflib - name: github.com/rcrowley/go-metrics @@ -110,9 +110,9 @@ imports: - name: github.com/sirupsen/logrus version: ba1b36c82c5e05c4f912a88eab0dcd91a171688f - name: github.com/stretchr/objx - version: 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 + version: cbeaeb16a013161a98496fad62933b1d21786672 - name: github.com/stretchr/testify - version: 2aa2c176b9dab406a6970f6a55f513e8a8c8b18f + version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 subpackages: - assert - mock @@ -123,7 +123,9 @@ imports: - name: github.com/uber-common/bark version: 02d883c81a4e7b76904d97efb176efdf4be791bd - name: github.com/uber-go/atomic - version: 54f72d32435d760d5604f17a82e2435b28dc4ba5 + version: 8474b86a5a6f79c443ce4b2992817ff32cf208b8 + subpackages: + - utils - name: github.com/uber/cherami-client-go version: 05b8db6966cc3413c1105a809977febef05ca5a1 subpackages: @@ -157,7 +159,7 @@ imports: - swim - util - name: github.com/uber/tchannel-go - version: a7ad9ecb640b5f10a0395b38d6319175172b3ab2 + version: cc230a2942d078a8b01f4a79895dad62e6c572f1 subpackages: - hyperbahn - hyperbahn/gen-go/hyperbahn @@ -174,7 +176,7 @@ imports: - name: github.com/urfave/cli version: 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c - name: golang.org/x/net - version: a337091b0525af65de94df2eb7e98bd9962dcbe2 + version: 9dfe39835686865bff950a07b394c12a98ddc811 subpackages: - bpf - context @@ -183,13 +185,13 @@ imports: - ipv4 - ipv6 - name: golang.org/x/sys - version: 8dbc5d05d6edcc104950cc299a1ce6641235bc86 + version: 0b25a408a50076fbbcae6b7ac0ea5fbb0b085e79 subpackages: - unix - name: gopkg.in/inf.v0 version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 - name: gopkg.in/validator.v2 - version: 460c83432a98c35224a6fe352acf8b23e067ad06 + version: 07ffaad256c8e957050ad83d6472eb97d785013d - name: gopkg.in/yaml.v2 - version: eb3733d160e74a9c7e442f435eb3bea458e1d19f + version: 287cf08546ab5e7e37d55a84f7ed3fd1db036de5 testImports: [] diff --git a/tools/cassandra/cqlclient.go b/tools/cassandra/cqlclient.go index a75a8e5c..e0bae06b 100644 --- a/tools/cassandra/cqlclient.go +++ b/tools/cassandra/cqlclient.go @@ -76,7 +76,7 @@ func newCQLClient(config *SchemaUpdaterConfig) (CQLClient, error) { clusterCfg.Keyspace = config.Keyspace clusterCfg.Timeout = defaultTimeout clusterCfg.ProtoVersion = config.ProtoVersion - clusterCfg.Consistency, _ = gocql.ParseConsistency(defaultConsistency) + clusterCfg.Consistency = gocql.ParseConsistency(defaultConsistency) if config.Username != "" && config.Password != "" { clusterCfg.Authenticator = gocql.PasswordAuthenticator{ From 446ec9102530f2c5be9780f97c3c8001e04e2237 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Mon, 20 Nov 2017 23:52:26 -0800 Subject: [PATCH 10/18] ~ --- services/storehost/spaceMon.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go index 72504ba8..62adb9e5 100644 --- a/services/storehost/spaceMon.go +++ b/services/storehost/spaceMon.go @@ -34,7 +34,7 @@ import ( const ( warnThreshold = 200 * gigaBytes - alertThreshold = 100 * gigaBytes + readOnlyThreshold = 100 * gigaBytes resumeWritesThreshold = 200 * gigaBytes ) @@ -195,7 +195,7 @@ func (s *spaceMon) pump() { xlog.Warn(`SpaceMon: continuing in read-only mode`) } - case avail < alertThreshold: // enable read-only, if below alert-threshold + case avail < readOnlyThreshold: // enable read-only, if below alert-threshold xlog.Error("SpaceMon: available space less than alert-threshold") From 7c5cc0fb2fc36e264764f74dea3e6bdad469b904 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Tue, 21 Nov 2017 11:30:40 -0800 Subject: [PATCH 11/18] remove circular dependency --- services/storehost/spaceMon.go | 96 +++++++++++---------------------- services/storehost/storehost.go | 37 ++++++++----- 2 files changed, 54 insertions(+), 79 deletions(-) diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go index 62adb9e5..b32d5c28 100644 --- a/services/storehost/spaceMon.go +++ b/services/storehost/spaceMon.go @@ -21,8 +21,6 @@ package storehost import ( - "os" - "sync" "syscall" "time" @@ -30,6 +28,7 @@ import ( "github.com/uber/cherami-server/common" "github.com/uber/cherami-server/common/metrics" "github.com/uber/cherami-server/services/storehost/load" + "go.uber.org/atomic" ) const ( @@ -59,100 +58,69 @@ const ( ) type ( - // SpaceMon monitors free-space and switches store to read-only on low-space - SpaceMon interface { - common.Daemon - GetMode() StorageMode - } - - // spaceMon implements SpaceMon interface - spaceMon struct { - sync.RWMutex - + // SpaceMon monitors free-space and switches stores to read-only on low-space + SpaceMon struct { storeHost *StoreHost logger bark.Logger m3Client metrics.Client hostMetrics *load.HostMetrics - stopCh chan struct{} - path string - mode StorageMode + stopCh chan struct{} + path string + readonly *atomic.Bool // read-only } ) // NewSpaceMon returns an instance of SpaceMon. -func NewSpaceMon(store *StoreHost, m3Client metrics.Client, hostMetrics *load.HostMetrics, logger bark.Logger, path string) SpaceMon { +func NewSpaceMon(store *StoreHost, m3Client metrics.Client, hostMetrics *load.HostMetrics, logger bark.Logger, path string) *SpaceMon { - return &spaceMon{ + return &SpaceMon{ storeHost: store, logger: logger, m3Client: m3Client, hostMetrics: hostMetrics, path: path, - mode: StorageModeReadWrite, // default: read-write + readonly: atomic.NewBool(false), // default: read-write } } // Start starts the monitoring -func (s *spaceMon) Start() { +func (s *SpaceMon) Start() { - s.logger.Info("SpaceMon: started") s.stopCh = make(chan struct{}) go s.pump() + s.storeHost.DisableReadonly() + + s.logger.Info("SpaceMon: started") } // Stop stops the monitoring -func (s *spaceMon) Stop() { +func (s *SpaceMon) Stop() { close(s.stopCh) s.logger.Info("SpaceMon: stopped") } -// GetMode returns the read/write mode of storage -func (s *spaceMon) GetMode() StorageMode { - - s.RLock() - defer s.RUnlock() - - return s.mode -} - -func (s *spaceMon) pump() { - - var path = s.path - - // if no path specified, use the current working directory - if len(path) <= 0 { +func (s *SpaceMon) pump() { - s.logger.Warn("SpaceMon: monitoring path is empty, trying working directory") - - cwd, err := os.Getwd() - if err != nil { - s.logger.Error("SpaceMon: os.Getwd() failed", err) - return - } - - path = cwd - } - - log := s.logger.WithField(`path`, path) + log := s.logger.WithField(`path`, s.path) ticker := time.NewTicker(spaceMonInterval) defer ticker.Stop() for { select { - case <-ticker.C: - // continue below to check free-space, etc case <-s.stopCh: return // done + case <-ticker.C: + // continue below to check free-space, etc } // query available/total space var stat syscall.Statfs_t - err := syscall.Statfs(path, &stat) + err := syscall.Statfs(s.path, &stat) if err != nil { - log.WithField(common.TagErr, err).Error("SpaceMon: syscall.Statfs failed", path) + log.WithField(common.TagErr, err).Error("SpaceMon: syscall.Statfs failed") continue } @@ -176,37 +144,33 @@ func (s *spaceMon) pump() { `percent`: availPercent, }) - s.Lock() - defer s.Unlock() - switch { - case s.mode == StorageModeReadOnly: + case s.readonly.Load(): // disable readonly, if above resume-writes threshold // disable read-only, if above resume-writes threshold if avail > resumeWritesThreshold { - s.mode = StorageModeReadWrite - s.storeHost.EnableWrites() + if s.readonly.CAS(false, true) { - xlog.Info("SpaceMon: disabling read-only") + xlog.Info("SpaceMon: disabling read-only") + s.storeHost.EnableReadonly() + } } else { - xlog.Warn(`SpaceMon: continuing in read-only mode`) + xlog.Warn(`SpaceMon: continuing in read-only`) } case avail < readOnlyThreshold: // enable read-only, if below alert-threshold - xlog.Error("SpaceMon: available space less than alert-threshold") - - if s.mode != StorageModeReadOnly { + if s.readonly.CAS(true, false) { - s.storeHost.DisableWrites() - s.mode = StorageModeReadOnly + xlog.Error("SpaceMon: switching to read-only") + s.storeHost.DisableReadonly() } case avail < warnThreshold: // warn, if below warn-threshold - xlog.Warn("SpaceMon: available space less than warn-threshold") + xlog.Warn("SpaceMon: warning: running low on space") default: xlog.Debug("SpaceMon: monitoring") diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index 5a4b7b0a..c5e945c4 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -196,7 +196,7 @@ type ( extStatsReporter *ExtStatsReporter // Free-space monitor - spaceMon SpaceMon + spaceMon common.Daemon // metrics aggregated at host level and reported to controller hostMetrics *load.HostMetrics @@ -302,7 +302,7 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) { t.extStatsReporter = NewExtStatsReporter(hostID, t.xMgr, t.mClient, t.logger) t.extStatsReporter.Start() - t.EnableWrites() + t.DisableReadonly() atomic.StoreInt32(&t.started, 1) // started t.logger.WithField("options", fmt.Sprintf("Store=%v BaseDir=%v", t.opts.Store, t.opts.BaseDir)). @@ -313,7 +313,7 @@ func (t *StoreHost) Start(thriftService []thrift.TChanServer) { func (t *StoreHost) Stop() { atomic.StoreInt32(&t.started, 0) // stopped - t.DisableWrites() + t.EnableReadonly() t.loadReporter.Stop() t.hostIDHeartbeater.Stop() @@ -416,7 +416,7 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore } // If the disk available space is low, we should fail any request to write extent - if t.spaceMon != nil && t.spaceMon.GetMode() == StorageModeReadOnly { + if t.spaceMon != nil && t.IsReadonly() { call.Done() t.m3Client.IncCounter(metrics.OpenAppendStreamScope, metrics.StorageFailures) log.Error("OpenAppendStream: storehost currently readonly") @@ -449,7 +449,7 @@ func (t *StoreHost) OpenAppendStream(ctx thrift.Context, call storeStream.BStore err = in.Stop() // attempt to stop connection // .. or for store to switch to read-only (on low space) - case <-t.NotifyWritesDisabled(): + case <-t.NotifyReadonly(): log.Error("OpenAppendStream: writes disabled, stopping inConn") err = in.Stop() } @@ -1304,7 +1304,7 @@ readMsgsLoop: func (t *StoreHost) Shutdown() { atomic.StoreInt32(&t.started, 0) // shutdown - t.DisableWrites() + t.EnableReadonly() close(t.shutdownC) // 'broadcast' shutdown, by closing the shutdownC t.logger.Info("Storehost: shutting down") // wait until all connections have been closed @@ -1313,22 +1313,33 @@ func (t *StoreHost) Shutdown() { } } -// DisableWrites disables writes, switching into 'read-only' mode -func (t *StoreHost) DisableWrites() { +// EnableReadonly disables writes, switching into 'read-only' mode +func (t *StoreHost) EnableReadonly() { t.logger.Error("Write disabled") close(t.disableWriteC.Load().(chan struct{})) } -// EnableWrites enables writes again (disables read-only) -func (t *StoreHost) EnableWrites() { +// DisableReadonly enables writes again (disables read-only) +func (t *StoreHost) DisableReadonly() { t.disableWriteC.Store(make(chan struct{})) } -// NotifyWritesDisabled returns a channel that is used to notify when writes are disabled -func (t *StoreHost) NotifyWritesDisabled() chan struct{} { +// NotifyReadonly returns a channel that is used to notify when writes are disabled +func (t *StoreHost) NotifyReadonly() chan struct{} { return t.disableWriteC.Load().(chan struct{}) } +// IsReadonly returns whether the store is currently in read-only +func (t *StoreHost) IsReadonly() bool { + + select { + case <-t.NotifyReadonly(): + return true + default: + return false + } +} + // RegisterWSHandler is the implementation of WSService interface func (t *StoreHost) RegisterWSHandler() *http.ServeMux { mux := http.NewServeMux() @@ -1356,7 +1367,7 @@ func (t *StoreHost) reportHostMetric(reporter common.LoadReporter, diffSecs int6 } // check and notify read-only state - if t.spaceMon.GetMode() == StorageModeReadOnly { + if t.IsReadonly() { hostMetrics.NodeState = common.Int64Ptr(controller.NODE_STATE_READONLY) } From 28ba21adb347ed0417962552ca97241e14fdd4dc Mon Sep 17 00:00:00 2001 From: kiranrg Date: Tue, 21 Nov 2017 11:36:31 -0800 Subject: [PATCH 12/18] remove modes --- services/storehost/spaceMon.go | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go index b32d5c28..7865b6c7 100644 --- a/services/storehost/spaceMon.go +++ b/services/storehost/spaceMon.go @@ -47,16 +47,6 @@ const ( teraBytes = 1024 * gigaBytes ) -// StorageMode defines the read write mode of the storage host -type StorageMode int32 - -const ( - // StorageModeReadWrite read/write - StorageModeReadWrite StorageMode = iota - // StorageModeReadOnly read only - StorageModeReadOnly -) - type ( // SpaceMon monitors free-space and switches stores to read-only on low-space SpaceMon struct { @@ -147,23 +137,22 @@ func (s *SpaceMon) pump() { switch { case s.readonly.Load(): // disable readonly, if above resume-writes threshold - // disable read-only, if above resume-writes threshold - if avail > resumeWritesThreshold { - - if s.readonly.CAS(false, true) { - - xlog.Info("SpaceMon: disabling read-only") - s.storeHost.EnableReadonly() - } + // if below resume-writes threshold, do nothing + if avail < resumeWritesThreshold { + xlog.Warn(`SpaceMon: continuing in read-only`) + continue + } - } else { + // disable readonly, if we have recovered free-space + if s.readonly.CAS(true, false) { - xlog.Warn(`SpaceMon: continuing in read-only`) + xlog.Info("SpaceMon: disabling read-only") + s.storeHost.EnableReadonly() } case avail < readOnlyThreshold: // enable read-only, if below alert-threshold - if s.readonly.CAS(true, false) { + if s.readonly.CAS(false, true) { xlog.Error("SpaceMon: switching to read-only") s.storeHost.DisableReadonly() From 245e7016aa62cb0c6bf6fd07eec09f6868d71601 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Tue, 21 Nov 2017 11:44:53 -0800 Subject: [PATCH 13/18] binary prefixes --- services/storehost/spaceMon.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go index 7865b6c7..0c0cd556 100644 --- a/services/storehost/spaceMon.go +++ b/services/storehost/spaceMon.go @@ -32,19 +32,20 @@ import ( ) const ( - warnThreshold = 200 * gigaBytes - readOnlyThreshold = 100 * gigaBytes - resumeWritesThreshold = 200 * gigaBytes + warnThreshold = 200 * GiB + readOnlyThreshold = 100 * GiB + resumeWritesThreshold = 200 * GiB ) // monitoring interval const spaceMonInterval = 2 * time.Minute const ( - kiloBytes = 1024 - megaBytes = 1024 * kiloBytes - gigaBytes = 1024 * megaBytes - teraBytes = 1024 * gigaBytes + // binary prefixes + KiB = 1024 + MiB = 1024 * KiB + GiB = 1024 * MiB + TiB = 1024 * GiB ) type ( @@ -122,7 +123,7 @@ func (s *SpaceMon) pump() { continue } - availMiBs, totalMiBs := avail/megaBytes, total/megaBytes + availMiBs, totalMiBs := avail/MiB, total/MiB availPercent := 100.0 * float64(avail) / float64(total) s.hostMetrics.Set(load.HostMetricFreeDiskSpaceBytes, int64(avail)) From 885c08c4cd16de1447ef913dc21ba9cd1fd46f1c Mon Sep 17 00:00:00 2001 From: kiranrg Date: Tue, 21 Nov 2017 11:50:27 -0800 Subject: [PATCH 14/18] ~ --- services/storehost/storehost.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index c5e945c4..de3ca0bb 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -174,10 +174,10 @@ type ( shutdownWG sync.WaitGroup // the following is used by inConn/outConn - xMgr *ExtentManager // extent manager - replMgr *ReplicationManager // replication manager - shutdownC chan struct{} - disableWriteC atomic.Value // chan struct{} + xMgr *ExtentManager // extent manager + replMgr *ReplicationManager // replication manager + shutdownC chan struct{} + readonlyC atomic.Value // chan struct{} numInConn, numOutConn int64 // number of active inConns/outConns respectively @@ -1315,18 +1315,17 @@ func (t *StoreHost) Shutdown() { // EnableReadonly disables writes, switching into 'read-only' mode func (t *StoreHost) EnableReadonly() { - t.logger.Error("Write disabled") - close(t.disableWriteC.Load().(chan struct{})) + close(t.readonlyC.Load().(chan struct{})) } // DisableReadonly enables writes again (disables read-only) func (t *StoreHost) DisableReadonly() { - t.disableWriteC.Store(make(chan struct{})) + t.readonlyC.Store(make(chan struct{})) } // NotifyReadonly returns a channel that is used to notify when writes are disabled func (t *StoreHost) NotifyReadonly() chan struct{} { - return t.disableWriteC.Load().(chan struct{}) + return t.readonlyC.Load().(chan struct{}) } // IsReadonly returns whether the store is currently in read-only From 751a0ec679ad0190ea6c9dc5a248b2c9b8026d92 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Wed, 22 Nov 2017 11:06:18 -0800 Subject: [PATCH 15/18] Increase thresholds --- services/storehost/spaceMon.go | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go index 0c0cd556..4f28db28 100644 --- a/services/storehost/spaceMon.go +++ b/services/storehost/spaceMon.go @@ -31,20 +31,30 @@ import ( "go.uber.org/atomic" ) -const ( - warnThreshold = 200 * GiB - readOnlyThreshold = 100 * GiB - resumeWritesThreshold = 200 * GiB -) +var ( + // TODO: make these dynamically configurable! + + // SpaceMonThresholdWarn if free-space below this, emit warning logs + SpaceMonThresholdWarn uint64 = 250 * GiB -// monitoring interval -const spaceMonInterval = 2 * time.Minute + // SpaceMonThresholdReadOnly if free-space below this, switch store to read-only + SpaceMonThresholdReadOnly uint64 = 200 * GiB + + // SpaceMonThresholdResumeWrites if read-only and free-space exceeds this, then make read-write + SpaceMonThresholdResumeWrites uint64 = 275 * GiB + + // SpaceMonInterval interval at which to monitor free-space and take action + SpaceMonInterval = 2 * time.Minute +) const ( - // binary prefixes + // KiB kibibytes KiB = 1024 + // MiB mebibytes MiB = 1024 * KiB + // GiB gibibytes GiB = 1024 * MiB + // TiB tebibytes TiB = 1024 * GiB ) @@ -96,7 +106,7 @@ func (s *SpaceMon) pump() { log := s.logger.WithField(`path`, s.path) - ticker := time.NewTicker(spaceMonInterval) + ticker := time.NewTicker(SpaceMonInterval) defer ticker.Stop() for { @@ -139,7 +149,7 @@ func (s *SpaceMon) pump() { case s.readonly.Load(): // disable readonly, if above resume-writes threshold // if below resume-writes threshold, do nothing - if avail < resumeWritesThreshold { + if avail < SpaceMonThresholdResumeWrites { xlog.Warn(`SpaceMon: continuing in read-only`) continue } @@ -151,7 +161,7 @@ func (s *SpaceMon) pump() { s.storeHost.EnableReadonly() } - case avail < readOnlyThreshold: // enable read-only, if below alert-threshold + case avail < SpaceMonThresholdReadOnly: // enable read-only, if below alert-threshold if s.readonly.CAS(false, true) { @@ -159,7 +169,7 @@ func (s *SpaceMon) pump() { s.storeHost.DisableReadonly() } - case avail < warnThreshold: // warn, if below warn-threshold + case avail < SpaceMonThresholdWarn: // warn, if below warn-threshold xlog.Warn("SpaceMon: warning: running low on space") default: From 6eece0233b30bef0d4d3a03f51d9cd709ecc9e12 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Wed, 22 Nov 2017 13:58:56 -0800 Subject: [PATCH 16/18] module tag --- services/storehost/spaceMon.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/storehost/spaceMon.go b/services/storehost/spaceMon.go index 4f28db28..c4635044 100644 --- a/services/storehost/spaceMon.go +++ b/services/storehost/spaceMon.go @@ -77,7 +77,7 @@ func NewSpaceMon(store *StoreHost, m3Client metrics.Client, hostMetrics *load.Ho return &SpaceMon{ storeHost: store, - logger: logger, + logger: logger.WithField(common.TagModule, `spaceMon`), m3Client: m3Client, hostMetrics: hostMetrics, path: path, From ba3c40a853fdfa86c7103108edbc30b18edea477 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Wed, 22 Nov 2017 15:32:19 -0800 Subject: [PATCH 17/18] ttt --- services/controllerhost/controllerhost.go | 2 ++ services/controllerhost/placement.go | 20 ++++++++++---------- services/storehost/storehost.go | 1 + 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/services/controllerhost/controllerhost.go b/services/controllerhost/controllerhost.go index c6cef9c4..3027719a 100644 --- a/services/controllerhost/controllerhost.go +++ b/services/controllerhost/controllerhost.go @@ -557,6 +557,8 @@ func (mcp *Mcp) ReportNodeMetric(ctx thrift.Context, request *c.ReportNodeMetric } if metrics.IsSetNodeState() && metrics.GetNodeState()&c.NODE_STATE_READONLY != 0 { loadMetrics.Put(hostID, load.EmptyTag, load.ReadOnly, 1, timestamp) + } else { + loadMetrics.Put(hostID, load.EmptyTag, load.ReadOnly, 0, timestamp) } if metrics.IsSetNodeStatus() && request.IsSetRole() && metrics.GetNodeStatus() == c.NodeStatus_GOING_DOWN { switch request.GetRole() { diff --git a/services/controllerhost/placement.go b/services/controllerhost/placement.go index 61a27005..65ae8455 100644 --- a/services/controllerhost/placement.go +++ b/services/controllerhost/placement.go @@ -216,6 +216,16 @@ func (p *DistancePlacement) PickStoreHosts(count int) ([]*common.HostInfo, error // meets all requirements to host a new extent. func (p *DistancePlacement) doesStoreMeetConstraints(host *common.HostInfo) bool { + // if the store-node has reported to be in 'read-only' any time in + // the last minute, then don't place extents on it. + if val, err := p.context.loadMetrics.Get(host.UUID, load.EmptyTag, load.ReadOnly, load.OneMinSum); err == nil && val > 0 { + p.context.log.WithFields(bark.Fields{ + common.TagHostIP: host.Addr, + `read-only`: val, + `reason`: "ReadOnly"}).Info("Placement ignoring store host") + return false + } + cfgObj, err := p.context.cfgMgr.Get(common.StoreServiceName, "*", host.Sku, host.Name) if err != nil { return true @@ -233,16 +243,6 @@ func (p *DistancePlacement) doesStoreMeetConstraints(host *common.HostInfo) bool return false } - // if the store-node has reported to be in 'read-only' any time in - // the last minute, then don't place extents on it. - if val, err := p.context.loadMetrics.Get(host.UUID, load.EmptyTag, load.ReadOnly, load.OneMinSum); err == nil && val > 0 { - p.context.log.WithFields(bark.Fields{ - common.TagHostIP: host.Addr, - `read-only`: val, - `reason`: "ReadOnly"}).Info("Placement ignoring store host") - return false - } - val, err := p.context.loadMetrics.Get(host.UUID, load.EmptyTag, load.RemDiskSpaceBytes, load.OneMinAvg) if err != nil { return true diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index de3ca0bb..fb0c822f 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -1367,6 +1367,7 @@ func (t *StoreHost) reportHostMetric(reporter common.LoadReporter, diffSecs int6 // check and notify read-only state if t.IsReadonly() { + log.Warn("reportHostMetric: storehost currently readonly") hostMetrics.NodeState = common.Int64Ptr(controller.NODE_STATE_READONLY) } From 1c60db7ae9c6e84aac6ec4963dcb945005024884 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Wed, 22 Nov 2017 16:17:26 -0800 Subject: [PATCH 18/18] ttt --- services/storehost/storehost.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/storehost/storehost.go b/services/storehost/storehost.go index fb0c822f..8584e015 100644 --- a/services/storehost/storehost.go +++ b/services/storehost/storehost.go @@ -1367,7 +1367,7 @@ func (t *StoreHost) reportHostMetric(reporter common.LoadReporter, diffSecs int6 // check and notify read-only state if t.IsReadonly() { - log.Warn("reportHostMetric: storehost currently readonly") + t.logger.Warn("reportHostMetric: storehost currently readonly") hostMetrics.NodeState = common.Int64Ptr(controller.NODE_STATE_READONLY) }