diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h index 38a4aba0bc..6a693b14d8 100644 --- a/src/disco/topo/fd_topo.h +++ b/src/disco/topo/fd_topo.h @@ -89,7 +89,7 @@ typedef struct { All input links will be automatically polled by the tile infrastructure, and output links will automatically source and manage credits from consumers. */ -typedef struct { +struct fd_topo_tile { ulong id; /* The ID of this tile. Indexed from [0, tile_cnt). When placed in a topology, the ID must be the index of the tile in the tiles list. */ char name[ 7UL ]; /* The name of this tile. There can be multiple of each tile name in a topology. */ ulong kind_id; /* The ID of this tile within its name. If there are n tile of a particular name, they have IDs [0, N). The pair (name, kind_id) uniquely identifies a tile, as does "id" on its own. */ @@ -455,7 +455,9 @@ typedef struct { } actidx; }; -} fd_topo_tile_t; +}; + +typedef struct fd_topo_tile fd_topo_tile_t; typedef struct { ulong id; diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk index bb1366c801..334526f06b 100644 --- a/src/discof/restore/Local.mk +++ b/src/discof/restore/Local.mk @@ -1,11 +1,16 @@ $(call add-objs,fd_filerd_tile,fd_discof) +ifdef FD_HAS_ZSTD $(call add-objs,fd_unzstd_tile,fd_discof) +endif +ifdef FD_HAS_INT128 $(call add-objs,fd_snapin_tile,fd_discof) $(call add-objs,fd_actalc_tile,fd_discof) +endif $(call add-objs,fd_actidx_tile,fd_discof) -$(call add-objs,fd_unzstd_tile,fd_discof) $(call add-objs,fd_httpdl_tile,fd_discof) $(call add-objs,stream/fd_stream_writer,fd_discof) $(call add-objs,stream/fd_event_map,fd_discof) $(call add-objs,stream/fd_stream_ctx,fd_discof) -$(call make-unit-test,test_snapin_tile,test_snapin_tile,fd_discof fd_util) +ifdef FD_HAS_INT128 +$(call make-unit-test,test_snapin_tile,test_snapin_tile,fd_discof fd_disco fd_flamenco fd_tango fd_ballet fd_util) +endif diff --git a/src/discof/restore/fd_filerd_tile.c b/src/discof/restore/fd_filerd_tile.c index 23fc148d84..da683448da 100644 --- a/src/discof/restore/fd_filerd_tile.c +++ b/src/discof/restore/fd_filerd_tile.c @@ -50,9 +50,9 @@ unprivileged_init( fd_topo_t * topo, static void fd_filerd_init_from_stream_ctx( void * _ctx, fd_stream_ctx_t * stream_ctx ) { - fd_filerd_tile_t * ctx = fd_type_pun(_ctx); - - ctx->writer = fd_stream_writer_join( &stream_ctx->writers[0] ); + fd_filerd_tile_t * ctx = _ctx; + ctx->writer = fd_stream_writer_join( stream_ctx->writers[0] ); + FD_TEST( ctx->writer ); fd_stream_writer_set_frag_sz_max( ctx->writer, FILE_READ_MAX ); } @@ -63,7 +63,7 @@ fd_filerd_shutdown( fd_filerd_tile_t * ctx ) { } ctx->fd = -1; FD_MGAUGE_SET( TILE, STATUS, 2UL ); - fd_stream_writer_notify_shutdown( ctx->writer ); + fd_stream_writer_close( ctx->writer ); FD_COMPILER_MFENCE(); FD_LOG_INFO(( "Reached end of file" )); @@ -74,24 +74,22 @@ static void after_credit( void * _ctx, fd_stream_ctx_t * stream_ctx, int * poll_in FD_PARAM_UNUSED ) { - fd_filerd_tile_t * ctx = fd_type_pun(_ctx); + fd_filerd_tile_t * ctx = _ctx; (void)stream_ctx; + uchar * out = fd_stream_writer_prepare( ctx->writer ); + ulong out_max = fd_stream_writer_publish_sz_max( ctx->writer ); + /* technically, this is not needed because fd_stream_ctx_run_loop checks for backpresure on all outgoing links and there is only one outgoing link anyways. But, it is added for clarity that callbacks should handle backpressure for their out links. */ - if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { - return; - } + if( FD_UNLIKELY( !out_max ) ) return; int fd = ctx->fd; if( FD_UNLIKELY( fd<0 ) ) return; - uchar * out = fd_stream_writer_get_write_ptr( ctx->writer ); - ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer ); - - long res = read( fd, out, dst_max ); + long res = read( fd, out, out_max ); if( FD_UNLIKELY( res<=0L ) ) { if( FD_UNLIKELY( res==0 ) ) { fd_filerd_shutdown( ctx ); @@ -102,11 +100,7 @@ after_credit( void * _ctx, /* aborts app */ } - ulong sz = (ulong)res; - if( FD_LIKELY( sz ) ) { - fd_stream_writer_advance( ctx->writer, sz ); - fd_stream_writer_publish( ctx->writer, sz ); - } + fd_stream_writer_publish( ctx->writer, (ulong)res, 0UL ); } __attribute__((noinline)) static void @@ -128,11 +122,8 @@ static void fd_filerd_run( fd_topo_t * topo, fd_topo_tile_t * tile ) { fd_filerd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); - ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile ); - ulong out_cnt = tile->out_cnt; - - void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, out_cnt ) ); - fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, out_cnt ); + void * ctx_mem = fd_alloca_check( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_footprint( topo, tile ) ); + fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile ); fd_filerd_run1( ctx, stream_ctx ); } diff --git a/src/discof/restore/fd_httpdl_tile.c b/src/discof/restore/fd_httpdl_tile.c index 0a90af60f3..fa03623d31 100644 --- a/src/discof/restore/fd_httpdl_tile.c +++ b/src/discof/restore/fd_httpdl_tile.c @@ -66,7 +66,8 @@ fd_httpdl_init_from_stream_ctx( void * _ctx, fd_httpdl_tile_t * ctx = fd_type_pun(_ctx); /* join writer */ - ctx->writer = fd_stream_writer_join( &stream_ctx->writers[0] ); + ctx->writer = fd_stream_writer_join( stream_ctx->writers[0] ); + FD_TEST( ctx->writer ); fd_stream_writer_set_frag_sz_max( ctx->writer, HTTP_CHUNK_SZ ); } @@ -74,7 +75,7 @@ __attribute__((noreturn)) FD_FN_UNUSED static void fd_httpdl_shutdown( fd_httpdl_tile_t * ctx ) { fd_snapshot_http_cleanup_fds( ctx->http ); FD_MGAUGE_SET( TILE, STATUS, 2UL ); - fd_stream_writer_notify_shutdown( ctx->writer ); + fd_stream_writer_close( ctx->writer ); FD_COMPILER_MFENCE(); FD_LOG_WARNING(("Done downloading snapshot")); @@ -85,39 +86,23 @@ __attribute__((unused)) static void after_credit_chunk( void * _ctx, fd_stream_ctx_t * stream_ctx, int * opt_poll_in FD_PARAM_UNUSED ) { - fd_httpdl_tile_t * ctx = fd_type_pun(_ctx); + fd_httpdl_tile_t * ctx = _ctx; (void)stream_ctx; - ulong downloaded_sz = 0UL; - /* Don't do anything if backpressured */ - if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { - return; - } + /* Output */ + uchar * const out = fd_stream_writer_prepare( ctx->writer ); + uchar * const out_end = out + fd_stream_writer_publish_sz_max( ctx->writer ); + uchar * out_cur = out; - for(;;) { - if( downloaded_sz >= HTTP_CHUNK_SZ ) { - fd_stream_writer_publish( ctx->writer, downloaded_sz ); - break; - } - /* get write pointers into dcache buffer */ - uchar * out = fd_stream_writer_get_write_ptr( ctx->writer ); - ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer ); - ulong sz = 0UL; - - if( dst_max==0 ) { - fd_stream_writer_publish( ctx->writer, downloaded_sz ); - break; - } - - int err = fd_io_istream_snapshot_http_read( ctx->http, out, dst_max, &sz ); + while( out_curhttp, out_cur, (ulong)out_cur-(ulong)out, &chunk_sz ); if( FD_UNLIKELY( err==1 ) ) fd_httpdl_shutdown( ctx ); else if( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "http err: %d", err )); - - if( sz ) { - fd_stream_writer_advance( ctx->writer, sz ); - downloaded_sz += sz; - } + out_cur += chunk_sz; } + + fd_stream_writer_publish( ctx->writer, (ulong)out_cur-(ulong)out, 0UL ); } __attribute__((unused)) static void @@ -127,24 +112,16 @@ after_credit_stream( void * _ctx, fd_httpdl_tile_t * ctx = fd_type_pun(_ctx); (void)stream_ctx; - /* Don't do anything if backpressured */ - if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { - return; - } - - /* get write pointers into dcache buffer */ - uchar * out = fd_stream_writer_get_write_ptr( ctx->writer ); - ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer ); - ulong sz = 0UL; + /* Output */ + uchar * const out = fd_stream_writer_prepare( ctx->writer ); + ulong const out_max = fd_stream_writer_publish_sz_max( ctx->writer ); - int err = fd_io_istream_snapshot_http_read( ctx->http, out, dst_max, &sz ); + ulong chunk_sz; + int err = fd_io_istream_snapshot_http_read( ctx->http, out, out_max, &chunk_sz ); if( FD_UNLIKELY( err==1 ) ) fd_httpdl_shutdown( ctx ); else if( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "http err: %d", err )); - if( FD_LIKELY( sz ) ) { - fd_stream_writer_advance( ctx->writer, sz ); - fd_stream_writer_publish( ctx->writer, sz ); - } + fd_stream_writer_publish( ctx->writer, chunk_sz, 0UL ); } __attribute__((noinline)) static void @@ -168,11 +145,9 @@ static void fd_httpdl_run( fd_topo_t * topo, fd_topo_tile_t * tile ) { fd_httpdl_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); - ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile ); - ulong out_cnt = tile->out_cnt; - - void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, out_cnt ) ); - fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, out_cnt ); + void * ctx_mem = fd_alloca_check( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_footprint( topo, tile ) ); + fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile ); + FD_TEST( stream_ctx ); fd_httpdl_run1( ctx, stream_ctx ); } @@ -186,6 +161,3 @@ fd_topo_run_tile_t fd_tile_snapshot_restore_HttpDl = { }; #undef NAME - - - diff --git a/src/discof/restore/fd_restore_base.h b/src/discof/restore/fd_restore_base.h index bdf011481f..0dd55ecc26 100644 --- a/src/discof/restore/fd_restore_base.h +++ b/src/discof/restore/fd_restore_base.h @@ -8,19 +8,19 @@ union fd_stream_frag_meta { -struct { + struct { - ulong seq; /* frag sequence number */ - uint sz; - ushort unused; - ushort ctl; + ulong seq; /* frag sequence number */ + uint sz; + ushort unused; + ushort ctl; - ulong goff; /* stream offset */ - ulong loff; /* dcache offset */ + ulong goff; /* stream offset */ + ulong loff; /* dcache offset */ -}; + }; -fd_frag_meta_t f[1]; + fd_frag_meta_t f[1]; }; diff --git a/src/discof/restore/fd_snapin_tile.c b/src/discof/restore/fd_snapin_tile.c index 9369f37550..e14eb3bf2f 100644 --- a/src/discof/restore/fd_snapin_tile.c +++ b/src/discof/restore/fd_snapin_tile.c @@ -57,6 +57,7 @@ typedef struct fd_snapshot_accv_map fd_snapshot_accv_map_t; #define SNAP_FLAG_FAILED 1 #define SNAP_FLAG_BLOCKED 2 +#define SNAP_FLAG_DONE 4 struct fd_snapin_tile { uchar state; @@ -68,6 +69,7 @@ struct fd_snapin_tile { uchar const * in_base; ulong goff_translate; ulong in_skip; + ulong const * in_sync; /* Frame buffer */ @@ -102,7 +104,6 @@ struct fd_snapin_tile { ulong out_cnt; ulong out_depth; ulong seq; - ulong const volatile * shutdown_signal; }; typedef struct fd_snapin_tile fd_snapin_tile_t; @@ -120,21 +121,20 @@ struct fd_snapin_in { typedef struct fd_snapin_in fd_snapin_in_t; -__attribute__((noreturn)) static void +static void fd_snapin_shutdown( fd_snapin_tile_t * ctx ) { - ulong in_seq_max = FD_VOLATILE_CONST( *ctx->shutdown_signal ); - /* wait for zstd tile to set shutdown sequence number */ - while ( in_seq_max == 0 ) { - in_seq_max = FD_VOLATILE_CONST( *ctx->shutdown_signal ); - FD_SPIN_PAUSE(); - } + ctx->flags = SNAP_FLAG_DONE; - /* FIXME set final sequence number */ FD_MGAUGE_SET( TILE, STATUS, 2UL ); - FD_TEST( in_seq_max == ctx->seq+1 && in_seq_max != 0 ); - FD_COMPILER_MFENCE(); FD_LOG_WARNING(( "Finished parsing snapshot" )); + /* Send synchronization info */ + ulong volatile * out_sync = fd_mcache_seq_laddr( ctx->out_mcache->f ); + FD_COMPILER_MFENCE(); + FD_VOLATILE( out_sync[0] ) = ctx->out_seq; + FD_VOLATILE( out_sync[2] ) = 1; + FD_COMPILER_MFENCE(); + for(;;) pause(); } @@ -285,8 +285,8 @@ restore_file( void * restore_, static uchar const * snapshot_read_buffered( fd_snapin_tile_t * restore, - uchar const * buf, - ulong bufsz ) { + uchar const * buf, + ulong bufsz ) { /* Should not be called if read is complete */ FD_TEST( restore->buf_ctr < restore->buf_sz ); @@ -363,7 +363,7 @@ snapshot_read_account_hdr_chunk( fd_snapin_tile_t * restore, peek_sz = fd_ulong_min( restore->acc_rem, bufsz ); } - int eom = bufsz > restore->acc_rem; + int eom = bufsz >= restore->acc_rem; /* Publish header-only fragment or header+data fragment. If data was included, skip ahead. (Combining header+data into the @@ -576,6 +576,25 @@ scratch_footprint( fd_topo_tile_t const * tile ) { return l; } +static fd_snapin_tile_t * +scratch_init( void * mem, + fd_topo_tile_t const * tile ) { + if( FD_UNLIKELY( !mem ) ) return NULL; + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, scratch_align() ) ) ) return NULL; + + FD_SCRATCH_ALLOC_INIT( l, mem ); + fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) ); + void * accv_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_accv_map_align(), fd_snapshot_accv_map_footprint() ); + void * scratch_mem = FD_SCRATCH_ALLOC_APPEND( l, 16UL, tile->snapin.scratch_sz ); + + fd_memset( ctx, 0, sizeof(fd_snapin_tile_t) ); + ctx->accv_map = fd_snapshot_accv_map_join( fd_snapshot_accv_map_new( accv_map_mem ) ); + FD_TEST( ctx->accv_map ); + ctx->buf = scratch_mem; + + return ctx; +} + FD_FN_UNUSED static void unprivileged_init( fd_topo_t * topo, fd_topo_tile_t * tile ) { @@ -587,11 +606,8 @@ unprivileged_init( fd_topo_t * topo, if( FD_UNLIKELY( !tile->snapin.scratch_sz ) ) FD_LOG_ERR(( "scratch_sz param not set" )); - FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) ); - fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) ); - void * accv_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_accv_map_align(), fd_snapshot_accv_map_footprint() ); - void * scratch_mem = FD_SCRATCH_ALLOC_APPEND( l, 16UL, tile->snapin.scratch_sz ); - fd_memset( ctx, 0, sizeof(fd_snapin_tile_t) ); + fd_snapin_tile_t * ctx = scratch_init( fd_topo_obj_laddr( topo, tile->tile_obj_id ), tile ); + if( FD_UNLIKELY( !ctx ) ) FD_LOG_ERR(( "scratch_init failed" )); /* Init state */ @@ -604,26 +620,20 @@ unprivileged_init( fd_topo_t * topo, FD_TEST( fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->in_link_id[ 0 ] ].dcache_obj_id ) ) ); ctx->in_base = (uchar const *)topo->workspaces[ topo->objs[ topo->links[ tile->in_link_id[ 0 ] ].dcache_obj_id ].wksp_id ].wksp; ctx->in_skip = 0UL; + ctx->in_sync = fd_mcache_seq_laddr_const( topo->links[ tile->in_link_id[ 0 ] ].mcache ); /* Join frame buffer */ - ctx->buf = scratch_mem; ctx->buf_sz = 0UL; ctx->buf_ctr = 0UL; ctx->buf_max = tile->snapin.scratch_sz; - /* Join snapshot file parser */ - - ctx->accv_map = fd_snapshot_accv_map_join( fd_snapshot_accv_map_new( accv_map_mem ) ); - FD_TEST( ctx->accv_map ); - /* Join account output */ ctx->out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ 0 ] ].mcache ); ctx->out_seq_max = 0UL; ctx->out_seq = 0UL; ctx->out_depth = fd_mcache_depth( ctx->out_mcache->f ); - ctx->shutdown_signal = fd_mcache_seq_laddr_const( topo->links[ tile->in_link_id[ 0 ] ].mcache ) + 2; } @@ -772,6 +782,10 @@ on_stream_frag( fd_snapin_tile_t * ctx, ulong * read_sz ) { if( FD_UNLIKELY( ctx->flags ) ) { if( FD_UNLIKELY( ctx->flags & SNAP_FLAG_FAILED ) ) FD_LOG_ERR(( "Failed to restore snapshot" )); + if( FD_UNLIKELY( ctx->flags & SNAP_FLAG_DONE ) ) { + *read_sz = frag->sz; + return 1; + } return 0; } @@ -795,7 +809,7 @@ on_stream_frag( fd_snapin_tile_t * ctx, FD_LOG_ERR(( "Failed to restore snapshot" )); } } - if( FD_UNLIKELY( ctx->out_seq >= ctx->out_seq_max ) ) { + if( FD_UNLIKELY( fd_seq_ge( ctx->out_seq, ctx->out_seq_max ) ) ) { consume_frag = 0; /* retry this frag */ ulong consumed_sz = (uint)( cur-start ); ctx->in_skip += consumed_sz; diff --git a/src/discof/restore/fd_unzstd_tile.c b/src/discof/restore/fd_unzstd_tile.c index d1f44bbdef..a22b9e061b 100644 --- a/src/discof/restore/fd_unzstd_tile.c +++ b/src/discof/restore/fd_unzstd_tile.c @@ -3,8 +3,7 @@ #include "fd_restore_base.h" #include "stream/fd_stream_ctx.h" #include "stream/fd_stream_writer.h" -#include -#include +#include /* pause */ #define NAME "unzstd" #define ZSTD_WINDOW_SZ (33554432UL) @@ -59,29 +58,30 @@ fd_unzstd_init_from_stream_ctx( void * _ctx, fd_stream_ctx_t * stream_ctx ) { fd_unzstd_tile_t * ctx = fd_type_pun(_ctx); - /* join writer */ - ctx->writer = fd_stream_writer_join( &stream_ctx->writers[0] ); + /* There's only one writer */ + ctx->writer = fd_stream_writer_join( stream_ctx->writers[0] ); + FD_TEST( ctx->writer ); fd_stream_writer_set_frag_sz_max( ctx->writer, ZSTD_FRAME_SZ ); } __attribute__((noreturn)) static void fd_unzstd_shutdown( fd_unzstd_tile_t * ctx ) { FD_MGAUGE_SET( TILE, STATUS, 2UL ); - fd_stream_writer_notify_shutdown( ctx->writer ); + fd_stream_writer_close( ctx->writer ); FD_COMPILER_MFENCE(); for(;;) pause(); } static void -fd_unzstd_poll_shutdown( fd_stream_ctx_t * stream_ctx, - fd_unzstd_tile_t * ctx ) { - ulong shutdown_seq = fd_stream_reader_poll_shutdown( stream_ctx->in_ptrs[0] ); - if( FD_UNLIKELY( shutdown_seq ) ) { - FD_LOG_WARNING(( "zstd shutting down! in_seq_max is %lu in[0].base.seq is %lu", - shutdown_seq, stream_ctx->in[0].base.seq)); - fd_unzstd_shutdown( ctx ); - } +fd_unzstd_poll_shutdown( fd_stream_ctx_t * stream_ctx, + fd_unzstd_tile_t * ctx ) { + ulong const volatile * in_sync = stream_ctx->in_ptrs[ 0 ]->in_sync; + if( FD_LIKELY( !FD_VOLATILE_CONST( in_sync[ 2 ] ) ) ) return; + + FD_LOG_WARNING(( "zstd shutting down! in_seq_max is %lu in[0].base.seq is %lu", + FD_VOLATILE_CONST( in_sync[ 0 ] ), stream_ctx->in[0].base.seq )); + fd_unzstd_shutdown( ctx ); } static void @@ -98,61 +98,44 @@ on_stream_frag( void * _ctx, ulong * sz ) { fd_unzstd_tile_t * ctx = fd_type_pun(_ctx); - /* Don't do anything if backpressured */ - if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { - return 0; - } + /* Input */ + uchar const * in_chunk0 = ctx->in_state.in_buf + frag->loff; + uchar const * in_chunk_start = in_chunk0 + ctx->in_state.in_skip; + uchar const * in_chunk_end = in_chunk0 + frag->sz; + uchar const * in_cur = in_chunk_start; + int in_consume = 0; - uchar const * chunk0 = ctx->in_state.in_buf + frag->loff; - uchar const * chunk_start = chunk0 + ctx->in_state.in_skip; - uchar const * chunk_end = chunk0 + frag->sz; - uchar const * cur = chunk_start; - ulong total_decompressed = 0UL; - int consume_frag = 0; + /* Output */ + uchar * const out = fd_stream_writer_prepare( ctx->writer ); + uchar * const out_end = out + fd_stream_writer_publish_sz_max( ctx->writer ); + uchar * out_cur = out; - for(;;) { - uchar const * prev = cur; + while( out_curwriter, total_decompressed ); ctx->in_state.in_skip = 0UL; - consume_frag = 1; - break; - } - - /* get write pointers into dcache buffer */ - uchar * buf_write_start = fd_stream_writer_get_write_ptr( ctx->writer ); - uchar * out = buf_write_start; - ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer ); - uchar * out_end = buf_write_start + dst_max; - - if( dst_max==0 ) { - /* we are blocked by downstream */ - fd_stream_writer_publish( ctx->writer, total_decompressed ); + in_consume = 1; break; } /* fd_zstd_dstream_read updates chunk_start and out */ - int zstd_err = fd_zstd_dstream_read( ctx->dstream, &cur, chunk_end, &out, out_end, NULL ); - if( FD_UNLIKELY( zstd_err>0) ) { + int zstd_err = fd_zstd_dstream_read( ctx->dstream, &in_cur, in_chunk_end, &out_cur, out_end, NULL ); + if( FD_UNLIKELY( zstd_err>0 ) ) { FD_LOG_ERR(( "fd_zstd_dstream_read failed" )); break; } - /* accumulate decompressed bytes */ - ulong decompress_sz = (ulong)out - (ulong)buf_write_start; - total_decompressed += decompress_sz; - /* accumulate consumed bytes */ - ulong consumed_sz = (ulong)cur - (ulong)prev; + ulong consumed_sz = (ulong)in_cur - (ulong)in_prev; ctx->in_state.in_skip += consumed_sz; - - fd_stream_writer_advance( ctx->writer, decompress_sz ); } - *sz = (ulong)cur - (ulong)chunk_start; - return consume_frag; + fd_stream_writer_publish( ctx->writer, (ulong)out_cur-(ulong)out, 0UL ); + + *sz = (ulong)in_cur - (ulong)in_chunk_start; + return in_consume; } static void @@ -196,13 +179,10 @@ static void fd_unzstd_run( fd_topo_t * topo, fd_topo_tile_t * tile ) { fd_unzstd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); - ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile ); - ulong out_cnt = tile->out_cnt; - - void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, out_cnt ) ); - fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, out_cnt ); - fd_unzstd_run1( ctx, - stream_ctx ); + void * ctx_mem = fd_alloca_check( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_footprint( topo, tile ) ); + fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile ); + FD_TEST( stream_ctx ); + fd_unzstd_run1( ctx, stream_ctx ); } #ifndef FD_TILE_TEST diff --git a/src/discof/restore/stream/fd_stream_ctx.c b/src/discof/restore/stream/fd_stream_ctx.c index 6d94775035..8743a7bb61 100644 --- a/src/discof/restore/stream/fd_stream_ctx.c +++ b/src/discof/restore/stream/fd_stream_ctx.c @@ -1,74 +1,102 @@ #include "fd_stream_ctx.h" +#include "fd_stream_writer.h" -void -fd_stream_ctx_init( fd_stream_ctx_t * ctx, - fd_topo_t * topo, - fd_topo_tile_t * tile ) { - /* init in */ - ulong in_idx = 0UL; - for( ulong i=0UL; iin_cnt; i++ ) { - if( FD_UNLIKELY( !tile->in_link_poll[ i ] ) ) continue; - - fd_stream_reader_init( &ctx->in[ in_idx ], - fd_type_pun( topo->links[ tile->in_link_id[ i ] ].mcache ), - tile->in_link_fseq[ i ], - in_idx ); - in_idx++; - } +FD_FN_PURE ulong +fd_stream_ctx_align( void ) { + return 128UL; +} - /* init in_ptrs */ - for( ulong i=0UL; iin_cnt; i++ ) { - ctx->in_ptrs[ i ] = &ctx->in[ i ]; - } +ulong +fd_stream_ctx_footprint( fd_topo_t const * topo, + fd_topo_tile_t const * tile ) { + ulong const in_cnt = fd_topo_tile_producer_cnt( topo, tile ); + ulong const out_cnt = tile->out_cnt; - /* init writers */ - /* FIXME: make burst_byte and burst_frag configurable */ - for( ulong i=0UL; iout_cnt; i++ ) { - fd_stream_writer_new( &ctx->writers[i], - topo, - tile, - i, - 512UL, - 2UL ); + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_stream_ctx_t), sizeof(fd_stream_ctx_t) ); + l = FD_LAYOUT_APPEND( l, alignof(fd_stream_reader_t), in_cnt*sizeof(fd_stream_reader_t) ); + l = FD_LAYOUT_APPEND( l, alignof(fd_stream_reader_t *), in_cnt*sizeof(fd_stream_reader_t *) ); + l = FD_LAYOUT_APPEND( l, alignof(fd_event_map_t), fd_event_map_footprint( in_cnt, out_cnt ) ); + l = FD_LAYOUT_APPEND( l, alignof(fd_stream_writer_t *), out_cnt*sizeof(fd_stream_writer_t *) ); + for( ulong i=0UL; ilinks[ tile->out_link_id[ i ] ]; + ulong writer_fp = fd_stream_writer_footprint( fd_topo_link_reliable_consumer_cnt( topo, link ) ); + FD_TEST( writer_fp ); + l = FD_LAYOUT_APPEND( l, fd_stream_writer_align(), writer_fp ); } - - fd_stream_ticks_init( ctx->ticks, ctx->event_map->event_cnt, 1e3L ); - fd_stream_metrics_init( ctx->metrics ); - - /* FIXME: rng seed should not be 0 */ - FD_TEST( fd_rng_join( fd_rng_new( ctx->rng, 0, 0UL ) ) ); + return FD_LAYOUT_FINI( l, fd_stream_ctx_align() ); } fd_stream_ctx_t * -fd_stream_ctx_new( void * mem, - fd_topo_t * topo, - fd_topo_tile_t * tile, - ulong in_cnt, - ulong out_cnt ) { +fd_stream_ctx_new( void * mem, + fd_topo_t const * topo, + fd_topo_tile_t const * tile ) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING(( "NULL mem" )); return NULL; } - if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_stream_ctx_scratch_align() ) ) ) { + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_stream_ctx_align() ) ) ) { FD_LOG_WARNING(( "unaligned mem" )); return NULL; } FD_SCRATCH_ALLOC_INIT( l, mem ); fd_stream_ctx_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_ctx_t), sizeof(fd_stream_ctx_t) ); + fd_memset( self, 0, sizeof(fd_stream_ctx_t) ); + + ulong const in_cnt = fd_topo_tile_producer_cnt( topo, tile ); + ulong const out_cnt = tile->out_cnt; self->in = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_reader_t), in_cnt*sizeof(fd_stream_reader_t) ); self->in_ptrs = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_reader_t *), in_cnt*sizeof(fd_stream_reader_t *) ); void * event_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, out_cnt ) ); - self->writers = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), sizeof(fd_stream_writer_t)*out_cnt ); + self->writers = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_writer_t *), out_cnt*sizeof(fd_stream_writer_t *) ); + + for( ulong i=0UL; ilinks[ tile->out_link_id[ i ] ]; + ulong const cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, link ); + void * writer = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), fd_stream_writer_footprint( cons_cnt ) ); + + self->writers[ i ] = fd_stream_writer_new_topo( + writer, + fd_topo_link_reliable_consumer_cnt( topo, link ), + topo, + tile, + i + ); + if( FD_UNLIKELY( !self->writers[ i ] ) ) return NULL; /* logs warning */ + } self->in_cnt = in_cnt; self->out_cnt = out_cnt; self->event_map = fd_event_map_new( event_map_mem, in_cnt, out_cnt ); - fd_stream_ctx_init( self, topo, tile ); self->in_seq = 0UL; + /* init in */ + ulong in_idx = 0UL; + for( ulong i=0UL; iin_cnt; i++ ) { + if( FD_UNLIKELY( !tile->in_link_poll[ i ] ) ) continue; + + fd_stream_reader_init( &self->in[ in_idx ], + fd_type_pun( topo->links[ tile->in_link_id[ i ] ].mcache ), + tile->in_link_fseq[ i ], + in_idx ); + in_idx++; + } + + /* init in_ptrs */ + for( ulong i=0UL; iin_cnt; i++ ) { + self->in_ptrs[ i ] = &self->in[ i ]; + } + + fd_stream_ticks_init( self->ticks, self->event_map->event_cnt, 1e3L ); + fd_stream_metrics_init( self->metrics ); + + /* FIXME: rng seed should not be 0 */ + FD_TEST( fd_rng_join( fd_rng_new( self->rng, 0, 0UL ) ) ); + + FD_SCRATCH_ALLOC_FINI( l, fd_stream_ctx_align() ); return self; } diff --git a/src/discof/restore/stream/fd_stream_ctx.h b/src/discof/restore/stream/fd_stream_ctx.h index c812703684..ce46d547a4 100644 --- a/src/discof/restore/stream/fd_stream_ctx.h +++ b/src/discof/restore/stream/fd_stream_ctx.h @@ -11,19 +11,30 @@ struct fd_stream_ctx; typedef struct fd_stream_ctx fd_stream_ctx_t; -typedef void fd_tile_ctx_init_run_loop_fn_t( void * ctx, - fd_stream_ctx_t * stream_ctx ); -typedef void fd_tile_update_in_fn_t( fd_stream_reader_t * reader ); -typedef void fd_tile_housekeeping_fn_t( void * ctx, - fd_stream_ctx_t * stream_ctx ); -typedef void fd_tile_metrics_write_fn_t( void * ctx ); -typedef void fd_tile_run_fn_t( void * ctx, - fd_stream_ctx_t * stream_ctx, - int * opt_poll_in ); -typedef int fd_tile_on_stream_frag_fn_t( void * ctx, - fd_stream_reader_t * reader, - fd_stream_frag_meta_t const * frag, - ulong * sz ); +typedef void +(* fd_tile_ctx_init_run_loop_fn_t)( void * ctx, + fd_stream_ctx_t * stream_ctx ); + +typedef void +(* fd_tile_update_in_fn_t)( fd_stream_reader_t * reader ); + +typedef void +(* fd_tile_housekeeping_fn_t)( void * ctx, + fd_stream_ctx_t * stream_ctx ); + +typedef void +(* fd_tile_metrics_write_fn_t)( void * ctx ); + +typedef void +(* fd_tile_run_fn_t)( void * ctx, + fd_stream_ctx_t * stream_ctx, + int * opt_poll_in ); + +typedef int +(* fd_tile_on_stream_frag_fn_t)( void * ctx, + fd_stream_reader_t * reader, + fd_stream_frag_meta_t const * frag, + ulong * sz ); struct fd_stream_ctx { fd_stream_reader_t * in; @@ -35,54 +46,38 @@ struct fd_stream_ctx { fd_rng_t rng[1]; fd_stream_ticks_t ticks[1]; fd_stream_metrics_t metrics[1]; - fd_stream_writer_t * writers; - fd_tile_update_in_fn_t * tile_update_in; - fd_tile_housekeeping_fn_t * tile_housekeeping; - fd_tile_metrics_write_fn_t * tile_metrics_write; - fd_tile_run_fn_t * tile_run; - fd_tile_on_stream_frag_fn_t * tile_on_stream_frag; + fd_stream_writer_t ** writers; + fd_tile_update_in_fn_t tile_update_in; + fd_tile_housekeeping_fn_t tile_housekeeping; + fd_tile_metrics_write_fn_t tile_metrics_write; + fd_tile_run_fn_t tile_run; + fd_tile_on_stream_frag_fn_t tile_on_stream_frag; }; typedef struct fd_stream_ctx fd_stream_ctx_t; FD_PROTOTYPES_BEGIN -FD_FN_PURE static inline ulong -fd_stream_ctx_scratch_align( void ) { - return FD_STEM_SCRATCH_ALIGN; -} +FD_FN_PURE ulong +fd_stream_ctx_align( void ); -FD_FN_PURE static inline ulong -fd_stream_ctx_scratch_footprint( ulong in_cnt, - ulong out_cnt ) { - ulong l = FD_LAYOUT_INIT; - l = FD_LAYOUT_APPEND( l, alignof(fd_stream_ctx_t), sizeof(fd_stream_ctx_t) ); - l = FD_LAYOUT_APPEND( l, alignof(fd_stream_reader_t), in_cnt*sizeof(fd_stream_reader_t) ); /* in */ - l = FD_LAYOUT_APPEND( l, alignof(fd_stream_reader_t *), in_cnt*sizeof(fd_stream_reader_t *) ); /* in_ptrs */ - l = FD_LAYOUT_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, out_cnt ) ); /* event_map */ - return FD_LAYOUT_FINI( l, fd_stream_ctx_scratch_align() ); -} +ulong +fd_stream_ctx_footprint( fd_topo_t const * topo, + fd_topo_tile_t const * tile ); fd_stream_ctx_t * -fd_stream_ctx_new( void * mem, - fd_topo_t * topo, - fd_topo_tile_t * tile, - ulong in_cnt, - ulong out_cnt ); - -void -fd_stream_ctx_init( fd_stream_ctx_t * ctx, - fd_topo_t * topo, - fd_topo_tile_t * tile ); +fd_stream_ctx_new( void * mem, + fd_topo_t const * topo, + fd_topo_tile_t const * tile ); static inline void -fd_stream_ctx_init_run_loop( fd_stream_ctx_t * ctx, - void * tile_ctx, - fd_tile_ctx_init_run_loop_fn_t * tile_init_run_loop, - fd_tile_update_in_fn_t * tile_update_in, - fd_tile_housekeeping_fn_t * tile_housekeeping, - fd_tile_metrics_write_fn_t * tile_metrics_write, - fd_tile_run_fn_t * tile_run, - fd_tile_on_stream_frag_fn_t * tile_on_stream_frag ) { +fd_stream_ctx_init_run_loop( fd_stream_ctx_t * ctx, + void * tile_ctx, + fd_tile_ctx_init_run_loop_fn_t tile_init_run_loop, + fd_tile_update_in_fn_t tile_update_in, + fd_tile_housekeeping_fn_t tile_housekeeping, + fd_tile_metrics_write_fn_t tile_metrics_write, + fd_tile_run_fn_t tile_run, + fd_tile_on_stream_frag_fn_t tile_on_stream_frag ) { if( ctx->in_cnt && !tile_update_in ) { FD_LOG_ERR(( "tile_update_in function cannot be null if there are producers to this tile!" )); } @@ -100,20 +95,16 @@ fd_stream_ctx_init_run_loop( fd_stream_ctx_t * ctx, FD_MGAUGE_SET( TILE, STATUS, 1UL ); fd_stream_ticks_init_timer( ctx->ticks ); - for( ulong i=0UL; iout_cnt; i++ ) { - fd_stream_writer_init_flow_control_credits( &ctx->writers[ i ] ); - } - if( tile_init_run_loop ) { tile_init_run_loop( tile_ctx, ctx ); } } static inline void -fd_stream_ctx_update_flow_control_credits( fd_stream_ctx_t * ctx ) { +fd_stream_ctx_calculate_backpressure( fd_stream_ctx_t * ctx ) { /* Recalculate flow control credits */ for( ulong i=0UL; iout_cnt; i++ ) { - fd_stream_writer_update_flow_control_credits( &ctx->writers[i] ); + fd_stream_writer_calculate_backpressure( ctx->writers[i] ); } } @@ -141,7 +132,7 @@ fd_stream_ctx_do_housekeeping( fd_stream_ctx_t * ctx, ulong out_idx = event_idx; /* Receive flow control credits from this out. */ - fd_stream_writer_receive_flow_control_credits( &ctx->writers[ out_idx ] ); + fd_stream_writer_receive_flow_control_credits( ctx->writers[ out_idx ] ); } else if( event_idx>ctx->out_cnt) { /* send credits */ ulong in_idx = event_idx - ctx->out_cnt - 1UL; @@ -154,7 +145,7 @@ fd_stream_ctx_do_housekeeping( fd_stream_ctx_t * ctx, ctx->ticks->now, ctx->tile_metrics_write, ctx ); - fd_stream_ctx_update_flow_control_credits( ctx ); + fd_stream_ctx_calculate_backpressure( ctx ); if( ctx->tile_housekeeping ) { ctx->tile_housekeeping( tile_ctx, ctx ); @@ -180,7 +171,7 @@ static inline int fd_stream_ctx_is_backpressured( fd_stream_ctx_t * ctx ) { int backpressured = 1UL; for( ulong i=0UL; iout_cnt; i++ ) { - backpressured &= fd_stream_writer_is_backpressured( &ctx->writers[i] ); + backpressured &= !fd_stream_writer_publish_sz_max( ctx->writers[i] ); } return backpressured; } @@ -299,14 +290,14 @@ fd_stream_ctx_run_loop( fd_stream_ctx_t * ctx, } static inline void -fd_stream_ctx_run( fd_stream_ctx_t * ctx, - void * tile_ctx, - fd_tile_ctx_init_run_loop_fn_t * tile_init_run_loop, - fd_tile_update_in_fn_t * tile_update_in, - fd_tile_housekeeping_fn_t * tile_housekeeping, - fd_tile_metrics_write_fn_t * tile_metrics_write, - fd_tile_run_fn_t * tile_run, - fd_tile_on_stream_frag_fn_t * tile_on_stream_frag ) { +fd_stream_ctx_run( fd_stream_ctx_t * ctx, + void * tile_ctx, + fd_tile_ctx_init_run_loop_fn_t tile_init_run_loop, + fd_tile_update_in_fn_t tile_update_in, + fd_tile_housekeeping_fn_t tile_housekeeping, + fd_tile_metrics_write_fn_t tile_metrics_write, + fd_tile_run_fn_t tile_run, + fd_tile_on_stream_frag_fn_t tile_on_stream_frag ) { fd_stream_ctx_init_run_loop( ctx, tile_ctx, tile_init_run_loop, diff --git a/src/discof/restore/stream/fd_stream_reader.h b/src/discof/restore/stream/fd_stream_reader.h index 8b322594f5..bf40a9787d 100644 --- a/src/discof/restore/stream/fd_stream_reader.h +++ b/src/discof/restore/stream/fd_stream_reader.h @@ -16,11 +16,11 @@ struct fd_stream_reader { uint accum[6]; }; - fd_frag_reader_t r[1]; + fd_frag_reader_t r[1]; /* FIXME strict aliasing violation on mcache */ } base; ulong goff; - ulong const volatile * shutdown_signal; + ulong const volatile * in_sync; }; typedef struct fd_stream_reader fd_stream_reader_t; @@ -43,8 +43,7 @@ fd_stream_reader_init( fd_stream_reader_t * reader, ulong in_idx ) { fd_frag_reader_init( reader->base.r, mcache, fseq, in_idx ); reader->goff = 0UL; - /* shutdown signal is located at fseq 2 */ - reader->shutdown_signal = fd_mcache_seq_laddr_const( reader->base.mcache->f ) + 2; + reader->in_sync = fd_mcache_seq_laddr_const( reader->base.mcache->f ); } static inline fd_stream_reader_t * @@ -97,12 +96,6 @@ fd_stream_reader_consume_frag( fd_stream_reader_t * reader, fd_frag_reader_consume_frag( reader->base.r, ctx ); } -static inline ulong -fd_stream_reader_poll_shutdown( fd_stream_reader_t * reader ) { - ulong const in_seq_max = FD_VOLATILE_CONST( *reader->shutdown_signal ); - return in_seq_max == reader->base.seq && in_seq_max != 0 ? - in_seq_max : 0UL; -} static inline void * fd_stream_reader_delete( fd_stream_reader_t * reader ) { diff --git a/src/discof/restore/stream/fd_stream_writer.c b/src/discof/restore/stream/fd_stream_writer.c index cdc28b1131..f3fbf668ea 100644 --- a/src/discof/restore/stream/fd_stream_writer.c +++ b/src/discof/restore/stream/fd_stream_writer.c @@ -1,77 +1,164 @@ #include "fd_stream_writer.h" #include "../../../util/log/fd_log.h" #include "../../../tango/dcache/fd_dcache.h" +#include "../../../disco/topo/fd_topo.h" fd_stream_writer_t * fd_stream_writer_new( void * mem, - fd_topo_t * topo, - fd_topo_tile_t * tile, - ulong link_id, - ulong burst_byte, - ulong burst_frag ) { + ulong cons_max, + fd_stream_frag_meta_t * mcache, + uchar * dcache ) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING(( "NULL mem" )); return NULL; } if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_stream_writer_align() ) ) ) { - FD_LOG_WARNING(( "unaligned mem" )); + FD_LOG_WARNING(( "misaligned mem" )); return NULL; } FD_SCRATCH_ALLOC_INIT( l, mem ); - fd_stream_writer_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_writer_t), sizeof(fd_stream_writer_t) ); - - fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ link_id ] ]; - void * dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ link_id ] ].dcache_obj_id ) ); - fd_stream_frag_meta_t * out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ link_id ] ].mcache ); - ulong cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, link ); - - self->magic = FD_STREAM_WRITER_MAGIC; - self->out_mcache = out_mcache; - self->dcache = dcache; - self->base = (ulong)dcache - (ulong)fd_wksp_containing( dcache ); - self->buf_off = 0UL; - self->buf_sz = fd_dcache_data_sz( dcache ); - self->goff = 0UL; - self->frag_sz_max = 0UL; /* this should be set by the tile via fd_stream_writer_set_frag_sz_max */ - self->stream_off = 0UL; - self->goff_start = 0UL; - self->out_seq = 0UL; - - /* Set up flow control state */ - self->cr_byte_avail = 0UL; - self->cr_frag_avail = 0UL; - self->cr_byte_max = fd_dcache_data_sz( dcache ); - self->cr_frag_max = fd_mcache_depth( self->out_mcache->f ); - self->burst_byte = burst_byte; - self->burst_frag = burst_frag; - self->cons_cnt = cons_cnt; - self->cons_seq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); - self->cons_fseq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); - self->out_sync = fd_mcache_seq_laddr( topo->links[ tile->out_link_id[ link_id ] ].mcache ); - - /* Set up consumer fseq pointer array. - We keep track of 2 fseqs per consumer to manage stream flow control. - The first fseq tracks the consumer's mcache sequence number. - The second fseq tracks the consumer's global read offset into stream. */ - ulong cons_idx = 0UL; + fd_stream_writer_t * writer = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_writer_t), sizeof(fd_stream_writer_t) ); + ulong * cons_seq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), cons_max*sizeof(ulong)*FD_STREAM_WRITER_CONS_SEQ_STRIDE ); + ulong volatile ** cons_fseq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_max*sizeof(ulong *) ); + FD_SCRATCH_ALLOC_FINI( l, fd_stream_writer_align() ); + + fd_memset( writer, 0, sizeof(fd_stream_writer_t) ); + + writer->mcache = mcache; + writer->out_sync = fd_mcache_seq_laddr( mcache->f ); + writer->seq = fd_mcache_seq_query( writer->out_sync ); + writer->depth = fd_mcache_depth( mcache->f ); + + writer->data = dcache; + writer->data_max = fd_dcache_data_sz( dcache ); + writer->data_cur = 0UL; + writer->base = (uchar *)fd_wksp_containing( dcache ); /* FIXME impure */ + writer->goff = 0UL; + + writer->cr_byte_avail = ULONG_MAX; + writer->cr_frag_avail = ULONG_MAX; + writer->cons_seq = cons_seq; + writer->cons_fseq = cons_fseq; + + writer->frag_sz_max = writer->data_max; + + writer->cons_cnt = 0UL; + writer->cons_max = cons_max; + /* writer->out_sync already set */ + + FD_COMPILER_MFENCE(); + writer->magic = FD_STREAM_WRITER_MAGIC; + return writer; +} + +void * +fd_stream_writer_delete( fd_stream_writer_t * writer ) { + fd_memset( writer, 0, sizeof(fd_stream_writer_t) ); + return writer; +} + +ulong * +fd_stream_writer_register_consumer( + fd_stream_writer_t * writer, + ulong * fseq_join +) { + if( FD_UNLIKELY( writer->cons_cnt >= writer->cons_max ) ) { + FD_LOG_WARNING(( "Can't register consumer, cons_max %lu exceeded", writer->cons_max )); + return NULL; + } + writer->cr_byte_avail = 0UL; + writer->cr_frag_avail = 0UL; + + ulong const cons_idx = writer->cons_cnt++; + ulong * seq = writer->cons_seq + ( cons_idx*FD_STREAM_WRITER_CONS_SEQ_STRIDE ); + writer->cons_fseq[ cons_idx ] = fd_type_pun( fseq_join ); + seq[ 0 ] = FD_VOLATILE_CONST( fseq_join[ 0 ] ); + seq[ 1 ] = FD_VOLATILE_CONST( fseq_join[ 1 ] ); + return seq; +} + +fd_stream_writer_t * +fd_stream_writer_new_topo( + void * mem, + ulong cons_max, + fd_topo_t const * topo, + fd_topo_tile_t const * tile, + ulong out_link_idx +) { + ulong const out_link_id = tile->out_link_id[ out_link_idx ]; + fd_topo_link_t const * out_link = &topo->links[ out_link_id ]; + fd_stream_frag_meta_t * mcache = fd_type_pun( out_link->mcache ); + void * dcache = fd_dcache_join( fd_topo_obj_laddr( topo, out_link->dcache_obj_id ) ); + ulong cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, out_link ); + if( FD_UNLIKELY( !mcache ) ) { + FD_LOG_WARNING(( "NULL mcache" )); + return NULL; + } + if( FD_UNLIKELY( !dcache ) ) { + FD_LOG_WARNING(( "NULL dcache" )); + return NULL; + } + if( FD_UNLIKELY( cons_cnt>cons_max ) ) { + FD_LOG_WARNING(( "cons_cnt is %lu but cons_max is only %lu", cons_cnt, cons_max )); + } + + fd_stream_writer_t * writer = fd_stream_writer_new( mem, cons_max, mcache, dcache ); + if( FD_UNLIKELY( !writer ) ) return NULL; /* logs warning */ + for( ulong i=0UL; itile_cnt; i++ ) { - fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; + fd_topo_tile_t const * consumer_tile = &topo->tiles[ i ]; for( ulong j=0UL; jin_cnt; j++ ) { - if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ link_id ] && consumer_tile->in_link_reliable[ j ] ) ) { - self->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ j ]; - if( FD_UNLIKELY( !self->cons_fseq[ cons_idx ] ) ) { - FD_LOG_ERR(( "NULL cons_fseq[%lu] for out_link=%lu", cons_idx, tile->out_link_id[ link_id ] )); - } - cons_idx++; + if( consumer_tile->in_link_id[ j ]!=out_link_id ) continue; + if( !consumer_tile->in_link_reliable[ j ] ) continue; + + ulong * fseq = consumer_tile->in_link_fseq[ j ]; + if( FD_UNLIKELY( !fseq ) ) { + FD_LOG_WARNING(( "NULL fseq for consumer tile=%s:%lu in_link_idx=%lu", + consumer_tile->name, consumer_tile->kind_id, j )); + } + if( FD_UNLIKELY( !fd_stream_writer_register_consumer( writer, fseq ) ) ) { + return NULL; /* logs warning */ } } } - fd_memset(self->cons_seq, 0, EXPECTED_FSEQ_CNT_PER_CONS*cons_cnt*sizeof(ulong) ); - /* make sure we're not tripping */ - FD_TEST( cons_idx==cons_cnt ); + return writer; +} - return self; +void +fd_stream_writer_set_frag_sz_max( fd_stream_writer_t * writer, + ulong frag_sz_max ) { + writer->frag_sz_max = fd_ulong_min( writer->data_max, frag_sz_max ); +} + +void +fd_stream_writer_copy( fd_stream_writer_t * writer, + void const * data, + ulong data_sz, + ulong const ctl_mask ) { + if( FD_UNLIKELY( ( data_sz > writer->cr_byte_avail ) | + ( data_sz > writer->data_max ) ) ) { + FD_LOG_ERR(( "invalid data_sz %lu (cr_byte_avail=%lu data_max=%lu)", + data_sz, writer->cr_byte_avail, writer->data_max )); + } + + ulong const frag_sz_max = writer->frag_sz_max; + if( FD_UNLIKELY( !frag_sz_max ) ) { + FD_LOG_ERR(( "zero frag_sz_max" )); + } + int som = 1; + while( data_sz ) { + ulong const op_sz = fd_ulong_min( data_sz, frag_sz_max ); + ulong const next_sz = data_sz-op_sz; + int const eom = next_sz==0UL; + ulong const ctl = ctl_mask & fd_frag_meta_ctl( FD_FRAG_META_ORIG_MAX-1, som, eom, 1 ); + + fd_memcpy( fd_stream_writer_prepare( writer ), data, op_sz ); + fd_stream_writer_publish( writer, op_sz, ctl ); + + som = 0; + data_sz = next_sz; + } } diff --git a/src/discof/restore/stream/fd_stream_writer.h b/src/discof/restore/stream/fd_stream_writer.h index 5099427b74..4d3243d6db 100644 --- a/src/discof/restore/stream/fd_stream_writer.h +++ b/src/discof/restore/stream/fd_stream_writer.h @@ -1,181 +1,292 @@ #ifndef HEADER_fd_src_discof_restore_stream_fd_stream_writer_h #define HEADER_fd_src_discof_restore_stream_fd_stream_writer_h -#include "../../../util/fd_util_base.h" -#include "../../../disco/topo/fd_topo.h" -#include "fd_stream_reader.h" - -/* A shared stream has a single producer and multiple consumers. - fd_stream_writer implements the producer APIs of the shared stream */ -struct fd_stream_writer { - ulong magic; /* magic */ - fd_stream_frag_meta_t * out_mcache; /* frag producer mcache */ - - uchar * dcache; /* laddr of shared dcache buffer */ - ulong base; /* offset to the dcache buffer from wksp */ - - /* dcache buffer state */ - ulong buf_off; /* local write offset into dcache buffer */ - ulong buf_sz; /* dcache buffer size */ - ulong goff; /* global offset into byte stream */ - ulong frag_sz_max; /* max frag size (controls the size of a single write into dcache)*/ - ulong stream_off; /* start of published stream */ - ulong goff_start; /* start of goff in stream */ - ulong out_seq; /* current sequence number */ - - /* flow control */ - ulong cr_byte_avail; /* bytes available in the slowest consumer */ - ulong cr_frag_avail; /* frags available in the slowest consumer */ - ulong cr_byte_max; /* max dcache buffer credits (size of dcache buffer)*/ - ulong cr_frag_max; /* max mcache frag credits */ - ulong burst_byte; /* dcache backpressure threshold */ - ulong burst_frag; /* mcache backpressure threshold */ - ulong cons_cnt; /* number of consumers */ - ulong * cons_seq; /* consumer fseq values */ - ulong ** cons_fseq; /* consumer fseq pointers */ - ulong * out_sync; /* out fseq */ +/* fd_stream_writer.h provides an API to publish data to SPMC shared + memory byte streams. */ + +#include "../fd_restore_base.h" + +/* fd_stream_writer_t holds stream producer state. */ + +struct __attribute__((aligned(16))) fd_stream_writer { + /* Fragment descriptor output */ + fd_stream_frag_meta_t * mcache; /* frag producer mcache */ + ulong seq; /* next sequence number */ + ulong depth; /* mcache depth */ + + /* Data buffer (dcache) output */ + uchar * data; /* points to first byte of dcache data region (dcache join) */ + ulong data_max; /* dcache data region size */ + ulong data_cur; /* next dcache data offset in [0,data_sz) */ + uchar * base; /* workspace base address */ + ulong goff; /* byte stream offset */ + + /* This point is 16-byte aligned */ + + /* Backpressure */ + ulong cr_byte_avail; /* byte publish count before slowest consumer overrun */ + ulong cr_frag_avail; /* frag publish count before slowest consumer overrun */ + ulong * cons_seq; /* cons_seq[ 2*cons_idx+i ] caches cons_fseq[ cons_idx ][i] */ + ulong volatile ** cons_fseq; /* cons_fseq[ cons_idx ] points to consumer fseq */ + /* Each consumer reports a 'frag sequence number' and the 'stream offset' */ +# define FD_STREAM_WRITER_CONS_SEQ_STRIDE 2UL + + /* Fragmentation */ + ulong frag_sz_max; /* max data sz for each frag descriptor */ + + /* Cold data */ + ulong magic; + ulong cons_cnt; /* number of consumers */ + ulong cons_max; /* max number of consumers */ + ulong * out_sync; /* points to mcache 'sync' field (last published seq no) */ + + /* variable length data follows */ }; + typedef struct fd_stream_writer fd_stream_writer_t; #define FD_STREAM_WRITER_MAGIC (0xFD57337717E736C0UL) -#define EXPECTED_FSEQ_CNT_PER_CONS 2 + +/* Forward declarations */ + +typedef struct fd_topo fd_topo_t; +typedef struct fd_topo_tile fd_topo_tile_t; FD_PROTOTYPES_BEGIN +/* Constructor API ****************************************************/ + +/* fd_stream_writer_{align,footprint} describe a memory region suitable + to hold a stream_writer. */ + FD_FN_CONST static inline ulong fd_stream_writer_align( void ) { return alignof(fd_stream_writer_t); } FD_FN_CONST static inline ulong -fd_stream_writer_footprint( void ) { - return sizeof(fd_stream_writer_t); +fd_stream_writer_footprint( ulong cons_max ) { + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_stream_writer_t), sizeof(fd_stream_writer_t) ); + l = FD_LAYOUT_APPEND( l, alignof(ulong), cons_max*sizeof(ulong)*FD_STREAM_WRITER_CONS_SEQ_STRIDE ); + l = FD_LAYOUT_APPEND( l, alignof(ulong *), cons_max*sizeof(ulong *) ); + return FD_LAYOUT_FINI( l, fd_stream_writer_align() ); } +/* fd_stream_writer_new initializes the memory region at mem as a + stream_writer object. mcache_join is a local join to an mcache + (frag_meta or similar pointer) to which frags will be published. + dcache_join is a local join to a dcache into which data is written. + Returns writer object in mem on success, and NULL on failure. Logs + reason for failure. */ + +fd_stream_writer_t * +fd_stream_writer_new( void * mem, + ulong cons_max, + fd_stream_frag_meta_t * mcache_join, + uchar * dcache_join ); + +/* fd_stream_writer_delete releases the memory region backing a + stream_writer. Returns a pointer to the memory region originally + provided to fd_stream_writer_new. */ + +void * +fd_stream_writer_delete( fd_stream_writer_t * writer ); + +/* fd_stream_writer_new_topo constructs a stream writer for a topology + definition. Calls new() and register_consumer() under the hood. + tile is the actor that will be writing stream frags in topo. + out_link_idx is the index of the output link for that tile. */ + +fd_stream_writer_t * +fd_stream_writer_new_topo( + void * mem, + ulong cons_max, + fd_topo_t const * topo, + fd_topo_tile_t const * tile, + ulong out_link_idx +); + static inline fd_stream_writer_t * fd_stream_writer_join( void * _writer ) { - fd_stream_writer_t * writer = (fd_stream_writer_t *)_writer; + fd_stream_writer_t * writer = _writer; if( FD_UNLIKELY( !writer ) ) return NULL; if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)writer, fd_stream_writer_align() ) ) ) return NULL; - if( FD_UNLIKELY( writer->magic!=FD_STREAM_WRITER_MAGIC ) ) return NULL; - + if( FD_UNLIKELY( writer->magic!=FD_STREAM_WRITER_MAGIC ) ) return NULL; return writer; } -static inline uchar * -fd_stream_writer_get_write_ptr( fd_stream_writer_t * writer ) { - return writer->dcache + writer->buf_off; -} +/* Control API ********************************************************/ -fd_stream_writer_t * -fd_stream_writer_new( void * mem, - fd_topo_t * topo, - fd_topo_tile_t * tile, - ulong link_id, - ulong burst_byte, - ulong burst_frag ); +/* fd_stream_writer_register_consumer registers a consumer of the + stream to the writer. fseq_join is a local join to that consumer's + fseq (points to the fseq's seq[0] field). Future backpressure checks + will include this consumer. Returns a pointer to this consumer's + seq cache field, or NULL on if cons_max exceeded (logs warning). */ + +ulong * +fd_stream_writer_register_consumer( + fd_stream_writer_t * writer, + ulong * fseq_join +); + +/* fd_stream_writer_close marks the stream as closed. */ static inline void -fd_stream_writer_init_flow_control_credits( fd_stream_writer_t * writer ) { - for( ulong cons_idx=0UL; cons_idxcons_cnt; cons_idx++ ) { - writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*cons_idx ] = FD_VOLATILE_CONST( writer->cons_fseq[ cons_idx ][0] ); - writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*cons_idx+1 ] = FD_VOLATILE_CONST( writer->cons_fseq[ cons_idx ][1] ); - } +fd_stream_writer_close( fd_stream_writer_t * writer ) { + FD_VOLATILE( writer->out_sync[ 0 ] ) = writer->seq; + FD_VOLATILE( writer->out_sync[ 1 ] ) = writer->goff; + FD_COMPILER_MFENCE(); + FD_VOLATILE( writer->out_sync[ 2 ] ) = 1; } -static inline void +/* Flow control API ***************************************************/ + +/* fd_stream_writer_set_frag_sz_max puts an upper bound on the fragment + sizes produced to the stream. This helps reduce latency. */ + +void fd_stream_writer_set_frag_sz_max( fd_stream_writer_t * writer, - ulong frag_sz_max ) { - writer->frag_sz_max = frag_sz_max; -} + ulong frag_sz_max ); + +/* fd_stream_writer_receive_flow_control_credits updates cached consumer + progress from the consumers' fseq objects. + + FIXME Provide an API to round-robin update ins temporally spaced apart */ static inline void fd_stream_writer_receive_flow_control_credits( fd_stream_writer_t * writer ) { + ulong const stride = FD_STREAM_WRITER_CONS_SEQ_STRIDE; for( ulong i=0UL; icons_cnt; i++ ) { + /* FIXME could be SSE aligned copy */ FD_COMPILER_MFENCE(); - writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*i ] = FD_VOLATILE_CONST( writer->cons_fseq[ i ][0] ); - writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*i+1 ] = FD_VOLATILE_CONST( writer->cons_fseq[ i ][1] ); + writer->cons_seq[ stride*i ] = FD_VOLATILE_CONST( writer->cons_fseq[ i ][0] ); + writer->cons_seq[ stride*i+1 ] = FD_VOLATILE_CONST( writer->cons_fseq[ i ][1] ); FD_COMPILER_MFENCE(); } } +/* fd_stream_writer_calculate_backpressure updates fragment and stream + backpressure from cached consumer progress. */ + static inline void -fd_stream_writer_update_flow_control_credits( fd_stream_writer_t * writer ) { - ulong slowest_cons = ULONG_MAX; - if( FD_LIKELY( writer->cr_byte_availcr_byte_max || writer->cr_frag_availcr_frag_max ) ) { - ulong cr_byte_avail = writer->cr_byte_max; - ulong cr_frag_avail = writer->cr_frag_max; - for( ulong cons_idx=0UL; cons_idxcons_cnt; cons_idx++ ) { - ulong cons_cr_byte_avail = (ulong)fd_long_max( (long)writer->cr_byte_max-fd_long_max( fd_seq_diff( writer->goff, writer->cons_seq[ EXPECTED_FSEQ_CNT_PER_CONS*cons_idx+1 ] ), 0L ), 0L ); - ulong cons_cr_frag_avail = (ulong)fd_long_max( (long)writer->cr_frag_max-fd_long_max( fd_seq_diff( writer->out_seq, writer->cons_seq[ EXPECTED_FSEQ_CNT_PER_CONS*cons_idx ] ), 0L ), 0L ); - slowest_cons = fd_ulong_if( cons_cr_byte_availcr_byte_avail = cr_byte_avail; - writer->cr_frag_avail = cr_frag_avail; +fd_stream_writer_calculate_backpressure( fd_stream_writer_t * writer ) { + ulong const cr_byte_max = writer->data_max; + ulong const cr_frag_max = writer->depth; + + ulong cr_byte_avail = ULONG_MAX; + ulong cr_frag_avail = ULONG_MAX; + ulong const stride = FD_STREAM_WRITER_CONS_SEQ_STRIDE; + for( ulong cons_idx=0UL; cons_idxcons_cnt; cons_idx++ ) { + ulong cons_cr_byte_avail = (ulong)fd_long_max( (long)cr_byte_max-fd_long_max( fd_seq_diff( writer->goff, writer->cons_seq[ stride*cons_idx+1 ] ), 0L ), 0L ); + ulong cons_cr_frag_avail = (ulong)fd_long_max( (long)cr_frag_max-fd_long_max( fd_seq_diff( writer->seq, writer->cons_seq[ stride*cons_idx ] ), 0L ), 0L ); + cr_byte_avail = fd_ulong_min( cons_cr_byte_avail, cr_byte_avail ); + cr_frag_avail = fd_ulong_min( cons_cr_frag_avail, cr_frag_avail ); } + + writer->cr_byte_avail = cr_byte_avail; + writer->cr_frag_avail = cr_frag_avail; } -static inline ulong -fd_stream_writer_get_avail_bytes( fd_stream_writer_t * writer ) { - if( FD_UNLIKELY( writer->buf_off > writer->buf_sz ) ) { - FD_LOG_CRIT(( "Buffer overflow (buf_off=%lu buf_sz=%lu)", writer->buf_off, writer->buf_sz )); +/* In-place publish API ************************************************ + + Example usage: + + void * p = fd_stream_writer_prepare( w ); + ulong sz = fd_stream_writer_publish_sz_max( w ) + fd_memcpy( p, src, sz ); + src += sz; + fd_stream_writer_publish( w, sz ); */ + +/* fd_stream_writer_prepare prepares the caller for a frag publish. + Returns a pointer to a memory region of publish_sz_max() bytes, into + which the caller can write data. A subsequent publish() call makes + the data visible to consumers. U.B. return value if + publish_sz_max()==0. */ + +static inline void * +fd_stream_writer_prepare( fd_stream_writer_t * writer ) { + if( FD_UNLIKELY( writer->data_cur > writer->data_max ) ) { + FD_LOG_CRIT(( "Out-of-bounds data_cur (data_cur=%lu data_max=%lu)", writer->data_cur, writer->data_max )); return 0; } + return writer->data + writer->data_cur; +} - ulong const frag_sz_max = fd_ulong_min( writer->cr_byte_avail, writer->frag_sz_max ); - return fd_ulong_min( frag_sz_max, writer->buf_sz - writer->buf_off ); +/* fd_stream_writer_publish_sz_max returns the max amount of bytes that + can be published in the next fragment. */ + +static inline ulong +fd_stream_writer_publish_sz_max( fd_stream_writer_t * writer ) { + ulong const data_backp = writer->cr_byte_avail; + ulong const frag_backp = fd_ulong_if( !!writer->cr_frag_avail, writer->frag_sz_max, 0UL ); + ulong const buf_avail = writer->data_max - writer->data_cur; + return fd_ulong_min( fd_ulong_min( data_backp, frag_backp ), buf_avail ); } +/* fd_stream_writer_publish completes a publish operation. Writes a + fragment descriptor out to the mcache if frag_sz>0. */ + static inline void fd_stream_writer_publish( fd_stream_writer_t * writer, - ulong frag_sz ) { - ulong loff = writer->base + writer->stream_off; - fd_mcache_publish_stream( writer->out_mcache, - fd_mcache_depth( writer->out_mcache->f ), - writer->out_seq, - writer->goff_start, - loff, - frag_sz, - 0 ); - writer->out_seq = fd_seq_inc( writer->out_seq, 1UL ); + ulong frag_sz, + ulong ctl ) { + if( FD_UNLIKELY( !frag_sz ) ) return; + + uchar * const data = writer->data + writer->data_cur; + ulong const loff = (ulong)data - (ulong)writer->base; + + fd_mcache_publish_stream( + writer->mcache, + writer->depth, + writer->seq, + writer->goff, + loff, + frag_sz, + ctl + ); + + /* Advance fragment descriptor stream */ + writer->seq = fd_seq_inc( writer->seq, 1UL ); writer->cr_frag_avail -= 1; - /* rewind buf_off to start of buffer */ - if( writer->buf_off >= writer->buf_sz ) { - writer->buf_off = 0UL; + /* Advance buffer */ + writer->data_cur += frag_sz; + writer->goff += frag_sz; + writer->cr_byte_avail -= frag_sz; + if( FD_UNLIKELY( writer->data_cur > writer->data_max ) ) { + FD_LOG_CRIT(( "Out-of-bounds data_cur (data_cur=%lu data_max=%lu)", writer->data_cur, writer->data_max )); + return; + } + if( writer->data_cur == writer->data_max ) { + writer->data_cur = 0UL; /* cmov */ } - - /* update stream_off and goff_start to current values - of buf_off and goff */ - writer->stream_off = writer->buf_off; - writer->goff_start = writer->goff; } -static inline void -fd_stream_writer_advance( fd_stream_writer_t * writer, - ulong sz ) { - writer->goff += sz; - writer->buf_off += sz; - writer->cr_byte_avail -= sz; -} +/* Copy publish API ***************************************************/ -static inline int -fd_stream_writer_is_backpressured( fd_stream_writer_t * writer ) { - return writer->cr_byte_availburst_byte || writer->cr_frag_availburst_frag; -} +/* fd_stream_writer_copy publishes the given chunk to the stream as a + sequence of stream frags. data points to the first byte of the chunk + to send. data_sz is the number of bytes (<=copy_max()). + ctl specifies how to set the 'ctl' field. All ctl bits are copied as + is, except for 'som' and 'eom', which act as a mask: + Use 'fd_frag_meta_ctl( ..., som=1, eom=1, ... )' to set the 'som' + bit on the first frag and the 'eom' bit on the last flag. Pass + 'fd_frag_meta_ctl( ..., som=0, eom=0, ... )' or just '0UL' to leave + fragmentation bits cleared on published frags. */ -static inline void -fd_stream_writer_notify_shutdown( fd_stream_writer_t * writer ) { - FD_VOLATILE( writer->out_sync[ EXPECTED_FSEQ_CNT_PER_CONS * writer->cons_cnt ] ) = writer->out_seq; -} +void +fd_stream_writer_copy( fd_stream_writer_t * writer, + void const * data, + ulong data_sz, + ulong ctl ); -static inline void * -fd_stream_writer_delete( fd_stream_writer_t * writer ) { - fd_memset( writer, 0, sizeof(fd_stream_writer_t) ); - return (void *)writer; +static inline ulong +fd_stream_writer_copy_max( fd_stream_writer_t * writer ) { + ulong const data_backp = writer->cr_byte_avail; + ulong const frag_backp = fd_ulong_sat_mul( writer->cr_frag_avail, writer->frag_sz_max ); + ulong const buf_avail = writer->data_max - writer->data_cur; + return fd_ulong_min( fd_ulong_min( data_backp, frag_backp ), buf_avail ); } FD_PROTOTYPES_END diff --git a/src/discof/restore/test_snapin_tile.c b/src/discof/restore/test_snapin_tile.c index 9cf0b60274..8e084a85ad 100644 --- a/src/discof/restore/test_snapin_tile.c +++ b/src/discof/restore/test_snapin_tile.c @@ -1,5 +1,105 @@ #define FD_TILE_TEST #include "fd_snapin_tile.c" +#include "stream/fd_stream_writer.h" + +static ulong +mock_stream_align( void ) { + return fd_ulong_max( fd_ulong_max( fd_stream_writer_align(), fd_mcache_align() ), fd_dcache_align() ); +} + +static ulong +mock_stream_footprint( ulong depth, + ulong dcache_data_sz ) { + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, fd_stream_writer_align(), fd_stream_writer_footprint( 0UL ) ); + l = FD_LAYOUT_APPEND( l, fd_mcache_align(), fd_mcache_footprint( depth, 0uL ) ); + l = FD_LAYOUT_APPEND( l, fd_dcache_align(), fd_dcache_footprint( dcache_data_sz, 0UL ) ); + return l; +} + +static fd_stream_writer_t * +mock_stream_init( void * mem, + ulong depth, + ulong dcache_data_sz ) { + if( FD_UNLIKELY( !mem ) ) return NULL; + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, mock_stream_align() ) ) ) return NULL; + + FD_SCRATCH_ALLOC_INIT( l, mem ); + void * writer_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), fd_stream_writer_footprint( 0UL ) ); + void * mcache_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_mcache_align(), fd_mcache_footprint( depth, 0uL ) ); + void * dcache_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_dcache_align(), fd_dcache_footprint( dcache_data_sz, 0UL ) ); + + fd_frag_meta_t * mcache = fd_mcache_join( fd_mcache_new( mcache_mem, depth, 0UL, 0UL ) ); + uchar * dcache = fd_dcache_join( fd_dcache_new( dcache_mem, dcache_data_sz, 0UL ) ); + + return fd_stream_writer_new( writer_mem, 0UL, fd_type_pun( mcache ), dcache ); +} + +static void * +mock_stream_delete( fd_stream_writer_t * writer ) { + fd_dcache_delete( fd_dcache_leave( writer->data ) ); + fd_mcache_delete( fd_mcache_leave( fd_type_pun( writer->mcache ) ) ); + return fd_stream_writer_delete( writer ); +} + +/* Feed in snapshot stream frags and validate the resulting account + frags are sane. This variant tests handwritten edge cases. */ + +static void +test_account_frags( fd_wksp_t * wksp ) { + /* Create a snapin context */ + fd_topo_tile_t topo_tile = { + .name = "snapin", + .snapin = { + .scratch_sz = 4096UL + } + }; + void * tile_scratch = fd_wksp_alloc_laddr( wksp, scratch_align(), scratch_footprint( &topo_tile ), 1UL ); + FD_TEST( tile_scratch ); + fd_snapin_tile_t * ctx = scratch_init( tile_scratch, &topo_tile ); + FD_TEST( ctx ); + + void * out_mcache_mem = fd_wksp_alloc_laddr( wksp, fd_mcache_align(), fd_mcache_footprint( 128UL, 0UL ), 1UL ); + ctx->out_mcache = fd_type_pun( fd_mcache_join( fd_mcache_new( out_mcache_mem, 128UL, 0UL, 0UL ) ) ); + FD_TEST( ctx->out_mcache ); + ctx->out_depth = fd_mcache_depth( ctx->out_mcache->f ); + ctx->out_seq_max = UINT_MAX; + + ctx->tar_file_rem = ULONG_MAX; + ctx->accv_sz = ULONG_MAX; + fd_snapshot_expect_account_hdr( ctx ); + uchar scratch_buf[ 256 ]; + ctx->buf = scratch_buf; + ctx->buf_max = sizeof(scratch_buf); + + /* Create an input */ + void * in_stream_mem = fd_wksp_alloc_laddr( wksp, mock_stream_align(), mock_stream_footprint( 128UL, 4096UL ), 1UL ); + fd_stream_writer_t * in_stream = mock_stream_init( in_stream_mem, 128UL, 4096UL ); + FD_TEST( in_stream ); + fd_snapin_in_t in = { + .mcache = in_stream->mcache, + .depth = (uint)in_stream->depth, + .idx = 0U, + .seq = 0UL, + .goff = 0UL, + .mline = in_stream->mcache + }; + ctx->in_base = (uchar *)wksp; + + /* An empty account */ + fd_solana_account_hdr_t const acc1 = { .hash={ .uc={ 1,2,3 } } }; + fd_stream_writer_copy( in_stream, &acc1, sizeof(fd_solana_account_hdr_t), fd_frag_meta_ctl( 0, 1, 1, 0 ) ); + ulong read_sz; + FD_TEST( on_stream_frag( ctx, &in, in_stream->mcache+0, &read_sz )==1 ); + FD_TEST( ctx->out_mcache[ 0 ].seq==0UL ); + FD_TEST( ctx->out_mcache[ 0 ].sz==sizeof(fd_solana_account_hdr_t) ); + FD_TEST( ctx->out_mcache[ 0 ].ctl==fd_frag_meta_ctl( 0, 1, 1, 0 ) ); + FD_TEST( ctx->out_mcache[ 0 ].goff==0UL ); + FD_TEST( fd_memeq( ctx->in_base+ctx->out_mcache[ 0 ].loff, &acc1, sizeof(fd_solana_account_hdr_t) ) ); + + fd_wksp_free_laddr( mock_stream_delete( in_stream ) ); + fd_wksp_free_laddr( tile_scratch ); +} int main( int argc, @@ -9,18 +109,17 @@ main( int argc, char const * _page_sz = fd_env_strip_cmdline_cstr ( &argc, &argv, "--page-sz", NULL, "gigantic" ); ulong page_cnt = fd_env_strip_cmdline_ulong( &argc, &argv, "--page-cnt", NULL, 1UL ); ulong near_cpu = fd_env_strip_cmdline_ulong( &argc, &argv, "--near-cpu", NULL, fd_log_cpu_id() ); + uint rng_seed = fd_env_strip_cmdline_uint ( &argc, &argv, "--rng-seed", NULL, 0U ); + + fd_rng_t _rng[1]; fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, rng_seed, 0UL ) ); fd_wksp_t * wksp = fd_wksp_new_anonymous( fd_cstr_to_shmem_page_sz( _page_sz ), page_cnt, near_cpu, "wksp", 0UL ); if( FD_UNLIKELY( !wksp ) ) FD_LOG_ERR(( "Unable to attach to wksp" )); - fd_topo_tile_t topo_tile = { - .name = "snapin", - }; - - uchar * tile_scratch = fd_wksp_alloc_laddr( wksp, scratch_align(), scratch_footprint( &topo_tile ), 1UL ); - FD_TEST( tile_scratch ); + test_account_frags( wksp ); fd_wksp_delete_anonymous( wksp ); + fd_rng_delete( fd_rng_leave( rng ) ); FD_LOG_NOTICE(( "pass" )); fd_halt(); diff --git a/src/funk/fd_funk_base.h b/src/funk/fd_funk_base.h index 866cdc8e0e..068616fc5b 100644 --- a/src/funk/fd_funk_base.h +++ b/src/funk/fd_funk_base.h @@ -200,13 +200,22 @@ fd_funk_rec_key_hash( fd_funk_rec_key_t const * k, FIXME This version is vulnerable to HashDoS */ +FD_FN_PURE static inline ulong +fd_funk_rec_key_hash1( uchar const key[ 32 ], + ulong rec_type, + ulong seed ) { + seed ^= rec_type; + /* tons of ILP */ + return (fd_ulong_hash( seed ^ (1UL<<0) ^ FD_LOAD( ulong, key+ 0 ) ) ^ + fd_ulong_hash( seed ^ (1UL<<1) ^ FD_LOAD( ulong, key+ 8 ) ) ) ^ + (fd_ulong_hash( seed ^ (1UL<<2) ^ FD_LOAD( ulong, key+16 ) ) ^ + fd_ulong_hash( seed ^ (1UL<<3) ^ FD_LOAD( ulong, key+24 ) ) ); +} + FD_FN_PURE static inline ulong fd_funk_rec_key_hash( fd_funk_rec_key_t const * k, ulong seed ) { - seed ^= k->ul[4]; - /* tons of ILP */ - return (fd_ulong_hash( seed ^ (1UL<<0) ^ k->ul[0] ) ^ fd_ulong_hash( seed ^ (1UL<<1) ^ k->ul[1] ) ) ^ - (fd_ulong_hash( seed ^ (1UL<<2) ^ k->ul[2] ) ^ fd_ulong_hash( seed ^ (1UL<<3) ^ k->ul[3] ) ); + return fd_funk_rec_key_hash1( k->uc, k->ul[4], seed ); } #endif /* FD_HAS_INT128 */