Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Add unit test for DS #756

Merged
merged 22 commits into from
Jan 2, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
@@ -80,7 +80,8 @@ jobs:
mkdir -p ./logs/$CASE
cat $DIR/stdout.log
tail -n 10 $DIR/cdc.log
sudo cp -r -L $DIR/{*.log,sync_diff} ./logs/$CASE/
sudo cp -r -L $DIR/{*.log} ./logs/$CASE/
sudo cp -r -L $DIR/{sync_diff} ./logs/$CASE/ || true
sudo chown -R runner ./logs
sudo tar -czvf ./logs.tar.gz ./logs
# Update logs as artifact seems not stable, so we set `continue-on-error: true` here.
4 changes: 3 additions & 1 deletion .github/workflows/uint_test.yaml
Original file line number Diff line number Diff line change
@@ -91,7 +91,7 @@ jobs:

eventservice_unit_test:
runs-on: ubuntu-latest
name: EventService Unit Test
name: Other Unit Tests
steps:
- name: Check out code
uses: actions/checkout@v2
@@ -104,6 +104,8 @@ jobs:
- name: Unit Test
run: |
go test --tags=intest -timeout 120s github.com/pingcap/ticdc/pkg/eventservice/...
go test --tags=intest -timeout 120s github.com/pingcap/ticdc/utils/dynstream...




7 changes: 5 additions & 2 deletions utils/dynstream/event_queue.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ func newEventQueue[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](optio
option: option,
handler: handler,
eventBlockAlloc: deque.NewBlockAllocator[eventWrap[A, P, T, D, H]](32, 1024),
signalQueue: deque.NewDeque[eventSignal[A, P, T, D, H]](1024, deque.NewBlockAllocator[eventSignal[A, P, T, D, H]](1024, 32)),
signalQueue: deque.NewDeque(1024, deque.NewBlockAllocator[eventSignal[A, P, T, D, H]](1024, 32)),
totalPendingLength: &atomic.Int64{},
}

@@ -97,8 +97,8 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T,
if path.blocking || path.removed {
// The path is blocking or removed, we should ignore the signal completely.
// Since when it is waked, a signal event will be added to the queue.
q.signalQueue.PopFront()
q.totalPendingLength.Add(-int64(signal.eventCount))
q.signalQueue.PopFront()
continue
}

@@ -114,6 +114,8 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T,
continue
}
firstGroup := firstEvent.eventType.DataGroup
firstProperty := firstEvent.eventType.Property

appendToBuf(firstEvent, path)

// Try to batch events with the same data group.
@@ -126,6 +128,7 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T,
// Only batch events with the same data group and when the event is batchable.
if !ok ||
(firstGroup != front.eventType.DataGroup) ||
firstProperty == NonBatchable ||
front.eventType.Property == NonBatchable {
break
}
256 changes: 256 additions & 0 deletions utils/dynstream/event_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package dynstream

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewEventQueue(t *testing.T) {
handler := mockHandler{}
option := Option{BatchCount: 10}

eq := newEventQueue(option, &handler)

require.NotNil(t, eq.eventBlockAlloc)
require.NotNil(t, eq.signalQueue)
require.NotNil(t, eq.totalPendingLength)
require.Equal(t, option, eq.option)
require.Equal(t, &handler, eq.handler)
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}

func TestAppendAndPopSingleEvent(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 10}, &handler)

// create a path
path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

// append an event
event := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: 1},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}

eq.appendEvent(event)

// verify the event is appended
require.Equal(t, int64(1), eq.totalPendingLength.Load())

// pop the event
buf := make([]*mockEvent, 0)
events, popPath := eq.popEvents(buf)

require.Equal(t, 1, len(events))
require.Equal(t, mockEvent{value: 1}, *events[0])
require.Equal(t, path, popPath)
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}

func TestBlockAndWakePath(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 10}, &handler)

path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

// append an event
event := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: 1},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(event)

// block the path
eq.blockPath(path)

// try to pop the event (should not pop)
buf := make([]*mockEvent, 0)
events, _ := eq.popEvents(buf)
require.Equal(t, 0, len(events))
require.Equal(t, int64(0), eq.totalPendingLength.Load())

