Skip to content

Commit

Permalink
Fix memory corruption bug in impl/memory
Browse files Browse the repository at this point in the history
When you `SetCollection` in gkvlite, it invalidates the old
collection, but keeps the Items of the previous collection. The way
that impl/memory was using SetCollection was more along the lines of
GetOrCreate, which could cause corruption or other issues if two
codepaths did SetCollection simultaneously.

This fixes the corruption bug by removing SetCollection from the
memStore interface and replaces it with a GetOrCreateCollection,
which checks for the collection, and only calls SetCollection if it
doesn't exist. As a bonus, this seems to give a slight performance
speedup over the previous broken code.

Additionally, this forces all uses of VisitItemsAscend to use
a Snapshot, to avoid accidental modification of a Collection while
it's being iterated over, which can lead to the situation discussed
above, or other nasty errors.

Finally, this adds an optional 'gkvlite tracer' functionality
which came in (extremely) handy for finding this bug. When given
a folder, it will emit go-code-like invocations of the functions run
on every collection, and includes enough information to reveal which
functions are being called on the root collection vs. snapshots, and
if there are any overlapping VisitItemsAscend invocations, etc.

[email protected]
BUG=550684

Review URL: https://codereview.chromium.org/1911263002
  • Loading branch information
riannucci authored and Commit bot committed Apr 22, 2016
1 parent 8304c6e commit e6d5b59
Show file tree
Hide file tree
Showing 13 changed files with 374 additions and 109 deletions.
2 changes: 1 addition & 1 deletion impl/memory/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (d *dsImpl) TakeIndexSnapshot() ds.TestingSnapshot {
}

func (d *dsImpl) SetIndexSnapshot(snap ds.TestingSnapshot) {
d.data.setSnapshot(snap.(*memStore))
d.data.setSnapshot(snap.(memStore))
}

