Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Commit 8e9bc92

Browse files
committed
WIP add importfrags testing command
this is basically working, but mostly untested
1 parent ad53edf commit 8e9bc92

File tree

1 file changed

+163
-0
lines changed

1 file changed

+163
-0
lines changed

importfrags/main.go

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io/ioutil"
7+
"log"
8+
"math/rand"
9+
"os"
10+
"path/filepath"
11+
"strconv"
12+
"time"
13+
14+
"github.com/golang/protobuf/proto"
15+
"golang.org/x/sync/errgroup"
16+
17+
"github.com/jaffee/commandeer/cobrafy"
18+
"github.com/pilosa/go-pilosa"
19+
pbuf "github.com/pilosa/go-pilosa/gopilosa_pbuf"
20+
"github.com/pilosa/pilosa/roaring"
21+
"github.com/pkg/errors"
22+
)
23+
24+
type Main struct {
25+
Dir string `help:"Directory to walk looking for fragment data."`
26+
Index string
27+
Field string
28+
Workers int `help:"Number of worker goroutines to run."`
29+
Pilosa []string
30+
Shards uint64 `help:"Number of shards into which to ingest"`
31+
Duration time.Duration `help:"How long to run the import"`
32+
33+
shardNodes map[uint64][]pilosa.URI
34+
}
35+
36+
func NewMain() *Main {
37+
return &Main{
38+
Dir: "frags",
39+
Index: "fragtest",
40+
Field: "field",
41+
Workers: 8,
42+
Pilosa: []string{"localhost:10101"},
43+
Shards: 10,
44+
}
45+
}
46+
47+
func (m *Main) Run() error {
48+
fragments := make([]*roaring.Bitmap, 0)
49+
50+
// walk all files in directory structure and load the ones which are roaring bitmaps.
51+
err := filepath.Walk(m.Dir,
52+
func(path string, info os.FileInfo, err error) error {
53+
if err != nil {
54+
return errors.Wrap(err, "walk func")
55+
}
56+
if info.IsDir() {
57+
return nil
58+
}
59+
f, err := os.Open(path)
60+
data, err := ioutil.ReadAll(f)
61+
if err != nil {
62+
return errors.Wrap(err, "reading all")
63+
}
64+
bm := roaring.NewFileBitmap()
65+
err = bm.UnmarshalBinary(data)
66+
if err != nil {
67+
log.Printf("%s was not a valid roaring bitmap: %v", path, err)
68+
}
69+
fragments = append(fragments, bm)
70+
return nil
71+
})
72+
if err != nil {
73+
return errors.Wrap(err, "walking file path")
74+
}
75+
76+
if len(fragments) == 0 {
77+
return errors.New("no valid bitmaps found.")
78+
}
79+
fmt.Printf("found %d bitmap files\n", len(fragments))
80+
81+
client, err := pilosa.NewClient(m.Pilosa)
82+
if err != nil {
83+
return errors.Wrapf(err, "getting client for %v", m.Pilosa)
84+
}
85+
sch, err := client.Schema()
86+
if err != nil {
87+
return errors.Wrap(err, "getting schema")
88+
}
89+
idx := sch.Index(m.Index)
90+
idx.Field(m.Field)
91+
err = client.SyncSchema(sch)
92+
if err != nil {
93+
return errors.Wrap(err, "syncing schema")
94+
}
95+
96+
m.shardNodes, err = client.ShardNodes(m.Index, m.Shards)
97+
if err != nil {
98+
return errors.Wrap(err, "getting shard nodes")
99+
}
100+
101+
eg := errgroup.Group{}
102+
done := make(chan struct{})
103+
for i := 0; i < m.Workers; i++ {
104+
i := i
105+
eg.Go(func() error {
106+
return m.importWorker(i, client, fragments, done)
107+
})
108+
}
109+
if m.Duration > 0 {
110+
fmt.Println("sleeping")
111+
time.Sleep(m.Duration)
112+
close(done)
113+
}
114+
115+
return eg.Wait()
116+
}
117+
118+
func main() {
119+
err := cobrafy.Execute(NewMain())
120+
if err != nil {
121+
log.Fatal(err)
122+
}
123+
}
124+
125+
func (m *Main) importWorker(num int, client *pilosa.Client, fragments []*roaring.Bitmap, done chan struct{}) error {
126+
idx := num % len(fragments)
127+
path := fmt.Sprintf("/index/%s/field/%s/import-roaring/", m.Index, m.Field)
128+
headers := map[string]string{
129+
"Content-Type": "application/x-protobuf",
130+
"Accept": "application/x-protobuf",
131+
"PQL-Version": pilosa.PQLVersion,
132+
}
133+
for {
134+
shard := rand.Uint64() % m.Shards
135+
hosts, ok := m.shardNodes[shard]
136+
if !ok {
137+
panic("tried to get unknown shard")
138+
}
139+
140+
bitmap := fragments[idx]
141+
data := &bytes.Buffer{}
142+
bitmap.WriteTo(data)
143+
req := &pbuf.ImportRoaringRequest{
144+
Views: []*pbuf.ImportRoaringRequestView{{Data: data.Bytes()}},
145+
}
146+
r, err := proto.Marshal(req)
147+
if err != nil {
148+
return errors.Wrap(err, "marshaling request to protobuf")
149+
}
150+
151+
resp, err := client.ExperimentalDoRequest(&hosts[0], "POST", path+strconv.Itoa(int(shard)), headers, r)
152+
if err != nil {
153+
return errors.Wrap(err, "error doing request")
154+
}
155+
fmt.Println(resp)
156+
157+
idx = (idx + 1) % len(fragments)
158+
select {
159+
case <-done:
160+
return nil
161+
}
162+
}
163+
}

0 commit comments

Comments
 (0)