Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip all docs index creation for new deployment #5993

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
15 changes: 12 additions & 3 deletions channels/channelmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewDefaultChannelMapper() *ChannelMapper {
return NewChannelMapper(DefaultSyncFunction, time.Duration(base.DefaultJavascriptTimeoutSecs)*time.Second)
}

func (mapper *ChannelMapper) MapToChannelsAndAccess(body map[string]interface{}, oldBodyJSON string, metaMap map[string]interface{}, userCtx map[string]interface{}) (*ChannelMapperOutput, error) {
func (mapper *ChannelMapper) MapToChannelsAndAccess(body map[string]interface{}, oldBodyJSON string, metaMap map[string]interface{}, userCtx map[string]interface{}, shouldAddStarChannel bool) (*ChannelMapperOutput, error) {
numberFixBody := ConvertJSONNumbers(body)
numberFixMetaMap := ConvertJSONNumbers(metaMap)

Expand All @@ -62,6 +62,10 @@ func (mapper *ChannelMapper) MapToChannelsAndAccess(body map[string]interface{},
return nil, err
}
output := result1.(*ChannelMapperOutput)

if shouldAddStarChannel {
output.Channels.Add(UserStarChannel)
}
return output, nil
}

Expand Down Expand Up @@ -122,10 +126,15 @@ func ForChangedUsers(a, b AccessMap, fn func(user string)) {
}
}

