Skip to content

Commit

Permalink
Use a fixed buffer pool for producing
Browse files Browse the repository at this point in the history
* Implement buffer pooling via channel
* Remove flow controller and rename setting
* Adapt settings after project renaming
  • Loading branch information
jorgebay authored Dec 9, 2022
1 parent df986c0 commit 79371f2
Show file tree
Hide file tree
Showing 49 changed files with 473 additions and 328 deletions.
10 changes: 5 additions & 5 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
envReplicationTimeoutDuration = "POLAR_REPLICATION_TIMEOUT_DURATION"
envReplicationWriteTimeoutDuration = "POLAR_REPLICATION_WRITE_TIMEOUT_DURATION"
envMaxSegmentSize = "POLAR_MAX_SEGMENT_FILE_SIZE"
envAllocationPoolSize = "POLAR_ALLOCATION_POOL_SIZE"
envProducerBufferPoolSize = "POLAR_PRODUCER_BUFFER_POOL_SIZE"
envConsumerAddDelay = "POLAR_CONSUMER_ADD_DELAY_MS"
envConsumerReadTimeout = "POLAR_CONSUMER_READ_TIMEOUT_MS"
envConsumerRanges = "POLAR_CONSUMER_RANGES"
Expand All @@ -62,7 +62,7 @@ const (
defaultLogRetention = "168h" // 7 days
defaultReplicationTimeout = "1s"
defaultReplicationWriteTimeout = "500ms"
defaultAllocationPoolSize = 32 * MiB
defaultProducerBufferPoolSize = 32 * MiB
)

