diff --git a/impl/memory/datastore.go b/impl/memory/datastore.go
index 6d07293..8001295 100644
--- a/impl/memory/datastore.go
+++ b/impl/memory/datastore.go
@@ -138,7 +138,7 @@ func (d *dsImpl) TakeIndexSnapshot() ds.TestingSnapshot {
 }
 
 func (d *dsImpl) SetIndexSnapshot(snap ds.TestingSnapshot) {
-	d.data.setSnapshot(snap.(*memStore))
+	d.data.setSnapshot(snap.(memStore))
 }
 
 func (d *dsImpl) CatchupIndexes() {
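Note: the assertion above changes because memStore becomes an interface later in this patch (see gkvlite_utils.go), so any implementation -- including the new tracing wrapper -- satisfies it. A minimal, self-contained sketch of the idea (names mirror the patch, but this is an illustration, not the patch's code):

	package main

	import "fmt"

	// TestingSnapshot is the opaque marker interface handed to test code.
	type TestingSnapshot interface{ ImATestingSnapshot() }

	// memStore is the interface this patch introduces; both the plain impl
	// and the tracing decorator satisfy it, so the assertion accepts either.
	type memStore interface {
		TestingSnapshot
		IsReadOnly() bool
	}

	type memStoreImpl struct{ ro bool }

	func (*memStoreImpl) ImATestingSnapshot() {}
	func (m *memStoreImpl) IsReadOnly() bool  { return m.ro }

	func setSnapshot(snap TestingSnapshot) {
		ms := snap.(memStore) // asserts the interface, not one concrete type
		fmt.Println("read-only:", ms.IsReadOnly())
	}

	func main() { setSnapshot(&memStoreImpl{ro: true}) }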
diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go
index ff06a47..94a8688 100644
--- a/impl/memory/datastore_data.go
+++ b/impl/memory/datastore_data.go
@@ -26,10 +26,10 @@ type dataStoreData struct {
 	aid string
 
 	// See README.md for head schema.
-	head *memStore
+	head memStore
 	// if snap is nil, that means that this is always-consistent, and
 	// getQuerySnaps will return (head, head)
-	snap *memStore
+	snap memStore
 	// For testing, see SetTransactionRetryCount.
 	txnFakeRetry int
 	// true means that queries with insufficent indexes will pause to add them
@@ -122,7 +122,7 @@ func (d *dataStoreData) getDisableSpecialEntities() bool {
 	return d.disableSpecialEntities
 }
 
-func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
+func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head memStore) {
 	d.rwlock.RLock()
 	defer d.rwlock.RUnlock()
 	if d.snap == nil {
@@ -140,13 +140,13 @@ func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
 	return
 }
 
-func (d *dataStoreData) takeSnapshot() *memStore {
+func (d *dataStoreData) takeSnapshot() memStore {
 	d.rwlock.RLock()
 	defer d.rwlock.RUnlock()
 	return d.head.Snapshot()
 }
 
-func (d *dataStoreData) setSnapshot(snap *memStore) {
+func (d *dataStoreData) setSnapshot(snap memStore) {
 	d.rwlock.Lock()
 	defer d.rwlock.Unlock()
 	if d.snap == nil {
@@ -187,7 +187,7 @@ func rootIDsKey(kind string) []byte {
 	return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil))
 }
 
-func curVersion(ents *memCollection, key []byte) int64 {
+func curVersion(ents memCollection, key []byte) int64 {
 	if ents != nil {
 		if v := ents.Get(key); v != nil {
 			pm, err := rpm(v)
@@ -204,7 +204,7 @@
 	return 0
 }
 
-func incrementLocked(ents *memCollection, key []byte, amt int) int64 {
+func incrementLocked(ents memCollection, key []byte, amt int) int64 {
 	if amt <= 0 {
 		panic(fmt.Errorf("incrementLocked called with bad `amt`: %d", amt))
 	}
@@ -215,24 +215,15 @@
 	return ret
 }
 
-func (d *dataStoreData) mutableEntsLocked(ns string) *memCollection {
-	coll := "ents:" + ns
-	ents := d.head.GetCollection(coll)
-	if ents == nil {
-		ents = d.head.SetCollection(coll, nil)
-	}
-	return ents
-}
-
 func (d *dataStoreData) allocateIDs(incomplete *ds.Key, n int) (int64, error) {
 	d.Lock()
 	defer d.Unlock()
 
-	ents := d.mutableEntsLocked(incomplete.Namespace())
+	ents := d.head.GetOrCreateCollection("ents:" + incomplete.Namespace())
 
 	return d.allocateIDsLocked(ents, incomplete, n)
 }
 
-func (d *dataStoreData) allocateIDsLocked(ents *memCollection, incomplete *ds.Key, n int) (int64, error) {
+func (d *dataStoreData) allocateIDsLocked(ents memCollection, incomplete *ds.Key, n int) (int64, error) {
 	if d.disableSpecialEntities {
 		return 0, errors.New("disableSpecialEntities is true so allocateIDs is disabled")
 	}
@@ -246,7 +237,7 @@ func (d *dataStoreData) allocateIDsLocked(ents *memCollection, incomplete *ds.Ke
 	return incrementLocked(ents, idKey, n), nil
 }
 
-func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) (*ds.Key, error) {
+func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key, error) {
 	if key.Incomplete() {
 		id, err := d.allocateIDsLocked(ents, key, 1)
 		if err != nil {
@@ -268,7 +259,7 @@ func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu
 			d.Lock()
 			defer d.Unlock()
 
-			ents := d.mutableEntsLocked(ns)
+			ents := d.head.GetOrCreateCollection("ents:" + ns)
 
 			ret, err = d.fixKeyLocked(ents, k)
 			if err != nil {
@@ -298,7 +289,7 @@ func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu
 	return nil
 }
 
-func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollection, error)) error {
+func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (memCollection, error)) error {
 	ents, err := getColl()
 	if err != nil {
 		return err
@@ -322,7 +313,7 @@ func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect
 }
 
 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
-	return getMultiInner(keys, cb, func() (*memCollection, error) {
+	return getMultiInner(keys, cb, func() (memCollection, error) {
 		s := d.takeSnapshot()
 
 		return s.GetCollection("ents:" + keys[0].Namespace()), nil
@@ -335,7 +326,7 @@ func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
 	hasEntsInNS := func() bool {
 		d.Lock()
 		defer d.Unlock()
-		return d.mutableEntsLocked(ns) != nil
+		return d.head.GetOrCreateCollection("ents:"+ns) != nil
 	}()
 
 	if hasEntsInNS {
@@ -346,7 +337,7 @@ func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
 			d.Lock()
 			defer d.Unlock()
 
-			ents := d.mutableEntsLocked(ns)
+			ents := d.head.GetOrCreateCollection("ents:" + ns)
 
 			if !d.disableSpecialEntities {
 				incrementLocked(ents, groupMetaKey(k), 1)
@@ -452,7 +443,7 @@ type txnDataStoreData struct {
 	closed int32
 	isXG   bool
 
-	snap *memStore
+	snap memStore
 
 	// string is the raw-bytes encoding of the entity root incl. namespace
 	muts map[string][]txnMutation
@@ -533,7 +524,7 @@ func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d
 	err := func() (err error) {
 		td.parent.Lock()
 		defer td.parent.Unlock()
-		ents := td.parent.mutableEntsLocked(ns)
+		ents := td.parent.head.GetOrCreateCollection("ents:" + ns)
 		k, err = td.parent.fixKeyLocked(ents, k)
 		return
@@ -548,7 +539,7 @@ func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d
 }
 
 func (td *txnDataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
-	return getMultiInner(keys, cb, func() (*memCollection, error) {
+	return getMultiInner(keys, cb, func() (memCollection, error) {
 		err := error(nil)
 		for _, key := range keys {
 			err = td.writeMutation(true, key, nil)
@@ -579,7 +570,7 @@ func rpm(data []byte) (ds.PropertyMap, error) {
 		serialize.WithContext, "", "")
}
 
-func namespaces(store *memStore) []string {
+func namespaces(store memStore) []string {
 	var namespaces []string
 	for _, c := range store.GetCollectionNames() {
 		ns, has := trimPrefix(c, "ents:")
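Note: every call site that used the deleted mutableEntsLocked helper now inlines d.head.GetOrCreateCollection("ents:" + ns). Both return a usable collection, creating it on first use, so the rewrite is behavior-preserving. A hedged, stand-alone model of the get-or-create idiom (hypothetical names, not the patch's types):

	package main

	import "fmt"

	// mapStore models just enough of memStore for the idiom: Get returns nil
	// for a missing collection, GetOrCreate never returns nil.
	type mapStore map[string]map[string][]byte

	func (m mapStore) Get(name string) map[string][]byte { return m[name] }

	func (m mapStore) GetOrCreate(name string) map[string][]byte {
		if m[name] == nil {
			m[name] = map[string][]byte{}
		}
		return m[name]
	}

	func main() {
		s := mapStore{}
		// Entities for a namespace live in a collection named "ents:<ns>".
		s.GetOrCreate("ents:" + "ns1")["k"] = []byte("v")
		fmt.Println(s.Get("ents:ns1") != nil) // true
	}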
diff --git a/impl/memory/datastore_index.go b/impl/memory/datastore_index.go
index cbd1f36..867214c 100644
--- a/impl/memory/datastore_index.go
+++ b/impl/memory/datastore_index.go
@@ -43,7 +43,7 @@ func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
 	return ret
 }
 
-func indexEntriesWithBuiltins(k *ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore {
+func indexEntriesWithBuiltins(k *ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) memStore {
 	sip := serialize.PropertyMapPartially(k, pm)
 	return indexEntries(sip, k.Namespace(), append(defaultIndexes(k.Kind(), pm), complexIdxs...))
 }
@@ -132,9 +132,9 @@ func (m *matcher) match(sortBy []ds.IndexColumn, sip serialize.SerializedPmap) (
 	return m.buf, true
 }
 
-func indexEntries(sip serialize.SerializedPmap, ns string, idxs []*ds.IndexDefinition) *memStore {
+func indexEntries(sip serialize.SerializedPmap, ns string, idxs []*ds.IndexDefinition) memStore {
 	ret := newMemStore()
-	idxColl := ret.SetCollection("idx", nil)
+	idxColl := ret.GetOrCreateCollection("idx")
 
 	mtch := matcher{}
 	for _, idx := range idxs {
@@ -142,7 +142,7 @@ func indexEntries(sip serialize.SerializedPmap, ns string, idxs []*ds.IndexDefin
 		if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok {
 			idxBin := serialize.ToBytes(*idx.PrepForIdxTable())
 			idxColl.Set(idxBin, []byte{})
-			coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin), nil)
+			coll := ret.GetOrCreateCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin))
 			irg.permute(coll.Set)
 		}
 	}
@@ -153,7 +153,7 @@ func indexEntries(sip serialize.SerializedPmap, ns string, idxs []*ds.IndexDefin
 // walkCompIdxs walks the table of compound indexes in the store. If `endsWith`
 // is provided, this will only walk over compound indexes which match
 // Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix.
-func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
+func walkCompIdxs(store memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
 	idxColl := store.GetCollection("idx")
 	if idxColl == nil {
 		return
@@ -182,17 +182,18 @@ func walkCompIdxs(store memStore, endsWith *ds.IndexDefinition, cb func(*ds.Ind
 	}
 }
 
-func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
+func mergeIndexes(ns string, store, oldIdx, newIdx memStore) {
 	prefixBuf := []byte("idx:" + ns + ":")
 	origPrefixBufLen := len(prefixBuf)
+
+	oldIdx = oldIdx.Snapshot()
+	newIdx = newIdx.Snapshot()
+
 	gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
 		prefixBuf = append(prefixBuf[:origPrefixBufLen], k...)
 		ks := string(prefixBuf)
-		coll := store.GetCollection(ks)
-		if coll == nil {
-			coll = store.SetCollection(ks, nil)
-		}
+		coll := store.GetOrCreateCollection(ks)
 
 		oldColl := oldIdx.GetCollection(ks)
 		newColl := newIdx.GetCollection(ks)
@@ -224,16 +225,16 @@ func mergeIndexes(ns string, store, oldIdx, newIdx memStore) {
 	})
 }
 
-func addIndexes(store *memStore, aid string, compIdx []*ds.IndexDefinition) {
+func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) {
 	normalized := make([]*ds.IndexDefinition, len(compIdx))
-	idxColl := store.SetCollection("idx", nil)
+	idxColl := store.GetOrCreateCollection("idx")
 	for i, idx := range compIdx {
 		normalized[i] = idx.Normalize()
 		idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
 	}
 
 	for _, ns := range namespaces(store) {
-		if allEnts := store.GetCollection("ents:" + ns); allEnts != nil {
+		if allEnts := store.Snapshot().GetCollection("ents:" + ns); allEnts != nil {
 			allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
 				pm, err := rpm(i.Val)
 				memoryCorruption(err)
@@ -254,10 +255,10 @@ func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) {
 	}
 }
 
-func updateIndexes(store *memStore, key *ds.Key, oldEnt, newEnt ds.PropertyMap) {
+func updateIndexes(store memStore, key *ds.Key, oldEnt, newEnt ds.PropertyMap) {
 	// load all current complex query index definitions.
 	compIdx := []*ds.IndexDefinition{}
-	walkCompIdxs(store, nil, func(i *ds.IndexDefinition) bool {
+	walkCompIdxs(store.Snapshot(), nil, func(i *ds.IndexDefinition) bool {
 		compIdx = append(compIdx, i)
 		return true
 	})
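Note: mergeIndexes now snapshots oldIdx and newIdx before handing their collections to gkvCollide, and addIndexes/updateIndexes read entities through store.Snapshot(). This matches the guards added later in this patch: iterating a read/write collection panics. A package-internal sketch of the write-then-snapshot flow (assumes the patch's memStore API; not standalone):

	// Writes go to the mutable store; reads and iteration go through a snapshot.
	func sketchIndexRead(store memStore) {
		idx := store.GetOrCreateCollection("idx") // r/w handle
		idx.Set([]byte("some-index"), []byte{})

		ro := store.Snapshot().GetCollection("idx") // frozen, read-only view
		ro.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
			return true // legal only because ro.IsReadOnly() is true
		})
	}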
diff --git a/impl/memory/datastore_index_selection.go b/impl/memory/datastore_index_selection.go
index 9ee0e9a..89b0927 100644
--- a/impl/memory/datastore_index_selection.go
+++ b/impl/memory/datastore_index_selection.go
@@ -70,7 +70,7 @@ type indexDefinitionSortable struct {
 	// redundant columns! (e.g. (tag, tag) is a perfectly valid prefix, becuase
 	// (tag=1, tag=2) is a perfectly valid query).
 	eqFilts []ds.IndexColumn
-	coll    *memCollection
+	coll    memCollection
 }
 
 func (i *indexDefinitionSortable) hasAncestor() bool {
@@ -131,7 +131,7 @@ func (idxs indexDefinitionSortableSlice) Less(i, j int) bool {
 // If the proposed index is PERFECT (e.g. contains enough columns to cover all
 // equality filters, and also has the correct suffix), idxs will be replaced
 // with JUST that index, and this will return true.
-func (idxs *indexDefinitionSortableSlice) maybeAddDefinition(q *reducedQuery, s *memStore, missingTerms stringset.Set, id *ds.IndexDefinition) bool {
+func (idxs *indexDefinitionSortableSlice) maybeAddDefinition(q *reducedQuery, s memStore, missingTerms stringset.Set, id *ds.IndexDefinition) bool {
 	// Kindless queries are handled elsewhere.
 	if id.Kind != q.kind {
 		impossible(
@@ -230,7 +230,7 @@ func (idxs *indexDefinitionSortableSlice) maybeAddDefinition(q *reducedQuery, s
 // getRelevantIndexes retrieves the relevant indexes which could be used to
 // service q. It returns nil if it's not possible to service q with the current
 // indexes.
-func getRelevantIndexes(q *reducedQuery, s *memStore) (indexDefinitionSortableSlice, error) {
+func getRelevantIndexes(q *reducedQuery, s memStore) (indexDefinitionSortableSlice, error) {
 	missingTerms := stringset.New(len(q.eqFilters))
 	for k := range q.eqFilters {
 		if k == "__ancestor__" {
@@ -468,7 +468,7 @@ func calculateConstraints(q *reducedQuery) *constraints {
 
 // getIndexes returns a set of iterator definitions. Iterating over these
 // will result in matching suffixes.
-func getIndexes(q *reducedQuery, s *memStore) ([]*iterDefinition, error) {
+func getIndexes(q *reducedQuery, s memStore) ([]*iterDefinition, error) {
 	relevantIdxs := indexDefinitionSortableSlice(nil)
 	if q.kind == "" {
 		if coll := s.GetCollection("ents:" + q.ns); coll != nil {
diff --git a/impl/memory/datastore_index_test.go b/impl/memory/datastore_index_test.go
index 03c6cde..e83026d 100644
--- a/impl/memory/datastore_index_test.go
+++ b/impl/memory/datastore_index_test.go
@@ -243,7 +243,7 @@ func TestIndexEntries(t *testing.T) {
 			}
 
 			Convey(tc.name, func() {
-				store := (*memStore)(nil)
+				store := (memStore)(nil)
 				if tc.withBuiltin {
 					store = indexEntriesWithBuiltins(fakeKey, tc.pmap, tc.idxs)
 				} else {
@@ -252,7 +252,7 @@ func TestIndexEntries(t *testing.T) {
 				}
 				for colName, vals := range tc.collections {
 					i := 0
-					coll := store.GetCollection(colName)
+					coll := store.Snapshot().GetCollection(colName)
 					numItems, _ := coll.GetTotals()
 					So(numItems, ShouldEqual, len(tc.collections[colName]))
 					coll.VisitItemsAscend(nil, true, func(itm *gkvlite.Item) bool {
@@ -354,7 +354,7 @@ func TestUpdateIndexes(t *testing.T) {
 	for _, tc := range updateIndexesTests {
 		Convey(tc.name, func() {
 			store := newMemStore()
-			idxColl := store.SetCollection("idx", nil)
+			idxColl := store.GetOrCreateCollection("idx")
 			for _, i := range tc.idxs {
 				idxColl.Set(cat(i.PrepForIdxTable()), []byte{})
 			}
@@ -369,7 +369,7 @@ func TestUpdateIndexes(t *testing.T) {
 			tmpLoader = nil
 
 			for colName, data := range tc.expected {
-				coll := store.GetCollection(colName)
+				coll := store.Snapshot().GetCollection(colName)
 				So(coll, ShouldNotBeNil)
 				i := 0
 				coll.VisitItemsAscend(nil, false, func(itm *gkvlite.Item) bool {
diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go
index 776a0a7..f9b2bfa 100644
--- a/impl/memory/datastore_query_execution.go
+++ b/impl/memory/datastore_query_execution.go
@@ -99,11 +99,11 @@ type normalStrategy struct {
 	aid string
 	ns  string
 
-	head  *memCollection
+	head  memCollection
 	dedup stringset.Set
 }
 
-func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head *memStore) queryStrategy {
+func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head memStore) queryStrategy {
 	coll := head.GetCollection("ents:" + ns)
 	if coll == nil {
 		return nil
@@ -128,7 +128,7 @@ func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
 	return s.cb(key, pm, gc)
 }
 
-func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy {
+func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head memStore) queryStrategy {
 	if fq.KeysOnly() {
 		return &keysOnlyStrategy{cb, stringset.New(0)}
 	}
@@ -165,7 +165,7 @@ func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c
 	return
 }
 
-func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *memStore) (ret int64, err error) {
+func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head memStore) (ret int64, err error) {
 	if len(fq.Project()) == 0 && !fq.KeysOnly() {
 		fq, err = fq.Original().KeysOnly(true).Finalize()
 		if err != nil {
@@ -179,7 +179,7 @@ func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *me
 	return
 }
 
-func executeNamespaceQuery(fq *ds.FinalizedQuery, aid string, head *memStore, cb ds.RawRunCB) error {
+func executeNamespaceQuery(fq *ds.FinalizedQuery, aid string, head memStore, cb ds.RawRunCB) error {
 	// these objects have no properties, so any filters on properties cause an
 	// empty result.
 	if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders()) > 1 {
@@ -223,7 +223,7 @@ func executeNamespaceQuery(fq *ds.FinalizedQuery, aid string, head memStore, cb
 	return nil
 }
 
-func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error {
+func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head memStore, cb ds.RawRunCB) error {
 	rq, err := reduce(fq, aid, ns, isTxn)
 	if err == ds.ErrNullQuery {
 		return nil
diff --git a/impl/memory/datastore_test.go b/impl/memory/datastore_test.go
index eacc070..3a17110 100644
--- a/impl/memory/datastore_test.go
+++ b/impl/memory/datastore_test.go
@@ -541,7 +541,7 @@ func TestCompoundIndexes(t *testing.T) {
 		return "idx::" + string(serialize.ToBytes(*def.PrepForIdxTable()))
 	}
 
-	numItms := func(c *memCollection) uint64 {
+	numItms := func(c memCollection) uint64 {
 		ret, _ := c.GetTotals()
 		return ret
 	}
diff --git a/impl/memory/doc.go b/impl/memory/doc.go
index 0954f1a..ef89fe1 100644
--- a/impl/memory/doc.go
+++ b/impl/memory/doc.go
@@ -5,4 +5,14 @@
 // Package memory provides an implementation of infra/gae/libs/wrapper which
 // backs to local memory ONLY. This is useful for unittesting, and is also used
 // for the nested-transaction filter implementation.
+//
+// Debug EnvVars
+//
+// To debug GKVLite memory access for a binary that uses this memory
+// implementation, you may set the flag:
+//   -luci.gae.gkvlite_trace_folder
+// to `/path/to/some/folder`. Every gkvlite memory store will be assigned
+// a numbered file in that folder, and all access to that store will be logged
+// to that file. Setting this to "-" will cause the trace information to dump to
+// stdout.
 package memory
diff --git a/impl/memory/gkvlite_iter.go b/impl/memory/gkvlite_iter.go
index 43cc463..2b5d737 100644
--- a/impl/memory/gkvlite_iter.go
+++ b/impl/memory/gkvlite_iter.go
@@ -14,7 +14,7 @@ import (
 
 type iterDefinition struct {
 	// The collection to iterate over
-	c *memCollection
+	c memCollection
 
 	// The prefix to always assert for every row. A nil prefix matches every row.
 	prefix []byte
@@ -119,6 +119,10 @@ type iterator struct {
 }
 
 func (def *iterDefinition) mkIter() *iterator {
+	if !def.c.IsReadOnly() {
+		panic("attempting to make an iterator with r/w collection")
+	}
+
 	cmdChan := make(chan *cmd)
 	ret := &iterator{
 		ch: cmdChan,
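Note: with the guard above, an iterDefinition must be built over a snapshot collection; constructing one over the mutable head is now a programming error caught at mkIter time. A package-internal sketch (only fields shown in this diff; mkNum is the test helper used below):

	func sketchIter(s memStore) {
		s.GetOrCreateCollection("zup").Set(mkNum(5), []byte{})

		def := &iterDefinition{
			c: s.Snapshot().GetCollection("zup"), // IsReadOnly() == true
		}
		_ = def.mkIter() // would panic if def.c were the r/w collection
	}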
diff --git a/impl/memory/gkvlite_iter_test.go b/impl/memory/gkvlite_iter_test.go
index 936bb3f..c0accae 100644
--- a/impl/memory/gkvlite_iter_test.go
+++ b/impl/memory/gkvlite_iter_test.go
@@ -34,13 +34,14 @@ func TestIterator(t *testing.T) {
 	t.Parallel()
 
 	s := newMemStore()
-	c := s.SetCollection("zup", nil)
+	c := s.GetOrCreateCollection("zup")
 	prev := []byte{}
 	for i := 5; i < 100; i++ {
 		data := mkNum(int64(i))
 		c.Set(data, prev)
 		prev = data
 	}
+	c = s.Snapshot().GetCollection("zup")
 
 	get := func(c C, t *iterator) interface{} {
 		ret := interface{}(nil)
@@ -191,14 +192,16 @@ func TestMultiIteratorSimple(t *testing.T) {
 
 	Convey("Test MultiIterator", t, func() {
 		s := newMemStore()
-		c := s.SetCollection("zup1", nil)
+		c := s.GetOrCreateCollection("zup1")
 		for _, row := range valBytes {
 			c.Set(row, []byte{})
 		}
-		c2 := s.SetCollection("zup2", nil)
+		c2 := s.GetOrCreateCollection("zup2")
 		for _, row := range otherValBytes {
 			c2.Set(row, []byte{})
 		}
+		c = s.Snapshot().GetCollection("zup1")
+		c2 = s.Snapshot().GetCollection("zup2")
 
 		Convey("can join the same collection twice", func() {
 			// get just the (1, *)
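Note: both tests now follow the two-phase pattern the new guards force: populate through the r/w handle, then rebind the same variable to the read-only snapshot view before anything iterates. In outline (the patch's own API):

	c := s.GetOrCreateCollection("zup") // phase 1: r/w handle, writes allowed
	for i := 5; i < 100; i++ {
		c.Set(mkNum(int64(i)), prev)
	}
	c = s.Snapshot().GetCollection("zup") // phase 2: frozen view for iteration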
diff --git a/impl/memory/gkvlite_tracing_utils.go b/impl/memory/gkvlite_tracing_utils.go
new file mode 100644
index 0000000..b6f7991
--- /dev/null
+++ b/impl/memory/gkvlite_tracing_utils.go
@@ -0,0 +1,198 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package memory
+
+import (
+	"encoding/hex"
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"sync"
+	"sync/atomic"
+
+	"github.com/luci/gkvlite"
+)
+
+var logMemCollectionFolder = flag.String(
+	"luci.gae.gkvlite_trace_folder", "",
+	"Set to a folder path to enable debugging traces to be dumped there. Set to '-' to dump to stdout.")
+var logMemCollectionFolderTmp string
+var logMemCollectionOnce sync.Once
+var logMemCounter uint32
+var logMemNameKey = "holds a string indicating the GKVLiteDebuggingTraceName"
+var stdoutLock sync.Mutex
+
+func wrapTracingMemStore(store memStore) memStore {
+	var writer traceWriter
+	logNum := atomic.AddUint32(&logMemCounter, 1) - 1
+	collName := fmt.Sprintf("coll%d", logNum)
+
+	if *logMemCollectionFolder == "-" {
+		writer = func(format string, a ...interface{}) {
+			stdoutLock.Lock()
+			defer stdoutLock.Unlock()
+			fmt.Printf(format+"\n", a...)
+		}
+	} else {
+		logMemCollectionOnce.Do(func() {
+			var err error
+			logMemCollectionFolderTmp, err = ioutil.TempDir(*logMemCollectionFolder, "luci-gae-gkvlite_trace")
+			if err != nil {
+				panic(err)
+			}
+			logMemCollectionFolderTmp, err = filepath.Abs(logMemCollectionFolderTmp)
+			if err != nil {
+				panic(err)
+			}
+			fmt.Fprintf(os.Stderr, "Saving GKVLite trace files to %q\n", logMemCollectionFolderTmp)
+		})
+		if logMemCollectionFolderTmp == "" {
+			panic("unable to create folder for tracefiles")
+		}
+
+		lck := sync.Mutex{}
+		fname := filepath.Join(logMemCollectionFolderTmp, fmt.Sprintf("%d.trace", logNum))
+		fil, err := os.Create(fname)
+		if err != nil {
+			panic(err)
+		}
+		writer = func(format string, a ...interface{}) {
+			lck.Lock()
+			defer lck.Unlock()
+			fmt.Fprintf(fil, format+"\n", a...)
+		}
+		runtime.SetFinalizer(&writer, func(_ *traceWriter) { fil.Close() })
+	}
+	writer("%s := newMemStore()", collName)
+	return &tracingMemStoreImpl{store, writer, collName, 0, false}
+}
+
+type traceWriter func(format string, a ...interface{})
+
+type tracingMemStoreImpl struct {
+	i memStore
+	w traceWriter
+
+	collName string
+	// for the mutable store, this is a counter that increments for every
+	// Snapshot, and for snapshots, this is the number of the snapshot.
+	snapNum uint
+	isSnap  bool
+}
+
+var _ memStore = (*tracingMemStoreImpl)(nil)
+
+func (t *tracingMemStoreImpl) ImATestingSnapshot() {}
+
+func (t *tracingMemStoreImpl) colWriter(action, name string) traceWriter {
+	ident := t.ident()
+	hexname := hex.EncodeToString([]byte(name))
+	writer := func(format string, a ...interface{}) {
+		if strings.HasPrefix(format, "//") { // comment
+			t.w(format, a...)
+		} else {
+			t.w(fmt.Sprintf("%s_%s%s", ident, hexname, format), a...)
+		}
+	}
+	writer(" := %s.%s(%q)", ident, action, name)
+	return writer
+}
+
+func (t *tracingMemStoreImpl) ident() string {
+	if t.isSnap {
+		return fmt.Sprintf("%s_snap%d", t.collName, t.snapNum)
+	}
+	return t.collName
+}
+
+func (t *tracingMemStoreImpl) GetCollection(name string) memCollection {
+	coll := t.i.GetCollection(name)
+	if coll == nil {
+		t.w("// %s.GetCollection(%q) -> nil", t.ident(), name)
+		return nil
+	}
+	writer := t.colWriter("GetCollection", name)
+	return &tracingMemCollectionImpl{coll, writer, 0}
+}
+
+func (t *tracingMemStoreImpl) GetCollectionNames() []string {
+	t.w("%s.GetCollectionNames()", t.ident())
+	return t.i.GetCollectionNames()
+}
+
+func (t *tracingMemStoreImpl) GetOrCreateCollection(name string) memCollection {
+	writer := t.colWriter("GetOrCreateCollection", name)
+	return &tracingMemCollectionImpl{t.i.GetOrCreateCollection(name), writer, 0}
+}
+
+func (t *tracingMemStoreImpl) Snapshot() memStore {
+	snap := t.i.Snapshot()
+	if snap == t.i {
+		t.w("// %s.Snapshot() -> self", t.ident())
+		return t
+	}
+	ret := &tracingMemStoreImpl{snap, t.w, t.collName, t.snapNum, true}
+	t.w("%s := %s.Snapshot()", ret.ident(), t.ident())
+	t.snapNum++
+	return ret
+}
+
+func (t *tracingMemStoreImpl) IsReadOnly() bool {
+	return t.i.IsReadOnly()
+}
+
+type tracingMemCollectionImpl struct {
+	i           memCollection
+	w           traceWriter
+	visitNumber uint
+}
+
+var _ memCollection = (*tracingMemCollectionImpl)(nil)
+
+func (t *tracingMemCollectionImpl) Name() string {
+	return t.i.Name()
+}
+
+func (t *tracingMemCollectionImpl) Delete(k []byte) bool {
+	t.w(".Delete(%#v)", k)
+	return t.i.Delete(k)
+}
+
+func (t *tracingMemCollectionImpl) Get(k []byte) []byte {
+	t.w(".Get(%#v)", k)
+	return t.i.Get(k)
+}
+
+func (t *tracingMemCollectionImpl) GetTotals() (numItems, numBytes uint64) {
+	t.w(".GetTotals()")
+	return t.i.GetTotals()
+}
+
+func (t *tracingMemCollectionImpl) MinItem(withValue bool) *gkvlite.Item {
+	t.w(".MinItem(%t)", withValue)
+	return t.i.MinItem(withValue)
+}
+
+func (t *tracingMemCollectionImpl) Set(k, v []byte) {
+	t.w(".Set(%#v, %#v)", k, v)
+	t.i.Set(k, v)
+}
+
+func (t *tracingMemCollectionImpl) VisitItemsAscend(target []byte, withValue bool, visitor gkvlite.ItemVisitor) {
+	vnum := t.visitNumber
+	t.visitNumber++
+
+	t.w(".VisitItemsAscend(%#v, %t, func(i *gkvlite.Item) bool{ return true }) // BEGIN VisitItemsAscend(%d)", target, withValue, vnum)
+	defer t.w("// END VisitItemsAscend(%d)", vnum)
+	t.i.VisitItemsAscend(target, withValue, visitor)
+}
+
+func (t *tracingMemCollectionImpl) IsReadOnly() bool {
+	return t.i.IsReadOnly()
+}
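Note: the new file is a plain decorator: every memStore/memCollection method emits a replayable log line, then forwards to the wrapped implementation. A minimal stand-alone model of the shape (hypothetical names):

	package main

	import "fmt"

	type coll interface{ Set(k, v []byte) }

	type realColl struct{}

	func (realColl) Set(k, v []byte) {}

	// tracingColl mirrors tracingMemCollectionImpl: log first, then delegate.
	type tracingColl struct {
		inner coll
		log   func(format string, a ...interface{})
	}

	func (t tracingColl) Set(k, v []byte) {
		t.log(".Set(%#v, %#v)", k, v)
		t.inner.Set(k, v)
	}

	func main() {
		c := tracingColl{realColl{}, func(f string, a ...interface{}) {
			fmt.Printf(f+"\n", a...)
		}}
		c.Set([]byte{1}, []byte{2}) // prints: .Set([]byte{0x1}, []byte{0x2})
	}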
diff --git a/impl/memory/gkvlite_utils.go b/impl/memory/gkvlite_utils.go
index ed7aa5c..adbc45d 100644
--- a/impl/memory/gkvlite_utils.go
+++ b/impl/memory/gkvlite_utils.go
@@ -9,13 +9,21 @@ import (
 	"runtime"
 	"sync"
 
+	"github.com/luci/gae/service/datastore"
 	"github.com/luci/gkvlite"
 )
 
-func gkvCollide(o, n *memCollection, f func(k, ov, nv []byte)) {
+func gkvCollide(o, n memCollection, f func(k, ov, nv []byte)) {
+	if o != nil && !o.IsReadOnly() {
+		panic("old collection is r/w")
+	}
+	if n != nil && !n.IsReadOnly() {
+		panic("new collection is r/w")
+	}
+
 	// TODO(riannucci): reimplement in terms of *iterator.
 	oldItems, newItems := make(chan *gkvlite.Item), make(chan *gkvlite.Item)
-	walker := func(c *memCollection, ch chan<- *gkvlite.Item, wg *sync.WaitGroup) {
+	walker := func(c memCollection, ch chan<- *gkvlite.Item, wg *sync.WaitGroup) {
 		defer close(ch)
 		defer wg.Done()
 		if c != nil {
@@ -66,78 +74,128 @@ func gkvCollide(o, n memCollection, f func(k, ov, nv []byte)) {
 // This is reasonable for in-memory Store objects, since the only errors that
 // should occur happen with file IO on the underlying file (which of course
 // doesn't exist).
-type memStore gkvlite.Store
+type memStore interface {
+	datastore.TestingSnapshot
 
-func (*memStore) ImATestingSnapshot() {}
+	GetCollection(name string) memCollection
+	GetCollectionNames() []string
+	GetOrCreateCollection(name string) memCollection
+	Snapshot() memStore
 
-func newMemStore() *memStore {
-	ret, err := gkvlite.NewStore(nil)
-	memoryCorruption(err)
-	return (*memStore)(ret)
+	IsReadOnly() bool
+}
+
+// memCollection is a gkvlite.Collection which will panic for anything which
+// might otherwise return an error.
+//
+// This is reasonable for in-memory Store objects, since the only errors that
+// should occur happen with file IO on the underlying file (which of course
+// doesn't exist).
+type memCollection interface {
+	Name() string
+	Delete(k []byte) bool
+	Get(k []byte) []byte
+	GetTotals() (numItems, numBytes uint64)
+	MinItem(withValue bool) *gkvlite.Item
+	Set(k, v []byte)
+	VisitItemsAscend(target []byte, withValue bool, visitor gkvlite.ItemVisitor)
+
+	IsReadOnly() bool
 }
 
-func (ms *memStore) Snapshot() *memStore {
-	ret := (*memStore)((*gkvlite.Store)(ms).Snapshot())
-	runtime.SetFinalizer((*gkvlite.Store)(ret), func(s *gkvlite.Store) {
-		go s.Close()
-	})
+type memStoreImpl struct {
+	s  *gkvlite.Store
+	ro bool
+}
+
+var _ memStore = (*memStoreImpl)(nil)
+
+func (*memStoreImpl) ImATestingSnapshot() {}
+
+func (ms *memStoreImpl) IsReadOnly() bool { return ms.ro }
+
+func newMemStore() memStore {
+	store, err := gkvlite.NewStore(nil)
+	memoryCorruption(err)
+	ret := memStore(&memStoreImpl{store, false})
+	if *logMemCollectionFolder != "" {
+		ret = wrapTracingMemStore(ret)
+	}
 	return ret
 }
 
-func (ms *memStore) MakePrivateCollection(cmp gkvlite.KeyCompare) *memCollection {
-	return (*memCollection)((*gkvlite.Store)(ms).MakePrivateCollection(cmp))
+func (ms *memStoreImpl) Snapshot() memStore {
+	if ms.ro {
+		return ms
+	}
+	ret := ms.s.Snapshot()
+	runtime.SetFinalizer(ret, func(s *gkvlite.Store) { go s.Close() })
+	return &memStoreImpl{ret, true}
 }
 
-func (ms *memStore) GetCollection(name string) *memCollection {
-	return (*memCollection)((*gkvlite.Store)(ms).GetCollection(name))
+func (ms *memStoreImpl) GetCollection(name string) memCollection {
+	coll := ms.s.GetCollection(name)
+	if coll == nil {
+		return nil
+	}
+	return &memCollectionImpl{coll, ms.ro}
 }
 
-func (ms *memStore) SetCollection(name string, cmp gkvlite.KeyCompare) *memCollection {
-	return (*memCollection)((*gkvlite.Store)(ms).SetCollection(name, cmp))
+func (ms *memStoreImpl) GetOrCreateCollection(name string) memCollection {
+	coll := ms.GetCollection(name)
+	if coll == nil {
+		coll = &memCollectionImpl{(ms.s.SetCollection(name, nil)), ms.ro}
+	}
+	return coll
 }
 
-func (ms *memStore) GetCollectionNames() []string {
-	return (*gkvlite.Store)(ms).GetCollectionNames()
+func (ms *memStoreImpl) GetCollectionNames() []string {
+	return ms.s.GetCollectionNames()
 }
 
-// memCollection is a gkvlite.Collection which will panic for anything which
-// might otherwise return an error.
-//
-// This is reasonable for in-memory Store objects, since the only errors that
-// should occur happen with file IO on the underlying file (which of course
-// doesn't exist.
-type memCollection gkvlite.Collection
+type memCollectionImpl struct {
+	c  *gkvlite.Collection
+	ro bool
+}
+
+var _ memCollection = (*memCollectionImpl)(nil)
 
-func (mc *memCollection) Get(k []byte) []byte {
-	ret, err := (*gkvlite.Collection)(mc).Get(k)
+func (mc *memCollectionImpl) Name() string     { return mc.c.Name() }
+func (mc *memCollectionImpl) IsReadOnly() bool { return mc.ro }
+
+func (mc *memCollectionImpl) Get(k []byte) []byte {
+	ret, err := mc.c.Get(k)
 	memoryCorruption(err)
 	return ret
 }
 
-func (mc *memCollection) MinItem(withValue bool) *gkvlite.Item {
-	ret, err := (*gkvlite.Collection)(mc).MinItem(withValue)
+func (mc *memCollectionImpl) MinItem(withValue bool) *gkvlite.Item {
+	ret, err := mc.c.MinItem(withValue)
 	memoryCorruption(err)
 	return ret
 }
 
-func (mc *memCollection) Set(k, v []byte) {
-	err := (*gkvlite.Collection)(mc).Set(k, v)
+func (mc *memCollectionImpl) Set(k, v []byte) {
+	err := mc.c.Set(k, v)
 	memoryCorruption(err)
 }
 
-func (mc *memCollection) Delete(k []byte) bool {
-	ret, err := (*gkvlite.Collection)(mc).Delete(k)
+func (mc *memCollectionImpl) Delete(k []byte) bool {
+	ret, err := mc.c.Delete(k)
 	memoryCorruption(err)
 	return ret
 }
 
-func (mc *memCollection) VisitItemsAscend(target []byte, withValue bool, visitor gkvlite.ItemVisitor) {
-	err := (*gkvlite.Collection)(mc).VisitItemsAscend(target, withValue, visitor)
+func (mc *memCollectionImpl) VisitItemsAscend(target []byte, withValue bool, visitor gkvlite.ItemVisitor) {
+	if !mc.ro {
+		panic("attempting to VisitItemsAscend from r/w memCollection")
+	}
+	err := mc.c.VisitItemsAscend(target, withValue, visitor)
 	memoryCorruption(err)
 }
 
-func (mc *memCollection) GetTotals() (numItems, numBytes uint64) {
-	numItems, numBytes, err := (*gkvlite.Collection)(mc).GetTotals()
+func (mc *memCollectionImpl) GetTotals() (numItems, numBytes uint64) {
+	numItems, numBytes, err := mc.c.GetTotals()
 	memoryCorruption(err)
 	return
 }
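Note: the read-only bit is decided once, at Snapshot() time, and then threaded into every collection handle the store produces; Snapshot() on an already-read-only store is the identity. A condensed, hedged model of that propagation (the real code is above; these model types are hypothetical):

	type modelStore struct{ ro bool }

	func (s *modelStore) Snapshot() *modelStore {
		if s.ro {
			return s // snapshot of a snapshot: the same frozen store
		}
		return &modelStore{ro: true}
	}

	type modelColl struct{ ro bool }

	func (s *modelStore) GetCollection() *modelColl {
		return &modelColl{ro: s.ro} // collections inherit the store's bit
	}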
diff --git a/impl/memory/gkvlite_utils_test.go b/impl/memory/gkvlite_utils_test.go
index af07271..1723267 100644
--- a/impl/memory/gkvlite_utils_test.go
+++ b/impl/memory/gkvlite_utils_test.go
@@ -95,26 +95,26 @@ var testCollisionCases = []struct {
 	},
 }
 
-func getFilledColl(s *memStore, fill []kv) *memCollection {
+func getFilledColl(fill []kv) memCollection {
 	if fill == nil {
 		return nil
 	}
-	ret := s.MakePrivateCollection(nil)
+	store := newMemStore()
+	ret := store.GetOrCreateCollection("")
 	for _, i := range fill {
 		ret.Set(i.k, i.v)
 	}
-	return ret
+	return store.Snapshot().GetCollection("")
 }
 
 func TestCollision(t *testing.T) {
 	t.Parallel()
 
 	Convey("Test gkvCollide", t, func() {
-		s := newMemStore()
 		for _, tc := range testCollisionCases {
 			Convey(tc.name, func() {
-				left := getFilledColl(s, tc.left)
-				right := getFilledColl(s, tc.right)
+				left := getFilledColl(tc.left)
+				right := getFilledColl(tc.right)
 				i := 0
 				gkvCollide(left, right, func(key, left, right []byte) {
 					e := tc.expect[i]
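Note: with getFilledColl returning snapshot-backed collections, gkvCollide's read-only guards are satisfied. A hedged usage sketch (kv field order assumed from `ret.Set(i.k, i.v)` above; ascending key order is implied by the VisitItemsAscend-based walker):

	left := getFilledColl([]kv{{[]byte("a"), []byte("1")}})
	right := getFilledColl([]kv{{[]byte("a"), []byte("2")}, {[]byte("b"), []byte("3")}})
	gkvCollide(left, right, func(key, ov, nv []byte) {
		// expected callbacks: ("a", "1", "2"), then ("b", nil, "3")
	})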