diff --git a/src/app/firedancer-dev/commands/snapshot_load.c b/src/app/firedancer-dev/commands/snapshot_load.c index 69153f0671..795292876e 100644 --- a/src/app/firedancer-dev/commands/snapshot_load.c +++ b/src/app/firedancer-dev/commands/snapshot_load.c @@ -5,6 +5,7 @@ #include "../../../disco/topo/fd_topob.h" #include "../../../disco/topo/fd_pod_format.h" #include "../../../util/tile/fd_tile_private.h" +#include "../../../flamenco/snapshot/fd_snapshot_loader.h" #include #include #include @@ -39,10 +40,14 @@ _is_zstd( char const * path ) { fclose( file ); return ( magic==0xFD2FB528UL ); } + static void snapshot_load_topo( config_t * config, args_t const * args ) { - int is_zstd = _is_zstd( args->snapshot_load.snapshot_path ); + fd_snapshot_src_t src[1]; + char snapshot_path_copy[4096]; + memcpy( snapshot_path_copy, args->snapshot_load.snapshot_path, sizeof(snapshot_path_copy) ); + fd_snapshot_src_parse_type_unknown( src, snapshot_path_copy ); fd_topo_t * topo = &config->topo; fd_topob_new( &config->topo, config->name ); @@ -63,13 +68,6 @@ snapshot_load_topo( config_t * config, fd_topob_wksp( topo, "metric" ); fd_topob_tile( topo, "metric", "metric", "metric_in", tile_to_cpu[0], 0, 0 ); - /* read() tile */ - fd_topob_wksp( topo, "FileRd" ); - fd_topo_tile_t * filerd_tile = fd_topob_tile( topo, "FileRd", "FileRd", "FileRd", tile_to_cpu[1], 0, 0 ); - fd_memcpy( filerd_tile->filerd.file_path, args->snapshot_load.snapshot_path, PATH_MAX ); - FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==sizeof(args->snapshot_load.snapshot_path), abi ); - FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==PATH_MAX, abi ); - /* Uncompressed data stream */ fd_topob_wksp( topo, "snap_stream" ); fd_topo_link_t * snapin_link = fd_topob_link( topo, "snap_stream", "snap_stream", 512UL, 0UL, 0UL ); @@ -77,7 +75,62 @@ snapshot_load_topo( config_t * config, snapin_link->dcache_obj_id = snapin_dcache->id; FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", snapin_dcache->id ) ); - if( is_zstd ) { /* .tar.zst file */ + if( src->type==FD_SNAPSHOT_SRC_FILE ) { + + int is_zstd = _is_zstd( args->snapshot_load.snapshot_path ); + + /* read() tile */ + fd_topob_wksp( topo, "FileRd" ); + fd_topo_tile_t * filerd_tile = fd_topob_tile( topo, "FileRd", "FileRd", "FileRd", tile_to_cpu[1], 0, 0 ); + fd_memcpy( filerd_tile->filerd.file_path, args->snapshot_load.snapshot_path, PATH_MAX ); + FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==sizeof(args->snapshot_load.snapshot_path), abi ); + FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==PATH_MAX, abi ); + + if( is_zstd ) { /* .tar.zst file */ + + /* "unzstd": Zstandard decompress tile */ + fd_topob_wksp( topo, "Unzstd" ); + fd_topo_tile_t * unzstd_tile = fd_topob_tile( topo, "Unzstd", "Unzstd", "Unzstd", tile_to_cpu[2], 0, 0 ); + (void)unzstd_tile; + + /* Compressed data stream */ + fd_topob_wksp( topo, "snap_zstd" ); + fd_topo_link_t * zstd_link = fd_topob_link( topo, "snap_zstd", "snap_zstd", 512UL, 0UL, 0UL ); + fd_topo_obj_t * zstd_dcache = fd_topob_obj( topo, "dcache", "snap_zstd"); + zstd_link->dcache_obj_id = zstd_dcache->id; + FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", zstd_dcache->id ) ); + + /* filerd tile -> compressed stream */ + fd_topob_tile_out( topo, "FileRd", 0UL, "snap_zstd", 0UL ); + fd_topob_tile_uses( topo, filerd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); + + /* compressed stream -> unzstd tile */ + fd_topob_tile_in( topo, "Unzstd", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + fd_topob_tile_uses( topo, unzstd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_ONLY ); + + /* unzstd tile -> uncompressed stream */ + fd_topob_tile_out( topo, "Unzstd", 0UL, "snap_stream", 0UL ); + fd_topob_tile_uses( topo, unzstd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); + + } else { /* .tar file */ + + /* filerd tile -> uncompressed stream */ + fd_topob_tile_out( topo, "FileRd", 0UL, "snap_stream", 0UL ); + fd_topob_tile_uses( topo, filerd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); + + } + } + else if ( src->type==FD_SNAPSHOT_SRC_HTTP ) { + + /* httpdl() tile */ + fd_topob_wksp( topo, "HttpDl" ); + fd_topo_tile_t * httpdl_tile = fd_topob_tile( topo, "HttpDl", "HttpDl", "HttpDl", tile_to_cpu[1], 0, 0 ); + fd_memcpy( httpdl_tile->httpdl.path, src->http.path, PATH_MAX ); + fd_memcpy( httpdl_tile->httpdl.snapshot_dir, args->snapshot_load.snapshot_dir, PATH_MAX ); + fd_memcpy( httpdl_tile->httpdl.dest, src->http.dest, sizeof(src->http.dest) ); + httpdl_tile->httpdl.ip4 = src->http.ip4; + httpdl_tile->httpdl.path_len = src->http.path_len; + httpdl_tile->httpdl.port = src->http.port; /* "unzstd": Zstandard decompress tile */ fd_topob_wksp( topo, "Unzstd" ); @@ -92,8 +145,8 @@ snapshot_load_topo( config_t * config, FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", zstd_dcache->id ) ); /* filerd tile -> compressed stream */ - fd_topob_tile_out( topo, "FileRd", 0UL, "snap_zstd", 0UL ); - fd_topob_tile_uses( topo, filerd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); + fd_topob_tile_out( topo, "HttpDl", 0UL, "snap_zstd", 0UL ); + fd_topob_tile_uses( topo, httpdl_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); /* compressed stream -> unzstd tile */ fd_topob_tile_in( topo, "Unzstd", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); @@ -102,13 +155,6 @@ snapshot_load_topo( config_t * config, /* unzstd tile -> uncompressed stream */ fd_topob_tile_out( topo, "Unzstd", 0UL, "snap_stream", 0UL ); fd_topob_tile_uses( topo, unzstd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); - - } else { /* .tar file */ - - /* filerd tile -> uncompressed stream */ - fd_topob_tile_out( topo, "FileRd", 0UL, "snap_stream", 0UL ); - fd_topob_tile_uses( topo, filerd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); - } fd_topob_wksp( topo, "SnapIn" ); @@ -149,8 +195,9 @@ static void snapshot_load_cmd_args( int * pargc, char *** pargv, args_t * args ) { - char const * tile_cpus = fd_env_strip_cmdline_cstr( pargc, pargv, "--tile-cpus", "FD_TILE_CPUS", NULL ); - char const * snapshot_file = fd_env_strip_cmdline_cstr( pargc, pargv, "--snapshot", NULL, NULL ); + char const * tile_cpus = fd_env_strip_cmdline_cstr( pargc, pargv, "--tile-cpus", "FD_TILE_CPUS", NULL ); + char const * snapshot_src = fd_env_strip_cmdline_cstr( pargc, pargv, "--snapshot", NULL, NULL ); + char const * snapshot_dir = fd_env_strip_cmdline_cstr( pargc, pargv, "--snapshot-dir", NULL, NULL ); if( tile_cpus ) { ulong tile_cpus_strlen = strlen( tile_cpus ); @@ -158,10 +205,17 @@ snapshot_load_cmd_args( int * pargc, fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->tile_cpus ), tile_cpus, tile_cpus_strlen ) ); } - if( FD_UNLIKELY( !snapshot_file ) ) FD_LOG_ERR(( "Missing --snapshot flag" )); - ulong snapshot_file_strlen = strlen( snapshot_file ); + if( FD_UNLIKELY( !snapshot_src ) ) FD_LOG_ERR(( "Missing --snapshot flag" )); + ulong snapshot_file_strlen = strlen( snapshot_src ); if( FD_UNLIKELY( snapshot_file_strlen>=sizeof(args->snapshot_load.snapshot_path) ) ) FD_LOG_ERR(( "--snapshot: path too long" )); - fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_path ), snapshot_file, snapshot_file_strlen ) ); + fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_path ), snapshot_src, snapshot_file_strlen ) ); + + /* FIXME: check if we need the snapshot dir argument (parse the snapshot input src to see if it's http)*/ + if( snapshot_dir!=NULL ) { + ulong snapshot_dir_strlen = strlen( snapshot_dir ); + if( FD_UNLIKELY( snapshot_file_strlen>=sizeof(args->snapshot_load.snapshot_dir) ) ) FD_LOG_ERR(( "--snapshot-dir: dir too long" )); + fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_dir ), snapshot_dir, snapshot_dir_strlen ) ); + } } static void @@ -189,15 +243,20 @@ snapshot_load_cmd_fn( args_t * args, double ns_per_tick = 1.0/tick_per_ns; fd_topo_run_single_process( topo, 2, config->uid, config->gid, fdctl_tile_run, NULL ); - fd_topo_tile_t * file_rd_tile = &topo->tiles[ fd_topo_find_tile( topo, "FileRd", 0UL ) ]; - fd_topo_tile_t * snap_in_tile = &topo->tiles[ fd_topo_find_tile( topo, "SnapIn", 0UL ) ]; - ulong zstd_tile_idx = fd_topo_find_tile( topo, "Unzstd", 0UL ); - fd_topo_tile_t * unzstd_tile = zstd_tile_idx!=ULONG_MAX ? &topo->tiles[ zstd_tile_idx ] : NULL; - - ulong * snap_in_fseq = snap_in_tile->in_link_fseq[ 0 ]; - ulong * snap_accs_sync = fd_mcache_seq_laddr( topo->links[ fd_topo_find_link( topo, "snap_frags", 0UL ) ].mcache ); - ulong volatile * file_rd_metrics = fd_metrics_tile( file_rd_tile->metrics ); - ulong volatile * snap_in_metrics = fd_metrics_tile( snap_in_tile->metrics ); + ulong httpdl_tile_idx = fd_topo_find_tile( topo, "HttpDl", 0UL ); + ulong filerd_tile_idx = fd_topo_find_tile( topo, "FileRd", 0UL ); + fd_topo_tile_t * http_dl_tile = httpdl_tile_idx!=ULONG_MAX ? &topo->tiles[ httpdl_tile_idx ] : NULL; + fd_topo_tile_t * file_rd_tile = filerd_tile_idx!=ULONG_MAX ? &topo->tiles[ filerd_tile_idx ] : NULL; + fd_topo_tile_t * snap_in_tile = &topo->tiles[ fd_topo_find_tile( topo, "SnapIn", 0UL ) ]; + ulong zstd_tile_idx = fd_topo_find_tile( topo, "Unzstd", 0UL ); + fd_topo_tile_t * unzstd_tile = zstd_tile_idx!=ULONG_MAX ? &topo->tiles[ zstd_tile_idx ] : NULL; + // fd_topo_tile_t * actalc_tile = &topo->tiles[ fd_topo_find_tile( topo, "ActAlc", 0UL ) ]; + + ulong * snap_in_fseq = snap_in_tile->in_link_fseq[ 0 ]; + ulong * snap_accs_sync = fd_mcache_seq_laddr( topo->links[ fd_topo_find_link( topo, "snap_frags", 0UL ) ].mcache ); + ulong volatile * file_rd_metrics = file_rd_tile ? fd_metrics_tile( file_rd_tile->metrics ) : NULL; + ulong volatile * http_dl_metrics = http_dl_tile ? fd_metrics_tile( http_dl_tile->metrics ) : NULL; + ulong volatile * snap_in_metrics = fd_metrics_tile( snap_in_tile->metrics ); ulong volatile * unzstd_in_metrics = unzstd_tile ? fd_metrics_tile( unzstd_tile->metrics ) : NULL; ulong goff_old = 0UL; @@ -206,18 +265,22 @@ snapshot_load_cmd_fn( args_t * args, ulong acc_cnt_old = 0UL; ulong frag_cnt_old = 0UL; for(;;) { - sleep( 1 ); - - ulong filerd_status = FD_VOLATILE_CONST( file_rd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); + sleep( 1 ); + ulong filerd_status = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ) : 2UL; + ulong httpdl_status = http_dl_metrics ? FD_VOLATILE_CONST( http_dl_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ) : 2UL; ulong snapin_status = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); ulong unzstd_status = unzstd_in_metrics ? FD_VOLATILE_CONST( unzstd_in_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ) : 2UL; - if( FD_UNLIKELY( filerd_status==2UL && unzstd_status==2UL && snapin_status == 2UL ) ) { + if( FD_UNLIKELY( httpdl_status==2UL && filerd_status==2UL && unzstd_status==2UL && snapin_status == 2UL ) ) { FD_LOG_NOTICE(( "Done" )); break; } ulong goff = FD_VOLATILE_CONST( snap_in_fseq[ 1 ] ); - ulong file_rd_backp = FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ); + ulong file_rd_backp = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ) : 0UL; + // ulong file_rd_wait = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] ) + + // FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] ) + + // file_rd_backp : 0UL; + // ulong snap_in_backp = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ); ulong snap_in_wait = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] ) + FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] ); ulong frag_cnt = FD_VOLATILE_CONST( snap_accs_sync[0] ); diff --git a/src/app/firedancer-dev/main.c b/src/app/firedancer-dev/main.c index cb9fc04c42..0210f267a8 100644 --- a/src/app/firedancer-dev/main.c +++ b/src/app/firedancer-dev/main.c @@ -102,6 +102,7 @@ extern fd_topo_run_tile_t fd_tile_snapshot_restore_FileRd; extern fd_topo_run_tile_t fd_tile_snapshot_restore_SnapIn; extern fd_topo_run_tile_t fd_tile_snapshot_restore_ActAlc; extern fd_topo_run_tile_t fd_tile_snapshot_restore_Unzstd; +extern fd_topo_run_tile_t fd_tile_snapshot_restore_HttpDl; fd_topo_run_tile_t * TILES[] = { &fd_tile_net, @@ -140,6 +141,7 @@ fd_topo_run_tile_t * TILES[] = { &fd_tile_snapshot_restore_SnapIn, &fd_tile_snapshot_restore_ActAlc, &fd_tile_snapshot_restore_Unzstd, + &fd_tile_snapshot_restore_HttpDl, NULL, }; diff --git a/src/app/shared/fd_action.h b/src/app/shared/fd_action.h index cd30022936..c111ef0304 100644 --- a/src/app/shared/fd_action.h +++ b/src/app/shared/fd_action.h @@ -92,6 +92,7 @@ struct fdctl_args { struct { char snapshot_path[ PATH_MAX ]; + char snapshot_dir[ PATH_MAX ]; } snapshot_load; }; diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h index 595474abba..04997b40a7 100644 --- a/src/disco/topo/fd_topo.h +++ b/src/disco/topo/fd_topo.h @@ -433,6 +433,15 @@ typedef struct { char file_path[ PATH_MAX ]; } filerd; + struct { + char dest[128]; + uint ip4; + ushort port; + char path[ PATH_MAX ]; + ulong path_len; + char snapshot_dir[ PATH_MAX ]; + } httpdl; + struct { ulong scratch_sz; } snapin; diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk index ea659cac49..0c609fbfad 100644 --- a/src/discof/restore/Local.mk +++ b/src/discof/restore/Local.mk @@ -2,6 +2,7 @@ $(call add-objs,fd_filerd_tile,fd_discof) $(call add-objs,fd_snapin_tile,fd_discof) $(call add-objs,fd_actalc_tile,fd_discof) $(call add-objs,fd_unzstd_tile,fd_discof) +$(call add-objs,fd_httpdl_tile,fd_discof) $(call add-objs,stream/fd_stream_writer,fd_discof) $(call add-objs,stream/fd_event_map,fd_discof) $(call add-objs,stream/fd_stream_ctx,fd_discof) diff --git a/src/discof/restore/fd_httpdl_tile.c b/src/discof/restore/fd_httpdl_tile.c new file mode 100644 index 0000000000..df1a40488d --- /dev/null +++ b/src/discof/restore/fd_httpdl_tile.c @@ -0,0 +1,181 @@ +#include "../../disco/topo/fd_topo.h" +#include "../../flamenco/snapshot/fd_snapshot_http.h" +#include "stream/fd_stream_writer.h" +#include "stream/fd_stream_ctx.h" +#include + +#define NAME "http" +#define HTTP_CHUNK_SZ 8 * 1024 * 1024UL + +struct fd_httpdl_tile { + fd_snapshot_http_t * http; + fd_stream_writer_t * writer; +}; +typedef struct fd_httpdl_tile fd_httpdl_tile_t; + +FD_FN_PURE static ulong +scratch_align( void ) { + return fd_ulong_max( alignof(fd_httpdl_tile_t), + fd_ulong_max( fd_snapshot_http_align(), fd_stream_writer_align() ) ); +} + +FD_FN_PURE static ulong +scratch_footprint( fd_topo_tile_t const * tile ) { + (void)tile; + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_httpdl_tile_t), sizeof(fd_httpdl_tile_t) ); + l = FD_LAYOUT_APPEND( l, fd_snapshot_http_align(), fd_snapshot_http_footprint() ); + return FD_LAYOUT_FINI( l, scratch_align() ); +} + +static void +privileged_init( fd_topo_t * topo, + fd_topo_tile_t * tile ) { + FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) ); + fd_httpdl_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_httpdl_tile_t), sizeof(fd_httpdl_tile_t) ); + void * http_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_http_align(), fd_snapshot_http_footprint() ); + + fd_memset( ctx, 0, sizeof(fd_httpdl_tile_t) ); + + if( FD_UNLIKELY( !tile->httpdl.dest[0] ) ) { + FD_LOG_ERR(( "http dest not set" )); + } + + /* TODO: is null ok for the name? */ + ctx->http = fd_snapshot_http_new( http_mem, + tile->httpdl.dest, + tile->httpdl.ip4, + tile->httpdl.port, + tile->httpdl.snapshot_dir, + NULL ); + + fd_snapshot_http_privileged_init( ctx->http ); +} + +static void +fd_httpdl_init_from_stream_ctx( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_httpdl_tile_t * ctx = fd_type_pun(_ctx); + + /* There's only one writer */ + ctx->writer = &stream_ctx->writers[0]; + fd_stream_writer_set_read_max( ctx->writer, HTTP_CHUNK_SZ ); +} + +__attribute__((noreturn)) FD_FN_UNUSED static void +fd_httpdl_shutdown( fd_httpdl_tile_t * ctx ) { + fd_snapshot_http_cleanup_fds( ctx->http ); + FD_MGAUGE_SET( TILE, STATUS, 2UL ); + FD_COMPILER_MFENCE(); + FD_LOG_WARNING(("Done downloading snapshot")); + + for(;;) pause(); +} + +__attribute__((unused)) static void +after_credit_chunk( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_httpdl_tile_t * ctx = fd_type_pun(_ctx); + (void)stream_ctx; + ulong downloaded_sz = 0UL; + + for(;;) { + if( downloaded_sz >= HTTP_CHUNK_SZ ) { + fd_stream_writer_publish( ctx->writer, downloaded_sz ); + break; + } + /* get write pointers into dcache buffer */ + uchar * out = fd_stream_writer_get_write_ptr( ctx->writer ); + ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer ); + ulong sz = 0UL; + + if( dst_max==0 ) { + fd_stream_writer_publish( ctx->writer, downloaded_sz ); + break; + } + + int err = fd_io_istream_snapshot_http_read( ctx->http, out, dst_max, &sz ); + if( FD_UNLIKELY( err<0 ) ) FD_LOG_ERR(( "http err: %d", err )); + if( FD_UNLIKELY( err>0 ) ) { + FD_LOG_WARNING(("HTTP download complete! shutting down")); + fd_httpdl_shutdown( ctx ); + } + + if( sz ) { + fd_stream_writer_advance( ctx->writer, sz ); + downloaded_sz += sz; + } + } +} + +__attribute__((unused)) static void +after_credit_stream( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_httpdl_tile_t * ctx = fd_type_pun(_ctx); + (void)stream_ctx; + + /* Don't do anything if backpressured */ + if( fd_stream_writer_is_backpressured( ctx->writer ) ) { + return; + } + + /* get write pointers into dcache buffer */ + uchar * out = fd_stream_writer_get_write_ptr( ctx->writer ); + ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer ); + ulong sz = 0UL; + + int err = fd_io_istream_snapshot_http_read( ctx->http, out, dst_max, &sz ); + if( FD_UNLIKELY( err<0 ) ) FD_LOG_ERR(( "http err: %d", err )); + if( FD_UNLIKELY( err>0 ) ) { + FD_LOG_WARNING(("HTTP download complete! shutting down")); + fd_httpdl_shutdown( ctx ); + } + + if( sz ) { + fd_stream_writer_advance( ctx->writer, sz ); + fd_stream_writer_publish( ctx->writer, sz ); + } +} + +__attribute__((noinline)) static void +fd_httpdl_run1( + fd_httpdl_tile_t * ctx, + fd_stream_ctx_t * stream_ctx ) { + + FD_LOG_INFO(( "Running httpdl tile" )); + + fd_stream_ctx_init_run_loop( stream_ctx, + ctx, + fd_httpdl_init_from_stream_ctx, + NULL, + NULL, + NULL, + after_credit_stream, + NULL ); + fd_stream_ctx_run_loop( stream_ctx, ctx ); +} + +static void +fd_httpdl_run( fd_topo_t * topo, + fd_topo_tile_t * tile ) { + fd_httpdl_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); + ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile ); + ulong out_cnt = tile->out_cnt; + + void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, out_cnt ) ); + fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, out_cnt ); + fd_httpdl_run1( ctx, stream_ctx ); +} + +fd_topo_run_tile_t fd_tile_snapshot_restore_HttpDl = { + .name = "HttpDl", + .scratch_align = scratch_align, + .scratch_footprint = scratch_footprint, + .privileged_init = privileged_init, + .run = fd_httpdl_run, +}; + +#undef NAME + + + diff --git a/src/discof/restore/fd_unzstd_tile.c b/src/discof/restore/fd_unzstd_tile.c index 4bd9b352ba..bf57aec3bd 100644 --- a/src/discof/restore/fd_unzstd_tile.c +++ b/src/discof/restore/fd_unzstd_tile.c @@ -15,8 +15,8 @@ struct fd_unzstd_tile { fd_stream_frag_meta_ctx_t in_state; /* input mcache context */ fd_zstd_dstream_t * dstream; /* zstd decompress reader */ fd_stream_writer_t * writer; /* stream writer object */ + ulong const volatile * shutdown_signal; }; - typedef struct fd_unzstd_tile fd_unzstd_tile_t; FD_FN_PURE static ulong @@ -30,8 +30,7 @@ scratch_footprint( fd_topo_tile_t const * tile ) { ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND( l, alignof(fd_unzstd_tile_t), sizeof(fd_unzstd_tile_t) ); l = FD_LAYOUT_APPEND( l, fd_zstd_dstream_align(), fd_zstd_dstream_footprint( ZSTD_WINDOW_SZ ) ); - l = FD_LAYOUT_APPEND( l, fd_stream_writer_align(), fd_stream_writer_footprint() ); - return l; + return FD_LAYOUT_FINI( l, scratch_align() ); } static void @@ -40,7 +39,6 @@ unprivileged_init( fd_topo_t * topo, FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) ); fd_unzstd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_unzstd_tile_t), sizeof(fd_unzstd_tile_t) ); void * zstd_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_zstd_dstream_align(), fd_zstd_dstream_footprint( ZSTD_WINDOW_SZ ) ); - void * writer_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), fd_stream_writer_footprint() ); void * out_dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ 0 ] ].dcache_obj_id ) ); FD_TEST( out_dcache ); @@ -49,14 +47,46 @@ unprivileged_init( fd_topo_t * topo, ctx->in_state.in_buf = (uchar const *)topo->workspaces[ topo->objs[ topo->links[ tile->in_link_id[ 0 ] ].dcache_obj_id ].wksp_id ].wksp; ctx->dstream = fd_zstd_dstream_new( zstd_mem, ZSTD_WINDOW_SZ ); - ctx->writer = fd_stream_writer_new( writer_mem, topo, tile, 0, ZSTD_WINDOW_SZ, 512UL, 2UL ); fd_zstd_dstream_reset( ctx->dstream ); } static void -during_housekeeping( fd_unzstd_tile_t * ctx ) { - (void)ctx; +fd_unzstd_init_from_stream_ctx( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_unzstd_tile_t * ctx = fd_type_pun(_ctx); + + /* There's only one writer */ + ctx->writer = &stream_ctx->writers[0]; + fd_stream_writer_set_read_max( ctx->writer, ZSTD_FRAME_SZ ); + ctx->shutdown_signal = fd_mcache_seq_laddr_const( stream_ctx->in[0].base.mcache->f ) + 2; +} + +__attribute__((noreturn)) static void +fd_unzstd_shutdown( fd_unzstd_tile_t * ctx ) { + FD_MGAUGE_SET( TILE, STATUS, 2UL ); + fd_stream_writer_notify_shutdown( ctx->writer ); + FD_COMPILER_MFENCE(); + + for(;;) pause(); +} + +static void +fd_unzstd_poll_shutdown( fd_stream_ctx_t * stream_ctx, + fd_unzstd_tile_t * ctx ) { + ulong const in_seq_max = FD_VOLATILE_CONST( *ctx->shutdown_signal ); + if( FD_UNLIKELY( in_seq_max == stream_ctx->in[ 0 ].base.seq && in_seq_max != 0) ) { + FD_LOG_WARNING(( "zstd shutting down! in_seq_max is %lu in[0].base.seq is %lu", + in_seq_max, stream_ctx->in[0].base.seq)); + fd_unzstd_shutdown( ctx ); + } +} + +static void +during_housekeeping( void * _ctx, + fd_stream_ctx_t * stream_ctx ) { + fd_unzstd_tile_t * ctx = fd_type_pun(_ctx); + fd_unzstd_poll_shutdown( stream_ctx, ctx ); } static int @@ -65,6 +95,12 @@ on_stream_frag( void * _ctx, fd_stream_frag_meta_t const * frag, ulong * sz ) { fd_unzstd_tile_t * ctx = fd_type_pun(_ctx); + + /* Don't do anything if backpressured */ + if( fd_stream_writer_is_backpressured( ctx->writer ) ) { + return 0; + } + uchar const * chunk0 = ctx->in_state.in_buf + frag->loff; uchar const * chunk_start = chunk0 + ctx->in_state.in_skip; uchar const * chunk_end = chunk0 + frag->sz; @@ -95,7 +131,6 @@ on_stream_frag( void * _ctx, break; } - /* fd_zstd_dstream_read updates chunk_start and out */ int zstd_err = fd_zstd_dstream_read( ctx->dstream, &cur, chunk_end, &out, out_end, NULL ); if( FD_UNLIKELY( zstd_err>0) ) { @@ -138,27 +173,6 @@ fd_unzstd_in_update( fd_stream_reader_t * in ) { accum[3] = 0U; accum[4] = 0U; accum[5] = 0U; } -__attribute__((noreturn)) static void -fd_unzstd_shutdown( fd_unzstd_tile_t * ctx ) { - FD_MGAUGE_SET( TILE, STATUS, 2UL ); - fd_stream_writer_notify_shutdown( ctx->writer ); - FD_COMPILER_MFENCE(); - - for(;;) pause(); -} - -static void -fd_unzstd_poll_shutdown( fd_stream_ctx_t * stream_ctx, - fd_unzstd_tile_t * ctx, - ulong const volatile * shutdown_signal ) { - ulong const in_seq_max = FD_VOLATILE_CONST( *shutdown_signal ); - if( FD_UNLIKELY( in_seq_max == stream_ctx->in[ 0 ].base.seq && in_seq_max != 0) ) { - FD_LOG_WARNING(( "zstd shutting down! in_seq_max is %lu in[0].base.seq is %lu", - in_seq_max, stream_ctx->in[0].base.seq)); - fd_unzstd_shutdown( ctx ); - } -} - __attribute__((noinline)) static void fd_unzstd_run1( fd_unzstd_tile_t * ctx, @@ -167,51 +181,16 @@ fd_unzstd_run1( FD_LOG_INFO(( "Running unzstd tile" )); /* run loop init */ - ulong const volatile * restrict shutdown_signal = fd_mcache_seq_laddr_const( stream_ctx->in[0].base.mcache->f ) + 2; - fd_stream_writer_init_flow_control_credits( ctx->writer ); - fd_stream_ctx_init_run_loop( stream_ctx ); - - for(;;) { - if( FD_UNLIKELY( fd_stream_ticks_is_housekeeping_time( stream_ctx->ticks ) ) ) { - ulong event_idx = fd_event_map_get_event( stream_ctx->event_map ); - - if( FD_LIKELY( event_idxcons_cnt ) ) { /* receive credits */ - ulong cons_idx = event_idx; - - /* Receive flow control credits from this out. */ - fd_stream_writer_receive_flow_control_credits( ctx->writer, cons_idx ); - - fd_unzstd_poll_shutdown( stream_ctx, ctx, shutdown_signal ); - - } else if( event_idx>stream_ctx->cons_cnt) { /* send credits */ - ulong in_idx = event_idx - stream_ctx->cons_cnt - 1UL; - fd_unzstd_in_update( &stream_ctx->in[ in_idx ] ); - } - else { /* event_idx==cons_cnt, housekeeping event */ - - /* Update metrics counters to external viewers */ - fd_stream_metrics_update_external( stream_ctx->metrics, - stream_ctx->ticks->now, - NULL, - ctx ); - /* Recalculate flow control credits */ - ulong slowest_cons = ULONG_MAX; - fd_stream_writer_update_flow_control_credits( ctx->writer, - &slowest_cons ); - fd_stream_ctx_update_cons_slow( stream_ctx, - slowest_cons ); - during_housekeeping( ctx ); - } - fd_stream_ctx_housekeeping_advance( stream_ctx ); - } - - /* Check if we are backpressured, otherwise poll */ - if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) { - fd_stream_ctx_process_backpressure( stream_ctx ); - } else { - fd_stream_ctx_poll( stream_ctx, ctx, on_stream_frag ); - } - } + fd_stream_ctx_init_run_loop( stream_ctx, + ctx, + fd_unzstd_init_from_stream_ctx, + fd_unzstd_in_update, + during_housekeeping, + NULL, + NULL, + on_stream_frag ); + + fd_stream_ctx_run_loop( stream_ctx, ctx ); } static void @@ -219,10 +198,10 @@ fd_unzstd_run( fd_topo_t * topo, fd_topo_tile_t * tile ) { fd_unzstd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile ); - ulong cons_cnt = fd_topo_tile_reliable_consumer_cnt( topo, tile ); + ulong out_cnt = tile->out_cnt; - void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, cons_cnt ) ); - fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, cons_cnt ); + void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, out_cnt ) ); + fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, out_cnt ); fd_unzstd_run1( ctx, stream_ctx ); } @@ -236,3 +215,5 @@ fd_topo_run_tile_t fd_tile_snapshot_restore_Unzstd = { .run = fd_unzstd_run, }; #endif + +#undef NAME diff --git a/src/discof/restore/stream/fd_event_map.c b/src/discof/restore/stream/fd_event_map.c index 8d02c60df5..9eddf5f0c5 100644 --- a/src/discof/restore/stream/fd_event_map.c +++ b/src/discof/restore/stream/fd_event_map.c @@ -3,7 +3,7 @@ fd_event_map_t * fd_event_map_new( void * mem, ulong in_cnt, - ulong cons_cnt ) { + ulong out_cnt ) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING(( "NULL mem" )); return NULL; @@ -17,13 +17,13 @@ fd_event_map_new( void * mem, FD_SCRATCH_ALLOC_INIT( l, mem ); fd_event_map_t * self = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_event_map_t), sizeof(fd_event_map_t) ); - ulong event_cnt = 1UL + in_cnt + cons_cnt; + ulong event_cnt = 1UL + in_cnt + out_cnt; self->event_map = FD_SCRATCH_ALLOC_APPEND( l, alignof(ushort), sizeof(ushort)*event_cnt ); self->event_cnt = event_cnt; self->event_seq = 0UL; /* init event map */ - fd_event_map_init(self, in_cnt, cons_cnt ); + fd_event_map_init(self, in_cnt, out_cnt ); return self; } diff --git a/src/discof/restore/stream/fd_event_map.h b/src/discof/restore/stream/fd_event_map.h index eaa89b9739..13b3fc0d08 100644 --- a/src/discof/restore/stream/fd_event_map.h +++ b/src/discof/restore/stream/fd_event_map.h @@ -22,8 +22,8 @@ fd_event_map_align( void ) { FD_FN_CONST static inline ulong fd_event_map_footprint( ulong in_cnt, - ulong cons_cnt ) { - ulong event_cnt = 1UL + in_cnt + cons_cnt; + ulong out_cnt ) { + ulong event_cnt = 1UL + in_cnt + out_cnt; ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND(l, alignof(fd_event_map_t), sizeof(fd_event_map_t) ); l = FD_LAYOUT_APPEND(l, alignof(ushort), sizeof(ushort)*event_cnt ); @@ -33,17 +33,17 @@ fd_event_map_footprint( ulong in_cnt, fd_event_map_t * fd_event_map_new( void * mem, ulong in_cnt, - ulong cons_cnt ); + ulong out_cnt ); static inline void fd_event_map_init( fd_event_map_t * map, ulong in_cnt, - ulong cons_cnt ) { + ulong out_cnt ) { ulong idx = 0UL; - map->event_map[ idx++ ] = (ushort)cons_cnt; + map->event_map[ idx++ ] = (ushort)out_cnt; for( ulong in_idx=0UL; in_idxevent_map[ idx++ ] = (ushort)(in_idx+cons_cnt+1UL); - for( ulong cons_idx=0UL; cons_idxevent_map[ idx++ ] = (ushort)(in_idx+out_cnt+1UL); + for( ulong cons_idx=0UL; cons_idxevent_map[ idx++ ] = (ushort)cons_idx; } diff --git a/src/discof/restore/stream/fd_stream_ctx.c b/src/discof/restore/stream/fd_stream_ctx.c index 65c54dceb7..b6c95a6f34 100644 --- a/src/discof/restore/stream/fd_stream_ctx.c +++ b/src/discof/restore/stream/fd_stream_ctx.c @@ -21,28 +21,19 @@ fd_stream_ctx_init( fd_stream_ctx_t * ctx, ctx->in_ptrs[ i ] = &ctx->in[ i ]; } - /* init cons_fseq */ - ulong cons_idx = 0UL; - for( ulong i=0UL; itile_cnt; i++ ) { - fd_topo_tile_t * consumer_tile = &topo->tiles[ i ]; - for( ulong j=0UL; jin_cnt; j++ ) { - for( ulong k=0UL; kout_cnt; k++ ) { - if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ k ] && consumer_tile->in_link_reliable[ j ] ) ) { - ctx->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ j ]; - } - } - } + /* init writers */ + for( ulong i=0UL; iout_cnt; i++ ) { + fd_stream_writer_new( &ctx->writers[i], + topo, + tile, + i, + 512UL, + 2UL ); } fd_stream_ticks_init( ctx->ticks, ctx->event_map->event_cnt, 1e3L ); fd_stream_metrics_init( ctx->metrics ); FD_TEST( fd_rng_join( fd_rng_new( ctx->rng, 0, 0UL ) ) ); - - /* init metrics link for cons_slow */ - cons_idx = 0UL; - for( ; cons_idxcons_cnt; cons_idx++ ) { - ctx->cons_slow[ cons_idx ] = (ulong *)(fd_metrics_link_out( fd_metrics_base_tl, cons_idx ) + FD_METRICS_COUNTER_LINK_SLOW_COUNT_OFF); - } } fd_stream_ctx_t * @@ -50,7 +41,7 @@ fd_stream_ctx_new( void * mem, fd_topo_t * topo, fd_topo_tile_t * tile, ulong in_cnt, - ulong cons_cnt ) { + ulong out_cnt ) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING(( "NULL mem" )); return NULL; @@ -66,14 +57,13 @@ fd_stream_ctx_new( void * mem, self->in = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_reader_t), in_cnt*sizeof(fd_stream_reader_t) ); self->in_ptrs = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stream_reader_t *), in_cnt*sizeof(fd_stream_reader_t *) ); - self->cons_fseq = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) ); - self->cons_slow = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); - void * event_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, cons_cnt ) ); + void * event_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, out_cnt ) ); + self->writers = FD_SCRATCH_ALLOC_APPEND( l, fd_stream_writer_align(), sizeof(fd_stream_writer_t)*out_cnt ); self->in_cnt = in_cnt; - self->cons_cnt = cons_cnt; + self->out_cnt = out_cnt; - self->event_map = fd_event_map_new( event_map_mem, in_cnt, cons_cnt ); + self->event_map = fd_event_map_new( event_map_mem, in_cnt, out_cnt ); fd_stream_ctx_init( self, topo, tile ); self->in_seq = 0UL; diff --git a/src/discof/restore/stream/fd_stream_ctx.h b/src/discof/restore/stream/fd_stream_ctx.h index a8bf3522b6..300c09dc56 100644 --- a/src/discof/restore/stream/fd_stream_ctx.h +++ b/src/discof/restore/stream/fd_stream_ctx.h @@ -3,22 +3,42 @@ #include "../../../disco/topo/fd_topo.h" #include "fd_stream_reader.h" +#include "fd_stream_writer.h" #include "fd_event_map.h" #include "fd_stream_ticks.h" #include "fd_stream_metrics.h" +struct fd_stream_ctx; +typedef struct fd_stream_ctx fd_stream_ctx_t; + +typedef void fd_tile_ctx_init_run_loop_fn_t( void * ctx, + fd_stream_ctx_t * stream_ctx ); +typedef void fd_tile_update_in_fn_t( fd_stream_reader_t * reader ); +typedef void fd_tile_housekeeping_fn_t( void * ctx, + fd_stream_ctx_t * stream_ctx ); +typedef void fd_tile_metrics_write_fn_t( void * ctx ); +typedef void fd_tile_run_fn_t( void * ctx, fd_stream_ctx_t * stream_ctx ); +typedef int fd_tile_on_stream_frag_fn_t( void * ctx, + fd_stream_reader_t * reader, + fd_stream_frag_meta_t const * frag, +ulong * sz ); + struct fd_stream_ctx { - fd_stream_reader_t * in; - fd_stream_reader_t ** in_ptrs; - ulong ** cons_fseq; - ulong ** cons_slow; - fd_event_map_t * event_map; - ulong in_cnt; - ulong cons_cnt; - ulong in_seq; - fd_rng_t rng[1]; - fd_stream_ticks_t ticks[1]; - fd_stream_metrics_t metrics[1]; + fd_stream_reader_t * in; + fd_stream_reader_t ** in_ptrs; + fd_event_map_t * event_map; + ulong in_cnt; + ulong out_cnt; + ulong in_seq; + fd_rng_t rng[1]; + fd_stream_ticks_t ticks[1]; + fd_stream_metrics_t metrics[1]; + fd_stream_writer_t * writers; + fd_tile_update_in_fn_t * tile_update_in; + fd_tile_housekeeping_fn_t * tile_housekeeping; + fd_tile_metrics_write_fn_t * tile_metrics_write; + fd_tile_run_fn_t * tile_run; + fd_tile_on_stream_frag_fn_t * tile_on_stream_frag; }; typedef struct fd_stream_ctx fd_stream_ctx_t; @@ -31,14 +51,12 @@ fd_stream_ctx_scratch_align( void ) { FD_FN_PURE static inline ulong fd_stream_ctx_scratch_footprint( ulong in_cnt, - ulong cons_cnt ) { + ulong out_cnt ) { ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND( l, alignof(fd_stream_ctx_t), sizeof(fd_stream_ctx_t) ); l = FD_LAYOUT_APPEND( l, alignof(fd_stream_reader_t), in_cnt*sizeof(fd_stream_reader_t) ); /* in */ l = FD_LAYOUT_APPEND( l, alignof(fd_stream_reader_t *), in_cnt*sizeof(fd_stream_reader_t *) ); /* in_ptrs */ - l = FD_LAYOUT_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) ); /* cons_fseq */ - l = FD_LAYOUT_APPEND( l, alignof(ulong *), cons_cnt*sizeof(ulong *) ); /* cons_slow */ - l = FD_LAYOUT_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, cons_cnt ) ); /* event_map */ + l = FD_LAYOUT_APPEND( l, fd_event_map_align(), fd_event_map_footprint( in_cnt, out_cnt ) ); /* event_map */ return FD_LAYOUT_FINI( l, fd_stream_ctx_scratch_align() ); } @@ -47,7 +65,7 @@ fd_stream_ctx_new( void * mem, fd_topo_t * topo, fd_topo_tile_t * tile, ulong in_cnt, - ulong cons_cnt ); + ulong out_cnt ); void fd_stream_ctx_init( fd_stream_ctx_t * ctx, @@ -55,19 +73,46 @@ fd_stream_ctx_init( fd_stream_ctx_t * ctx, fd_topo_tile_t * tile ); static inline void -fd_stream_ctx_update_cons_slow( fd_stream_ctx_t * ctx, - ulong slowest_cons ) { -if( FD_LIKELY( slowest_cons!=ULONG_MAX ) ) { - FD_COMPILER_MFENCE(); - (*ctx->cons_slow[ slowest_cons ]) += ctx->metrics->in_backp; - FD_COMPILER_MFENCE(); +fd_stream_ctx_init_run_loop( fd_stream_ctx_t * ctx, + void * tile_ctx, + fd_tile_ctx_init_run_loop_fn_t * tile_init_run_loop, + fd_tile_update_in_fn_t * tile_update_in, + fd_tile_housekeeping_fn_t * tile_housekeeping, + fd_tile_metrics_write_fn_t * tile_metrics_write, + fd_tile_run_fn_t * tile_run, + fd_tile_on_stream_frag_fn_t * tile_on_stream_frag ) { + if( ctx->in_cnt && !tile_update_in ) { + FD_LOG_ERR(( "tile_update_in function cannot be null if there are producers to this tile!" )); } -} -static inline void -fd_stream_ctx_init_run_loop( fd_stream_ctx_t * ctx ) { + if( ctx->in_cnt && !tile_on_stream_frag ) { + FD_LOG_ERR(( "tile_on_stream_frag function cannot be null if there are producers to this tile!" )); + } + + ctx->tile_update_in = tile_update_in; + ctx->tile_housekeeping = tile_housekeeping; + ctx->tile_metrics_write = tile_metrics_write; + ctx->tile_run = tile_run; + ctx->tile_on_stream_frag = tile_on_stream_frag; + FD_MGAUGE_SET( TILE, STATUS, 1UL ); fd_stream_ticks_init_timer( ctx->ticks ); + + for( ulong i=0UL; iout_cnt; i++ ) { + fd_stream_writer_init_flow_control_credits( &ctx->writers[ i ] ); + } + + if( tile_init_run_loop ) { + tile_init_run_loop( tile_ctx, ctx ); + } +} + +static inline void +fd_stream_ctx_update_flow_control_credits( fd_stream_ctx_t * ctx ) { + /* Recalculate flow control credits */ + for( ulong i=0UL; iout_cnt; i++ ) { + fd_stream_writer_update_flow_control_credits( &ctx->writers[i] ); + } } static inline void @@ -85,57 +130,106 @@ fd_stream_ctx_housekeeping_advance( fd_stream_ctx_t * ctx ) { } static inline void -fd_stream_ctx_process_backpressure( fd_stream_ctx_t * ctx ) { - fd_stream_metrics_update_backpressure( ctx->metrics, - ctx->ticks->housekeeping_ticks ); - fd_stream_ticks_reload_backpressure( ctx->ticks ); +fd_stream_ctx_do_housekeeping( fd_stream_ctx_t * ctx, + void * tile_ctx ) { + if( FD_UNLIKELY( fd_stream_ticks_is_housekeeping_time( ctx->ticks ) ) ) { + ulong event_idx = fd_event_map_get_event( ctx->event_map ); + + if( FD_LIKELY( event_idxout_cnt ) ) { /* receive credits */ + ulong out_idx = event_idx; + + /* Receive flow control credits from this out. */ + fd_stream_writer_receive_flow_control_credits( &ctx->writers[ out_idx ] ); + + } else if( event_idx>ctx->out_cnt) { /* send credits */ + ulong in_idx = event_idx - ctx->out_cnt - 1UL; + ctx->tile_update_in( &ctx->in[ in_idx ] ); + + } else { /* event_idx==out_cnt, housekeeping event */ + + /* Update metrics counters to external viewers */ + fd_stream_metrics_update_external( ctx->metrics, + ctx->ticks->now, + ctx->tile_metrics_write, + ctx ); + fd_stream_ctx_update_flow_control_credits( ctx ); + + if( ctx->tile_housekeeping ) { + ctx->tile_housekeeping( tile_ctx, ctx ); + } + } + + fd_stream_ctx_housekeeping_advance( ctx ); + } +} + +static inline void +fd_stream_ctx_advance_poll_empty( fd_stream_ctx_t * ctx ) { + ctx->metrics->regime_ticks[0] += ctx->ticks->housekeeping_ticks; + long next = fd_tickcount(); + ctx->metrics->regime_ticks[3] += (ulong)(next - ctx->ticks->now); + ctx->ticks->now = next; } -typedef int fd_on_stream_frag_fn_t( void * ctx, - fd_stream_reader_t * reader, - fd_stream_frag_meta_t const * frag, - ulong * sz ); +static inline void +fd_stream_ctx_advance_poll( fd_stream_ctx_t * ctx ) { + ctx->metrics->regime_ticks[1] += ctx->ticks->housekeeping_ticks; + ctx->metrics->regime_ticks[4] += ctx->ticks->prefrag_ticks; + long next = fd_tickcount(); + ctx->metrics->regime_ticks[7] += (ulong)(next - ctx->ticks->now); + ctx->ticks->now = next; +} + +static inline void +fd_stream_ctx_advance_poll_idle( fd_stream_ctx_t * ctx ) { + ctx->metrics->regime_ticks[0] += ctx->ticks->housekeeping_ticks; + ctx->metrics->regime_ticks[3] += ctx->ticks->prefrag_ticks; + long next = fd_tickcount(); + ctx->metrics->regime_ticks[6] += (ulong)(next - ctx->ticks->now); + ctx->ticks->now = next; +} static inline void -fd_stream_ctx_poll( fd_stream_ctx_t * stream_ctx, - void * ctx, - fd_on_stream_frag_fn_t * on_stream_frag ) { - stream_ctx->metrics->in_backp = 0UL; - stream_ctx->ticks->prefrag_ticks = 0UL; +fd_stream_ctx_poll( fd_stream_ctx_t * ctx, + void * tile_ctx ) { + ctx->metrics->in_backp = 0UL; + + if( FD_UNLIKELY( !ctx->in_cnt ) ) { + fd_stream_ctx_advance_poll_empty( ctx ); + return; + } + + ctx->ticks->prefrag_ticks = 0UL; /* select input to poll */ - fd_stream_reader_t * this_in = &stream_ctx->in[ stream_ctx->in_seq ]; - stream_ctx->in_seq++; - if( stream_ctx->in_seq>=stream_ctx->in_cnt ) { - stream_ctx->in_seq = 0UL; /* cmov */ + fd_stream_reader_t * this_in = &ctx->in[ ctx->in_seq ]; + ctx->in_seq++; + if( ctx->in_seq>=ctx->in_cnt ) { + ctx->in_seq = 0UL; /* cmov */ } fd_frag_reader_consume_ctx_t consume_ctx; long diff = fd_stream_reader_poll_frag( this_in, - stream_ctx->in_seq, + ctx->in_seq, &consume_ctx ); if( FD_UNLIKELY( diff<0L ) ) { - fd_stream_metrics_update_poll( stream_ctx->metrics, - stream_ctx->ticks->housekeeping_ticks, - stream_ctx->ticks->prefrag_ticks, - &stream_ctx->ticks->now); + /* overrun case, technically impossible with reliable streams */ + fd_stream_ctx_advance_poll( ctx ); fd_stream_reader_process_overrun( this_in, &consume_ctx, diff ); } else if ( FD_UNLIKELY( diff ) ) { - fd_stream_metrics_update_poll_idle( stream_ctx->metrics, - stream_ctx->ticks->housekeeping_ticks, - stream_ctx->ticks->prefrag_ticks, - &stream_ctx->ticks->now ); + /* nothing new to poll */ + fd_stream_ctx_advance_poll_idle( ctx ); } else { FD_COMPILER_MFENCE(); ulong sz = 0U; fd_stream_frag_meta_t const * frag = fd_type_pun_const( consume_ctx.mline ); - int consumed_frag = on_stream_frag( ctx, this_in, frag, &sz ); + int consumed_frag = ctx->tile_on_stream_frag( tile_ctx, this_in, frag, &sz ); fd_stream_reader_consume_bytes( this_in, sz ); @@ -144,10 +238,21 @@ fd_stream_ctx_poll( fd_stream_ctx_t * stream_ctx, &consume_ctx ); } - fd_stream_metrics_update_poll( stream_ctx->metrics, - stream_ctx->ticks->housekeeping_ticks, - stream_ctx->ticks->prefrag_ticks, - &stream_ctx->ticks->now ); + fd_stream_ctx_advance_poll( ctx ); + } +} + +static inline void +fd_stream_ctx_run_loop( fd_stream_ctx_t * ctx, + void * tile_ctx ) { + for(;;) { + fd_stream_ctx_do_housekeeping( ctx, tile_ctx ); + + if( ctx->tile_run ) { + ctx->tile_run( tile_ctx, ctx ); + } + + fd_stream_ctx_poll( ctx, tile_ctx ); } } @@ -158,11 +263,6 @@ fd_stream_ctx_delete( fd_stream_ctx_t * ctx ) { ctx->in_ptrs[ i ] = NULL; } - for( ulong i=0UL; icons_cnt; i++ ) { - ctx->cons_fseq[ i ] = NULL; - ctx->cons_slow[ i ] = NULL; - } - fd_event_map_delete( ctx->event_map ); fd_memset(ctx, 0, sizeof(fd_stream_ctx_t) ); return (void *)ctx; diff --git a/src/discof/restore/stream/fd_stream_metrics.h b/src/discof/restore/stream/fd_stream_metrics.h index 031e5500e2..d3b103811d 100644 --- a/src/discof/restore/stream/fd_stream_metrics.h +++ b/src/discof/restore/stream/fd_stream_metrics.h @@ -29,8 +29,6 @@ fd_stream_metrics_update_external( fd_stream_metrics_t * metrics, void * ctx ) { FD_COMPILER_MFENCE(); FD_MGAUGE_SET( TILE, HEARTBEAT, (ulong)now ); - FD_MGAUGE_SET( TILE, IN_BACKPRESSURE, metrics->in_backp ); - FD_MCNT_INC ( TILE, BACKPRESSURE_COUNT, metrics->backp_cnt ); FD_MCNT_ENUM_COPY( TILE, REGIME_DURATION_NANOS, metrics->regime_ticks ); if( metrics_write ) { @@ -41,39 +39,6 @@ fd_stream_metrics_update_external( fd_stream_metrics_t * metrics, metrics->backp_cnt = 0UL; } -static inline void -fd_stream_metrics_update_backpressure( fd_stream_metrics_t * metrics, - ulong housekeeping_ticks ) { - metrics->backp_cnt += (ulong)!metrics->in_backp; - metrics->in_backp = 1UL; - FD_SPIN_PAUSE(); - metrics->regime_ticks[2] += housekeeping_ticks; -} - -static inline void -fd_stream_metrics_update_poll( fd_stream_metrics_t * metrics, - ulong housekeeping_ticks, - ulong prefrag_ticks, - long * now) { - metrics->regime_ticks[1] += housekeeping_ticks; - metrics->regime_ticks[4] += prefrag_ticks; - long next = fd_tickcount(); - metrics->regime_ticks[7] += (ulong)(next - *now); - *now = next; -} - -static inline void -fd_stream_metrics_update_poll_idle( fd_stream_metrics_t * metrics, - ulong housekeeping_ticks, - ulong prefrag_ticks, - long * now) { - metrics->regime_ticks[0] += housekeeping_ticks; - metrics->regime_ticks[3] += prefrag_ticks; - long next = fd_tickcount(); - metrics->regime_ticks[6] += (ulong)(next - *now); - *now = next; -} - FD_PROTOTYPES_END #endif /* HEADER_fd_src_discof_restore_stream_fd_stream_metrics_h */ diff --git a/src/discof/restore/stream/fd_stream_ticks.h b/src/discof/restore/stream/fd_stream_ticks.h index 9db346cfac..ac181fadb1 100644 --- a/src/discof/restore/stream/fd_stream_ticks.h +++ b/src/discof/restore/stream/fd_stream_ticks.h @@ -46,10 +46,4 @@ fd_stream_ticks_reload_housekeeping( fd_stream_ticks_t * ticks, fd_rng_t * rng ) ticks->now = next; } -static inline void -fd_stream_ticks_reload_backpressure( fd_stream_ticks_t * ticks ) { - long next = fd_tickcount(); - ticks->now = next; -} - #endif /* HEADER_fd_src_discof_restore_stream_fd_stream_ticks_h */ diff --git a/src/discof/restore/stream/fd_stream_writer.c b/src/discof/restore/stream/fd_stream_writer.c index dea9aeb027..69e654d03d 100644 --- a/src/discof/restore/stream/fd_stream_writer.c +++ b/src/discof/restore/stream/fd_stream_writer.c @@ -7,7 +7,6 @@ fd_stream_writer_new( void * mem, fd_topo_t * topo, fd_topo_tile_t * tile, ulong link_id, - ulong read_max, ulong burst_byte, ulong burst_frag ) { if( FD_UNLIKELY( !mem ) ) { @@ -34,7 +33,7 @@ fd_stream_writer_new( void * mem, self->buf_off = 0UL; self->buf_sz = fd_dcache_data_sz( dcache ); self->goff = 0UL; - self->read_max = read_max; + self->read_max = 0UL; /* this should be set by the tile via fd_stream_writer_set_read_max */ self->stream_off = 0UL; self->goff_start = 0UL; self->out_seq = 0UL; @@ -61,7 +60,9 @@ fd_stream_writer_new( void * mem, for( ulong j=0UL; jin_cnt; j++ ) { if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ link_id ] && consumer_tile->in_link_reliable[ j ] ) ) { self->cons_fseq[ cons_idx ] = consumer_tile->in_link_fseq[ j ]; - if( FD_UNLIKELY( !self->cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx )); + if( FD_UNLIKELY( !self->cons_fseq[ cons_idx ] ) ) { + FD_LOG_ERR(( "NULL cons_fseq[%lu] for out_link=%lu", cons_idx, tile->out_link_id[ link_id ] )); + } cons_idx++; } } diff --git a/src/discof/restore/stream/fd_stream_writer.h b/src/discof/restore/stream/fd_stream_writer.h index a08bd2a0af..1908d0091e 100644 --- a/src/discof/restore/stream/fd_stream_writer.h +++ b/src/discof/restore/stream/fd_stream_writer.h @@ -60,7 +60,6 @@ fd_stream_writer_new( void * mem, fd_topo_t * topo, fd_topo_tile_t * tile, ulong link_id, - ulong read_max, ulong burst_byte, ulong burst_frag ); @@ -73,17 +72,23 @@ fd_stream_writer_init_flow_control_credits( fd_stream_writer_t * writer ) { } static inline void -fd_stream_writer_receive_flow_control_credits( fd_stream_writer_t * writer, - ulong cons_idx) { - FD_COMPILER_MFENCE(); - writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*cons_idx ] = FD_VOLATILE_CONST( writer->cons_fseq[ cons_idx ][0] ); - writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*cons_idx+1 ] = FD_VOLATILE_CONST( writer->cons_fseq[ cons_idx ][1] ); - FD_COMPILER_MFENCE(); +fd_stream_writer_set_read_max( fd_stream_writer_t * writer, + ulong read_max ) { + writer->read_max = read_max; } static inline void -fd_stream_writer_update_flow_control_credits( fd_stream_writer_t * writer, - ulong * slowest_cons_out ) { +fd_stream_writer_receive_flow_control_credits( fd_stream_writer_t * writer ) { + for( ulong i=0UL; icons_cnt; i++ ) { + FD_COMPILER_MFENCE(); + writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*i ] = FD_VOLATILE_CONST( writer->cons_fseq[ i ][0] ); + writer->cons_seq [ EXPECTED_FSEQ_CNT_PER_CONS*i+1 ] = FD_VOLATILE_CONST( writer->cons_fseq[ i ][1] ); + FD_COMPILER_MFENCE(); + } +} + +static inline void +fd_stream_writer_update_flow_control_credits( fd_stream_writer_t * writer ) { ulong slowest_cons = ULONG_MAX; if( FD_LIKELY( writer->cr_byte_availcr_byte_max || writer->cr_frag_availcr_frag_max ) ) { ulong cr_byte_avail = writer->cr_byte_max; @@ -99,10 +104,6 @@ fd_stream_writer_update_flow_control_credits( fd_stream_writer_t * writer, writer->cr_byte_avail = cr_byte_avail; writer->cr_frag_avail = cr_frag_avail; } - - if( slowest_cons_out ) { - *slowest_cons_out = slowest_cons; - } } static inline ulong diff --git a/src/flamenco/snapshot/fd_snapshot_http.c b/src/flamenco/snapshot/fd_snapshot_http.c index bf62bee2e6..4ddb9a6342 100644 --- a/src/flamenco/snapshot/fd_snapshot_http.c +++ b/src/flamenco/snapshot/fd_snapshot_http.c @@ -122,14 +122,18 @@ fd_snapshot_http_new( void * mem, return this; } -static void +void fd_snapshot_http_cleanup_fds( fd_snapshot_http_t * this ) { if( this->snapshot_fd!=-1 ) { - close( this->snapshot_fd ); + if( FD_UNLIKELY( close( this->snapshot_fd ) ) ) { + FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } this->snapshot_fd = -1; } if( this->socket_fd!=-1 ) { - close( this->socket_fd ); + if( FD_UNLIKELY( close( this->socket_fd ) ) ) { + FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } this->socket_fd = -1; } } @@ -192,6 +196,18 @@ fd_snapshot_http_init( fd_snapshot_http_t * this ) { return 0; } +/* for http tile use */ +void +fd_snapshot_http_privileged_init( fd_snapshot_http_t * this ) { + fd_snapshot_http_init( this ); + + /* open snapshot fd for writing to snapshot file */ + this->snapshot_fd = open( this->snapshot_path, O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR ); + if( this->snapshot_fd<0 ) { + FD_LOG_ERR(( "open(%s) failed (%d-%s)", this->snapshot_path, errno, fd_io_strerror( errno ) )); + } +} + /* fd_snapshot_http_req writes out the request. */ static int @@ -652,6 +668,8 @@ fd_io_istream_snapshot_http_read( void * _this, return fd_snapshot_http_dl( this, dst, dst_max, dst_sz ); case FD_SNAPSHOT_HTTP_STATE_READ: return fd_snapshot_http_read( this, dst, dst_max, dst_sz ); + case FD_SNAPSHOT_HTTP_STATE_DONE: + return 1; } /* Not yet ready to read at this point. */ diff --git a/src/flamenco/snapshot/fd_snapshot_http.h b/src/flamenco/snapshot/fd_snapshot_http.h index 20b36aeea4..b0bd4239af 100644 --- a/src/flamenco/snapshot/fd_snapshot_http.h +++ b/src/flamenco/snapshot/fd_snapshot_http.h @@ -98,6 +98,20 @@ struct fd_snapshot_http { typedef struct fd_snapshot_http fd_snapshot_http_t; +FD_FN_PURE static inline ulong +fd_snapshot_http_align( void ) { + return fd_ulong_max( alignof(fd_snapshot_http_t), alignof(fd_snapshot_name_t) ); +} + +FD_FN_PURE static inline ulong +fd_snapshot_http_footprint( void ) { + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_snapshot_http_t), sizeof(fd_snapshot_http_t) ); + l = FD_LAYOUT_APPEND( l, alignof(fd_snapshot_name_t), sizeof(fd_snapshot_name_t) ); + return FD_LAYOUT_FINI( l, fd_snapshot_http_align() ); + +} + fd_snapshot_http_t * fd_snapshot_http_new( void * mem, const char * dst_str, @@ -106,6 +120,9 @@ fd_snapshot_http_new( void * mem, const char * snapshot_dir, fd_snapshot_name_t * name_out ); +void +fd_snapshot_http_privileged_init( fd_snapshot_http_t * this ); + void * fd_snapshot_http_delete( fd_snapshot_http_t * this ); @@ -142,6 +159,9 @@ fd_io_istream_snapshot_http_virtual( fd_snapshot_http_t * this ) { }; } +void +fd_snapshot_http_cleanup_fds( fd_snapshot_http_t * this ); + FD_PROTOTYPES_END #endif /* HEADER_fd_src_flamenco_snapshot_fd_snapshot_http_h */