Skip to content

Commit 7971521

Browse files
rename fd_stream_writer fields, add stream writer join api
1 parent 90fe1e6 commit 7971521

File tree

5 files changed

+41
-30
lines changed

5 files changed

+41
-30
lines changed

src/discof/restore/fd_filerd_tile.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ fd_filerd_init_from_stream_ctx( void * _ctx,
5252
fd_stream_ctx_t * stream_ctx ) {
5353
fd_filerd_tile_t * ctx = fd_type_pun(_ctx);
5454

55-
/* TODO: this should be a join */
56-
ctx->writer = &stream_ctx->writers[0];
57-
fd_stream_writer_set_read_max( ctx->writer, FILE_READ_MAX );
55+
ctx->writer = fd_stream_writer_join( &stream_ctx->writers[0] );
56+
fd_stream_writer_set_frag_sz_max( ctx->writer, FILE_READ_MAX );
5857
}
5958

6059
__attribute__((noreturn)) FD_FN_UNUSED static void

src/discof/restore/fd_httpdl_tile.c

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,9 @@ fd_httpdl_init_from_stream_ctx( void * _ctx,
6565
fd_stream_ctx_t * stream_ctx ) {
6666
fd_httpdl_tile_t * ctx = fd_type_pun(_ctx);
6767

68-
/* There's only one writer. Since fd_stream_ctx_t owns the
69-
stream writer, we just assign the pointer here. */
70-
ctx->writer = &stream_ctx->writers[0];
71-
fd_stream_writer_set_read_max( ctx->writer, HTTP_CHUNK_SZ );
68+
/* join writer */
69+
ctx->writer = fd_stream_writer_join( &stream_ctx->writers[0] );
70+
fd_stream_writer_set_frag_sz_max( ctx->writer, HTTP_CHUNK_SZ );
7271
}
7372

7473
__attribute__((noreturn)) FD_FN_UNUSED static void

src/discof/restore/fd_unzstd_tile.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ fd_unzstd_init_from_stream_ctx( void * _ctx,
5959
fd_stream_ctx_t * stream_ctx ) {
6060
fd_unzstd_tile_t * ctx = fd_type_pun(_ctx);
6161

62-
/* There's only one writer */
63-
ctx->writer = &stream_ctx->writers[0];
64-
fd_stream_writer_set_read_max( ctx->writer, ZSTD_FRAME_SZ );
62+
/* join writer */
63+
ctx->writer = fd_stream_writer_join( &stream_ctx->writers[0] );
64+
fd_stream_writer_set_frag_sz_max( ctx->writer, ZSTD_FRAME_SZ );
6565
}
6666

6767
__attribute__((noreturn)) static void

src/discof/restore/stream/fd_stream_writer.c

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,17 @@ fd_stream_writer_new( void * mem,
2727
fd_stream_frag_meta_t * out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ link_id ] ].mcache );
2828
ulong cons_cnt = fd_topo_link_reliable_consumer_cnt( topo, link );
2929

30-
self->out_mcache = out_mcache;
31-
self->buf = dcache;
32-
self->buf_base = (ulong)dcache - (ulong)fd_wksp_containing( dcache );
33-
self->buf_off = 0UL;
34-
self->buf_sz = fd_dcache_data_sz( dcache );
35-
self->goff = 0UL;
36-
self->read_max = 0UL; /* this should be set by the tile via fd_stream_writer_set_read_max */
37-
self->stream_off = 0UL;
38-
self->goff_start = 0UL;
39-
self->out_seq = 0UL;
30+
self->magic = FD_STREAM_WRITER_MAGIC;
31+
self->out_mcache = out_mcache;
32+
self->dcache = dcache;
33+
self->base = (ulong)dcache - (ulong)fd_wksp_containing( dcache );
34+
self->buf_off = 0UL;
35+
self->buf_sz = fd_dcache_data_sz( dcache );
36+
self->goff = 0UL;
37+
self->frag_sz_max = 0UL; /* this should be set by the tile via fd_stream_writer_set_frag_sz_max */
38+
self->stream_off = 0UL;
39+
self->goff_start = 0UL;
40+
self->out_seq = 0UL;
4041

