Skip to content

Commit ec0003d

Browse files
committed
Merge remote-tracking branch 'origin/ripatel/snapshot-tiles-wip' into ripatel/snapshot-restore-beltsanding
2 parents b019984 + 7971521 commit ec0003d

8 files changed

+130
-292
lines changed

src/discof/restore/fd_filerd_tile.c

+46-256
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "fd_restore_base.h"
2+
#include "stream/fd_stream_ctx.h"
23
#include "../../disco/topo/fd_topo.h"
34
#include "../../disco/metrics/fd_metrics.h"
45
#include <errno.h>
@@ -7,18 +8,11 @@
78
#include <unistd.h>
89

910
#define NAME "FileRd"

/* FILE_READ_MAX is the max byte count requested per read(2) call and
   the max frag size published downstream (8 MiB).  Parenthesized so
   the macro expands safely inside larger expressions: without parens,
   e.g. FILE_READ_MAX/2 would parse as 8UL<<(20/2) due to the low
   precedence of <<. */
#define FILE_READ_MAX (8UL<<20)
1012

1113
struct fd_filerd_tile {
12-
int fd;
13-
14-
uchar * buf; /* dcache */
15-
ulong buf_base;
16-
ulong buf_off;
17-
ulong buf_sz;
18-
ulong goff;
19-
ulong read_max;
20-
21-
ulong * out_sync; /* mcache seq sync */
14+
fd_stream_writer_t * writer;
15+
int fd;
2216
};
2317

