Skip to content

Commit 91b1f23

Browse files
gregns1bbrkstorcolvin
committed
CBG-3212: add api to fetch a document by its CV value (#6579)
* CBG-3212: add api to fetch a document by its CV value * test fix * rebased SourceAndVersion -> Version rename * Update currentRevChannels on CV revcache load and doc.updateChannels * fix spelling * Remove currentRevChannels * Move common GetRev/GetCV work into documentRevisionForRequest function * Pass revision.RevID into authorizeUserForChannels * Update db/crud.go Co-authored-by: Tor Colvin <[email protected]> --------- Co-authored-by: Ben Brooks <[email protected]> Co-authored-by: Tor Colvin <[email protected]>
1 parent 7ee3ed3 commit 91b1f23

9 files changed

+252
-43
lines changed

db/crud.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -314,14 +314,29 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
314314
// No rev ID given, so load active revision
315315
revision, err = db.revisionCache.GetActive(ctx, docid)
316316
}
317-
318317
if err != nil {
319318
return DocumentRevision{}, err
320319
}
321320

321+
return db.documentRevisionForRequest(ctx, docid, revision, &revid, nil, maxHistory, historyFrom)
322+
}
323+
324+
// documentRevisionForRequest processes the given DocumentRevision and returns a version of it for a given client request, depending on access, deleted, etc.
325+
func (db *DatabaseCollectionWithUser) documentRevisionForRequest(ctx context.Context, docID string, revision DocumentRevision, revID *string, cv *Version, maxHistory int, historyFrom []string) (DocumentRevision, error) {
326+
// ensure only one of cv or revID is specified
327+
if cv != nil && revID != nil {
328+
return DocumentRevision{}, fmt.Errorf("must have one of cv or revID in documentRevisionForRequest (had cv=%v revID=%v)", cv, revID)
329+
}
330+
var requestedVersion string
331+
if revID != nil {
332+
requestedVersion = *revID
333+
} else if cv != nil {
334+
requestedVersion = cv.String()
335+
}
336+
322337
if revision.BodyBytes == nil {
323338
if db.ForceAPIForbiddenErrors() {
324-
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docid), base.MD(revid))
339+
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docID), base.MD(requestedVersion))
325340
return DocumentRevision{}, ErrForbidden
326341
}
327342
return DocumentRevision{}, ErrMissing
@@ -340,16 +355,17 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
340355
_, requestedHistory = trimEncodedRevisionsToAncestor(ctx, requestedHistory, historyFrom, maxHistory)
341356
}
342357

343-
isAuthorized, redactedRev := db.authorizeUserForChannels(docid, revision.RevID, revision.Channels, revision.Deleted, requestedHistory)
358+
isAuthorized, redactedRevision := db.authorizeUserForChannels(docID, revision.RevID, cv, revision.Channels, revision.Deleted, requestedHistory)
344359
if !isAuthorized {
345-
if revid == "" {
360+
// client just wanted active revision, not a specific one
361+
if requestedVersion == "" {
346362
return DocumentRevision{}, ErrForbidden
347363
}
348364
if db.ForceAPIForbiddenErrors() {
349-
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docid), base.MD(revid))
365+
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docID), base.MD(requestedVersion))
350366
return DocumentRevision{}, ErrForbidden
351367
}
352-
return redactedRev, nil
368+
return redactedRevision, nil
353369
}
354370

355371
// If the revision is a removal cache entry (no body), but the user has access to that removal, then just
@@ -358,13 +374,26 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
358374
return DocumentRevision{}, ErrMissing
359375
}
360376

361-
if revision.Deleted && revid == "" {
377+
if revision.Deleted && requestedVersion == "" {
362378
return DocumentRevision{}, ErrDeleted
363379
}
364380

365381
return revision, nil
366382
}
367383

