diff --git a/channels/channelmapper.go b/channels/channelmapper.go index 1497e1d41b..038458244f 100644 --- a/channels/channelmapper.go +++ b/channels/channelmapper.go @@ -53,7 +53,7 @@ func NewDefaultChannelMapper() *ChannelMapper { return NewChannelMapper(DefaultSyncFunction, time.Duration(base.DefaultJavascriptTimeoutSecs)*time.Second) } -func (mapper *ChannelMapper) MapToChannelsAndAccess(body map[string]interface{}, oldBodyJSON string, metaMap map[string]interface{}, userCtx map[string]interface{}) (*ChannelMapperOutput, error) { +func (mapper *ChannelMapper) MapToChannelsAndAccess(body map[string]interface{}, oldBodyJSON string, metaMap map[string]interface{}, userCtx map[string]interface{}, shouldAddStarChannel bool) (*ChannelMapperOutput, error) { numberFixBody := ConvertJSONNumbers(body) numberFixMetaMap := ConvertJSONNumbers(metaMap) @@ -62,6 +62,10 @@ func (mapper *ChannelMapper) MapToChannelsAndAccess(body map[string]interface{}, return nil, err } output := result1.(*ChannelMapperOutput) + + if shouldAddStarChannel { + output.Channels.Add(UserStarChannel) + } return output, nil } @@ -122,10 +126,15 @@ func ForChangedUsers(a, b AccessMap, fn func(user string)) { } } -func (runner *SyncRunner) MapToChannelsAndAccess(body map[string]interface{}, oldBodyJSON string, userCtx map[string]interface{}) (*ChannelMapperOutput, error) { +func (runner *SyncRunner) MapToChannelsAndAccess(body map[string]interface{}, oldBodyJSON string, userCtx map[string]interface{}, shouldAddStarChannel bool) (*ChannelMapperOutput, error) { result, err := runner.Call(body, sgbucket.JSONString(oldBodyJSON), userCtx) if err != nil { return nil, err } - return result.(*ChannelMapperOutput), nil + output := result.(*ChannelMapperOutput) + + if shouldAddStarChannel { + output.Channels.Add(UserStarChannel) + } + return output, nil } diff --git a/channels/channelmapper_test.go b/channels/channelmapper_test.go index 9d6de5cd19..0875504e42 100644 --- a/channels/channelmapper_test.go +++ 
b/channels/channelmapper_test.go @@ -51,7 +51,7 @@ func TestOttoValueToStringArray(t *testing.T) { // verify that our version of Otto treats JSON parsed arrays like real arrays func TestJavaScriptWorks(t *testing.T) { mapper := NewChannelMapper(`function(doc) {channel(doc.x.concat(doc.y));}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{"x":["abc"],"y":["xyz"]}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{"x":["abc"],"y":["xyz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, BaseSetOf(t, "abc", "xyz"), res.Channels) } @@ -59,7 +59,7 @@ func TestJavaScriptWorks(t *testing.T) { // Just verify that the calls to the channel() fn show up in the output channel list. func TestSyncFunction(t *testing.T) { mapper := NewChannelMapper(`function(doc) {channel("foo", "bar"); channel("baz")}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": []}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": []}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, BaseSetOf(t, "foo", "bar", "baz"), res.Channels) } @@ -67,7 +67,7 @@ func TestSyncFunction(t *testing.T) { // Just verify that the calls to the access() fn show up in the output channel list. func TestAccessFunction(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access("foo", "bar"); access("foo", "baz")}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{"foo": BaseSetOf(t, "bar", "baz")}, res.Access) } @@ -75,7 +75,7 @@ func TestAccessFunction(t *testing.T) { // Just verify that the calls to the channel() fn show up in the output channel list. 
func TestSyncFunctionTakesArray(t *testing.T) { mapper := NewChannelMapper(`function(doc) {channel(["foo", "bar ok","baz"])}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": []}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": []}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, BaseSetOf(t, "foo", "bar ok", "baz"), res.Channels) } @@ -83,21 +83,21 @@ func TestSyncFunctionTakesArray(t *testing.T) { // Calling channel() with an invalid channel name should return an error. func TestSyncFunctionRejectsInvalidChannels(t *testing.T) { mapper := NewChannelMapper(`function(doc) {channel(["foo", "bad,name","baz"])}`, 0) - _, err := mapper.MapToChannelsAndAccess(parse(`{"channels": []}`), `{}`, emptyMetaMap(), noUser) + _, err := mapper.MapToChannelsAndAccess(parse(`{"channels": []}`), `{}`, emptyMetaMap(), noUser, false) assert.True(t, err != nil) } // Calling access() with an invalid channel name should return an error. func TestAccessFunctionRejectsInvalidChannels(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access("foo", "bad,name");}`, 0) - _, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + _, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.True(t, err != nil) } // Just verify that the calls to the access() fn show up in the output channel list. 
func TestAccessFunctionTakesArrayOfUsers(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access(["foo","bar","baz"], "ginger")}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{"bar": BaseSetOf(t, "ginger"), "baz": BaseSetOf(t, "ginger"), "foo": BaseSetOf(t, "ginger")}, res.Access) } @@ -105,14 +105,14 @@ func TestAccessFunctionTakesArrayOfUsers(t *testing.T) { // Just verify that the calls to the access() fn show up in the output channel list. func TestAccessFunctionTakesArrayOfChannels(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access("lee", ["ginger", "earl_grey", "green"])}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{"lee": BaseSetOf(t, "ginger", "earl_grey", "green")}, res.Access) } func TestAccessFunctionTakesArrayOfChannelsAndUsers(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access(["lee", "nancy"], ["ginger", "earl_grey", "green"])}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, BaseSetOf(t, "ginger", "earl_grey", "green"), res.Access["lee"]) assert.Equal(t, BaseSetOf(t, "ginger", "earl_grey", "green"), res.Access["nancy"]) @@ -120,42 +120,42 @@ func TestAccessFunctionTakesArrayOfChannelsAndUsers(t *testing.T) { func TestAccessFunctionTakesEmptyArrayUser(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access([], ["ginger", "earl grey", "green"])}`, 0) - res, err 
:= mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{}, res.Access) } func TestAccessFunctionTakesEmptyArrayChannels(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access("lee", [])}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{}, res.Access) } func TestAccessFunctionTakesNullUser(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access(null, ["ginger", "earl grey", "green"])}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{}, res.Access) } func TestAccessFunctionTakesNullChannels(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access("lee", null)}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{}, res.Access) } func TestAccessFunctionTakesNonChannelsInArray(t *testing.T) { mapper := NewChannelMapper(`function(doc) {access("lee", ["ginger", null, 5])}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{"lee": BaseSetOf(t, "ginger")}, res.Access) } func 
TestAccessFunctionTakesUndefinedUser(t *testing.T) { mapper := NewChannelMapper(`function(doc) {var x = {}; access(x.nothing, ["ginger", "earl grey", "green"])}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{}, res.Access) } @@ -164,7 +164,7 @@ func TestAccessFunctionTakesUndefinedUser(t *testing.T) { // implementation with access(), so most of the above tests also apply to it.) func TestRoleFunction(t *testing.T) { mapper := NewChannelMapper(`function(doc) {role(["foo","bar","baz"], "role:froods")}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, AccessMap{"bar": BaseSetOf(t, "froods"), "baz": BaseSetOf(t, "froods"), "foo": BaseSetOf(t, "froods")}, res.Roles) } @@ -172,7 +172,7 @@ func TestRoleFunction(t *testing.T) { // Now just make sure the input comes through intact func TestInputParse(t *testing.T) { mapper := NewChannelMapper(`function(doc) {channel(doc.channel);}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{"channel": "foo"}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{"channel": "foo"}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, BaseSetOf(t, "foo"), res.Channels) } @@ -180,11 +180,11 @@ func TestInputParse(t *testing.T) { // A more realistic example func TestDefaultChannelMapper(t *testing.T) { mapper := NewDefaultChannelMapper() - res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": 
["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, BaseSetOf(t, "foo", "bar", "baz"), res.Channels) - res, err = mapper.MapToChannelsAndAccess(parse(`{"x": "y"}`), `{}`, emptyMetaMap(), noUser) + res, err = mapper.MapToChannelsAndAccess(parse(`{"x": "y"}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.Set{}, res.Channels) } @@ -192,7 +192,7 @@ func TestDefaultChannelMapper(t *testing.T) { // Empty/no-op channel mapper fn func TestEmptyChannelMapper(t *testing.T) { mapper := NewChannelMapper(`function(doc) {}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.Set{}, res.Channels) } @@ -202,7 +202,7 @@ func TestChannelMapperUnderscoreLib(t *testing.T) { underscore.Enable() // It really slows down unit tests (by making otto.New take a lot longer) defer underscore.Disable() mapper := NewChannelMapper(`function(doc) {channel(_.first(doc.channels));}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, BaseSetOf(t, "foo"), res.Channels) } @@ -210,7 +210,7 @@ func TestChannelMapperUnderscoreLib(t *testing.T) { // Validation by calling reject() func TestChannelMapperReject(t *testing.T) { mapper := NewChannelMapper(`function(doc) {reject(403, "bad");}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser) + res, err 
:= mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.HTTPErrorf(403, "bad"), res.Rejection) } @@ -218,7 +218,7 @@ func TestChannelMapperReject(t *testing.T) { // Rejection by calling throw() func TestChannelMapperThrow(t *testing.T) { mapper := NewChannelMapper(`function(doc) {throw({forbidden:"bad"});}`, 0) - res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.HTTPErrorf(403, "bad"), res.Rejection) } @@ -226,14 +226,14 @@ func TestChannelMapperThrow(t *testing.T) { // Test other runtime exception func TestChannelMapperException(t *testing.T) { mapper := NewChannelMapper(`function(doc) {(nil)[5];}`, 0) - _, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser) + _, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.True(t, err != nil) } // Test the public API func TestPublicChannelMapper(t *testing.T) { mapper := NewChannelMapper(`function(doc) {channel(doc.channels);}`, 0) - output, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser) + output, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, BaseSetOf(t, "foo", "bar", "baz"), output.Channels) } @@ -244,16 +244,16 @@ func TestCheckUser(t *testing.T) { requireUser(doc.owner); }`, 0) var sally = map[string]interface{}{"name": "sally", "channels": []string{}} - res, err := 
mapper.MapToChannelsAndAccess(parse(`{"owner": "sally"}`), `{}`, emptyMetaMap(), sally) + res, err := mapper.MapToChannelsAndAccess(parse(`{"owner": "sally"}`), `{}`, emptyMetaMap(), sally, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) var linus = map[string]interface{}{"name": "linus", "channels": []string{}} - res, err = mapper.MapToChannelsAndAccess(parse(`{"owner": "sally"}`), `{}`, emptyMetaMap(), linus) + res, err = mapper.MapToChannelsAndAccess(parse(`{"owner": "sally"}`), `{}`, emptyMetaMap(), linus, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.HTTPErrorf(403, base.SyncFnErrorWrongUser), res.Rejection) - res, err = mapper.MapToChannelsAndAccess(parse(`{"owner": "sally"}`), `{}`, emptyMetaMap(), nil) + res, err = mapper.MapToChannelsAndAccess(parse(`{"owner": "sally"}`), `{}`, emptyMetaMap(), nil, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) } @@ -264,16 +264,16 @@ func TestCheckUserArray(t *testing.T) { requireUser(doc.owners); }`, 0) var sally = map[string]interface{}{"name": "sally", "channels": []string{}} - res, err := mapper.MapToChannelsAndAccess(parse(`{"owners": ["sally", "joe"]}`), `{}`, emptyMetaMap(), sally) + res, err := mapper.MapToChannelsAndAccess(parse(`{"owners": ["sally", "joe"]}`), `{}`, emptyMetaMap(), sally, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) var linus = map[string]interface{}{"name": "linus", "channels": []string{}} - res, err = mapper.MapToChannelsAndAccess(parse(`{"owners": ["sally", "joe"]}`), `{}`, emptyMetaMap(), linus) + res, err = mapper.MapToChannelsAndAccess(parse(`{"owners": ["sally", "joe"]}`), `{}`, emptyMetaMap(), linus, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.HTTPErrorf(403, base.SyncFnErrorWrongUser), res.Rejection) - res, err = 
mapper.MapToChannelsAndAccess(parse(`{"owners": ["sally"]}`), `{}`, emptyMetaMap(), nil) + res, err = mapper.MapToChannelsAndAccess(parse(`{"owners": ["sally"]}`), `{}`, emptyMetaMap(), nil, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) } @@ -284,16 +284,16 @@ func TestCheckRole(t *testing.T) { requireRole(doc.role); }`, 0) var sally = map[string]interface{}{"name": "sally", "roles": map[string]int{"girl": 1, "5yo": 1}} - res, err := mapper.MapToChannelsAndAccess(parse(`{"role": "girl"}`), `{}`, emptyMetaMap(), sally) + res, err := mapper.MapToChannelsAndAccess(parse(`{"role": "girl"}`), `{}`, emptyMetaMap(), sally, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) var linus = map[string]interface{}{"name": "linus", "roles": []string{"boy", "musician"}} - res, err = mapper.MapToChannelsAndAccess(parse(`{"role": "girl"}`), `{}`, emptyMetaMap(), linus) + res, err = mapper.MapToChannelsAndAccess(parse(`{"role": "girl"}`), `{}`, emptyMetaMap(), linus, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.HTTPErrorf(403, base.SyncFnErrorMissingRole), res.Rejection) - res, err = mapper.MapToChannelsAndAccess(parse(`{"role": "girl"}`), `{}`, emptyMetaMap(), nil) + res, err = mapper.MapToChannelsAndAccess(parse(`{"role": "girl"}`), `{}`, emptyMetaMap(), nil, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) } @@ -304,16 +304,16 @@ func TestCheckRoleArray(t *testing.T) { requireRole(doc.roles); }`, 0) var sally = map[string]interface{}{"name": "sally", "roles": map[string]int{"girl": 1, "5yo": 1}} - res, err := mapper.MapToChannelsAndAccess(parse(`{"roles": ["kid","girl"]}`), `{}`, emptyMetaMap(), sally) + res, err := mapper.MapToChannelsAndAccess(parse(`{"roles": ["kid","girl"]}`), `{}`, emptyMetaMap(), sally, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, 
res.Rejection) var linus = map[string]interface{}{"name": "linus", "roles": map[string]int{"boy": 1, "musician": 1}} - res, err = mapper.MapToChannelsAndAccess(parse(`{"roles": ["girl"]}`), `{}`, emptyMetaMap(), linus) + res, err = mapper.MapToChannelsAndAccess(parse(`{"roles": ["girl"]}`), `{}`, emptyMetaMap(), linus, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.HTTPErrorf(403, base.SyncFnErrorMissingRole), res.Rejection) - res, err = mapper.MapToChannelsAndAccess(parse(`{"roles": ["girl"]}`), `{}`, emptyMetaMap(), nil) + res, err = mapper.MapToChannelsAndAccess(parse(`{"roles": ["girl"]}`), `{}`, emptyMetaMap(), nil, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) } @@ -324,16 +324,16 @@ func TestCheckAccess(t *testing.T) { requireAccess(doc.channel) }`, 0) var sally = map[string]interface{}{"name": "sally", "roles": []string{"girl", "5yo"}, "channels": []string{"party", "school"}} - res, err := mapper.MapToChannelsAndAccess(parse(`{"channel": "party"}`), `{}`, emptyMetaMap(), sally) + res, err := mapper.MapToChannelsAndAccess(parse(`{"channel": "party"}`), `{}`, emptyMetaMap(), sally, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) var linus = map[string]interface{}{"name": "linus", "roles": []string{"boy", "musician"}, "channels": []string{"party", "school"}} - res, err = mapper.MapToChannelsAndAccess(parse(`{"channel": "work"}`), `{}`, emptyMetaMap(), linus) + res, err = mapper.MapToChannelsAndAccess(parse(`{"channel": "work"}`), `{}`, emptyMetaMap(), linus, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.HTTPErrorf(403, base.SyncFnErrorMissingChannelAccess), res.Rejection) - res, err = mapper.MapToChannelsAndAccess(parse(`{"channel": "magic"}`), `{}`, emptyMetaMap(), nil) + res, err = mapper.MapToChannelsAndAccess(parse(`{"channel": "magic"}`), `{}`, emptyMetaMap(), nil, false) 
assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) } @@ -344,16 +344,16 @@ func TestCheckAccessArray(t *testing.T) { requireAccess(doc.channels) }`, 0) var sally = map[string]interface{}{"name": "sally", "roles": []string{"girl", "5yo"}, "channels": []string{"party", "school"}} - res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["swim","party"]}`), `{}`, emptyMetaMap(), sally) + res, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["swim","party"]}`), `{}`, emptyMetaMap(), sally, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) var linus = map[string]interface{}{"name": "linus", "roles": []string{"boy", "musician"}, "channels": []string{"party", "school"}} - res, err = mapper.MapToChannelsAndAccess(parse(`{"channels": ["work"]}`), `{}`, emptyMetaMap(), linus) + res, err = mapper.MapToChannelsAndAccess(parse(`{"channels": ["work"]}`), `{}`, emptyMetaMap(), linus, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, base.HTTPErrorf(403, base.SyncFnErrorMissingChannelAccess), res.Rejection) - res, err = mapper.MapToChannelsAndAccess(parse(`{"channels": ["magic"]}`), `{}`, emptyMetaMap(), nil) + res, err = mapper.MapToChannelsAndAccess(parse(`{"channels": ["magic"]}`), `{}`, emptyMetaMap(), nil, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, nil, res.Rejection) } @@ -361,111 +361,127 @@ func TestCheckAccessArray(t *testing.T) { // Test changing the function func TestSetFunction(t *testing.T) { mapper := NewChannelMapper(`function(doc) {channel(doc.channels);}`, 0) - output, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser) + output, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") + assert.Equal(t, BaseSetOf(t, 
"foo", "bar", "baz"), output.Channels) + changed, err := mapper.SetFunction(`function(doc) {channel("all");}`) assert.True(t, changed, "SetFunction failed") assert.NoError(t, err, "SetFunction failed") - output, err = mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser) + output, err = mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, BaseSetOf(t, "all"), output.Channels) } +func TestMapToChannelsAndAccessAddStarChannel(t *testing.T) { + mapper := NewChannelMapper(`function(doc) {channel(doc.channels);}`, 0) + output, err := mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, true) + assert.NoError(t, err, "MapToChannelsAndAccess failed") + assert.Equal(t, BaseSetOf(t, "foo", "bar", "baz", "*"), output.Channels) + + changed, err := mapper.SetFunction(`function(doc) {channel("all");}`) + assert.True(t, changed, "SetFunction failed") + assert.NoError(t, err, "SetFunction failed") + output, err = mapper.MapToChannelsAndAccess(parse(`{"channels": ["foo", "bar", "baz"]}`), `{}`, emptyMetaMap(), noUser, true) + assert.NoError(t, err, "MapToChannelsAndAccess failed") + assert.Equal(t, BaseSetOf(t, "all", "*"), output.Channels) +} + // Test that expiry function sets the expiry property func TestExpiryFunction(t *testing.T) { mapper := NewChannelMapper(`function(doc) {expiry(doc.expiry);}`, 0) - res1, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":100}`), `{}`, emptyMetaMap(), noUser) + res1, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":100}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error") assert.Equal(t, uint32(100), *res1.Expiry) - res2, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"500"}`), `{}`, emptyMetaMap(), noUser) + res2, err := 
mapper.MapToChannelsAndAccess(parse(`{"expiry":"500"}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error") assert.Equal(t, uint32(500), *res2.Expiry) - res_stringDate, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"2105-01-01T00:00:00.000+00:00"}`), `{}`, emptyMetaMap(), noUser) + res_stringDate, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"2105-01-01T00:00:00.000+00:00"}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error") assert.Equal(t, uint32(4260211200), *res_stringDate.Expiry) // Validate invalid expiry values log warning and don't set expiry - res3, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"abc"}`), `{}`, emptyMetaMap(), noUser) + res3, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"abc"}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry:abc") assert.True(t, res3.Expiry == nil) // Invalid: non-numeric - res4, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":["100", "200"]}`), `{}`, emptyMetaMap(), noUser) + res4, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":["100", "200"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry as array") assert.True(t, res4.Expiry == nil) // Invalid: negative value - res5, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":-100}`), `{}`, emptyMetaMap(), noUser) + res5, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":-100}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry as negative value") assert.True(t, res5.Expiry == nil) // Invalid - larger than uint32 - res6, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":123456789012345}`), `{}`, emptyMetaMap(), noUser) + res6, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":123456789012345}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, 
"MapToChannelsAndAccess error for expiry > unit32") assert.True(t, res6.Expiry == nil) // Invalid - non-unix date - resInvalidDate, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"1805-01-01T00:00:00.000+00:00"}`), `{}`, emptyMetaMap(), noUser) + resInvalidDate, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"1805-01-01T00:00:00.000+00:00"}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry:1805-01-01T00:00:00.000+00:00") assert.True(t, resInvalidDate.Expiry == nil) // No expiry specified - res7, err := mapper.MapToChannelsAndAccess(parse(`{"value":5}`), `{}`, emptyMetaMap(), noUser) + res7, err := mapper.MapToChannelsAndAccess(parse(`{"value":5}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry not specified") assert.True(t, res7.Expiry == nil) } func TestExpiryFunctionConstantValue(t *testing.T) { mapper := NewChannelMapper(`function(doc) {expiry(100);}`, 0) - res1, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res1, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error") assert.Equal(t, uint32(100), *res1.Expiry) mapper = NewChannelMapper(`function(doc) {expiry("500");}`, 0) - res2, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res2, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error") assert.Equal(t, uint32(500), *res2.Expiry) mapper = NewChannelMapper(`function(doc) {expiry("2105-01-01T00:00:00.000+00:00");}`, 0) - res_stringDate, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res_stringDate, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error") assert.Equal(t, 
uint32(4260211200), *res_stringDate.Expiry) // Validate invalid expiry values log warning and don't set expiry mapper = NewChannelMapper(`function(doc) {expiry("abc");}`, 0) - res3, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res3, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry:abc") assert.True(t, res3.Expiry == nil) // Invalid: non-numeric mapper = NewChannelMapper(`function(doc) {expiry(["100", "200"]);}`, 0) - res4, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res4, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry as array") assert.True(t, res4.Expiry == nil) // Invalid: negative value mapper = NewChannelMapper(`function(doc) {expiry(-100);}`, 0) - res5, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res5, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry as negative value") assert.True(t, res5.Expiry == nil) // Invalid - larger than uint32 mapper = NewChannelMapper(`function(doc) {expiry(123456789012345);}`, 0) - res6, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res6, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry as > unit32") assert.True(t, res6.Expiry == nil) // Invalid - non-unix date mapper = NewChannelMapper(`function(doc) {expiry("1805-01-01T00:00:00.000+00:00");}`, 0) - resInvalidDate, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + resInvalidDate, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, 
"MapToChannelsAndAccess error for expiry:1805-01-01T00:00:00.000+00:00") assert.True(t, resInvalidDate.Expiry == nil) // No expiry specified mapper = NewChannelMapper(`function(doc) {expiry();}`, 0) - res7, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser) + res7, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess error for expiry not specified") assert.True(t, res7.Expiry == nil) } @@ -473,36 +489,36 @@ func TestExpiryFunctionConstantValue(t *testing.T) { // Test that expiry function when invoked more than once by sync function func TestExpiryFunctionMultipleInvocation(t *testing.T) { mapper := NewChannelMapper(`function(doc) {expiry(doc.expiry); expiry(doc.secondExpiry)}`, 0) - res1, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":100}`), `{}`, emptyMetaMap(), noUser) + res1, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":100}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, uint32(100), *res1.Expiry) - res2, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"500"}`), `{}`, emptyMetaMap(), noUser) + res2, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"500"}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess failed") assert.Equal(t, uint32(500), *res2.Expiry) // Validate invalid expiry values log warning and don't set expiry - res3, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"abc"}`), `{}`, emptyMetaMap(), noUser) + res3, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":"abc"}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess filed for expiry:abc") assert.True(t, res3.Expiry == nil) // Invalid: non-numeric - res4, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":["100", "200"]}`), `{}`, emptyMetaMap(), noUser) + res4, err := 
mapper.MapToChannelsAndAccess(parse(`{"expiry":["100", "200"]}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess filed for expiry as array") assert.True(t, res4.Expiry == nil) // Invalid: negative value - res5, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":-100}`), `{}`, emptyMetaMap(), noUser) + res5, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":-100}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess filed for expiry as array") assert.True(t, res5.Expiry == nil) // Invalid - larger than uint32 - res6, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":123456789012345}`), `{}`, emptyMetaMap(), noUser) + res6, err := mapper.MapToChannelsAndAccess(parse(`{"expiry":123456789012345}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess filed for expiry as array") assert.True(t, res6.Expiry == nil) // No expiry specified - res7, err := mapper.MapToChannelsAndAccess(parse(`{"value":5}`), `{}`, emptyMetaMap(), noUser) + res7, err := mapper.MapToChannelsAndAccess(parse(`{"value":5}`), `{}`, emptyMetaMap(), noUser, false) assert.NoError(t, err, "MapToChannelsAndAccess filed for expiry as array") assert.True(t, res7.Expiry == nil) } @@ -520,7 +536,7 @@ func TestMetaMap(t *testing.T) { }, } - res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, metaMap, noUser) + res, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, metaMap, noUser, false) require.NoError(t, err) assert.ElementsMatch(t, res.Channels.ToArray(), channels) } @@ -535,7 +551,7 @@ func TestNilMetaMap(t *testing.T) { }, } - _, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, metaMap, noUser) + _, err := mapper.MapToChannelsAndAccess(parse(`{}`), `{}`, metaMap, noUser, false) require.Error(t, err) assert.True(t, err.Error() == "TypeError: Cannot access member 'val' of undefined") } diff --git a/db/background_mgr_resync.go b/db/background_mgr_resync.go index 
9f09fe383d..771c8aba14 100644 --- a/db/background_mgr_resync.go +++ b/db/background_mgr_resync.go @@ -48,6 +48,7 @@ func (r *ResyncManager) Run(ctx context.Context, options map[string]interface{}, database := options["database"].(*Database) regenerateSequences := options["regenerateSequences"].(bool) resyncCollections := options["collections"].(ResyncCollections) + shouldAddStarChannel := options["shouldAddStarChannel"].(bool) persistClusterStatus := func() { err := persistClusterStatusCallback() @@ -76,7 +77,7 @@ func (r *ResyncManager) Run(ctx context.Context, options map[string]interface{}, for _, collectionID := range collectionIDs { _, err := (&DatabaseCollectionWithUser{ DatabaseCollection: database.CollectionByID[collectionID], - }).UpdateAllDocChannels(ctx, regenerateSequences, callback, terminator) + }).UpdateAllDocChannels(ctx, regenerateSequences, callback, terminator, shouldAddStarChannel) if err != nil { return err } diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index 5817c25971..1a07977b4b 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -91,6 +91,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface db := options["database"].(*Database) regenerateSequences := options["regenerateSequences"].(bool) resyncCollections := options["collections"].(ResyncCollections) + shouldAddStarChannel := options["shouldAddStarChannel"].(bool) resyncLoggingID := "Resync: " + r.ResyncID @@ -108,7 +109,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface var err error docID := string(event.Key) key := realDocID(docID) - base.TracefCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", resyncLoggingID, event.Opcode, base.UD(docID)) + base.DebugfCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", resyncLoggingID, event.Opcode, base.UD(docID)) // Don't want to process raw binary docs // The binary check should suffice but 
for additional safety also check for empty bodies if event.DataType == base.MemcachedDataTypeRaw || len(event.Value) == 0 { @@ -125,7 +126,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface databaseCollection := db.CollectionByID[event.CollectionID] _, unusedSequences, err := (&DatabaseCollectionWithUser{ DatabaseCollection: databaseCollection, - }).resyncDocument(ctx, docID, key, regenerateSequences, []uint64{}) + }).resyncDocument(ctx, docID, key, regenerateSequences, []uint64{}, shouldAddStarChannel) databaseCollection.releaseSequences(ctx, unusedSequences) diff --git a/db/background_mgr_resync_dcp_test.go b/db/background_mgr_resync_dcp_test.go index 07974831a6..8209ccc0b6 100644 --- a/db/background_mgr_resync_dcp_test.go +++ b/db/background_mgr_resync_dcp_test.go @@ -155,9 +155,10 @@ func TestResyncManagerDCPStopInMidWay(t *testing.T) { defer resycMgr.resetStatus() options := map[string]interface{}{ - "database": db, - "regenerateSequences": false, - "collections": ResyncCollections{}, + "database": db, + "regenerateSequences": false, + "collections": ResyncCollections{}, + "shouldAddStarChannel": true, } err := resycMgr.Start(ctx, options) @@ -208,9 +209,10 @@ func TestResyncManagerDCPStart(t *testing.T) { db.ResyncManager = resyncMgr options := map[string]interface{}{ - "database": db, - "regenerateSequences": false, - "collections": ResyncCollections{}, + "database": db, + "regenerateSequences": false, + "collections": ResyncCollections{}, + "shouldAddStarChannel": true, } err := resyncMgr.Start(ctx, options) require.NoError(t, err) @@ -242,9 +244,10 @@ func TestResyncManagerDCPStart(t *testing.T) { log.Printf("initialStats: processed[%v] changed[%v]", initialStats.DocsProcessed, initialStats.DocsChanged) options := map[string]interface{}{ - "database": db, - "regenerateSequences": false, - "collections": ResyncCollections{}, + "database": db, + "regenerateSequences": false, + "collections": ResyncCollections{}, + 
"shouldAddStarChannel": true, } err := resyncMgr.Start(ctx, options) @@ -282,9 +285,10 @@ func TestResyncManagerDCPRunTwice(t *testing.T) { db.ResyncManager = resycMgr options := map[string]interface{}{ - "database": db, - "regenerateSequences": false, - "collections": ResyncCollections{}, + "database": db, + "regenerateSequences": false, + "collections": ResyncCollections{}, + "shouldAddStarChannel": true, } err := resycMgr.Start(ctx, options) @@ -318,7 +322,7 @@ func TestResyncManagerDCPRunTwice(t *testing.T) { assert.Equal(t, db.DbStats.Database().SyncFunctionCount.Value(), int64(docsToCreate)) } -func TestResycnManagerDCPResumeStoppedProcess(t *testing.T) { +func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) { if base.UnitTestUrlIsWalrus() { t.Skip("Test requires Couchbase Server") } @@ -333,9 +337,10 @@ func TestResycnManagerDCPResumeStoppedProcess(t *testing.T) { db.ResyncManager = resycMgr options := map[string]interface{}{ - "database": db, - "regenerateSequences": false, - "collections": ResyncCollections{}, + "database": db, + "regenerateSequences": false, + "collections": ResyncCollections{}, + "shouldAddStarChannel": true, } err := resycMgr.Start(ctx, options) diff --git a/db/crud.go b/db/crud.go index fa9e380a4c..79a9c55d62 100644 --- a/db/crud.go +++ b/db/crud.go @@ -1466,7 +1466,7 @@ func (db *DatabaseCollectionWithUser) prepareSyncFn(doc *Document, newDoc *Docum // Run the sync function on the given document and body. Need to inject the document ID and rev ID temporarily to run // the sync function. 
func (db *DatabaseCollectionWithUser) runSyncFn(ctx context.Context, doc *Document, body Body, metaMap map[string]interface{}, newRevId string) (*uint32, string, base.Set, channels.AccessMap, channels.AccessMap, error) { - channelSet, access, roles, syncExpiry, oldBody, err := db.getChannelsAndAccess(ctx, doc, body, metaMap, newRevId) + channelSet, access, roles, syncExpiry, oldBody, err := db.getChannelsAndAccess(ctx, doc, body, metaMap, newRevId, !db.dbCtx.AllDocsIndexExists) if err != nil { return nil, ``, nil, nil, nil, err } @@ -1491,7 +1491,7 @@ func (db *DatabaseCollectionWithUser) recalculateSyncFnForActiveRev(ctx context. if curBody != nil { base.DebugfCtx(ctx, base.KeyCRUD, "updateDoc(%q): Rev %q causes %q to become current again", base.UD(doc.ID), newRevID, doc.CurrentRev) - channelSet, access, roles, syncExpiry, oldBodyJSON, err = db.getChannelsAndAccess(ctx, doc, curBody, metaMap, doc.CurrentRev) + channelSet, access, roles, syncExpiry, oldBodyJSON, err = db.getChannelsAndAccess(ctx, doc, curBody, metaMap, doc.CurrentRev, !db.dbCtx.AllDocsIndexExists) if err != nil { return } @@ -2202,7 +2202,7 @@ func (db *DatabaseCollectionWithUser) Purge(ctx context.Context, key string) err // Calls the JS sync function to assign the doc to channels, grant users // access to channels, and reject invalid documents. 
-func (db *DatabaseCollectionWithUser) getChannelsAndAccess(ctx context.Context, doc *Document, body Body, metaMap map[string]interface{}, revID string) ( +func (db *DatabaseCollectionWithUser) getChannelsAndAccess(ctx context.Context, doc *Document, body Body, metaMap map[string]interface{}, revID string, shouldAddStarChannel bool) ( result base.Set, access channels.AccessMap, roles channels.AccessMap, @@ -2232,7 +2232,7 @@ func (db *DatabaseCollectionWithUser) getChannelsAndAccess(ctx context.Context, startTime := time.Now() output, err = db.channelMapper().MapToChannelsAndAccess(body, oldJson, metaMap, - MakeUserCtx(db.user, db.ScopeName(), db.Name())) + MakeUserCtx(db.user, db.ScopeName(), db.Name()), shouldAddStarChannel) syncFunctionTimeNano := time.Since(startTime).Nanoseconds() db.dbStats().Database().SyncFunctionTime.Add(syncFunctionTimeNano) @@ -2276,6 +2276,10 @@ func (db *DatabaseCollectionWithUser) getChannelsAndAccess(ctx context.Context, if nonStrings != nil { base.WarnfCtx(ctx, "Channel names must be string values only. Ignoring non-string channels: %s", base.UD(nonStrings)) } + if shouldAddStarChannel { + array = append(array, channels.UserStarChannel) + } + result, err = channels.SetFromArray(array, channels.KeepStar) } } diff --git a/db/database.go b/db/database.go index f4ccd87213..af07d2a96d 100644 --- a/db/database.go +++ b/db/database.go @@ -128,6 +128,7 @@ type DatabaseContext struct { singleCollection *DatabaseCollection // Temporary collection CollectionByID map[uint32]*DatabaseCollection // A map keyed by collection ID to Collection CollectionNames map[string]map[string]struct{} // Map of scope, collection names + AllDocsIndexExists bool } type Scope struct { @@ -1728,7 +1729,7 @@ func (db *Database) UpdateSyncFun(ctx context.Context, syncFun string) (changed // To be used when the JavaScript sync function changes. 
type updateAllDocChannelsCallbackFunc func(docsProcessed, docsChanged *int) -func (db *DatabaseCollectionWithUser) UpdateAllDocChannels(ctx context.Context, regenerateSequences bool, callback updateAllDocChannelsCallbackFunc, terminator *base.SafeTerminator) (int, error) { +func (db *DatabaseCollectionWithUser) UpdateAllDocChannels(ctx context.Context, regenerateSequences bool, callback updateAllDocChannelsCallbackFunc, terminator *base.SafeTerminator, shouldAddStarChannel bool) (int, error) { base.InfofCtx(ctx, base.KeyAll, "Recomputing document channels...") base.InfofCtx(ctx, base.KeyAll, "Re-running sync function on all documents...") @@ -1775,7 +1776,7 @@ func (db *DatabaseCollectionWithUser) UpdateAllDocChannels(ctx context.Context, key := realDocID(docid) queryRowCount++ docsProcessed++ - highSeq, unusedSequences, err = db.resyncDocument(ctx, docid, key, regenerateSequences, unusedSequences) + highSeq, unusedSequences, err = db.resyncDocument(ctx, docid, key, regenerateSequences, unusedSequences, shouldAddStarChannel) if err == nil { docsChanged++ } else if err != base.ErrUpdateCancel { @@ -1880,7 +1881,7 @@ func (db *DatabaseCollection) releaseSequences(ctx context.Context, sequences [] } } -func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, doc *Document, regenerateSequences bool, unusedSequences []uint64) (updatedDoc *Document, shouldUpdate bool, updatedExpiry *uint32, highSeq uint64, updatedUnusedSequences []uint64, err error) { +func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, doc *Document, regenerateSequences bool, unusedSequences []uint64, shouldAddStarChannel bool) (updatedDoc *Document, shouldUpdate bool, updatedExpiry *uint32, highSeq uint64, updatedUnusedSequences []uint64, err error) { docid := doc.ID forceUpdate := false if !doc.HasValidSyncData() { @@ -1906,7 +1907,7 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d if err != nil { return } - channels, 
access, roles, syncExpiry, _, err := db.getChannelsAndAccess(ctx, doc, body, metaMap, rev.ID) + channels, access, roles, syncExpiry, _, err := db.getChannelsAndAccess(ctx, doc, body, metaMap, rev.ID, shouldAddStarChannel) if err != nil { // Probably the validator rejected the doc base.WarnfCtx(ctx, "Error calling sync() on doc %q: %v", base.UD(docid), err) @@ -1942,7 +1943,7 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d return doc, shouldUpdate, updatedExpiry, doc.Sequence, updatedUnusedSequences, nil } -func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) { +func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64, shouldAddStarChannel bool) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) { var updatedDoc *Document var shouldUpdate bool var updatedExpiry *uint32 @@ -1958,7 +1959,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, if err != nil { return nil, nil, deleteDoc, nil, err } - updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences) + updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences, shouldAddStarChannel) if err != nil { return nil, nil, deleteDoc, nil, err } @@ -1986,7 +1987,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, if err != nil { return nil, nil, false, err } - updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences) + updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, 
err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences, shouldAddStarChannel) if err != nil { return nil, nil, false, err } diff --git a/db/database_test.go b/db/database_test.go index 874f4e2ea6..d7953974fe 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -824,7 +824,10 @@ func TestAllDocsOnly(t *testing.T) { revid, _, err := collection.Put(ctx, ids[i].DocID, body) ids[i].RevID = revid ids[i].Sequence = uint64(i + 1) - ids[i].Channels = channels + // `*` channels gets added to docs for new creation. + // add `*` channel to expected channels + ids[i].Channels = append(channels, "*") + assert.NoError(t, err, "Couldn't create document") } @@ -1389,11 +1392,12 @@ func TestSyncFnOnPush(t *testing.T) { // Check that the doc has the correct channel (test for issue #300) doc, err := collection.GetDocument(ctx, "doc1", DocUnmarshalAll) assert.Equal(t, channels.ChannelMap{ + "*": nil, "clibup": nil, "public": &channels.ChannelRemoval{Seq: 2, RevID: "4-four"}, }, doc.Channels) - assert.Equal(t, base.SetOf("clibup"), doc.History["4-four"].Channels) + assert.Equal(t, base.SetOf("*", "clibup"), doc.History["4-four"].Channels) } func TestInvalidChannel(t *testing.T) { @@ -2456,7 +2460,7 @@ func TestResyncUpdateAllDocChannels(t *testing.T) { return state == DBOffline }) - _, err = collection.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator()) + _, err = collection.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator(), !db.AllDocsIndexExists) assert.NoError(t, err) syncFnCount := int(db.DbStats.Database().SyncFunctionCount.Value()) @@ -2809,7 +2813,7 @@ func Test_resyncDocument(t *testing.T) { _, err = db.UpdateSyncFun(ctx, syncFn) require.NoError(t, err) - _, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10}) + _, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10}, true) 
require.NoError(t, err) err = collection.WaitForPendingChanges(ctx) require.NoError(t, err) @@ -2817,8 +2821,8 @@ func Test_resyncDocument(t *testing.T) { syncData, err := collection.GetDocSyncData(ctx, docID) assert.NoError(t, err) - assert.Len(t, syncData.ChannelSet, 2) - assert.Len(t, syncData.Channels, 2) + assert.Len(t, syncData.ChannelSet, 3) + assert.Len(t, syncData.Channels, 3) found := false for _, chSet := range syncData.ChannelSet { @@ -2854,7 +2858,7 @@ func Test_getUpdatedDocument(t *testing.T) { require.NoError(t, err) collection := GetSingleDatabaseCollectionWithUser(t, db) - _, _, _, _, _, err = collection.getResyncedDocument(ctx, doc, false, []uint64{}) + _, _, _, _, _, err = collection.getResyncedDocument(ctx, doc, false, []uint64{}, false) assert.Equal(t, base.ErrUpdateCancel, err) }) @@ -2888,14 +2892,14 @@ func Test_getUpdatedDocument(t *testing.T) { _, err = db.UpdateSyncFun(ctx, syncFn) require.NoError(t, err) - updatedDoc, shouldUpdate, _, highSeq, _, err := collection.getResyncedDocument(ctx, doc, false, []uint64{}) + updatedDoc, shouldUpdate, _, highSeq, _, err := collection.getResyncedDocument(ctx, doc, false, []uint64{}, false) require.NoError(t, err) assert.True(t, shouldUpdate) assert.Equal(t, doc.Sequence, highSeq) assert.Equal(t, 2, int(db.DbStats.Database().SyncFunctionCount.Value())) // Rerunning same resync function should mark doc not to be updated - _, shouldUpdate, _, _, _, err = collection.getResyncedDocument(ctx, updatedDoc, false, []uint64{}) + _, shouldUpdate, _, _, _, err = collection.getResyncedDocument(ctx, updatedDoc, false, []uint64{}, false) require.NoError(t, err) assert.False(t, shouldUpdate) assert.Equal(t, 3, int(db.DbStats.Database().SyncFunctionCount.Value())) diff --git a/db/design_doc.go b/db/design_doc.go index d44d21a978..a2e339c4a2 100644 --- a/db/design_doc.go +++ b/db/design_doc.go @@ -329,7 +329,7 @@ func filterViewResult(input sgbucket.ViewResult, user auth.User, applyChannelFil // Is any item of 
channels found in visibleChannels? func channelsIntersect(visibleChannels ch.TimedSet, channels []interface{}) bool { for _, channel := range channels { - if visibleChannels.Contains(channel.(string)) || channel == "*" { + if visibleChannels.Contains(channel.(string)) { return true } } @@ -482,9 +482,13 @@ func installViews(ctx context.Context, viewStore sgbucket.ViewStore) error { if (channels) { for (var name in channels) { removed = channels[name]; - if (!removed) + // if EnableStarChannelLog= true and "*" is added because AllDocs index isn't created + // Skip emiting "*" again as above lines has emitted "*" channel + if (name == "*" && %v) // EnableStarChannelLog + continue + if (!removed) { emit([name, sequence], value); - else { + } else { var flags = removed.del ? %d : %d; // channels.Removed/Deleted emit([name, removed.seq], {rev:removed.rev, flags: flags}); } @@ -492,7 +496,7 @@ func installViews(ctx context.Context, viewStore sgbucket.ViewStore) error { } }` - channels_map = fmt.Sprintf(channels_map, syncData, base.SyncDocPrefix, ch.Deleted, EnableStarChannelLog, + channels_map = fmt.Sprintf(channels_map, syncData, base.SyncDocPrefix, ch.Deleted, EnableStarChannelLog, EnableStarChannelLog, ch.Removed|ch.Deleted, ch.Removed) // Channel access view, used by ComputeChannelsForPrincipal() diff --git a/db/indexes.go b/db/indexes.go index e9df53b390..7ae004c451 100644 --- a/db/indexes.go +++ b/db/indexes.go @@ -256,23 +256,23 @@ func (i *SGIndex) shouldCreate(isServerless bool) bool { // Creates index associated with specified SGIndex if not already present. Always defers build - a subsequent BUILD INDEX // will need to be invoked for any created indexes. 
-func (i *SGIndex) createIfNeeded(bucket base.N1QLStore, useXattrs bool, numReplica uint) (isDeferred bool, err error) { +func (i *SGIndex) createIfNeeded(bucket base.N1QLStore, useXattrs bool, numReplica uint) (isDeferred bool, isCreated bool, err error) { if i.isXattrOnly() && !useXattrs { - return false, nil + return false, false, nil } indexName := i.fullIndexName(useXattrs) exists, indexMeta, metaErr := bucket.GetIndexMeta(indexName) if metaErr != nil { - return false, metaErr + return false, false, metaErr } // For already existing indexes, check whether they need to be built. if exists { if indexMeta == nil { - return false, fmt.Errorf("No metadata retrieved for existing index %s", indexName) + return false, true, fmt.Errorf("No metadata retrieved for existing index %s", indexName) } if indexMeta.State == base.IndexStateDeferred { // Two possible scenarios when index already exists in deferred state: @@ -284,13 +284,18 @@ func (i *SGIndex) createIfNeeded(bucket base.N1QLStore, useXattrs bool, numRepli time.Sleep(10 * time.Second) exists, indexMeta, metaErr = bucket.GetIndexMeta(indexName) if metaErr != nil || indexMeta == nil { - return false, fmt.Errorf("Error retrieving index metadata after defer wait. IndexMeta: %v Error:%v", indexMeta, metaErr) + return false, false, fmt.Errorf("Error retrieving index metadata after defer wait. 
IndexMeta: %v Error:%v", indexMeta, metaErr) } if indexMeta.State == base.IndexStateDeferred { - return true, nil + return true, true, nil } } - return false, nil + return false, true, nil + } + + // Do not create AllDoc Index if it does not exist already + if isAllDocsIndex(i) { + return false, false, nil } logCtx := context.TODO() @@ -331,20 +336,26 @@ func (i *SGIndex) createIfNeeded(bucket base.N1QLStore, useXattrs bool, numRepli err, _ = base.RetryLoop(description, worker, sleeper) if err != nil { - return false, pkgerrors.Wrapf(err, "Error installing Couchbase index: %v", indexName) + return false, false, pkgerrors.Wrapf(err, "Error installing Couchbase index: %v", indexName) } base.InfofCtx(logCtx, base.KeyQuery, "Index %s created successfully", indexName) - return isDeferred, nil + return isDeferred, true, nil +} + +func isAllDocsIndex(i *SGIndex) bool { + return i.simpleName == sgIndexes[IndexAllDocs].simpleName } // Initializes Sync Gateway indexes for bucket. Creates required indexes if not found, then waits for index readiness. 
-func InitializeIndexes(n1QLStore base.N1QLStore, useXattrs bool, numReplicas uint, failFast bool, isServerless bool) error { +func InitializeIndexes(n1QLStore base.N1QLStore, useXattrs bool, numReplicas uint, failFast bool, isServerless bool) (bool, error) { base.InfofCtx(context.TODO(), base.KeyAll, "Initializing indexes with numReplicas: %d...", numReplicas) // Create any indexes that aren't present deferredIndexes := make([]string, 0) + allDocsIndexCreated := false + for _, sgIndex := range sgIndexes { if !sgIndex.shouldCreate(isServerless) { @@ -353,9 +364,12 @@ func InitializeIndexes(n1QLStore base.N1QLStore, useXattrs bool, numReplicas uin } fullIndexName := sgIndex.fullIndexName(useXattrs) - isDeferred, err := sgIndex.createIfNeeded(n1QLStore, useXattrs, numReplicas) + isDeferred, isCreated, err := sgIndex.createIfNeeded(n1QLStore, useXattrs, numReplicas) if err != nil { - return base.RedactErrorf("Unable to install index %s: %v", base.MD(sgIndex.simpleName), err) + return false, base.RedactErrorf("Unable to install index %s: %v", base.MD(sgIndex.simpleName), err) + } + if isAllDocsIndex(&sgIndex) { + allDocsIndexCreated = isCreated } if isDeferred { @@ -368,16 +382,16 @@ func InitializeIndexes(n1QLStore base.N1QLStore, useXattrs bool, numReplicas uin buildErr := base.BuildDeferredIndexes(n1QLStore, deferredIndexes) if buildErr != nil { base.InfofCtx(context.TODO(), base.KeyQuery, "Error building deferred indexes. Error: %v", buildErr) - return buildErr + return allDocsIndexCreated, buildErr } } // Wait for initial readiness queries to complete - return waitForIndexes(n1QLStore, useXattrs, isServerless, failFast) + return allDocsIndexCreated, waitForIndexes(n1QLStore, useXattrs, isServerless, failFast, allDocsIndexCreated) } // Issue a consistency=request_plus query against critical indexes to guarantee indexing is complete and indexes are ready. 
-func waitForIndexes(bucket base.N1QLStore, useXattrs, isServerless, failfast bool) error { +func waitForIndexes(bucket base.N1QLStore, useXattrs, isServerless, failfast bool, allDocsIndexCreated bool) error { logCtx := context.TODO() base.InfofCtx(logCtx, base.KeyAll, "Verifying index availability for bucket %s...", base.MD(bucket.GetName())) var indexes []string @@ -387,6 +401,10 @@ func waitForIndexes(bucket base.N1QLStore, useXattrs, isServerless, failfast boo if sgIndex.isXattrOnly() && !useXattrs { continue } + if isAllDocsIndex(&sgIndex) && !allDocsIndexCreated { + continue + } + if !sgIndex.shouldCreate(isServerless) { continue } diff --git a/db/indexes_test.go b/db/indexes_test.go index 4a3108f01f..adbbf0bc33 100644 --- a/db/indexes_test.go +++ b/db/indexes_test.go @@ -135,7 +135,7 @@ func TestPostUpgradeIndexesSimple(t *testing.T) { log.Printf("removedIndexes: %+v", removedIndexes) assert.NoError(t, removeErr, "Unexpected error running removeObsoleteIndexes in setup case") - err := InitializeIndexes(n1qlStore, db.UseXattrs(), 0, false, db.IsServerless()) + _, err := InitializeIndexes(n1qlStore, db.UseXattrs(), 0, false, db.IsServerless()) assert.NoError(t, err) // Running w/ opposite xattrs flag should preview removal of the indexes associated with this db context @@ -154,7 +154,7 @@ func TestPostUpgradeIndexesSimple(t *testing.T) { assert.NoError(t, removeErr, "Unexpected error running removeObsoleteIndexes in post-cleanup no-op") // Restore indexes after test - err = InitializeIndexes(n1qlStore, db.UseXattrs(), 0, false, db.IsServerless()) + _, err = InitializeIndexes(n1qlStore, db.UseXattrs(), 0, false, db.IsServerless()) assert.NoError(t, err) } @@ -197,7 +197,7 @@ func TestPostUpgradeIndexesVersionChange(t *testing.T) { assert.NoError(t, removeErr, "Unexpected error running removeObsoleteIndexes with hacked sgIndexes") // Restore indexes after test - err := InitializeIndexes(n1qlStore, db.UseXattrs(), 0, false, db.IsServerless()) + _, err := 
InitializeIndexes(n1qlStore, db.UseXattrs(), 0, false, db.IsServerless()) assert.NoError(t, err) } @@ -232,7 +232,7 @@ func TestPostUpgradeMultipleCollections(t *testing.T) { for _, dataStore := range db.getDataStores() { n1qlStore, ok := base.AsN1QLStore(dataStore) assert.True(t, ok) - err := InitializeIndexes(n1qlStore, useXattrs, 0, false, false) + _, err := InitializeIndexes(n1qlStore, useXattrs, 0, false, false) require.NoError(t, err) } @@ -311,7 +311,7 @@ func TestRemoveIndexesUseViewsTrueAndFalse(t *testing.T) { assert.NoError(t, err) // Restore indexes after test - err = InitializeIndexes(n1QLStore, db.UseXattrs(), 0, false, db.IsServerless()) + _, err = InitializeIndexes(n1QLStore, db.UseXattrs(), 0, false, db.IsServerless()) assert.NoError(t, err) } @@ -334,7 +334,7 @@ func TestRemoveObsoleteIndexOnError(t *testing.T) { // Restore indexes after test n1qlStore, ok := base.AsN1QLStore(dataStore) assert.True(t, ok) - err := InitializeIndexes(n1qlStore, db.UseXattrs(), 0, false, db.IsServerless()) + _, err := InitializeIndexes(n1qlStore, db.UseXattrs(), 0, false, db.IsServerless()) assert.NoError(t, err) }() @@ -387,7 +387,7 @@ func dropAndInitializeIndexes(ctx context.Context, n1qlStore base.N1QLStore, xat return dropErr } - initErr := InitializeIndexes(n1qlStore, xattrs, 0, false, isServerless) + _, initErr := InitializeIndexes(n1qlStore, xattrs, 0, false, isServerless) if initErr != nil { return initErr } diff --git a/db/indextest/indextest_test.go b/db/indextest/indextest_test.go index b8776d0fd9..0f4fabb2b6 100644 --- a/db/indextest/indextest_test.go +++ b/db/indextest/indextest_test.go @@ -459,7 +459,7 @@ func setupN1QLStore(bucket base.Bucket, isServerless bool) ([]base.N1QLStore, re return nil, nil, fmt.Errorf("Unable to get n1QLStore for testBucket") } - if err := db.InitializeIndexes(n1QLStore, base.TestUseXattrs(), 0, false, isServerless); err != nil { + if _, err := db.InitializeIndexes(n1QLStore, base.TestUseXattrs(), 0, false, isServerless); 
err != nil { return nil, nil, err } outN1QLStores = append(outN1QLStores, n1QLStore) diff --git a/db/query.go b/db/query.go index 49bd90909b..cfe058af54 100644 --- a/db/query.go +++ b/db/query.go @@ -555,7 +555,7 @@ func (context *DatabaseCollection) buildChannelsQuery(channelName string, startS channelQuery := QueryChannels index := sgIndexes[IndexChannels] - if channelName == channels.UserStarChannel { + if channelName == channels.UserStarChannel && context.dbCtx.AllDocsIndexExists { channelQuery = QueryStarChannel index = sgIndexes[IndexAllDocs] } diff --git a/db/util_testing.go b/db/util_testing.go index 33c19045ae..68acb33268 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -182,6 +182,8 @@ func WaitForUserWaiterChange(userWaiter *ChangeWaiter) bool { // emptyAllDocsIndex ensures the AllDocs index for the given bucket is empty. Works similarly to db.Compact, except on a different index and without a DatabaseContext func emptyAllDocsIndex(ctx context.Context, dataStore sgbucket.DataStore, tbp *base.TestBucketPool) (numCompacted int, err error) { + base.InfofCtx(ctx, base.KeyAll, "emptyAllDocsIndex called") + purgedDocCount := 0 purgeBody := Body{"_purged": true} @@ -197,6 +199,7 @@ WHERE META(ks).xattrs._sync.sequence >= 0 AND META(ks).xattrs._sync.sequence < 9223372036854775807 AND META(ks).id NOT LIKE '\\_sync:%'` results, err := n1qlStore.Query(statement, nil, base.RequestPlus, true) + base.InfofCtx(ctx, base.KeyAll, "emptyAllDocsIndex failed to remove allDocsIndex %+v", err) if err != nil { return 0, err } @@ -227,11 +230,74 @@ WHERE META(ks).xattrs._sync.sequence >= 0 } } err = results.Close() + base.InfofCtx(ctx, base.KeyAll, "emptyAllDocsIndex results.Close %+v", err) + if err != nil { return 0, err } - tbp.Logf(ctx, "Finished compaction ... Total docs purged: %d", purgedDocCount) + base.InfofCtx(ctx, base.KeyAll, "Finished compaction ... 
Total docs purged: %d", purgedDocCount) + return purgedDocCount, nil +} + +// emptyBucketUsingChannelIndex ensures all docs are cleared (using Channel index) for the given bucket. Workes similarly to emptyAllDocsIndex but is used when AllDocs index isn't present +func emptyBucketUsingChannelIndex(ctx context.Context, dataStore sgbucket.DataStore, tbp *base.TestBucketPool) (numCompacted int, err error) { + purgedDocCount := 0 + purgeBody := Body{"_purged": true} + + n1qlStore, ok := base.AsN1QLStore(dataStore) + if !ok { + return 0, fmt.Errorf("bucket was not a n1ql store") + } + + // A stripped down version of db.Compact() that works on AllDocs instead of tombstones + statement := `SELECT [op.name, META(ks).xattrs._sync.sequence][1] AS seq, + META(ks).xattrs._sync.rev AS rev, + META(ks).xattrs._sync.flags AS flags, + META(ks).id AS id +FROM ` + base.KeyspaceQueryToken + ` AS ks USE INDEX (sg_channels_x1) +UNNEST OBJECT_PAIRS(META(ks).xattrs._sync.channels) AS op +WHERE ([op.name, LEAST(META(ks).xattrs._sync.sequence, op.val.seq), +IFMISSING(op.val.rev,NULL),IFMISSING(op.val.del,NULL)] BETWEEN ["", 0] AND ["*",9223372036854775807])` + + results, err := n1qlStore.Query(statement, nil, base.RequestPlus, true) + base.InfofCtx(ctx, base.KeyAll, "emptyBucketUsingChannelIndex failed to remove allDocsIndex %+v", err) + if err != nil { + return 0, err + } + + var tombstonesRow QueryIdRow + for results.Next(&tombstonesRow) { + // First, attempt to purge. + var purgeErr error + if base.TestUseXattrs() { + purgeErr = dataStore.DeleteWithXattr(tombstonesRow.Id, base.SyncXattrName) + } else { + purgeErr = dataStore.Delete(tombstonesRow.Id) + } + + if base.IsKeyNotFoundError(dataStore, purgeErr) { + // If key no longer exists, need to add and remove to trigger removal from view + _, addErr := dataStore.Add(tombstonesRow.Id, 0, purgeBody) + if addErr != nil { + tbp.Logf(ctx, "Error compacting key %s (add) - will not be compacted. 
%v", tombstonesRow.Id, addErr) + continue + } + + if delErr := dataStore.Delete(tombstonesRow.Id); delErr != nil { + tbp.Logf(ctx, "Error compacting key %s (delete) - will not be compacted. %v", tombstonesRow.Id, delErr) + } + purgedDocCount++ + } else if purgeErr != nil { + tbp.Logf(ctx, "Error compacting key %s (purge) - will not be compacted. %v", tombstonesRow.Id, purgeErr) + } + } + err = results.Close() + if err != nil { + return 0, err + } + + base.InfofCtx(ctx, base.KeyAll, "emptyBucketUsingChannelIndex Finished compaction ... Total docs purged: %d", purgedDocCount) return purgedDocCount, nil } @@ -261,6 +327,13 @@ var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Contex return err } if _, err := emptyAllDocsIndex(ctx, dataStore, tbp); err != nil { + base.InfofCtx(ctx, base.KeyAll, "emptyAllDocsIndex error %+v", err) + return err + } + + // Also empty docs via the Channel index, to cover buckets where the AllDocs index isn't present + if _, err := emptyBucketUsingChannelIndex(ctx, dataStore, tbp); err != nil { + base.InfofCtx(ctx, base.KeyAll, "emptyBucketUsingChannelIndex error %+v", err) return err } @@ -269,6 +342,8 @@ var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Contex return errors.New("attempting to empty indexes with non-N1QL store") } tbp.Logf(ctx, "waiting for empty bucket indexes") + base.InfofCtx(ctx, base.KeyAll, "waiting for empty bucket indexes") + // we can't init indexes concurrently, so we'll just wait for them to be empty after emptying instead of recreating. + // we can't init indexes concurrently, so we'll just wait for them to be empty after emptying instead of recreating.
if err := WaitForIndexEmpty(n1qlStore, base.TestUseXattrs()); err != nil { tbp.Logf(ctx, "WaitForIndexEmpty returned an error: %v", err) @@ -344,7 +419,7 @@ var viewsAndGSIBucketInit base.TBPBucketInitFunc = func(ctx context.Context, b b return err } tbp.Logf(ctx, "creating SG bucket indexes") - if err := InitializeIndexes(n1qlStore, base.TestUseXattrs(), 0, false, false); err != nil { + if _, err := InitializeIndexes(n1qlStore, base.TestUseXattrs(), 0, false, false); err != nil { return err } diff --git a/rest/access_test.go b/rest/access_test.go index d748991811..13585711bc 100644 --- a/rest/access_test.go +++ b/rest/access_test.go @@ -91,6 +91,8 @@ func TestStarAccess(t *testing.T) { rt := NewRestTester(t, nil) defer rt.Close() + isAllDocsIndexExist := rt.ServerContext().IsAllDocsIndexExistFor(rt.GetDatabase().Name) + a := auth.NewAuthenticator(rt.MetadataStore(), nil, auth.DefaultAuthenticatorOptions()) a.Collections = rt.GetDatabase().CollectionNames var changes struct { @@ -137,16 +139,20 @@ func TestStarAccess(t *testing.T) { // GET /db/_all_docs?channels=true // Check that _all_docs returns the docs the user has access to: response = rt.SendUserRequest("GET", "/{{.keyspace}}/_all_docs?channels=true", "", "bernard") - RequireStatus(t, response, 200) - - log.Printf("Response = %s", response.Body.Bytes()) - err = base.JSONUnmarshal(response.Body.Bytes(), &allDocsResult) - assert.NoError(t, err) - require.Equal(t, 3, len(allDocsResult.Rows)) - require.Equal(t, "doc1", allDocsResult.Rows[0].ID) - require.Equal(t, []string{"books"}, allDocsResult.Rows[0].Value.Channels) - require.Equal(t, "doc3", allDocsResult.Rows[1].ID) - require.Equal(t, []string{"!"}, allDocsResult.Rows[1].Value.Channels) + if isAllDocsIndexExist { + RequireStatus(t, response, 200) + + log.Printf("Response = %s", response.Body.Bytes()) + err = base.JSONUnmarshal(response.Body.Bytes(), &allDocsResult) + assert.NoError(t, err) + assert.Equal(t, 3, len(allDocsResult.Rows)) + assert.Equal(t, 
"doc1", allDocsResult.Rows[0].ID) + assert.Equal(t, []string{"books"}, allDocsResult.Rows[0].Value.Channels) + assert.Equal(t, "doc3", allDocsResult.Rows[1].ID) + assert.Equal(t, []string{"!"}, allDocsResult.Rows[1].Value.Channels) + } else { + RequireStatus(t, response, 400) + } // Ensure docs have been processed before issuing changes requests expectedSeq := uint64(6) @@ -211,15 +217,18 @@ func TestStarAccess(t *testing.T) { // GET /db/_all_docs?channels=true // Check that _all_docs returns all docs (based on user * channel) response = rt.SendUserRequest("GET", "/{{.keyspace}}/_all_docs?channels=true", "", "fran") - RequireStatus(t, response, 200) - - log.Printf("Response = %s", response.Body.Bytes()) - err = base.JSONUnmarshal(response.Body.Bytes(), &allDocsResult) - assert.NoError(t, err) - assert.Equal(t, 6, len(allDocsResult.Rows)) - assert.Equal(t, "doc1", allDocsResult.Rows[0].ID) - assert.Equal(t, []string{"books"}, allDocsResult.Rows[0].Value.Channels) - + if isAllDocsIndexExist { + RequireStatus(t, response, 200) + + log.Printf("Response = %s", response.Body.Bytes()) + err = base.JSONUnmarshal(response.Body.Bytes(), &allDocsResult) + assert.NoError(t, err) + assert.Equal(t, 6, len(allDocsResult.Rows)) + assert.Equal(t, "doc1", allDocsResult.Rows[0].ID) + assert.Equal(t, []string{"books"}, allDocsResult.Rows[0].Value.Channels) + } else { + RequireStatus(t, response, 400) + } // GET /db/_changes response = rt.SendUserRequest("GET", "/{{.keyspace}}/_changes", "", "fran") log.Printf("_changes looks like: %s", response.Body.Bytes()) @@ -258,13 +267,16 @@ func TestStarAccess(t *testing.T) { // GET /db/_all_docs?channels=true // Check that _all_docs only returns ! docs (based on doc ! 
channel) response = rt.SendUserRequest("GET", "/{{.keyspace}}/_all_docs?channels=true", "", "manny") - RequireStatus(t, response, 200) - log.Printf("Response = %s", response.Body.Bytes()) - err = base.JSONUnmarshal(response.Body.Bytes(), &allDocsResult) - assert.NoError(t, err) - assert.Equal(t, 2, len(allDocsResult.Rows)) - assert.Equal(t, "doc3", allDocsResult.Rows[0].ID) - + if isAllDocsIndexExist { + RequireStatus(t, response, 200) + log.Printf("Response = %s", response.Body.Bytes()) + err = base.JSONUnmarshal(response.Body.Bytes(), &allDocsResult) + assert.NoError(t, err) + assert.Equal(t, 2, len(allDocsResult.Rows)) + assert.Equal(t, "doc3", allDocsResult.Rows[0].ID) + } else { + RequireStatus(t, response, 400) + } // GET /db/_changes response = rt.SendUserRequest("GET", "/{{.keyspace}}/_changes", "", "manny") log.Printf("_changes looks like: %s", response.Body.Bytes()) @@ -582,6 +594,10 @@ func TestAllDocsAccessControl(t *testing.T) { rt := NewRestTester(t, nil) defer rt.Close() + if !rt.ServerContext().IsAllDocsIndexExistFor(rt.GetDatabase().Name) { + t.Skip("This test requires AllDocs index to be present") + } + type allDocsRow struct { ID string `json:"id"` Key string `json:"key"` @@ -957,7 +973,7 @@ func TestChannelAccessChanges(t *testing.T) { changed, err := database.UpdateSyncFun(ctx, `function(doc) {access("alice", "beta");channel("beta");}`) assert.NoError(t, err) assert.True(t, changed) - changeCount, err := collectionWithUser.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator()) + changeCount, err := collectionWithUser.UpdateAllDocChannels(ctx, false, func(docsProcessed, docsChanged *int) {}, base.NewSafeTerminator(), false) assert.NoError(t, err) assert.Equal(t, 9, changeCount) diff --git a/rest/adminapitest/admin_api_test.go b/rest/adminapitest/admin_api_test.go index bd74b694c4..54ad160b22 100644 --- a/rest/adminapitest/admin_api_test.go +++ b/rest/adminapitest/admin_api_test.go @@ -2223,8 +2223,8 
@@ func TestRawRedaction(t *testing.T) { err := base.JSONUnmarshal(res.Body.Bytes(), &body) assert.NoError(t, err) syncData := body[base.SyncPropertyName] - assert.Equal(t, map[string]interface{}{"achannel": nil}, syncData.(map[string]interface{})["channels"]) - assert.Equal(t, []interface{}([]interface{}{[]interface{}{"achannel"}}), syncData.(map[string]interface{})["history"].(map[string]interface{})["channels"]) + assert.Equal(t, map[string]interface{}{"achannel": nil, "*": nil}, syncData.(map[string]interface{})["channels"]) + assert.Equal(t, []interface{}([]interface{}{[]interface{}{"*", "achannel"}}), syncData.(map[string]interface{})["history"].(map[string]interface{})["channels"]) // Test redacted body = map[string]interface{}{} diff --git a/rest/api.go b/rest/api.go index 358d4f0bb5..024ed0cf74 100644 --- a/rest/api.go +++ b/rest/api.go @@ -296,6 +296,14 @@ func (h *handler) handlePostResync() error { } } + shouldAddStarChannel, _ := h.getOptBoolQuery("add_star_channel", false) + + // When AllDocsIndex isn't present for a db, we should add '*' by default when running resync + // this will make sure Channels can be fetched via Channel Index. + if !h.server.IsAllDocsIndexExistFor(h.db.Name) { + shouldAddStarChannel = true + } + if action != "" && action != string(db.BackgroundProcessActionStart) && action != string(db.BackgroundProcessActionStop) { return base.HTTPErrorf(http.StatusBadRequest, "Unknown parameter for 'action'. 
Must be start or stop") } @@ -310,9 +318,10 @@ func (h *handler) handlePostResync() error { if action == string(db.BackgroundProcessActionStart) { if atomic.CompareAndSwapUint32(&h.db.State, db.DBOffline, db.DBResyncing) { err := h.db.ResyncManager.Start(h.ctx(), map[string]interface{}{ - "database": h.db, - "regenerateSequences": regenerateSequences, - "collections": resyncPostReqBody.Scope, + "database": h.db, + "regenerateSequences": regenerateSequences, + "collections": resyncPostReqBody.Scope, + "shouldAddStarChannel": shouldAddStarChannel, }) if err != nil { return err diff --git a/rest/api_collections_test.go b/rest/api_collections_test.go index 0212be1663..a53ee77c9c 100644 --- a/rest/api_collections_test.go +++ b/rest/api_collections_test.go @@ -120,6 +120,10 @@ func TestCollectionsPublicChannel(t *testing.T) { }) defer rt.Close() + if !rt.ServerContext().IsAllDocsIndexExistFor(rt.GetDatabase().Name) { + t.Skip("This test requires AllDocs index to be present") + } + pathPublic := "/{{.keyspace}}/docpublic" resp := rt.SendAdminRequest(http.MethodPut, pathPublic, `{"channels":["!"]}`) RequireStatus(t, resp, http.StatusCreated) diff --git a/rest/api_test.go b/rest/api_test.go index 04ec21b5ae..d8b35c3c22 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -1116,6 +1116,10 @@ func TestAllDocsChannelsAfterChannelMove(t *testing.T) { rt := NewRestTester(t, &rtConfig) defer rt.Close() + if !rt.ServerContext().IsAllDocsIndexExistFor(rt.GetDatabase().Name) { + t.Skip("This test requires AllDocs index to be present") + } + ctx := rt.Context() a := rt.ServerContext().Database(ctx, "db").Authenticator(ctx) guest, err := a.GetUser("") @@ -2403,9 +2407,18 @@ func TestDocumentChannelHistory(t *testing.T) { syncData, err := rt.GetDatabase().GetSingleDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc") assert.NoError(t, err) - require.Len(t, syncData.ChannelSet, 1) - assert.Equal(t, syncData.ChannelSet[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 0}) - 
assert.Len(t, syncData.ChannelSetHistory, 0) + shouldExpectStarChannel := !rt.GetDatabase().AllDocsIndexExists + + if shouldExpectStarChannel { + require.Len(t, syncData.ChannelSet, 2) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test", Start: 1, End: 0}) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "*", Start: 1, End: 0}) + assert.Len(t, syncData.ChannelSetHistory, 0) + } else { + require.Len(t, syncData.ChannelSet, 1) + assert.Equal(t, syncData.ChannelSet[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 0}) + assert.Len(t, syncData.ChannelSetHistory, 0) + } // Update doc to remove from channel and ensure a single channel history entry with both start and end sequences // and no old channel history entries @@ -2416,9 +2429,16 @@ func TestDocumentChannelHistory(t *testing.T) { syncData, err = rt.GetDatabase().GetSingleDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc") assert.NoError(t, err) - require.Len(t, syncData.ChannelSet, 1) - assert.Equal(t, syncData.ChannelSet[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) - assert.Len(t, syncData.ChannelSetHistory, 0) + if shouldExpectStarChannel { + require.Len(t, syncData.ChannelSet, 2) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "*", Start: 1, End: 0}) + assert.Len(t, syncData.ChannelSetHistory, 0) + } else { + require.Len(t, syncData.ChannelSet, 1) + assert.Equal(t, syncData.ChannelSet[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) + assert.Len(t, syncData.ChannelSetHistory, 0) + } // Update doc to add to channels test and test2 and ensure a single channel history entry for both test and test2 // both with start sequences only and ensure old test entry was moved to old @@ -2429,11 +2449,19 @@ func TestDocumentChannelHistory(t *testing.T) { syncData, err = 
rt.GetDatabase().GetSingleDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc") assert.NoError(t, err) - require.Len(t, syncData.ChannelSet, 2) - assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test", Start: 3, End: 0}) - assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test2", Start: 3, End: 0}) - require.Len(t, syncData.ChannelSetHistory, 1) - assert.Equal(t, syncData.ChannelSetHistory[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) + if shouldExpectStarChannel { + require.Len(t, syncData.ChannelSet, 3) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test", Start: 3, End: 0}) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test2", Start: 3, End: 0}) + require.Len(t, syncData.ChannelSetHistory, 1) + assert.Equal(t, syncData.ChannelSetHistory[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) + } else { + require.Len(t, syncData.ChannelSet, 2) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test", Start: 3, End: 0}) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test2", Start: 3, End: 0}) + require.Len(t, syncData.ChannelSetHistory, 1) + assert.Equal(t, syncData.ChannelSetHistory[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) + } } func TestChannelHistoryLegacyDoc(t *testing.T) { @@ -2493,7 +2521,11 @@ func TestChannelHistoryLegacyDoc(t *testing.T) { assert.NoError(t, err) syncData, err := rt.GetDatabase().GetSingleDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") assert.NoError(t, err) - require.Len(t, syncData.ChannelSet, 1) + if rt.GetDatabase().AllDocsIndexExists { + require.Len(t, syncData.ChannelSet, 1) + } else { + require.Len(t, syncData.ChannelSet, 2) + } assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{ Name: "test", Start: 1, diff --git a/rest/bulk_api.go b/rest/bulk_api.go index 55f9de3def..633d73df44 100644 --- a/rest/bulk_api.go +++ b/rest/bulk_api.go @@ -27,6 +27,16 @@ import ( // HTTP 
handler for _all_docs func (h *handler) handleAllDocs() error { + dbName := h.db.Name + + if h.server.databases_[dbName] == nil { + return base.HTTPErrorf(http.StatusInternalServerError, "could not find database context for %s", dbName) + } + + if !h.server.IsAllDocsIndexExistFor(dbName) { + return base.HTTPErrorf(http.StatusBadRequest, "all_docs endpoint is not available for db %s", dbName) + } + // http://wiki.apache.org/couchdb/HTTP_Bulk_Document_API includeDocs := h.getBoolQuery("include_docs") includeChannels := h.getBoolQuery("channels") diff --git a/rest/doc_api_test.go b/rest/doc_api_test.go index 78e19886d4..445cc816bd 100644 --- a/rest/doc_api_test.go +++ b/rest/doc_api_test.go @@ -105,6 +105,8 @@ func TestDocumentNumbers(t *testing.T) { base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) + shouldExpectStarChannel := !rt.GetDatabase().AllDocsIndexExists + for _, test := range tests { t.Run(test.name, func(ts *testing.T) { // Create document @@ -126,7 +128,11 @@ func TestDocumentNumbers(t *testing.T) { var rawResponse RawResponse require.NoError(ts, base.JSONUnmarshal(getRawResponse.Body.Bytes(), &rawResponse)) log.Printf("raw response: %s", getRawResponse.Body.Bytes()) - assert.Equal(ts, 1, len(rawResponse.Sync.Channels)) + if shouldExpectStarChannel { + assert.Equal(ts, 2, len(rawResponse.Sync.Channels)) + } else { + assert.Equal(ts, 1, len(rawResponse.Sync.Channels)) + } assert.True(ts, HasActiveChannel(rawResponse.Sync.Channels, test.expectedFormatChannel), fmt.Sprintf("Expected channel %s was not found in document channels (%s)", test.expectedFormatChannel, test.name)) }) diff --git a/rest/server_context.go b/rest/server_context.go index d14cd42aa7..5243be38d4 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -156,6 +156,11 @@ func (sc *ServerContext) PostStartup() { close(sc.hasStarted) } +// IsAllDocsIndexExistFor returns if AllDocs index exists for given db +func (sc *ServerContext) IsAllDocsIndexExistFor(dbName string) bool { 
+ return sc.databases_[dbName] != nil && sc.databases_[dbName].AllDocsIndexExists +} + // serverContextStopMaxWait is the maximum amount of time to wait for // background goroutines to terminate before the server is stopped. const serverContextStopMaxWait = 30 * time.Second @@ -467,15 +472,16 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config useViews = true } + isAllDocsIndexCreated := false // initDataStore is a function to initialize Views or GSI indexes for a datastore - initDataStore := func(ds base.DataStore) error { + initDataStore := func(ds base.DataStore) (bool, error) { if useViews { - return db.InitializeViews(ctx, ds) + return false, db.InitializeViews(ctx, ds) } gsiSupported := bucket.IsSupported(sgbucket.BucketStoreFeatureN1ql) if !gsiSupported { - return errors.New("Sync Gateway was unable to connect to a query node on the provided Couchbase Server cluster. Ensure a query node is accessible, or set 'use_views':true in Sync Gateway's database config.") + return false, errors.New("Sync Gateway was unable to connect to a query node on the provided Couchbase Server cluster. 
Ensure a query node is accessible, or set 'use_views':true in Sync Gateway's database config.") } numReplicas := DefaultNumIndexReplicas @@ -484,15 +490,15 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config } n1qlStore, ok := base.AsN1QLStore(ds) if !ok { - return errors.New("Cannot create indexes on non-Couchbase data store.") + return false, errors.New("Cannot create indexes on non-Couchbase data store.") } - indexErr := db.InitializeIndexes(n1qlStore, config.UseXattrs(), numReplicas, false, sc.Config.IsServerless()) + isAllDocsIndexCreated, indexErr := db.InitializeIndexes(n1qlStore, config.UseXattrs(), numReplicas, false, sc.Config.IsServerless()) if indexErr != nil { - return indexErr + return false, indexErr } - return nil + return isAllDocsIndexCreated, nil } if len(config.Scopes) > 0 { @@ -515,14 +521,14 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config return nil, fmt.Errorf("attempting to create/update database with a scope/collection that is not found") } - if err := initDataStore(dataStore); err != nil { + if isAllDocsIndexCreated, err = initDataStore(dataStore); err != nil { return nil, err } } } } else { // no scopes configured - init the default data store - if err := initDataStore(bucket.DefaultDataStore()); err != nil { + if isAllDocsIndexCreated, err = initDataStore(bucket.DefaultDataStore()); err != nil { return nil, err } } @@ -624,6 +630,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config dbcontext.BucketSpec = spec dbcontext.ServerContextHasStarted = sc.hasStarted dbcontext.NoX509HTTPClient = sc.NoX509HTTPClient + dbcontext.AllDocsIndexExists = isAllDocsIndexCreated syncFn := "" if config.Sync != nil { diff --git a/rest/sync_fn_test.go b/rest/sync_fn_test.go index 4005b57cae..3aa4f6d305 100644 --- a/rest/sync_fn_test.go +++ b/rest/sync_fn_test.go @@ -42,6 +42,7 @@ func TestSyncFnBodyProperties(t *testing.T) { testdataKey, db.BodyId, db.BodyRev, 
+ "*", } // This sync function routes into channels based on top-level properties contained in doc @@ -84,6 +85,7 @@ func TestSyncFnBodyPropertiesTombstone(t *testing.T) { db.BodyId, db.BodyRev, db.BodyDeleted, + "*", } // This sync function routes into channels based on top-level properties contained in doc @@ -131,6 +133,7 @@ func TestSyncFnOldDocBodyProperties(t *testing.T) { expectedProperties := []string{ testdataKey, db.BodyId, + "*", } // This sync function routes into channels based on top-level properties contained in oldDoc @@ -180,6 +183,7 @@ func TestSyncFnOldDocBodyPropertiesTombstoneResurrect(t *testing.T) { testdataKey, db.BodyId, db.BodyDeleted, + "*", } // This sync function routes into channels based on top-level properties contained in oldDoc diff --git a/rest/user_api_test.go b/rest/user_api_test.go index 5b79267e61..28c95ae457 100644 --- a/rest/user_api_test.go +++ b/rest/user_api_test.go @@ -1133,6 +1133,8 @@ func TestFunkyUsernames(t *testing.T) { ctx := rt.Context() a := rt.ServerContext().Database(ctx, "db").Authenticator(ctx) + isAllDocsIndexExist := rt.ServerContext().IsAllDocsIndexExistFor(rt.GetDatabase().Name) + // Create a test user user, err := a.NewUser(tc.UserName, "letmein", channels.BaseSetOf(t, "foo")) require.NoError(t, err) @@ -1141,8 +1143,10 @@ func TestFunkyUsernames(t *testing.T) { response := rt.SendUserRequest("PUT", "/{{.keyspace}}/AC+DC", `{"foo":"bar", "channels": ["foo"]}`, tc.UserName) RequireStatus(t, response, 201) - response = rt.SendUserRequest("GET", "/{{.keyspace}}/_all_docs", "", tc.UserName) - RequireStatus(t, response, 200) + if isAllDocsIndexExist { + response = rt.SendUserRequest("GET", "/{{.keyspace}}/_all_docs", "", tc.UserName) + RequireStatus(t, response, 200) + } response = rt.SendUserRequest("GET", "/{{.keyspace}}/AC+DC", "", tc.UserName) RequireStatus(t, response, 200)