func (runner *SyncRunner) MapToChannelsAndAccess(body map[string]interface{}, oldBodyJSON string, userCtx map[string]interface{}) (*ChannelMapperOutput, error) {
func (runner *SyncRunner) MapToChannelsAndAccess(body map[string]interface{}, oldBodyJSON string, userCtx map[string]interface{}, shouldAddStarChannel bool) (*ChannelMapperOutput, error) {
result, err := runner.Call(body, sgbucket.JSONString(oldBodyJSON), userCtx)
if err != nil {
return nil, err
}
return result.(*ChannelMapperOutput), nil
output := result.(*ChannelMapperOutput)

if shouldAddStarChannel {
output.Channels.Add(UserStarChannel)
}
return output, nil
}
160 changes: 88 additions & 72 deletions channels/channelmapper_test.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion db/background_mgr_resync.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (r *ResyncManager) Run(ctx context.Context, options map[string]interface{},
database := options["database"].(*Database)
regenerateSequences := options["regenerateSequences"].(bool)
resyncCollections := options["collections"].(ResyncCollections)
shouldAddStarChannel := options["shouldAddStarChannel"].(bool)

persistClusterStatus := func() {
err := persistClusterStatusCallback()
Expand Down Expand Up @@ -76,7 +77,7 @@ func (r *ResyncManager) Run(ctx context.Context, options map[string]interface{},
for _, collectionID := range collectionIDs {
_, err := (&DatabaseCollectionWithUser{
DatabaseCollection: database.CollectionByID[collectionID],
}).UpdateAllDocChannels(ctx, regenerateSequences, callback, terminator)
}).UpdateAllDocChannels(ctx, regenerateSequences, callback, terminator, shouldAddStarChannel)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface
db := options["database"].(*Database)
regenerateSequences := options["regenerateSequences"].(bool)
resyncCollections := options["collections"].(ResyncCollections)
shouldAddStarChannel := options["shouldAddStarChannel"].(bool)

resyncLoggingID := "Resync: " + r.ResyncID

Expand All @@ -108,7 +109,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface
var err error
docID := string(event.Key)
key := realDocID(docID)
base.TracefCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", resyncLoggingID, event.Opcode, base.UD(docID))
base.DebugfCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", resyncLoggingID, event.Opcode, base.UD(docID))
// Don't want to process raw binary docs
// The binary check should suffice but for additional safety also check for empty bodies
if event.DataType == base.MemcachedDataTypeRaw || len(event.Value) == 0 {
Expand All @@ -125,7 +126,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface
databaseCollection := db.CollectionByID[event.CollectionID]
_, unusedSequences, err := (&DatabaseCollectionWithUser{
DatabaseCollection: databaseCollection,
}).resyncDocument(ctx, docID, key, regenerateSequences, []uint64{})
}).resyncDocument(ctx, docID, key, regenerateSequences, []uint64{}, shouldAddStarChannel)

databaseCollection.releaseSequences(ctx, unusedSequences)

Expand Down
37 changes: 21 additions & 16 deletions db/background_mgr_resync_dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ func TestResyncManagerDCPStopInMidWay(t *testing.T) {
defer resycMgr.resetStatus()

options := map[string]interface{}{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"shouldAddStarChannel": true,
}

err := resycMgr.Start(ctx, options)
Expand Down Expand Up @@ -208,9 +209,10 @@ func TestResyncManagerDCPStart(t *testing.T) {
db.ResyncManager = resyncMgr

options := map[string]interface{}{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"shouldAddStarChannel": true,
}
err := resyncMgr.Start(ctx, options)
require.NoError(t, err)
Expand Down Expand Up @@ -242,9 +244,10 @@ func TestResyncManagerDCPStart(t *testing.T) {
log.Printf("initialStats: processed[%v] changed[%v]", initialStats.DocsProcessed, initialStats.DocsChanged)

options := map[string]interface{}{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"shouldAddStarChannel": true,
}

err := resyncMgr.Start(ctx, options)
Expand Down Expand Up @@ -282,9 +285,10 @@ func TestResyncManagerDCPRunTwice(t *testing.T) {
db.ResyncManager = resycMgr

options := map[string]interface{}{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"shouldAddStarChannel": true,
}

err := resycMgr.Start(ctx, options)
Expand Down Expand Up @@ -318,7 +322,7 @@ func TestResyncManagerDCPRunTwice(t *testing.T) {
assert.Equal(t, db.DbStats.Database().SyncFunctionCount.Value(), int64(docsToCreate))
}

func TestResycnManagerDCPResumeStoppedProcess(t *testing.T) {
func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("Test requires Couchbase Server")
}
Expand All @@ -333,9 +337,10 @@ func TestResycnManagerDCPResumeStoppedProcess(t *testing.T) {
db.ResyncManager = resycMgr

options := map[string]interface{}{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"shouldAddStarChannel": true,
}

err := resycMgr.Start(ctx, options)
Expand Down
12 changes: 8 additions & 4 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ func (db *DatabaseCollectionWithUser) prepareSyncFn(doc *Document, newDoc *Docum
// Run the sync function on the given document and body. Need to inject the document ID and rev ID temporarily to run
// the sync function.
func (db *DatabaseCollectionWithUser) runSyncFn(ctx context.Context, doc *Document, body Body, metaMap map[string]interface{}, newRevId string) (*uint32, string, base.Set, channels.AccessMap, channels.AccessMap, error) {
channelSet, access, roles, syncExpiry, oldBody, err := db.getChannelsAndAccess(ctx, doc, body, metaMap, newRevId)
channelSet, access, roles, syncExpiry, oldBody, err := db.getChannelsAndAccess(ctx, doc, body, metaMap, newRevId, !db.dbCtx.AllDocsIndexExists)
if err != nil {
return nil, ``, nil, nil, nil, err
}
Expand All @@ -1491,7 +1491,7 @@ func (db *DatabaseCollectionWithUser) recalculateSyncFnForActiveRev(ctx context.
if curBody != nil {
base.DebugfCtx(ctx, base.KeyCRUD, "updateDoc(%q): Rev %q causes %q to become current again",
base.UD(doc.ID), newRevID, doc.CurrentRev)
channelSet, access, roles, syncExpiry, oldBodyJSON, err = db.getChannelsAndAccess(ctx, doc, curBody, metaMap, doc.CurrentRev)
channelSet, access, roles, syncExpiry, oldBodyJSON, err = db.getChannelsAndAccess(ctx, doc, curBody, metaMap, doc.CurrentRev, !db.dbCtx.AllDocsIndexExists)
if err != nil {
return
}
Expand Down Expand Up @@ -2202,7 +2202,7 @@ func (db *DatabaseCollectionWithUser) Purge(ctx context.Context, key string) err

// Calls the JS sync function to assign the doc to channels, grant users
// access to channels, and reject invalid documents.
func (db *DatabaseCollectionWithUser) getChannelsAndAccess(ctx context.Context, doc *Document, body Body, metaMap map[string]interface{}, revID string) (
func (db *DatabaseCollectionWithUser) getChannelsAndAccess(ctx context.Context, doc *Document, body Body, metaMap map[string]interface{}, revID string, shouldAddStarChannel bool) (
result base.Set,
access channels.AccessMap,
roles channels.AccessMap,
Expand Down Expand Up @@ -2232,7 +2232,7 @@ func (db *DatabaseCollectionWithUser) getChannelsAndAccess(ctx context.Context,

startTime := time.Now()
output, err = db.channelMapper().MapToChannelsAndAccess(body, oldJson, metaMap,
MakeUserCtx(db.user, db.ScopeName(), db.Name()))
MakeUserCtx(db.user, db.ScopeName(), db.Name()), shouldAddStarChannel)
syncFunctionTimeNano := time.Since(startTime).Nanoseconds()

db.dbStats().Database().SyncFunctionTime.Add(syncFunctionTimeNano)
Expand Down Expand Up @@ -2276,6 +2276,10 @@ func (db *DatabaseCollectionWithUser) getChannelsAndAccess(ctx context.Context,
if nonStrings != nil {
base.WarnfCtx(ctx, "Channel names must be string values only. Ignoring non-string channels: %s", base.UD(nonStrings))
}
if shouldAddStarChannel {
array = append(array, channels.UserStarChannel)
}

result, err = channels.SetFromArray(array, channels.KeepStar)
}
}
Expand Down
15 changes: 8 additions & 7 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type DatabaseContext struct {
singleCollection *DatabaseCollection // Temporary collection
CollectionByID map[uint32]*DatabaseCollection // A map keyed by collection ID to Collection
CollectionNames map[string]map[string]struct{} // Map of scope, collection names
AllDocsIndexExists bool
}

type Scope struct {
Expand Down Expand Up @@ -1728,7 +1729,7 @@ func (db *Database) UpdateSyncFun(ctx context.Context, syncFun string) (changed
// To be used when the JavaScript sync function changes.
type updateAllDocChannelsCallbackFunc func(docsProcessed, docsChanged *int)

func (db *DatabaseCollectionWithUser) UpdateAllDocChannels(ctx context.Context, regenerateSequences bool, callback updateAllDocChannelsCallbackFunc, terminator *base.SafeTerminator) (int, error) {
func (db *DatabaseCollectionWithUser) UpdateAllDocChannels(ctx context.Context, regenerateSequences bool, callback updateAllDocChannelsCallbackFunc, terminator *base.SafeTerminator, shouldAddStarChannel bool) (int, error) {
base.InfofCtx(ctx, base.KeyAll, "Recomputing document channels...")
base.InfofCtx(ctx, base.KeyAll, "Re-running sync function on all documents...")

Expand Down Expand Up @@ -1775,7 +1776,7 @@ func (db *DatabaseCollectionWithUser) UpdateAllDocChannels(ctx context.Context,
key := realDocID(docid)
queryRowCount++
docsProcessed++
highSeq, unusedSequences, err = db.resyncDocument(ctx, docid, key, regenerateSequences, unusedSequences)
highSeq, unusedSequences, err = db.resyncDocument(ctx, docid, key, regenerateSequences, unusedSequences, shouldAddStarChannel)
if err == nil {
docsChanged++
} else if err != base.ErrUpdateCancel {
Expand Down Expand Up @@ -1880,7 +1881,7 @@ func (db *DatabaseCollection) releaseSequences(ctx context.Context, sequences []
}
}

func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, doc *Document, regenerateSequences bool, unusedSequences []uint64) (updatedDoc *Document, shouldUpdate bool, updatedExpiry *uint32, highSeq uint64, updatedUnusedSequences []uint64, err error) {
func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, doc *Document, regenerateSequences bool, unusedSequences []uint64, shouldAddStarChannel bool) (updatedDoc *Document, shouldUpdate bool, updatedExpiry *uint32, highSeq uint64, updatedUnusedSequences []uint64, err error) {
docid := doc.ID
forceUpdate := false
if !doc.HasValidSyncData() {
Expand All @@ -1906,7 +1907,7 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d
if err != nil {
return
}
channels, access, roles, syncExpiry, _, err := db.getChannelsAndAccess(ctx, doc, body, metaMap, rev.ID)
channels, access, roles, syncExpiry, _, err := db.getChannelsAndAccess(ctx, doc, body, metaMap, rev.ID, shouldAddStarChannel)
if err != nil {
// Probably the validator rejected the doc
base.WarnfCtx(ctx, "Error calling sync() on doc %q: %v", base.UD(docid), err)
Expand Down Expand Up @@ -1942,7 +1943,7 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d
return doc, shouldUpdate, updatedExpiry, doc.Sequence, updatedUnusedSequences, nil
}

func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) {
func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64, shouldAddStarChannel bool) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) {
var updatedDoc *Document
var shouldUpdate bool
var updatedExpiry *uint32
Expand All @@ -1958,7 +1959,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
if err != nil {
return nil, nil, deleteDoc, nil, err
}
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences)
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences, shouldAddStarChannel)
if err != nil {
return nil, nil, deleteDoc, nil, err
}
Expand Down Expand Up @@ -1986,7 +1987,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
if err != nil {
return nil, nil, false, err
}
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences)
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences, shouldAddStarChannel)
if err != nil {
return nil, nil, false, err
}
Expand Down
22 changes: 13 additions & 9 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,10 @@ func TestAllDocsOnly(t *testing.T) {
revid, _, err := collection.Put(ctx, ids[i].DocID, body)
ids[i].RevID = revid
ids[i].Sequence = uint64(i + 1)
ids[i].Channels = channels
// `*` channels gets added to docs for new creation.
// add `*` channel to expected channels
ids[i].Channels = append(channels, "*")

assert.NoError(t, err, "Couldn't create document")
}

Expand Down Expand Up @@ -1389,11 +1392,12 @@ func TestSyncFnOnPush(t *testing.T) {
// Check that the doc has the correct channel (test for issue #300)
doc, err := collection.GetDocument(ctx, "doc1", DocUnmarshalAll)
assert.Equal(t, channels.ChannelMap{
"*": nil,
"clibup": nil,
"public": &channels.ChannelRemoval{Seq: 2, RevID: "4-four"},
}, doc.Channels)

assert.Equal(t, base.SetOf("clibup"), doc.History["4-four"].Channels)
assert.Equal(t, base.SetOf("*", "clibup"), doc.History["4-four"].Channels)
}

func TestInvalidChannel(t *testing.T) {
Expand Down Expand Up @@ -2456,7 +2460,7 @@ func TestResyncUpdateAllDocChannels(t *testing.T) {
return state == DBOffline
})

_, err = collection.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator())
_, err = collection.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator(), !db.AllDocsIndexExists)
assert.NoError(t, err)

syncFnCount := int(db.DbStats.Database().SyncFunctionCount.Value())
Expand Down Expand Up @@ -2809,16 +2813,16 @@ func Test_resyncDocument(t *testing.T) {
_, err = db.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)

_, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10})
_, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10}, true)
require.NoError(t, err)
err = collection.WaitForPendingChanges(ctx)
require.NoError(t, err)

syncData, err := collection.GetDocSyncData(ctx, docID)
assert.NoError(t, err)

assert.Len(t, syncData.ChannelSet, 2)
assert.Len(t, syncData.Channels, 2)
assert.Len(t, syncData.ChannelSet, 3)
assert.Len(t, syncData.Channels, 3)
found := false

for _, chSet := range syncData.ChannelSet {
Expand Down Expand Up @@ -2854,7 +2858,7 @@ func Test_getUpdatedDocument(t *testing.T) {
require.NoError(t, err)

collection := GetSingleDatabaseCollectionWithUser(t, db)
_, _, _, _, _, err = collection.getResyncedDocument(ctx, doc, false, []uint64{})
_, _, _, _, _, err = collection.getResyncedDocument(ctx, doc, false, []uint64{}, false)
assert.Equal(t, base.ErrUpdateCancel, err)
})

Expand Down Expand Up @@ -2888,14 +2892,14 @@ func Test_getUpdatedDocument(t *testing.T) {
_, err = db.UpdateSyncFun(ctx, syncFn)
require.NoError(t, err)

updatedDoc, shouldUpdate, _, highSeq, _, err := collection.getResyncedDocument(ctx, doc, false, []uint64{})
updatedDoc, shouldUpdate, _, highSeq, _, err := collection.getResyncedDocument(ctx, doc, false, []uint64{}, false)
require.NoError(t, err)
assert.True(t, shouldUpdate)
assert.Equal(t, doc.Sequence, highSeq)
assert.Equal(t, 2, int(db.DbStats.Database().SyncFunctionCount.Value()))

// Rerunning same resync function should mark doc not to be updated
_, shouldUpdate, _, _, _, err = collection.getResyncedDocument(ctx, updatedDoc, false, []uint64{})
_, shouldUpdate, _, _, _, err = collection.getResyncedDocument(ctx, updatedDoc, false, []uint64{}, false)
require.NoError(t, err)
assert.False(t, shouldUpdate)
assert.Equal(t, 3, int(db.DbStats.Database().SyncFunctionCount.Value()))
Expand Down
12 changes: 8 additions & 4 deletions db/design_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func filterViewResult(input sgbucket.ViewResult, user auth.User, applyChannelFil
// Is any item of channels found in visibleChannels?
func channelsIntersect(visibleChannels ch.TimedSet, channels []interface{}) bool {
for _, channel := range channels {
if visibleChannels.Contains(channel.(string)) || channel == "*" {
if visibleChannels.Contains(channel.(string)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adamcfraser please confirm removing this or condition is okay. I don't understand why the * channel was checked in the first place. This fixes rest/TestUserViewQuery test which was getting 2 documents but was expecting just one.

return true
}
}
Expand Down Expand Up @@ -482,17 +482,21 @@ func installViews(ctx context.Context, viewStore sgbucket.ViewStore) error {
if (channels) {
for (var name in channels) {
removed = channels[name];
if (!removed)
// if EnableStarChannelLog= true and "*" is added because AllDocs index isn't created
// Skip emiting "*" again as above lines has emitted "*" channel
if (name == "*" && %v) // EnableStarChannelLog
continue
if (!removed) {
emit([name, sequence], value);
else {
} else {
var flags = removed.del ? %d : %d; // channels.Removed/Deleted
emit([name, removed.seq], {rev:removed.rev, flags: flags});
}
}
}
}`

channels_map = fmt.Sprintf(channels_map, syncData, base.SyncDocPrefix, ch.Deleted, EnableStarChannelLog,
channels_map = fmt.Sprintf(channels_map, syncData, base.SyncDocPrefix, ch.Deleted, EnableStarChannelLog, EnableStarChannelLog,
ch.Removed|ch.Deleted, ch.Removed)

// Channel access view, used by ComputeChannelsForPrincipal()
Expand Down
Loading