Skip to content

Pebble Lock Manager PoC #7364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 40 commits into
base: leo/dbops-follower
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
2da4e0d
add todos by mutexes in storage/*
jordanschalm May 1, 2025
14c4c4c
pin lockctx library
jordanschalm May 1, 2025
ae80b53
Merge branch 'leo/dbops-follower' into jord/pebble/add-lockctx
jordanschalm May 1, 2025
23b7fe7
Merge branch 'leo/dbops-follower' into jord/pebble/add-lockctx
jordanschalm May 5, 2025
b76c6fe
add lockmgr init to scaffold
jordanschalm May 5, 2025
1b7db28
integrate lockctx for headers IndexHeight
jordanschalm May 5, 2025
dc83b81
todo
jordanschalm May 6, 2025
e69a33d
add lock manager to bootstrapping
jordanschalm May 6, 2025
f3980be
lctx in other mutator writes
jordanschalm May 6, 2025
15c389d
missing dep
jordanschalm May 6, 2025
55cc555
differentiate between prod/testing lock manager
jordanschalm May 6, 2025
2ccaa19
rm irrelevant todos
jordanschalm May 6, 2025
6f71001
doc tweaks
jordanschalm May 6, 2025
92adeab
policy docs
jordanschalm May 7, 2025
36eb0e2
add todos for batch writer Lock uses
jordanschalm May 7, 2025
5c2a315
Merge branch 'leo/dbops-follower' into jord/pebble/add-lockctx
jordanschalm May 7, 2025
43beace
document acquisition of both locks for bootstrapping
jordanschalm May 7, 2025
6916965
lockctx: result approval indexing
jordanschalm May 7, 2025
23f064b
update IndexLatestSealAtBlock
jordanschalm May 8, 2025
56df6bd
Merge branch 'leo/dbops-follower' into leo/add-lockctx
zhangchiqing May 8, 2025
c432cb3
add storeWithLock method to storage cache
zhangchiqing May 8, 2025
1c4135a
update gomod
zhangchiqing May 8, 2025
0eaf0ca
update storage blocks
zhangchiqing May 8, 2025
ec09288
update mocks
zhangchiqing May 8, 2025
dca69eb
fix BatchStore in tests
zhangchiqing May 8, 2025
f40f04d
fix scaffold
zhangchiqing May 8, 2025
b6e1b69
fix linter
zhangchiqing May 8, 2025
48d850b
Merge branch 'jord/pebble/add-lockctx' of github.com:onflow/flow-go i…
jordanschalm May 9, 2025
5910c42
fix header test
zhangchiqing May 9, 2025
2e821f5
fix payloads test
zhangchiqing May 9, 2025
44ffd0f
fix deadlock in tests
zhangchiqing May 9, 2025
da53570
disable migrate last executed block
zhangchiqing May 22, 2025
99029e1
disable chaining
zhangchiqing May 22, 2025
513b08a
Merge branch 'leo/dbops-follower' into jord/pebble/add-lockctx
zhangchiqing May 26, 2025
efd6dba
Merge branch 'leo/disable-en-halfway-migration' into jord/pebble/add-…
zhangchiqing May 26, 2025
c4f9871
Merge branch 'leo/dbops-follower' into jord/pebble/add-lockctx
zhangchiqing May 29, 2025
1e4582f
Merge branch 'leo/dbops-follower' into jord/pebble/add-lockctx
zhangchiqing Jun 3, 2025
c1e4ba1
Merge branch 'leo/dbops-follower' into jord/pebble/add-lockctx
zhangchiqing Jun 3, 2025
0ec55b6
remove duplicated ops
zhangchiqing Jun 3, 2025
14ab9a0
fix tests
zhangchiqing Jun 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
"github.com/jordanschalm/lockctx"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/onflow/crypto"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -209,6 +210,7 @@ type NodeConfig struct {
ProtocolDB storage.DB
SecretsDB *badger.DB
Storage Storage
StorageLockMgr lockctx.Manager
ProtocolEvents *events.Distributor
State protocol.State
Resolver madns.BasicResolver
Expand Down
13 changes: 13 additions & 0 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,13 @@ func (fnb *FlowNodeBuilder) initSecretsDB() error {
return nil
}

// initStorageLockManager creates the lock manager used by the storage layer
// and stores it on the builder as StorageLockMgr.
// This manager must be a process-wide singleton.
// There is currently no failure path, so the returned error is always nil.
func (fnb *FlowNodeBuilder) initStorageLockManager() error {
	lockMgr := storage.MakeSingletonLockManager()
	fnb.StorageLockMgr = lockMgr
	return nil
}

func (fnb *FlowNodeBuilder) initStorage() error {

// in order to avoid long iterations with big keys when initializing with an
Expand Down Expand Up @@ -1359,6 +1366,7 @@ func (fnb *FlowNodeBuilder) initState() error {
state, err := badgerState.OpenState(
fnb.Metrics.Compliance,
fnb.ProtocolDB,
fnb.StorageLockMgr,
fnb.Storage.Headers,
fnb.Storage.Seals,
fnb.Storage.Results,
Expand Down Expand Up @@ -1408,6 +1416,7 @@ func (fnb *FlowNodeBuilder) initState() error {
fnb.State, err = badgerState.Bootstrap(
fnb.Metrics.Compliance,
fnb.ProtocolDB,
fnb.StorageLockMgr,
fnb.Storage.Headers,
fnb.Storage.Seals,
fnb.Storage.Results,
Expand Down Expand Up @@ -2139,6 +2148,10 @@ func (fnb *FlowNodeBuilder) onStart() error {
return err
}

if err := fnb.initStorageLockManager(); err != nil {
return err
}

// we always initialize both badger and pebble databases
// even if we only use one of them; this simplifies the code and checks
if err := fnb.initBadgerDB(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/util/cmd/common/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (

func InitProtocolState(db *badger.DB, storages *storage.All) (protocol.State, error) {
metrics := &metrics.NoopCollector{}
lockManager := storage.NewTestingLockManager()

protocolState, err := protocolbadger.OpenState(
metrics,
badgerimpl.ToDB(db),
lockManager,
storages.Headers,
storages.Seals,
storages.Results,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ func TestFindBlockTransactions(t *testing.T) {
state.On("AtHeight", uint64(5)).Return(snap5, nil)

// store into database
_, lctx := unittest.LockManagerWithContext(t, storage.LockInsertBlock)
defer lctx.Release()
require.NoError(t, badgerimpl.ToDB(db).WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
err := storages.Blocks.BatchStore(rw, &b1)
err := storages.Blocks.BatchStore(lctx, rw, &b1)
if err != nil {
return err
}

return storages.Blocks.BatchStore(rw, &b2)
return storages.Blocks.BatchStore(lctx, rw, &b2)
}))

require.NoError(t, collections.Store(&col1.Collection))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ func TestReExecuteBlock(t *testing.T) {
events := store.NewEvents(metrics, db)
serviceEvents := store.NewServiceEvents(metrics, db)

manager, lctx := unittest.LockManagerWithContext(t, storage.LockInsertBlock)
err = db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return blocks.BatchStore(rw, &genesis)
return blocks.BatchStore(lctx, rw, &genesis)
})
lctx.Release()
require.NoError(t, err)

getLatestFinalized := func() (uint64, error) {
Expand Down Expand Up @@ -82,9 +84,12 @@ func TestReExecuteBlock(t *testing.T) {
computationResult := testutil.ComputationResultFixture(t)
header := computationResult.Block.Header

lctx2 := manager.NewContext()
require.NoError(t, lctx2.AcquireLock(storage.LockInsertBlock))
err = db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return blocks.BatchStore(rw, computationResult.Block)
return blocks.BatchStore(lctx2, rw, computationResult.Block)
})
lctx2.Release()
require.NoError(t, err)

// save execution results
Expand Down
2 changes: 1 addition & 1 deletion cmd/verification_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
if dbops.IsBadgerTransaction(v.DBOps) {
approvalStorage = badger.NewResultApprovals(node.Metrics.Cache, node.DB)
} else if dbops.IsBatchUpdate(v.DBOps) {
approvalStorage = store.NewResultApprovals(node.Metrics.Cache, node.ProtocolDB)
approvalStorage = store.NewResultApprovals(node.Metrics.Cache, node.ProtocolDB, node.StorageLockMgr)
} else {
return nil, fmt.Errorf("invalid db opts type: %v", v.DBOps)
}
Expand Down
4 changes: 4 additions & 0 deletions consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/gammazero/workerpool"
"github.com/jordanschalm/lockctx"
"github.com/onflow/crypto"
"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -139,6 +140,7 @@ func (p *ConsensusParticipants) Update(epochCounter uint64, data *run.Participan

type Node struct {
db fstorage.DB
lockManager lockctx.Manager
dbCloser io.Closer
dbDir string
index int
Expand Down Expand Up @@ -378,6 +380,7 @@ func createNode(
metricsCollector := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
db := badgerimpl.ToDB(badgerdb)
lockManager := fstorage.NewTestingLockManager()

headersDB := store.NewHeaders(metricsCollector, db)
guaranteesDB := store.NewGuarantees(metricsCollector, db, store.DefaultCacheSize)
Expand Down Expand Up @@ -407,6 +410,7 @@ func createNode(
state, err := bprotocol.Bootstrap(
metricsCollector,
db,
lockManager,
headersDB,
sealsDB,
resultsDB,
Expand Down
64 changes: 49 additions & 15 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,17 +391,25 @@ func (suite *Suite) TestGetBlockByIDAndHeight() {
block2.Header.Height = 2

bdb := badgerimpl.ToDB(db)
manager, lctx := unittest.LockManagerWithContext(suite.T(), storage.LockInsertBlock)
require.NoError(suite.T(), bdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return all.Blocks.BatchStore(rw, &block1)
}))
require.NoError(suite.T(), bdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return all.Blocks.BatchStore(rw, &block2)
if err := all.Blocks.BatchStore(lctx, rw, &block1); err != nil {
return err
}
if err := all.Blocks.BatchStore(lctx, rw, &block2); err != nil {
return err
}
return nil
}))
lctx.Release()

fctx := manager.NewContext()
require.NoError(suite.T(), fctx.AcquireLock(storage.LockFinalizeBlock))
// the follower logic should update height index on the block storage when a block is finalized
require.NoError(suite.T(), bdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return operation.IndexBlockHeight(rw, block2.Header.Height, block2.ID())
return operation.IndexBlockHeight(fctx, rw, block2.Header.Height, block2.ID())
}))
fctx.Release()

assertHeaderResp := func(
resp *accessproto.BlockHeaderResponse,
Expand Down Expand Up @@ -726,13 +734,18 @@ func (suite *Suite) TestGetSealedTransaction() {

// 1. Assume that follower engine updated the block storage and the protocol state. The block is reported as sealed
bdb := badgerimpl.ToDB(db)
manager, lctx := unittest.LockManagerWithContext(suite.T(), storage.LockInsertBlock)
require.NoError(suite.T(), bdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return all.Blocks.BatchStore(rw, block)
return all.Blocks.BatchStore(lctx, rw, block)
}))
lctx.Release()
require.NoError(suite.T(), err)

fctx := manager.NewContext()
defer fctx.Release()
require.NoError(suite.T(), fctx.AcquireLock(storage.LockFinalizeBlock))
require.NoError(suite.T(), bdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return operation.IndexBlockHeight(rw, block.Header.Height, block.ID())
return operation.IndexBlockHeight(fctx, rw, block.Header.Height, block.ID())
}))

suite.sealedBlock = block.Header
Expand Down Expand Up @@ -802,12 +815,18 @@ func (suite *Suite) TestGetTransactionResult() {
suite.sealedSnapshot.On("Head").Return(sealedBlock, nil)

bdb := badgerimpl.ToDB(db)
manager, lctx := unittest.LockManagerWithContext(suite.T(), storage.LockInsertBlock)
require.NoError(suite.T(), bdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return all.Blocks.BatchStore(rw, block)
return all.Blocks.BatchStore(lctx, rw, block)
}))
lctx.Release()

lctx2 := manager.NewContext()
require.NoError(suite.T(), lctx2.AcquireLock(storage.LockInsertBlock))
require.NoError(suite.T(), bdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return all.Blocks.BatchStore(rw, blockNegative)
return all.Blocks.BatchStore(lctx2, rw, blockNegative)
}))
lctx2.Release()

suite.state.On("AtBlockID", blockId).Return(suite.sealedSnapshot)

Expand Down Expand Up @@ -953,9 +972,12 @@ func (suite *Suite) TestGetTransactionResult() {
require.NoError(suite.T(), err)
}
}
fctx2 := manager.NewContext()
require.NoError(suite.T(), fctx2.AcquireLock(storage.LockFinalizeBlock))
require.NoError(suite.T(), bdb.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return operation.IndexBlockHeight(rw, block.Header.Height, block.ID())
return operation.IndexBlockHeight(fctx2, rw, block.Header.Height, block.ID())
}))
fctx2.Release()
finalSnapshot.On("Head").Return(block.Header, nil)

processExecutionReceipts(block, collection, enNodeIDs, originID, ingestEng)
Expand Down Expand Up @@ -1183,13 +1205,19 @@ func (suite *Suite) TestExecuteScript() {

// create a block and a seal pointing to that block
lastBlock := unittest.BlockWithParentFixture(prevBlock.Header)
manager, lctx := unittest.LockManagerWithContext(suite.T(), storage.LockInsertBlock)
require.NoError(suite.T(), db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return all.Blocks.BatchStore(rw, lastBlock)
return all.Blocks.BatchStore(lctx, rw, lastBlock)
}))
lctx.Release()
require.NoError(suite.T(), err)

fctx := manager.NewContext()
require.NoError(suite.T(), fctx.AcquireLock(storage.LockFinalizeBlock))
require.NoError(suite.T(), db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return operation.IndexBlockHeight(rw, lastBlock.Header.Height, lastBlock.ID())
return operation.IndexBlockHeight(fctx, rw, lastBlock.Header.Height, lastBlock.ID())
}))
fctx.Release()
require.NoError(suite.T(), err)
// update latest sealed block
suite.sealedBlock = lastBlock.Header
Expand All @@ -1201,14 +1229,18 @@ func (suite *Suite) TestExecuteScript() {
require.NoError(suite.T(), err)
}

fctx2 := manager.NewContext()
require.NoError(suite.T(), fctx2.AcquireLock(storage.LockInsertBlock))
require.NoError(suite.T(), fctx2.AcquireLock(storage.LockFinalizeBlock))
require.NoError(suite.T(), db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
err := all.Blocks.BatchStore(rw, prevBlock)
err := all.Blocks.BatchStore(fctx2, rw, prevBlock)
if err != nil {
return err
}

return operation.IndexBlockHeight(rw, prevBlock.Header.Height, prevBlock.ID())
return operation.IndexBlockHeight(fctx2, rw, prevBlock.Header.Height, prevBlock.ID())
}))
fctx2.Release()

// create execution receipts for each of the execution node and the previous block
executionReceipts = unittest.ReceiptsForBlockFixture(prevBlock, identities.NodeIDs())
Expand Down Expand Up @@ -1329,10 +1361,12 @@ func (suite *Suite) TestLastFinalizedBlockHeightResult() {
newFinalizedBlock := unittest.BlockWithParentFixture(block.Header)

db := badgerimpl.ToDB(badgerdb)
_, lctx := unittest.LockManagerWithContext(suite.T(), storage.LockInsertBlock)
// store new block
require.NoError(suite.T(), db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return all.Blocks.BatchStore(rw, block)
return all.Blocks.BatchStore(lctx, rw, block)
}))
lctx.Release()

assertFinalizedBlockHeader := func(resp *accessproto.BlockHeaderResponse, err error) {
require.NoError(suite.T(), err)
Expand Down
3 changes: 3 additions & 0 deletions engine/common/follower/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
pbadger "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/events"
"github.com/onflow/flow-go/state/protocol/util"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/utils/unittest"
Expand All @@ -50,12 +51,14 @@ func TestFollowerHappyPath(t *testing.T) {
tracer := trace.NewNoopTracer()
log := unittest.Logger()
consumer := events.NewNoop()
lockManager := storage.NewTestingLockManager()
all := bstorage.InitAll(metrics, db)

// bootstrap root snapshot
state, err := pbadger.Bootstrap(
metrics,
badgerimpl.ToDB(db),
lockManager,
all.Headers,
all.Seals,
all.Results,
Expand Down
10 changes: 8 additions & 2 deletions engine/execution/pruner/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,24 @@ func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) {
// indexed by height
chunks := make([]*verification.VerifiableChunkData, lastFinalizedHeight+2)
parentID := genesis.ID()
manager, lctx := unittest.LockManagerWithContext(t, storage.LockInsertBlock)
require.NoError(t, db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return blockstore.BatchStore(rw, genesis)
return blockstore.BatchStore(lctx, rw, genesis)
}))
lctx.Release()

for i := 1; i <= lastFinalizedHeight; i++ {
chunk, block := unittest.VerifiableChunkDataFixture(0, func(header *flow.Header) {
header.Height = uint64(i)
header.ParentID = parentID
})
chunks[i] = chunk // index by height
lctx := manager.NewContext()
require.NoError(t, lctx.AcquireLock(storage.LockInsertBlock))
require.NoError(t, db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return blockstore.BatchStore(rw, block)
return blockstore.BatchStore(lctx, rw, block)
}))
lctx.Release()
require.NoError(t, bdb.Update(operation.IndexBlockHeight(chunk.Header.Height, chunk.Header.ID())))
require.NoError(t, results.Store(chunk.Result))
require.NoError(t, results.Index(chunk.Result.BlockID, chunk.Result.ID()))
Expand Down
3 changes: 3 additions & 0 deletions engine/testutil/mock/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/dgraph-io/badger/v2"
"github.com/jordanschalm/lockctx"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -58,6 +59,7 @@ type StateFixture struct {
Storage *storage.All
ProtocolEvents *events.Distributor
State protocol.ParticipantState
LockManager lockctx.Manager
}

// GenericNode implements a generic in-process node for tests.
Expand All @@ -72,6 +74,7 @@ type GenericNode struct {
Tracer module.Tracer
PublicDB *badger.DB
SecretsDB *badger.DB
LockManager lockctx.Manager
Headers storage.Headers
Guarantees storage.Guarantees
Seals storage.Seals
Expand Down
Loading
Loading