Skip to content

Commit

Permalink
Feature/address meta (#10)
Browse files Browse the repository at this point in the history
* Add meta to address

* Dont cache address in single index mode
  • Loading branch information
dantudor authored Sep 3, 2021
1 parent 0b3a6b6 commit ef190e3
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 17 deletions.
4 changes: 4 additions & 0 deletions config/mappings/address.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
},
"attempt": {
"type": "integer"
},
"meta": {
"type": "nested",
"properties": { }
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions internal/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (i indexer) index(height, target uint64, option IndexOption.IndexOption) er
start := time.Now()

if option == IndexOption.SingleIndex {
i.addressIndexer.Index(height, height, txs)
i.addressIndexer.Index(height, height, txs, option)
return
}

Expand All @@ -112,7 +112,7 @@ func (i indexer) index(height, target uint64, option IndexOption.IndexOption) er
if option == IndexOption.BatchIndex {
from = height - i.elastic.GetBulkIndexSize() + 1
}
i.addressIndexer.Index(from, height, txs)
i.addressIndexer.Index(from, height, txs, option)
}
zap.L().With(
zap.Duration("elapsed", time.Since(start)),
Expand Down
36 changes: 21 additions & 15 deletions internal/service/address/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/NavExplorer/navcoind-go"
"github.com/NavExplorer/navexplorer-indexer-go/v2/internal/elastic_cache"
"github.com/NavExplorer/navexplorer-indexer-go/v2/internal/indexer/IndexOption"
"github.com/NavExplorer/navexplorer-indexer-go/v2/internal/service/block"
"github.com/NavExplorer/navexplorer-indexer-go/v2/pkg/explorer"
"github.com/olivere/elastic/v7"
Expand All @@ -16,13 +17,13 @@ import (

type Indexer interface {
bulkPersist(bulk *elastic.BulkService)
Index(from, to uint64, txs []explorer.BlockTransaction)
Index(from, to uint64, txs []explorer.BlockTransaction, option IndexOption.IndexOption)
ClearCache()
indexAddresses(from, to uint64)
indexMultiSigs(txs []explorer.BlockTransaction)
getAddress(hash string) explorer.Address
indexAddresses(from, to uint64, option IndexOption.IndexOption)
indexMultiSigs(txs []explorer.BlockTransaction, option IndexOption.IndexOption)
getAddress(hash string, useCache bool) explorer.Address
updateAddress(address explorer.Address, history explorer.AddressHistory) error
getAndUpdateAddress(history explorer.AddressHistory) error
getAndUpdateAddress(history explorer.AddressHistory, useCache bool) error
generateAddressHistory(start, end uint64, addresses []string) ([]explorer.AddressHistory, error)
}

Expand Down Expand Up @@ -70,7 +71,7 @@ func (i indexer) bulkPersist(bulk *elastic.BulkService) {
}
}

func (i indexer) Index(from, to uint64, txs []explorer.BlockTransaction) {
func (i indexer) Index(from, to uint64, txs []explorer.BlockTransaction, option IndexOption.IndexOption) {
if len(txs) == 0 {
return
}
Expand All @@ -80,12 +81,12 @@ func (i indexer) Index(from, to uint64, txs []explorer.BlockTransaction) {

go func() {
defer wg.Done()
i.indexAddresses(from, to)
i.indexAddresses(from, to, option)
}()

go func() {
defer wg.Done()
i.indexMultiSigs(txs)
i.indexMultiSigs(txs, option)
}()

wg.Wait()
Expand All @@ -95,7 +96,7 @@ func (i indexer) ClearCache() {
i.cache.Flush()
}

func (i indexer) indexAddresses(from, to uint64) {
func (i indexer) indexAddresses(from, to uint64, option IndexOption.IndexOption) {
txs := make([]explorer.BlockTransaction, 0)
for _, entity := range i.elastic.GetEntitiesByIndex(elastic_cache.BlockTransactionIndex.Get()) {
txs = append(txs, entity.(explorer.BlockTransaction))
Expand Down Expand Up @@ -130,7 +131,7 @@ func (i indexer) indexAddresses(from, to uint64) {
for _, history := range addressHistorys {
i.elastic.AddIndexRequest(elastic_cache.AddressHistoryIndex.Get(), history)

err := i.getAndUpdateAddress(history)
err := i.getAndUpdateAddress(history, option == IndexOption.BatchIndex)
if err != nil {
zap.L().With(
zap.Error(err),
Expand Down Expand Up @@ -168,10 +169,10 @@ func chunkAddresses(addresses []string, chunks int) [][]string {
return chunked
}

func (i indexer) indexMultiSigs(txs []explorer.BlockTransaction) {
func (i indexer) indexMultiSigs(txs []explorer.BlockTransaction, option IndexOption.IndexOption) {
for _, tx := range txs {
for _, multiSig := range tx.GetAllMultiSigs() {
address := i.getAddress(multiSig.Key())
address := i.getAddress(multiSig.Key(), option == IndexOption.BatchIndex)
address.MultiSig = &multiSig
if len(tx.GetAllMultiSigs()) == 0 {
continue
Expand All @@ -189,7 +190,12 @@ func (i indexer) indexMultiSigs(txs []explorer.BlockTransaction) {
}
}

func (i indexer) getAddress(hash string) explorer.Address {
func (i indexer) getAddress(hash string, useCache bool) explorer.Address {
if !useCache {
address := i.addressRepository.GetOrCreateAddress(hash)
return address
}

address, exists := i.cache.Get(hash)
if exists == false {
address := i.addressRepository.GetOrCreateAddress(hash)
Expand Down Expand Up @@ -249,9 +255,9 @@ func (i indexer) generateAddressHistory(start, end uint64, addresses []string) (
return addressHistorys, nil
}

func (i indexer) getAndUpdateAddress(history explorer.AddressHistory) error {
func (i indexer) getAndUpdateAddress(history explorer.AddressHistory, useCache bool) error {
start := time.Now()
err := i.updateAddress(i.getAddress(history.Hash), history)
err := i.updateAddress(i.getAddress(history.Hash, useCache), history)

zap.L().With(
zap.Duration("elapsed", time.Since(start)),
Expand Down
2 changes: 2 additions & 0 deletions internal/service/address/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (r repository) CreateAddress(hash string, createdBlock uint64, createdTime
}

func (r repository) GetOrCreateAddress(hash string) explorer.Address {
zap.L().With(zap.String("hash", hash)).Info("AddressRepository: GetOrCreateAddress")

result, err := r.elastic.GetClient().
Search(elastic_cache.AddressIndex.Get()).
Query(elastic.NewTermQuery("hash.keyword", hash)).
Expand Down
2 changes: 2 additions & 0 deletions pkg/explorer/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Address struct {

MultiSig *MultiSig `json:"multisig,omitempty"`

Meta map[string]string `json:"meta"`

// Transient
RichList RichList `json:"rich_list,omitempty"`
}
Expand Down

0 comments on commit ef190e3

Please sign in to comment.