Skip to content

Commit

Permalink
Merge pull request #5206 from onflow/yahya/temp-fix-atomic-adjust
Browse files Browse the repository at this point in the history
[Caching] Adds support for atomic get-with-init and adjust-with-init on backend mempools
  • Loading branch information
yhassanzadeh13 authored Jan 10, 2024
2 parents c5cf716 + 913db0c commit b384458
Show file tree
Hide file tree
Showing 44 changed files with 1,515 additions and 924 deletions.
26 changes: 26 additions & 0 deletions module/mempool/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# The `mempool` module

The `mempool` module provides mempool implementations for the Flow blockchain, which
are in-memory data structures that are tasked with storing `flow.Entity` objects.
`flow.Entity` objects are the fundamental data model of the Flow blockchain, and
all Flow primitives, such as transactions, blocks, and collections, are represented
as `flow.Entity` objects.

Each mempool implementation is tasked with storing a specific type of `flow.Entity`.
As a convention, all mempools are built on top of the `stdmap.Backend` struct, which
provides a thread-safe cache implementation for storing and retrieving `flow.Entity` objects.
The primary responsibility of the `stdmap.Backend` struct is to provide thread-safety for its underlying
data model (i.e., `mempool.BackData`) that is tasked with maintaining the actual `flow.Entity` objects.

At the moment, the `mempool` module provides two implementations of `mempool.BackData`:
- `backdata.Backdata`: a map implementation for storing `flow.Entity` objects using native Go `map`s.
- `herocache.Cache`: a heap-optimized cache implementation for storing `flow.Entity` objects, which
aims to minimize the memory footprint of the mempool on the heap and to
reduce the GC pressure.

Note-1: by design, the `mempool.BackData` interface is **not thread-safe**. Therefore, it is the responsibility
of the `stdmap.Backend` struct to provide thread-safety for its underlying `mempool.BackData` implementation.

