Skip to content

belt sanding stream_writer #5152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/disco/topo/fd_topo.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ typedef struct {
All input links will be automatically polled by the tile
infrastructure, and output links will automatically source and manage
credits from consumers. */
typedef struct {
struct fd_topo_tile {
ulong id; /* The ID of this tile. Indexed from [0, tile_cnt). When placed in a topology, the ID must be the index of the tile in the tiles list. */
char name[ 7UL ]; /* The name of this tile. There can be multiple of each tile name in a topology. */
ulong kind_id; /* The ID of this tile within its name. If there are n tile of a particular name, they have IDs [0, N). The pair (name, kind_id) uniquely identifies a tile, as does "id" on its own. */
Expand Down Expand Up @@ -455,7 +455,9 @@ typedef struct {
} actidx;

};
} fd_topo_tile_t;
};

typedef struct fd_topo_tile fd_topo_tile_t;

typedef struct {
ulong id;
Expand Down
9 changes: 7 additions & 2 deletions src/discof/restore/Local.mk
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
$(call add-objs,fd_filerd_tile,fd_discof)
ifdef FD_HAS_ZSTD
$(call add-objs,fd_unzstd_tile,fd_discof)
endif
ifdef FD_HAS_INT128
$(call add-objs,fd_snapin_tile,fd_discof)
$(call add-objs,fd_actalc_tile,fd_discof)
endif
$(call add-objs,fd_actidx_tile,fd_discof)
$(call add-objs,fd_unzstd_tile,fd_discof)
$(call add-objs,fd_httpdl_tile,fd_discof)
$(call add-objs,stream/fd_stream_writer,fd_discof)
$(call add-objs,stream/fd_event_map,fd_discof)
$(call add-objs,stream/fd_stream_ctx,fd_discof)
$(call make-unit-test,test_snapin_tile,test_snapin_tile,fd_discof fd_util)
ifdef FD_HAS_INT128
$(call make-unit-test,test_snapin_tile,test_snapin_tile,fd_discof fd_disco fd_flamenco fd_tango fd_ballet fd_util)
endif
35 changes: 13 additions & 22 deletions src/discof/restore/fd_filerd_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ unprivileged_init( fd_topo_t * topo,
static void
fd_filerd_init_from_stream_ctx( void * _ctx,
fd_stream_ctx_t * stream_ctx ) {
fd_filerd_tile_t * ctx = fd_type_pun(_ctx);

ctx->writer = fd_stream_writer_join( &stream_ctx->writers[0] );
fd_filerd_tile_t * ctx = _ctx;
ctx->writer = fd_stream_writer_join( stream_ctx->writers[0] );
FD_TEST( ctx->writer );
fd_stream_writer_set_frag_sz_max( ctx->writer, FILE_READ_MAX );
}

Expand All @@ -63,7 +63,7 @@ fd_filerd_shutdown( fd_filerd_tile_t * ctx ) {
}
ctx->fd = -1;
FD_MGAUGE_SET( TILE, STATUS, 2UL );
fd_stream_writer_notify_shutdown( ctx->writer );
fd_stream_writer_close( ctx->writer );
FD_COMPILER_MFENCE();
FD_LOG_INFO(( "Reached end of file" ));

Expand All @@ -74,24 +74,22 @@ static void
after_credit( void * _ctx,
fd_stream_ctx_t * stream_ctx,
int * poll_in FD_PARAM_UNUSED ) {
fd_filerd_tile_t * ctx = fd_type_pun(_ctx);
fd_filerd_tile_t * ctx = _ctx;
(void)stream_ctx;

uchar * out = fd_stream_writer_prepare( ctx->writer );
ulong out_max = fd_stream_writer_publish_sz_max( ctx->writer );

/* technically, this is not needed because fd_stream_ctx_run_loop
checks for backpresure on all outgoing links and there is only one
outgoing link anyways. But, it is added for clarity that
callbacks should handle backpressure for their out links. */
if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) {
return;
}
if( FD_UNLIKELY( !out_max ) ) return;

int fd = ctx->fd;
if( FD_UNLIKELY( fd<0 ) ) return;

uchar * out = fd_stream_writer_get_write_ptr( ctx->writer );
ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer );

long res = read( fd, out, dst_max );
long res = read( fd, out, out_max );
if( FD_UNLIKELY( res<=0L ) ) {
if( FD_UNLIKELY( res==0 ) ) {
fd_filerd_shutdown( ctx );
Expand All @@ -102,11 +100,7 @@ after_credit( void * _ctx,
/* aborts app */
}

ulong sz = (ulong)res;
if( FD_LIKELY( sz ) ) {
fd_stream_writer_advance( ctx->writer, sz );
fd_stream_writer_publish( ctx->writer, sz );
}
fd_stream_writer_publish( ctx->writer, (ulong)res, 0UL );
}

__attribute__((noinline)) static void
Expand All @@ -128,11 +122,8 @@ static void
fd_filerd_run( fd_topo_t * topo,
fd_topo_tile_t * tile ) {
fd_filerd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile );
ulong out_cnt = tile->out_cnt;

void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, out_cnt ) );
fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, out_cnt );
void * ctx_mem = fd_alloca_check( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_footprint( topo, tile ) );
fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile );
fd_filerd_run1( ctx, stream_ctx );
}

Expand Down
74 changes: 23 additions & 51 deletions src/discof/restore/fd_httpdl_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,16 @@ fd_httpdl_init_from_stream_ctx( void * _ctx,
fd_httpdl_tile_t * ctx = fd_type_pun(_ctx);

/* join writer */
ctx->writer = fd_stream_writer_join( &stream_ctx->writers[0] );
ctx->writer = fd_stream_writer_join( stream_ctx->writers[0] );
FD_TEST( ctx->writer );
fd_stream_writer_set_frag_sz_max( ctx->writer, HTTP_CHUNK_SZ );
}

