Skip to content

Add HttpDl tile and more generic stream ctx #5132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: ripatel/snapshot-tiles-wip
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 100 additions & 37 deletions src/app/firedancer-dev/commands/snapshot_load.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "../../../disco/topo/fd_topob.h"
#include "../../../disco/topo/fd_pod_format.h"
#include "../../../util/tile/fd_tile_private.h"
#include "../../../flamenco/snapshot/fd_snapshot_loader.h"
#include <sys/resource.h>
#include <linux/capability.h>
#include <unistd.h>
Expand Down Expand Up @@ -39,10 +40,14 @@ _is_zstd( char const * path ) {
fclose( file );
return ( magic==0xFD2FB528UL );
}

static void
snapshot_load_topo( config_t * config,
args_t const * args ) {
int is_zstd = _is_zstd( args->snapshot_load.snapshot_path );
fd_snapshot_src_t src[1];
char snapshot_path_copy[4096];
memcpy( snapshot_path_copy, args->snapshot_load.snapshot_path, sizeof(snapshot_path_copy) );
fd_snapshot_src_parse_type_unknown( src, snapshot_path_copy );

fd_topo_t * topo = &config->topo;
fd_topob_new( &config->topo, config->name );
Expand All @@ -63,21 +68,69 @@ snapshot_load_topo( config_t * config,
fd_topob_wksp( topo, "metric" );
fd_topob_tile( topo, "metric", "metric", "metric_in", tile_to_cpu[0], 0, 0 );

/* read() tile */
fd_topob_wksp( topo, "FileRd" );
fd_topo_tile_t * filerd_tile = fd_topob_tile( topo, "FileRd", "FileRd", "FileRd", tile_to_cpu[1], 0, 0 );
fd_memcpy( filerd_tile->filerd.file_path, args->snapshot_load.snapshot_path, PATH_MAX );
FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==sizeof(args->snapshot_load.snapshot_path), abi );
FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==PATH_MAX, abi );

/* Uncompressed data stream */
fd_topob_wksp( topo, "snap_stream" );
fd_topo_link_t * snapin_link = fd_topob_link( topo, "snap_stream", "snap_stream", 512UL, 0UL, 0UL );
fd_topo_obj_t * snapin_dcache = fd_topob_obj( topo, "dcache", "snap_stream" );
snapin_link->dcache_obj_id = snapin_dcache->id;
FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", snapin_dcache->id ) );

