diff --git a/dql/parser.go b/dql/parser.go
index 354126f95ef..326d3e6fc55 100644
--- a/dql/parser.go
+++ b/dql/parser.go
@@ -32,12 +32,13 @@ import (
 )
 
 const (
-	uidFunc   = "uid"
-	valueFunc = "val"
-	typFunc   = "type"
-	lenFunc   = "len"
-	countFunc = "count"
-	uidInFunc = "uid_in"
+	uidFunc     = "uid"
+	valueFunc   = "val"
+	typFunc     = "type"
+	lenFunc     = "len"
+	countFunc   = "count"
+	uidInFunc   = "uid_in"
+	similarToFn = "similar_to"
 )
 
 var (
@@ -1621,7 +1622,7 @@ func validFuncName(name string) bool {
 
 	switch name {
 	case "regexp", "anyofterms", "allofterms", "alloftext", "anyoftext",
-		"has", "uid", "uid_in", "anyof", "allof", "type", "match":
+		"has", "uid", "uid_in", "anyof", "allof", "type", "match", "similar_to":
 		return true
 	}
 	return false
@@ -1794,7 +1795,7 @@ L:
 			case IsInequalityFn(function.Name):
 				err = parseFuncArgs(it, function)
 
-			case function.Name == "uid_in":
+			case function.Name == "uid_in" || function.Name == similarToFn:
 				err = parseFuncArgs(it, function)
 
 			default:
diff --git a/go.mod b/go.mod
index 4eccd936ab2..05750394263 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,9 @@ require (
 	github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
 	github.com/dgraph-io/ristretto v0.1.1
 	github.com/dgraph-io/simdjson-go v0.3.0
+	github.com/dgraph-io/vector_indexer v0.0.7-beta
+	github.com/dgrijalva/jwt-go v3.2.0+incompatible
+	github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
 	github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
 	github.com/dgryski/go-groupvarint v0.0.0-20190318181831-5ce5df8ca4e1
 	github.com/docker/docker v24.0.5+incompatible
diff --git a/posting/heap.go b/posting/heap.go
new file mode 100644
index 00000000000..dae4673a226
--- /dev/null
+++ b/posting/heap.go
@@ -0,0 +1,61 @@
+package posting
+
+import (
+	"container/heap"
+)
+
+// minBadgerHeapElement pairs a node uid (index) with the score (value) it is ordered by,
+// e.g. its distance to the query vector.
+type minBadgerHeapElement struct {
+	value float64
+	index uint64
+}
+
+// initBadgerHeapElement returns a pointer to a new element holding val and i.
+func initBadgerHeapElement(val float64, i uint64) *minBadgerHeapElement {
+	return &minBadgerHeapElement{
+		value: val,
+		index: i,
+	}
+}
+
+// minBadgerTupleHeap is a min-heap of elements ordered by value. It implements
+// heap.Interface: use heap.Push and heap.Pop from container/heap rather than the
+// Push and Pop methods directly, otherwise the heap ordering is not maintained.
+type minBadgerTupleHeap []minBadgerHeapElement
+
+// Len returns the number of elements in the heap.
+func (h minBadgerTupleHeap) Len() int {
+	return len(h)
+}
+
+// Less reports whether element i has a smaller value than element j.
+func (h minBadgerTupleHeap) Less(i, j int) bool {
+	return h[i].value < h[j].value
+}
+
+// Swap exchanges elements i and j.
+func (h minBadgerTupleHeap) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+}
+
+// Push appends x, which must be a minBadgerHeapElement. It is called by heap.Push.
+func (h *minBadgerTupleHeap) Push(x interface{}) {
+	*h = append(*h, x.(minBadgerHeapElement))
+}
+
+// Pop removes and returns the last element. It is called by heap.Pop.
+func (h *minBadgerTupleHeap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[:n-1]
+	return x
+}
+
+// buildBadgerHeapByInit heapifies array in place in O(n) time and returns it as a heap.
+func buildBadgerHeapByInit(array []minBadgerHeapElement) *minBadgerTupleHeap {
+	h := minBadgerTupleHeap(array)
+	heap.Init(&h)
+	return &h
+}
diff --git a/posting/helper.go b/posting/helper.go
new file mode 100644
index 00000000000..13d3b119487
--- /dev/null
+++ b/posting/helper.go
@@ -0,0 +1,115 @@
+package posting
+
+import (
+	"errors"
+	"math"
+	"sort"
+)
+
+// norm returns the Euclidean (L2) norm of v.
+func norm(v []float64) float64 {
+	// The error is ignored on purpose: v always has the same length as itself.
+	vectorNorm, _ := dotProduct(v, v)
+	return math.Sqrt(vectorNorm)
+}
+
+// dotProduct returns the dot product of a and b, or an error when their lengths differ.
+func dotProduct(a, b []float64) (float64, error) {
+	if len(a) != len(b) {
+		return 0, errors.New("can not compute dot product on vectors of different lengths")
+	}
+	var sum float64
+	for i := range a {
+		sum += a[i] * b[i]
+	}
+	return sum, nil
+}
+
+// euclidianDistance returns the Euclidean distance between a and b, or an error when
+// their lengths differ.
+func euclidianDistance(a, b []float64) (float64, error) {
+	diff := make([]float64, len(a))
+	if err := vectorSubtract(a, b, diff); err != nil {
+		return 0, err
+	}
+	return norm(diff), nil
+}
+
+// cosineSimilarity returns the cosine of the angle between a and b. It fails when the
+// lengths differ or when either vector has zero norm.
+func cosineSimilarity(a, b []float64) (float64, error) {
+	dotProd, err := dotProduct(a, b)
+	if err != nil {
+		return 0, err
+	}
+	normA, normB := norm(a), norm(b)
+	if normA == 0 || normB == 0 {
+		return 0, errors.New("can not compute cosine similarity on zero vector")
+	}
+	return dotProd / (normA * normB), nil
+}
+
+// max returns the larger of a and b.
+func max(a, b int) int {
+	if a < b {
+		return b
+	}
+	return a
+}
+
+// min returns the smaller of a and b.
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+// vectorAdd stores a+b element-wise in result. All three slices must have the same length.
+func vectorAdd(a, b, result []float64) error {
+	if len(a) != len(b) {
+		return errors.New("can not add vectors of different lengths")
+	}
+	if len(a) != len(result) {
+		return errors.New("result and operand vectors must be same length")
+	}
+	for i := range a {
+		result[i] = a[i] + b[i]
+	}
+	return nil
+}
+
+// vectorSubtract stores a-b element-wise in result. All three slices must have the same length.
+func vectorSubtract(a, b, result []float64) error {
+	if len(a) != len(b) {
+		return errors.New("can not subtract vectors of different lengths")
+	}
+	if len(a) != len(result) {
+		return errors.New("result and operand vectors must be same length")
+	}
+	for i := range a {
+		result[i] = a[i] - b[i]
+	}
+	return nil
+}
+
+// insortBadgerHeapAscending inserts val into slice, which must already be sorted by
+// ascending value, and returns the grown slice. Used for distances, where smaller is better.
+func insortBadgerHeapAscending(slice []minBadgerHeapElement, val minBadgerHeapElement) []minBadgerHeapElement {
+	i := sort.Search(len(slice), func(i int) bool { return slice[i].value > val.value })
+	slice = append(slice, minBadgerHeapElement{})
+	copy(slice[i+1:], slice[i:])
+	slice[i] = val
+	return slice
+}
+
+// insortBadgerHeapDescending inserts val into slice, which must already be sorted by
+// descending value, and returns the grown slice. Used for cosine similarity, where
+// larger is better.
+func insortBadgerHeapDescending(slice []minBadgerHeapElement, val minBadgerHeapElement) []minBadgerHeapElement {
+	i := sort.Search(len(slice), func(i int) bool { return slice[i].value < val.value })
+	slice = append(slice, minBadgerHeapElement{})
+	copy(slice[i+1:], slice[i:])
+	slice[i] = val
+	return slice
+}
diff --git a/posting/index.go b/posting/index.go
index e21d4b6c975..315cf2ae79c 100644
--- a/posting/index.go
+++ b/posting/index.go
@@ -127,6 +127,13 @@ func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge, tok
 	if err = plist.addMutation(ctx, txn, edge); err != nil {
 		return err
 	}
+	// TODO: trigger on the predicate's vector schema type instead of a hard-coded
+	// predicate name, and take the index parameters (levels, neighbors, ef) from the schema.
+	if edge.Attr == "0-profile" {
+		if _, err := InsertToBadger(ctx, txn, edge.ValueId, edge.Attr, 5, 3, 12); err != nil {
+			return err
+		}
+	}
 	ostats.Record(ctx, x.NumEdges.M(1))
 	return nil
 }
diff --git a/posting/vector_index.go b/posting/vector_index.go
new file mode 100644
index 00000000000..93ff4e0c6ee
--- /dev/null
+++ b/posting/vector_index.go
@@ -0,0 +1,321 @@
+package posting
+
+import (
+	"container/heap"
+	"context"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"math"
+	"math/rand"
+
+	"github.com/dgraph-io/dgraph/protos/pb"
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/vector_indexer/index"
+)
+
+// FloatVector is a slice of float64 values.
+type FloatVector []float64
+
+// UintSlice is a slice of uint64 values.
+type UintSlice []uint64
+
+// vectorEdgeAttr returns the attribute under which the index edges of pred are stored for
+// the given level.
+func vectorEdgeAttr(pred string, level int) string {
+	return pred + "_vector_" + fmt.Sprint(level)
+}
+
+// getInsertLayer randomly picks the first level, in [0, maxLevels), at which a new node gets
+// edges. The first level i for which a uniform sample in [0, 1) is below
+// (1/maxNeighbors)^(maxLevels-1-i) is picked; it falls back to 0 when no level qualifies.
+func getInsertLayer(maxNeighbors, maxLevels int) int {
+	randFloat := rand.Float64()
+	for i := 0; i < maxLevels; i++ {
+		if randFloat < math.Pow(1.0/float64(maxNeighbors), float64(maxLevels-1-i)) {
+			return i
+		}
+	}
+	return 0
+}
+
+// loadIndexValue reads the value stored for (attr, uid), JSON-decodes it into out and returns
+// the posting list it was read from. Inserts (isInsert) read through txn at txn.StartTs;
+// queries read through cache and must not touch txn, which is nil on that path.
+func loadIndexValue(cache *LocalCache, txn *Txn, isInsert bool, attr string, uid uint64,
+	out interface{}) (*List, error) {
+	key := x.DataKey(attr, uid)
+	var (
+		pl  *List
+		raw interface{}
+		err error
+	)
+	if isInsert {
+		if pl, err = txn.Get(key); err != nil {
+			return nil, err
+		}
+		data, err := pl.Value(txn.StartTs)
+		if err != nil {
+			return nil, err
+		}
+		raw = data.Value
+	} else {
+		if pl, err = cache.Get(key); err != nil {
+			return nil, err
+		}
+		// NOTE(review): assumes LocalCache keeps its read timestamp in startTs -- confirm.
+		data, err := pl.Value(cache.startTs)
+		if err != nil {
+			return nil, err
+		}
+		raw = data.Value
+	}
+	encoded, ok := raw.([]byte)
+	if !ok {
+		return nil, fmt.Errorf("vector index value for %q/%d has unexpected type %T", attr, uid, raw)
+	}
+	if err := json.Unmarshal(encoded, out); err != nil {
+		return nil, err
+	}
+	return pl, nil
+}
+
+// searchBadgerLayer runs a greedy best-first search for query over one level of the index,
+// starting from entry. It returns the nearest neighbours found, sorted by ascending Euclidean
+// distance and capped at expectedNeighbors once that size is exceeded, together with the set
+// of elements visited. The result is seeded with entry; other nodes are only added when
+// filter accepts them. Inserts (isInsert) read through txn, queries through cache.
+func searchBadgerLayer(cache *LocalCache, txn *Txn, isInsert bool, pred string, level int, entry uint64,
+	query []float64, expectedNeighbors int, filter index.SearchFilter) (
+	[]minBadgerHeapElement, map[minBadgerHeapElement]bool, error) {
+	var startVec []float64
+	if _, err := loadIndexValue(cache, txn, isInsert, pred, entry, &startVec); err != nil {
+		return nil, nil, err
+	}
+	bestDist, err := euclidianDistance(startVec, query)
+	if err != nil {
+		return nil, nil, err
+	}
+	best := minBadgerHeapElement{value: bestDist, index: entry}
+	nns := []minBadgerHeapElement{best}
+	visited := map[minBadgerHeapElement]bool{best: true}
+	candidates := buildBadgerHeapByInit([]minBadgerHeapElement{best})
+	edgeAttr := vectorEdgeAttr(pred, level)
+
+	for candidates.Len() != 0 {
+		// heap.Pop (not candidates.Pop) is what yields the closest remaining candidate.
+		curr := heap.Pop(candidates).(minBadgerHeapElement)
+		if nns[len(nns)-1].value < curr.value {
+			break // every remaining candidate is farther than the worst kept neighbour
+		}
+
+		var edges []uint64
+		if _, err := loadIndexValue(cache, txn, isInsert, edgeAttr, curr.index, &edges); err != nil {
+			return nil, nil, err
+		}
+		for _, edge := range edges {
+			var eVec []float64
+			if _, err := loadIndexValue(cache, txn, isInsert, pred, edge, &eVec); err != nil {
+				return nil, nil, err
+			}
+			currDist, err := euclidianDistance(eVec, query)
+			if err != nil {
+				return nil, nil, err
+			}
+			elem := minBadgerHeapElement{value: currDist, index: edge}
+			if visited[elem] {
+				continue
+			}
+			visited[elem] = true
+
+			// Only vectors that pass the filter and improve on the current result are explored.
+			if filter(query, eVec, edge) && (currDist < nns[len(nns)-1].value || len(nns) < expectedNeighbors) {
+				heap.Push(candidates, elem)
+				nns = insortBadgerHeapAscending(nns, elem)
+				if len(nns) > expectedNeighbors {
+					nns = nns[:len(nns)-1]
+				}
+			}
+		}
+	}
+	return nns, visited, nil
+}
+
+// newBadgerEdgeKeyValueEntry sets edges as the value of node uuid's edge list for pred at the
+// given level by adding a SET mutation to plist.
+func newBadgerEdgeKeyValueEntry(ctx context.Context, plist *List, txn *Txn, pred string, level int,
+	uuid uint64, edges []byte) error {
+	edge := &pb.DirectedEdge{
+		Entity:    uuid,
+		Attr:      vectorEdgeAttr(pred, level),
+		Value:     edges,
+		ValueType: pb.Posting_ValType(0),
+		Op:        pb.DirectedEdge_SET,
+	}
+	return plist.addMutation(ctx, txn, edge)
+}
+
+// entryUuidInsert records entryUuid (an 8-byte big-endian uid) as the entry point of pred's
+// index by adding a SET mutation to plist.
+func entryUuidInsert(ctx context.Context, plist *List, txn *Txn, pred string, entryUuid []byte) error {
+	edge := &pb.DirectedEdge{
+		Entity:    1,
+		Attr:      pred + "_vector_entry",
+		Value:     entryUuid,
+		ValueType: pb.Posting_ValType(7),
+		Op:        pb.DirectedEdge_SET,
+	}
+	return plist.addMutation(ctx, txn, edge)
+}
+
+// bootstrapBadgerIndex makes inUuid the first node of pred's index: it stores an empty edge
+// list for it on every level and records it as the entry point through entryPl.
+func bootstrapBadgerIndex(ctx context.Context, entryPl *List, txn *Txn, inUuid uint64, pred string,
+	maxLevels int) error {
+	// Store "[]" rather than empty bytes so that later reads can JSON-decode the edge list.
+	emptyEdges := []byte("[]")
+	for level := 0; level < maxLevels; level++ {
+		pl, err := txn.Get(x.DataKey(vectorEdgeAttr(pred, level), inUuid))
+		if err != nil {
+			return err
+		}
+		if err := newBadgerEdgeKeyValueEntry(ctx, pl, txn, pred, level, inUuid, emptyEdges); err != nil {
+			return err
+		}
+	}
+	inUuidByte := make([]byte, 8)
+	binary.BigEndian.PutUint64(inUuidByte, inUuid)
+	return entryUuidInsert(ctx, entryPl, txn, pred, inUuidByte)
+}
+
+// InsertToBadger adds node inUuid, whose vector is stored under pred, to pred's layered
+// vector index. The first node ever inserted becomes the entry point and gets an empty edge
+// list on every level. Any later node picks a random insertion level: on the levels before
+// it the search only refines the entry point, and on every level from it onwards the node is
+// linked with up to maxNeighbors of the efConstruction nearest neighbours found. It returns
+// the elements visited by the last layer search.
+func InsertToBadger(ctx context.Context, txn *Txn, inUuid uint64, pred string, maxLevels int,
+	maxNeighbors int, efConstruction int) (map[minBadgerHeapElement]bool, error) {
+	entryPl, err := txn.Get(x.DataKey(pred+"_vector_entry", 1))
+	if err != nil {
+		return map[minBadgerHeapElement]bool{}, err
+	}
+	data, valErr := entryPl.Value(txn.StartTs)
+	if valErr != nil {
+		// NOTE(review): as before, any read error is taken to mean that the index has no
+		// entry point yet; ideally only a "no value found" error should take this path.
+		return map[minBadgerHeapElement]bool{}, bootstrapBadgerIndex(ctx, entryPl, txn, inUuid, pred, maxLevels)
+	}
+	rawEntry, ok := data.Value.([]byte)
+	if !ok || len(rawEntry) < 8 {
+		return map[minBadgerHeapElement]bool{}, fmt.Errorf("malformed vector index entry point for %q", pred)
+	}
+	entry := binary.BigEndian.Uint64(rawEntry)
+	if entry == inUuid {
+		// Re-inserting the entry node would only overwrite it with the same information.
+		return map[minBadgerHeapElement]bool{}, nil
+	}
+
+	// Load the vector once up front: it is needed on every level, including when the
+	// insertion level is 0.
+	var inVec []float64
+	if _, err := loadIndexValue(nil, txn, true, pred, inUuid, &inVec); err != nil {
+		return map[minBadgerHeapElement]bool{}, err
+	}
+
+	inLevel := getInsertLayer(maxNeighbors, maxLevels)
+	var visited map[minBadgerHeapElement]bool
+	for level := 0; level < maxLevels; level++ {
+		if level < inLevel {
+			// Before the insertion level: only look for a better entry point.
+			startVecs, layerVisited, err := searchBadgerLayer(nil, txn, true, pred, level, entry, inVec, 1,
+				index.AcceptAll)
+			if err != nil {
+				return map[minBadgerHeapElement]bool{}, err
+			}
+			visited = layerVisited
+			entry = startVecs[0].index
+			continue
+		}
+
+		nns, layerVisited, err := searchBadgerLayer(nil, txn, true, pred, level, entry, inVec,
+			efConstruction, index.AcceptAll)
+		if err != nil {
+			return map[minBadgerHeapElement]bool{}, err
+		}
+		visited = layerVisited
+		edgeAttr := vectorEdgeAttr(pred, level)
+		outboundEdges := []uint64{} // non-nil so that it is stored as "[]", not "null"
+		for i := 0; i < min(len(nns), maxNeighbors); i++ {
+			var nnEdges []uint64
+			nnPl, err := loadIndexValue(nil, txn, true, edgeAttr, nns[i].index, &nnEdges)
+			if err != nil {
+				return map[minBadgerHeapElement]bool{}, err
+			}
+			// Link the neighbour back to the new node; a full neighbour gives up its last edge.
+			if len(nnEdges) < maxNeighbors {
+				nnEdges = append(nnEdges, inUuid)
+			} else {
+				nnEdges[len(nnEdges)-1] = inUuid
+			}
+			inboundEdgesBytes, err := json.Marshal(nnEdges)
+			if err != nil {
+				return map[minBadgerHeapElement]bool{}, err
+			}
+			err = newBadgerEdgeKeyValueEntry(ctx, nnPl, txn, pred, level, nns[i].index, inboundEdgesBytes)
+			if err != nil {
+				return map[minBadgerHeapElement]bool{}, err
+			}
+			outboundEdges = append(outboundEdges, nns[i].index)
+		}
+
+		outboundEdgesBytes, err := json.Marshal(outboundEdges)
+		if err != nil {
+			return map[minBadgerHeapElement]bool{}, err
+		}
+		inPl, err := txn.Get(x.DataKey(edgeAttr, inUuid))
+		if err != nil {
+			return map[minBadgerHeapElement]bool{}, err
+		}
+		err = newBadgerEdgeKeyValueEntry(ctx, inPl, txn, pred, level, inUuid, outboundEdgesBytes)
+		if err != nil {
+			return map[minBadgerHeapElement]bool{}, err
+		}
+	}
+	return visited, nil
+}
+
+// Search returns the uids of up to maxResults nearest neighbours of query in pred's index,
+// closest first. It walks levels 0..maxLevels-1 from entry with an accept-all filter to refine
+// the entry point, then searches the last level once more applying filter. efSearch is the
+// number of neighbours each layer search keeps.
+func Search(cache *LocalCache, query []float64, maxLevels int, pred string, entry uint64, maxResults int,
+	efSearch int, filter index.SearchFilter) ([]uint64, error) {
+	for level := 0; level < maxLevels; level++ {
+		currBestNns, _, err := searchBadgerLayer(cache, nil, false, pred, level, entry, query, efSearch,
+			index.AcceptAll)
+		if err != nil {
+			return []uint64{}, err
+		}
+		entry = currBestNns[0].index
+	}
+	nnVals, _, err := searchBadgerLayer(cache, nil, false, pred, maxLevels-1, entry, query, efSearch, filter)
+	if err != nil {
+		return []uint64{}, err
+	}
+	// The layer search keeps up to efSearch results; honour the caller's limit.
+	if maxResults >= 0 && len(nnVals) > maxResults {
+		nnVals = nnVals[:maxResults]
+	}
+	nnUids := make([]uint64, 0, len(nnVals))
+	for _, nnVal := range nnVals {
+		nnUids = append(nnUids, nnVal.index)
+	}
+	return nnUids, nil
+}
+
+// Each node needs one posting list per level, for example:
+//   uid: 0x1 attr: 0-profile_vector_1 plist1
+//   uid: 0x2 attr: 0-profile_vector_1 plist2
+//   uid: 0x1 attr: 0-profile_vector_2 plist3
+//   uid: 0x2 attr: 0-profile_vector_2 plist4
diff --git a/query/query.go b/query/query.go
index 9ea5c015efa..a1cc0473081 100644
--- a/query/query.go
+++ b/query/query.go
@@ -2645,7 +2645,7 @@ func isValidArg(a string) bool {
 func isValidFuncName(f string) bool {
 	switch f {
 	case "anyofterms", "allofterms", "val", "regexp", "anyoftext", "alloftext",
-		"has", "uid", "uid_in", "anyof", "allof", "type", "match":
+		"has", "uid", "uid_in", "anyof", "allof", "type", "match", "similar_to":
 		return true
 	}
 	return isInequalityFn(f) || types.IsGeoFunc(f)
diff --git a/worker/task.go b/worker/task.go
index 8c73c58b606..2f1ef11d93f 100644
--- a/worker/task.go
+++ b/worker/task.go
@@ -19,6 +19,7 @@ package worker
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"sort"
 	"strconv"
 	"strings"
@@ -44,6 +45,7 @@ import (
 	"github.com/dgraph-io/dgraph/types"
 	"github.com/dgraph-io/dgraph/types/facets"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/vector_indexer/index"
 )
 
 func invokeNetworkRequest(ctx context.Context, addr string,
@@ -220,6 +222,7 @@ const (
 	uidInFn
 	customIndexFn
 	matchFn
+	similarToFn
 	standardFn = 100
 )
 
@@ -258,6 +261,8 @@ func parseFuncTypeHelper(name string) (FuncType, string) {
 		return hasFn, f
 	case "uid_in":
 		return uidInFn, f
+	case "similar_to":
+		return similarToFn, f
 	case "anyof", "allof":
 		return customIndexFn, f
 	case "match":
@@ -282,6 +287,8 @@ func needsIndex(fnType FuncType, uidList *pb.List) bool {
 		return true
 	case geoFn, fullTextSearchFn, standardFn, matchFn:
 		return true
+	case similarToFn:
+		return true
 	}
 	return false
 }
@@ -317,7 +324,7 @@ func (srcFn *functionContext) needsValuePostings(typ types.TypeID) (bool, error)
 	case uidInFn, compareScalarFn:
 		// Operate on uid postings
 		return false, nil
-	case notAFunction:
+	case notAFunction, similarToFn:
 		return typ.IsScalar(), nil
 	}
 	return false, errors.Errorf("Unhandled case in fetchValuePostings for fn: %s", srcFn.fname)
@@ -341,11 +348,30 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
 	}
 
 	switch srcFn.fnType {
-	case notAFunction, aggregatorFn, passwordFn, compareAttrFn:
+	case notAFunction, aggregatorFn, passwordFn, compareAttrFn, similarToFn:
 	default:
 		return errors.Errorf("Unhandled function in handleValuePostings: %s", srcFn.fname)
 	}
 
+	if srcFn.fnType == similarToFn {
+		if len(q.SrcFunc.Args) == 0 {
+			return errors.Errorf("Function %q requires the number of neighbors as its first argument", srcFn.fname)
+		}
+		numNeighbors, err := strconv.ParseInt(q.SrcFunc.Args[0], 10, 32)
+		if err != nil || numNeighbors < 0 {
+			return fmt.Errorf("invalid value for number of neighbors: %s", q.SrcFunc.Args[0])
+		}
+		// TODO: read the entry uid from the <predicate>_vector_entry key instead of assuming uid 1.
+		// TODO: get maxLevels, efSearch and the filter from the schema.
+		nnUids, err := posting.Search(qs.cache, srcFn.vectorInfo, 5, args.q.Attr, 1, int(numNeighbors), 12,
+			index.AcceptAll)
+		if err != nil {
+			return err
+		}
+		args.out.UidMatrix = append(args.out.UidMatrix, &pb.List{Uids: nnUids})
+		return nil
+	}
+
 	if srcFn.atype == types.PasswordID && srcFn.fnType != passwordFn {
 		// Silently skip if the user is trying to fetch an attribute of type password.
 		return nil
@@ -1655,6 +1681,7 @@ type functionContext struct {
 	isFuncAtRoot   bool
 	isStringFn     bool
 	atype          types.TypeID
+	vectorInfo     []float64
 }
 
 const (
@@ -1912,6 +1939,25 @@ func parseSrcFn(ctx context.Context, q *pb.Query) (*functionContext, error) {
 			return nil, err
 		}
 		checkRoot(q, fc)
+	case similarToFn:
+		// Args[0] is the number of neighbors; Args[1] is the comma-separated query vector.
+		if len(q.SrcFunc.Args) < 2 {
+			return nil, errors.Errorf("Function %q requires 2 arguments, but got %d",
+				q.SrcFunc.Name, len(q.SrcFunc.Args))
+		}
+		components := strings.Split(q.SrcFunc.Args[1], ",")
+		fc.vectorInfo = make([]float64, 0, len(components))
+		for _, component := range components {
+			val, err := strconv.ParseFloat(strings.TrimSpace(component), 64)
+			if err != nil {
+				if e, ok := err.(*strconv.NumError); ok && e.Err == strconv.ErrSyntax {
+					return nil, errors.Errorf("Value %q in %s is not a number",
+						component, q.SrcFunc.Name)
+				}
+				return nil, err
+			}
+			fc.vectorInfo = append(fc.vectorInfo, val)
+		}
 	case uidInFn:
 		for _, arg := range q.SrcFunc.Args {
 			uidParsed, err := strconv.ParseUint(arg, 0, 64)