384+
func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, cv *Version, includeBody bool) (revision DocumentRevision, err error) {
385+
if cv != nil {
386+
revision, err = db.revisionCache.GetWithCV(ctx, docid, cv, RevCacheOmitDelta)
387+
} else {
388+
revision, err = db.revisionCache.GetActive(ctx, docid)
389+
}
390+
if err != nil {
391+
return DocumentRevision{}, err
392+
}
393+
394+
return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, 0, nil)
395+
}
396+
368397
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
369398
// returns nil.
370399
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
@@ -396,7 +425,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
396425
if fromRevision.Delta != nil {
397426
if fromRevision.Delta.ToRevID == toRevID {
398427

399-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
428+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
400429
if !isAuthorized {
401430
return nil, &redactedBody, nil
402431
}
@@ -419,7 +448,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
419448
}
420449

421450
deleted := toRevision.Deleted
422-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, toRevision.Channels, deleted, toRevision.History)
451+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, toRevision.Channels, deleted, toRevision.History)
423452
if !isAuthorized {
424453
return nil, &redactedBody, nil
425454
}
@@ -478,7 +507,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
478507
return nil, nil, nil
479508
}
480509

481-
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
510+
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, cv *Version, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
482511

483512
if col.user != nil {
484513
if err := col.user.AuthorizeAnyCollectionChannel(col.ScopeName, col.Name, channels); err != nil {
@@ -490,6 +519,7 @@ func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID str
490519
RevID: revID,
491520
History: history,
492521
Deleted: isDeleted,
522+
CV: cv,
493523
}
494524
if isDeleted {
495525
// Deletions are denoted by the deleted message property during 2.x replication
@@ -1045,7 +1075,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
10451075
if existingDoc != nil {
10461076
doc, unmarshalErr := db.unmarshalDocumentWithXattrs(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs, existingDoc.Cas, DocUnmarshalRev)
10471077
if unmarshalErr != nil {
1048-
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
1078+
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling existing doc")
10491079
}
10501080
matchRev = doc.CurrentRev
10511081
}

db/crud_test.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
sgbucket "github.com/couchbase/sg-bucket"
2222
"github.com/couchbase/sync_gateway/base"
23+
"github.com/couchbase/sync_gateway/channels"
2324
"github.com/stretchr/testify/assert"
2425
"github.com/stretchr/testify/require"
2526
)
@@ -1953,3 +1954,181 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
19531954
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
19541955
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
19551956
}
1957+
1958+
// TestGetCVWithDocResidentInCache:
1959+
// - Two test cases, one with doc a user will have access to, one without
1960+
// - Purpose is to have a doc that is resident in rev cache and use the GetCV function to retrieve these docs
1961+
// - Assert that the doc the user has access to is corrected fetched
1962+
// - Assert the doc the user doesn't have access to is fetched but correctly redacted
1963+
func TestGetCVWithDocResidentInCache(t *testing.T) {
1964+
const docID = "doc1"
1965+
1966+
testCases := []struct {
1967+
name string
1968+
docChannels []string
1969+
access bool
1970+
}{
1971+
{
1972+
name: "getCVWithUserAccess",
1973+
docChannels: []string{"A"},
1974+
access: true,
1975+
},
1976+
{
1977+
name: "getCVWithoutUserAccess",
1978+
docChannels: []string{"B"},
1979+
access: false,
1980+
},
1981+
}
1982+
for _, testCase := range testCases {
1983+
t.Run(testCase.name, func(t *testing.T) {
1984+
db, ctx := setupTestDB(t)
1985+
defer db.Close(ctx)
1986+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
1987+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
1988+
1989+
// Create a user with access to channel A
1990+
authenticator := db.Authenticator(base.TestCtx(t))
1991+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
1992+
require.NoError(t, err)
1993+
require.NoError(t, authenticator.Save(user))
1994+
collection.user, err = authenticator.GetUser("alice")
1995+
require.NoError(t, err)
1996+
1997+
// create doc with the channels for the test case
1998+
docBody := Body{"channels": testCase.docChannels}
1999+
rev, doc, err := collection.Put(ctx, docID, docBody)
2000+
require.NoError(t, err)
2001+
2002+
vrs := doc.HLV.Version
2003+
src := doc.HLV.SourceID
2004+
sv := &Version{Value: vrs, SourceID: src}
2005+
revision, err := collection.GetCV(ctx, docID, sv, true)
2006+
require.NoError(t, err)
2007+
if testCase.access {
2008+
assert.Equal(t, rev, revision.RevID)
2009+
assert.Equal(t, sv, revision.CV)
2010+
assert.Equal(t, docID, revision.DocID)
2011+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
2012+
} else {
2013+
assert.Equal(t, rev, revision.RevID)
2014+
assert.Equal(t, sv, revision.CV)
2015+
assert.Equal(t, docID, revision.DocID)
2016+
assert.Equal(t, []byte(RemovedRedactedDocument), revision.BodyBytes)
2017+
}
2018+
})
2019+
}
2020+
}
2021+
2022+
// TestGetByCVForDocNotResidentInCache:
2023+
// - Setup db with rev cache size of 1
2024+
// - Put two docs forcing eviction of the first doc
2025+
// - Use GetCV function to fetch the first doc, forcing the rev cache to load the doc from bucket
2026+
// - Assert the doc revision fetched is correct to the first doc we created
2027+
func TestGetByCVForDocNotResidentInCache(t *testing.T) {
2028+
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{
2029+
RevisionCacheOptions: &RevisionCacheOptions{
2030+
Size: 1,
2031+
},
2032+
})
2033+
defer db.Close(ctx)
2034+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
2035+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
2036+
2037+
// Create a user with access to channel A
2038+
authenticator := db.Authenticator(base.TestCtx(t))
2039+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
2040+
require.NoError(t, err)
2041+
require.NoError(t, authenticator.Save(user))
2042+
collection.user, err = authenticator.GetUser("alice")
2043+
require.NoError(t, err)
2044+
2045+
const (
2046+
doc1ID = "doc1"
2047+
doc2ID = "doc2"
2048+
)
2049+
2050+
revBody := Body{"channels": []string{"A"}}
2051+
rev, doc, err := collection.Put(ctx, doc1ID, revBody)
2052+
require.NoError(t, err)
2053+
2054+
// put another doc that should evict first doc from cache
2055+
_, _, err = collection.Put(ctx, doc2ID, revBody)
2056+
require.NoError(t, err)
2057+
2058+
// get by CV should force a load from bucket and have a cache miss
2059+
vrs := doc.HLV.Version
2060+
src := doc.HLV.SourceID
2061+
sv := &Version{Value: vrs, SourceID: src}
2062+
revision, err := collection.GetCV(ctx, doc1ID, sv, true)
2063+
require.NoError(t, err)
2064+
2065+
// assert the fetched doc is the first doc we added and assert that we did in fact get cache miss
2066+
assert.Equal(t, int64(1), db.DbStats.Cache().RevisionCacheMisses.Value())
2067+
assert.Equal(t, rev, revision.RevID)
2068+
assert.Equal(t, sv, revision.CV)
2069+
assert.Equal(t, doc1ID, revision.DocID)
2070+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
2071+
}
2072+
2073+
// TestGetCVActivePathway:
2074+
// - Two test cases, one with doc a user will have access to, one without
2075+
// - Purpose is top specify nil CV to the GetCV function to force the GetActive code pathway
2076+
// - Assert doc that is created is fetched correctly when user has access to doc
2077+
// - Assert that correct error is returned when user has no access to the doc
2078+
func TestGetCVActivePathway(t *testing.T) {
2079+
const docID = "doc1"
2080+
2081+
testCases := []struct {
2082+
name string
2083+
docChannels []string
2084+
access bool
2085+
}{
2086+
{
2087+
name: "activeFetchWithUserAccess",
2088+
docChannels: []string{"A"},
2089+
access: true,
2090+
},
2091+
{
2092+
name: "activeFetchWithoutUserAccess",
2093+
docChannels: []string{"B"},
2094+
access: false,
2095+
},
2096+
}
2097+
for _, testCase := range testCases {
2098+
t.Run(testCase.name, func(t *testing.T) {
2099+
db, ctx := setupTestDB(t)
2100+
defer db.Close(ctx)
2101+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
2102+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
2103+
2104+
// Create a user with access to channel A
2105+
authenticator := db.Authenticator(base.TestCtx(t))
2106+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
2107+
require.NoError(t, err)
2108+
require.NoError(t, authenticator.Save(user))
2109+
collection.user, err = authenticator.GetUser("alice")
2110+
require.NoError(t, err)
2111+
2112+
// test get active path by specifying nil cv
2113+
revBody := Body{"channels": testCase.docChannels}
2114+
rev, doc, err := collection.Put(ctx, docID, revBody)
2115+
require.NoError(t, err)
2116+
revision, err := collection.GetCV(ctx, docID, nil, true)
2117+
2118+
if testCase.access == true {
2119+
require.NoError(t, err)
2120+
vrs := doc.HLV.Version
2121+
src := doc.HLV.SourceID
2122+
sv := &Version{Value: vrs, SourceID: src}
2123+
assert.Equal(t, rev, revision.RevID)
2124+
assert.Equal(t, sv, revision.CV)
2125+
assert.Equal(t, docID, revision.DocID)
2126+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
2127+
} else {
2128+
require.Error(t, err)
2129+
assert.ErrorContains(t, err, ErrForbidden.Error())
2130+
assert.Equal(t, DocumentRevision{}, revision)
2131+
}
2132+
})
2133+
}
2134+
}