func (d *dsImpl) CatchupIndexes() {
Expand Down
47 changes: 19 additions & 28 deletions impl/memory/datastore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ type dataStoreData struct {
aid string

// See README.md for head schema.
head *memStore
head memStore
// if snap is nil, that means that this is always-consistent, and
// getQuerySnaps will return (head, head)
snap *memStore
snap memStore
// For testing, see SetTransactionRetryCount.
txnFakeRetry int
// true means that queries with insufficent indexes will pause to add them
Expand Down Expand Up @@ -122,7 +122,7 @@ func (d *dataStoreData) getDisableSpecialEntities() bool {
return d.disableSpecialEntities
}

func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head memStore) {
d.rwlock.RLock()
defer d.rwlock.RUnlock()
if d.snap == nil {
Expand All @@ -140,13 +140,13 @@ func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
return
}

func (d *dataStoreData) takeSnapshot() *memStore {
func (d *dataStoreData) takeSnapshot() memStore {
d.rwlock.RLock()
defer d.rwlock.RUnlock()
return d.head.Snapshot()
}

func (d *dataStoreData) setSnapshot(snap *memStore) {
func (d *dataStoreData) setSnapshot(snap memStore) {
d.rwlock.Lock()
defer d.rwlock.Unlock()
if d.snap == nil {
Expand Down Expand Up @@ -187,7 +187,7 @@ func rootIDsKey(kind string) []byte {
return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil))
}

func curVersion(ents *memCollection, key []byte) int64 {
func curVersion(ents memCollection, key []byte) int64 {
if ents != nil {
if v := ents.Get(key); v != nil {
pm, err := rpm(v)
Expand All @@ -204,7 +204,7 @@ func curVersion(ents *memCollection, key []byte) int64 {
return 0
}

func incrementLocked(ents *memCollection, key []byte, amt int) int64 {
func incrementLocked(ents memCollection, key []byte, amt int) int64 {
if amt <= 0 {
panic(fmt.Errorf("incrementLocked called with bad `amt`: %d", amt))
}
Expand All @@ -215,24 +215,15 @@ func incrementLocked(ents *memCollection, key []byte, amt int) int64 {
return ret
}

func (d *dataStoreData) mutableEntsLocked(ns string) *memCollection {
coll := "ents:" + ns
ents := d.head.GetCollection(coll)
if ents == nil {
ents = d.head.SetCollection(coll, nil)
}
return ents
}

func (d *dataStoreData) allocateIDs(incomplete *ds.Key, n int) (int64, error) {
d.Lock()
defer d.Unlock()

ents := d.mutableEntsLocked(incomplete.Namespace())
ents := d.head.GetOrCreateCollection("ents:" + incomplete.Namespace())
return d.allocateIDsLocked(ents, incomplete, n)
}

func (d *dataStoreData) allocateIDsLocked(ents *memCollection, incomplete *ds.Key, n int) (int64, error) {
func (d *dataStoreData) allocateIDsLocked(ents memCollection, incomplete *ds.Key, n int) (int64, error) {
if d.disableSpecialEntities {
return 0, errors.New("disableSpecialEntities is true so allocateIDs is disabled")
}
Expand All @@ -246,7 +237,7 @@ func (d *dataStoreData) allocateIDsLocked(ents *memCollection, incomplete *ds.Ke
return incrementLocked(ents, idKey, n), nil
}

func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) (*ds.Key, error) {
func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key, error) {
if key.Incomplete() {
id, err := d.allocateIDsLocked(ents, key, 1)
if err != nil {
Expand All @@ -268,7 +259,7 @@ func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu
d.Lock()
defer d.Unlock()

ents := d.mutableEntsLocked(ns)
ents := d.head.GetOrCreateCollection("ents:" + ns)

ret, err = d.fixKeyLocked(ents, k)
if err != nil {
Expand Down Expand Up @@ -298,7 +289,7 @@ func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu
return nil
}

func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollection, error)) error {
func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (memCollection, error)) error {
ents, err := getColl()
if err != nil {
return err
Expand All @@ -322,7 +313,7 @@ func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect
}

func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
return getMultiInner(keys, cb, func() (*memCollection, error) {
return getMultiInner(keys, cb, func() (memCollection, error) {
s := d.takeSnapshot()

return s.GetCollection("ents:" + keys[0].Namespace()), nil
Expand All @@ -335,7 +326,7 @@ func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
hasEntsInNS := func() bool {
d.Lock()
defer d.Unlock()
return d.mutableEntsLocked(ns) != nil
return d.head.GetOrCreateCollection("ents:"+ns) != nil
}()

if hasEntsInNS {
Expand All @@ -346,7 +337,7 @@ func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
d.Lock()
defer d.Unlock()

ents := d.mutableEntsLocked(ns)
ents := d.head.GetOrCreateCollection("ents:" + ns)

if !d.disableSpecialEntities {
incrementLocked(ents, groupMetaKey(k), 1)
Expand Down Expand Up @@ -452,7 +443,7 @@ type txnDataStoreData struct {
closed int32
isXG bool

snap *memStore
snap memStore

// string is the raw-bytes encoding of the entity root incl. namespace
muts map[string][]txnMutation
Expand Down Expand Up @@ -533,7 +524,7 @@ func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d
err := func() (err error) {
td.parent.Lock()
defer td.parent.Unlock()
ents := td.parent.mutableEntsLocked(ns)
ents := td.parent.head.GetOrCreateCollection("ents:" + ns)

k, err = td.parent.fixKeyLocked(ents, k)
return
Expand All @@ -548,7 +539,7 @@ func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d
}

func (td *txnDataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
return getMultiInner(keys, cb, func() (*memCollection, error) {
return getMultiInner(keys, cb, func() (memCollection, error) {
err := error(nil)
for _, key := range keys {
err = td.writeMutation(true, key, nil)
Expand Down Expand Up @@ -579,7 +570,7 @@ func rpm(data []byte) (ds.PropertyMap, error) {
serialize.WithContext, "", "")
}

func namespaces(store *memStore) []string {
func namespaces(store memStore) []string {
var namespaces []string
for _, c := range store.GetCollectionNames() {
ns, has := trimPrefix(c, "ents:")
Expand Down
31 changes: 16 additions & 15 deletions impl/memory/datastore_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
return ret
}

func indexEntriesWithBuiltins(k *ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore {
func indexEntriesWithBuiltins(k *ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) memStore {
sip := serialize.PropertyMapPartially(k, pm)
return indexEntries(sip, k.Namespace(), append(defaultIndexes(k.Kind(), pm), complexIdxs...))
}
Expand Down Expand Up @@ -132,17 +132,17 @@ func (m *matcher) match(sortBy []ds.IndexColumn, sip serialize.SerializedPmap) (
return m.buf, true
}

func indexEntries(sip serialize.SerializedPmap, ns string, idxs []*ds.IndexDefinition) *memStore {
func indexEntries(sip serialize.SerializedPmap, ns string, idxs []*ds.IndexDefinition) memStore {
ret := newMemStore()
idxColl := ret.SetCollection("idx", nil)
idxColl := ret.GetOrCreateCollection("idx")

mtch := matcher{}
for _, idx := range idxs {
idx = idx.Normalize()
if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok {
idxBin := serialize.ToBytes(*idx.PrepForIdxTable())
idxColl.Set(idxBin, []byte{})
coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin), nil)
coll := ret.GetOrCreateCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin))
irg.permute(coll.Set)
}
}
Expand All @@ -153,7 +153,7 @@ func indexEntries(sip serialize.SerializedPmap, ns string, idxs []*ds.IndexDefin
// walkCompIdxs walks the table of compound indexes in the store. If `endsWith`
// is provided, this will only walk over compound indexes which match
// Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix.
func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
func walkCompIdxs(store memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
idxColl := store.GetCollection("idx")
if idxColl == nil {
return
Expand Down Expand Up @@ -182,17 +182,18 @@ func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.Ind
}
}

func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
func mergeIndexes(ns string, store, oldIdx, newIdx memStore) {
prefixBuf := []byte("idx:" + ns + ":")
origPrefixBufLen := len(prefixBuf)

oldIdx = oldIdx.Snapshot()
newIdx = newIdx.Snapshot()

gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
prefixBuf = append(prefixBuf[:origPrefixBufLen], k...)
ks := string(prefixBuf)

coll := store.GetCollection(ks)
if coll == nil {
coll = store.SetCollection(ks, nil)
}
coll := store.GetOrCreateCollection(ks)

oldColl := oldIdx.GetCollection(ks)
newColl := newIdx.GetCollection(ks)
Expand Down Expand Up @@ -224,16 +225,16 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
})
}

func addIndexes(store *memStore, aid string, compIdx []*ds.IndexDefinition) {
func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) {
normalized := make([]*ds.IndexDefinition, len(compIdx))
idxColl := store.SetCollection("idx", nil)
idxColl := store.GetOrCreateCollection("idx")
for i, idx := range compIdx {
normalized[i] = idx.Normalize()
idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
}

for _, ns := range namespaces(store) {
if allEnts := store.GetCollection("ents:" + ns); allEnts != nil {
if allEnts := store.Snapshot().GetCollection("ents:" + ns); allEnts != nil {
allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
pm, err := rpm(i.Val)
memoryCorruption(err)
Expand All @@ -254,10 +255,10 @@ func addIndexes(store *memStore, aid string, compIdx []*ds.IndexDefinition) {
}
}

func updateIndexes(store *memStore, key *ds.Key, oldEnt, newEnt ds.PropertyMap) {
func updateIndexes(store memStore, key *ds.Key, oldEnt, newEnt ds.PropertyMap) {
// load all current complex query index definitions.
compIdx := []*ds.IndexDefinition{}
walkCompIdxs(store, nil, func(i *ds.IndexDefinition) bool {
walkCompIdxs(store.Snapshot(), nil, func(i *ds.IndexDefinition) bool {
compIdx = append(compIdx, i)
return true
})
Expand Down
8 changes: 4 additions & 4 deletions impl/memory/datastore_index_selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type indexDefinitionSortable struct {
// redundant columns! (e.g. (tag, tag) is a perfectly valid prefix, becuase
// (tag=1, tag=2) is a perfectly valid query).
eqFilts []ds.IndexColumn
coll *memCollection
coll memCollection
}

func (i *indexDefinitionSortable) hasAncestor() bool {
Expand Down Expand Up @@ -131,7 +131,7 @@ func (idxs indexDefinitionSortableSlice) Less(i, j int) bool {
// If the proposed index is PERFECT (e.g. contains enough columns to cover all
// equality filters, and also has the correct suffix), idxs will be replaced
// with JUST that index, and this will return true.
func (idxs *indexDefinitionSortableSlice) maybeAddDefinition(q *reducedQuery, s *memStore, missingTerms stringset.Set, id *ds.IndexDefinition) bool {
func (idxs *indexDefinitionSortableSlice) maybeAddDefinition(q *reducedQuery, s memStore, missingTerms stringset.Set, id *ds.IndexDefinition) bool {
// Kindless queries are handled elsewhere.
if id.Kind != q.kind {
impossible(
Expand Down Expand Up @@ -230,7 +230,7 @@ func (idxs *indexDefinitionSortableSlice) maybeAddDefinition(q *reducedQuery, s
// getRelevantIndexes retrieves the relevant indexes which could be used to
// service q. It returns nil if it's not possible to service q with the current
// indexes.
func getRelevantIndexes(q *reducedQuery, s *memStore) (indexDefinitionSortableSlice, error) {
func getRelevantIndexes(q *reducedQuery, s memStore) (indexDefinitionSortableSlice, error) {
missingTerms := stringset.New(len(q.eqFilters))
for k := range q.eqFilters {
if k == "__ancestor__" {
Expand Down Expand Up @@ -468,7 +468,7 @@ func calculateConstraints(q *reducedQuery) *constraints {

// getIndexes returns a set of iterator definitions. Iterating over these
// will result in matching suffixes.
func getIndexes(q *reducedQuery, s *memStore) ([]*iterDefinition, error) {
func getIndexes(q *reducedQuery, s memStore) ([]*iterDefinition, error) {
relevantIdxs := indexDefinitionSortableSlice(nil)
if q.kind == "" {
if coll := s.GetCollection("ents:" + q.ns); coll != nil {
Expand Down
8 changes: 4 additions & 4 deletions impl/memory/datastore_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func TestIndexEntries(t *testing.T) {
}

Convey(tc.name, func() {
store := (*memStore)(nil)
store := (memStore)(nil)
if tc.withBuiltin {
store = indexEntriesWithBuiltins(fakeKey, tc.pmap, tc.idxs)
} else {
Expand All @@ -252,7 +252,7 @@ func TestIndexEntries(t *testing.T) {
}
for colName, vals := range tc.collections {
i := 0
coll := store.GetCollection(colName)
coll := store.Snapshot().GetCollection(colName)
numItems, _ := coll.GetTotals()
So(numItems, ShouldEqual, len(tc.collections[colName]))
coll.VisitItemsAscend(nil, true, func(itm *gkvlite.Item) bool {
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestUpdateIndexes(t *testing.T) {
for _, tc := range updateIndexesTests {
Convey(tc.name, func() {
store := newMemStore()
idxColl := store.SetCollection("idx", nil)
idxColl := store.GetOrCreateCollection("idx")
for _, i := range tc.idxs {
idxColl.Set(cat(i.PrepForIdxTable()), []byte{})
}
Expand All @@ -369,7 +369,7 @@ func TestUpdateIndexes(t *testing.T) {
tmpLoader = nil

for colName, data := range tc.expected {
coll := store.GetCollection(colName)
coll := store.Snapshot().GetCollection(colName)
So(coll, ShouldNotBeNil)
i := 0
coll.VisitItemsAscend(nil, false, func(itm *gkvlite.Item) bool {
Expand Down
12 changes: 6 additions & 6 deletions impl/memory/datastore_query_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ type normalStrategy struct {

aid string
ns string
head *memCollection
head memCollection
dedup stringset.Set
}

func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head *memStore) queryStrategy {
func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head memStore) queryStrategy {
coll := head.GetCollection("ents:" + ns)
if coll == nil {
return nil
Expand All @@ -128,7 +128,7 @@ func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
return s.cb(key, pm, gc)
}

func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy {
func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head memStore) queryStrategy {
if fq.KeysOnly() {
return &keysOnlyStrategy{cb, stringset.New(0)}
}
Expand Down Expand Up @@ -165,7 +165,7 @@ func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c
return
}

func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *memStore) (ret int64, err error) {
func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head memStore) (ret int64, err error) {
if len(fq.Project()) == 0 && !fq.KeysOnly() {
fq, err = fq.Original().KeysOnly(true).Finalize()
if err != nil {
Expand All @@ -179,7 +179,7 @@ func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *me
return
}

func executeNamespaceQuery(fq *ds.FinalizedQuery, aid string, head *memStore, cb ds.RawRunCB) error {
func executeNamespaceQuery(fq *ds.FinalizedQuery, aid string, head memStore, cb ds.RawRunCB) error {
// these objects have no properties, so any filters on properties cause an
// empty result.
if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders()) > 1 {
Expand Down Expand Up @@ -223,7 +223,7 @@ func executeNamespaceQuery(fq *ds.FinalizedQuery, aid string, head *memStore, cb
return nil
}

func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error {
func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head memStore, cb ds.RawRunCB) error {
rq, err := reduce(fq, aid, ns, isTxn)
if err == ds.ErrNullQuery {
return nil
Expand Down
Loading

0 comments on commit e6d5b59

Please sign in to comment.