Skip to content

Commit 5de97a8

Browse files
gregns1bbrkstorcolvin
authored andcommitted
CBG-3212: add api to fetch a document by its CV value (#6579)
* CBG-3212: add api to fetch a document by its CV value * test fix * rebased SourceAndVersion -> Version rename * Update currentRevChannels on CV revcache load and doc.updateChannels * fix spelling * Remove currentRevChannels * Move common GetRev/GetCV work into documentRevisionForRequest function * Pass revision.RevID into authorizeUserForChannels * Update db/crud.go Co-authored-by: Tor Colvin <[email protected]> --------- Co-authored-by: Ben Brooks <[email protected]> Co-authored-by: Tor Colvin <[email protected]>
1 parent 40f738d commit 5de97a8

File tree

6 files changed

+247
-37
lines changed

6 files changed

+247
-37
lines changed

db/crud.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -311,14 +311,29 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
311311
// No rev ID given, so load active revision
312312
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
313313
}
314-
315314
if err != nil {
316315
return DocumentRevision{}, err
317316
}
318317

318+
return db.documentRevisionForRequest(ctx, docid, revision, &revid, nil, maxHistory, historyFrom)
319+
}
320+
321+
// documentRevisionForRequest processes the given DocumentRevision and returns a version of it for a given client request, depending on access, deleted, etc.
322+
func (db *DatabaseCollectionWithUser) documentRevisionForRequest(ctx context.Context, docID string, revision DocumentRevision, revID *string, cv *Version, maxHistory int, historyFrom []string) (DocumentRevision, error) {
323+
// ensure only one of cv or revID is specified
324+
if cv != nil && revID != nil {
325+
return DocumentRevision{}, fmt.Errorf("must have one of cv or revID in documentRevisionForRequest (had cv=%v revID=%v)", cv, revID)
326+
}
327+
var requestedVersion string
328+
if revID != nil {
329+
requestedVersion = *revID
330+
} else if cv != nil {
331+
requestedVersion = cv.String()
332+
}
333+
319334
if revision.BodyBytes == nil {
320335
if db.ForceAPIForbiddenErrors() {
321-
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docid), base.MD(revid))
336+
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docID), base.MD(requestedVersion))
322337
return DocumentRevision{}, ErrForbidden
323338
}
324339
return DocumentRevision{}, ErrMissing
@@ -337,16 +352,17 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
337352
_, requestedHistory = trimEncodedRevisionsToAncestor(ctx, requestedHistory, historyFrom, maxHistory)
338353
}
339354

340-
isAuthorized, redactedRev := db.authorizeUserForChannels(docid, revision.RevID, revision.Channels, revision.Deleted, requestedHistory)
355+
isAuthorized, redactedRevision := db.authorizeUserForChannels(docID, revision.RevID, cv, revision.Channels, revision.Deleted, requestedHistory)
341356
if !isAuthorized {
342-
if revid == "" {
357+
// client just wanted active revision, not a specific one
358+
if requestedVersion == "" {
343359
return DocumentRevision{}, ErrForbidden
344360
}
345361
if db.ForceAPIForbiddenErrors() {
346-
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docid), base.MD(revid))
362+
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docID), base.MD(requestedVersion))
347363
return DocumentRevision{}, ErrForbidden
348364
}
349-
return redactedRev, nil
365+
return redactedRevision, nil
350366
}
351367

352368
// If the revision is a removal cache entry (no body), but the user has access to that removal, then just
@@ -355,13 +371,26 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
355371
return DocumentRevision{}, ErrMissing
356372
}
357373

358-
if revision.Deleted && revid == "" {
374+
if revision.Deleted && requestedVersion == "" {
359375
return DocumentRevision{}, ErrDeleted
360376
}
361377

362378
return revision, nil
363379
}
364380

381+
func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, cv *Version, includeBody bool) (revision DocumentRevision, err error) {
382+
if cv != nil {
383+
revision, err = db.revisionCache.GetWithCV(ctx, docid, cv, includeBody, RevCacheOmitDelta)
384+
} else {
385+
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
386+
}
387+
if err != nil {
388+
return DocumentRevision{}, err
389+
}
390+
391+
return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, 0, nil)
392+
}
393+
365394
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
366395
// returns nil.
367396
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
@@ -393,7 +422,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
393422
if fromRevision.Delta != nil {
394423
if fromRevision.Delta.ToRevID == toRevID {
395424

396-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
425+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
397426
if !isAuthorized {
398427
return nil, &redactedBody, nil
399428
}
@@ -416,7 +445,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
416445
}
417446

418447
deleted := toRevision.Deleted
419-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, toRevision.Channels, deleted, toRevision.History)
448+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, toRevision.Channels, deleted, toRevision.History)
420449
if !isAuthorized {
421450
return nil, &redactedBody, nil
422451
}
@@ -475,7 +504,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
475504
return nil, nil, nil
476505
}
477506

