Skip to content

Commit d69524d

Browse files
yhzhangjump authored and lidatong committed
firedancer-dev: replay a slot only after its parent's bank hash matches
1 parent 9724bc9 commit d69524d

File tree

3 files changed

+107
-37
lines changed

3 files changed

+107
-37
lines changed

src/app/firedancer-dev/commands/backtest.c

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ backtest_topo( config_t * config ) {
152152

153153
enum{
154154
metric_cpu_idx=0,
155-
rocksdb_cpu_idx,
155+
backtest_cpu_idx,
156156
replay_cpu_idx,
157157
exec_idx_start
158158
};
@@ -170,16 +170,16 @@ backtest_topo( config_t * config ) {
170170
metric_tile->metric.prometheus_listen_port = config->tiles.metric.prometheus_listen_port;
171171

172172
/**********************************************************************/
173-
/* Add the rocksdb tile to topo */
173+
/* Add the backtest tile to topo */
174174
/**********************************************************************/
175-
fd_topob_wksp( topo, "rocksdb" );
176-
fd_topo_tile_t * rocksdb_tile = fd_topob_tile( topo, "arch_b", "rocksdb", "metric_in", rocksdb_cpu_idx, 0, 0 );
177-
rocksdb_tile->archiver.end_slot = config->tiles.archiver.end_slot;
178-
strncpy( rocksdb_tile->archiver.archiver_path, config->tiles.archiver.archiver_path, PATH_MAX );
179-
if( FD_UNLIKELY( 0==strlen( rocksdb_tile->archiver.archiver_path ) ) ) {
175+
fd_topob_wksp( topo, "backtest" );
176+
fd_topo_tile_t * backtest_tile = fd_topob_tile( topo, "btest", "backtest", "metric_in", backtest_cpu_idx, 0, 0 );
177+
backtest_tile->archiver.end_slot = config->tiles.archiver.end_slot;
178+
strncpy( backtest_tile->archiver.archiver_path, config->tiles.archiver.archiver_path, PATH_MAX );
179+
if( FD_UNLIKELY( 0==strlen( backtest_tile->archiver.archiver_path ) ) ) {
180180
FD_LOG_ERR(( "Rocksdb not found, check `archiver.archiver_path` in toml" ));
181181
} else {
182-
FD_LOG_NOTICE(( "Found rocksdb path from config: %s", rocksdb_tile->archiver.archiver_path ));
182+
FD_LOG_NOTICE(( "Found rocksdb path from config: %s", backtest_tile->archiver.archiver_path ));
183183
}
184184

185185
/**********************************************************************/
@@ -257,12 +257,12 @@ backtest_topo( config_t * config ) {
257257
topo->links[ replay_tile->in_link_id[ fd_topo_find_tile_in_link( topo, replay_tile, "store_replay", 0 ) ] ].permit_no_producers = 1;
258258

259259
/**********************************************************************/
260-
/* Setup rocksdb->replay link (repair_repla) in topo */
260+
/* Setup backtest->replay link (repair_repla) in topo */
261261
/**********************************************************************/
262262
fd_topob_wksp( topo, "repair_repla" );
263263
fd_topob_link( topo, "repair_repla", "repair_repla", 65536UL, sizeof(ulong), 1UL );
264264
fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "repair_repla", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
265-
fd_topob_tile_out( topo, "arch_b", 0UL, "repair_repla", 0UL );
265+
fd_topob_tile_out( topo, "btest", 0UL, "repair_repla", 0UL );
266266

267267
/**********************************************************************/
268268
/* Setup pack/batch->replay links in topo w/o a producer */
@@ -297,11 +297,11 @@ backtest_topo( config_t * config ) {
297297
FOR(bank_tile_cnt) topo->links[ replay_tile->out_link_id[ fd_topo_find_tile_out_link( topo, replay_tile, "replay_poh", i ) ] ].permit_no_consumers = 1;
298298

299299
/**********************************************************************/
300-
/* Setup replay->rocksdb link (replay_notif) in topo */
300+
/* Setup replay->backtest link (replay_notif) in topo */
301301
/**********************************************************************/
302302
fd_topob_wksp( topo, "replay_notif" );
303303
fd_topob_link( topo, "replay_notif", "replay_notif", FD_REPLAY_NOTIF_DEPTH, FD_REPLAY_NOTIF_MTU, 1UL );
304-
fd_topob_tile_in( topo, "arch_b", 0UL, "metric_in", "replay_notif", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
304+
fd_topob_tile_in( topo, "btest", 0UL, "metric_in", "replay_notif", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
305305
fd_topob_tile_out( topo, "replay", 0UL, "replay_notif", 0UL );
306306

307307
/**********************************************************************/
@@ -337,7 +337,7 @@ backtest_topo( config_t * config ) {
337337
/* Setup the shared objs used by replay and exec tiles */
338338
/**********************************************************************/
339339

340-
/* blockstore_obj shared by replay and rocksdb tiles */
340+
/* blockstore_obj shared by replay and backtest tiles */
341341
fd_topob_wksp( topo, "bstore" );
342342
fd_topo_obj_t * blockstore_obj = setup_topo_blockstore( topo,
343343
"bstore",
@@ -347,14 +347,14 @@ backtest_topo( config_t * config ) {
347347
config->firedancer.blockstore.txn_max,
348348
config->firedancer.blockstore.alloc_max );
349349
fd_topob_tile_uses( topo, replay_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
350-
fd_topob_tile_uses( topo, rocksdb_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
350+
fd_topob_tile_uses( topo, backtest_tile, blockstore_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
351351
FD_TEST( fd_pod_insertf_ulong( topo->props, blockstore_obj->id, "blockstore" ) );
352352

353-
/* turb_slot_obj shared by replay and rocksdb tiles */
353+
/* turb_slot_obj shared by replay and backtest tiles */
354354
fd_topob_wksp( topo, "turb_slot" );
355355
fd_topo_obj_t * turb_slot_obj = fd_topob_obj( topo, "fseq", "turb_slot" );
356356
fd_topob_tile_uses( topo, replay_tile, turb_slot_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
357-
fd_topob_tile_uses( topo, rocksdb_tile, turb_slot_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
357+
fd_topob_tile_uses( topo, backtest_tile, turb_slot_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
358358
FD_TEST( fd_pod_insertf_ulong( topo->props, turb_slot_obj->id, "turb_slot" ) );
359359

360360
/* runtime_pub_obj shared by replay, exec and writer tiles */
@@ -396,11 +396,11 @@ backtest_topo( config_t * config ) {
396396
FD_TEST( fd_pod_insertf_ulong( topo->props, writer_fseq_obj->id, "writer_fseq.%lu", i ) );
397397
}
398398

399-
/* root_slot_obj shared by replay and rocksdb tiles */
399+
/* root_slot_obj shared by replay and backtest tiles */
400400
fd_topob_wksp( topo, "root_slot" );
401401
fd_topo_obj_t * root_slot_obj = fd_topob_obj( topo, "fseq", "root_slot" );
402402
fd_topob_tile_uses( topo, replay_tile, root_slot_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
403-
fd_topob_tile_uses( topo, rocksdb_tile, root_slot_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
403+
fd_topob_tile_uses( topo, backtest_tile, root_slot_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
404404
FD_TEST( fd_pod_insertf_ulong( topo->props, root_slot_obj->id, "root_slot" ) );
405405

406406
/* txncache_obj, busy_obj, poh_slot_obj and constipated_obj only by replay tile */

src/disco/archiver/fd_archiver_backtest.c

Lines changed: 88 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
struct fd_archiver_backtest_tile_ctx {
2222
ulong use_rocksdb;
2323
fd_rocksdb_t rocksdb;
24+
rocksdb_iterator_t * rocksdb_iter;
2425
fd_rocksdb_root_iter_t rocksdb_root_iter;
2526
fd_slot_meta_t rocksdb_slot_meta;
2627
ulong rocksdb_curr_idx;
@@ -53,8 +54,7 @@ loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
5354
}
5455

5556
static void
56-
rocksdb_inspect( fd_archiver_backtest_tile_ctx_t * ctx,
57-
fd_stem_context_t * stem ) {
57+
rocksdb_inspect( fd_archiver_backtest_tile_ctx_t * ctx ) {
5858
ulong start_slot = 0;
5959
ulong end_slot = 0;
6060
ulong shred_cnt = 0;
@@ -74,7 +74,6 @@ rocksdb_inspect( fd_archiver_backtest_tile_ctx_t * ctx,
7474

7575
rocksdb_iter_seek(iter, (const char *) k, sizeof(k));
7676

77-
uint entry_batch_start_idx=0;
7877
for (ulong i = start_idx; i < end_idx; i++) {
7978
ulong cur_slot, index;
8079
uchar valid = rocksdb_iter_valid(iter);
@@ -107,18 +106,6 @@ rocksdb_inspect( fd_archiver_backtest_tile_ctx_t * ctx,
107106
end_slot = shred->slot;
108107
shred_cnt++;
109108

110-
fd_blockstore_shred_insert( ctx->blockstore, shred );
111-
if( !!(shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE) ) {
112-
int slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
113-
/* Notify the replay tile after inserting a FEC set */
114-
FD_LOG_INFO(( "%lu:[%u, %u] notifies replay", slot, entry_batch_start_idx, shred->idx ));
115-
uint cnt = shred->idx+1-entry_batch_start_idx;
116-
entry_batch_start_idx = shred->idx+1;
117-
ulong sig = fd_disco_repair_replay_sig( slot, (ushort)(slot - ctx->rocksdb_slot_meta.parent_slot), cnt, slot_complete );
118-
ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
119-
fd_stem_publish( stem, REPLAY_OUT_IDX, sig, 0, 0, 0, tspub, tspub );
120-
}
121-
122109
rocksdb_iter_next(iter);
123110
}
124111
} while(1);
@@ -132,6 +119,81 @@ rocksdb_inspect( fd_archiver_backtest_tile_ctx_t * ctx,
132119
FD_TEST( shred_cnt>0 );
133120
}
134121

122+
/* rocksdb_get_shred returns the next data shred for the slot currently
   tracked in ctx->rocksdb_slot_meta, reading it through
   ctx->rocksdb_iter.

   When rocksdb_curr_idx has reached rocksdb_end_idx, the root iterator
   is first advanced to its next slot, that slot's meta is re-read into
   ctx->rocksdb_slot_meta, rocksdb_curr_idx is reset to 0 and
   rocksdb_end_idx is set to the meta's `received` field.

   On success, *out_sz is set to the value length reported by the
   iterator, rocksdb_curr_idx is incremented, and the result of
   fd_shred_parse is returned.  Returns NULL (leaving *out_sz
   untouched) if advancing the root iterator or fetching the slot meta
   reports failure.  Any inconsistency in the shred column (bad key
   length, wrong slot, wrong index, NULL value) is reported through
   FD_LOG_ERR.

   NOTE(review): the returned pointer presumably refers to memory owned
   by the rocksdb iterator and is likely invalidated by the next seek --
   confirm against the rocksdb C API. */
static fd_shred_t const *
123+
rocksdb_get_shred( fd_archiver_backtest_tile_ctx_t * ctx,
124+
ulong * out_sz ) {
125+
if( ctx->rocksdb_curr_idx==ctx->rocksdb_end_idx ) {
126+
if( FD_UNLIKELY( fd_rocksdb_root_iter_next( &ctx->rocksdb_root_iter, &ctx->rocksdb_slot_meta, ctx->valloc ) ) ) return NULL;
127+
if( FD_UNLIKELY( fd_rocksdb_get_meta( &ctx->rocksdb, ctx->rocksdb_slot_meta.slot, &ctx->rocksdb_slot_meta, ctx->valloc ) ) ) return NULL;
128+
ctx->rocksdb_curr_idx = 0;
129+
ctx->rocksdb_end_idx = ctx->rocksdb_slot_meta.received;
130+
}
131+
ulong slot = ctx->rocksdb_slot_meta.slot;
132+
/* Build the 16-byte lookup key: byte-swapped slot followed by the
   byte-swapped shred index, then seek the iterator to it. */
133+
char k[16];
134+
*((ulong *) &k[0]) = fd_ulong_bswap(slot);
135+
*((ulong *) &k[8]) = fd_ulong_bswap(ctx->rocksdb_curr_idx);
136+
rocksdb_iter_seek(ctx->rocksdb_iter, (const char *) k, sizeof(k));
137+
138+
/* cur_slot and index are only assigned inside the `valid` branch
   below. */
ulong cur_slot, index;
139+
uchar valid = rocksdb_iter_valid(ctx->rocksdb_iter);
140+
141+
if (valid) {
142+
size_t klen = 0;
143+
const char* key = rocksdb_iter_key(ctx->rocksdb_iter, &klen); // There is no need to free key
144+
if (klen != 16) // invalid key
145+
FD_LOG_ERR(( "rocksdb has invalid key length" ));
146+
cur_slot = fd_ulong_bswap(*((ulong *) &key[0]));
147+
index = fd_ulong_bswap(*((ulong *) &key[8]));
148+
}
149+
150+
/* cur_slot is not read when !valid because of short-circuiting. */
if (!valid || cur_slot != slot)
151+
FD_LOG_ERR(("missing shreds for slot %lu, valid=%u", slot, valid));
152+
/* NOTE(review): if !valid, `index` is still unassigned at this point;
   the read below is only safe if FD_LOG_ERR above does not return --
   confirm. */
153+
if (index != ctx->rocksdb_curr_idx)
154+
FD_LOG_ERR(("missing shred %lu at index %lu for slot %lu", ctx->rocksdb_curr_idx, index, slot));
155+
156+
size_t dlen = 0;
157+
// Data was first copied from disk into memory to make it available to this API
158+
const unsigned char *data = (const unsigned char *) rocksdb_iter_value(ctx->rocksdb_iter, &dlen);
159+
if (data == NULL)
160+
FD_LOG_ERR(("failed to read shred %lu/%lu", slot, ctx->rocksdb_curr_idx));
161+
162+
// This just correctly selects from inside the data pointer to the
163+
// actual data without a memory copy
164+
/* NOTE(review): the fd_shred_parse result is not checked here and
   rocksdb_curr_idx is advanced regardless; callers cannot tell a
   parse failure apart from the NULL returned at end of iteration --
   confirm whether fd_shred_parse can return NULL for stored shreds. */
fd_shred_t const * shred = fd_shred_parse( data, (ulong) dlen );
165+
ctx->rocksdb_curr_idx++;
166+
167+
*out_sz = dlen;
168+
return shred;
169+
}
170+
171+
/* notify_one_slot pulls shreds one at a time via rocksdb_get_shred,
   inserts each into ctx->blockstore, and publishes a frag on
   REPLAY_OUT_IDX every time a shred carries
   FD_SHRED_DATA_FLAG_DATA_COMPLETE.  The loop ends once such a shred
   also carries FD_SHRED_DATA_FLAG_SLOT_COMPLETE, or as soon as
   rocksdb_get_shred returns NULL.

   ctx  - tile context; its rocksdb iteration state is advanced by
          rocksdb_get_shred and its rocksdb_slot_meta.parent_slot is
          used to compute the parent offset in the published sig.
   stem - stem context used for publishing. */
static void
172+
notify_one_slot( fd_archiver_backtest_tile_ctx_t * ctx,
173+
fd_stem_context_t * stem ) {
174+
/* Index of the first shred not yet covered by a published
   notification. */
uint entry_batch_start_idx = 0;
175+
int slot_complete = 0;
176+
while(!slot_complete) {
177+
/* sz receives the shred size but is not used afterwards in this
   function. */
ulong sz = 0;
178+
fd_shred_t const * shred = rocksdb_get_shred( ctx, &sz );
179+
if( FD_UNLIKELY( shred==NULL ) ) {
180+
break;
181+
} else {
182+
fd_blockstore_shred_insert( ctx->blockstore, shred );
183+
if( !!(shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE) ) {
184+
/* slot_complete is only refreshed on DATA_COMPLETE shreds, so a
   SLOT_COMPLETE flag without DATA_COMPLETE would not end the
   loop. */
slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
185+
/* Notify the replay tile after inserting a FEC set */
186+
FD_LOG_DEBUG(( "%lu:[%u, %u] notifies replay", shred->slot, entry_batch_start_idx, shred->idx ));
187+
/* cnt is the number of shreds in [entry_batch_start_idx, shred->idx]. */
uint cnt = shred->idx+1-entry_batch_start_idx;
188+
entry_batch_start_idx = shred->idx+1;
189+
/* NOTE(review): the (ushort) cast truncates slot-parent_slot if the
   gap exceeds 65535 -- confirm this cannot happen for rooted slots. */
ulong sig = fd_disco_repair_replay_sig( shred->slot, (ushort)(shred->slot-ctx->rocksdb_slot_meta.parent_slot), cnt, slot_complete );
190+
ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
191+
fd_stem_publish( stem, REPLAY_OUT_IDX, sig, 0, 0, 0, tspub, tspub );
192+
}
193+
}
194+
}
195+
}
196+
135197
static void
136198
unprivileged_init( fd_topo_t * topo,
137199
fd_topo_tile_t * tile ) {
@@ -203,7 +265,14 @@ after_credit( fd_archiver_backtest_tile_ctx_t * ctx,
203265
fd_rocksdb_root_iter_new( &ctx->rocksdb_root_iter );
204266
if( FD_UNLIKELY( fd_rocksdb_root_iter_seek( &ctx->rocksdb_root_iter, &ctx->rocksdb, wmark, &ctx->rocksdb_slot_meta, ctx->valloc ) ) )
205267
FD_LOG_ERR(( "Failed at seeking rocksdb root iter for slot=%lu", wmark ));
206-
rocksdb_inspect( ctx, stem );
268+
rocksdb_inspect( ctx );
269+
270+
fd_rocksdb_root_iter_new( &ctx->rocksdb_root_iter );
271+
if( FD_UNLIKELY( fd_rocksdb_root_iter_seek( &ctx->rocksdb_root_iter, &ctx->rocksdb, wmark, &ctx->rocksdb_slot_meta, ctx->valloc ) ) )
272+
FD_LOG_ERR(( "Failed at seeking rocksdb root iter for slot=%lu", wmark ));
273+
ctx->rocksdb_iter = rocksdb_create_iterator_cf(ctx->rocksdb.db, ctx->rocksdb.ro, ctx->rocksdb.cf_handles[FD_ROCKSDB_CFIDX_DATA_SHRED]);
274+
275+
notify_one_slot( ctx, stem );
207276
}
208277
}
209278

@@ -269,6 +338,7 @@ after_frag( fd_archiver_backtest_tile_ctx_t * ctx,
269338
FD_BASE58_ENC_32_ALLOCA( versioned->inner.current.frozen_hash.hash ),
270339
FD_BASE58_ENC_32_ALLOCA( bank_hash->hash ) ));
271340
}
341+
notify_one_slot( ctx, stem );
272342

273343
if( FD_UNLIKELY( slot>=ctx->playback_end_slot ) ) FD_LOG_ERR(( "Rocksdb playback done." ));
274344
}
@@ -285,7 +355,7 @@ after_frag( fd_archiver_backtest_tile_ctx_t * ctx,
285355
#include "../stem/fd_stem.c"
286356

287357
fd_topo_run_tile_t fd_tile_archiver_backtest = {
288-
.name = "arch_b",
358+
.name = "btest",
289359
.loose_footprint = loose_footprint,
290360
.unprivileged_init = unprivileged_init,
291361
.run = stem_run,
@@ -304,7 +374,7 @@ unprivileged_init( fd_topo_t * topo,
304374
}
305375

306376
fd_topo_run_tile_t fd_tile_archiver_backtest = {
307-
.name = "arch_b",
377+
.name = "btest",
308378
.unprivileged_init = unprivileged_init,
309379
};
310380

src/flamenco/runtime/tests/run_ledger_backtest.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ echo "
149149
affinity = \"auto\"
150150
bank_tile_count = 1
151151
shred_tile_count = 4
152-
exec_tile_count = 8
152+
exec_tile_count = 4
153153
[tiles]
154154
[tiles.archiver]
155155
enabled = true

0 commit comments

Comments
 (0)