4142
/* Set up flow control state */
4243
self->cr_byte_avail = 0UL;

src/discof/restore/stream/fd_stream_writer.h

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@
88
/* A shared stream has a single producer and multiple consumers.
99
fd_stream_writer implements the producer APIs of the shared stream */
1010
struct fd_stream_writer {
11+
ulong magic; /* magic */
1112
fd_stream_frag_meta_t * out_mcache; /* frag producer mcache */
1213

13-
uchar * buf; /* laddr of shared dcache buffer */
14-
ulong buf_base; /* offset to the dcache buffer from wksp */
14+
uchar * dcache; /* laddr of shared dcache buffer */
15+
ulong base; /* offset to the dcache buffer from wksp */
1516

1617
/* dcache buffer state */
1718
ulong buf_off; /* local write offset into dcache buffer */
1819
ulong buf_sz; /* dcache buffer size */
1920
ulong goff; /* global offset into byte stream */
20-
ulong read_max; /* max chunk size */
21+
ulong frag_sz_max; /* max frag size (controls the size of a single write into dcache)*/
2122
ulong stream_off; /* start of published stream */
2223
ulong goff_start; /* start of goff in stream */
2324
ulong out_seq; /* current sequence number */
@@ -36,6 +37,7 @@ struct fd_stream_writer {
3637
};
3738
typedef struct fd_stream_writer fd_stream_writer_t;
3839

40+
#define FD_STREAM_WRITER_MAGIC (0xFD57337717E736C0UL)
3941
#define EXPECTED_FSEQ_CNT_PER_CONS 2
4042

4143
FD_PROTOTYPES_BEGIN
@@ -50,9 +52,19 @@ fd_stream_writer_footprint( void ) {
5052
return sizeof(fd_stream_writer_t);
5153
}
5254

55+
static inline fd_stream_writer_t *
56+
fd_stream_writer_join( void * _writer ) {
57+
fd_stream_writer_t * writer = (fd_stream_writer_t *)_writer;
58+
if( FD_UNLIKELY( !writer ) ) return NULL;
59+
if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)writer, fd_stream_writer_align() ) ) ) return NULL;
60+
if( FD_UNLIKELY( writer->magic!=FD_STREAM_WRITER_MAGIC ) ) return NULL;
61+
62+
return writer;
63+
}
64+
5365
static inline uchar *
5466
fd_stream_writer_get_write_ptr( fd_stream_writer_t * writer ) {
55-
return writer->buf + writer->buf_off;
67+
return writer->dcache + writer->buf_off;
5668
}
5769

5870
fd_stream_writer_t *
@@ -72,9 +84,9 @@ fd_stream_writer_init_flow_control_credits( fd_stream_writer_t * writer ) {
7284
}
7385

7486
static inline void
75-
fd_stream_writer_set_read_max( fd_stream_writer_t * writer,
76-
ulong read_max ) {
77-
writer->read_max = read_max;
87+
fd_stream_writer_set_frag_sz_max( fd_stream_writer_t * writer,
88+
ulong frag_sz_max ) {
89+
writer->frag_sz_max = frag_sz_max;
7890
}
7991

8092
static inline void
@@ -113,14 +125,14 @@ fd_stream_writer_get_avail_bytes( fd_stream_writer_t * writer ) {
113125
return 0;
114126
}
115127

116-
ulong const read_max = fd_ulong_min( writer->cr_byte_avail, writer->read_max );
117-
return fd_ulong_min( read_max, writer->buf_sz - writer->buf_off );
128+
ulong const frag_sz_max = fd_ulong_min( writer->cr_byte_avail, writer->frag_sz_max );
129+
return fd_ulong_min( frag_sz_max, writer->buf_sz - writer->buf_off );
118130
}
119131

120132
static inline void
121133
fd_stream_writer_publish( fd_stream_writer_t * writer,
122134
ulong frag_sz ) {
123-
ulong loff = writer->buf_base + writer->stream_off;
135+
ulong loff = writer->base + writer->stream_off;
124136
fd_mcache_publish_stream( writer->out_mcache,
125137
fd_mcache_depth( writer->out_mcache->f ),
126138
writer->out_seq,

0 commit comments

Comments
 (0)