// wake the path
eq.wakePath(path)
require.Equal(t, int64(1), eq.totalPendingLength.Load())

// try to pop the event (should pop)
events, popPath := eq.popEvents(buf)
require.Equal(t, 1, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, path, popPath)
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}

func TestBatchEvents(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 3}, &handler)

path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

// append multiple events with the same DataGroup
for i := 1; i <= 5; i++ {
event := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: i},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(event)
}

// verify the batch pop
buf := make([]*mockEvent, 0)
events, _ := eq.popEvents(buf)

// since BatchCount = 3, only the first 3 events should be popped
require.Equal(t, 3, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, &mockEvent{value: 2}, events[1])
require.Equal(t, &mockEvent{value: 3}, events[2])

// verify the remaining event count
require.Equal(t, int64(2), eq.totalPendingLength.Load())
}

func TestBatchableAndNonBatchableEvents(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 3}, &handler)

path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

// append a non-batchable event
event1 := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: 1},
eventType: EventType{
DataGroup: 1,
Property: NonBatchable,
},
}
eq.appendEvent(event1)

// append 2 batchable events
for i := 1; i <= 2; i++ {
e := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: i},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(e)
}

// add 2 non-batchable events
for i := 1; i <= 2; i++ {
e := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: i},
eventType: EventType{
DataGroup: 1,
Property: NonBatchable,
},
}
eq.appendEvent(e)
}

// append 5 batchable events
for i := 1; i <= 5; i++ {
e := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: i},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(e)
}

// case 1: pop the first non-batchable event
buf := make([]*mockEvent, 0)
events, _ := eq.popEvents(buf)
require.Equal(t, 1, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, int64(9), eq.totalPendingLength.Load())

// case 2: pop the first 2 batchable event
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 2, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, &mockEvent{value: 2}, events[1])
require.Equal(t, int64(7), eq.totalPendingLength.Load())

// case 3: pop a non-batchable event
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 1, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, int64(6), eq.totalPendingLength.Load())

// case 4: pop the second non-batchable event
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 1, len(events))
require.Equal(t, &mockEvent{value: 2}, events[0])
require.Equal(t, int64(5), eq.totalPendingLength.Load())

// case 5: pop the first 3 batchable events
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 3, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, &mockEvent{value: 2}, events[1])
require.Equal(t, &mockEvent{value: 3}, events[2])
require.Equal(t, int64(2), eq.totalPendingLength.Load())

// case 6: pop the remaining 2 batchable events
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 2, len(events))
require.Equal(t, &mockEvent{value: 4}, events[0])
require.Equal(t, &mockEvent{value: 5}, events[1])
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}

func TestRemovePath(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 3}, &handler)

path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

e := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: 1},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(e)
require.Equal(t, int64(1), eq.totalPendingLength.Load())

path.removed = true
buf := make([]*mockEvent, 0)
events, _ := eq.popEvents(buf)
require.Equal(t, 0, len(events))
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}
13 changes: 0 additions & 13 deletions utils/dynstream/memory_control.go
Original file line number Diff line number Diff line change
@@ -214,16 +214,3 @@ func (m *memControl[A, P, T, D, H]) getMetrics() (usedMemory int64, maxMemory in
}
return usedMemory, maxMemory
}

func (p *pathInfo[A, P, T, D, H]) SetHeapIndex(index int) {
p.sizeHeapIndex = index
}

func (p *pathInfo[A, P, T, D, H]) GetHeapIndex() int {
return p.sizeHeapIndex
}

func (p *pathInfo[A, P, T, D, H]) LessThan(other *pathInfo[A, P, T, D, H]) bool {
// pathSizeHeap should be in descending order. That say the node with the largest pending size is the top.
return p.pendingSize.Load() > other.pendingSize.Load()
}
2 changes: 1 addition & 1 deletion utils/dynstream/parallel_dynamic_stream.go
Original file line number Diff line number Diff line change
@@ -128,7 +128,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) AddPath(path P, dest D, as ...Are
s.pathMap[path] = pi
s.setMemControl(pi, as...)

pi.stream.in() <- eventWrap[A, P, T, D, H]{pathInfo: pi, newPath: true}
pi.stream.addPath(pi)

s._statAddPathCount.Add(1)
return nil
Loading