diff --git a/cassandra/prefix_switch.go b/cassandra/prefix_switch.go index 2a6fac1..06aa65a 100644 --- a/cassandra/prefix_switch.go +++ b/cassandra/prefix_switch.go @@ -48,6 +48,7 @@ type PrefixSwitcher struct { lock sync.RWMutex currentTrieMap map[string]string cstarEnabled bool + bdbEnabled bool } func (s PrefixSwitchStatus) IsReadOnBeansdb() bool { @@ -157,16 +158,16 @@ func GetPrefixSwitchTrieFromCfg( } } -func NewPrefixSwitcher(config *config.CassandraStoreCfg, cqlStore *CassandraStore) (*PrefixSwitcher, error) { +func NewPrefixSwitcher(cfg *config.ProxyConfig, cqlStore *CassandraStore) (*PrefixSwitcher, error) { f := new(PrefixSwitcher) - if !config.Enable { + if !cfg.CassandraStoreCfg.Enable { f.defaultT = PrefixSwitchBrw f.cstarEnabled = false return f, nil } - prefixTrie, nowMap, err := GetPrefixSwitchTrieFromCfg(config, cqlStore) + prefixTrie, nowMap, err := GetPrefixSwitchTrieFromCfg(&cfg.CassandraStoreCfg, cqlStore) if err != nil { return nil, err } @@ -174,13 +175,14 @@ func NewPrefixSwitcher(config *config.CassandraStoreCfg, cqlStore *CassandraStor f.trie = prefixTrie f.cstarEnabled = true - defaultS, err := strToSwitchStatus(config.SwitchToKeyDefault) + defaultS, err := strToSwitchStatus(cfg.SwitchToKeyDefault) if err != nil { return nil, err } f.defaultT = defaultS f.currentTrieMap = nowMap + f.bdbEnabled = cfg.DStoreConfig.Enable return f, nil } @@ -215,6 +217,10 @@ func (s *PrefixSwitcher) matchStatus(key string) PrefixSwitchStatus { } func (s *PrefixSwitcher) GetStatus(key string) PrefixSwitchStatus { + if !s.bdbEnabled && s.cstarEnabled { + return PrefixSwitchCrw + } + if !s.cstarEnabled { return PrefixSwitchBrw } @@ -226,15 +232,25 @@ func (s *PrefixSwitcher) GetStatus(key string) PrefixSwitchStatus { // check key prefix and return bdb read enable c* read enable func (s *PrefixSwitcher) ReadEnabledOn(key string) (bool, bool) { + if !s.bdbEnabled && s.cstarEnabled { + return false, true + } + if !s.cstarEnabled { return true, false } + status := s.GetStatus(key) return status.IsReadOnBeansdb(), status.IsReadOnCstar() } // check keys prefix list and return bdb read keys and c* read keys func (s *PrefixSwitcher) ReadEnableOnKeys(keys []string) (bkeys []string, ckeys []string) { + if !s.bdbEnabled && s.cstarEnabled { + ckeys = keys + return + } + if !s.cstarEnabled { bkeys = keys return @@ -261,6 +277,10 @@ func (s *PrefixSwitcher) ReadEnableOnKeys(keys []string) (bkeys []string, ckeys // check key prefix and return bdb write enable c* write enable func (s *PrefixSwitcher) WriteEnabledOn(key string) (bool, bool) { + if !s.bdbEnabled && s.cstarEnabled { + return false, true + } + if !s.cstarEnabled { return true, false } diff --git a/dstore/store.go b/dstore/store.go index 639cdd8..d7970d8 100644 --- a/dstore/store.go +++ b/dstore/store.go @@ -44,7 +44,7 @@ func (s *Storage) InitStorageEngine(pCfg *config.ProxyConfig) error { s.cstar = cstar - switcher, err := cassandra.NewPrefixSwitcher(&proxyConf.CassandraStoreCfg, cstar) + switcher, err := cassandra.NewPrefixSwitcher(proxyConf, cstar) if err != nil { return err } @@ -63,7 +63,7 @@ func (s *Storage) InitStorageEngine(pCfg *config.ProxyConfig) error { s.dualWErrHandler = dualWErrHandler logger.Infof("dual write log send to: %s", s.dualWErrHandler.EFile) } else { - switcher, err := cassandra.NewPrefixSwitcher(&proxyConf.CassandraStoreCfg, nil) + switcher, err := cassandra.NewPrefixSwitcher(proxyConf, nil) if err != nil { return err } @@ -269,14 +269,18 @@ func (c *StorageClient) getMulti(keys []string) (rs map[string]*mc.Item, targets } func (c *StorageClient) GetMulti(keys []string) (rs map[string]*mc.Item, err error) { + // The keys args MUST BE DEDUPLICATED, if not, there will be MEMORY LEAK + keys = deduplicateKeys(keys) + timer := prometheus.NewTimer( cmdE2EDurationSeconds.WithLabelValues("getm"), ) defer timer.ObserveDuration() bkeys, ckeys := c.pswitcher.ReadEnableOnKeys(keys) + rs = make(map[string]*mc.Item, len(keys)) - + if len(bkeys) > 0 { totalReqs.WithLabelValues("getm", "beansdb").Inc() c.sched = GetScheduler() diff --git a/dstore/utils.go b/dstore/utils.go new file mode 100644 index 0000000..3018c62 --- /dev/null +++ b/dstore/utils.go @@ -0,0 +1,21 @@ +package dstore + +func deduplicateKeys(keys []string) []string { + dedup := make(map[string]struct{}, len(keys)) + + for _, k := range keys { + if _, ok := dedup[k]; ok { + continue + } else { + dedup[k] = struct{}{} + } + } + + dedupKs := make([]string, len(dedup)) + i := 0 + for k := range dedup { + dedupKs[i] = k + i++ + } + return dedupKs +} diff --git a/dstore/utils_test.go b/dstore/utils_test.go new file mode 100644 index 0000000..46370f3 --- /dev/null +++ b/dstore/utils_test.go @@ -0,0 +1,38 @@ +package dstore + +import ( + "fmt" + "testing" +) + +func TestDeduplicateKeys(t *testing.T) { + test := []string{"a", "b", "c", "d", "a"} + dtk := deduplicateKeys(test) + if (len(dtk) != 4) { + t.Errorf("string slice should be deduplicated: %s", dtk) + } + test2 := []string{"a", "n"} + dtk2 := deduplicateKeys(test2) + if (len(dtk2) != 2) { + t.Errorf("string slice %s has no duplications", test2) + } + t.Logf("after dedup: %s | %s", dtk, dtk2) +} + +func BenchmarkDeduplicateKeys(b *testing.B) { + test := []string{ + "/frodo_feed/title_vecs/3055:4601087161", + "/frodo_feed/title_vecs/3055:4601087162", + "/frodo_feed/title_vecs/3055:4601087161", + "/frodo_feed/title_vecs/3055:4601087165", + "/frodo_feed/title_vecs/3055:4601087161", + } + + for j := 0; j < 200; j++ { + test = append(test, fmt.Sprintf("/frodo_feed/title_vecs/3055:460108716%d", j)) + } + + for i := 0; i < b.N; i++ { + deduplicateKeys(test) + } +}