// branchlessComparator returns -1 (every bit set) if x < y and 0 otherwise,
// without using a conditional branch. The all-ones / all-zeros result is meant
// to be used as a mask (`mask & step`) or subtracted to conditionally add one.
func branchlessComparator(x, y uint16) int {
	// Index of the sign bit of int: 31 on 32-bit targets, 63 on 64-bit ones.
	// ^uint(0)>>63 is 1 only when uint (and therefore int) is 64 bits wide.
	// Using the real width keeps the shift count below the operand size on
	// every platform instead of hard-coding 63.
	const signBit = 32<<(^uint(0)>>63) - 1

	// int(x)-int(y) lies in [-65535, 65535] and cannot overflow; the arithmetic
	// right shift smears its sign bit across the whole word.
	return (int(x) - int(y)) >> signBit
}
l4 = large[idx1], large[idx2], large[idx3], large[idx4] + } else { // slow path + if l1 < t1 { + idx1++ + if idx1 < nL { + l1 = large[idx1] + } + } + if l2 < t2 { + idx2++ + if idx2 < nL { + l2 = large[idx2] + } + } + if l3 < t3 { + idx3++ + if idx3 < nL { + l3 = large[idx3] + } + } + if l4 < t4 { + idx4++ + if idx4 < nL { + l4 = large[idx4] + } + } + + } + + if l1 == t1 { + buf[pos] = t1 + pos++ + } + + if l2 == t2 { + buf[pos] = t2 + pos++ + } + + if l3 == t3 { + buf[pos] = t3 + pos++ + } + + if l4 == t4 { + buf[pos] = t4 + pos++ + } + + idxS += 4 + idxL = idx4 + } + + for idxS < nS && idxL < nL { + s := small[idxS] + + if s > large[idxL] { + idxL = advanceUntil(large, idxL, nL, s) + } + + if idxL == nL { + break + } + + if large[idxL] == s { + buf[pos] = s + pos++ + } + + idxS++ + } + + return pos +} + func binarySearch(array []uint16, ikey uint16) int { low := 0 high := len(array) - 1 diff --git a/setutil_test.go b/setutil_test.go index b037e02a..ae7cbad0 100644 --- a/setutil_test.go +++ b/setutil_test.go @@ -3,8 +3,11 @@ package roaring // to run just these tests: go test -run TestSetUtil* import ( - "github.com/stretchr/testify/assert" + "math/rand" + "sort" "testing" + + "github.com/stretchr/testify/assert" ) func TestSetUtilDifference(t *testing.T) { @@ -92,16 +95,80 @@ func TestSetUtilIntersection(t *testing.T) { assert.Equal(t, expectedresult, result) } -func TestSetUtilIntersection2(t *testing.T) { - data1 := []uint16{0, 2, 4, 6, 8, 10, 12, 14, 16, 18} - data2 := []uint16{0, 3, 6, 9, 12, 15, 18} - result := make([]uint16, 0, len(data1)+len(data2)) - expectedresult := []uint16{0, 6, 12, 18} - nl := intersection2by2(data1, data2, result) - result = result[:nl] - result = result[:len(expectedresult)] +// go test -run TestSetUtilIntersectionCases +func TestSetUtilIntersectionCases(t *testing.T) { + algorithms := []struct { + name string + algo func(a, b, buf []uint16) int + }{ + { + name: "onesidedgallopingintersect2by2", + algo: 
onesidedgallopingintersect2by2, + }, + { + name: "shotgun4Intersect", + algo: shotgun4Intersect, + }, + } - assert.Equal(t, expectedresult, result) + cases := []struct { + a, b, expected []uint16 + }{ + { + a: []uint16{}, + b: []uint16{}, + expected: []uint16{}, + }, + { + a: []uint16{1}, + b: []uint16{1}, + expected: []uint16{1}, + }, + { + a: []uint16{1}, + b: []uint16{2}, + expected: []uint16{}, + }, + { + a: []uint16{1, 2}, + b: []uint16{2, 3}, + expected: []uint16{2}, + }, + { + a: []uint16{1, 2, 3}, + b: []uint16{0, 2, 4, 6, 8, 10, 12, 14, 16, 18}, + expected: []uint16{2}, + }, + { + a: []uint16{0, 3, 6, 9, 12, 15, 18}, + b: []uint16{0, 2, 4, 6, 8, 10, 12, 14, 16, 18}, + expected: []uint16{0, 6, 12, 18}, + }, + { + a: []uint16{0, 3, 6, 9, 12, 15, 18}, + b: []uint16{0, 3, 6, 9, 12, 15, 18}, + expected: []uint16{0, 3, 6, 9, 12, 15, 18}, + }, + { + a: []uint16{1, 2, 3, 5, 7, 11, 13, 16, 30, 40, 100, 131, 200}, + b: []uint16{10, 60, 100}, + expected: []uint16{100}, + }, + { + a: []uint16{10, 60, 100}, + b: []uint16{1, 2, 3, 5, 7, 11, 13, 16, 30, 40, 100, 131, 200}, + expected: []uint16{100}, + }, + } + + for _, a := range algorithms { + for i, c := range cases { + result := make([]uint16, 0, len(c.a)+len(c.b)) + n := a.algo(c.a, c.b, result) + + assert.Equalf(t, c.expected, result[:n], "test %d fail, algorithm: %s", i+1, a.name) + } + } } func TestSetUtilBinarySearch(t *testing.T) { @@ -119,3 +186,95 @@ func TestSetUtilBinarySearch(t *testing.T) { } } } + +// go test -bench BenchmarkIntersectAlgorithms -run - +func BenchmarkIntersectAlgorithms(b *testing.B) { + // sz1 is the small array + sz1 := 64 // this should not be *too* large + s1 := make([]uint16, sz1) + + // to get more realistic results, we try different + // large array sizes. Our benchmarks is going to be + // an average of those... 
+ + sz2 := 3000 + s2 := make([]uint16, sz2) + + sz3 := 2040 + s3 := make([]uint16, sz3) + + sz4 := 1200 + s4 := make([]uint16, sz4) + + r := rand.New(rand.NewSource(1234)) + + // We are going to populate our large arrays with + // random data. Importantly, we need to sort. + // There might be a few duplicates, by random chance, + // but it should not affect results too much. + + for i := 0; i < sz2; i++ { + s2[i] = uint16(r.Intn(MaxUint16)) + } + sort.Sort(uint16Slice(s2)) + + for i := 0; i < sz3; i++ { + s3[i] = uint16(r.Intn(MaxUint16)) + } + sort.Sort(uint16Slice(s3)) + + for i := 0; i < sz4; i++ { + s4[i] = uint16(r.Intn(MaxUint16)) + } + sort.Sort(uint16Slice(s4)) + + buf := make([]uint16, sz1+sz2+sz3+sz4) + commonseed := 123456 + r = rand.New(rand.NewSource(int64(commonseed))) // we set the same seed in both instances + + b.Run("onesidedgallopingintersect2by2", func(b *testing.B) { + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // this is important: you want to start with a new + // small array each time otherwise onesidedgallopingintersect2by2 + // might benefit from nearly perfect branch prediction, making + // the benchmark unrealistic. + // This needs to be super fast, which it should be if sz1 is + // small enough. + for i := 0; i < sz1; i++ { + // This needs to be super fast + s1[i] = uint16(r.Intn(MaxUint16)) + } + sort.Sort(uint16Slice(s1)) // There might be duplicates, ignore them + + onesidedgallopingintersect2by2(s1, s2, buf) + onesidedgallopingintersect2by2(s1, s3, buf) + onesidedgallopingintersect2by2(s1, s4, buf) + + } + }) + r = rand.New(rand.NewSource(int64(commonseed))) // we set the same seed in both instances + + b.Run("shotgun4", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + // this is important: you want to start with a new + // small array each time otherwise onesidedgallopingintersect2by2 + // might benefit from nearly perfect branch prediction, making + // the benchmark unrealistic. 
+ // This needs to be super fast, which it should be if sz1 is + // small enough. + for i := 0; i < sz1; i++ { + s1[i] = uint16(r.Intn(MaxUint16)) + } + sort.Sort(uint16Slice(s1)) // There might be duplicates, ignore them + + shotgun4Intersect(s1, s2, buf) + shotgun4Intersect(s1, s3, buf) + shotgun4Intersect(s1, s4, buf) + + } + }) +}