Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ require (
github.com/ti-mo/netfilter v0.5.2
github.com/tidwall/btree v1.7.0
github.com/tidwall/pretty v1.2.1
github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9
github.com/unum-cloud/usearch/golang v0.0.0-20260106013029-7306bb446be5
github.com/viterin/partial v1.1.0
go.starlark.net v0.0.0-20250701195324-d457b4515e0e
go.uber.org/automaxprocs v1.5.3
go.uber.org/ratelimit v0.2.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,8 @@ github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGr
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9 h1:JrHCee+uqpF2zXooiKu7ymvKgnzlUIXtTlZ7vi21Tr0=
github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9/go.mod h1:NxBpQibuBBeA/V8RGbrNzVAv4OyWWL5yNao7mVz656k=
github.com/unum-cloud/usearch/golang v0.0.0-20260106013029-7306bb446be5 h1:15aYPHUsC96Bu9uKmg97gvfBcADl92etFSedi3hKy1Y=
github.com/unum-cloud/usearch/golang v0.0.0-20260106013029-7306bb446be5/go.mod h1:NxBpQibuBBeA/V8RGbrNzVAv4OyWWL5yNao7mVz656k=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=
Expand All @@ -859,6 +859,8 @@ github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tz
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/viterin/partial v1.1.0 h1:iH1l1xqBlapXsYzADS1dcbizg3iQUKTU1rbwkHv/80E=
github.com/viterin/partial v1.1.0/go.mod h1:oKGAo7/wylWkJTLrWX8n+f4aDPtQMQ6VG4dd2qur5QA=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
Expand Down
70 changes: 70 additions & 0 deletions pkg/common/concurrent/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrent

import (
"context"
"runtime"

"golang.org/x/sync/errgroup"
)

type ThreadPoolExecutor struct {
nthreads int
}

func NewThreadPoolExecutor(nthreads int) ThreadPoolExecutor {
if nthreads == 0 {
nthreads = runtime.NumCPU()
}
return ThreadPoolExecutor{nthreads: nthreads}
}

func (e ThreadPoolExecutor) Execute(
ctx context.Context,
nitems int,
fn func(ctx context.Context, thread_id int, start, end int) error) (err error) {

g, ctx := errgroup.WithContext(ctx)

q := nitems / e.nthreads
r := nitems % e.nthreads

start := 0
for i := 0; i < e.nthreads; i++ {
size := q
if i < r {
size++
}
if size == 0 {
break
}

end := start + size
thread_id := i
curStart := start
curEnd := end
g.Go(func() error {
if err2 := fn(ctx, thread_id, curStart, curEnd); err2 != nil {
return err2
}

return nil
})
start = end
}

return g.Wait()
}
89 changes: 89 additions & 0 deletions pkg/common/concurrent/executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrent

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestExecutor(t *testing.T) {

ctx := context.Background()
nthreads := 3
vec := make([]int, 1024)
answer := 0
for i := range vec {
vec[i] = i
answer += i
}

e := NewThreadPoolExecutor(nthreads)

r := make([]int, nthreads)

err := e.Execute(ctx, len(vec), func(ctx context.Context, thread_id int, start, end int) error {
subSlice := vec[start:end]
for j := range subSlice {
if j%100 == 0 && ctx.Err() != nil {
return ctx.Err()
}

r[thread_id] += subSlice[j]
}
return nil
})

require.NoError(t, err)

sum := 0
for _, v := range r {
sum += v
}

require.Equal(t, sum, answer)
}

func TestExecutorDistribution(t *testing.T) {
ctx := context.Background()
nitems := 10
nthreads := 9

e := NewThreadPoolExecutor(nthreads)

activeThreads := make([]bool, nthreads)
var mu sync.Mutex // Note: sync needs to be imported if not already, but wait, looking at imports...

err := e.Execute(ctx, nitems, func(ctx context.Context, thread_id int, start, end int) error {
mu.Lock()
activeThreads[thread_id] = true
mu.Unlock()
return nil
})

require.NoError(t, err)

count := 0
for _, active := range activeThreads {
if active {
count++
}
}

require.Equal(t, 9, count)
}
4 changes: 4 additions & 0 deletions pkg/common/util/unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func UnsafeSliceCast[B any, From []A, A any, To []B](from From) To {
func UnsafeUintptr[P *T, T any](p P) uintptr {
return uintptr(unsafe.Pointer(p))
}

func UnsafePointer[P *T, T any](p P) unsafe.Pointer {
return unsafe.Pointer(p)
}
10 changes: 10 additions & 0 deletions pkg/container/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,16 @@ func (t Type) DescString() string {
return t.Oid.String()
}

func (t Type) GetArrayElementSize() int {
switch t.Oid {
case T_array_float32:
return 4
case T_array_float64:
return 8
}
panic(moerr.NewInternalErrorNoCtx(fmt.Sprintf("unknown array type %d", t)))
}

func (t Type) Eq(b Type) bool {
switch t.Oid {
// XXX need to find out why these types have different size/width
Expand Down
Loading
Loading