478-
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
507+
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, cv *Version, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
479508

480509
if col.user != nil {
481510
if err := col.user.AuthorizeAnyCollectionChannel(col.ScopeName, col.Name, channels); err != nil {
@@ -487,6 +516,7 @@ func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID str
487516
RevID: revID,
488517
History: history,
489518
Deleted: isDeleted,
519+
CV: cv,
490520
}
491521
if isDeleted {
492522
// Deletions are denoted by the deleted message property during 2.x replication
@@ -1044,7 +1074,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
10441074
if existingDoc != nil {
10451075
doc, unmarshalErr := db.unmarshalDocumentWithXattrs(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs, existingDoc.Cas, DocUnmarshalRev)
10461076
if unmarshalErr != nil {
1047-
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
1077+
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling existing doc")
10481078
}
10491079
matchRev = doc.CurrentRev
10501080
}

db/crud_test.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
sgbucket "github.com/couchbase/sg-bucket"
2222
"github.com/couchbase/sync_gateway/base"
23+
"github.com/couchbase/sync_gateway/channels"
2324
"github.com/stretchr/testify/assert"
2425
"github.com/stretchr/testify/require"
2526
)
@@ -1872,3 +1873,181 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
18721873
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
18731874
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
18741875
}
1876+
1877+
// TestGetCVWithDocResidentInCache:
1878+
// - Two test cases, one with doc a user will have access to, one without
1879+
// - Purpose is to have a doc that is resident in rev cache and use the GetCV function to retrieve these docs
1880+
// - Assert that the doc the user has access to is corrected fetched
1881+
// - Assert the doc the user doesn't have access to is fetched but correctly redacted
1882+
func TestGetCVWithDocResidentInCache(t *testing.T) {
1883+
const docID = "doc1"
1884+
1885+
testCases := []struct {
1886+
name string
1887+
docChannels []string
1888+
access bool
1889+
}{
1890+
{
1891+
name: "getCVWithUserAccess",
1892+
docChannels: []string{"A"},
1893+
access: true,
1894+
},
1895+
{
1896+
name: "getCVWithoutUserAccess",
1897+
docChannels: []string{"B"},
1898+
access: false,
1899+
},
1900+
}
1901+
for _, testCase := range testCases {
1902+
t.Run(testCase.name, func(t *testing.T) {
1903+
db, ctx := setupTestDB(t)
1904+
defer db.Close(ctx)
1905+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1906+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
1907+
1908+
// Create a user with access to channel A
1909+
authenticator := db.Authenticator(base.TestCtx(t))
1910+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
1911+
require.NoError(t, err)
1912+
require.NoError(t, authenticator.Save(user))
1913+
collection.user, err = authenticator.GetUser("alice")
1914+
require.NoError(t, err)
1915+
1916+
// create doc with the channels for the test case
1917+
docBody := Body{"channels": testCase.docChannels}
1918+
rev, doc, err := collection.Put(ctx, docID, docBody)
1919+
require.NoError(t, err)
1920+
1921+
vrs := doc.HLV.Version
1922+
src := doc.HLV.SourceID
1923+
sv := &Version{Value: vrs, SourceID: src}
1924+
revision, err := collection.GetCV(ctx, docID, sv, true)
1925+
require.NoError(t, err)
1926+
if testCase.access {
1927+
assert.Equal(t, rev, revision.RevID)
1928+
assert.Equal(t, sv, revision.CV)
1929+
assert.Equal(t, docID, revision.DocID)
1930+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
1931+
} else {
1932+
assert.Equal(t, rev, revision.RevID)
1933+
assert.Equal(t, sv, revision.CV)
1934+
assert.Equal(t, docID, revision.DocID)
1935+
assert.Equal(t, []byte(RemovedRedactedDocument), revision.BodyBytes)
1936+
}
1937+
})
1938+
}
1939+
}
1940+
1941+
// TestGetByCVForDocNotResidentInCache:
1942+
// - Setup db with rev cache size of 1
1943+
// - Put two docs forcing eviction of the first doc
1944+
// - Use GetCV function to fetch the first doc, forcing the rev cache to load the doc from bucket
1945+
// - Assert the doc revision fetched is correct to the first doc we created
1946+
func TestGetByCVForDocNotResidentInCache(t *testing.T) {
1947+
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{
1948+
RevisionCacheOptions: &RevisionCacheOptions{
1949+
Size: 1,
1950+
},
1951+
})
1952+
defer db.Close(ctx)
1953+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1954+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
1955+
1956+
// Create a user with access to channel A
1957+
authenticator := db.Authenticator(base.TestCtx(t))
1958+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
1959+
require.NoError(t, err)
1960+
require.NoError(t, authenticator.Save(user))
1961+
collection.user, err = authenticator.GetUser("alice")
1962+
require.NoError(t, err)
1963+
1964+
const (
1965+
doc1ID = "doc1"
1966+
doc2ID = "doc2"
1967+
)
1968+
1969+
revBody := Body{"channels": []string{"A"}}
1970+
rev, doc, err := collection.Put(ctx, doc1ID, revBody)
1971+
require.NoError(t, err)
1972+
1973+
// put another doc that should evict first doc from cache
1974+
_, _, err = collection.Put(ctx, doc2ID, revBody)
1975+
require.NoError(t, err)
1976+
1977+
// get by CV should force a load from bucket and have a cache miss
1978+
vrs := doc.HLV.Version
1979+
src := doc.HLV.SourceID
1980+
sv := &Version{Value: vrs, SourceID: src}
1981+
revision, err := collection.GetCV(ctx, doc1ID, sv, true)
1982+
require.NoError(t, err)
1983+
1984+
// assert the fetched doc is the first doc we added and assert that we did in fact get cache miss
1985+
assert.Equal(t, int64(1), db.DbStats.Cache().RevisionCacheMisses.Value())
1986+
assert.Equal(t, rev, revision.RevID)
1987+
assert.Equal(t, sv, revision.CV)
1988+
assert.Equal(t, doc1ID, revision.DocID)
1989+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
1990+
}
1991+
1992+
// TestGetCVActivePathway:
1993+
// - Two test cases, one with doc a user will have access to, one without
1994+
// - Purpose is top specify nil CV to the GetCV function to force the GetActive code pathway
1995+
// - Assert doc that is created is fetched correctly when user has access to doc
1996+
// - Assert that correct error is returned when user has no access to the doc
1997+
func TestGetCVActivePathway(t *testing.T) {
1998+
const docID = "doc1"
1999+
2000+
testCases := []struct {
2001+
name string
2002+
docChannels []string
2003+
access bool
2004+
}{
2005+
{
2006+
name: "activeFetchWithUserAccess",
2007+
docChannels: []string{"A"},
2008+
access: true,
2009+
},
2010+
{
2011+
name: "activeFetchWithoutUserAccess",
2012+
docChannels: []string{"B"},
2013+
access: false,
2014+
},
2015+
}
2016+
for _, testCase := range testCases {
2017+
t.Run(testCase.name, func(t *testing.T) {
2018+
db, ctx := setupTestDB(t)
2019+
defer db.Close(ctx)
2020+
collection := GetSingleDatabaseCollectionWithUser(t, db)
2021+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
2022+
2023+
// Create a user with access to channel A
2024+
authenticator := db.Authenticator(base.TestCtx(t))
2025+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
2026+
require.NoError(t, err)
2027+
require.NoError(t, authenticator.Save(user))
2028+
collection.user, err = authenticator.GetUser("alice")
2029+
require.NoError(t, err)
2030+
2031+
// test get active path by specifying nil cv
2032+
revBody := Body{"channels": testCase.docChannels}
2033+
rev, doc, err := collection.Put(ctx, docID, revBody)
2034+
require.NoError(t, err)
2035+
revision, err := collection.GetCV(ctx, docID, nil, true)
2036+
2037+
if testCase.access == true {
2038+
require.NoError(t, err)
2039+
vrs := doc.HLV.Version
2040+
src := doc.HLV.SourceID
2041+
sv := &Version{Value: vrs, SourceID: src}
2042+
assert.Equal(t, rev, revision.RevID)
2043+
assert.Equal(t, sv, revision.CV)
2044+
assert.Equal(t, docID, revision.DocID)
2045+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
2046+
} else {
2047+
require.Error(t, err)
2048+
assert.ErrorContains(t, err, ErrForbidden.Error())
2049+
assert.Equal(t, DocumentRevision{}, revision)
2050+
}
2051+
})
2052+
}
2053+
}