if( is_zstd ) { /* .tar.zst file */
if( src->type==FD_SNAPSHOT_SRC_FILE ) {

int is_zstd = _is_zstd( args->snapshot_load.snapshot_path );

/* read() tile */
fd_topob_wksp( topo, "FileRd" );
fd_topo_tile_t * filerd_tile = fd_topob_tile( topo, "FileRd", "FileRd", "FileRd", tile_to_cpu[1], 0, 0 );
fd_memcpy( filerd_tile->filerd.file_path, args->snapshot_load.snapshot_path, PATH_MAX );
FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==sizeof(args->snapshot_load.snapshot_path), abi );
FD_STATIC_ASSERT( sizeof(filerd_tile->filerd.file_path)==PATH_MAX, abi );

if( is_zstd ) { /* .tar.zst file */

/* "unzstd": Zstandard decompress tile */
fd_topob_wksp( topo, "Unzstd" );
fd_topo_tile_t * unzstd_tile = fd_topob_tile( topo, "Unzstd", "Unzstd", "Unzstd", tile_to_cpu[2], 0, 0 );
(void)unzstd_tile;

/* Compressed data stream */
fd_topob_wksp( topo, "snap_zstd" );
fd_topo_link_t * zstd_link = fd_topob_link( topo, "snap_zstd", "snap_zstd", 512UL, 0UL, 0UL );
fd_topo_obj_t * zstd_dcache = fd_topob_obj( topo, "dcache", "snap_zstd");
zstd_link->dcache_obj_id = zstd_dcache->id;
FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", zstd_dcache->id ) );

/* filerd tile -> compressed stream */
fd_topob_tile_out( topo, "FileRd", 0UL, "snap_zstd", 0UL );
fd_topob_tile_uses( topo, filerd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE );

/* compressed stream -> unzstd tile */
fd_topob_tile_in( topo, "Unzstd", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
fd_topob_tile_uses( topo, unzstd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_ONLY );

/* unzstd tile -> uncompressed stream */
fd_topob_tile_out( topo, "Unzstd", 0UL, "snap_stream", 0UL );
fd_topob_tile_uses( topo, unzstd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE );

} else { /* .tar file */

/* filerd tile -> uncompressed stream */
fd_topob_tile_out( topo, "FileRd", 0UL, "snap_stream", 0UL );
fd_topob_tile_uses( topo, filerd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE );

}
}
else if ( src->type==FD_SNAPSHOT_SRC_HTTP ) {

/* httpdl() tile */
fd_topob_wksp( topo, "HttpDl" );
fd_topo_tile_t * httpdl_tile = fd_topob_tile( topo, "HttpDl", "HttpDl", "HttpDl", tile_to_cpu[1], 0, 0 );
fd_memcpy( httpdl_tile->httpdl.path, src->http.path, PATH_MAX );
fd_memcpy( httpdl_tile->httpdl.snapshot_dir, args->snapshot_load.snapshot_dir, PATH_MAX );
fd_memcpy( httpdl_tile->httpdl.dest, src->http.dest, sizeof(src->http.dest) );
httpdl_tile->httpdl.ip4 = src->http.ip4;
httpdl_tile->httpdl.path_len = src->http.path_len;
httpdl_tile->httpdl.port = src->http.port;

/* "unzstd": Zstandard decompress tile */
fd_topob_wksp( topo, "Unzstd" );
Expand All @@ -92,8 +145,8 @@ snapshot_load_topo( config_t * config,
FD_TEST( fd_pod_insertf_ulong( topo->props, (16UL<<20), "obj.%lu.data_sz", zstd_dcache->id ) );

/* filerd tile -> compressed stream */
fd_topob_tile_out( topo, "FileRd", 0UL, "snap_zstd", 0UL );
fd_topob_tile_uses( topo, filerd_tile, zstd_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE );
fd_topob_tile_out( topo, "HttpDl", 0UL, "snap_zstd", 0UL );
fd_topob_tile_uses( topo, httpdl_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE );

/* compressed stream -> unzstd tile */
fd_topob_tile_in( topo, "Unzstd", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
Expand All @@ -102,13 +155,6 @@ snapshot_load_topo( config_t * config,
/* unzstd tile -> uncompressed stream */
fd_topob_tile_out( topo, "Unzstd", 0UL, "snap_stream", 0UL );
fd_topob_tile_uses( topo, unzstd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE );

} else { /* .tar file */

/* filerd tile -> uncompressed stream */
fd_topob_tile_out( topo, "FileRd", 0UL, "snap_stream", 0UL );
fd_topob_tile_uses( topo, filerd_tile, snapin_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE );

}

fd_topob_wksp( topo, "SnapIn" );
Expand Down Expand Up @@ -149,19 +195,27 @@ static void
snapshot_load_cmd_args( int * pargc,
char *** pargv,
args_t * args ) {
char const * tile_cpus = fd_env_strip_cmdline_cstr( pargc, pargv, "--tile-cpus", "FD_TILE_CPUS", NULL );
char const * snapshot_file = fd_env_strip_cmdline_cstr( pargc, pargv, "--snapshot", NULL, NULL );
char const * tile_cpus = fd_env_strip_cmdline_cstr( pargc, pargv, "--tile-cpus", "FD_TILE_CPUS", NULL );
char const * snapshot_src = fd_env_strip_cmdline_cstr( pargc, pargv, "--snapshot", NULL, NULL );
char const * snapshot_dir = fd_env_strip_cmdline_cstr( pargc, pargv, "--snapshot-dir", NULL, NULL );

if( tile_cpus ) {
ulong tile_cpus_strlen = strlen( tile_cpus );
if( FD_UNLIKELY( tile_cpus_strlen>=sizeof(args->tile_cpus) ) ) FD_LOG_ERR(( "--tile-cpus: flag too long" ));
fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->tile_cpus ), tile_cpus, tile_cpus_strlen ) );
}

if( FD_UNLIKELY( !snapshot_file ) ) FD_LOG_ERR(( "Missing --snapshot flag" ));
ulong snapshot_file_strlen = strlen( snapshot_file );
if( FD_UNLIKELY( !snapshot_src ) ) FD_LOG_ERR(( "Missing --snapshot flag" ));
ulong snapshot_file_strlen = strlen( snapshot_src );
if( FD_UNLIKELY( snapshot_file_strlen>=sizeof(args->snapshot_load.snapshot_path) ) ) FD_LOG_ERR(( "--snapshot: path too long" ));
fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_path ), snapshot_file, snapshot_file_strlen ) );
fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_path ), snapshot_src, snapshot_file_strlen ) );