2418
typedef struct fd_filerd_tile fd_filerd_tile_t;
@@ -48,293 +42,89 @@ privileged_init( fd_topo_t * topo,
4842
static void
4943
unprivileged_init( fd_topo_t * topo,
5044
fd_topo_tile_t * tile ) {
51-
fd_filerd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
52-
45+
(void)topo;
5346
if( FD_UNLIKELY( tile->in_cnt !=0UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 0", tile->in_cnt ));
5447
if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));
55-
56-
void * out_dcache = fd_dcache_join( fd_topo_obj_laddr( topo, topo->links[ tile->out_link_id[ 0 ] ].dcache_obj_id ) );
57-
FD_TEST( out_dcache );
58-
59-
ctx->buf = out_dcache;
60-
ctx->buf_base = (ulong)out_dcache - (ulong)fd_wksp_containing( out_dcache );
61-
ctx->buf_off = 0UL;
62-
ctx->buf_sz = fd_dcache_data_sz( out_dcache );
63-
ctx->goff = 0UL;
64-
ctx->read_max = (8UL<<20);
65-
ctx->out_sync = fd_mcache_seq_laddr( topo->links[ tile->out_link_id[ 0 ] ].mcache );
6648
}
6749

6850
static void
69-
during_housekeeping( fd_filerd_tile_t * ctx ) {
70-
(void)ctx;
71-
}
72-
73-
static void
74-
metrics_write( fd_filerd_tile_t * ctx ) {
75-
(void)ctx;
51+
fd_filerd_init_from_stream_ctx( void * _ctx,
52+
fd_stream_ctx_t * stream_ctx ) {
53+
fd_filerd_tile_t * ctx = _ctx;
54+
ctx->writer = fd_stream_writer_join( stream_ctx->writers[0] );
55+
FD_TEST( ctx->writer );
56+
fd_stream_writer_set_frag_sz_max( ctx->writer, FILE_READ_MAX );
7657
}
7758

7859
__attribute__((noreturn)) FD_FN_UNUSED static void
79-
fd_filerd_shutdown( fd_filerd_tile_t * ctx,
80-
ulong seq_final ) {
60+
fd_filerd_shutdown( fd_filerd_tile_t * ctx ) {
8161
if( FD_UNLIKELY( close( ctx->fd ) ) ) {
8262
FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
8363
}
8464
ctx->fd = -1;
8565
FD_MGAUGE_SET( TILE, STATUS, 2UL );
86-
FD_VOLATILE( ctx->out_sync[ 2 ] ) = seq_final;
66+
fd_stream_writer_close( ctx->writer );
8767
FD_COMPILER_MFENCE();
8868
FD_LOG_INFO(( "Reached end of file" ));
8969

9070
for(;;) pause();
9171
}
9272

9373
static void
94-
after_credit( fd_filerd_tile_t * ctx,
95-
fd_stream_frag_meta_t * out_mcache,
96-
ulong const out_depth,
97-
ulong * restrict out_seq,
98-
ulong * restrict cr_frag_avail,
99-
ulong * restrict cr_byte_avail,
100-
int * restrict charge_busy_after ) {
101-
/* Assumes *cr_frag_avail>=2 */
102-
int fd = ctx->fd;
103-
if( FD_UNLIKELY( fd<0 ) ) return;
74+
after_credit( void * _ctx,
75+
fd_stream_ctx_t * stream_ctx,
76+
int * poll_in FD_PARAM_UNUSED ) {
77+
fd_filerd_tile_t * ctx = _ctx;
78+
(void)stream_ctx;
10479

105-
if( FD_UNLIKELY( ctx->buf_off >= ctx->buf_sz ) ) {
106-
FD_LOG_CRIT(( "Buffer overflow (buf_off=%lu buf_sz=%lu)", ctx->buf_off, ctx->buf_sz ));
107-
}
80+
uchar * out = fd_stream_writer_prepare( ctx->writer );
81+
ulong out_max = fd_stream_writer_publish_sz_max( ctx->writer );
10882

109-
ulong const read_max = fd_ulong_min( *cr_byte_avail, ctx->read_max );
110-
ulong const read_sz = fd_ulong_min( read_max, ctx->buf_sz - ctx->buf_off );
83+
/* technically, this is not needed because fd_stream_ctx_run_loop
84+
checks for backpresure on all outgoing links and there is only one
85+
outgoing link anyways. But, it is added for clarity that
86+
callbacks should handle backpressure for their out links. */
87+
if( FD_UNLIKELY( !out_max ) ) return;
88+
89+
int fd = ctx->fd;
90+
if( FD_UNLIKELY( fd<0 ) ) return;
11191

112-
long res = read( fd, ctx->buf + ctx->buf_off, read_sz );
92+
long res = read( fd, out, out_max );
11393
if( FD_UNLIKELY( res<=0L ) ) {
11494
if( FD_UNLIKELY( res==0 ) ) {
115-
fd_filerd_shutdown( ctx, out_seq[0] );
95+
fd_filerd_shutdown( ctx );
11696
return;
11797
}
11898
if( FD_LIKELY( errno==EAGAIN ) ) return;
11999
FD_LOG_ERR(( "readv() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
120100
/* aborts app */
121101
}
122102

123-
ulong sz = (ulong)res;
124-
cr_byte_avail[0] -= sz;
125-
*charge_busy_after = 1;
126-
127-
ulong frag_sz = fd_ulong_min( read_sz, sz );
128-
129-
ulong loff = ctx->buf_base + ctx->buf_off;
130-
fd_mcache_publish_stream( out_mcache, out_depth, out_seq[0], ctx->goff, loff, frag_sz, 0 );
131-
out_seq[0] = fd_seq_inc( out_seq[0], 1UL );
132-
cr_frag_avail[0]--;
133-
ctx->goff += frag_sz;
134-
ctx->buf_off += frag_sz;
135-
if( ctx->buf_off >= ctx->buf_sz ) ctx->buf_off = 0UL; /* cmov */
103+
fd_stream_writer_publish( ctx->writer, (ulong)res, 0UL );
136104
}
137105

138-
/* run/run1 are a custom run loop based on fd_stem.c. */
139-
140106
__attribute__((noinline)) static void
141-
fd_filerd_run1(
142-
fd_filerd_tile_t * ctx,
143-
fd_stream_frag_meta_t * out_mcache,
144-
void * out_dcache,
145-
ulong cons_cnt,
146-
ushort * restrict event_map, /* cnt=1+cons_cnt */
147-
ulong ** restrict cons_fseq, /* cnt= cons_cnt points to each consumer's fseq */
148-
ulong volatile ** restrict cons_slow, /* cnt= cons_cnt points to 'slow' metrics */
149-
ulong * restrict cons_seq, /* cnt=2*cons_cnt cache of recent fseq observations */
150-
long lazy,
151-
fd_rng_t * rng
152-
) {
153-
154-
/* out flow control state */
155-
ulong cr_byte_avail; /* byte burst quota */
156-
ulong cr_frag_avail; /* frag burst quota */
157-
158-
/* housekeeping state */
159-
ulong event_cnt;
160-
ulong event_seq;
161-
ulong async_min; /* min number of ticks between a housekeeping event */
162-
163-
/* performance metrics */
164-
ulong metric_in_backp;
165-
ulong metric_backp_cnt;
166-
ulong metric_regime_ticks[9];
167-
168-
metric_in_backp = 1UL;
169-
metric_backp_cnt = 0UL;
170-
memset( metric_regime_ticks, 0, sizeof( metric_regime_ticks ) );
171-
172-
/* out frag stream init */
173-
174-
cr_byte_avail = 0UL;
175-
cr_frag_avail = 0UL;
176-
177-
ulong const out_depth = fd_mcache_depth( out_mcache->f );
178-
ulong out_seq = 0UL;
179-
180-
ulong const out_bufsz = fd_dcache_data_sz( out_dcache );
181-
182-
ulong const cr_byte_max = out_bufsz;
183-
ulong const cr_frag_max = out_depth;
184-
185-
ulong const burst_byte = 512UL; /* don't producing frags smaller than this */
186-
ulong const burst_frag = 2UL;
187-
188-
for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) {
189-
if( FD_UNLIKELY( !cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx ));
190-
cons_slow[ cons_idx ] = fd_metrics_link_out( fd_metrics_base_tl, cons_idx ) + FD_METRICS_COUNTER_LINK_SLOW_COUNT_OFF;
191-
cons_seq [ 2*cons_idx ] = FD_VOLATILE_CONST( cons_fseq[ cons_idx ][0] );
192-
cons_seq [ 2*cons_idx+1 ] = FD_VOLATILE_CONST( cons_fseq[ cons_idx ][1] );
193-
}
194-
195-
/* housekeeping init */
196-
197-
//if( lazy<=0L ) lazy = fd_tempo_lazy_default( out_depth );
198-
lazy = 1e3L;
199-
FD_LOG_INFO(( "Configuring housekeeping (lazy %li ns)", lazy ));
200-
201-
/* Initial event sequence */
202-
203-
event_cnt = 1UL + cons_cnt;
204-
event_map[ 0 ] = (ushort)cons_cnt;
205-
for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) {
206-
event_map[ 1+cons_idx ] = (ushort)cons_idx;
207-
}
208-
event_seq = 0UL;
209-
210-
async_min = fd_tempo_async_min( lazy, event_cnt, (float)fd_tempo_tick_per_ns( NULL ) );
211-
if( FD_UNLIKELY( !async_min ) ) FD_LOG_ERR(( "bad lazy %lu %lu", (ulong)lazy, event_cnt ));
212-
213-
FD_LOG_INFO(( "Running file reader" ));
214-
FD_MGAUGE_SET( TILE, STATUS, 1UL );
215-
long then = fd_tickcount();
216-
long now = then;
217-
for(;;) {
218-
219-
/* Do housekeeping at a low rate in the background */
220-
ulong housekeeping_ticks = 0UL;
221-
if( FD_UNLIKELY( (now-then)>=0L ) ) {
222-
ulong event_idx = (ulong)event_map[ event_seq ];
223-
224-
if( FD_LIKELY( event_idx<cons_cnt ) ) {
225-
ulong cons_idx = event_idx;
226-
227-
/* Receive flow control credits from this out. */
228-
FD_COMPILER_MFENCE();
229-
cons_seq[ 2*cons_idx ] = FD_VOLATILE_CONST( cons_fseq[ cons_idx ][0] );
230-
cons_seq[ 2*cons_idx+1 ] = FD_VOLATILE_CONST( cons_fseq[ cons_idx ][1] );
231-
FD_COMPILER_MFENCE();
232-
233-
} else { /* event_idx==cons_cnt, housekeeping event */
234-
235-
/* Update metrics counters to external viewers */
236-
FD_COMPILER_MFENCE();
237-
FD_MGAUGE_SET( TILE, HEARTBEAT, (ulong)now );
238-
FD_MGAUGE_SET( TILE, IN_BACKPRESSURE, metric_in_backp );
239-
FD_MCNT_INC ( TILE, BACKPRESSURE_COUNT, metric_backp_cnt );
240-
FD_MCNT_ENUM_COPY( TILE, REGIME_DURATION_NANOS, metric_regime_ticks );
241-
metrics_write( ctx );
242-
FD_COMPILER_MFENCE();
243-
metric_backp_cnt = 0UL;
244-
245-
/* Receive flow control credits */
246-
if( FD_LIKELY( cr_byte_avail<cr_byte_max || cr_frag_avail<cr_frag_max ) ) {
247-
ulong slowest_cons = ULONG_MAX;
248-
cr_frag_avail = cr_frag_max;
249-
cr_byte_avail = cr_byte_max;
250-
for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) {
251-
ulong cons_cr_frag_avail = (ulong)fd_long_max( (long)cr_frag_max-fd_long_max( fd_seq_diff( out_seq, cons_seq[ 2*cons_idx ] ), 0L ), 0L );
252-
ulong cons_cr_byte_avail = (ulong)fd_long_max( (long)cr_byte_max-fd_long_max( fd_seq_diff( ctx->goff, cons_seq[ 2*cons_idx+1 ] ), 0L ), 0L );
253-
slowest_cons = fd_ulong_if( cons_cr_byte_avail<cr_byte_avail, cons_idx, slowest_cons );
254-
cr_frag_avail = fd_ulong_min( cons_cr_frag_avail, cr_frag_avail );
255-
cr_byte_avail = fd_ulong_min( cons_cr_byte_avail, cr_byte_avail );
256-
}
257-
258-
if( FD_LIKELY( slowest_cons!=ULONG_MAX ) ) {
259-
FD_COMPILER_MFENCE();
260-
(*cons_slow[ slowest_cons ]) += metric_in_backp;
261-
FD_COMPILER_MFENCE();
262-
}
263-
}
264-
265-
during_housekeeping( ctx );
266-
}
267-
268-
event_seq++;
269-
if( FD_UNLIKELY( event_seq>=event_cnt ) ) {
270-
event_seq = 0UL;
271-
ulong swap_idx = (ulong)fd_rng_uint_roll( rng, (uint)event_cnt );
272-
ushort map_tmp = event_map[ swap_idx ];
273-
event_map[ swap_idx ] = event_map[ 0 ];
274-
event_map[ 0 ] = map_tmp;
275-
}
276-
277-
/* Reload housekeeping timer */
278-
then = now + (long)fd_tempo_async_reload( rng, async_min );
279-
long next = fd_tickcount();
280-
housekeeping_ticks = (ulong)(next - now);
281-
now = next;
282-
}
283-
284-
/* Check if we are backpressured. */
285-
286-
if( FD_UNLIKELY( cr_byte_avail<burst_byte || cr_frag_avail<burst_frag ) ) {
287-
metric_backp_cnt += (ulong)!metric_in_backp;
288-
metric_in_backp = 1UL + (cr_byte_avail<burst_byte);
289-
FD_SPIN_PAUSE();
290-
metric_regime_ticks[2] += housekeeping_ticks;
291-
long next = fd_tickcount();
292-
metric_regime_ticks[5] += (ulong)(next - now);
293-
now = next;
294-
continue;
295-
}
296-
metric_in_backp = 0UL;
297-
298-
int charge_busy_after = 0;
299-
after_credit( ctx, out_mcache, out_depth, &out_seq, &cr_frag_avail, &cr_byte_avail, &charge_busy_after );
300-
301-
metric_regime_ticks[1] += housekeeping_ticks;
302-
long next = fd_tickcount();
303-
metric_regime_ticks[4] += (ulong)(next - now);
304-
now = next;
305-
continue;
306-
}
107+
fd_filerd_run1( fd_filerd_tile_t * ctx,
108+
fd_stream_ctx_t * stream_ctx ) {
109+
FD_LOG_INFO(( "Running filerd tile" ));
110+
111+
fd_stream_ctx_run( stream_ctx,
112+
ctx,
113+
fd_filerd_init_from_stream_ctx,
114+
NULL,
115+
NULL,
116+
NULL,
117+
after_credit,
118+
NULL );
307119
}
308120

309121
static void
310122
fd_filerd_run( fd_topo_t * topo,
311123
fd_topo_tile_t * tile ) {
312-
fd_stream_frag_meta_t * out_mcache = fd_type_pun( topo->links[ tile->out_link_id[ 0 ] ].mcache );
313-
FD_TEST( out_mcache );
314-
315-
ulong reliable_cons_cnt = 0UL;
316-
ulong * cons_fseq[ FD_TOPO_MAX_LINKS ];
317-
for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
318-
fd_topo_tile_t * consumer_tile = &topo->tiles[ i ];
319-
for( ulong j=0UL; j<consumer_tile->in_cnt; j++ ) {
320-
if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[0] && consumer_tile->in_link_reliable[ j ] ) ) {
321-
cons_fseq[ reliable_cons_cnt ] = consumer_tile->in_link_fseq[ j ];
322-
FD_TEST( cons_fseq[ reliable_cons_cnt ] );
323-
reliable_cons_cnt++;
324-
FD_TEST( reliable_cons_cnt<FD_TOPO_MAX_LINKS );
325-
}
326-
}
327-
}
328-
329-
/* FIXME rng seed should not be zero */
330-
fd_rng_t rng[1];
331-
FD_TEST( fd_rng_join( fd_rng_new( rng, 0, 0UL ) ) );
332-
333124
fd_filerd_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
334-
ushort event_map[ 1+reliable_cons_cnt ];
335-
ulong volatile * cons_slow[ reliable_cons_cnt ];
336-
ulong cons_seq [ 2*reliable_cons_cnt ];
337-
fd_filerd_run1( ctx, out_mcache, ctx->buf, reliable_cons_cnt, event_map, cons_fseq, cons_slow, cons_seq, (ulong)10e3, rng );
125+
void * ctx_mem = fd_alloca_check( FD_STEM_SCRATCH_ALIGN, fd_stream_ctx_footprint( topo, tile ) );
126+
fd_stream_ctx_t * stream_ctx = fd_stream_ctx_new( ctx_mem, topo, tile );
127+
fd_filerd_run1( ctx, stream_ctx );
338128
}
339129

340130
fd_topo_run_tile_t fd_tile_snapshot_restore_FileRd = {

0 commit comments

Comments
 (0)