db/document.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,17 @@ type SyncData struct {
103103
removedRevisionBodyKeys map[string]string // keys of non-winning revisions that have been removed (and so may require deletion), indexed by revID
104104
}
105105

106+
// determine set of current channels based on removal entries.
107+
func (sd *SyncData) getCurrentChannels() base.Set {
108+
ch := base.SetOf()
109+
for channelName, channelRemoval := range sd.Channels {
110+
if channelRemoval == nil || channelRemoval.Seq == 0 {
111+
ch.Add(channelName)
112+
}
113+
}
114+
return ch
115+
}
116+
106117
func (sd *SyncData) HashRedact(salt string) SyncData {
107118

108119
// Creating a new SyncData with the redacted info. We copy all the information which stays the same and create new
@@ -183,12 +194,11 @@ type Document struct {
183194
rawUserXattr []byte // Raw user xattr as retrieved from the bucket
184195
metadataOnlyUpdate *MetadataOnlyUpdate // Contents of _mou xattr, marshalled/unmarshalled with document from xattrs
185196

186-
Deleted bool
187-
DocExpiry uint32
188-
RevID string
189-
DocAttachments AttachmentsMeta
190-
inlineSyncData bool
191-
currentRevChannels base.Set // A base.Set of the current revision's channels (determined by SyncData.Channels at UnmarshalJSON time)
197+
Deleted bool
198+
DocExpiry uint32
199+
RevID string
200+
DocAttachments AttachmentsMeta
201+
inlineSyncData bool
192202
}
193203

194204
type historyOnlySyncData struct {
@@ -917,7 +927,6 @@ func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) (
917927
doc.updateChannelHistory(channel, doc.Sequence, true)
918928
}
919929
}
920-
doc.currentRevChannels = newChannels
921930
if changed != nil {
922931
base.InfofCtx(ctx, base.KeyCRUD, "\tDoc %q / %q in channels %q", base.UD(doc.ID), doc.CurrentRev, base.UD(newChannels))
923932
changedChannels, err = channels.SetFromArray(changed, channels.KeepStar)
@@ -1027,17 +1036,6 @@ func (doc *Document) UnmarshalJSON(data []byte) error {
10271036
doc.SyncData = *syncData.SyncData
10281037
}
10291038

1030-
// determine current revision's channels and store in-memory (avoids doc.Channels iteration at access-check time)
1031-
if len(doc.Channels) > 0 {
1032-
ch := base.SetOf()
1033-
for channelName, channelRemoval := range doc.Channels {
1034-
if channelRemoval == nil || channelRemoval.Seq == 0 {
1035-
ch.Add(channelName)
1036-
}
1037-
}
1038-
doc.currentRevChannels = ch
1039-
}
1040-
10411039
// Unmarshal the rest of the doc body as map[string]interface{}
10421040
if err := doc._body.Unmarshal(data); err != nil {
10431041
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalJSON() doc with id: %s. Error: %v", base.UD(doc.ID), err))

db/revision_cache_interface.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore,
275275
return revCacheLoaderForDocument(ctx, backingStore, doc, id.RevID)
276276
}
277277

278-
// revCacheLoaderForCv will load a document from the bucket using the CV, comapre the fetched doc and the CV specified in the function,
278+
// revCacheLoaderForCv will load a document from the bucket using the CV, compare the fetched doc and the CV specified in the function,
279279
// and will still return revid for purpose of populating the Rev ID lookup map on the cache
280280
func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingStore, id IDandCV, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
281281
cv := Version{
@@ -337,7 +337,7 @@ func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCache
337337
if err = doc.HasCurrentVersion(cv); err != nil {
338338
return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err
339339
}
340-
channels = doc.currentRevChannels
340+
channels = doc.SyncData.getCurrentChannels()
341341
revid = doc.CurrentRev
342342

343343
return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err

0 commit comments

Comments
 (0)