db/database_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1850,7 +1850,7 @@ func TestChannelQuery(t *testing.T) {
18501850

18511851
db, ctx := setupTestDB(t)
18521852
defer db.Close(ctx)
1853-
collection := GetSingleDatabaseCollectionWithUser(t, db)
1853+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
18541854
_, err := collection.UpdateSyncFun(ctx, `function(doc, oldDoc) {
18551855
channel(doc.channels);
18561856
}`)

db/document.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,17 @@ type SyncData struct {
103103
removedRevisionBodyKeys map[string]string // keys of non-winning revisions that have been removed (and so may require deletion), indexed by revID
104104
}
105105

106+
// determine set of current channels based on removal entries.
107+
func (sd *SyncData) getCurrentChannels() base.Set {
108+
ch := base.SetOf()
109+
for channelName, channelRemoval := range sd.Channels {
110+
if channelRemoval == nil || channelRemoval.Seq == 0 {
111+
ch.Add(channelName)
112+
}
113+
}
114+
return ch
115+
}
116+
106117
func (sd *SyncData) HashRedact(salt string) SyncData {
107118

108119
// Creating a new SyncData with the redacted info. We copy all the information which stays the same and create new
@@ -183,12 +194,11 @@ type Document struct {
183194
rawUserXattr []byte // Raw user xattr as retrieved from the bucket
184195
metadataOnlyUpdate *MetadataOnlyUpdate // Contents of _mou xattr, marshalled/unmarshalled with document from xattrs
185196

186-
Deleted bool
187-
DocExpiry uint32
188-
RevID string
189-
DocAttachments AttachmentsMeta
190-
inlineSyncData bool
191-
currentRevChannels base.Set // A base.Set of the current revision's channels (determined by SyncData.Channels at UnmarshalJSON time)
197+
Deleted bool
198+
DocExpiry uint32
199+
RevID string
200+
DocAttachments AttachmentsMeta
201+
inlineSyncData bool
192202
}
193203

194204
type historyOnlySyncData struct {
@@ -917,7 +927,6 @@ func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) (
917927
doc.updateChannelHistory(channel, doc.Sequence, true)
918928
}
919929
}
920-
doc.currentRevChannels = newChannels
921930
if changed != nil {
922931
base.InfofCtx(ctx, base.KeyCRUD, "\tDoc %q / %q in channels %q", base.UD(doc.ID), doc.CurrentRev, base.UD(newChannels))
923932
changedChannels, err = channels.SetFromArray(changed, channels.KeepStar)
@@ -1027,17 +1036,6 @@ func (doc *Document) UnmarshalJSON(data []byte) error {
10271036
doc.SyncData = *syncData.SyncData
10281037
}
10291038

1030-
// determine current revision's channels and store in-memory (avoids doc.Channels iteration at access-check time)
1031-
if len(doc.Channels) > 0 {
1032-
ch := base.SetOf()
1033-
for channelName, channelRemoval := range doc.Channels {
1034-
if channelRemoval == nil || channelRemoval.Seq == 0 {
1035-
ch.Add(channelName)
1036-
}
1037-
}
1038-
doc.currentRevChannels = ch
1039-
}
1040-
10411039
// Unmarshal the rest of the doc body as map[string]interface{}
10421040
if err := doc._body.Unmarshal(data); err != nil {
10431041
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalJSON() doc with id: %s. Error: %v", base.UD(doc.ID), err))

0 commit comments

Comments
 (0)