Commit 53a5819

chore: make the hash map more general

maypok86 committed Sep 27, 2024
1 parent 90ec9ea, commit 53a5819

Showing 6 changed files with 168 additions and 101 deletions.
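The substance of the commit: internal/hashmap previously hard-wired the generated *node.Manager and node.Node types; Map now takes a third type parameter N constrained by a mapNode interface, plus a mapNodeManager that can nil-check nodes, and the cache instantiates it with N = node.Node[K, V]. The constraint definitions themselves do not appear in the rendered hunks; the sketch below is a hypothetical reconstruction from the call sites in this diff, so the method sets and names may differ from the real code.

	package hashmap

	import "unsafe"

	// mapNode is a hypothetical reconstruction of the node constraint:
	// the map stores nodes as raw pointers inside buckets, so a node
	// must expose its key/value and convert to/from unsafe.Pointer.
	type mapNode[K comparable, V any] interface {
		Key() K
		Value() V
		AsPointer() unsafe.Pointer
	}

	// mapNodeManager abstracts the generated node.Manager; only the
	// operations the map itself needs are required. FromPointer and
	// IsNil are visible in this diff; anything else is guesswork.
	type mapNodeManager[K comparable, V any, N mapNode[K, V]] interface {
		FromPointer(ptr unsafe.Pointer) N
		IsNil(n N) bool
	}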
8 changes: 4 additions & 4 deletions cache.go
@@ -65,7 +65,7 @@ type expiryPolicy[K comparable, V any] interface {
 // to determine which entries to evict when the capacity is exceeded.
 type Cache[K comparable, V any] struct {
 	nodeManager  *node.Manager[K, V]
-	hashmap      *hashmap.Map[K, V]
+	hashmap      *hashmap.Map[K, V, node.Node[K, V]]
 	policy       evictionPolicy[K, V]
 	expiryPolicy expiryPolicy[K, V]
 	stats        statsRecorder
@@ -101,11 +101,11 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
 		stripedBuffer = lossy.NewStriped(maxStripedBufferSize, nodeManager)
 	}

-	var hm *hashmap.Map[K, V]
+	var hm *hashmap.Map[K, V, node.Node[K, V]]
 	if b.initialCapacity == nil {
-		hm = hashmap.New[K, V](nodeManager)
+		hm = hashmap.New[K, V, node.Node[K, V]](nodeManager)
 	} else {
-		hm = hashmap.NewWithSize[K, V](nodeManager, *b.initialCapacity)
+		hm = hashmap.NewWithSize[K, V, node.Node[K, V]](nodeManager, *b.initialCapacity)
 	}

 	cache := &Cache[K, V]{
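Note that these call sites must spell out all three type arguments. Go's type inference unifies argument and parameter types structurally, and an argument of the concrete type *node.Manager[K, V] does not unify with the interface type mapNodeManager[K, V, N], so N (here node.Node[K, V]) cannot be inferred and must be written explicitly.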
4 changes: 4 additions & 0 deletions cmd/generator/main.go
@@ -720,6 +720,10 @@ func (m *Manager[K, V]) FromPointer(ptr unsafe.Pointer) Node[K, V] {
 	return m.fromPointer(ptr)
 }
+
+func (m *Manager[K, V]) IsNil(n Node[K, V]) bool {
+	return n == nil || n.AsPointer() == nil
+}
 func minUint8(a, b uint8) uint8 {
 	if a < b {
 		return a
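The two-part check in IsNil matters because the generated Node[K, V] is an interface: a nil concrete pointer wrapped in a non-nil interface value does not compare equal to nil, so the underlying pointer must be inspected via AsPointer as well. A minimal standalone illustration of that Go pitfall, using hypothetical types that are not from this repository:

	package main

	import "fmt"

	type node interface{ isNode() }

	type concreteNode struct{}

	func (*concreteNode) isNode() {}

	func main() {
		var p *concreteNode   // nil pointer
		var n node = p        // interface holding a typed nil
		fmt.Println(n == nil) // false: the interface's type word is non-nil
		fmt.Println(p == nil) // true
	}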
10 changes: 7 additions & 3 deletions internal/generated/node/manager.go

Some generated files are not rendered by default.
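The 7 additions and 3 deletions here presumably correspond to the regenerated output picking up the IsNil method emitted by the generator change above, though the rendered diff does not show it.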

98 changes: 48 additions & 50 deletions internal/hashmap/map.go
@@ -30,7 +30,6 @@ import (

 	"github.com/dolthub/maphash"

-	"github.com/maypok86/otter/v2/internal/generated/node"
 	"github.com/maypok86/otter/v2/internal/xmath"
 	"github.com/maypok86/otter/v2/internal/xruntime"
 )
@@ -89,14 +88,14 @@ const (
 // (immutable K/V pair structs instead of atomic snapshots)
 // and C++'s absl::flat_hash_map (meta memory and SWAR-based
 // lookups).
-type Map[K comparable, V any] struct {
+type Map[K comparable, V any, N mapNode[K, V]] struct {
 	totalGrowths int64
 	totalShrinks int64
 	resizing     int64          // resize in progress flag; updated atomically
 	resizeMu     sync.Mutex     // only used along with resizeCond
 	resizeCond   sync.Cond      // used to wake up resize waiters (concurrent modifications)
 	table        unsafe.Pointer // *mapTable
-	nodeManager  *node.Manager[K, V]
+	nodeManager  mapNodeManager[K, V, N]
 	minTableLen  int
 }

@@ -106,7 +105,7 @@ type counterStripe struct {
 	pad [xruntime.CacheLineSize - 8]byte
 }

-type mapTable[K comparable, V any] struct {
+type mapTable[K comparable] struct {
 	buckets []bucketPadded
 	// striped counter for number of table nodes;
 	// used to determine if a table shrinking is needed
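Dropping V from mapTable is possible because buckets store nodes as raw unsafe.Pointer values; the table itself only needs K for its hasher, while typed access to keys and values now goes through the node manager.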
@@ -133,34 +132,34 @@ type bucket struct {
 // NewWithSize creates a new Map instance with capacity enough
 // to hold size nodes. If size is zero or negative, the value
 // is ignored.
-func NewWithSize[K comparable, V any](nodeManager *node.Manager[K, V], size int) *Map[K, V] {
-	return newMap[K, V](nodeManager, size)
+func NewWithSize[K comparable, V any, N mapNode[K, V]](nodeManager mapNodeManager[K, V, N], size int) *Map[K, V, N] {
+	return newMap[K, V, N](nodeManager, size)
 }

 // New creates a new Map instance.
-func New[K comparable, V any](nodeManager *node.Manager[K, V]) *Map[K, V] {
-	return newMap[K, V](nodeManager, defaultMinMapTableLen*nodesPerMapBucket)
+func New[K comparable, V any, N mapNode[K, V]](nodeManager mapNodeManager[K, V, N]) *Map[K, V, N] {
+	return newMap[K, V, N](nodeManager, defaultMinMapTableLen*nodesPerMapBucket)
 }

-func newMap[K comparable, V any](nodeManager *node.Manager[K, V], sizeHint int) *Map[K, V] {
-	m := &Map[K, V]{
+func newMap[K comparable, V any, N mapNode[K, V]](nodeManager mapNodeManager[K, V, N], sizeHint int) *Map[K, V, N] {
+	m := &Map[K, V, N]{
 		nodeManager: nodeManager,
 	}
 	m.resizeCond = *sync.NewCond(&m.resizeMu)
-	var table *mapTable[K, V]
+	var table *mapTable[K]
 	prevHasher := maphash.NewHasher[K]()
 	if sizeHint <= defaultMinMapTableLen*nodesPerMapBucket {
-		table = newMapTable[K, V](defaultMinMapTableLen, prevHasher)
+		table = newMapTable[K](defaultMinMapTableLen, prevHasher)
 	} else {
 		tableLen := xmath.RoundUpPowerOf2(uint32((float64(sizeHint) / nodesPerMapBucket) / mapLoadFactor))
-		table = newMapTable[K, V](int(tableLen), prevHasher)
+		table = newMapTable[K](int(tableLen), prevHasher)
 	}
 	m.minTableLen = len(table.buckets)
 	atomic.StorePointer(&m.table, unsafe.Pointer(table))
 	return m
 }

-func newMapTable[K comparable, V any](minTableLen int, prevHasher maphash.Hasher[K]) *mapTable[K, V] {
+func newMapTable[K comparable](minTableLen int, prevHasher maphash.Hasher[K]) *mapTable[K] {
 	buckets := make([]bucketPadded, minTableLen)
 	for i := range buckets {
 		buckets[i].meta = defaultMeta
@@ -172,18 +171,23 @@ func newMapTable[K comparable, V any](minTableLen int, prevHasher maphash.Hasher[K]) *mapTable[K, V] {
 		counterLen = maxMapCounterLen
 	}
 	counter := make([]counterStripe, counterLen)
-	t := &mapTable[K, V]{
+	t := &mapTable[K]{
 		buckets: buckets,
 		size:    counter,
 		hasher:  maphash.NewSeed[K](prevHasher),
 	}
 	return t
 }

+func zeroValue[V any]() V {
+	var zero V
+	return zero
+}
+
 // Get returns the node stored in the map for a key, or nil
 // if no value is present.
-func (m *Map[K, V]) Get(key K) node.Node[K, V] {
-	table := (*mapTable[K, V])(atomic.LoadPointer(&m.table))
+func (m *Map[K, V, N]) Get(key K) N {
+	table := (*mapTable[K])(atomic.LoadPointer(&m.table))
 	hash := table.hasher.Hash(key)
 	h1 := h1(hash)
 	h2w := broadcast(h2(hash))
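The new zeroValue helper exists because Get can no longer return a bare nil: N is a type parameter, and its zero value (nil for the interface node.Node[K, V] used by the cache, but potentially a zero struct for other instantiations) must be produced generically with var zero N.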
@@ -206,7 +210,7 @@ func (m *Map[K, V]) Get(key K) node.Node[K, V] {
 		}
 		bptr := atomic.LoadPointer(&b.next)
 		if bptr == nil {
-			return nil
+			return zeroValue[N]()
 		}
 		b = (*bucketPadded)(bptr)
 	}
@@ -219,17 +223,14 @@
 // is executed. It means that modifications on other nodes in
 // the bucket will be blocked until the computeFn executes. Consider
 // this when the function includes long-running operations.
-func (m *Map[K, V]) Compute(
-	key K,
-	computeFunc func(n node.Node[K, V]) node.Node[K, V],
-) node.Node[K, V] {
+func (m *Map[K, V, N]) Compute(key K, computeFunc func(n N) N) N {
 	for {
 	compute_attempt:
 		var (
 			emptyb   *bucketPadded
 			emptyidx int
 		)
-		table := (*mapTable[K, V])(atomic.LoadPointer(&m.table))
+		table := (*mapTable[K])(atomic.LoadPointer(&m.table))
 		tableLen := len(table.buckets)
 		hash := table.hasher.Hash(key)
 		h1 := h1(hash)
@@ -265,7 +266,7 @@ func (m *Map[K, V]) Compute(
 			// In-place update/delete.
 			newNode := computeFunc(oldNode)
 			// oldNode != nil
-			if newNode == nil {
+			if m.nodeManager.IsNil(newNode) {
 				// Deletion.
 				// First we update the hash, then the node.
 				newmetaw := setByte(metaw, emptyMetaSlot, idx)
@@ -300,10 +301,10 @@ func (m *Map[K, V]) Compute(
 		if b.next == nil {
 			if emptyb != nil {
 				// Insertion into an existing bucket.
-				var zeroNode node.Node[K, V]
+				var zeroNode N
 				// oldNode == nil.
 				newNode := computeFunc(zeroNode)
-				if newNode == nil {
+				if m.nodeManager.IsNil(newNode) {
 					// no op.
 					rootb.mu.Unlock()
 					return newNode
@@ -323,10 +324,10 @@ func (m *Map[K, V]) Compute(
 				goto compute_attempt
 			}
 			// Insertion into a new bucket.
-			var zeroNode node.Node[K, V]
+			var zeroNode N
 			// oldNode == nil
 			newNode := computeFunc(zeroNode)
-			if newNode == nil {
+			if m.nodeManager.IsNil(newNode) {
 				rootb.mu.Unlock()
 				return newNode
 			}
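Taken together, the Compute hunks show one callback driving insert, update, and delete: a nil-equivalent result, as judged by nodeManager.IsNil, means "no entry". A hedged usage sketch follows; m is assumed to be a *hashmap.Map[string, int, node.Node[string, int]], and manager with its Create method is a hypothetical stand-in for the caller's node factory, not an API shown in this diff.

	// Get-or-create: build a node only when the key is absent.
	n := m.Compute("alice", func(old node.Node[string, int]) node.Node[string, int] {
		if manager.IsNil(old) {
			return manager.Create("alice", 31) // hypothetical factory
		}
		return old // keep the existing node untouched
	})
	_ = n

	// Delete: returning the zero value removes the entry for the key.
	m.Compute("alice", func(old node.Node[string, int]) node.Node[string, int] {
		var zero node.Node[string, int]
		return zero
	})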
@@ -344,24 +345,24 @@
 	}
 }

-func (m *Map[K, V]) newerTableExists(table *mapTable[K, V]) bool {
+func (m *Map[K, V, N]) newerTableExists(table *mapTable[K]) bool {
 	curTablePtr := atomic.LoadPointer(&m.table)
 	return uintptr(curTablePtr) != uintptr(unsafe.Pointer(table))
 }

-func (m *Map[K, V]) resizeInProgress() bool {
+func (m *Map[K, V, N]) resizeInProgress() bool {
 	return atomic.LoadInt64(&m.resizing) == 1
 }

-func (m *Map[K, V]) waitForResize() {
+func (m *Map[K, V, N]) waitForResize() {
 	m.resizeMu.Lock()
 	for m.resizeInProgress() {
 		m.resizeCond.Wait()
 	}
 	m.resizeMu.Unlock()
 }

-func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) {
+func (m *Map[K, V, N]) resize(knownTable *mapTable[K], hint mapResizeHint) {
 	knownTableLen := len(knownTable.buckets)
 	// Fast path for shrink attempts.
 	if hint == mapShrinkHint {
@@ -376,20 +377,20 @@ func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) {
 		m.waitForResize()
 		return
 	}
-	var newTable *mapTable[K, V]
-	table := (*mapTable[K, V])(atomic.LoadPointer(&m.table))
+	var newTable *mapTable[K]
+	table := (*mapTable[K])(atomic.LoadPointer(&m.table))
 	tableLen := len(table.buckets)
 	switch hint {
 	case mapGrowHint:
 		// Grow the table with factor of 2.
 		atomic.AddInt64(&m.totalGrowths, 1)
-		newTable = newMapTable[K, V](tableLen<<1, table.hasher)
+		newTable = newMapTable[K](tableLen<<1, table.hasher)
 	case mapShrinkHint:
 		shrinkThreshold := int64((tableLen * nodesPerMapBucket) / mapShrinkFraction)
 		if tableLen > m.minTableLen && table.sumSize() <= shrinkThreshold {
 			// Shrink the table with factor of 2.
 			atomic.AddInt64(&m.totalShrinks, 1)
-			newTable = newMapTable[K, V](tableLen>>1, table.hasher)
+			newTable = newMapTable[K](tableLen>>1, table.hasher)
 		} else {
 			// No need to shrink. Wake up all waiters and give up.
 			m.resizeMu.Lock()
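For scale: the shrink path compares the striped size sum against (tableLen * nodesPerMapBucket) / mapShrinkFraction. Assuming illustrative values of nodesPerMapBucket = 3 and mapShrinkFraction = 8 (the constants are defined earlier in the file and not shown in this diff), a 64-bucket table would shrink only once it holds at most 64*3/8 = 24 nodes, and only while it is still above the configured minimum table length.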
@@ -399,7 +400,7 @@ func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) {
 			return
 		}
 	case mapClearHint:
-		newTable = newMapTable[K, V](m.minTableLen, table.hasher)
+		newTable = newMapTable[K](m.minTableLen, table.hasher)
 	default:
 		panic(fmt.Sprintf("unexpected resize hint: %d", hint))
 	}
@@ -419,10 +420,7 @@ func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) {
 	m.resizeMu.Unlock()
 }

-func (m *Map[K, V]) copyBucket(
-	b *bucketPadded,
-	destTable *mapTable[K, V],
-) (copied int) {
+func (m *Map[K, V, N]) copyBucket(b *bucketPadded, destTable *mapTable[K]) (copied int) {
 	rootb := b
 	rootb.mu.Lock()
 	//nolint:gocritic // nesting is normal here
@@ -459,12 +457,12 @@ func (m *Map[K, V]) copyBucket(
 // creation, modification and deletion. However, the concurrent
 // modification rule apply, i.e. the changes may be not reflected
 // in the subsequently iterated nodes.
-func (m *Map[K, V]) Range(fn func(n node.Node[K, V]) bool) {
+func (m *Map[K, V, N]) Range(fn func(n N) bool) {
 	var zeroPtr unsafe.Pointer
 	// Pre-allocate array big enough to fit nodes for most hash tables.
 	bnodes := make([]unsafe.Pointer, 0, 16*nodesPerMapBucket)
 	tablep := atomic.LoadPointer(&m.table)
-	table := *(*mapTable[K, V])(tablep)
+	table := *(*mapTable[K])(tablep)
 	for i := range table.buckets {
 		rootb := &table.buckets[i]
 		b := rootb
@@ -498,14 +496,14 @@ func (m *Map[K, V]) Range(fn func(n node.Node[K, V]) bool) {
 }

 // Clear deletes all keys and values currently stored in the map.
-func (m *Map[K, V]) Clear() {
-	table := (*mapTable[K, V])(atomic.LoadPointer(&m.table))
+func (m *Map[K, V, N]) Clear() {
+	table := (*mapTable[K])(atomic.LoadPointer(&m.table))
 	m.resize(table, mapClearHint)
 }

 // Size returns current size of the map.
-func (m *Map[K, V]) Size() int {
-	table := (*mapTable[K, V])(atomic.LoadPointer(&m.table))
+func (m *Map[K, V, N]) Size() int {
+	table := (*mapTable[K])(atomic.LoadPointer(&m.table))
 	return int(table.sumSize())
 }

@@ -529,19 +527,19 @@ func appendToBucket(h2 uint8, nodePtr unsafe.Pointer, b *bucketPadded) {
 	}
 }

-func (table *mapTable[K, V]) addSize(bucketIdx uint64, delta int) {
+func (table *mapTable[K]) addSize(bucketIdx uint64, delta int) {
 	//nolint:gosec // there is no overflow
 	cidx := uint64(len(table.size)-1) & bucketIdx
 	atomic.AddInt64(&table.size[cidx].c, int64(delta))
 }

-func (table *mapTable[K, V]) addSizePlain(bucketIdx uint64, delta int) {
+func (table *mapTable[K]) addSizePlain(bucketIdx uint64, delta int) {
 	//nolint:gosec // there is no overflow
 	cidx := uint64(len(table.size)-1) & bucketIdx
 	table.size[cidx].c += int64(delta)
 }

-func (table *mapTable[K, V]) sumSize() int64 {
+func (table *mapTable[K]) sumSize() int64 {
 	sum := int64(0)
 	for i := range table.size {
 		sum += atomic.LoadInt64(&table.size[i].c)
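The size bookkeeping is a striped counter: each update maps its bucket index to one of len(table.size) stripes via the mask uint64(len(table.size)-1) & bucketIdx, and each stripe is padded to xruntime.CacheLineSize (see the counterStripe hunk above), so concurrent writers rarely contend on the same cache line. The trade-off is that sumSize, and therefore Size, is only a point-in-time approximation while the map is being modified concurrently.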