var hostRegex = regexp.MustCompile(`([\w\-.]+?)-(\d+)`)
Expand Down Expand Up @@ -124,7 +124,7 @@ type DiscovererConfig interface {
type ProducerConfig interface {
BasicConfig
DatalogConfig
AllocationPoolSize() int // The number of bytes in the shared producer allocation pool
ProducerBufferPoolSize() int
}

type ConsumerConfig interface {
Expand Down Expand Up @@ -312,8 +312,8 @@ func (c *config) MaxSegmentSize() int {
return envInt(envMaxSegmentSize, 1024*MiB)
}

func (c *config) AllocationPoolSize() int {
return envInt(envAllocationPoolSize, defaultAllocationPoolSize)
func (c *config) ProducerBufferPoolSize() int {
return envInt(envProducerBufferPoolSize, defaultProducerBufferPoolSize)
}

func (c *config) SegmentBufferSize() int {
Expand Down
4 changes: 2 additions & 2 deletions internal/consuming/consumer_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/polarstreams/polar/internal/test/conf/mocks"
"github.com/polarstreams/polar/internal/test/discovery/mocks"
"github.com/polarstreams/polar/internal/test/fakes"
. "github.com/polarstreams/polar/internal/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

const consumerRanges = 8
Expand Down
4 changes: 2 additions & 2 deletions internal/consuming/group_read_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
"net/http"
"time"

"github.com/karlseguin/jsonwriter"
"github.com/klauspost/compress/zstd"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/data"
. "github.com/polarstreams/polar/internal/data"
"github.com/polarstreams/polar/internal/discovery"
"github.com/polarstreams/polar/internal/interbroker"
. "github.com/polarstreams/polar/internal/types"
"github.com/polarstreams/polar/internal/utils"
"github.com/karlseguin/jsonwriter"
"github.com/klauspost/compress/zstd"
"github.com/rs/zerolog/log"
)

Expand Down
6 changes: 3 additions & 3 deletions internal/consuming/group_read_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"io"
"net/http/httptest"

"github.com/klauspost/compress/zstd"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/data"
cMocks "github.com/polarstreams/polar/internal/test/conf/mocks"
. "github.com/polarstreams/polar/internal/types"
"github.com/klauspost/compress/zstd"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("groupReadQueue()", func() {
Expand Down
6 changes: 3 additions & 3 deletions internal/consuming/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/karlseguin/jsonwriter"
"github.com/klauspost/compress/zstd"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/data"
"github.com/polarstreams/polar/internal/types"
. "github.com/polarstreams/polar/internal/types"
"github.com/polarstreams/polar/internal/utils"
"github.com/google/uuid"
"github.com/karlseguin/jsonwriter"
"github.com/klauspost/compress/zstd"
"github.com/rs/zerolog/log"
)

Expand Down
4 changes: 2 additions & 2 deletions internal/consuming/offset_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
cMocks "github.com/polarstreams/polar/internal/test/conf/mocks"
dataMocks "github.com/polarstreams/polar/internal/test/data/mocks"
dMocks "github.com/polarstreams/polar/internal/test/discovery/mocks"
iMocks "github.com/polarstreams/polar/internal/test/interbroker/mocks"
dbMocks "github.com/polarstreams/polar/internal/test/localdb/mocks"
. "github.com/polarstreams/polar/internal/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
)

Expand Down
2 changes: 1 addition & 1 deletion internal/consuming/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/julienschmidt/httprouter"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/data"
"github.com/polarstreams/polar/internal/discovery"
Expand All @@ -18,7 +19,6 @@ import (
"github.com/polarstreams/polar/internal/types"
. "github.com/polarstreams/polar/internal/types"
. "github.com/polarstreams/polar/internal/utils"
"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog/log"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
Expand Down
2 changes: 1 addition & 1 deletion internal/data/datalog_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"path/filepath"
"time"

"github.com/polarstreams/polar/internal/test/conf/mocks"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/test/conf/mocks"
)

var _ = Describe("datalog", func() {
Expand Down
4 changes: 2 additions & 2 deletions internal/data/datalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"os"
"path/filepath"

"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/test/conf/mocks"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/test/conf/mocks"
"github.com/stretchr/testify/mock"
)

Expand Down
2 changes: 1 addition & 1 deletion internal/data/index_file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"fmt"
"io/ioutil"

"github.com/polarstreams/polar/internal/test/conf/mocks"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/test/conf/mocks"
)

var _ = Describe("tryReadIndexFile()", func() {
Expand Down
4 changes: 2 additions & 2 deletions internal/data/index_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"path/filepath"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/test/conf/mocks"
"github.com/polarstreams/polar/internal/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("indexFileWriter", func() {
Expand Down
4 changes: 2 additions & 2 deletions internal/data/offset_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package data
import (
"io/ioutil"

"github.com/polarstreams/polar/internal/test/conf/mocks"
. "github.com/polarstreams/polar/internal/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/test/conf/mocks"
. "github.com/polarstreams/polar/internal/types"
"github.com/stretchr/testify/mock"
)

Expand Down
4 changes: 2 additions & 2 deletions internal/data/read_file_structure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"os"
"path/filepath"

"github.com/polarstreams/polar/internal/test/conf/mocks"
. "github.com/polarstreams/polar/internal/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/test/conf/mocks"
. "github.com/polarstreams/polar/internal/types"
)

var _ = Describe("ReadFileStructure()", func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/data/read_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"path/filepath"
"testing"

"github.com/polarstreams/polar/internal/conf"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/conf"
)

func TestData(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions internal/data/segment_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"sync/atomic"
"time"

"github.com/google/uuid"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/test/conf/mocks"
tMocks "github.com/polarstreams/polar/internal/test/types/mocks"
. "github.com/polarstreams/polar/internal/types"
"github.com/google/uuid"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
)

Expand Down
4 changes: 2 additions & 2 deletions internal/data/segment_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"io/ioutil"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/test/conf/mocks"
mocks2 "github.com/polarstreams/polar/internal/test/types/mocks"
. "github.com/polarstreams/polar/internal/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
)

Expand Down
2 changes: 1 addition & 1 deletion internal/discovery/discoverer_client_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"net/http"
"os"

"github.com/julienschmidt/httprouter"
"github.com/polarstreams/polar/internal/conf"
. "github.com/polarstreams/polar/internal/types"
"github.com/polarstreams/polar/internal/utils"
"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog/log"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
Expand Down
4 changes: 2 additions & 2 deletions internal/discovery/discoverer_client_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"net/http"
"time"

"github.com/polarstreams/polar/internal/test/conf/mocks"
"github.com/polarstreams/polar/internal/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/test/conf/mocks"
"github.com/polarstreams/polar/internal/utils"
)

var _ = Describe("discoverer", func() {
Expand Down
4 changes: 2 additions & 2 deletions internal/discovery/discoverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/test/conf/mocks"
dbMocks "github.com/polarstreams/polar/internal/test/localdb/mocks"
. "github.com/polarstreams/polar/internal/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func Test(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/discovery/generation_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"fmt"
"sync/atomic"

. "github.com/google/uuid"
. "github.com/polarstreams/polar/internal/types"
"github.com/polarstreams/polar/internal/utils"
. "github.com/google/uuid"
"github.com/rs/zerolog/log"
)

Expand Down
4 changes: 2 additions & 2 deletions internal/discovery/generation_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"fmt"
"sync/atomic"

. "github.com/google/uuid"
. "github.com/onsi/gomega"
. "github.com/polarstreams/polar/internal/test"
cMocks "github.com/polarstreams/polar/internal/test/conf/mocks"
"github.com/polarstreams/polar/internal/test/localdb/mocks"
. "github.com/polarstreams/polar/internal/types"
. "github.com/google/uuid"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
)

Expand Down
2 changes: 1 addition & 1 deletion internal/interbroker/data_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"sync/atomic"
"time"

"github.com/polarstreams/polar/internal/test/fakes"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/polar/internal/test/fakes"
"github.com/rs/zerolog/log"
)

Expand Down
2 changes: 1 addition & 1 deletion internal/interbroker/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync/atomic"
"time"

. "github.com/google/uuid"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/data"
"github.com/polarstreams/polar/internal/discovery"
Expand All @@ -22,7 +23,6 @@ import (
"github.com/polarstreams/polar/internal/types"
. "github.com/polarstreams/polar/internal/types"
"github.com/polarstreams/polar/internal/utils"
. "github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"
)
Expand Down
4 changes: 2 additions & 2 deletions internal/interbroker/gossip_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"strings"
"sync"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
cMocks "github.com/polarstreams/polar/internal/test/conf/mocks"
dMocks "github.com/polarstreams/polar/internal/test/discovery/mocks"
. "github.com/polarstreams/polar/internal/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
)

Expand Down
2 changes: 1 addition & 1 deletion internal/interbroker/json_messages.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package interbroker

import (
. "github.com/polarstreams/polar/internal/types"
. "github.com/google/uuid"
. "github.com/polarstreams/polar/internal/types"
)

// Represents the interbroker api json message for proposing and accepting a generation to another broker.
Expand Down
2 changes: 1 addition & 1 deletion internal/interbroker/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"io"
"net/url"

. "github.com/polarstreams/polar/internal/types"
. "github.com/google/uuid"
. "github.com/polarstreams/polar/internal/types"
)

// Represents a gossip listener to generation-related messages
Expand Down
4 changes: 2 additions & 2 deletions internal/interbroker/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"sync/atomic"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
confMock "github.com/polarstreams/polar/internal/test/conf/mocks"
"github.com/polarstreams/polar/internal/test/discovery/mocks"
. "github.com/polarstreams/polar/internal/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("SendToFollowers()", func() {
Expand Down
Loading

0 comments on commit 79371f2

Please sign in to comment.