__attribute__((noreturn)) FD_FN_UNUSED static void
fd_httpdl_shutdown( fd_httpdl_tile_t * ctx ) {
fd_snapshot_http_cleanup_fds( ctx->http );
FD_MGAUGE_SET( TILE, STATUS, 2UL );
fd_stream_writer_notify_shutdown( ctx->writer );
fd_stream_writer_close( ctx->writer );
FD_COMPILER_MFENCE();
FD_LOG_WARNING(("Done downloading snapshot"));

Expand All @@ -85,39 +86,23 @@ __attribute__((unused)) static void
after_credit_chunk( void * _ctx,
fd_stream_ctx_t * stream_ctx,
int * opt_poll_in FD_PARAM_UNUSED ) {
fd_httpdl_tile_t * ctx = fd_type_pun(_ctx);
fd_httpdl_tile_t * ctx = _ctx;
(void)stream_ctx;
ulong downloaded_sz = 0UL;

/* Don't do anything if backpressured */
if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) {
return;
}
/* Output */
uchar * const out = fd_stream_writer_prepare( ctx->writer );
uchar * const out_end = out + fd_stream_writer_publish_sz_max( ctx->writer );
uchar * out_cur = out;

for(;;) {
if( downloaded_sz >= HTTP_CHUNK_SZ ) {
fd_stream_writer_publish( ctx->writer, downloaded_sz );
break;
}
/* get write pointers into dcache buffer */
uchar * out = fd_stream_writer_get_write_ptr( ctx->writer );
ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer );
ulong sz = 0UL;

if( dst_max==0 ) {
fd_stream_writer_publish( ctx->writer, downloaded_sz );
break;
}

int err = fd_io_istream_snapshot_http_read( ctx->http, out, dst_max, &sz );
while( out_cur<out_end ) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh smart... so we can set a frag_sz_max for the stream writer and repeatedly fill up to frag_sz_max with calls to fd_io_istream_snapshot_http_read which is capped at the size of the HTTP resp buffer.

ulong chunk_sz;
int err = fd_io_istream_snapshot_http_read( ctx->http, out_cur, (ulong)out_cur-(ulong)out, &chunk_sz );
if( FD_UNLIKELY( err==1 ) ) fd_httpdl_shutdown( ctx );
else if( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "http err: %d", err ));

if( sz ) {
fd_stream_writer_advance( ctx->writer, sz );
downloaded_sz += sz;
}
out_cur += chunk_sz;
}

fd_stream_writer_publish( ctx->writer, (ulong)out_cur-(ulong)out, 0UL );
}

__attribute__((unused)) static void
Expand All @@ -127,24 +112,16 @@ after_credit_stream( void * _ctx,
fd_httpdl_tile_t * ctx = fd_type_pun(_ctx);
(void)stream_ctx;

/* Don't do anything if backpressured */
if( FD_UNLIKELY( fd_stream_writer_is_backpressured( ctx->writer ) ) ) {
return;
}

/* get write pointers into dcache buffer */
uchar * out = fd_stream_writer_get_write_ptr( ctx->writer );
ulong dst_max = fd_stream_writer_get_avail_bytes( ctx->writer );
ulong sz = 0UL;
/* Output */
uchar * const out = fd_stream_writer_prepare( ctx->writer );
ulong const out_max = fd_stream_writer_publish_sz_max( ctx->writer );

int err = fd_io_istream_snapshot_http_read( ctx->http, out, dst_max, &sz );
ulong chunk_sz;
int err = fd_io_istream_snapshot_http_read( ctx->http, out, out_max, &chunk_sz );
if( FD_UNLIKELY( err==1 ) ) fd_httpdl_shutdown( ctx );
else if( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "http err: %d", err ));

if( FD_LIKELY( sz ) ) {
fd_stream_writer_advance( ctx->writer, sz );
fd_stream_writer_publish( ctx->writer, sz );
}
fd_stream_writer_publish( ctx->writer, chunk_sz, 0UL );
}

__attribute__((noinline)) static void
Expand All @@ -168,11 +145,9 @@ static void
fd_httpdl_run( fd_topo_t * topo,
fd_topo_tile_t * tile ) {
fd_httpdl_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
ulong in_cnt = fd_topo_tile_producer_cnt( topo, tile );
ulong out_cnt = tile->out_cnt;

void * ctx_mem = fd_alloca( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_scratch_footprint( in_cnt, out_cnt ) );
fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile, in_cnt, out_cnt );
void * ctx_mem = fd_alloca_check( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_footprint( topo, tile ) );
fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile );
FD_TEST( stream_ctx );
fd_httpdl_run1( ctx, stream_ctx );
}

Expand All @@ -186,6 +161,3 @@ fd_topo_run_tile_t fd_tile_snapshot_restore_HttpDl = {
};

#undef NAME



18 changes: 9 additions & 9 deletions src/discof/restore/fd_restore_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@

union fd_stream_frag_meta {

struct {
struct {

ulong seq; /* frag sequence number */
uint sz;
ushort unused;
ushort ctl;
ulong seq; /* frag sequence number */
uint sz;
ushort unused;
ushort ctl;

ulong goff; /* stream offset */
ulong loff; /* dcache offset */
ulong goff; /* stream offset */
ulong loff; /* dcache offset */

};
};

fd_frag_meta_t f[1];
fd_frag_meta_t f[1];

};

Expand Down
Loading
Loading