/* FIXME: check if we need the snapshot dir argument (parse the snapshot input src to see if it's http)*/
if( snapshot_dir!=NULL ) {
ulong snapshot_dir_strlen = strlen( snapshot_dir );
if( FD_UNLIKELY( snapshot_file_strlen>=sizeof(args->snapshot_load.snapshot_dir) ) ) FD_LOG_ERR(( "--snapshot-dir: dir too long" ));
fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.snapshot_dir ), snapshot_dir, snapshot_dir_strlen ) );
}
}

static void
Expand Down Expand Up @@ -189,15 +243,20 @@ snapshot_load_cmd_fn( args_t * args,
double ns_per_tick = 1.0/tick_per_ns;
fd_topo_run_single_process( topo, 2, config->uid, config->gid, fdctl_tile_run, NULL );

fd_topo_tile_t * file_rd_tile = &topo->tiles[ fd_topo_find_tile( topo, "FileRd", 0UL ) ];
fd_topo_tile_t * snap_in_tile = &topo->tiles[ fd_topo_find_tile( topo, "SnapIn", 0UL ) ];
ulong zstd_tile_idx = fd_topo_find_tile( topo, "Unzstd", 0UL );
fd_topo_tile_t * unzstd_tile = zstd_tile_idx!=ULONG_MAX ? &topo->tiles[ zstd_tile_idx ] : NULL;

ulong * snap_in_fseq = snap_in_tile->in_link_fseq[ 0 ];
ulong * snap_accs_sync = fd_mcache_seq_laddr( topo->links[ fd_topo_find_link( topo, "snap_frags", 0UL ) ].mcache );
ulong volatile * file_rd_metrics = fd_metrics_tile( file_rd_tile->metrics );
ulong volatile * snap_in_metrics = fd_metrics_tile( snap_in_tile->metrics );
ulong httpdl_tile_idx = fd_topo_find_tile( topo, "HttpDl", 0UL );
ulong filerd_tile_idx = fd_topo_find_tile( topo, "FileRd", 0UL );
fd_topo_tile_t * http_dl_tile = httpdl_tile_idx!=ULONG_MAX ? &topo->tiles[ httpdl_tile_idx ] : NULL;
fd_topo_tile_t * file_rd_tile = filerd_tile_idx!=ULONG_MAX ? &topo->tiles[ filerd_tile_idx ] : NULL;
fd_topo_tile_t * snap_in_tile = &topo->tiles[ fd_topo_find_tile( topo, "SnapIn", 0UL ) ];
ulong zstd_tile_idx = fd_topo_find_tile( topo, "Unzstd", 0UL );
fd_topo_tile_t * unzstd_tile = zstd_tile_idx!=ULONG_MAX ? &topo->tiles[ zstd_tile_idx ] : NULL;
// fd_topo_tile_t * actalc_tile = &topo->tiles[ fd_topo_find_tile( topo, "ActAlc", 0UL ) ];

ulong * snap_in_fseq = snap_in_tile->in_link_fseq[ 0 ];
ulong * snap_accs_sync = fd_mcache_seq_laddr( topo->links[ fd_topo_find_link( topo, "snap_frags", 0UL ) ].mcache );
ulong volatile * file_rd_metrics = file_rd_tile ? fd_metrics_tile( file_rd_tile->metrics ) : NULL;
ulong volatile * http_dl_metrics = http_dl_tile ? fd_metrics_tile( http_dl_tile->metrics ) : NULL;
ulong volatile * snap_in_metrics = fd_metrics_tile( snap_in_tile->metrics );
ulong volatile * unzstd_in_metrics = unzstd_tile ? fd_metrics_tile( unzstd_tile->metrics ) : NULL;

ulong goff_old = 0UL;
Expand All @@ -206,18 +265,22 @@ snapshot_load_cmd_fn( args_t * args,
ulong acc_cnt_old = 0UL;
ulong frag_cnt_old = 0UL;
for(;;) {
sleep( 1 );

ulong filerd_status = FD_VOLATILE_CONST( file_rd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
sleep( 1 );
ulong filerd_status = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ) : 2UL;
ulong httpdl_status = http_dl_metrics ? FD_VOLATILE_CONST( http_dl_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ) : 2UL;
ulong snapin_status = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
ulong unzstd_status = unzstd_in_metrics ? FD_VOLATILE_CONST( unzstd_in_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ) : 2UL;
if( FD_UNLIKELY( filerd_status==2UL && unzstd_status==2UL && snapin_status == 2UL ) ) {
if( FD_UNLIKELY( httpdl_status==2UL && filerd_status==2UL && unzstd_status==2UL && snapin_status == 2UL ) ) {
FD_LOG_NOTICE(( "Done" ));
break;
}

ulong goff = FD_VOLATILE_CONST( snap_in_fseq[ 1 ] );
ulong file_rd_backp = FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] );
ulong file_rd_backp = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] ) : 0UL;
// ulong file_rd_wait = file_rd_metrics ? FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] ) +
// FD_VOLATILE_CONST( file_rd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] ) +
// file_rd_backp : 0UL;
// ulong snap_in_backp = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ] );
ulong snap_in_wait = FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] ) +
FD_VOLATILE_CONST( snap_in_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] );
ulong frag_cnt = FD_VOLATILE_CONST( snap_accs_sync[0] );
Expand Down
2 changes: 2 additions & 0 deletions src/app/firedancer-dev/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ extern fd_topo_run_tile_t fd_tile_snapshot_restore_FileRd;
extern fd_topo_run_tile_t fd_tile_snapshot_restore_SnapIn;
extern fd_topo_run_tile_t fd_tile_snapshot_restore_ActAlc;
extern fd_topo_run_tile_t fd_tile_snapshot_restore_Unzstd;
extern fd_topo_run_tile_t fd_tile_snapshot_restore_HttpDl;

