Skip to content

Commit

Permalink
fix: Segment file mismatch on replicas
Browse files Browse the repository at this point in the history
When writing at high throughput rates it was possible that
replicas used the buffered offset instead of the original
file name.

Fixes #76.
  • Loading branch information
jorgebay authored Oct 24, 2022
1 parent ad923b9 commit 8b141ee
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 7 deletions.
2 changes: 1 addition & 1 deletion docs/install/kubernetes/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
### Installing Barco Streams on Kubernetes
# Installing Barco Streams on Kubernetes

You can use `kubectl` to install Barco on Kubernetes.

Expand Down
7 changes: 7 additions & 0 deletions internal/data/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,10 @@ func (s *ReadSegmentChunk) StartOffset() int64 {
func (s *ReadSegmentChunk) RecordLength() uint32 {
return s.Length
}

type writerType string

const (
replicaWriter writerType = "replica"
leaderWriter writerType = "leader"
)
21 changes: 17 additions & 4 deletions internal/data/segment_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type SegmentWriter struct {
segmentLength int64
basePath string
replicator Replicator
writerType writerType
}

func NewSegmentWriter(
Expand All @@ -63,6 +64,7 @@ func NewSegmentWriter(
indexFile: newIndexFileWriter(basePath, config),
basePath: basePath,
replicator: gossiper,
writerType: leaderWriter,
}

if segmentId == nil {
Expand All @@ -72,6 +74,7 @@ func NewSegmentWriter(
go s.writeLoopAsLeader()
} else {
log.Info().Msgf("Creating segment writer as replica for %s", &topic)
s.writerType = replicaWriter
s.createFile(*segmentId)
go s.writeLoopAsReplica()
}
Expand Down Expand Up @@ -99,6 +102,11 @@ func (s *SegmentWriter) writeLoopAsLeader() {

s.writeToBuffer(item)

if s.segmentFile == nil {
// We need to make sure the file and segmentId is created locally before sending to replicas
s.createFile(s.bufferedOffset)
}

// Response channel should be buffered in case the response is discarded
response := make(chan error, 1)

Expand Down Expand Up @@ -134,9 +142,10 @@ func (s *SegmentWriter) writeLoopAsReplica() {

if s.segmentId != item.SegmentId() {
if s.buffer.Len() > 0 {
s.flush("closing")
s.flush("closing as replica")
}
s.closeFile()
s.createFile(item.SegmentId())
}

s.writeToBuffer(item)
Expand Down Expand Up @@ -184,8 +193,8 @@ func (s *SegmentWriter) maybeFlush() bool {

func (s *SegmentWriter) createFile(segmentId int64) {
s.segmentId = segmentId
name := conf.SegmentFileName(s.segmentId)
log.Debug().Msgf("Creating segment file %s on %s", name, s.basePath)
name := conf.SegmentFileName(segmentId)
log.Info().Str("type", string(s.writerType)).Msgf("Creating segment file %s on %s", name, s.basePath)

f, err := os.OpenFile(filepath.Join(s.basePath, name), conf.SegmentFileWriteFlags, FilePermissions)
if err != nil {
Expand All @@ -201,12 +210,16 @@ func (s *SegmentWriter) flush(reason string) {
length := int64(s.buffer.Len())

if s.segmentFile == nil {
if s.writerType == replicaWriter {
log.Panic().Msgf("Flush should not create file on replicas as the file name will be invalid")
}
s.createFile(s.bufferedOffset)
}

buf := s.buffer.Bytes()
log.Debug().
Str("reason", reason).
Str("writerType", string(s.writerType)).
Int64("offset", s.tailOffset).
Msgf("Writing %d bytes to segment file %s/%s", len(buf), s.basePath, conf.SegmentFileName(s.segmentId))

Expand Down Expand Up @@ -240,7 +253,7 @@ func (s *SegmentWriter) closeFile() {
// Close the segment file in the background
go func() {
err := previousFile.Close()
log.Err(err).Msgf("Segment file %d closed on %s", previousSegmentId, s.basePath)
log.Err(err).Msgf("Closed segment file %s on %s", conf.SegmentFileName(previousSegmentId), s.basePath)
}()

// Close the index file
Expand Down
10 changes: 8 additions & 2 deletions internal/test/integration/test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,15 @@ func NewTestBroker(ordinal int, options ...*TestBrokerOptions) *TestBroker {

func (b *TestBroker) Start() {
buildOutput, err := exec.Command("go", "build", "-o", "barco.exe", "../../../.").CombinedOutput()
// buildOutput, err := exec.Command("go", "build", "-tags=profiling", "-o", "barco.exe", "../../../.").CombinedOutput()
Expect(err).NotTo(HaveOccurred(), "Build failed: %s", string(buildOutput))
cmd := exec.Command("./barco.exe", "-debug")

logPretty := ""

if os.Getenv("BARCO_TEST_LOG_PRETTY") == "true" {
logPretty = "-pretty"
}

cmd := exec.Command("./barco.exe", "-debug", logPretty)
os.RemoveAll(fmt.Sprintf("./home%d", b.ordinal))

names := make([]string, 0)
Expand Down

0 comments on commit 8b141ee

Please sign in to comment.