From 1144632f92cfa30c76ed48580e4db0e2e1d26af4 Mon Sep 17 00:00:00 2001 From: cali-jumptrading Date: Tue, 13 May 2025 20:35:34 +0000 Subject: [PATCH 1/4] working --- src/disco/topo/fd_topo.h | 9 + src/discof/restore/fd_httpdl_tile.c | 121 +++++++++++++ src/discof/restore/fd_snapin_tile.c | 4 +- src/discof/restore/fd_unzstd_tile.c | 117 ++++++------ src/discof/restore/stream/fd_stream_ctx.c | 39 +++- src/discof/restore/stream/fd_stream_ctx.h | 181 ++++++++++++++----- src/discof/restore/stream/fd_stream_writer.c | 113 ++++++++---- src/discof/restore/stream/fd_stream_writer.h | 20 +- src/flamenco/snapshot/fd_snapshot_http.c | 22 ++- src/flamenco/snapshot/fd_snapshot_http.h | 20 ++ 10 files changed, 489 insertions(+), 157 deletions(-) create mode 100644 src/discof/restore/fd_httpdl_tile.c diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h index 595474abba..26d867bf76 100644 --- a/src/disco/topo/fd_topo.h +++ b/src/disco/topo/fd_topo.h @@ -433,6 +433,15 @@ typedef struct { char file_path[ PATH_MAX ]; } filerd; + struct { + char dest[128]; + uint ip4; + ushort port; + char const * path; + ulong path_len; + char const * snapshot_dir; + } httpdl; + struct { ulong scratch_sz; } snapin; diff --git a/src/discof/restore/fd_httpdl_tile.c b/src/discof/restore/fd_httpdl_tile.c new file mode 100644 index 0000000000..cd39e4a254 --- /dev/null +++ b/src/discof/restore/fd_httpdl_tile.c @@ -0,0 +1,121 @@ +#include "../../disco/topo/fd_topo.h" +#include "../../flamenco/snapshot/fd_snapshot_http.h" +#include "stream/fd_stream_writer.h" +#include "stream/fd_stream_ctx.h" +#include + +#define NAME "http" + +struct fd_httpdl_tile { + fd_snapshot_http_t * http; + fd_stream_writer_t * writer; +}; +typedef struct fd_httpdl_tile fd_httpdl_tile_t; + +FD_FN_PURE static ulong +scratch_align( void ) { + return fd_ulong_max( alignof(fd_httpdl_tile_t), + fd_ulong_max( fd_snapshot_http_align(), fd_stream_writer_align() ) ); +} + +FD_FN_PURE static ulong +scratch_footprint( fd_topo_tile_t const * tile ) { + (void)tile; + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_httpdl_tile_t), sizeof(fd_httpdl_tile_t) ); + l = FD_LAYOUT_APPEND( l, fd_snapshot_http_align(), fd_snapshot_http_footprint() ); + return FD_LAYOUT_FINI( l, scratch_align() ); +} + +static void +privileged_init( fd_topo_t * topo, + fd_topo_tile_t * tile ) { + FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) ); + fd_httpdl_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_httpdl_tile_t), sizeof(fd_httpdl_tile_t) ); + void * http_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_http_align(), fd_snapshot_http_footprint() ); + ctx->writer = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), fd_stream_writer_footprint() ); + + fd_memset( ctx, 0, sizeof(fd_httpdl_tile_t) ); + + if( FD_UNLIKELY( !tile->httpdl.dest[0] ) ) { + FD_LOG_ERR(( "http dest not set" )); + } + + /* TODO: is null ok for the name? */ + ctx->http = fd_snapshot_http_new( http_mem, + tile->httpdl.dest, + tile->httpdl.ip4, + tile->httpdl.port, + tile->httpdl.snapshot_dir, + NULL ); + + fd_snapshot_http_privileged_init( ctx->http ); +} + +static void +fd_httpdl_init_from_stream_ctx( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_httpdl_tile_t * ctx = fd_type_pun(_ctx); + + /* There's only one writer */ + ctx->writer = &stream_ctx->writers[0]; +} + +static void +during_housekeeping( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + (void)_ctx; + (void)stream_ctx; +} + +static void +metrics_write( fd_httpdl_tile_t * ctx ) { + (void)ctx; +} + +__attribute__((noreturn)) FD_FN_UNUSED static void +fd_httpdl_shutdown( fd_httpdl_tile_t * ctx ) { + fd_snapshot_http_cleanup_fds( ctx->http ); + FD_MGAUGE_SET( TILE, STATUS, 2UL ); + FD_COMPILER_MFENCE(); + FD_LOG_WARNING(("Done downloading snapshot")); + + for(;;) pause(); +} + +__attribute__((noinline)) static void +fd_httpdl_run1( + fd_httpdl_tile_t * ctx, + fd_stream_ctx_t * stream_ctx ) { + + FD_LOG_INFO(( "Running httpdl tile" )); + + fd_stream_ctx_init_run_loop( stream_ctx, ctx, fd_httpdl_init_from_stream_ctx ); + for(;;) { + fd_stream_ctx_do_housekeeping( stream_ctx, + ctx, + NULL, + during_housekeeping, + NULL ); + + /* Check if we are backpressured, otherwise poll */ + if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { + fd_stream_ctx_process_backpressure( stream_ctx ); + } else { + after_credit( ctx ); + } + } +} + +fd_topo_run_tile_t fd_tile_snapshot_restore_HttpDl = { + .name = NAME, + .scratch_align = scratch_align, + .scratch_footprint = scratch_footprint, + .privileged_init = privileged_init, + .run = fd_httpdl_run, +}; + +#undef NAME + + + diff --git a/src/discof/restore/fd_snapin_tile.c b/src/discof/restore/fd_snapin_tile.c index 9c33116304..ca3aea7a2f 100644 --- a/src/discof/restore/fd_snapin_tile.c +++ b/src/discof/restore/fd_snapin_tile.c @@ -1135,14 +1135,14 @@ fd_snapin_run( fd_topo_t * topo, } ulong reliable_cons_cnt = 0UL; - ulong cons_out[ FD_TOPO_MAX_LINKS ]; + // ulong cons_out[ FD_TOPO_MAX_LINKS ]; ulong * cons_fseq[ FD_TOPO_MAX_LINKS ]; for( ulong i=0UL; itile_cnt; i++ ) { fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; for( ulong j=0UL; jin_cnt; j++ ) { for( ulong k=0UL; kout_cnt; k++ ) { if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ k ] && consumer_tile->in_link_reliable[ j ] ) ) { - cons_out[ reliable_cons_cnt ] = k; + // cons_out[ reliable_cons_cnt ] = k; cons_fseq[ reliable_cons_cnt ] = consumer_tile->in_link_fseq[ j ]; FD_TEST( cons_fseq[ reliable_cons_cnt ] ); reliable_cons_cnt++; diff --git a/src/discof/restore/fd_unzstd_tile.c b/src/discof/restore/fd_unzstd_tile.c index 4bd9b352ba..7bd01a68e0 100644 --- a/src/discof/restore/fd_unzstd_tile.c +++ b/src/discof/restore/fd_unzstd_tile.c @@ -15,8 +15,8 @@ struct fd_unzstd_tile { fd_stream_frag_meta_ctx_t in_state; /* input mcache context */ fd_zstd_dstream_t * dstream; /* zstd decompress reader */ fd_stream_writer_t * writer; /* stream writer object */ + ulong const volatile * shutdown_signal; }; - typedef struct fd_unzstd_tile fd_unzstd_tile_t; FD_FN_PURE static ulong @@ -30,8 +30,7 @@ scratch_footprint( fd_topo_tile_t const * tile ) { ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND( l, alignof(fd_unzstd_tile_t), sizeof(fd_unzstd_tile_t) ); l = FD_LAYOUT_APPEND( l, fd_zstd_dstream_align(), fd_zstd_dstream_footprint( ZSTD_WINDOW_SZ ) ); - l = FD_LAYOUT_APPEND( l, fd_stream_writer_align(), fd_stream_writer_footprint() ); - return l; + return FD_LAYOUT_FINI( l, scratch_align() ); } static void @@ -40,7 +39,6 @@ unprivileged_init( fd_topo_t * topo, FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) ); fd_unzstd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_unzstd_tile_t), sizeof(fd_unzstd_tile_t) ); void * zstd_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_zstd_dstream_align(), fd_zstd_dstream_footprint( ZSTD_WINDOW_SZ ) ); - void * writer_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), fd_stream_writer_footprint() ); void * out_dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ 0 ] ].dcache_obj_id ) ); FD_TEST( out_dcache ); @@ -49,14 +47,46 @@ unprivileged_init( fd_topo_t * topo, ctx->in_state.in_buf = (uchar const *)topo->workspaces[ topo->objs[ topo->links[ tile->in_link_id[ 0 ] ].dcache_obj_id ].wksp_id ].wksp; ctx->dstream = fd_zstd_dstream_new( zstd_mem, ZSTD_WINDOW_SZ ); - ctx->writer = fd_stream_writer_new( writer_mem, topo, tile, 0, ZSTD_WINDOW_SZ, 512UL, 2UL ); fd_zstd_dstream_reset( ctx->dstream ); } static void -during_housekeeping( fd_unzstd_tile_t * ctx ) { - (void)ctx; +fd_unzstd_init_from_stream_ctx( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_unzstd_tile_t * ctx = fd_type_pun(_ctx); + + /* There's only one writer */ + ctx->writer = &stream_ctx->writers[0]; + fd_stream_writer_set_read_max( ctx->writer, ZSTD_FRAME_SZ ); + ctx->shutdown_signal = fd_mcache_seq_laddr_const( stream_ctx->in[0].base.mcache->f ) + 2; +} + +__attribute__((noreturn)) static void +fd_unzstd_shutdown( fd_unzstd_tile_t * ctx ) { + FD_MGAUGE_SET( TILE, STATUS, 2UL ); + fd_stream_writer_notify_shutdown( ctx->writer ); + FD_COMPILER_MFENCE(); + + for(;;) pause(); +} + +static void +fd_unzstd_poll_shutdown( fd_stream_ctx_t * stream_ctx, + fd_unzstd_tile_t * ctx ) { + ulong const in_seq_max = FD_VOLATILE_CONST( *ctx->shutdown_signal ); + if( FD_UNLIKELY( in_seq_max == stream_ctx->in[ 0 ].base.seq && in_seq_max != 0) ) { + FD_LOG_WARNING(( "zstd shutting down! in_seq_max is %lu in[0].base.seq is %lu", + in_seq_max, stream_ctx->in[0].base.seq)); + fd_unzstd_shutdown( ctx ); + } +} + +static void +during_housekeeping( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_unzstd_tile_t * ctx = fd_type_pun(_ctx); + fd_unzstd_poll_shutdown( stream_ctx, ctx ); } static int @@ -95,7 +125,6 @@ on_stream_frag( void * _ctx, break; } - /* fd_zstd_dstream_read updates chunk_start and out */ int zstd_err = fd_zstd_dstream_read( ctx->dstream, &cur, chunk_end, &out, out_end, NULL ); if( FD_UNLIKELY( zstd_err>0) ) { @@ -138,27 +167,6 @@ fd_unzstd_in_update( fd_stream_reader_t * in ) { accum[3] = 0U; accum[4] = 0U; accum[5] = 0U; } -__attribute__((noreturn)) static void -fd_unzstd_shutdown( fd_unzstd_tile_t * ctx ) { - FD_MGAUGE_SET( TILE, STATUS, 2UL ); - fd_stream_writer_notify_shutdown( ctx->writer ); - FD_COMPILER_MFENCE(); - - for(;;) pause(); -} - -static void -fd_unzstd_poll_shutdown( fd_stream_ctx_t * stream_ctx, - fd_unzstd_tile_t * ctx, - ulong const volatile * shutdown_signal ) { - ulong const in_seq_max = FD_VOLATILE_CONST( *shutdown_signal ); - if( FD_UNLIKELY( in_seq_max == stream_ctx->in[ 0 ].base.seq && in_seq_max != 0) ) { - FD_LOG_WARNING(( "zstd shutting down! in_seq_max is %lu in[0].base.seq is %lu", - in_seq_max, stream_ctx->in[0].base.seq)); - fd_unzstd_shutdown( ctx ); - } -} - __attribute__((noinline)) static void fd_unzstd_run1( fd_unzstd_tile_t * ctx, @@ -167,43 +175,17 @@ fd_unzstd_run1( FD_LOG_INFO(( "Running unzstd tile" )); /* run loop init */ - ulong const volatile * restrict shutdown_signal = fd_mcache_seq_laddr_const( stream_ctx->in[0].base.mcache->f ) + 2; - fd_stream_writer_init_flow_control_credits( ctx->writer ); - fd_stream_ctx_init_run_loop( stream_ctx ); + fd_stream_ctx_init_run_loop( stream_ctx, + ctx, + fd_unzstd_init_from_stream_ctx, + fd_unzstd_in_update, + during_housekeeping, + NULL ); for(;;) { - if( FD_UNLIKELY( fd_stream_ticks_is_housekeeping_time( stream_ctx->ticks ) ) ) { - ulong event_idx = fd_event_map_get_event( stream_ctx->event_map ); - - if( FD_LIKELY( event_idxcons_cnt ) ) { /* receive credits */ - ulong cons_idx = event_idx; - - /* Receive flow control credits from this out. */ - fd_stream_writer_receive_flow_control_credits( ctx->writer, cons_idx ); - - fd_unzstd_poll_shutdown( stream_ctx, ctx, shutdown_signal ); - - } else if( event_idx>stream_ctx->cons_cnt) { /* send credits */ - ulong in_idx = event_idx - stream_ctx->cons_cnt - 1UL; - fd_unzstd_in_update( &stream_ctx->in[ in_idx ] ); - } - else { /* event_idx==cons_cnt, housekeeping event */ - - /* Update metrics counters to external viewers */ - fd_stream_metrics_update_external( stream_ctx->metrics, - stream_ctx->ticks->now, - NULL, - ctx ); - /* Recalculate flow control credits */ - ulong slowest_cons = ULONG_MAX; - fd_stream_writer_update_flow_control_credits( ctx->writer, - &slowest_cons ); - fd_stream_ctx_update_cons_slow( stream_ctx, - slowest_cons ); - during_housekeeping( ctx ); - } - fd_stream_ctx_housekeeping_advance( stream_ctx ); - } + /* do housekeeping manages flow control credits */ + fd_stream_ctx_do_housekeeping( stream_ctx, + ctx ); /* Check if we are backpressured, otherwise poll */ if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { @@ -220,9 +202,10 @@ fd_unzstd_run( fd_topo_t * topo, fd_unzstd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile ); ulong cons_cnt = fd_topo_tile_reliable_consumer_cnt( topo, tile ); + ulong out_cnt = tile->out_cnt; - void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, cons_cnt ) ); - fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, cons_cnt ); + void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, cons_cnt, out_cnt ) ); + fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, cons_cnt, out_cnt ); fd_unzstd_run1( ctx, stream_ctx ); } @@ -236,3 +219,5 @@ fd_topo_run_tile_t fd_tile_snapshot_restore_Unzstd = { .run = fd_unzstd_run, }; #endif + +#undef NAME diff --git a/src/discof/restore/stream/fd_stream_ctx.c b/src/discof/restore/stream/fd_stream_ctx.c index 65c54dceb7..f1c44b7c15 100644 --- a/src/discof/restore/stream/fd_stream_ctx.c +++ b/src/discof/restore/stream/fd_stream_ctx.c @@ -21,14 +21,35 @@ fd_stream_ctx_init( fd_stream_ctx_t * ctx, ctx->in_ptrs[ i ] = &ctx->in[ i ]; } + /* init writers */ + for( ulong i=0UL; iout_cnt; i++ ) { + fd_stream_frag_meta_t * out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ i ] ].mcache ); + void * dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ i ] ].dcache_obj_id ) ); + fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ]; + ulong writer_cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, link ); + fd_stream_writer_new( &ctx->writers[i], + out_mcache, + dcache, + writer_cons_cnt, + 512UL, + 2UL ); + } + /* init cons_fseq */ ulong cons_idx = 0UL; - for( ulong i=0UL; itile_cnt; i++ ) { - fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; - for( ulong j=0UL; jin_cnt; j++ ) { - for( ulong k=0UL; kout_cnt; k++ ) { - if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ k ] && consumer_tile->in_link_reliable[ j ] ) ) { - ctx->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ j ]; + for( ulong i=0UL; iout_cnt; i++ ) { + ulong local_cons_idx = 0UL; + for( ulong j=0UL; jtile_cnt; j++ ) { + fd_topo_tile_t * consumer_tile = &topo->tiles[ j ]; + for( ulong k=0UL; kin_cnt; k++ ) { + if( FD_UNLIKELY( consumer_tile->in_link_id[ k ]==tile->out_link_id[ i ] && consumer_tile->in_link_reliable[ k ] ) ) { + ctx->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ k ]; + if( FD_UNLIKELY( !ctx->cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx )); + fd_stream_writer_set_cons_fseq( &ctx->writers[i], local_cons_idx, consumer_tile->in_link_fseq[ k ] ); + ctx->consumer_ctx[ cons_idx ].writer = &ctx->writers[ i ]; + ctx->consumer_ctx[ cons_idx ].writer_cons_idx = local_cons_idx; + cons_idx++; + local_cons_idx++; } } } @@ -50,7 +71,8 @@ fd_stream_ctx_new( void * mem, fd_topo_t * topo, fd_topo_tile_t * tile, ulong in_cnt, - ulong cons_cnt ) { + ulong cons_cnt, + ulong out_cnt ) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING(( "NULL mem" )); return NULL; @@ -69,9 +91,12 @@ fd_stream_ctx_new( void * mem, self->cons_fseq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) ); self->cons_slow = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); void * event_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, cons_cnt ) ); + self->consumer_ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_consumer_ctx_t), sizeof(fd_consumer_ctx_t)*out_cnt ); + self->writers = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), sizeof(fd_stream_writer_t)*out_cnt ); self->in_cnt = in_cnt; self->cons_cnt = cons_cnt; + self->out_cnt = out_cnt; self->event_map = fd_event_map_new( event_map_mem, in_cnt, cons_cnt ); fd_stream_ctx_init( self, topo, tile ); diff --git a/src/discof/restore/stream/fd_stream_ctx.h b/src/discof/restore/stream/fd_stream_ctx.h index a8bf3522b6..c6792bb183 100644 --- a/src/discof/restore/stream/fd_stream_ctx.h +++ b/src/discof/restore/stream/fd_stream_ctx.h @@ -3,22 +3,49 @@ #include "../../../disco/topo/fd_topo.h" #include "fd_stream_reader.h" +#include "fd_stream_writer.h" #include "fd_event_map.h" #include "fd_stream_ticks.h" #include "fd_stream_metrics.h" +struct fd_consumer_ctx { + fd_stream_writer_t * writer; + ulong writer_cons_idx; +}; +typedef struct fd_consumer_ctx fd_consumer_ctx_t; + +struct fd_stream_ctx; +typedef struct fd_stream_ctx fd_stream_ctx_t; + +typedef void fd_tile_ctx_init_run_loop_fn_t( void * ctx, + fd_stream_ctx_t * stream_ctx ); +typedef void fd_tile_update_in_fn_t( fd_stream_reader_t * reader ); +typedef void fd_tile_housekeeping_fn_t( void * ctx, + fd_stream_ctx_t * stream_ctx ); +typedef void fd_tile_metrics_write_fn_t( void * ctx ); +typedef int fd_on_stream_frag_fn_t( void * ctx, + fd_stream_reader_t * reader, + fd_stream_frag_meta_t const * frag, +ulong * sz ); + struct fd_stream_ctx { - fd_stream_reader_t * in; - fd_stream_reader_t ** in_ptrs; - ulong ** cons_fseq; - ulong ** cons_slow; - fd_event_map_t * event_map; - ulong in_cnt; - ulong cons_cnt; - ulong in_seq; - fd_rng_t rng[1]; - fd_stream_ticks_t ticks[1]; - fd_stream_metrics_t metrics[1]; + fd_stream_reader_t * in; + fd_stream_reader_t ** in_ptrs; + ulong ** cons_fseq; + ulong ** cons_slow; + fd_event_map_t * event_map; + ulong in_cnt; + ulong cons_cnt; + ulong out_cnt; + ulong in_seq; + fd_rng_t rng[1]; + fd_stream_ticks_t ticks[1]; + fd_stream_metrics_t metrics[1]; + fd_stream_writer_t * writers; + fd_consumer_ctx_t * consumer_ctx; + fd_tile_update_in_fn_t * tile_update_in; + fd_tile_housekeeping_fn_t * tile_housekeeping; + fd_tile_metrics_write_fn_t * tile_metrics_write; }; typedef struct fd_stream_ctx fd_stream_ctx_t; @@ -31,7 +58,8 @@ fd_stream_ctx_scratch_align( void ) { FD_FN_PURE static inline ulong fd_stream_ctx_scratch_footprint( ulong in_cnt, - ulong cons_cnt ) { + ulong cons_cnt, + ulong out_cnt ) { ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND( l, alignof(fd_stream_ctx_t), sizeof(fd_stream_ctx_t) ); l = FD_LAYOUT_APPEND( l, alignof(fd_stream_reader_t), in_cnt*sizeof(fd_stream_reader_t) ); /* in */ @@ -39,6 +67,7 @@ fd_stream_ctx_scratch_footprint( ulong in_cnt, l = FD_LAYOUT_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) ); /* cons_fseq */ l = FD_LAYOUT_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); /* cons_slow */ l = FD_LAYOUT_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, cons_cnt ) ); /* event_map */ + l = FD_LAYOUT_APPEND( l, alignof(fd_consumer_ctx_t), sizeof(fd_consumer_ctx_t)*out_cnt ); return FD_LAYOUT_FINI( l, fd_stream_ctx_scratch_align() ); } @@ -47,7 +76,8 @@ fd_stream_ctx_new( void * mem, fd_topo_t * topo, fd_topo_tile_t * tile, ulong in_cnt, - ulong cons_cnt ); + ulong cons_cnt, + ulong out_cnt ); void fd_stream_ctx_init( fd_stream_ctx_t * ctx, @@ -57,7 +87,7 @@ fd_stream_ctx_init( fd_stream_ctx_t * ctx, static inline void fd_stream_ctx_update_cons_slow( fd_stream_ctx_t * ctx, ulong slowest_cons ) { -if( FD_LIKELY( slowest_cons!=ULONG_MAX ) ) { + if( FD_LIKELY( slowest_cons!=ULONG_MAX ) ) { FD_COMPILER_MFENCE(); (*ctx->cons_slow[ slowest_cons ]) += ctx->metrics->in_backp; FD_COMPILER_MFENCE(); @@ -65,9 +95,47 @@ if( FD_LIKELY( slowest_cons!=ULONG_MAX ) ) { } static inline void -fd_stream_ctx_init_run_loop( fd_stream_ctx_t * ctx ) { +fd_stream_ctx_init_run_loop( fd_stream_ctx_t * ctx, + void * tile_ctx, + fd_tile_ctx_init_run_loop_fn_t * tile_init_run_loop, + fd_tile_update_in_fn_t * tile_update_in, + fd_tile_housekeeping_fn_t * tile_housekeeping, + fd_tile_metrics_write_fn_t * tile_metrics_write ) { FD_MGAUGE_SET( TILE, STATUS, 1UL ); fd_stream_ticks_init_timer( ctx->ticks ); + + for( ulong i=0UL; iout_cnt; i++ ) { + fd_stream_writer_init_flow_control_credits( &ctx->writers[ i ] ); + } + + if( tile_init_run_loop ) { + tile_init_run_loop( tile_ctx, ctx ); + } + + if( ctx->in_cnt && !tile_update_in ) { + FD_LOG_ERR(( "tile_update_in function cannot be null if there are producers to this tile!" )); + } + + ctx->tile_update_in = tile_update_in; + ctx->tile_housekeeping = tile_housekeeping; + ctx->tile_metrics_write = tile_metrics_write; +} + +static inline void +fd_stream_ctx_update_flow_control_credits( fd_stream_ctx_t * ctx ) { + /* Recalculate flow control credits */ + ulong slowest_cons = ULONG_MAX; + ulong global_cons_idx = 0UL; + for( ulong i=0UL; iout_cnt; i++ ) { + ulong slowest_local_cons = ULONG_MAX; + fd_stream_writer_update_flow_control_credits( &ctx->writers[i], + &slowest_local_cons ); + slowest_cons = fd_ulong_if( slowest_local_cons!=ULONG_MAX, global_cons_idx + slowest_local_cons, slowest_cons ); + global_cons_idx += ctx->writers[i].cons_cnt; + } + + fd_stream_ctx_update_cons_slow( ctx, + slowest_cons ); } static inline void @@ -84,6 +152,40 @@ fd_stream_ctx_housekeeping_advance( fd_stream_ctx_t * ctx ) { ctx->rng); } +static inline void +fd_stream_ctx_do_housekeeping( fd_stream_ctx_t * ctx, + void * tile_ctx ) { + if( FD_UNLIKELY( fd_stream_ticks_is_housekeeping_time( ctx->ticks ) ) ) { + ulong event_idx = fd_event_map_get_event( ctx->event_map ); + + if( FD_LIKELY( event_idxcons_cnt ) ) { /* receive credits */ + ulong cons_idx = event_idx; + + /* Receive flow control credits from this out. */ + fd_stream_writer_receive_flow_control_credits( ctx->consumer_ctx[ cons_idx ].writer, + ctx->consumer_ctx[ cons_idx ].writer_cons_idx ); + + } else if( event_idx>ctx->cons_cnt) { /* send credits */ + ulong in_idx = event_idx - ctx->cons_cnt - 1UL; + ctx->tile_update_in( &ctx->in[ in_idx ] ); + + } else { /* event_idx==cons_cnt, housekeeping event */ + + /* Update metrics counters to external viewers */ + fd_stream_metrics_update_external( ctx->metrics, + ctx->ticks->now, + ctx->tile_metrics_write, + ctx ); + fd_stream_ctx_update_flow_control_credits( ctx ); + + if( ctx->tile_housekeeping ) { + ctx->tile_housekeeping( tile_ctx, ctx ); + } + } + fd_stream_ctx_housekeeping_advance( ctx ); + } +} + static inline void fd_stream_ctx_process_backpressure( fd_stream_ctx_t * ctx ) { fd_stream_metrics_update_backpressure( ctx->metrics, @@ -91,51 +193,46 @@ fd_stream_ctx_process_backpressure( fd_stream_ctx_t * ctx ) { fd_stream_ticks_reload_backpressure( ctx->ticks ); } -typedef int fd_on_stream_frag_fn_t( void * ctx, - fd_stream_reader_t * reader, - fd_stream_frag_meta_t const * frag, - ulong * sz ); - static inline void -fd_stream_ctx_poll( fd_stream_ctx_t * stream_ctx, - void * ctx, +fd_stream_ctx_poll( fd_stream_ctx_t * ctx, + void * tile_ctx, fd_on_stream_frag_fn_t * on_stream_frag ) { - stream_ctx->metrics->in_backp = 0UL; - stream_ctx->ticks->prefrag_ticks = 0UL; + ctx->metrics->in_backp = 0UL; + ctx->ticks->prefrag_ticks = 0UL; /* select input to poll */ - fd_stream_reader_t * this_in = &stream_ctx->in[ stream_ctx->in_seq ]; - stream_ctx->in_seq++; - if( stream_ctx->in_seq>=stream_ctx->in_cnt ) { - stream_ctx->in_seq = 0UL; /* cmov */ + fd_stream_reader_t * this_in = &ctx->in[ ctx->in_seq ]; + ctx->in_seq++; + if( ctx->in_seq>=ctx->in_cnt ) { + ctx->in_seq = 0UL; /* cmov */ } fd_frag_reader_consume_ctx_t consume_ctx; long diff = fd_stream_reader_poll_frag( this_in, - stream_ctx->in_seq, + ctx->in_seq, &consume_ctx ); if( FD_UNLIKELY( diff<0L ) ) { - fd_stream_metrics_update_poll( stream_ctx->metrics, - stream_ctx->ticks->housekeeping_ticks, - stream_ctx->ticks->prefrag_ticks, - &stream_ctx->ticks->now); + fd_stream_metrics_update_poll( ctx->metrics, + ctx->ticks->housekeeping_ticks, + ctx->ticks->prefrag_ticks, + &ctx->ticks->now); fd_stream_reader_process_overrun( this_in, &consume_ctx, diff ); } else if ( FD_UNLIKELY( diff ) ) { - fd_stream_metrics_update_poll_idle( stream_ctx->metrics, - stream_ctx->ticks->housekeeping_ticks, - stream_ctx->ticks->prefrag_ticks, - &stream_ctx->ticks->now ); + fd_stream_metrics_update_poll_idle( ctx->metrics, + ctx->ticks->housekeeping_ticks, + ctx->ticks->prefrag_ticks, + &ctx->ticks->now ); } else { FD_COMPILER_MFENCE(); ulong sz = 0U; fd_stream_frag_meta_t const * frag = fd_type_pun_const( consume_ctx.mline ); - int consumed_frag = on_stream_frag( ctx, this_in, frag, &sz ); + int consumed_frag = on_stream_frag( tile_ctx, this_in, frag, &sz ); fd_stream_reader_consume_bytes( this_in, sz ); @@ -144,10 +241,10 @@ fd_stream_ctx_poll( fd_stream_ctx_t * stream_ctx, &consume_ctx ); } - fd_stream_metrics_update_poll( stream_ctx->metrics, - stream_ctx->ticks->housekeeping_ticks, - stream_ctx->ticks->prefrag_ticks, - &stream_ctx->ticks->now ); + fd_stream_metrics_update_poll( ctx->metrics, + ctx->ticks->housekeeping_ticks, + ctx->ticks->prefrag_ticks, + &ctx->ticks->now ); } } diff --git a/src/discof/restore/stream/fd_stream_writer.c b/src/discof/restore/stream/fd_stream_writer.c index dea9aeb027..62a8801005 100644 --- a/src/discof/restore/stream/fd_stream_writer.c +++ b/src/discof/restore/stream/fd_stream_writer.c @@ -2,14 +2,85 @@ #include "../../../util/log/fd_log.h" #include "../../../tango/dcache/fd_dcache.h" +// fd_stream_writer_t * +// fd_stream_writer_new( void * mem, +// fd_topo_t * topo, +// fd_topo_tile_t * tile, +// ulong link_id, +// ulong read_max, +// ulong burst_byte, +// ulong burst_frag ) { +// if( FD_UNLIKELY( !mem ) ) { +// FD_LOG_WARNING(( "NULL mem" )); +// return NULL; +// } + +// if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_stream_writer_align() ) ) ) { +// FD_LOG_WARNING(( "unaligned mem" )); +// return NULL; +// } + +// FD_SCRATCH_ALLOC_INIT( l, mem ); +// fd_stream_writer_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_writer_t), sizeof(fd_stream_writer_t) ); + +// fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ link_id ] ]; +// void * dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ link_id ] ].dcache_obj_id ) ); +// fd_stream_frag_meta_t * out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ link_id ] ].mcache ); +// ulong cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, link ); + +// self->out_mcache = out_mcache; +// self->buf = dcache; +// self->buf_base = (ulong)dcache - (ulong)fd_wksp_containing( dcache ); +// self->buf_off = 0UL; +// self->buf_sz = fd_dcache_data_sz( dcache ); +// self->goff = 0UL; +// self->read_max = read_max; +// self->stream_off = 0UL; +// self->goff_start = 0UL; +// self->out_seq = 0UL; + +// /* Set up flow control state */ +// self->cr_byte_avail = 0UL; +// self->cr_frag_avail = 0UL; +// self->cr_byte_max = fd_dcache_data_sz( dcache ); +// self->cr_frag_max = fd_mcache_depth( self->out_mcache->f ); +// self->burst_byte = burst_byte; +// self->burst_frag = burst_frag; +// self->cons_cnt = cons_cnt; +// self->cons_seq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); +// self->cons_fseq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); +// self->out_sync = fd_mcache_seq_laddr( topo->links[ tile->out_link_id[ link_id ] ].mcache ); + +// /* Set up consumer fseq pointer array. +// We keep track of 2 fseqs per consumer to manage stream flow control. +// The first fseq tracks the consumer's mcache sequence number. +// The second fseq tracks the consumer's global read offset into stream. */ +// ulong cons_idx = 0UL; +// for( ulong i=0UL; itile_cnt; i++ ) { +// fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; +// for( ulong j=0UL; jin_cnt; j++ ) { +// if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ link_id ] && consumer_tile->in_link_reliable[ j ] ) ) { +// self->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ j ]; +// if( FD_UNLIKELY( !self->cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx )); +// cons_idx++; +// } +// } +// } + +// fd_memset(self->cons_seq, 0, EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); +// /* make sure we're not tripping */ +// FD_TEST( cons_idx==cons_cnt ); + +// return self; +// } + fd_stream_writer_t * fd_stream_writer_new( void * mem, - fd_topo_t * topo, - fd_topo_tile_t * tile, - ulong link_id, - ulong read_max, - ulong burst_byte, - ulong burst_frag ) { + fd_stream_frag_meta_t * mcache, + uchar * dcache, + ulong cons_cnt, + ulong burst_byte, + ulong burst_frag ) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING(( "NULL mem" )); return NULL; @@ -23,18 +94,13 @@ fd_stream_writer_new( void * mem, FD_SCRATCH_ALLOC_INIT( l, mem ); fd_stream_writer_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_writer_t), sizeof(fd_stream_writer_t) ); - fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ link_id ] ]; - void * dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ link_id ] ].dcache_obj_id ) ); - fd_stream_frag_meta_t * out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ link_id ] ].mcache ); - ulong cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, link ); - - self->out_mcache = out_mcache; + self->out_mcache = mcache; self->buf = dcache; self->buf_base = (ulong)dcache - (ulong)fd_wksp_containing( dcache ); self->buf_off = 0UL; self->buf_sz = fd_dcache_data_sz( dcache ); self->goff = 0UL; - self->read_max = read_max; + self->read_max = 0UL; self->stream_off = 0UL; self->goff_start = 0UL; self->out_seq = 0UL; @@ -49,27 +115,8 @@ fd_stream_writer_new( void * mem, self->cons_cnt = cons_cnt; self->cons_seq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); self->cons_fseq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); - self->out_sync = fd_mcache_seq_laddr( topo->links[ tile->out_link_id[ link_id ] ].mcache ); - - /* Set up consumer fseq pointer array. - We keep track of 2 fseqs per consumer to manage stream flow control. - The first fseq tracks the consumer's mcache sequence number. - The second fseq tracks the consumer's global read offset into stream. */ - ulong cons_idx = 0UL; - for( ulong i=0UL; itile_cnt; i++ ) { - fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; - for( ulong j=0UL; jin_cnt; j++ ) { - if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ link_id ] && consumer_tile->in_link_reliable[ j ] ) ) { - self->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ j ]; - if( FD_UNLIKELY( !self->cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx )); - cons_idx++; - } - } - } + self->out_sync = fd_mcache_seq_laddr( mcache->f ); fd_memset(self->cons_seq, 0, EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); - /* make sure we're not tripping */ - FD_TEST( cons_idx==cons_cnt ); - return self; } diff --git a/src/discof/restore/stream/fd_stream_writer.h b/src/discof/restore/stream/fd_stream_writer.h index a08bd2a0af..cee738105a 100644 --- a/src/discof/restore/stream/fd_stream_writer.h +++ b/src/discof/restore/stream/fd_stream_writer.h @@ -57,10 +57,9 @@ fd_stream_writer_get_write_ptr( fd_stream_writer_t * writer ) { fd_stream_writer_t * fd_stream_writer_new( void * mem, - fd_topo_t * topo, - fd_topo_tile_t * tile, - ulong link_id, - ulong read_max, + fd_stream_frag_meta_t * mcache, + uchar * dcache, + ulong cons_cnt, ulong burst_byte, ulong burst_frag ); @@ -72,6 +71,19 @@ fd_stream_writer_init_flow_control_credits( fd_stream_writer_t * writer ) { } } +static inline void +fd_stream_writer_set_cons_fseq( fd_stream_writer_t * writer, + ulong cons_idx, + ulong * cons_fseq ) { + writer->cons_fseq[ cons_idx ] = cons_fseq; +} + +static inline void +fd_stream_writer_set_read_max( fd_stream_writer_t * writer, + ulong read_max ) { + writer->read_max = read_max; +} + static inline void fd_stream_writer_receive_flow_control_credits( fd_stream_writer_t * writer, ulong cons_idx) { diff --git a/src/flamenco/snapshot/fd_snapshot_http.c b/src/flamenco/snapshot/fd_snapshot_http.c index bf62bee2e6..bafe5ae2af 100644 --- a/src/flamenco/snapshot/fd_snapshot_http.c +++ b/src/flamenco/snapshot/fd_snapshot_http.c @@ -122,14 +122,18 @@ fd_snapshot_http_new( void * mem, return this; } -static void +void fd_snapshot_http_cleanup_fds( fd_snapshot_http_t * this ) { if( this->snapshot_fd!=-1 ) { - close( this->snapshot_fd ); + if( FD_UNLIKELY( close( this->snapshot_fd ) ) ) { + FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } this->snapshot_fd = -1; } if( this->socket_fd!=-1 ) { - close( this->socket_fd ); + if( FD_UNLIKELY( close( this->socket_fd ) ) ) { + FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } this->socket_fd = -1; } } @@ -192,6 +196,18 @@ fd_snapshot_http_init( fd_snapshot_http_t * this ) { return 0; } +/* for http tile use */ +void +fd_snapshot_http_privileged_init( fd_snapshot_http_t * this ) { + fd_snapshot_http_init( this ); + + /* open snapshot fd for writing to snapshot file */ + this->snapshot_fd = open( this->snapshot_path, O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR ); + if( this->snapshot_fd<0 ) { + FD_LOG_ERR(( "open(%s) failed (%d-%s)", this->snapshot_path, errno, fd_io_strerror( errno ) )); + } +} + /* fd_snapshot_http_req writes out the request. */ static int diff --git a/src/flamenco/snapshot/fd_snapshot_http.h b/src/flamenco/snapshot/fd_snapshot_http.h index 20b36aeea4..b0bd4239af 100644 --- a/src/flamenco/snapshot/fd_snapshot_http.h +++ b/src/flamenco/snapshot/fd_snapshot_http.h @@ -98,6 +98,20 @@ struct fd_snapshot_http { typedef struct fd_snapshot_http fd_snapshot_http_t; +FD_FN_PURE static inline ulong +fd_snapshot_http_align( void ) { + return fd_ulong_max( alignof(fd_snapshot_http_t), alignof(fd_snapshot_name_t) ); +} + +FD_FN_PURE static inline ulong +fd_snapshot_http_footprint( void ) { + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_snapshot_http_t), sizeof(fd_snapshot_http_t) ); + l = FD_LAYOUT_APPEND( l, alignof(fd_snapshot_name_t), sizeof(fd_snapshot_name_t) ); + return FD_LAYOUT_FINI( l, fd_snapshot_http_align() ); + +} + fd_snapshot_http_t * fd_snapshot_http_new( void * mem, const char * dst_str, @@ -106,6 +120,9 @@ fd_snapshot_http_new( void * mem, const char * snapshot_dir, fd_snapshot_name_t * name_out ); +void +fd_snapshot_http_privileged_init( fd_snapshot_http_t * this ); + void * fd_snapshot_http_delete( fd_snapshot_http_t * this ); @@ -142,6 +159,9 @@ fd_io_istream_snapshot_http_virtual( fd_snapshot_http_t * this ) { }; } +void +fd_snapshot_http_cleanup_fds( fd_snapshot_http_t * this ); + FD_PROTOTYPES_END #endif /* HEADER_fd_src_flamenco_snapshot_fd_snapshot_http_h */ From 10a94a8c2ade70e2a777c8881dd3d21668f77d9a Mon Sep 17 00:00:00 2001 From: cali-jumptrading Date: Wed, 14 May 2025 00:25:36 +0000 Subject: [PATCH 2/4] http arg set up WIP --- .../firedancer-dev/commands/snapshot_load.c | 105 ++++++++++++------ src/app/shared/fd_action.h | 1 + src/disco/topo/fd_topo.h | 4 +- src/discof/restore/Local.mk | 1 + src/discof/restore/fd_httpdl_tile.c | 60 ++++++---- src/flamenco/snapshot/fd_snapshot_http.c | 2 + 6 files changed, 115 insertions(+), 58 deletions(-) diff --git a/src/app/firedancer-dev/commands/snapshot_load.c b/src/app/firedancer-dev/commands/snapshot_load.c index 69153f0671..27be7caabc 100644 --- a/src/app/firedancer-dev/commands/snapshot_load.c +++ b/src/app/firedancer-dev/commands/snapshot_load.c @@ -5,6 +5,7 @@ #include "../../../disco/topo/fd_topob.h" #include "../../../disco/topo/fd_pod_format.h" #include "../../../util/tile/fd_tile_private.h" +#include "../../../flamenco/snapshot/fd_snapshot_loader.h" #include #include #include @@ -39,10 +40,14 @@ _is_zstd( char const * path ) { fclose( file ); return ( magic==0xFD2FB528UL ); } + static void snapshot_load_topo( config_t * config, args_t const * args ) { - int is_zstd = _is_zstd( args->snapshot_load.snapshot_path ); + fd_snapshot_src_t src[1]; + char snapshot_path_copy[4096]; + memcpy( snapshot_path_copy, args->snapshot_load.snapshot_path, sizeof(snapshot_path_copy) ); + fd_snapshot_src_parse_type_unknown( src, snapshot_path_copy ); fd_topo_t * topo = &config->topo; fd_topob_new( &config->topo, config->name ); @@ -63,13 +68,6 @@ snapshot_load_topo( config_t * config, fd_topob_wksp( topo, "metric" ); fd_topob_tile( topo, "metric", "metric", "metric_in", tile_to_cpu[0], 0, 0 ); - /* read() tile */ - fd_topob_wksp( topo, "FileRd" ); - fd_topo_tile_t * filerd_tile = fd_topob_tile( topo, "FileRd", "FileRd", "FileRd", tile_to_cpu[1], 0, 0 ); - fd_memcpy( filerd_tile->filerd.file_path, args->snapshot_load.snapshot_path, PATH_MAX ); - FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==sizeof(args->snapshot_load.snapshot_path), abi ); - FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==PATH_MAX, abi ); - /* Uncompressed data stream */ fd_topob_wksp( topo, "snap_stream" ); fd_topo_link_t * snapin_link = fd_topob_link( topo, "snap_stream", "snap_stream", 512UL, 0UL, 0UL ); @@ -77,38 +75,65 @@ snapshot_load_topo( config_t * config, snapin_link->dcache_obj_id = snapin_dcache->id; FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", snapin_dcache->id ) ); - if( is_zstd ) { /* .tar.zst file */ + if( src->type==FD_SNAPSHOT_SRC_FILE ) { + + int is_zstd = _is_zstd( args->snapshot_load.snapshot_path ); + + /* read() tile */ + fd_topob_wksp( topo, "FileRd" ); + fd_topo_tile_t * filerd_tile = fd_topob_tile( topo, "FileRd", "FileRd", "FileRd", tile_to_cpu[1], 0, 0 ); + fd_memcpy( filerd_tile->filerd.file_path, args->snapshot_load.snapshot_path, PATH_MAX ); + FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==sizeof(args->snapshot_load.snapshot_path), abi ); + FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==PATH_MAX, abi ); - /* "unzstd": Zstandard decompress tile */ - fd_topob_wksp( topo, "Unzstd" ); - fd_topo_tile_t * unzstd_tile = fd_topob_tile( topo, "Unzstd", "Unzstd", "Unzstd", tile_to_cpu[2], 0, 0 ); - (void)unzstd_tile; + if( is_zstd ) { /* .tar.zst file */ - /* Compressed data stream */ - fd_topob_wksp( topo, "snap_zstd" ); - fd_topo_link_t * zstd_link = fd_topob_link( topo, "snap_zstd", "snap_zstd", 512UL, 0UL, 0UL ); - fd_topo_obj_t * zstd_dcache = fd_topob_obj( topo, "dcache", "snap_zstd"); - zstd_link->dcache_obj_id = zstd_dcache->id; - FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", zstd_dcache->id ) ); + /* "unzstd": Zstandard decompress tile */ + fd_topob_wksp( topo, "Unzstd" ); + fd_topo_tile_t * unzstd_tile = fd_topob_tile( topo, "Unzstd", "Unzstd", "Unzstd", tile_to_cpu[2], 0, 0 ); + (void)unzstd_tile; - /* filerd tile -> compressed stream */ - fd_topob_tile_out( topo, "FileRd", 0UL, "snap_zstd", 0UL ); - fd_topob_tile_uses( topo, filerd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); + /* Compressed data stream */ + fd_topob_wksp( topo, "snap_zstd" ); + fd_topo_link_t * zstd_link = fd_topob_link( topo, "snap_zstd", "snap_zstd", 512UL, 0UL, 0UL ); + fd_topo_obj_t * zstd_dcache = fd_topob_obj( topo, "dcache", "snap_zstd"); + zstd_link->dcache_obj_id = zstd_dcache->id; + FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", zstd_dcache->id ) ); - /* compressed stream -> unzstd tile */ - fd_topob_tile_in( topo, "Unzstd", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); - fd_topob_tile_uses( topo, unzstd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_ONLY ); + /* filerd tile -> compressed stream */ + fd_topob_tile_out( topo, "FileRd", 0UL, "snap_zstd", 0UL ); + fd_topob_tile_uses( topo, filerd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); - /* unzstd tile -> uncompressed stream */ - fd_topob_tile_out( topo, "Unzstd", 0UL, "snap_stream", 0UL ); - fd_topob_tile_uses( topo, unzstd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); + /* compressed stream -> unzstd tile */ + fd_topob_tile_in( topo, "Unzstd", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + fd_topob_tile_uses( topo, unzstd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_ONLY ); - } else { /* .tar file */ + /* unzstd tile -> uncompressed stream */ + fd_topob_tile_out( topo, "Unzstd", 0UL, "snap_stream", 0UL ); + fd_topob_tile_uses( topo, unzstd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); - /* filerd tile -> uncompressed stream */ - fd_topob_tile_out( topo, "FileRd", 0UL, "snap_stream", 0UL ); - fd_topob_tile_uses( topo, filerd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); + } else { /* .tar file */ + /* filerd tile -> uncompressed stream */ + fd_topob_tile_out( topo, "FileRd", 0UL, "snap_stream", 0UL ); + fd_topob_tile_uses( topo, filerd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); + + } + } + else if ( src->type==FD_SNAPSHOT_SRC_HTTP ) { + + /* httpdl() tile */ + fd_topob_wksp( topo, "HttpDl" ); + fd_topo_tile_t * httpdl_tile = fd_topob_tile( topo, "HttpDl", "HttpDl", "HttpDl", tile_to_cpu[1], 0, 0 ); + fd_memcpy( httpdl_tile->httpdl.path, src->http.path, PATH_MAX ); + fd_memcpy( httpdl_tile->httpdl.snapshot_dir, args->snapshot_load.snapshot_path, PATH_MAX ); + fd_memcpy( httpdl_tile->httpdl.dest, src->http.dest, sizeof(src->http.dest) ); + httpdl_tile->httpdl.ip4 = src->http.ip4; + httpdl_tile->httpdl.path_len = src->http.path_len; + httpdl_tile->httpdl.port = src->http.port; + + fd_topob_tile_out( topo, "HttpDl", 0UL, "snap_stream", 0UL ); + fd_topob_tile_uses( topo, httpdl_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); } fd_topob_wksp( topo, "SnapIn" ); @@ -149,8 +174,9 @@ static void snapshot_load_cmd_args( int * pargc, char *** pargv, args_t * args ) { - char const * tile_cpus = fd_env_strip_cmdline_cstr( pargc, pargv, "--tile-cpus", "FD_TILE_CPUS", NULL ); - char const * snapshot_file = fd_env_strip_cmdline_cstr( pargc, pargv, "--snapshot", NULL, NULL ); + char const * tile_cpus = fd_env_strip_cmdline_cstr( pargc, pargv, "--tile-cpus", "FD_TILE_CPUS", NULL ); + char const * snapshot_src = fd_env_strip_cmdline_cstr( pargc, pargv, "--snapshot", NULL, NULL ); + char const * snapshot_dir = fd_env_strip_cmdline_cstr( pargc, pargv, "--snapshot-dir", NULL, NULL ); if( tile_cpus ) { ulong tile_cpus_strlen = strlen( tile_cpus ); @@ -158,10 +184,15 @@ snapshot_load_cmd_args( int * pargc, fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->tile_cpus ), tile_cpus, tile_cpus_strlen ) ); } - if( FD_UNLIKELY( !snapshot_file ) ) FD_LOG_ERR(( "Missing --snapshot flag" )); - ulong snapshot_file_strlen = strlen( snapshot_file ); + if( FD_UNLIKELY( !snapshot_src ) ) FD_LOG_ERR(( "Missing --snapshot flag" )); + ulong snapshot_file_strlen = strlen( snapshot_src ); if( FD_UNLIKELY( snapshot_file_strlen>=sizeof(args->snapshot_load.snapshot_path) ) ) FD_LOG_ERR(( "--snapshot: path too long" )); - fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_path ), snapshot_file, snapshot_file_strlen ) ); + fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_path ), snapshot_src, snapshot_file_strlen ) ); + + /* FIXME: check if we need the snapshot dir argument (parse the snapshot input src to see if it's http)*/ + ulong snapshot_dir_strlen = strlen( snapshot_dir ); + if( FD_UNLIKELY( snapshot_file_strlen>=sizeof(args->snapshot_load.snapshot_dir) ) ) FD_LOG_ERR(( "--snapshot-dir: dir too long" )); + fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_dir ), snapshot_dir, snapshot_dir_strlen ) ); } static void diff --git a/src/app/shared/fd_action.h b/src/app/shared/fd_action.h index cd30022936..c111ef0304 100644 --- a/src/app/shared/fd_action.h +++ b/src/app/shared/fd_action.h @@ -92,6 +92,7 @@ struct fdctl_args { struct { char snapshot_path[ PATH_MAX ]; + char snapshot_dir[ PATH_MAX ]; } snapshot_load; }; diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h index 26d867bf76..04997b40a7 100644 --- a/src/disco/topo/fd_topo.h +++ b/src/disco/topo/fd_topo.h @@ -437,9 +437,9 @@ typedef struct { char dest[128]; uint ip4; ushort port; - char const * path; + char path[ PATH_MAX ]; ulong path_len; - char const * snapshot_dir; + char snapshot_dir[ PATH_MAX ]; } httpdl; struct { diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk index ea659cac49..0c609fbfad 100644 --- a/src/discof/restore/Local.mk +++ b/src/discof/restore/Local.mk @@ -2,6 +2,7 @@ $(call add-objs,fd_filerd_tile,fd_discof) $(call add-objs,fd_snapin_tile,fd_discof) $(call add-objs,fd_actalc_tile,fd_discof) $(call add-objs,fd_unzstd_tile,fd_discof) +$(call add-objs,fd_httpdl_tile,fd_discof) $(call add-objs,stream/fd_stream_writer,fd_discof) $(call add-objs,stream/fd_event_map,fd_discof) $(call add-objs,stream/fd_stream_ctx,fd_discof) diff --git a/src/discof/restore/fd_httpdl_tile.c b/src/discof/restore/fd_httpdl_tile.c index cd39e4a254..bf4e2c034a 100644 --- a/src/discof/restore/fd_httpdl_tile.c +++ b/src/discof/restore/fd_httpdl_tile.c @@ -33,7 +33,6 @@ privileged_init( fd_topo_t * topo, FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) ); fd_httpdl_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_httpdl_tile_t), sizeof(fd_httpdl_tile_t) ); void * http_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_http_align(), fd_snapshot_http_footprint() ); - ctx->writer = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), fd_stream_writer_footprint() ); fd_memset( ctx, 0, sizeof(fd_httpdl_tile_t) ); @@ -59,18 +58,7 @@ fd_httpdl_init_from_stream_ctx( void * _ctx, /* There's only one writer */ ctx->writer = &stream_ctx->writers[0]; -} - -static void -during_housekeeping( void * _ctx, - fd_stream_ctx_t * stream_ctx ) { - (void)_ctx; - (void)stream_ctx; -} - -static void -metrics_write( fd_httpdl_tile_t * ctx ) { - (void)ctx; + fd_stream_writer_set_read_max( ctx->writer, FD_SNAPSHOT_HTTP_RESP_BUF_MAX ); } __attribute__((noreturn)) FD_FN_UNUSED static void @@ -83,6 +71,25 @@ fd_httpdl_shutdown( fd_httpdl_tile_t * ctx ) { for(;;) pause(); } +static void +after_credit( fd_httpdl_tile_t * ctx, + fd_stream_ctx_t * stream_ctx ) { + (void)stream_ctx; + /* get write pointers into dcache buffer */ + uchar * out = fd_stream_writer_get_write_ptr( ctx->writer ); + ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer ); + ulong sz = 0UL; + + int err = fd_io_istream_snapshot_http_read( ctx->http, out, dst_max, &sz ); + if( FD_UNLIKELY( err<0 ) ) FD_LOG_ERR(( "http err: %d", err )); + if( FD_UNLIKELY( err>0 ) ) fd_httpdl_shutdown( ctx ); + + if( sz ) { + fd_stream_writer_advance( ctx->writer, sz ); + fd_stream_writer_publish( ctx->writer, sz ); + } +} + __attribute__((noinline)) static void fd_httpdl_run1( fd_httpdl_tile_t * ctx, @@ -90,23 +97,38 @@ fd_httpdl_run1( FD_LOG_INFO(( "Running httpdl tile" )); - fd_stream_ctx_init_run_loop( stream_ctx, ctx, fd_httpdl_init_from_stream_ctx ); + fd_stream_ctx_init_run_loop( stream_ctx, + ctx, + fd_httpdl_init_from_stream_ctx, + NULL, + NULL, + NULL ); for(;;) { fd_stream_ctx_do_housekeeping( stream_ctx, - ctx, - NULL, - during_housekeeping, - NULL ); + ctx ); /* Check if we are backpressured, otherwise poll */ if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { fd_stream_ctx_process_backpressure( stream_ctx ); } else { - after_credit( ctx ); + after_credit( ctx, stream_ctx ); } } } +static void +fd_httpdl_run( fd_topo_t * topo, + fd_topo_tile_t * tile ) { + fd_httpdl_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); + ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile ); + ulong cons_cnt = fd_topo_tile_reliable_consumer_cnt( topo, tile ); + ulong out_cnt = tile->out_cnt; + + void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, cons_cnt, out_cnt ) ); + fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, cons_cnt, out_cnt ); + fd_httpdl_run1( ctx, stream_ctx ); +} + fd_topo_run_tile_t fd_tile_snapshot_restore_HttpDl = { .name = NAME, .scratch_align = scratch_align, diff --git a/src/flamenco/snapshot/fd_snapshot_http.c b/src/flamenco/snapshot/fd_snapshot_http.c index bafe5ae2af..4ddb9a6342 100644 --- a/src/flamenco/snapshot/fd_snapshot_http.c +++ b/src/flamenco/snapshot/fd_snapshot_http.c @@ -668,6 +668,8 @@ fd_io_istream_snapshot_http_read( void * _this, return fd_snapshot_http_dl( this, dst, dst_max, dst_sz ); case FD_SNAPSHOT_HTTP_STATE_READ: return fd_snapshot_http_read( this, dst, dst_max, dst_sz ); + case FD_SNAPSHOT_HTTP_STATE_DONE: + return 1; } /* Not yet ready to read at this point. */ From be315a4470bea16335c3017a965c536a826dc476 Mon Sep 17 00:00:00 2001 From: cali-jumptrading Date: Wed, 14 May 2025 14:52:16 +0000 Subject: [PATCH 3/4] now it's sort of working --- .../firedancer-dev/commands/snapshot_load.c | 70 ++++++++++++++----- src/app/firedancer-dev/main.c | 2 + src/discof/restore/fd_httpdl_tile.c | 7 +- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/src/app/firedancer-dev/commands/snapshot_load.c b/src/app/firedancer-dev/commands/snapshot_load.c index 27be7caabc..bd46a5ead4 100644 --- a/src/app/firedancer-dev/commands/snapshot_load.c +++ b/src/app/firedancer-dev/commands/snapshot_load.c @@ -126,14 +126,35 @@ snapshot_load_topo( config_t * config, fd_topob_wksp( topo, "HttpDl" ); fd_topo_tile_t * httpdl_tile = fd_topob_tile( topo, "HttpDl", "HttpDl", "HttpDl", tile_to_cpu[1], 0, 0 ); fd_memcpy( httpdl_tile->httpdl.path, src->http.path, PATH_MAX ); - fd_memcpy( httpdl_tile->httpdl.snapshot_dir, args->snapshot_load.snapshot_path, PATH_MAX ); + fd_memcpy( httpdl_tile->httpdl.snapshot_dir, args->snapshot_load.snapshot_dir, PATH_MAX ); fd_memcpy( httpdl_tile->httpdl.dest, src->http.dest, sizeof(src->http.dest) ); httpdl_tile->httpdl.ip4 = src->http.ip4; httpdl_tile->httpdl.path_len = src->http.path_len; httpdl_tile->httpdl.port = src->http.port; - fd_topob_tile_out( topo, "HttpDl", 0UL, "snap_stream", 0UL ); + /* "unzstd": Zstandard decompress tile */ + fd_topob_wksp( topo, "Unzstd" ); + fd_topo_tile_t * unzstd_tile = fd_topob_tile( topo, "Unzstd", "Unzstd", "Unzstd", tile_to_cpu[2], 0, 0 ); + (void)unzstd_tile; + + /* Compressed data stream */ + fd_topob_wksp( topo, "snap_zstd" ); + fd_topo_link_t * zstd_link = fd_topob_link( topo, "snap_zstd", "snap_zstd", 512UL, 0UL, 0UL ); + fd_topo_obj_t * zstd_dcache = fd_topob_obj( topo, "dcache", "snap_zstd"); + zstd_link->dcache_obj_id = zstd_dcache->id; + FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", zstd_dcache->id ) ); + + /* filerd tile -> compressed stream */ + fd_topob_tile_out( topo, "HttpDl", 0UL, "snap_zstd", 0UL ); fd_topob_tile_uses( topo, httpdl_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); + + /* compressed stream -> unzstd tile */ + fd_topob_tile_in( topo, "Unzstd", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + fd_topob_tile_uses( topo, unzstd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_ONLY ); + + /* unzstd tile -> uncompressed stream */ + fd_topob_tile_out( topo, "Unzstd", 0UL, "snap_stream", 0UL ); + fd_topob_tile_uses( topo, unzstd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); } fd_topob_wksp( topo, "SnapIn" ); @@ -190,9 +211,11 @@ snapshot_load_cmd_args( int * pargc, fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_path ), snapshot_src, snapshot_file_strlen ) ); /* FIXME: check if we need the snapshot dir argument (parse the snapshot input src to see if it's http)*/ - ulong snapshot_dir_strlen = strlen( snapshot_dir ); - if( FD_UNLIKELY( snapshot_file_strlen>=sizeof(args->snapshot_load.snapshot_dir) ) ) FD_LOG_ERR(( "--snapshot-dir: dir too long" )); - fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_dir ), snapshot_dir, snapshot_dir_strlen ) ); + if( snapshot_dir!=NULL ) { + ulong snapshot_dir_strlen = strlen( snapshot_dir ); + if( FD_UNLIKELY( snapshot_file_strlen>=sizeof(args->snapshot_load.snapshot_dir) ) ) FD_LOG_ERR(( "--snapshot-dir: dir too long" )); + fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_dir ), snapshot_dir, snapshot_dir_strlen ) ); + } } static void @@ -220,15 +243,20 @@ snapshot_load_cmd_fn( args_t * args, double ns_per_tick = 1.0/tick_per_ns; fd_topo_run_single_process( topo, 2, config->uid, config->gid, fdctl_tile_run, NULL ); - fd_topo_tile_t * file_rd_tile = &topo->tiles[ fd_topo_find_tile( topo, "FileRd", 0UL ) ]; - fd_topo_tile_t * snap_in_tile = &topo->tiles[ fd_topo_find_tile( topo, "SnapIn", 0UL ) ]; - ulong zstd_tile_idx = fd_topo_find_tile( topo, "Unzstd", 0UL ); - fd_topo_tile_t * unzstd_tile = zstd_tile_idx!=ULONG_MAX ? &topo->tiles[ zstd_tile_idx ] : NULL; - - ulong * snap_in_fseq = snap_in_tile->in_link_fseq[ 0 ]; - ulong * snap_accs_sync = fd_mcache_seq_laddr( topo->links[ fd_topo_find_link( topo, "snap_frags", 0UL ) ].mcache ); - ulong volatile * file_rd_metrics = fd_metrics_tile( file_rd_tile->metrics ); - ulong volatile * snap_in_metrics = fd_metrics_tile( snap_in_tile->metrics ); + ulong httpdl_tile_idx = fd_topo_find_tile( topo, "HttpDl", 0UL ); + ulong filerd_tile_idx = fd_topo_find_tile( topo, "FileRd", 0UL ); + fd_topo_tile_t * http_dl_tile = httpdl_tile_idx!=ULONG_MAX ? &topo->tiles[ httpdl_tile_idx ] : NULL; + fd_topo_tile_t * file_rd_tile = filerd_tile_idx!=ULONG_MAX ? &topo->tiles[ filerd_tile_idx ] : NULL; + fd_topo_tile_t * snap_in_tile = &topo->tiles[ fd_topo_find_tile( topo, "SnapIn", 0UL ) ]; + ulong zstd_tile_idx = fd_topo_find_tile( topo, "Unzstd", 0UL ); + fd_topo_tile_t * unzstd_tile = zstd_tile_idx!=ULONG_MAX ? &topo->tiles[ zstd_tile_idx ] : NULL; + fd_topo_tile_t * actalc_tile = &topo->tiles[ fd_topo_find_tile( topo, "ActAlc", 0UL ) ]; + + ulong * snap_in_fseq = snap_in_tile->in_link_fseq[ 0 ]; + ulong * snap_accs_sync = fd_mcache_seq_laddr( topo->links[ fd_topo_find_link( topo, "snap_frags", 0UL ) ].mcache ); + ulong volatile * file_rd_metrics = file_rd_tile ? fd_metrics_tile( file_rd_tile->metrics ) : NULL; + ulong volatile * http_dl_metrics = http_dl_tile ? fd_metrics_tile( http_dl_tile->metrics ) : NULL; + ulong volatile * snap_in_metrics = fd_metrics_tile( snap_in_tile->metrics ); ulong volatile * unzstd_in_metrics = unzstd_tile ? fd_metrics_tile( unzstd_tile->metrics ) : NULL; ulong goff_old = 0UL; @@ -237,18 +265,22 @@ snapshot_load_cmd_fn( args_t * args, ulong acc_cnt_old = 0UL; ulong frag_cnt_old = 0UL; for(;;) { - sleep( 1 ); - - ulong filerd_status = FD_VOLATILE_CONST( file_rd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); + sleep( 1 ); + ulong filerd_status = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ) : 2UL; + ulong httpdl_status = http_dl_metrics ? FD_VOLATILE_CONST( http_dl_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ) : 2UL; ulong snapin_status = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); ulong unzstd_status = unzstd_in_metrics ? FD_VOLATILE_CONST( unzstd_in_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ) : 2UL; - if( FD_UNLIKELY( filerd_status==2UL && unzstd_status==2UL && snapin_status == 2UL ) ) { + if( FD_UNLIKELY( httpdl_status==2UL && filerd_status==2UL && unzstd_status==2UL && snapin_status == 2UL ) ) { FD_LOG_NOTICE(( "Done" )); break; } ulong goff = FD_VOLATILE_CONST( snap_in_fseq[ 1 ] ); - ulong file_rd_backp = FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ); + ulong file_rd_backp = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ) : 0UL; + ulong file_rd_wait = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] ) + + FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] ) + + file_rd_backp : 0UL; + ulong snap_in_backp = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ); ulong snap_in_wait = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] ) + FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] ); ulong frag_cnt = FD_VOLATILE_CONST( snap_accs_sync[0] ); diff --git a/src/app/firedancer-dev/main.c b/src/app/firedancer-dev/main.c index cb9fc04c42..0210f267a8 100644 --- a/src/app/firedancer-dev/main.c +++ b/src/app/firedancer-dev/main.c @@ -102,6 +102,7 @@ extern fd_topo_run_tile_t fd_tile_snapshot_restore_FileRd; extern fd_topo_run_tile_t fd_tile_snapshot_restore_SnapIn; extern fd_topo_run_tile_t fd_tile_snapshot_restore_ActAlc; extern fd_topo_run_tile_t fd_tile_snapshot_restore_Unzstd; +extern fd_topo_run_tile_t fd_tile_snapshot_restore_HttpDl; fd_topo_run_tile_t * TILES[] = { &fd_tile_net, @@ -140,6 +141,7 @@ fd_topo_run_tile_t * TILES[] = { &fd_tile_snapshot_restore_SnapIn, &fd_tile_snapshot_restore_ActAlc, &fd_tile_snapshot_restore_Unzstd, + &fd_tile_snapshot_restore_HttpDl, NULL, }; diff --git a/src/discof/restore/fd_httpdl_tile.c b/src/discof/restore/fd_httpdl_tile.c index bf4e2c034a..5d64443709 100644 --- a/src/discof/restore/fd_httpdl_tile.c +++ b/src/discof/restore/fd_httpdl_tile.c @@ -82,7 +82,10 @@ after_credit( fd_httpdl_tile_t * ctx, int err = fd_io_istream_snapshot_http_read( ctx->http, out, dst_max, &sz ); if( FD_UNLIKELY( err<0 ) ) FD_LOG_ERR(( "http err: %d", err )); - if( FD_UNLIKELY( err>0 ) ) fd_httpdl_shutdown( ctx ); + if( FD_UNLIKELY( err>0 ) ) { + FD_LOG_WARNING(("HTTP download complete! shutting down")); + fd_httpdl_shutdown( ctx ); + } if( sz ) { fd_stream_writer_advance( ctx->writer, sz ); @@ -130,7 +133,7 @@ fd_httpdl_run( fd_topo_t * topo, } fd_topo_run_tile_t fd_tile_snapshot_restore_HttpDl = { - .name = NAME, + .name = "HttpDl", .scratch_align = scratch_align, .scratch_footprint = scratch_footprint, .privileged_init = privileged_init, From 48562fe468d963c9306b50447d38dabfff954e86 Mon Sep 17 00:00:00 2001 From: cali-jumptrading Date: Wed, 14 May 2025 21:52:36 +0000 Subject: [PATCH 4/4] even more generic stream ctx --- .../firedancer-dev/commands/snapshot_load.c | 10 +- src/discof/restore/fd_httpdl_tile.c | 71 ++++++-- src/discof/restore/fd_snapin_tile.c | 4 +- src/discof/restore/fd_unzstd_tile.c | 28 ++- src/discof/restore/stream/fd_event_map.c | 6 +- src/discof/restore/stream/fd_event_map.h | 14 +- src/discof/restore/stream/fd_stream_ctx.c | 45 +---- src/discof/restore/stream/fd_stream_ctx.h | 161 +++++++++--------- src/discof/restore/stream/fd_stream_metrics.h | 35 ---- src/discof/restore/stream/fd_stream_ticks.h | 6 - src/discof/restore/stream/fd_stream_writer.c | 114 ++++--------- src/discof/restore/stream/fd_stream_writer.h | 33 ++-- 12 files changed, 214 insertions(+), 313 deletions(-) diff --git a/src/app/firedancer-dev/commands/snapshot_load.c b/src/app/firedancer-dev/commands/snapshot_load.c index bd46a5ead4..795292876e 100644 --- a/src/app/firedancer-dev/commands/snapshot_load.c +++ b/src/app/firedancer-dev/commands/snapshot_load.c @@ -250,7 +250,7 @@ snapshot_load_cmd_fn( args_t * args, fd_topo_tile_t * snap_in_tile = &topo->tiles[ fd_topo_find_tile( topo, "SnapIn", 0UL ) ]; ulong zstd_tile_idx = fd_topo_find_tile( topo, "Unzstd", 0UL ); fd_topo_tile_t * unzstd_tile = zstd_tile_idx!=ULONG_MAX ? &topo->tiles[ zstd_tile_idx ] : NULL; - fd_topo_tile_t * actalc_tile = &topo->tiles[ fd_topo_find_tile( topo, "ActAlc", 0UL ) ]; + // fd_topo_tile_t * actalc_tile = &topo->tiles[ fd_topo_find_tile( topo, "ActAlc", 0UL ) ]; ulong * snap_in_fseq = snap_in_tile->in_link_fseq[ 0 ]; ulong * snap_accs_sync = fd_mcache_seq_laddr( topo->links[ fd_topo_find_link( topo, "snap_frags", 0UL ) ].mcache ); @@ -277,10 +277,10 @@ snapshot_load_cmd_fn( args_t * args, ulong goff = FD_VOLATILE_CONST( snap_in_fseq[ 1 ] ); ulong file_rd_backp = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ) : 0UL; - ulong file_rd_wait = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] ) + - FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] ) + - file_rd_backp : 0UL; - ulong snap_in_backp = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ); + // ulong file_rd_wait = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] ) + + // FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] ) + + // file_rd_backp : 0UL; + // ulong snap_in_backp = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ); ulong snap_in_wait = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] ) + FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] ); ulong frag_cnt = FD_VOLATILE_CONST( snap_accs_sync[0] ); diff --git a/src/discof/restore/fd_httpdl_tile.c b/src/discof/restore/fd_httpdl_tile.c index 5d64443709..df1a40488d 100644 --- a/src/discof/restore/fd_httpdl_tile.c +++ b/src/discof/restore/fd_httpdl_tile.c @@ -5,6 +5,7 @@ #include #define NAME "http" +#define HTTP_CHUNK_SZ 8 * 1024 * 1024UL struct fd_httpdl_tile { fd_snapshot_http_t * http; @@ -58,7 +59,7 @@ fd_httpdl_init_from_stream_ctx( void * _ctx, /* There's only one writer */ ctx->writer = &stream_ctx->writers[0]; - fd_stream_writer_set_read_max( ctx->writer, FD_SNAPSHOT_HTTP_RESP_BUF_MAX ); + fd_stream_writer_set_read_max( ctx->writer, HTTP_CHUNK_SZ ); } __attribute__((noreturn)) FD_FN_UNUSED static void @@ -71,10 +72,53 @@ fd_httpdl_shutdown( fd_httpdl_tile_t * ctx ) { for(;;) pause(); } -static void -after_credit( fd_httpdl_tile_t * ctx, - fd_stream_ctx_t * stream_ctx ) { +__attribute__((unused)) static void +after_credit_chunk( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_httpdl_tile_t * ctx = fd_type_pun(_ctx); + (void)stream_ctx; + ulong downloaded_sz = 0UL; + + for(;;) { + if( downloaded_sz >= HTTP_CHUNK_SZ ) { + fd_stream_writer_publish( ctx->writer, downloaded_sz ); + break; + } + /* get write pointers into dcache buffer */ + uchar * out = fd_stream_writer_get_write_ptr( ctx->writer ); + ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer ); + ulong sz = 0UL; + + if( dst_max==0 ) { + fd_stream_writer_publish( ctx->writer, downloaded_sz ); + break; + } + + int err = fd_io_istream_snapshot_http_read( ctx->http, out, dst_max, &sz ); + if( FD_UNLIKELY( err<0 ) ) FD_LOG_ERR(( "http err: %d", err )); + if( FD_UNLIKELY( err>0 ) ) { + FD_LOG_WARNING(("HTTP download complete! shutting down")); + fd_httpdl_shutdown( ctx ); + } + + if( sz ) { + fd_stream_writer_advance( ctx->writer, sz ); + downloaded_sz += sz; + } + } +} + +__attribute__((unused)) static void +after_credit_stream( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_httpdl_tile_t * ctx = fd_type_pun(_ctx); (void)stream_ctx; + + /* Don't do anything if backpressured */ + if( fd_stream_writer_is_backpressured( ctx->writer ) ) { + return; + } + /* get write pointers into dcache buffer */ uchar * out = fd_stream_writer_get_write_ptr( ctx->writer ); ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer ); @@ -105,18 +149,10 @@ fd_httpdl_run1( fd_httpdl_init_from_stream_ctx, NULL, NULL, + NULL, + after_credit_stream, NULL ); - for(;;) { - fd_stream_ctx_do_housekeeping( stream_ctx, - ctx ); - - /* Check if we are backpressured, otherwise poll */ - if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { - fd_stream_ctx_process_backpressure( stream_ctx ); - } else { - after_credit( ctx, stream_ctx ); - } - } + fd_stream_ctx_run_loop( stream_ctx, ctx ); } static void @@ -124,11 +160,10 @@ fd_httpdl_run( fd_topo_t * topo, fd_topo_tile_t * tile ) { fd_httpdl_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile ); - ulong cons_cnt = fd_topo_tile_reliable_consumer_cnt( topo, tile ); ulong out_cnt = tile->out_cnt; - void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, cons_cnt, out_cnt ) ); - fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, cons_cnt, out_cnt ); + void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, out_cnt ) ); + fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, out_cnt ); fd_httpdl_run1( ctx, stream_ctx ); } diff --git a/src/discof/restore/fd_snapin_tile.c b/src/discof/restore/fd_snapin_tile.c index ca3aea7a2f..9c33116304 100644 --- a/src/discof/restore/fd_snapin_tile.c +++ b/src/discof/restore/fd_snapin_tile.c @@ -1135,14 +1135,14 @@ fd_snapin_run( fd_topo_t * topo, } ulong reliable_cons_cnt = 0UL; - // ulong cons_out[ FD_TOPO_MAX_LINKS ]; + ulong cons_out[ FD_TOPO_MAX_LINKS ]; ulong * cons_fseq[ FD_TOPO_MAX_LINKS ]; for( ulong i=0UL; itile_cnt; i++ ) { fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; for( ulong j=0UL; jin_cnt; j++ ) { for( ulong k=0UL; kout_cnt; k++ ) { if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ k ] && consumer_tile->in_link_reliable[ j ] ) ) { - // cons_out[ reliable_cons_cnt ] = k; + cons_out[ reliable_cons_cnt ] = k; cons_fseq[ reliable_cons_cnt ] = consumer_tile->in_link_fseq[ j ]; FD_TEST( cons_fseq[ reliable_cons_cnt ] ); reliable_cons_cnt++; diff --git a/src/discof/restore/fd_unzstd_tile.c b/src/discof/restore/fd_unzstd_tile.c index 7bd01a68e0..bf57aec3bd 100644 --- a/src/discof/restore/fd_unzstd_tile.c +++ b/src/discof/restore/fd_unzstd_tile.c @@ -95,6 +95,12 @@ on_stream_frag( void * _ctx, fd_stream_frag_meta_t const * frag, ulong * sz ) { fd_unzstd_tile_t * ctx = fd_type_pun(_ctx); + + /* Don't do anything if backpressured */ + if( fd_stream_writer_is_backpressured( ctx->writer ) ) { + return 0; + } + uchar const * chunk0 = ctx->in_state.in_buf + frag->loff; uchar const * chunk_start = chunk0 + ctx->in_state.in_skip; uchar const * chunk_end = chunk0 + frag->sz; @@ -180,20 +186,11 @@ fd_unzstd_run1( fd_unzstd_init_from_stream_ctx, fd_unzstd_in_update, during_housekeeping, - NULL ); + NULL, + NULL, + on_stream_frag ); - for(;;) { - /* do housekeeping manages flow control credits */ - fd_stream_ctx_do_housekeeping( stream_ctx, - ctx ); - - /* Check if we are backpressured, otherwise poll */ - if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { - fd_stream_ctx_process_backpressure( stream_ctx ); - } else { - fd_stream_ctx_poll( stream_ctx, ctx, on_stream_frag ); - } - } + fd_stream_ctx_run_loop( stream_ctx, ctx ); } static void @@ -201,11 +198,10 @@ fd_unzstd_run( fd_topo_t * topo, fd_topo_tile_t * tile ) { fd_unzstd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile ); - ulong cons_cnt = fd_topo_tile_reliable_consumer_cnt( topo, tile ); ulong out_cnt = tile->out_cnt; - void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, cons_cnt, out_cnt ) ); - fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, cons_cnt, out_cnt ); + void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, out_cnt ) ); + fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, out_cnt ); fd_unzstd_run1( ctx, stream_ctx ); } diff --git a/src/discof/restore/stream/fd_event_map.c b/src/discof/restore/stream/fd_event_map.c index 8d02c60df5..9eddf5f0c5 100644 --- a/src/discof/restore/stream/fd_event_map.c +++ b/src/discof/restore/stream/fd_event_map.c @@ -3,7 +3,7 @@ fd_event_map_t * fd_event_map_new( void * mem, ulong in_cnt, - ulong cons_cnt ) { + ulong out_cnt ) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING(( "NULL mem" )); return NULL; @@ -17,13 +17,13 @@ fd_event_map_new( void * mem, FD_SCRATCH_ALLOC_INIT( l, mem ); fd_event_map_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_event_map_t), sizeof(fd_event_map_t) ); - ulong event_cnt = 1UL + in_cnt + cons_cnt; + ulong event_cnt = 1UL + in_cnt + out_cnt; self->event_map = FD_SCRATCH_ALLOC_APPEND( l, alignof(ushort), sizeof(ushort)*event_cnt ); self->event_cnt = event_cnt; self->event_seq = 0UL; /* init event map */ - fd_event_map_init(self, in_cnt, cons_cnt ); + fd_event_map_init(self, in_cnt, out_cnt ); return self; } diff --git a/src/discof/restore/stream/fd_event_map.h b/src/discof/restore/stream/fd_event_map.h index eaa89b9739..13b3fc0d08 100644 --- a/src/discof/restore/stream/fd_event_map.h +++ b/src/discof/restore/stream/fd_event_map.h @@ -22,8 +22,8 @@ fd_event_map_align( void ) { FD_FN_CONST static inline ulong fd_event_map_footprint( ulong in_cnt, - ulong cons_cnt ) { - ulong event_cnt = 1UL + in_cnt + cons_cnt; + ulong out_cnt ) { + ulong event_cnt = 1UL + in_cnt + out_cnt; ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND(l, alignof(fd_event_map_t), sizeof(fd_event_map_t) ); l = FD_LAYOUT_APPEND(l, alignof(ushort), sizeof(ushort)*event_cnt ); @@ -33,17 +33,17 @@ fd_event_map_footprint( ulong in_cnt, fd_event_map_t * fd_event_map_new( void * mem, ulong in_cnt, - ulong cons_cnt ); + ulong out_cnt ); static inline void fd_event_map_init( fd_event_map_t * map, ulong in_cnt, - ulong cons_cnt ) { + ulong out_cnt ) { ulong idx = 0UL; - map->event_map[ idx++ ] = (ushort)cons_cnt; + map->event_map[ idx++ ] = (ushort)out_cnt; for( ulong in_idx=0UL; in_idxevent_map[ idx++ ] = (ushort)(in_idx+cons_cnt+1UL); - for( ulong cons_idx=0UL; cons_idxevent_map[ idx++ ] = (ushort)(in_idx+out_cnt+1UL); + for( ulong cons_idx=0UL; cons_idxevent_map[ idx++ ] = (ushort)cons_idx; } diff --git a/src/discof/restore/stream/fd_stream_ctx.c b/src/discof/restore/stream/fd_stream_ctx.c index f1c44b7c15..b6c95a6f34 100644 --- a/src/discof/restore/stream/fd_stream_ctx.c +++ b/src/discof/restore/stream/fd_stream_ctx.c @@ -23,47 +23,17 @@ fd_stream_ctx_init( fd_stream_ctx_t * ctx, /* init writers */ for( ulong i=0UL; iout_cnt; i++ ) { - fd_stream_frag_meta_t * out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ i ] ].mcache ); - void * dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ i ] ].dcache_obj_id ) ); - fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ]; - ulong writer_cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, link ); fd_stream_writer_new( &ctx->writers[i], - out_mcache, - dcache, - writer_cons_cnt, + topo, + tile, + i, 512UL, 2UL ); } - /* init cons_fseq */ - ulong cons_idx = 0UL; - for( ulong i=0UL; iout_cnt; i++ ) { - ulong local_cons_idx = 0UL; - for( ulong j=0UL; jtile_cnt; j++ ) { - fd_topo_tile_t * consumer_tile = &topo->tiles[ j ]; - for( ulong k=0UL; kin_cnt; k++ ) { - if( FD_UNLIKELY( consumer_tile->in_link_id[ k ]==tile->out_link_id[ i ] && consumer_tile->in_link_reliable[ k ] ) ) { - ctx->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ k ]; - if( FD_UNLIKELY( !ctx->cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx )); - fd_stream_writer_set_cons_fseq( &ctx->writers[i], local_cons_idx, consumer_tile->in_link_fseq[ k ] ); - ctx->consumer_ctx[ cons_idx ].writer = &ctx->writers[ i ]; - ctx->consumer_ctx[ cons_idx ].writer_cons_idx = local_cons_idx; - cons_idx++; - local_cons_idx++; - } - } - } - } - fd_stream_ticks_init( ctx->ticks, ctx->event_map->event_cnt, 1e3L ); fd_stream_metrics_init( ctx->metrics ); FD_TEST( fd_rng_join( fd_rng_new( ctx->rng, 0, 0UL ) ) ); - - /* init metrics link for cons_slow */ - cons_idx = 0UL; - for( ; cons_idxcons_cnt; cons_idx++ ) { - ctx->cons_slow[ cons_idx ] = (ulong *)(fd_metrics_link_out( fd_metrics_base_tl, cons_idx ) + FD_METRICS_COUNTER_LINK_SLOW_COUNT_OFF); - } } fd_stream_ctx_t * @@ -71,7 +41,6 @@ fd_stream_ctx_new( void * mem, fd_topo_t * topo, fd_topo_tile_t * tile, ulong in_cnt, - ulong cons_cnt, ulong out_cnt ) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING(( "NULL mem" )); @@ -88,17 +57,13 @@ fd_stream_ctx_new( void * mem, self->in = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_reader_t), in_cnt*sizeof(fd_stream_reader_t) ); self->in_ptrs = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_reader_t *), in_cnt*sizeof(fd_stream_reader_t *) ); - self->cons_fseq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) ); - self->cons_slow = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); - void * event_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, cons_cnt ) ); - self->consumer_ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_consumer_ctx_t), sizeof(fd_consumer_ctx_t)*out_cnt ); + void * event_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, out_cnt ) ); self->writers = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), sizeof(fd_stream_writer_t)*out_cnt ); self->in_cnt = in_cnt; - self->cons_cnt = cons_cnt; self->out_cnt = out_cnt; - self->event_map = fd_event_map_new( event_map_mem, in_cnt, cons_cnt ); + self->event_map = fd_event_map_new( event_map_mem, in_cnt, out_cnt ); fd_stream_ctx_init( self, topo, tile ); self->in_seq = 0UL; diff --git a/src/discof/restore/stream/fd_stream_ctx.h b/src/discof/restore/stream/fd_stream_ctx.h index c6792bb183..300c09dc56 100644 --- a/src/discof/restore/stream/fd_stream_ctx.h +++ b/src/discof/restore/stream/fd_stream_ctx.h @@ -8,12 +8,6 @@ #include "fd_stream_ticks.h" #include "fd_stream_metrics.h" -struct fd_consumer_ctx { - fd_stream_writer_t * writer; - ulong writer_cons_idx; -}; -typedef struct fd_consumer_ctx fd_consumer_ctx_t; - struct fd_stream_ctx; typedef struct fd_stream_ctx fd_stream_ctx_t; @@ -23,29 +17,28 @@ typedef void fd_tile_update_in_fn_t( fd_stream_reader_t * reader ); typedef void fd_tile_housekeeping_fn_t( void * ctx, fd_stream_ctx_t * stream_ctx ); typedef void fd_tile_metrics_write_fn_t( void * ctx ); -typedef int fd_on_stream_frag_fn_t( void * ctx, - fd_stream_reader_t * reader, - fd_stream_frag_meta_t const * frag, +typedef void fd_tile_run_fn_t( void * ctx, fd_stream_ctx_t * stream_ctx ); +typedef int fd_tile_on_stream_frag_fn_t( void * ctx, + fd_stream_reader_t * reader, + fd_stream_frag_meta_t const * frag, ulong * sz ); struct fd_stream_ctx { fd_stream_reader_t * in; fd_stream_reader_t ** in_ptrs; - ulong ** cons_fseq; - ulong ** cons_slow; fd_event_map_t * event_map; ulong in_cnt; - ulong cons_cnt; ulong out_cnt; ulong in_seq; fd_rng_t rng[1]; fd_stream_ticks_t ticks[1]; fd_stream_metrics_t metrics[1]; fd_stream_writer_t * writers; - fd_consumer_ctx_t * consumer_ctx; fd_tile_update_in_fn_t * tile_update_in; fd_tile_housekeeping_fn_t * tile_housekeeping; fd_tile_metrics_write_fn_t * tile_metrics_write; + fd_tile_run_fn_t * tile_run; + fd_tile_on_stream_frag_fn_t * tile_on_stream_frag; }; typedef struct fd_stream_ctx fd_stream_ctx_t; @@ -58,16 +51,12 @@ fd_stream_ctx_scratch_align( void ) { FD_FN_PURE static inline ulong fd_stream_ctx_scratch_footprint( ulong in_cnt, - ulong cons_cnt, ulong out_cnt ) { ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND( l, alignof(fd_stream_ctx_t), sizeof(fd_stream_ctx_t) ); l = FD_LAYOUT_APPEND( l, alignof(fd_stream_reader_t), in_cnt*sizeof(fd_stream_reader_t) ); /* in */ l = FD_LAYOUT_APPEND( l, alignof(fd_stream_reader_t *), in_cnt*sizeof(fd_stream_reader_t *) ); /* in_ptrs */ - l = FD_LAYOUT_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) ); /* cons_fseq */ - l = FD_LAYOUT_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); /* cons_slow */ - l = FD_LAYOUT_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, cons_cnt ) ); /* event_map */ - l = FD_LAYOUT_APPEND( l, alignof(fd_consumer_ctx_t), sizeof(fd_consumer_ctx_t)*out_cnt ); + l = FD_LAYOUT_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, out_cnt ) ); /* event_map */ return FD_LAYOUT_FINI( l, fd_stream_ctx_scratch_align() ); } @@ -76,7 +65,6 @@ fd_stream_ctx_new( void * mem, fd_topo_t * topo, fd_topo_tile_t * tile, ulong in_cnt, - ulong cons_cnt, ulong out_cnt ); void @@ -84,23 +72,29 @@ fd_stream_ctx_init( fd_stream_ctx_t * ctx, fd_topo_t * topo, fd_topo_tile_t * tile ); -static inline void -fd_stream_ctx_update_cons_slow( fd_stream_ctx_t * ctx, - ulong slowest_cons ) { - if( FD_LIKELY( slowest_cons!=ULONG_MAX ) ) { - FD_COMPILER_MFENCE(); - (*ctx->cons_slow[ slowest_cons ]) += ctx->metrics->in_backp; - FD_COMPILER_MFENCE(); - } -} - static inline void fd_stream_ctx_init_run_loop( fd_stream_ctx_t * ctx, void * tile_ctx, fd_tile_ctx_init_run_loop_fn_t * tile_init_run_loop, fd_tile_update_in_fn_t * tile_update_in, fd_tile_housekeeping_fn_t * tile_housekeeping, - fd_tile_metrics_write_fn_t * tile_metrics_write ) { + fd_tile_metrics_write_fn_t * tile_metrics_write, + fd_tile_run_fn_t * tile_run, + fd_tile_on_stream_frag_fn_t * tile_on_stream_frag ) { + if( ctx->in_cnt && !tile_update_in ) { + FD_LOG_ERR(( "tile_update_in function cannot be null if there are producers to this tile!" )); + } + + if( ctx->in_cnt && !tile_on_stream_frag ) { + FD_LOG_ERR(( "tile_on_stream_frag function cannot be null if there are producers to this tile!" )); + } + + ctx->tile_update_in = tile_update_in; + ctx->tile_housekeeping = tile_housekeeping; + ctx->tile_metrics_write = tile_metrics_write; + ctx->tile_run = tile_run; + ctx->tile_on_stream_frag = tile_on_stream_frag; + FD_MGAUGE_SET( TILE, STATUS, 1UL ); fd_stream_ticks_init_timer( ctx->ticks ); @@ -111,31 +105,14 @@ fd_stream_ctx_init_run_loop( fd_stream_ctx_t * ctx, if( tile_init_run_loop ) { tile_init_run_loop( tile_ctx, ctx ); } - - if( ctx->in_cnt && !tile_update_in ) { - FD_LOG_ERR(( "tile_update_in function cannot be null if there are producers to this tile!" )); - } - - ctx->tile_update_in = tile_update_in; - ctx->tile_housekeeping = tile_housekeeping; - ctx->tile_metrics_write = tile_metrics_write; } static inline void fd_stream_ctx_update_flow_control_credits( fd_stream_ctx_t * ctx ) { /* Recalculate flow control credits */ - ulong slowest_cons = ULONG_MAX; - ulong global_cons_idx = 0UL; for( ulong i=0UL; iout_cnt; i++ ) { - ulong slowest_local_cons = ULONG_MAX; - fd_stream_writer_update_flow_control_credits( &ctx->writers[i], - &slowest_local_cons ); - slowest_cons = fd_ulong_if( slowest_local_cons!=ULONG_MAX, global_cons_idx + slowest_local_cons, slowest_cons ); - global_cons_idx += ctx->writers[i].cons_cnt; + fd_stream_writer_update_flow_control_credits( &ctx->writers[i] ); } - - fd_stream_ctx_update_cons_slow( ctx, - slowest_cons ); } static inline void @@ -158,18 +135,17 @@ fd_stream_ctx_do_housekeeping( fd_stream_ctx_t * ctx, if( FD_UNLIKELY( fd_stream_ticks_is_housekeeping_time( ctx->ticks ) ) ) { ulong event_idx = fd_event_map_get_event( ctx->event_map ); - if( FD_LIKELY( event_idxcons_cnt ) ) { /* receive credits */ - ulong cons_idx = event_idx; + if( FD_LIKELY( event_idxout_cnt ) ) { /* receive credits */ + ulong out_idx = event_idx; /* Receive flow control credits from this out. */ - fd_stream_writer_receive_flow_control_credits( ctx->consumer_ctx[ cons_idx ].writer, - ctx->consumer_ctx[ cons_idx ].writer_cons_idx ); + fd_stream_writer_receive_flow_control_credits( &ctx->writers[ out_idx ] ); - } else if( event_idx>ctx->cons_cnt) { /* send credits */ - ulong in_idx = event_idx - ctx->cons_cnt - 1UL; + } else if( event_idx>ctx->out_cnt) { /* send credits */ + ulong in_idx = event_idx - ctx->out_cnt - 1UL; ctx->tile_update_in( &ctx->in[ in_idx ] ); - } else { /* event_idx==cons_cnt, housekeeping event */ + } else { /* event_idx==out_cnt, housekeeping event */ /* Update metrics counters to external viewers */ fd_stream_metrics_update_external( ctx->metrics, @@ -182,22 +158,47 @@ fd_stream_ctx_do_housekeeping( fd_stream_ctx_t * ctx, ctx->tile_housekeeping( tile_ctx, ctx ); } } + fd_stream_ctx_housekeeping_advance( ctx ); } } static inline void -fd_stream_ctx_process_backpressure( fd_stream_ctx_t * ctx ) { - fd_stream_metrics_update_backpressure( ctx->metrics, - ctx->ticks->housekeeping_ticks ); - fd_stream_ticks_reload_backpressure( ctx->ticks ); +fd_stream_ctx_advance_poll_empty( fd_stream_ctx_t * ctx ) { + ctx->metrics->regime_ticks[0] += ctx->ticks->housekeeping_ticks; + long next = fd_tickcount(); + ctx->metrics->regime_ticks[3] += (ulong)(next - ctx->ticks->now); + ctx->ticks->now = next; +} + +static inline void +fd_stream_ctx_advance_poll( fd_stream_ctx_t * ctx ) { + ctx->metrics->regime_ticks[1] += ctx->ticks->housekeeping_ticks; + ctx->metrics->regime_ticks[4] += ctx->ticks->prefrag_ticks; + long next = fd_tickcount(); + ctx->metrics->regime_ticks[7] += (ulong)(next - ctx->ticks->now); + ctx->ticks->now = next; +} + +static inline void +fd_stream_ctx_advance_poll_idle( fd_stream_ctx_t * ctx ) { + ctx->metrics->regime_ticks[0] += ctx->ticks->housekeeping_ticks; + ctx->metrics->regime_ticks[3] += ctx->ticks->prefrag_ticks; + long next = fd_tickcount(); + ctx->metrics->regime_ticks[6] += (ulong)(next - ctx->ticks->now); + ctx->ticks->now = next; } static inline void fd_stream_ctx_poll( fd_stream_ctx_t * ctx, - void * tile_ctx, - fd_on_stream_frag_fn_t * on_stream_frag ) { + void * tile_ctx ) { ctx->metrics->in_backp = 0UL; + + if( FD_UNLIKELY( !ctx->in_cnt ) ) { + fd_stream_ctx_advance_poll_empty( ctx ); + return; + } + ctx->ticks->prefrag_ticks = 0UL; /* select input to poll */ @@ -213,26 +214,22 @@ fd_stream_ctx_poll( fd_stream_ctx_t * ctx, &consume_ctx ); if( FD_UNLIKELY( diff<0L ) ) { - fd_stream_metrics_update_poll( ctx->metrics, - ctx->ticks->housekeeping_ticks, - ctx->ticks->prefrag_ticks, - &ctx->ticks->now); + /* overrun case, technically impossible with reliable streams */ + fd_stream_ctx_advance_poll( ctx ); fd_stream_reader_process_overrun( this_in, &consume_ctx, diff ); } else if ( FD_UNLIKELY( diff ) ) { - fd_stream_metrics_update_poll_idle( ctx->metrics, - ctx->ticks->housekeeping_ticks, - ctx->ticks->prefrag_ticks, - &ctx->ticks->now ); + /* nothing new to poll */ + fd_stream_ctx_advance_poll_idle( ctx ); } else { FD_COMPILER_MFENCE(); ulong sz = 0U; fd_stream_frag_meta_t const * frag = fd_type_pun_const( consume_ctx.mline ); - int consumed_frag = on_stream_frag( tile_ctx, this_in, frag, &sz ); + int consumed_frag = ctx->tile_on_stream_frag( tile_ctx, this_in, frag, &sz ); fd_stream_reader_consume_bytes( this_in, sz ); @@ -241,10 +238,21 @@ fd_stream_ctx_poll( fd_stream_ctx_t * ctx, &consume_ctx ); } - fd_stream_metrics_update_poll( ctx->metrics, - ctx->ticks->housekeeping_ticks, - ctx->ticks->prefrag_ticks, - &ctx->ticks->now ); + fd_stream_ctx_advance_poll( ctx ); + } +} + +static inline void +fd_stream_ctx_run_loop( fd_stream_ctx_t * ctx, + void * tile_ctx ) { + for(;;) { + fd_stream_ctx_do_housekeeping( ctx, tile_ctx ); + + if( ctx->tile_run ) { + ctx->tile_run( tile_ctx, ctx ); + } + + fd_stream_ctx_poll( ctx, tile_ctx ); } } @@ -255,11 +263,6 @@ fd_stream_ctx_delete( fd_stream_ctx_t * ctx ) { ctx->in_ptrs[ i ] = NULL; } - for( ulong i=0UL; icons_cnt; i++ ) { - ctx->cons_fseq[ i ] = NULL; - ctx->cons_slow[ i ] = NULL; - } - fd_event_map_delete( ctx->event_map ); fd_memset(ctx, 0, sizeof(fd_stream_ctx_t) ); return (void *)ctx; diff --git a/src/discof/restore/stream/fd_stream_metrics.h b/src/discof/restore/stream/fd_stream_metrics.h index 031e5500e2..d3b103811d 100644 --- a/src/discof/restore/stream/fd_stream_metrics.h +++ b/src/discof/restore/stream/fd_stream_metrics.h @@ -29,8 +29,6 @@ fd_stream_metrics_update_external( fd_stream_metrics_t * metrics, void * ctx ) { FD_COMPILER_MFENCE(); FD_MGAUGE_SET( TILE, HEARTBEAT, (ulong)now ); - FD_MGAUGE_SET( TILE, IN_BACKPRESSURE, metrics->in_backp ); - FD_MCNT_INC ( TILE, BACKPRESSURE_COUNT, metrics->backp_cnt ); FD_MCNT_ENUM_COPY( TILE, REGIME_DURATION_NANOS, metrics->regime_ticks ); if( metrics_write ) { @@ -41,39 +39,6 @@ fd_stream_metrics_update_external( fd_stream_metrics_t * metrics, metrics->backp_cnt = 0UL; } -static inline void -fd_stream_metrics_update_backpressure( fd_stream_metrics_t * metrics, - ulong housekeeping_ticks ) { - metrics->backp_cnt += (ulong)!metrics->in_backp; - metrics->in_backp = 1UL; - FD_SPIN_PAUSE(); - metrics->regime_ticks[2] += housekeeping_ticks; -} - -static inline void -fd_stream_metrics_update_poll( fd_stream_metrics_t * metrics, - ulong housekeeping_ticks, - ulong prefrag_ticks, - long * now) { - metrics->regime_ticks[1] += housekeeping_ticks; - metrics->regime_ticks[4] += prefrag_ticks; - long next = fd_tickcount(); - metrics->regime_ticks[7] += (ulong)(next - *now); - *now = next; -} - -static inline void -fd_stream_metrics_update_poll_idle( fd_stream_metrics_t * metrics, - ulong housekeeping_ticks, - ulong prefrag_ticks, - long * now) { - metrics->regime_ticks[0] += housekeeping_ticks; - metrics->regime_ticks[3] += prefrag_ticks; - long next = fd_tickcount(); - metrics->regime_ticks[6] += (ulong)(next - *now); - *now = next; -} - FD_PROTOTYPES_END #endif /* HEADER_fd_src_discof_restore_stream_fd_stream_metrics_h */ diff --git a/src/discof/restore/stream/fd_stream_ticks.h b/src/discof/restore/stream/fd_stream_ticks.h index 9db346cfac..ac181fadb1 100644 --- a/src/discof/restore/stream/fd_stream_ticks.h +++ b/src/discof/restore/stream/fd_stream_ticks.h @@ -46,10 +46,4 @@ fd_stream_ticks_reload_housekeeping( fd_stream_ticks_t * ticks, fd_rng_t * rng ) ticks->now = next; } -static inline void -fd_stream_ticks_reload_backpressure( fd_stream_ticks_t * ticks ) { - long next = fd_tickcount(); - ticks->now = next; -} - #endif /* HEADER_fd_src_discof_restore_stream_fd_stream_ticks_h */ diff --git a/src/discof/restore/stream/fd_stream_writer.c b/src/discof/restore/stream/fd_stream_writer.c index 62a8801005..69e654d03d 100644 --- a/src/discof/restore/stream/fd_stream_writer.c +++ b/src/discof/restore/stream/fd_stream_writer.c @@ -2,85 +2,13 @@ #include "../../../util/log/fd_log.h" #include "../../../tango/dcache/fd_dcache.h" -// fd_stream_writer_t * -// fd_stream_writer_new( void * mem, -// fd_topo_t * topo, -// fd_topo_tile_t * tile, -// ulong link_id, -// ulong read_max, -// ulong burst_byte, -// ulong burst_frag ) { -// if( FD_UNLIKELY( !mem ) ) { -// FD_LOG_WARNING(( "NULL mem" )); -// return NULL; -// } - -// if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_stream_writer_align() ) ) ) { -// FD_LOG_WARNING(( "unaligned mem" )); -// return NULL; -// } - -// FD_SCRATCH_ALLOC_INIT( l, mem ); -// fd_stream_writer_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_writer_t), sizeof(fd_stream_writer_t) ); - -// fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ link_id ] ]; -// void * dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ link_id ] ].dcache_obj_id ) ); -// fd_stream_frag_meta_t * out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ link_id ] ].mcache ); -// ulong cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, link ); - -// self->out_mcache = out_mcache; -// self->buf = dcache; -// self->buf_base = (ulong)dcache - (ulong)fd_wksp_containing( dcache ); -// self->buf_off = 0UL; -// self->buf_sz = fd_dcache_data_sz( dcache ); -// self->goff = 0UL; -// self->read_max = read_max; -// self->stream_off = 0UL; -// self->goff_start = 0UL; -// self->out_seq = 0UL; - -// /* Set up flow control state */ -// self->cr_byte_avail = 0UL; -// self->cr_frag_avail = 0UL; -// self->cr_byte_max = fd_dcache_data_sz( dcache ); -// self->cr_frag_max = fd_mcache_depth( self->out_mcache->f ); -// self->burst_byte = burst_byte; -// self->burst_frag = burst_frag; -// self->cons_cnt = cons_cnt; -// self->cons_seq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); -// self->cons_fseq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); -// self->out_sync = fd_mcache_seq_laddr( topo->links[ tile->out_link_id[ link_id ] ].mcache ); - -// /* Set up consumer fseq pointer array. -// We keep track of 2 fseqs per consumer to manage stream flow control. -// The first fseq tracks the consumer's mcache sequence number. -// The second fseq tracks the consumer's global read offset into stream. */ -// ulong cons_idx = 0UL; -// for( ulong i=0UL; itile_cnt; i++ ) { -// fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; -// for( ulong j=0UL; jin_cnt; j++ ) { -// if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ link_id ] && consumer_tile->in_link_reliable[ j ] ) ) { -// self->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ j ]; -// if( FD_UNLIKELY( !self->cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx )); -// cons_idx++; -// } -// } -// } - -// fd_memset(self->cons_seq, 0, EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); -// /* make sure we're not tripping */ -// FD_TEST( cons_idx==cons_cnt ); - -// return self; -// } - fd_stream_writer_t * fd_stream_writer_new( void * mem, - fd_stream_frag_meta_t * mcache, - uchar * dcache, - ulong cons_cnt, - ulong burst_byte, - ulong burst_frag ) { + fd_topo_t * topo, + fd_topo_tile_t * tile, + ulong link_id, + ulong burst_byte, + ulong burst_frag ) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING(( "NULL mem" )); return NULL; @@ -94,13 +22,18 @@ fd_stream_writer_new( void * mem, FD_SCRATCH_ALLOC_INIT( l, mem ); fd_stream_writer_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_writer_t), sizeof(fd_stream_writer_t) ); - self->out_mcache = mcache; + fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ link_id ] ]; + void * dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ link_id ] ].dcache_obj_id ) ); + fd_stream_frag_meta_t * out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ link_id ] ].mcache ); + ulong cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, link ); + + self->out_mcache = out_mcache; self->buf = dcache; self->buf_base = (ulong)dcache - (ulong)fd_wksp_containing( dcache ); self->buf_off = 0UL; self->buf_sz = fd_dcache_data_sz( dcache ); self->goff = 0UL; - self->read_max = 0UL; + self->read_max = 0UL; /* this should be set by the tile via fd_stream_writer_set_read_max */ self->stream_off = 0UL; self->goff_start = 0UL; self->out_seq = 0UL; @@ -115,8 +48,29 @@ fd_stream_writer_new( void * mem, self->cons_cnt = cons_cnt; self->cons_seq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); self->cons_fseq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); - self->out_sync = fd_mcache_seq_laddr( mcache->f ); + self->out_sync = fd_mcache_seq_laddr( topo->links[ tile->out_link_id[ link_id ] ].mcache ); + + /* Set up consumer fseq pointer array. + We keep track of 2 fseqs per consumer to manage stream flow control. + The first fseq tracks the consumer's mcache sequence number. + The second fseq tracks the consumer's global read offset into stream. */ + ulong cons_idx = 0UL; + for( ulong i=0UL; itile_cnt; i++ ) { + fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; + for( ulong j=0UL; jin_cnt; j++ ) { + if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ link_id ] && consumer_tile->in_link_reliable[ j ] ) ) { + self->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ j ]; + if( FD_UNLIKELY( !self->cons_fseq[ cons_idx ] ) ) { + FD_LOG_ERR(( "NULL cons_fseq[%lu] for out_link=%lu", cons_idx, tile->out_link_id[ link_id ] )); + } + cons_idx++; + } + } + } fd_memset(self->cons_seq, 0, EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); + /* make sure we're not tripping */ + FD_TEST( cons_idx==cons_cnt ); + return self; } diff --git a/src/discof/restore/stream/fd_stream_writer.h b/src/discof/restore/stream/fd_stream_writer.h index cee738105a..1908d0091e 100644 --- a/src/discof/restore/stream/fd_stream_writer.h +++ b/src/discof/restore/stream/fd_stream_writer.h @@ -57,9 +57,9 @@ fd_stream_writer_get_write_ptr( fd_stream_writer_t * writer ) { fd_stream_writer_t * fd_stream_writer_new( void * mem, - fd_stream_frag_meta_t * mcache, - uchar * dcache, - ulong cons_cnt, + fd_topo_t * topo, + fd_topo_tile_t * tile, + ulong link_id, ulong burst_byte, ulong burst_frag ); @@ -71,13 +71,6 @@ fd_stream_writer_init_flow_control_credits( fd_stream_writer_t * writer ) { } } -static inline void -fd_stream_writer_set_cons_fseq( fd_stream_writer_t * writer, - ulong cons_idx, - ulong * cons_fseq ) { - writer->cons_fseq[ cons_idx ] = cons_fseq; -} - static inline void fd_stream_writer_set_read_max( fd_stream_writer_t * writer, ulong read_max ) { @@ -85,17 +78,17 @@ fd_stream_writer_set_read_max( fd_stream_writer_t * writer, } static inline void -fd_stream_writer_receive_flow_control_credits( fd_stream_writer_t * writer, - ulong cons_idx) { - FD_COMPILER_MFENCE(); - writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*cons_idx ] = FD_VOLATILE_CONST( writer->cons_fseq[ cons_idx ][0] ); - writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*cons_idx+1 ] = FD_VOLATILE_CONST( writer->cons_fseq[ cons_idx ][1] ); - FD_COMPILER_MFENCE(); +fd_stream_writer_receive_flow_control_credits( fd_stream_writer_t * writer ) { + for( ulong i=0UL; icons_cnt; i++ ) { + FD_COMPILER_MFENCE(); + writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*i ] = FD_VOLATILE_CONST( writer->cons_fseq[ i ][0] ); + writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*i+1 ] = FD_VOLATILE_CONST( writer->cons_fseq[ i ][1] ); + FD_COMPILER_MFENCE(); + } } static inline void -fd_stream_writer_update_flow_control_credits( fd_stream_writer_t * writer, - ulong * slowest_cons_out ) { +fd_stream_writer_update_flow_control_credits( fd_stream_writer_t * writer ) { ulong slowest_cons = ULONG_MAX; if( FD_LIKELY( writer->cr_byte_availcr_byte_max || writer->cr_frag_availcr_frag_max ) ) { ulong cr_byte_avail = writer->cr_byte_max; @@ -111,10 +104,6 @@ fd_stream_writer_update_flow_control_credits( fd_stream_writer_t * writer, writer->cr_byte_avail = cr_byte_avail; writer->cr_frag_avail = cr_frag_avail; } - - if( slowest_cons_out ) { - *slowest_cons_out = slowest_cons; - } } static inline ulong