fd_topo_run_tile_t * TILES[] = {
&fd_tile_net,
Expand Down Expand Up @@ -140,6 +141,7 @@ fd_topo_run_tile_t * TILES[] = {
&fd_tile_snapshot_restore_SnapIn,
&fd_tile_snapshot_restore_ActAlc,
&fd_tile_snapshot_restore_Unzstd,
&fd_tile_snapshot_restore_HttpDl,
NULL,
};

Expand Down
1 change: 1 addition & 0 deletions src/app/shared/fd_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ struct fdctl_args {

struct {
char snapshot_path[ PATH_MAX ];
char snapshot_dir[ PATH_MAX ];
} snapshot_load;
};

Expand Down
9 changes: 9 additions & 0 deletions src/disco/topo/fd_topo.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,15 @@ typedef struct {
char file_path[ PATH_MAX ];
} filerd;

struct {
char dest[128];
uint ip4;
ushort port;
char path[ PATH_MAX ];
ulong path_len;
char snapshot_dir[ PATH_MAX ];
} httpdl;

struct {
ulong scratch_sz;
} snapin;
Expand Down
1 change: 1 addition & 0 deletions src/discof/restore/Local.mk
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ $(call add-objs,fd_filerd_tile,fd_discof)
$(call add-objs,fd_snapin_tile,fd_discof)
$(call add-objs,fd_actalc_tile,fd_discof)
$(call add-objs,fd_unzstd_tile,fd_discof)
$(call add-objs,fd_httpdl_tile,fd_discof)
$(call add-objs,stream/fd_stream_writer,fd_discof)
$(call add-objs,stream/fd_event_map,fd_discof)
$(call add-objs,stream/fd_stream_ctx,fd_discof)
Loading