Note-2: The `herocache.Cache` implementation is several orders of magnitude faster than the `backdata.Backdata` on
high-throughput workloads. For read-heavy or write-heavy workloads, the `herocache.Cache` implementation is recommended as
the underlying `mempool.BackData` implementation.
22 changes: 22 additions & 0 deletions module/mempool/backData.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,28 @@ type BackData interface {
// Returns a bool which indicates whether the entity was updated as well as the updated entity.
Adjust(entityID flow.Identifier, f func(flow.Entity) flow.Entity) (flow.Entity, bool)

// AdjustWithInit adjusts the entity using the given function if the given identifier can be found. When the
// entity is not found, it initializes the entity using the given init function and then applies the adjust function.
// Args:
// - entityID: the identifier of the entity to adjust.
// - adjust: the function that adjusts the entity.
// - init: the function that initializes the entity when it is not found.
// Returns:
// - the adjusted entity.
//
// - a bool which indicates whether the entity was adjusted.
AdjustWithInit(entityID flow.Identifier, adjust func(flow.Entity) flow.Entity, init func() flow.Entity) (flow.Entity, bool)

// GetWithInit returns the given entity from the backdata. If the entity does not exist, it creates a new entity
// using the factory function and stores it in the backdata.
// Args:
// - entityID: the identifier of the entity to get.
// - init: the function that initializes the entity when it is not found.
// Returns:
// - the entity.
// - a bool which indicates whether the entity was found (or created).
GetWithInit(entityID flow.Identifier, init func() flow.Entity) (flow.Entity, bool)

// ByID returns the given entity from the backdata.
ByID(entityID flow.Identifier) (flow.Entity, bool)

Expand Down
40 changes: 40 additions & 0 deletions module/mempool/herocache/backdata/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type slotBucket struct {
}

// Cache implements an array-based generic memory pool backed by a fixed total array.
// Note that this implementation is NOT thread-safe, and the higher-level Backend is responsible for concurrency management.
type Cache struct {
logger zerolog.Logger
collector module.HeroCacheMetrics
Expand Down Expand Up @@ -203,6 +204,45 @@ func (c *Cache) Adjust(entityID flow.Identifier, f func(flow.Entity) flow.Entity
return newEntity, true
}

// AdjustWithInit applies the adjust function to the entity stored under entityID. If no entity is stored under
// that identifier, the result of init is first put into the cache under entityID, and adjust is applied afterwards.
// Args:
// - entityID: the identifier of the entity to adjust.
// - adjust: the function applied to the (existing or freshly initialized) entity.
// - init: the function producing the initial entity; it is invoked only when entityID is not found.
// Returns:
// - the entity as returned by the underlying Adjust call.
// - the bool returned by the underlying Adjust call, i.e., whether the entity was adjusted.
func (c *Cache) AdjustWithInit(entityID flow.Identifier, adjust func(flow.Entity) flow.Entity, init func() flow.Entity) (flow.Entity, bool) {
	defer c.logTelemetry()

	if !c.Has(entityID) {
		// entity is absent: seed the cache with the initial entity so that Adjust below has something to work on.
		c.put(entityID, init())
	}

	// single exit point: both the pre-existing and the freshly initialized entity go through the same Adjust call.
	adjusted, ok := c.Adjust(entityID, adjust)
	return adjusted, ok
}

// GetWithInit retrieves the entity stored under entityID. If no entity is stored under that identifier, the
// result of init is first put into the cache under entityID, and the entity is looked up afterwards.
// Args:
// - entityID: the identifier of the entity to get.
// - init: the function producing the initial entity; it is invoked only when entityID is not found.
// Returns:
// - the entity as returned by the underlying ByID call.
// - the bool returned by the underlying ByID call, i.e., whether the entity was found after the optional init.
func (c *Cache) GetWithInit(entityID flow.Identifier, init func() flow.Entity) (flow.Entity, bool) {
	defer c.logTelemetry()

	if !c.Has(entityID) {
		// entity is absent: store the initial entity so that the lookup below can return it.
		c.put(entityID, init())
	}

	// single exit point: both the pre-existing and the freshly initialized entity are read through ByID.
	entity, ok := c.ByID(entityID)
	return entity, ok
}

// ByID returns the given entity from the backdata.
func (c *Cache) ByID(entityID flow.Identifier) (flow.Entity, bool) {
defer c.logTelemetry()
Expand Down
154 changes: 154 additions & 0 deletions module/mempool/herocache/backdata/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,160 @@ func TestArrayBackData_Adjust(t *testing.T) {
require.Equal(t, bd.Size(), uint(limit))
}

// TestArrayBackData_AdjustWitInit evaluates the AdjustWithInit method. For a non-existing entity, it must first
// initialize the entity and then adjust it. For an existing entity, it must only adjust the entity without
// invoking the init function. Afterwards, the test checks the integrity of BackData when an existing entity is
// adjusted (through Adjust) to a new entity with a different identifier.
func TestArrayBackData_AdjustWitInit(t *testing.T) {
	limit := 100_000

	bd := NewCache(uint32(limit),
		8,
		heropool.LRUEjection,
		unittest.Logger(),
		metrics.NewNoopCollector())

	// adjust logic shared by both passes below; increments the nonce of the entity in place.
	incrementNonce := func(entity flow.Entity) flow.Entity {
		mockEntity, ok := entity.(*unittest.MockEntity)
		require.True(t, ok)
		mockEntity.Nonce++
		return mockEntity
	}

	entities := unittest.EntityListFixture(uint(limit))

	// first pass: none of the entities exists yet, hence each one must be initialized and then adjusted.
	for _, e := range entities {
		adjustedEntity, adjusted := bd.AdjustWithInit(e.ID(), incrementNonce, func() flow.Entity {
			return e // initialize with the entity
		})
		require.True(t, adjusted)
		require.Equal(t, e.ID(), adjustedEntity.ID())
		require.Equal(t, uint64(1), adjustedEntity.(*unittest.MockEntity).Nonce)
	}

	// second pass: all entities already exist, hence init must not be invoked and only adjust is applied.
	for _, e := range entities {
		adjustedEntity, adjusted := bd.AdjustWithInit(e.ID(), incrementNonce, func() flow.Entity {
			require.Fail(t, "init function must not be invoked on an existing entity")
			return e
		})
		require.True(t, adjusted)
		require.Equal(t, e.ID(), adjustedEntity.ID())
		require.Equal(t, uint64(2), adjustedEntity.(*unittest.MockEntity).Nonce)
	}

	// picks a random entity from BackData and adjusts its identifier to a new one.
	entityIndex := rand.Int() % limit
	// checking integrity of retrieving entity
	oldEntity, ok := bd.ByID(entities[entityIndex].ID())
	require.True(t, ok)
	oldEntityID := oldEntity.ID()
	require.Equal(t, entities[entityIndex].ID(), oldEntityID)
	require.Equal(t, entities[entityIndex], oldEntity)

	// picks a new identifier for the entity and makes sure it is different than its current one.
	newEntityID := unittest.IdentifierFixture()
	require.NotEqual(t, oldEntityID, newEntityID)

	// adjusts old entity to a new entity with a new identifier
	newEntity, ok := bd.Adjust(oldEntity.ID(), func(entity flow.Entity) flow.Entity {
		mockEntity, ok := entity.(*unittest.MockEntity)
		require.True(t, ok)
		// oldEntity must be passed to func parameter of adjust.
		require.Equal(t, oldEntityID, mockEntity.ID())
		require.Equal(t, oldEntity, mockEntity)

		// adjust logic, replaces the entity with a new one; its nonce (3) differs from the nonce of
		// the old entity (2 after the two passes above), so the assertion below can tell them apart.
		return &unittest.MockEntity{Identifier: newEntityID, Nonce: 3}
	})

	// adjustment must be successful, and identifier must be updated.
	require.True(t, ok)
	require.Equal(t, newEntityID, newEntity.ID())
	require.Equal(t, uint64(3), newEntity.(*unittest.MockEntity).Nonce)
	newMockEntity, ok := newEntity.(*unittest.MockEntity)
	require.True(t, ok)

	// replaces new entity in the original reference list and
	// retrieves all.
	entities[entityIndex] = newMockEntity
	testRetrievableFrom(t, bd, entities, 0)

	// re-adjusting old entity must fail, since its identifier must no longer exist
	entity, ok := bd.Adjust(oldEntityID, func(entity flow.Entity) flow.Entity {
		require.Fail(t, "function must not be invoked on a non-existing entity")
		return entity
	})
	require.False(t, ok)
	require.Nil(t, entity)

	// similarly, retrieving old entity must fail
	entity, ok = bd.ByID(oldEntityID)
	require.False(t, ok)
	require.Nil(t, entity)

	ok = bd.Has(oldEntityID)
	require.False(t, ok)
}

// TestArrayBackData_GetWithInit evaluates the GetWithInit method. For a non-existing entity, it must initialize the
// entity and then return it. For an existing entity, it must return the stored (latest) version of the entity
// without invoking the init function.
func TestArrayBackData_GetWithInit(t *testing.T) {
	limit := 1000

	bd := NewCache(uint32(limit), 8, heropool.LRUEjection, unittest.Logger(), metrics.NewNoopCollector())

	entities := unittest.EntityListFixture(uint(limit))

	// GetWithInit on non-existing entities
	for _, e := range entities {
		// all entities must be initialized and retrieved successfully
		actual, ok := bd.GetWithInit(e.ID(), func() flow.Entity {
			return e // initialize with the entity
		})
		require.True(t, ok)
		require.Equal(t, e, actual)
	}

	// All
	all := bd.All()
	require.Equal(t, len(entities), len(all))
	for _, expected := range entities {
		actual, ok := bd.ByID(expected.ID())
		require.True(t, ok)
		require.Equal(t, expected, actual)
	}

	// Identifiers
	ids := bd.Identifiers()
	require.Equal(t, len(entities), len(ids))
	for _, id := range ids {
		require.True(t, bd.Has(id))
	}

	// Entities
	actualEntities := bd.Entities()
	require.Equal(t, len(entities), len(actualEntities))
	require.ElementsMatch(t, entities, actualEntities)

	// Adjust
	for _, e := range entities {
		// all entities must be adjusted successfully
		actual, ok := bd.Adjust(e.ID(), func(entity flow.Entity) flow.Entity {
			// increment nonce of the entity
			entity.(*unittest.MockEntity).Nonce++
			return entity
		})
		require.True(t, ok)
		require.Equal(t, e, actual)
	}

	// ByID; should return the latest version of the entity
	for _, e := range entities {
		// all entities must be retrieved successfully
		actual, ok := bd.ByID(e.ID())
		require.True(t, ok)
		require.Equal(t, e.ID(), actual.ID())
		require.Equal(t, uint64(1), actual.(*unittest.MockEntity).Nonce)
	}

	// GetWithInit on existing entities; should return the latest (i.e., adjusted) version of the entity
	// without invoking the init function.
	for _, e := range entities {
		// all entities must be retrieved successfully
		actual, ok := bd.GetWithInit(e.ID(), func() flow.Entity {
			require.Fail(t, "should not be called") // entity has already been initialized
			return e
		})
		require.True(t, ok)
		require.Equal(t, e.ID(), actual.ID())
		// the nonce incremented by Adjust above must be visible, i.e., the stored entity was not re-initialized.
		require.Equal(t, uint64(1), actual.(*unittest.MockEntity).Nonce)
	}
}

// TestArrayBackData_WriteHeavy evaluates correctness of Cache under the writing and retrieving
// a heavy load of entities up to its limit. All data must be written successfully and then retrievable.
func TestArrayBackData_WriteHeavy(t *testing.T) {
Expand Down
52 changes: 52 additions & 0 deletions module/mempool/mock/back_data.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b384458

Please sign in to comment.