From 1849928362f21ec315c15e0aea1ff8b4e92bd021 Mon Sep 17 00:00:00 2001 From: Aman Khinvasara Date: Mon, 7 Apr 2025 20:36:59 +0000 Subject: [PATCH] sender: landing UDP votes --- book/api/metrics-generated.md | 8 + src/app/firedancer-dev/commands/backtest.c | 14 +- src/app/firedancer-dev/config/default.toml | 2 +- src/app/firedancer-dev/config/private.toml | 2 +- src/app/firedancer/topology.c | 107 +++-- src/disco/dedup/fd_dedup_tile.c | 13 +- src/disco/metrics/generate/types.py | 3 +- src/disco/metrics/generated/fd_metrics_all.c | 3 + src/disco/metrics/generated/fd_metrics_all.h | 3 +- .../metrics/generated/fd_metrics_sender.c | 9 + .../metrics/generated/fd_metrics_sender.h | 31 ++ src/disco/metrics/metrics.xml | 7 + src/disco/sign/fd_sign_tile.c | 4 +- src/disco/verify/fd_verify_tile.c | 8 +- src/discof/eqvoc/fd_eqvoc_tile.c | 17 +- src/discof/gossip/fd_gossip_tile.c | 70 +-- src/discof/replay/fd_replay_tile.c | 26 +- src/discof/sender/Local.mk | 2 +- src/discof/sender/fd_sender_tile.c | 402 ++++++++---------- 19 files changed, 374 insertions(+), 357 deletions(-) create mode 100644 src/disco/metrics/generated/fd_metrics_sender.c create mode 100644 src/disco/metrics/generated/fd_metrics_sender.h diff --git a/book/api/metrics-generated.md b/book/api/metrics-generated.md index 74629cab1a..f5323e7a57 100644 --- a/book/api/metrics-generated.md +++ b/book/api/metrics-generated.md @@ -584,3 +584,11 @@ | repair_​sent_​pkt_​types_​needed_​window | `counter` | What types of client messages are we sending (Need Window) | | repair_​sent_​pkt_​types_​needed_​highest_​window | `counter` | What types of client messages are we sending (Need Highest Window) | | repair_​sent_​pkt_​types_​needed_​orphan | `counter` | What types of client messages are we sending (Need Orphans) | + +## Sender Tile +| Metric | Type | Description | +|--------|------|-------------| +| sender_​txns_​sent_​to_​leader | `counter` | Total count of transactions sent to leader | +| sender_​leader_​sched_​not_​found | `counter` | Total count of times leader schedule not found | +| sender_​leader_​not_​found | `counter` | Total count of times leader not found for given slot | +| sender_​leader_​contact_​not_​found | `counter` | Total count of times leader contact info not found | diff --git a/src/app/firedancer-dev/commands/backtest.c b/src/app/firedancer-dev/commands/backtest.c index aa3207fa5a..7ff9f21b6f 100644 --- a/src/app/firedancer-dev/commands/backtest.c +++ b/src/app/firedancer-dev/commands/backtest.c @@ -280,20 +280,20 @@ backtest_topo( config_t * config ) { /* Setup replay->stake/sender/poh links in topo w/o consumers */ /**********************************************************************/ fd_topob_wksp( topo, "stake_out" ); - fd_topob_wksp( topo, "replay_voter" ); + fd_topob_wksp( topo, "replay_send" ); fd_topob_wksp( topo, "replay_poh" ); - fd_topob_link( topo, "stake_out", "stake_out", 128UL, 40UL + 40200UL * 40UL, 1UL ); - fd_topob_link( topo, "replay_voter", "replay_voter", 128UL, sizeof(fd_txn_p_t), 1UL ); + fd_topob_link( topo, "stake_out", "stake_out", 128UL, 40UL + 40200UL * 40UL, 1UL ); + fd_topob_link( topo, "replay_send", "replay_send", 128UL, sizeof(fd_txn_p_t), 1UL ); ulong bank_tile_cnt = config->layout.bank_tile_count; FOR(bank_tile_cnt) fd_topob_link( topo, "replay_poh", "replay_poh", 128UL, (4096UL*sizeof(fd_txn_p_t))+sizeof(fd_microblock_trailer_t), 1UL ); - fd_topob_tile_out( topo, "replay", 0UL, "stake_out", 0UL ); - fd_topob_tile_out( topo, "replay", 0UL, "replay_voter", 0UL ); + fd_topob_tile_out( topo, "replay", 0UL, "stake_out", 0UL ); + fd_topob_tile_out( topo, "replay", 0UL, "replay_send", 0UL ); FOR(bank_tile_cnt) fd_topob_tile_out( topo, "replay", 0UL, "replay_poh", i ); - topo->links[ replay_tile->out_link_id[ fd_topo_find_tile_out_link( topo, replay_tile, "stake_out", 0 ) ] ].permit_no_consumers = 1; - topo->links[ replay_tile->out_link_id[ fd_topo_find_tile_out_link( topo, replay_tile, "replay_voter", 0 ) ] ].permit_no_consumers = 1; + topo->links[ replay_tile->out_link_id[ fd_topo_find_tile_out_link( topo, replay_tile, "stake_out", 0 ) ] ].permit_no_consumers = 1; + topo->links[ replay_tile->out_link_id[ fd_topo_find_tile_out_link( topo, replay_tile, "replay_send", 0 ) ] ].permit_no_consumers = 1; FOR(bank_tile_cnt) topo->links[ replay_tile->out_link_id[ fd_topo_find_tile_out_link( topo, replay_tile, "replay_poh", i ) ] ].permit_no_consumers = 1; /**********************************************************************/ diff --git a/src/app/firedancer-dev/config/default.toml b/src/app/firedancer-dev/config/default.toml index d7b59ef367..d68775c982 100644 --- a/src/app/firedancer-dev/config/default.toml +++ b/src/app/firedancer-dev/config/default.toml @@ -32,7 +32,7 @@ prometheus_listen_address = "0.0.0.0" prometheus_listen_port = 7999 [consensus] - vote = false + vote = true expected_shred_version = 64475 [paths] identity_key = "{keys}/fd-identity-keypair.json" diff --git a/src/app/firedancer-dev/config/private.toml b/src/app/firedancer-dev/config/private.toml index aaa5f084c5..225ac2b5cc 100644 --- a/src/app/firedancer-dev/config/private.toml +++ b/src/app/firedancer-dev/config/private.toml @@ -26,7 +26,7 @@ prometheus_listen_address = "0.0.0.0" prometheus_listen_port = 7999 [consensus] - vote = false + vote = true expected_shred_version = 20896 [paths] identity_key = "{keys}/fd-identity-keypair.json" diff --git a/src/app/firedancer/topology.c b/src/app/firedancer/topology.c index 39c4acfe5b..bd4fc19dd1 100644 --- a/src/app/firedancer/topology.c +++ b/src/app/firedancer/topology.c @@ -224,7 +224,7 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "net_gossip" ); fd_topob_wksp( topo, "net_repair" ); fd_topob_wksp( topo, "net_quic" ); - fd_topob_wksp( topo, "net_voter" ); + fd_topob_wksp( topo, "net_send" ); fd_topob_wksp( topo, "quic_verify" ); fd_topob_wksp( topo, "verify_dedup" ); @@ -249,8 +249,8 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "replay_wtr" ); fd_topob_wksp( topo, "exec_writer" ); - fd_topob_wksp( topo, "voter_sign" ); - fd_topob_wksp( topo, "sign_voter" ); + fd_topob_wksp( topo, "send_sign" ); + fd_topob_wksp( topo, "sign_send" ); fd_topob_wksp( topo, "crds_shred" ); fd_topob_wksp( topo, "gossip_repai" ); @@ -269,10 +269,9 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "bank_busy" ); fd_topob_wksp( topo, "root_slot" ); fd_topob_wksp( topo, "pack_replay" ); - fd_topob_wksp( topo, "replay_voter" ); - fd_topob_wksp( topo, "gossip_voter" ); - fd_topob_wksp( topo, "voter_gossip" ); - fd_topob_wksp( topo, "voter_dedup" ); + fd_topob_wksp( topo, "replay_send" ); + fd_topob_wksp( topo, "gossip_send" ); + fd_topob_wksp( topo, "send_txns" ); fd_topob_wksp( topo, "batch_replay" ); if( enable_rstart ) { @@ -299,8 +298,8 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "writer" ); fd_topob_wksp( topo, "bstore" ); fd_topob_wksp( topo, "tcache" ); - fd_topob_wksp( topo, "poh" ); - fd_topob_wksp( topo, "voter" ); + fd_topob_wksp( topo, "poh" ); + fd_topob_wksp( topo, "sender" ); fd_topob_wksp( topo, "poh_slot" ); fd_topob_wksp( topo, "turb_slot" ); fd_topob_wksp( topo, "eqvoc" ); @@ -359,11 +358,10 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_link( topo, "crds_shred", "crds_shred", 128UL, 8UL + 40200UL * 38UL, 1UL ); /**/ fd_topob_link( topo, "gossip_repai", "gossip_repai", 128UL, 40200UL * 38UL, 1UL ); - /**/ fd_topob_link( topo, "gossip_voter", "gossip_voter", 128UL, 40200UL * 38UL, 1UL ); + /**/ fd_topob_link( topo, "gossip_send", "gossip_send", 128UL, 40200UL * 38UL, 1UL ); /**/ fd_topob_link( topo, "gossip_net", "net_gossip", config->net.ingress_buffer_size, FD_NET_MTU, 1UL ); - /**/ fd_topob_link( topo, "voter_net", "net_voter", config->net.ingress_buffer_size, FD_NET_MTU, 1UL ); - /**/ fd_topob_link( topo, "voter_dedup", "voter_dedup", 128UL, FD_TPU_MTU, 1UL ); + /**/ fd_topob_link( topo, "send_net", "net_send", config->net.ingress_buffer_size, FD_NET_MTU, 2UL ); /**/ fd_topob_link( topo, "store_repair", "store_repair", 1024UL, USHORT_MAX, 16UL ); /**/ fd_topob_link( topo, "repair_store", "repair_store", 1024UL*1024UL, FD_SHRED_MAX_SZ, 128UL ); @@ -380,10 +378,10 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_link( topo, "pack_replay", "pack_replay", 65536UL, USHORT_MAX, 1UL ); /**/ fd_topob_link( topo, "poh_pack", "replay_poh", 128UL, sizeof(fd_became_leader_t) , 1UL ); - /**/ fd_topob_link( topo, "replay_voter", "replay_voter", 128UL, sizeof(fd_txn_p_t), 1UL ); - /**/ fd_topob_link( topo, "voter_gossip", "voter_gossip", 128UL, FD_TXN_MTU, 1UL ); - /**/ fd_topob_link( topo, "voter_sign", "voter_sign", 128UL, FD_TXN_MTU, 1UL ); - /**/ fd_topob_link( topo, "sign_voter", "sign_voter", 128UL, 64UL, 1UL ); + /**/ fd_topob_link( topo, "replay_send", "replay_send", 128UL, sizeof(fd_txn_p_t), 1UL ); + /**/ fd_topob_link( topo, "send_txns", "send_txns", 128UL, FD_TXN_MTU, 1UL ); + /**/ fd_topob_link( topo, "send_sign", "send_sign", 128UL, FD_TXN_MTU, 1UL ); + /**/ fd_topob_link( topo, "sign_send", "sign_send", 128UL, 64UL, 1UL ); /**/ fd_topob_link( topo, "batch_replay", "batch_replay", 128UL, 32UL, 1UL ); @@ -441,7 +439,7 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_tile( topo, "poh", "poh", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); /**/ fd_topob_tile( topo, "gossip", "gossip", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); fd_topo_tile_t * repair_tile = fd_topob_tile( topo, "repair", "repair", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); - /**/ fd_topob_tile( topo, "sender", "voter", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); + /**/ fd_topob_tile( topo, "sender", "sender", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); /**/ fd_topob_tile( topo, "eqvoc", "eqvoc", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); fd_topo_tile_t * replay_tile = fd_topob_tile( topo, "replay", "replay", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); @@ -586,10 +584,11 @@ fd_topo_initialize( config_t * config ) { FOR(quic_tile_cnt) fd_topob_tile_out( topo, "quic", i, "quic_net", i ); /* All verify tiles read from all QUIC tiles, packets are round robin. */ FOR(verify_tile_cnt) for( ulong j=0UL; jin[ in_idx ].mem, chunk ); uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->out_mem, ctx->out_chunk ); - if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP || ctx->in_kind[ in_idx ]==IN_KIND_VOTER ) ) { - if( FD_UNLIKELY( sz>FD_TPU_MTU ) ) FD_LOG_ERR(( "received a gossip or voter transaction that was too large" )); + if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) { + if( FD_UNLIKELY( sz>FD_TPU_MTU ) ) FD_LOG_ERR(( "received a gossip transaction that was too large" )); fd_txn_m_t * txnm = (fd_txn_m_t *)dst; txnm->payload_sz = (ushort)sz; @@ -161,7 +160,7 @@ after_frag( fd_dedup_ctx_t * ctx, return; } - if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP || ctx->in_kind[ in_idx]==IN_KIND_VOTER ) ) { + if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) { /* Transactions coming in from these links are not parsed. We'll need to parse it so it's ready for downstream consumers. @@ -170,7 +169,7 @@ after_frag( fd_dedup_ctx_t * ctx, txnm->txn_t_sz = (ushort)fd_txn_parse( fd_txn_m_payload( txnm ), txnm->payload_sz, txn, NULL ); if( FD_UNLIKELY( !txnm->txn_t_sz ) ) FD_LOG_ERR(( "fd_txn_parse failed for vote transactions that should have been sigverified" )); - if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) FD_MCNT_INC( DEDUP, GOSSIPED_VOTES_RECEIVED, 1UL ); + FD_MCNT_INC( DEDUP, GOSSIPED_VOTES_RECEIVED, 1UL ); } int is_dup = 0; @@ -251,8 +250,6 @@ unprivileged_init( fd_topo_t * topo, if( !strcmp( link->name, "gossip_dedup" ) ) { ctx->in_kind[ i ] = IN_KIND_GOSSIP; - } else if( !strcmp( link->name, "voter_dedup" ) ) { - ctx->in_kind[ i ] = IN_KIND_VOTER; } else if( !strcmp( link->name, "verify_dedup" ) ) { ctx->in_kind[ i ] = IN_KIND_VERIFY; } else { diff --git a/src/disco/metrics/generate/types.py b/src/disco/metrics/generate/types.py index 643d31435d..5869a5a53a 100644 --- a/src/disco/metrics/generate/types.py +++ b/src/disco/metrics/generate/types.py @@ -25,7 +25,8 @@ class Tile(Enum): GOSSIP = 19 NETLNK = 20 SOCK = 21, - REPAIR = 22 + REPAIR = 22, + SENDER = 23 class MetricType(Enum): COUNTER = 0 diff --git a/src/disco/metrics/generated/fd_metrics_all.c b/src/disco/metrics/generated/fd_metrics_all.c index 0d07b8537e..ff17691fd0 100644 --- a/src/disco/metrics/generated/fd_metrics_all.c +++ b/src/disco/metrics/generated/fd_metrics_all.c @@ -53,6 +53,7 @@ const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT] = { "netlnk", "sock", "repair", + "sender", }; const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT] = { @@ -73,6 +74,7 @@ const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT] = { FD_METRICS_NETLNK_TOTAL, FD_METRICS_SOCK_TOTAL, FD_METRICS_REPAIR_TOTAL, + FD_METRICS_SENDER_TOTAL, }; const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT] = { FD_METRICS_NET, @@ -92,4 +94,5 @@ const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT] FD_METRICS_NETLNK, FD_METRICS_SOCK, FD_METRICS_REPAIR, + FD_METRICS_SENDER, }; diff --git a/src/disco/metrics/generated/fd_metrics_all.h b/src/disco/metrics/generated/fd_metrics_all.h index e1ff19b628..1f6630d3b8 100644 --- a/src/disco/metrics/generated/fd_metrics_all.h +++ b/src/disco/metrics/generated/fd_metrics_all.h @@ -5,6 +5,7 @@ #include "fd_metrics_net.h" #include "fd_metrics_sock.h" #include "fd_metrics_quic.h" +#include "fd_metrics_sender.h" #include "fd_metrics_bundle.h" #include "fd_metrics_verify.h" #include "fd_metrics_dedup.h" @@ -155,7 +156,7 @@ extern const fd_metrics_meta_t FD_METRICS_ALL_LINK_OUT[FD_METRICS_ALL_LINK_OUT_T #define FD_METRICS_TOTAL_SZ (8UL*246UL) -#define FD_METRICS_TILE_KIND_CNT 17 +#define FD_METRICS_TILE_KIND_CNT 18 extern const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT]; extern const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT]; extern const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT]; diff --git a/src/disco/metrics/generated/fd_metrics_sender.c b/src/disco/metrics/generated/fd_metrics_sender.c new file mode 100644 index 0000000000..7cff59d501 --- /dev/null +++ b/src/disco/metrics/generated/fd_metrics_sender.c @@ -0,0 +1,9 @@ +/* THIS FILE IS GENERATED BY gen_metrics.py. DO NOT HAND EDIT. */ +#include "fd_metrics_sender.h" + +const fd_metrics_meta_t FD_METRICS_SENDER[FD_METRICS_SENDER_TOTAL] = { + DECLARE_METRIC( SENDER_TXNS_SENT_TO_LEADER, COUNTER ), + DECLARE_METRIC( SENDER_LEADER_SCHED_NOT_FOUND, COUNTER ), + DECLARE_METRIC( SENDER_LEADER_NOT_FOUND, COUNTER ), + DECLARE_METRIC( SENDER_LEADER_CONTACT_NOT_FOUND, COUNTER ), +}; diff --git a/src/disco/metrics/generated/fd_metrics_sender.h b/src/disco/metrics/generated/fd_metrics_sender.h new file mode 100644 index 0000000000..428cc5972e --- /dev/null +++ b/src/disco/metrics/generated/fd_metrics_sender.h @@ -0,0 +1,31 @@ +/* THIS FILE IS GENERATED BY gen_metrics.py. DO NOT HAND EDIT. */ + +#include "../fd_metrics_base.h" +#include "fd_metrics_enums.h" + +#define FD_METRICS_COUNTER_SENDER_TXNS_SENT_TO_LEADER_OFF (16UL) +#define FD_METRICS_COUNTER_SENDER_TXNS_SENT_TO_LEADER_NAME "sender_txns_sent_to_leader" +#define FD_METRICS_COUNTER_SENDER_TXNS_SENT_TO_LEADER_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_SENDER_TXNS_SENT_TO_LEADER_DESC "Total count of transactions sent to leader" +#define FD_METRICS_COUNTER_SENDER_TXNS_SENT_TO_LEADER_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_COUNTER_SENDER_LEADER_SCHED_NOT_FOUND_OFF (17UL) +#define FD_METRICS_COUNTER_SENDER_LEADER_SCHED_NOT_FOUND_NAME "sender_leader_sched_not_found" +#define FD_METRICS_COUNTER_SENDER_LEADER_SCHED_NOT_FOUND_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_SENDER_LEADER_SCHED_NOT_FOUND_DESC "Total count of times leader schedule not found" +#define FD_METRICS_COUNTER_SENDER_LEADER_SCHED_NOT_FOUND_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_COUNTER_SENDER_LEADER_NOT_FOUND_OFF (18UL) +#define FD_METRICS_COUNTER_SENDER_LEADER_NOT_FOUND_NAME "sender_leader_not_found" +#define FD_METRICS_COUNTER_SENDER_LEADER_NOT_FOUND_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_SENDER_LEADER_NOT_FOUND_DESC "Total count of times leader not found for given slot" +#define FD_METRICS_COUNTER_SENDER_LEADER_NOT_FOUND_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_COUNTER_SENDER_LEADER_CONTACT_NOT_FOUND_OFF (19UL) +#define FD_METRICS_COUNTER_SENDER_LEADER_CONTACT_NOT_FOUND_NAME "sender_leader_contact_not_found" +#define FD_METRICS_COUNTER_SENDER_LEADER_CONTACT_NOT_FOUND_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_SENDER_LEADER_CONTACT_NOT_FOUND_DESC "Total count of times leader contact info not found" +#define FD_METRICS_COUNTER_SENDER_LEADER_CONTACT_NOT_FOUND_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_SENDER_TOTAL (4UL) +extern const fd_metrics_meta_t FD_METRICS_SENDER[FD_METRICS_SENDER_TOTAL]; diff --git a/src/disco/metrics/metrics.xml b/src/disco/metrics/metrics.xml index 9b1efd429c..56e3150c49 100644 --- a/src/disco/metrics/metrics.xml +++ b/src/disco/metrics/metrics.xml @@ -210,6 +210,13 @@ metric introduced. + + + + + + + diff --git a/src/disco/sign/fd_sign_tile.c b/src/disco/sign/fd_sign_tile.c index 3833aef6ae..be3b00fc8e 100644 --- a/src/disco/sign/fd_sign_tile.c +++ b/src/disco/sign/fd_sign_tile.c @@ -279,9 +279,9 @@ unprivileged_init_sensitive( fd_topo_t * topo, FD_TEST( !strcmp( out_link->name, "sign_repair" ) ); FD_TEST( in_link->mtu==2048UL ); FD_TEST( out_link->mtu==64UL ); - } else if ( !strcmp(in_link->name, "voter_sign" ) ) { + } else if ( !strcmp(in_link->name, "send_sign" ) ) { ctx->in_role[ i ] = FD_KEYGUARD_ROLE_VOTER; - FD_TEST( !strcmp( out_link->name, "sign_voter" ) ); + FD_TEST( !strcmp( out_link->name, "sign_send" ) ); FD_TEST( in_link->mtu==FD_TXN_MTU ); FD_TEST( out_link->mtu==64UL ); } else if( !strcmp(in_link->name, "bundle_sign" ) ) { diff --git a/src/disco/verify/fd_verify_tile.c b/src/disco/verify/fd_verify_tile.c index b79bee3e86..e1c5be865a 100644 --- a/src/disco/verify/fd_verify_tile.c +++ b/src/disco/verify/fd_verify_tile.c @@ -7,7 +7,7 @@ #define IN_KIND_QUIC (0UL) #define IN_KIND_BUNDLE (1UL) #define IN_KIND_GOSSIP (2UL) - +#define IN_KIND_SENDER (3UL) /* The verify tile is a wrapper around the mux tile, that also verifies incoming transaction signatures match the data being signed. Non-matching transactions are filtered out of the frag stream. */ @@ -69,7 +69,8 @@ during_frag( fd_verify_ctx_t * ctx, ulong sz, ulong ctl FD_PARAM_UNUSED ) { - if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_QUIC || ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) { + ulong in_kind = ctx->in_kind[ in_idx ]; + if( FD_UNLIKELY( in_kind==IN_KIND_QUIC || in_kind==IN_KIND_GOSSIP || in_kind==IN_KIND_SENDER ) ) { if( FD_UNLIKELY( chunkin[in_idx].chunk0 || chunk>ctx->in[in_idx].wmark || sz>FD_TPU_MTU ) ) FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[in_idx].chunk0, ctx->in[in_idx].wmark )); @@ -79,7 +80,7 @@ during_frag( fd_verify_ctx_t * ctx, dst->payload_sz = (ushort)sz; dst->block_engine.bundle_id = 0UL; fd_memcpy( fd_txn_m_payload( dst ), src, sz ); - } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_BUNDLE ) ) { + } else if( FD_UNLIKELY( in_kind==IN_KIND_BUNDLE ) ) { if( FD_UNLIKELY( chunkin[in_idx].chunk0 || chunk>ctx->in[in_idx].wmark || sz>FD_TPU_RAW_MTU ) ) FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu,%lu]", chunk, sz, ctx->in[in_idx].chunk0, ctx->in[in_idx].wmark, FD_TPU_RAW_MTU )); @@ -205,6 +206,7 @@ unprivileged_init( fd_topo_t * topo, if( !strcmp( link->name, "quic_verify" ) ) ctx->in_kind[ i ] = IN_KIND_QUIC; else if( !strcmp( link->name, "bundle_verif" ) ) ctx->in_kind[ i ] = IN_KIND_BUNDLE; else if( !strcmp( link->name, "gossip_verif" ) ) ctx->in_kind[ i ] = IN_KIND_GOSSIP; + else if( !strcmp( link->name, "send_txns" ) ) ctx->in_kind[ i ] = IN_KIND_SENDER; else FD_LOG_ERR(( "unexpected link name %s", link->name )); } diff --git a/src/discof/eqvoc/fd_eqvoc_tile.c b/src/discof/eqvoc/fd_eqvoc_tile.c index 194421ea96..2cd1bab1b6 100644 --- a/src/discof/eqvoc/fd_eqvoc_tile.c +++ b/src/discof/eqvoc/fd_eqvoc_tile.c @@ -67,23 +67,20 @@ scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) { } static inline void -handle_new_cluster_contact_info( fd_eqvoc_tile_ctx_t * ctx, uchar const * buf, ulong buf_sz ) { +handle_new_cluster_contact_info( fd_eqvoc_tile_ctx_t * ctx, + uchar const * buf, + ulong buf_sz ) { ulong const * header = (ulong const *)fd_type_pun_const( buf ); - ulong dest_cnt = buf_sz; - - if( dest_cnt >= MAX_SHRED_DESTS ) - FD_LOG_ERR(( "Cluster nodes had %lu destinations, which was more than the max of %lu", - dest_cnt, - MAX_SHRED_DESTS )); + ulong dest_cnt = buf_sz / sizeof(fd_shred_dest_wire_t); fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header ); - fd_shred_dest_weighted_t * dests = fd_stake_ci_dest_add_init( ctx->stake_ci ); + fd_shred_dest_weighted_t * dests = fd_stake_ci_dest_add_init( ctx->stake_ci ); ctx->new_dest_ptr = dests; ctx->new_dest_cnt = dest_cnt; - for( ulong i = 0UL; i < dest_cnt; i++ ) { + for( ulong i=0UL; istake_ci = fd_stake_ci_join( fd_stake_ci_new( stake_ci_mem, ctx->identity_key ) ); ctx->eqvoc = fd_eqvoc_join( fd_eqvoc_new( eqvoc_mem, 1 << 10, 1 << 10, 0 ) ); - ctx->contact_in_idx = fd_topo_find_tile_in_link( topo, tile, "gossip_voter", 0 ); + ctx->contact_in_idx = fd_topo_find_tile_in_link( topo, tile, "gossip_send", 0 ); FD_TEST( ctx->contact_in_idx != ULONG_MAX ); fd_topo_link_t * contact_in_link = &topo->links[tile->in_link_id[ctx->contact_in_idx]]; ctx->contact_in_mem = topo->workspaces[topo->objs[contact_in_link->dcache_obj_id].wksp_id].wksp; diff --git a/src/discof/gossip/fd_gossip_tile.c b/src/discof/gossip/fd_gossip_tile.c index a821d7b6fa..78ffea7d60 100644 --- a/src/discof/gossip/fd_gossip_tile.c +++ b/src/discof/gossip/fd_gossip_tile.c @@ -108,15 +108,15 @@ struct fd_gossip_tile_ctx { ulong repair_contact_out_wmark; ulong repair_contact_out_chunk; - fd_frag_meta_t * voter_contact_out_mcache; - ulong * voter_contact_out_sync; - ulong voter_contact_out_depth; - ulong voter_contact_out_seq; + fd_frag_meta_t * sender_contact_out_mcache; + ulong * sender_contact_out_sync; + ulong sender_contact_out_depth; + ulong sender_contact_out_seq; - fd_wksp_t * voter_contact_out_mem; - ulong voter_contact_out_chunk0; - ulong voter_contact_out_wmark; - ulong voter_contact_out_chunk; + fd_wksp_t * sender_contact_out_mem; + ulong sender_contact_out_chunk0; + ulong sender_contact_out_wmark; + ulong sender_contact_out_chunk; fd_frag_meta_t * verify_out_mcache; ulong * verify_out_sync; @@ -588,12 +588,12 @@ after_credit( fd_gossip_tile_ctx_t * ctx, ulong tvu_peer_cnt = 0; ulong repair_peers_cnt = 0; - ulong voter_peers_cnt = 0; + ulong sender_peers_cnt = 0; ulong * shred_dest_msg = fd_chunk_to_laddr( ctx->shred_contact_out_mem, ctx->shred_contact_out_chunk ); fd_shred_dest_wire_t * tvu_peers = (fd_shred_dest_wire_t *)(shred_dest_msg+1); fd_shred_dest_wire_t * repair_peers = fd_chunk_to_laddr( ctx->repair_contact_out_mem, ctx->repair_contact_out_chunk ); - fd_shred_dest_wire_t * voter_peers = fd_chunk_to_laddr( ctx->voter_contact_out_mem, ctx->voter_contact_out_chunk ); + fd_shred_dest_wire_t * sender_peers = fd_chunk_to_laddr( ctx->sender_contact_out_mem, ctx->sender_contact_out_chunk ); for( fd_contact_info_table_iter_t iter = fd_contact_info_table_iter_init( ctx->contact_info_table ); !fd_contact_info_table_iter_done( ctx->contact_info_table, iter ); iter = fd_contact_info_table_iter_next( ctx->contact_info_table, iter ) ) { @@ -654,11 +654,11 @@ after_credit( fd_gossip_tile_ctx_t * ctx, continue; } - voter_peers[voter_peers_cnt].ip4_addr = ele->contact_info.tpu_vote.inner.ip4.addr; - voter_peers[voter_peers_cnt].udp_port = ele->contact_info.tpu_vote.inner.ip4.port; - memcpy( voter_peers[voter_peers_cnt].pubkey, ele->contact_info.id.key, sizeof(fd_pubkey_t) ); + sender_peers[sender_peers_cnt].ip4_addr = ele->contact_info.tpu_vote.inner.ip4.addr; + sender_peers[sender_peers_cnt].udp_port = ele->contact_info.tpu_vote.inner.ip4.port; + memcpy( sender_peers[sender_peers_cnt].pubkey, ele->contact_info.id.key, sizeof(fd_pubkey_t) ); - voter_peers_cnt++; + sender_peers_cnt++; } } @@ -667,13 +667,13 @@ after_credit( fd_gossip_tile_ctx_t * ctx, UPDATE_PEER_CNTS( tvu_peer_cnt, TVU ); UPDATE_PEER_CNTS( repair_peers_cnt, REPAIR ); - UPDATE_PEER_CNTS( voter_peers_cnt, VOTER ); + UPDATE_PEER_CNTS( sender_peers_cnt, VOTER ); #undef UPDATE_PEER_CNTS ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() ); - FD_LOG_INFO(( "publishing peers - tvu: %lu, repair: %lu, tpu_vote: %lu", tvu_peer_cnt, repair_peers_cnt, voter_peers_cnt )); + FD_LOG_INFO(( "publishing peers - tvu: %lu, repair: %lu, tpu_vote: %lu", tvu_peer_cnt, repair_peers_cnt, sender_peers_cnt )); if( tvu_peer_cnt>0 && ctx->shred_contact_out_mcache ) { *shred_dest_msg = tvu_peer_cnt; ulong shred_contact_sz = sizeof(ulong) + (tvu_peer_cnt * sizeof(fd_shred_dest_wire_t)); @@ -693,13 +693,13 @@ after_credit( fd_gossip_tile_ctx_t * ctx, ctx->repair_contact_out_chunk = fd_dcache_compact_next( ctx->repair_contact_out_chunk, repair_contact_sz, ctx->repair_contact_out_chunk0, ctx->repair_contact_out_wmark ); } - if( voter_peers_cnt>0 && ctx->voter_contact_out_mcache ) { - ulong voter_contact_sz = (voter_peers_cnt * sizeof(fd_shred_dest_wire_t)); - ulong voter_contact_sig = 4UL; - fd_mcache_publish( ctx->voter_contact_out_mcache, ctx->voter_contact_out_depth, ctx->voter_contact_out_seq, voter_contact_sig, ctx->voter_contact_out_chunk, - voter_peers_cnt, 0UL, tsorig, tspub ); - ctx->voter_contact_out_seq = fd_seq_inc( ctx->voter_contact_out_seq, 1UL ); - ctx->voter_contact_out_chunk = fd_dcache_compact_next( ctx->voter_contact_out_chunk, voter_contact_sz, ctx->voter_contact_out_chunk0, ctx->voter_contact_out_wmark ); + if( sender_peers_cnt>0 && ctx->sender_contact_out_mcache ) { + ulong sender_contact_sz = (sender_peers_cnt * sizeof(fd_shred_dest_wire_t)); + ulong sender_contact_sig = 4UL; + fd_mcache_publish( ctx->sender_contact_out_mcache, ctx->sender_contact_out_depth, ctx->sender_contact_out_seq, sender_contact_sig, ctx->sender_contact_out_chunk, + sender_contact_sz, 0UL, tsorig, tspub ); + ctx->sender_contact_out_seq = fd_seq_inc( ctx->sender_contact_out_seq, 1UL ); + ctx->sender_contact_out_chunk = fd_dcache_compact_next( ctx->sender_contact_out_chunk, sender_contact_sz, ctx->sender_contact_out_chunk0, ctx->sender_contact_out_wmark ); } } @@ -800,7 +800,7 @@ unprivileged_init( fd_topo_t * topo, ctx->in_kind[ in_idx ] = IN_KIND_NET; fd_net_rx_bounds_init( &ctx->in_links[ in_idx ].net_rx, link->dcache ); continue; - } else if( 0==strcmp( link->name, "voter_gossip" ) ) { + } else if( 0==strcmp( link->name, "send_txns" ) ) { ctx->in_kind[ in_idx ] = IN_KIND_VOTER; } else if( 0==strcmp( link->name, "rstart_gossi" ) ) { ctx->in_kind[ in_idx ] = IN_KIND_RESTART; @@ -874,17 +874,17 @@ unprivileged_init( fd_topo_t * topo, sign_link_out_idx = out_idx; - } else if( 0==strcmp( link->name, "gossip_voter" ) ) { - - if( FD_UNLIKELY( ctx->voter_contact_out_mcache ) ) FD_LOG_ERR(( "gossip tile has multiple gossip_voter out links" )); - ctx->voter_contact_out_mcache = link->mcache; - ctx->voter_contact_out_sync = fd_mcache_seq_laddr( ctx->voter_contact_out_mcache ); - ctx->voter_contact_out_depth = fd_mcache_depth( ctx->voter_contact_out_mcache ); - ctx->voter_contact_out_seq = fd_mcache_seq_query( ctx->voter_contact_out_sync ); - ctx->voter_contact_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp; - ctx->voter_contact_out_chunk0 = fd_dcache_compact_chunk0( ctx->voter_contact_out_mem, link->dcache ); - ctx->voter_contact_out_wmark = fd_dcache_compact_wmark ( ctx->voter_contact_out_mem, link->dcache, link->mtu ); - ctx->voter_contact_out_chunk = ctx->voter_contact_out_chunk0; + } else if( 0==strcmp( link->name, "gossip_send" ) ) { + + if( FD_UNLIKELY( ctx->sender_contact_out_mcache ) ) FD_LOG_ERR(( "gossip tile has multiple gossip_sender out links" )); + ctx->sender_contact_out_mcache = link->mcache; + ctx->sender_contact_out_sync = fd_mcache_seq_laddr( ctx->sender_contact_out_mcache ); + ctx->sender_contact_out_depth = fd_mcache_depth( ctx->sender_contact_out_mcache ); + ctx->sender_contact_out_seq = fd_mcache_seq_query( ctx->sender_contact_out_sync ); + ctx->sender_contact_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp; + ctx->sender_contact_out_chunk0 = fd_dcache_compact_chunk0( ctx->sender_contact_out_mem, link->dcache ); + ctx->sender_contact_out_wmark = fd_dcache_compact_wmark ( ctx->sender_contact_out_mem, link->dcache, link->mtu ); + ctx->sender_contact_out_chunk = ctx->sender_contact_out_chunk0; } else if( 0==strcmp( link->name, "gossip_eqvoc" ) ) { diff --git a/src/discof/replay/fd_replay_tile.c b/src/discof/replay/fd_replay_tile.c index 239d24df58..20ec132f72 100644 --- a/src/discof/replay/fd_replay_tile.c +++ b/src/discof/replay/fd_replay_tile.c @@ -59,7 +59,6 @@ #define SHRED_IN_IDX (4UL) #define STAKE_OUT_IDX (0UL) -#define SENDER_OUT_IDX (1UL) #define POH_OUT_IDX (2UL) #define EXEC_BOOT_WAIT (0UL) @@ -1249,9 +1248,12 @@ send_tower_sync( fd_replay_tile_ctx_t * ctx ) { ulong vote_slot = fd_tower_votes_peek_tail_const( ctx->tower )->slot; fd_hash_t vote_bank_hash[1] = { 0 }; fd_hash_t vote_block_hash[1] = { 0 }; - int err = fd_blockstore_bank_hash_query( ctx->blockstore, vote_slot, vote_bank_hash ); - if( err ) FD_LOG_ERR(( "invariant violation: missing bank hash for tower vote" )); - err = fd_blockstore_block_hash_query( ctx->blockstore, vote_slot, vote_block_hash ); + + /* guaranteed to be on frontier from caller check */ + const fd_fork_t * fork = fd_forks_query_const( ctx->forks, vote_slot ); + fd_memcpy( vote_bank_hash, &fork->slot_ctx->slot_bank.banks_hash, sizeof(fd_hash_t) ); + + int err = fd_blockstore_block_hash_query( ctx->blockstore, vote_slot, vote_block_hash ); if( err ) FD_LOG_ERR(( "invariant violation: missing block hash for tower vote" )); /* Build a vote state update based on current tower votes. */ @@ -1269,10 +1271,11 @@ send_tower_sync( fd_replay_tile_ctx_t * ctx ) { /* TODO: Can use a smaller size, adjusted for payload length */ ulong msg_sz = sizeof( fd_txn_p_t ); + ulong sig = vote_slot; fd_mcache_publish( ctx->sender_out_mcache, ctx->sender_out_depth, ctx->sender_out_seq, - 1UL, + sig, ctx->sender_out_chunk, msg_sz, 0UL, @@ -1772,7 +1775,6 @@ exec_slice( fd_replay_tile_ctx_t * ctx, FD_COMPILER_MFENCE(); block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING ); memcpy( &block_info->block_hash, hdr->hash, sizeof(fd_hash_t) ); - memcpy( &block_info->bank_hash, &fork->slot_ctx->slot_bank.banks_hash, sizeof(fd_hash_t) ); fd_block_map_publish( query ); ctx->flags = EXEC_FLAG_FINISHED_SLOT; @@ -2506,6 +2508,13 @@ after_credit( fd_replay_tile_ctx_t * ctx, ctx->runtime_spad, &exec_para_ctx_block_finalize ); + /* Update blockstore with the freshly computed bank hash */ + fd_block_map_query_t query[1] = { 0 }; + fd_block_map_prepare( ctx->blockstore->block_map, &curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING ); + fd_block_info_t * block_info = fd_block_map_query_ele( query ); + block_info->bank_hash = ctx->slot_ctx->slot_bank.banks_hash; + fd_block_map_publish( query ); + fd_spad_pop( ctx->runtime_spad ); FD_LOG_NOTICE(( "Spad memory after executing block %lu", ctx->runtime_spad->mem_used )); /**********************************************************************/ @@ -3230,7 +3239,10 @@ unprivileged_init( fd_topo_t * topo, ctx->notif_out_mcache = NULL; } - fd_topo_link_t * sender_out = &topo->links[ tile->out_link_id[ SENDER_OUT_IDX ] ]; + /* Setup sender output */ + ulong send_out_idx = fd_topo_find_tile_out_link( topo, tile, "replay_send", 0 ); + FD_TEST( send_out_idx!=ULONG_MAX ); + fd_topo_link_t * sender_out = &topo->links[ tile->out_link_id[ send_out_idx ] ]; ctx->sender_out_mcache = sender_out->mcache; ctx->sender_out_sync = fd_mcache_seq_laddr( ctx->sender_out_mcache ); ctx->sender_out_depth = fd_mcache_depth( ctx->sender_out_mcache ); diff --git a/src/discof/sender/Local.mk b/src/discof/sender/Local.mk index f6dc672f3e..8998d7f938 100644 --- a/src/discof/sender/Local.mk +++ b/src/discof/sender/Local.mk @@ -1,3 +1,3 @@ ifdef FD_HAS_SSE $(call add-objs,fd_sender_tile,fd_discof) -endif \ No newline at end of file +endif diff --git a/src/discof/sender/fd_sender_tile.c b/src/discof/sender/fd_sender_tile.c index 8d1a337d28..a06e19b18f 100644 --- a/src/discof/sender/fd_sender_tile.c +++ b/src/discof/sender/fd_sender_tile.c @@ -1,47 +1,59 @@ -/* Sender tile signs and sends transactions to the current leader. Currently - only supports transactions which require one signature. */ +/* Sender tile signs and sends transactions to the current leader. + Currently only supports transactions which require one signature. + Designed with voting as primary use case. Signing those votes will + eventually move to a separate consensus tile.*/ #define _GNU_SOURCE +#include "../../disco/metrics/fd_metrics.h" #include "../../disco/topo/fd_topo.h" #include "generated/fd_sender_tile_seccomp.h" -#include "../store/fd_store.h" -#include "../../flamenco/repair/fd_repair.h" -#include "../../flamenco/runtime/fd_blockstore.h" -#include "../../flamenco/leaders/fd_leaders.h" -#include "../../flamenco/fd_flamenco.h" #include "../../util/fd_util.h" -#include "../../choreo/fd_choreo.h" -#include "../../util/net/fd_eth.h" -#include "../../util/net/fd_ip4.h" -#include "../../util/net/fd_udp.h" +#include "../../util/net/fd_net_headers.h" + +#include "../../disco/fd_disco.h" #include "../../disco/shred/fd_stake_ci.h" -#include "../../disco/topo/fd_pod_format.h" +#include "../../disco/pack/fd_microblock.h" + #include "../../disco/keyguard/fd_keyload.h" #include "../../disco/keyguard/fd_keyguard_client.h" #include "../../disco/keyguard/fd_keyguard.h" + #include "../../flamenco/leaders/fd_leaders.h" -#include "../../flamenco/runtime/fd_runtime.h" -#include "../../disco/fd_disco.h" -#include "../../util/net/fd_net_headers.h" +#include "../../flamenco/gossip/fd_gossip.h" + -#include -#include -#include -#include -#include -#include -#include +#define IN_KIND_SIGN (0UL) +#define IN_KIND_GOSSIP (1UL) +#define IN_KIND_REPLAY (2UL) +#define IN_KIND_STAKE (3UL) -#define SCRATCH_MAX (4UL /*KiB*/ << 10) -#define SCRATCH_DEPTH (4UL) /* 4 scratch frames */ +struct fd_sender_link_in { + fd_wksp_t * mem; + ulong chunk0; + ulong wmark; + ulong kind; +}; +typedef struct fd_sender_link_in fd_sender_link_in_t; + +struct fd_sender_link_out { + ulong idx; + fd_frag_meta_t * mcache; + ulong * sync; + ulong depth; + + fd_wksp_t * mem; + ulong chunk0; + ulong wmark; + ulong chunk; +}; +typedef struct fd_sender_link_out fd_sender_link_out_t; struct fd_sender_tile_ctx { fd_pubkey_t identity_key[ 1 ]; fd_pubkey_t vote_acct_addr[ 1 ]; fd_stake_ci_t * stake_ci; - ulong * poh_slot; fd_shred_dest_weighted_t * new_dest_ptr; ulong new_dest_cnt; @@ -51,63 +63,27 @@ struct fd_sender_tile_ctx { fd_ip4_udp_hdrs_t packet_hdr[1]; ushort net_id; - ulong stake_in_idx; - fd_wksp_t * stake_in_mem; - ulong stake_in_chunk0; - ulong stake_in_wmark; - - ulong contact_in_idx; - fd_wksp_t * contact_in_mem; - ulong contact_in_chunk0; - ulong contact_in_wmark; - - ulong replay_in_idx; - fd_wksp_t * replay_in_mem; - ulong replay_in_chunk0; - ulong replay_in_wmark; - - ulong poh_in_idx; - fd_wksp_t * poh_in_mem; - ulong poh_in_chunk0; - ulong poh_in_wmark; - - ulong gossip_out_idx; - fd_frag_meta_t * gossip_out_mcache; - ulong * gossip_out_sync; - ulong gossip_out_depth; - ulong gossip_out_seq; - - fd_wksp_t * gossip_out_mem; - ulong gossip_out_chunk0; - ulong gossip_out_wmark; - ulong gossip_out_chunk; - - ulong dedup_out_idx; - fd_frag_meta_t * dedup_out_mcache; - ulong * dedup_out_sync; - ulong dedup_out_depth; - ulong dedup_out_seq; - - fd_wksp_t * dedup_out_mem; - ulong dedup_out_chunk0; - ulong dedup_out_wmark; - ulong dedup_out_chunk; - - ulong net_out_idx; - fd_frag_meta_t * net_out_mcache; - ulong * net_out_sync; - ulong net_out_depth; - ulong net_out_seq; - - fd_wksp_t * net_out_mem; - ulong net_out_chunk0; - ulong net_out_wmark; - ulong net_out_chunk; - - ulong sign_in_idx; + #define FD_SENDER_MAX_IN_LINK_CNT 32UL + fd_sender_link_in_t in_links[ FD_SENDER_MAX_IN_LINK_CNT ]; + + fd_sender_link_out_t gossip_verify_out[1]; + fd_sender_link_out_t net_out [1]; + ulong sign_out_idx; fd_keyguard_client_t keyguard_client[ 1 ]; + struct { + + /* Transaction metrics */ + ulong txns_sent_to_leader; /* Successfully sent to leader */ + + /* Leader metrics */ + ulong leader_sched_not_found; /* Number of times leader schedule not found */ + ulong leader_not_found; /* Number of times slot leader not found */ + ulong leader_contact_not_found; /* Number of times leader contact not found */ + + } metrics; + }; typedef struct fd_sender_tile_ctx fd_sender_tile_ctx_t; @@ -117,29 +93,24 @@ scratch_align( void ) { return 128UL; } -FD_FN_PURE static inline ulong -loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) { - return 0UL; -} - FD_FN_PURE static inline ulong scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) { ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND( l, alignof(fd_sender_tile_ctx_t), sizeof(fd_sender_tile_ctx_t) ); l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() ); - l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_MAX ) ); - l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_DEPTH ) ); return FD_LAYOUT_FINI( l, scratch_align() ); } static void -send_packet( fd_sender_tile_ctx_t * ctx, +send_packet( fd_sender_tile_ctx_t * ctx, + fd_stem_context_t * stem, uint dst_ip_addr, ushort dst_port, - uchar const * payload, + uchar const * payload, ulong payload_sz, ulong tsorig ) { - uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk ); + fd_sender_link_out_t * net_out_link = ctx->net_out; + uchar * packet = fd_chunk_to_laddr( net_out_link->mem, net_out_link->chunk ); fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet; *hdr = *ctx->packet_hdr; @@ -147,7 +118,6 @@ send_packet( fd_sender_tile_ctx_t * ctx, fd_ip4_hdr_t * ip4 = hdr->ip4; ip4->daddr = dst_ip_addr; ip4->net_id = fd_ushort_bswap( ctx->net_id++ ); - ip4->check = 0U; ip4->net_tot_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) ); ip4->check = fd_ip4_hdr_check_fast( ip4 ); @@ -155,31 +125,37 @@ send_packet( fd_sender_tile_ctx_t * ctx, udp->net_dport = fd_ushort_bswap( dst_port ); udp->net_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) ); fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz ); - udp->check = 0U; + udp->check = 0U; /* indicates no checksum */ ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() ); ulong sig = fd_disco_netmux_sig( dst_ip_addr, dst_port, dst_ip_addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) ); ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t); - fd_mcache_publish( ctx->net_out_mcache, ctx->net_out_depth, ctx->net_out_seq, sig, ctx->net_out_chunk, packet_sz, 0UL, tsorig, tspub ); - ctx->net_out_seq = fd_seq_inc( ctx->net_out_seq, 1UL ); - ctx->net_out_chunk = fd_dcache_compact_next( ctx->net_out_chunk, packet_sz, ctx->net_out_chunk0, ctx->net_out_wmark ); + fd_stem_publish( stem, net_out_link->idx, sig, net_out_link->chunk, packet_sz, 0UL, tsorig, tspub ); + net_out_link->chunk = fd_dcache_compact_next( net_out_link->chunk, packet_sz, net_out_link->chunk0, net_out_link->wmark ); } + static int -get_current_leader_tpu_vote_contact( fd_sender_tile_ctx_t * ctx, +get_current_leader_tpu_vote_contact( fd_sender_tile_ctx_t * ctx, + ulong poh_slot, fd_shred_dest_weighted_t ** out_dest ) { - ulong poh_slot = fd_fseq_query( ctx->poh_slot ); - if( poh_slot==ULONG_MAX ) { return -1; } fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, poh_slot ); - if( FD_UNLIKELY( !lsched ) ) { return -1; } + if( FD_UNLIKELY( !lsched ) ) { + ctx->metrics.leader_sched_not_found++; + return -1; + } fd_pubkey_t const * slot_leader = fd_epoch_leaders_get( lsched, poh_slot ); - if( FD_UNLIKELY( !slot_leader ) ) { return -1 ; } /* Count this as bad slot too */ + if( FD_UNLIKELY( !slot_leader ) ) { + ctx->metrics.leader_not_found++; + return -1; + } fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, poh_slot ); fd_shred_dest_idx_t sdest_idx = fd_shred_dest_pubkey_to_idx( sdest, slot_leader ); if( FD_UNLIKELY( sdest_idx==FD_SHRED_DEST_NO_DEST ) ) { + ctx->metrics.leader_contact_not_found++; return -1; } @@ -194,10 +170,7 @@ handle_new_cluster_contact_info( fd_sender_tile_ctx_t * ctx, ulong buf_sz ) { ulong const * header = (ulong const *)fd_type_pun_const( buf ); - ulong dest_cnt = buf_sz; - - if( dest_cnt >= MAX_SHRED_DESTS ) - FD_LOG_ERR(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_SHRED_DESTS )); + ulong dest_cnt = buf_sz / sizeof(fd_shred_dest_wire_t); fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header ); fd_shred_dest_weighted_t * dests = fd_stake_ci_dest_add_init( ctx->stake_ci ); @@ -226,33 +199,32 @@ during_frag( fd_sender_tile_ctx_t * ctx, ulong sz, ulong ctl FD_PARAM_UNUSED ) { - if( FD_UNLIKELY( in_idx==ctx->sign_in_idx ) ) { - FD_LOG_CRIT(( "signing tile send out of band fragment" )); - } + fd_sender_link_in_t * in_link = &ctx->in_links[ in_idx ]; + if( FD_UNLIKELY( chunkchunk0 || chunk>in_link->wmark ) ) + FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu] on link %lu", chunk, sz, + in_link->chunk0, in_link->wmark, in_idx )); + + uchar const * dcache_entry = fd_chunk_to_laddr_const( in_link->mem, chunk ); + ulong kind = in_link->kind; - if( FD_UNLIKELY( in_idx==ctx->stake_in_idx ) ) { - if( FD_UNLIKELY( chunkstake_in_chunk0 || chunk>ctx->stake_in_wmark ) ) - FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, - ctx->stake_in_chunk0, ctx->stake_in_wmark )); - uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->stake_in_mem, chunk ); + if( FD_UNLIKELY( kind==IN_KIND_STAKE ) ) { + if( sz>sizeof(fd_stake_weight_t)*(MAX_SHRED_DESTS+1UL) ) { + FD_LOG_ERR(( "sz %lu >= max expected stake update size %lu", sz, sizeof(fd_stake_weight_t) * (MAX_SHRED_DESTS+1UL) )); + } fd_stake_ci_stake_msg_init( ctx->stake_ci, dcache_entry ); } - if( FD_UNLIKELY( in_idx==ctx->contact_in_idx ) ) { - if( FD_UNLIKELY( chunkcontact_in_chunk0 || chunk>ctx->contact_in_wmark ) ) { - FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->contact_in_chunk0, ctx->contact_in_wmark )); + if( FD_UNLIKELY( kind==IN_KIND_GOSSIP ) ) { + if( sz>sizeof(fd_shred_dest_wire_t)*MAX_SHRED_DESTS ) { + FD_LOG_ERR(( "sz %lu >= max expected gossip update size %lu", sz, sizeof(fd_shred_dest_wire_t) * MAX_SHRED_DESTS )); } - - uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->contact_in_mem, chunk ); handle_new_cluster_contact_info( ctx, dcache_entry, sz ); } - if( FD_UNLIKELY( in_idx==ctx->replay_in_idx ) ) { - if( FD_UNLIKELY( chunkreplay_in_chunk0 || chunk>ctx->replay_in_wmark || sz!=sizeof(fd_txn_p_t) ) ) { - FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->replay_in_chunk0, ctx->replay_in_wmark )); + if( FD_UNLIKELY( kind==IN_KIND_REPLAY ) ) { + if( sz!=sizeof(fd_txn_p_t) ) { + FD_LOG_ERR(( "sz %lu != expected txn size %lu", sz, sizeof(fd_txn_p_t) )); } - - uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->replay_in_mem, chunk ); memcpy( ctx->txn_buf, dcache_entry, sz ); } } @@ -273,17 +245,20 @@ after_frag( fd_sender_tile_ctx_t * ctx, (void)tspub; (void)stem; - if( FD_UNLIKELY( in_idx==ctx->contact_in_idx ) ) { + fd_sender_link_in_t * in_link = &ctx->in_links[ in_idx ]; + ulong kind = in_link->kind; + + if( FD_UNLIKELY( kind==IN_KIND_GOSSIP ) ) { finalize_new_cluster_contact_info( ctx ); return; } - if( FD_UNLIKELY( in_idx==ctx->stake_in_idx ) ) { + if( FD_UNLIKELY( kind==IN_KIND_STAKE ) ) { fd_stake_ci_stake_msg_fini( ctx->stake_ci ); return; } - if( FD_UNLIKELY( in_idx==ctx->replay_in_idx ) ) { + if( FD_UNLIKELY( kind==IN_KIND_REPLAY ) ) { fd_txn_p_t * txn = (fd_txn_p_t *)fd_type_pun(ctx->txn_buf); /* sign the txn */ @@ -292,31 +267,23 @@ after_frag( fd_sender_tile_ctx_t * ctx, ulong message_sz = txn->payload_sz - TXN(txn)->message_off; fd_keyguard_client_sign( ctx->keyguard_client, signature, message, message_sz, FD_KEYGUARD_SIGN_TYPE_ED25519 ); - uchar * msg_to_gossip = fd_chunk_to_laddr( ctx->gossip_out_mem, ctx->gossip_out_chunk ); - memcpy( msg_to_gossip, txn->payload, txn->payload_sz ); + ulong poh_slot = sig; /* send to leader */ fd_shred_dest_weighted_t * leader_dest = NULL; - int res = get_current_leader_tpu_vote_contact( ctx, &leader_dest ); - /* TODO: add metrics for successful votes sent and failed votes */ + int res = get_current_leader_tpu_vote_contact( ctx, poh_slot, &leader_dest ); if( res==0 ) { - send_packet( ctx, leader_dest->ip4, leader_dest->port, msg_to_gossip, txn->payload_sz, 0UL ); + send_packet( ctx, stem, leader_dest->ip4, leader_dest->port, txn->payload, txn->payload_sz, 0UL ); + ctx->metrics.txns_sent_to_leader++; } - /* send to gossip */ - fd_mcache_publish( ctx->gossip_out_mcache, ctx->gossip_out_depth, ctx->gossip_out_seq, 1UL, ctx->gossip_out_chunk, - txn->payload_sz, 0UL, 0, 0 ); - ctx->gossip_out_seq = fd_seq_inc( ctx->gossip_out_seq, 1UL ); - ctx->gossip_out_chunk = fd_dcache_compact_next( ctx->gossip_out_chunk, txn->payload_sz, - ctx->gossip_out_chunk0, ctx->gossip_out_wmark ); - /* send to dedup */ - uchar * msg_to_pack = fd_chunk_to_laddr( ctx->dedup_out_mem, ctx->dedup_out_chunk ); - memcpy( msg_to_pack, msg_to_gossip, txn->payload_sz ); - fd_mcache_publish( ctx->dedup_out_mcache, ctx->dedup_out_depth, ctx->dedup_out_seq, 1UL, ctx->dedup_out_chunk, - txn->payload_sz, 0UL, 0, 0 ); - ctx->dedup_out_seq = fd_seq_inc( ctx->dedup_out_seq, 1UL ); - ctx->dedup_out_chunk = fd_dcache_compact_next( ctx->dedup_out_chunk, txn->payload_sz, ctx->dedup_out_chunk0, - ctx->dedup_out_wmark ); + /* send to gossip and dedup */ + fd_sender_link_out_t * gossip_verify_out = ctx->gossip_verify_out; + uchar * msg_to_gossip = fd_chunk_to_laddr( gossip_verify_out->mem, gossip_verify_out->chunk ); + fd_memcpy( msg_to_gossip, txn->payload, txn->payload_sz ); + fd_stem_publish( stem, gossip_verify_out->idx, 1UL, gossip_verify_out->chunk, txn->payload_sz, 0UL, 0, 0 ); + gossip_verify_out->chunk = fd_dcache_compact_next( gossip_verify_out->chunk, txn->payload_sz, gossip_verify_out->chunk0, + gossip_verify_out->wmark ); } } @@ -334,6 +301,40 @@ privileged_init( fd_topo_t * topo, ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->sender.identity_key_path, /* pubkey only: */ 1 ) ); } +static void +setup_input_link( fd_sender_tile_ctx_t * ctx, + fd_topo_t * topo, + fd_topo_tile_t * tile, + ulong kind, + const char * name ) { + ulong in_idx = fd_topo_find_tile_in_link( topo, tile, name, 0 ); + FD_TEST( in_idx!=ULONG_MAX ); + fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ in_idx ] ]; + fd_sender_link_in_t * in_link_desc = &ctx->in_links[ in_idx ]; + in_link_desc->mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp; + in_link_desc->chunk0 = fd_dcache_compact_chunk0( in_link_desc->mem, in_link->dcache ); + in_link_desc->wmark = fd_dcache_compact_wmark( in_link_desc->mem, in_link->dcache, in_link->mtu ); + in_link_desc->kind = kind; +} + +static void +setup_output_link( fd_sender_link_out_t * desc, + fd_topo_t * topo, + fd_topo_tile_t * tile, + const char * name ) { + ulong out_idx = fd_topo_find_tile_out_link( topo, tile, name, 0 ); + FD_TEST( out_idx!=ULONG_MAX ); + fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ out_idx ] ]; + desc->idx = out_idx; + desc->mcache = out_link->mcache; + desc->sync = fd_mcache_seq_laddr( desc->mcache ); + desc->depth = fd_mcache_depth( desc->mcache ); + desc->mem = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp; + desc->chunk0 = fd_dcache_compact_chunk0( desc->mem, out_link->dcache ); + desc->wmark = fd_dcache_compact_wmark( desc->mem, out_link->dcache, out_link->mtu ); + desc->chunk = desc->chunk0; +} + static void unprivileged_init( fd_topo_t * topo, fd_topo_tile_t * tile ) { @@ -345,94 +346,29 @@ unprivileged_init( fd_topo_t * topo, FD_SCRATCH_ALLOC_INIT( l, scratch ); fd_sender_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sender_tile_ctx_t), sizeof(fd_sender_tile_ctx_t) ); - // TODO: set the lo_mark_slot to the actual snapshot slot! ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() ), ctx->identity_key ) ); - void * scratch_smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( SCRATCH_MAX ) ); - void * scratch_fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( SCRATCH_DEPTH ) ); - /* scratch space attach */ - fd_scratch_attach( scratch_smem, scratch_fmem, SCRATCH_MAX, SCRATCH_DEPTH ); - - ctx->net_id = (ushort)0; + ctx->net_id = (ushort)0; ctx->tpu_serve_addr.addr = tile->sender.ip_addr; - ctx->tpu_serve_addr.port = fd_ushort_bswap( tile->sender.tpu_listen_port ); + ctx->tpu_serve_addr.port = tile->sender.tpu_listen_port; fd_ip4_udp_hdr_init( ctx->packet_hdr, FD_TXN_MTU, ctx->tpu_serve_addr.addr, ctx->tpu_serve_addr.port ); - ulong poh_slot_obj_id = fd_pod_query_ulong( topo->props, "poh_slot", ULONG_MAX ); - FD_TEST( poh_slot_obj_id!=ULONG_MAX ); - ctx->poh_slot = fd_fseq_join( fd_topo_obj_laddr( topo, poh_slot_obj_id ) ); - - /* Set up stake input */ - ctx->stake_in_idx = fd_topo_find_tile_in_link( topo, tile, "stake_out", 0 ); - FD_TEST( ctx->stake_in_idx!=ULONG_MAX ); - fd_topo_link_t * stake_in_link = &topo->links[ tile->in_link_id[ ctx->stake_in_idx ] ]; - ctx->stake_in_mem = topo->workspaces[ topo->objs[ stake_in_link->dcache_obj_id ].wksp_id ].wksp; - ctx->stake_in_chunk0 = fd_dcache_compact_chunk0( ctx->stake_in_mem, stake_in_link->dcache ); - ctx->stake_in_wmark = fd_dcache_compact_wmark( ctx->stake_in_mem, stake_in_link->dcache, stake_in_link->mtu ); - - /* Set up contact input */ - ctx->contact_in_idx = fd_topo_find_tile_in_link( topo, tile, "gossip_voter", 0 ); - FD_TEST( ctx->contact_in_idx!=ULONG_MAX ); - fd_topo_link_t * contact_in_link = &topo->links[ tile->in_link_id[ ctx->contact_in_idx ] ]; - ctx->contact_in_mem = topo->workspaces[ topo->objs[ contact_in_link->dcache_obj_id ].wksp_id ].wksp; - ctx->contact_in_chunk0 = fd_dcache_compact_chunk0( ctx->contact_in_mem, contact_in_link->dcache ); - ctx->contact_in_wmark = fd_dcache_compact_wmark( ctx->contact_in_mem, contact_in_link->dcache, contact_in_link->mtu ); - - /* Set up replay tile input */ - ctx->replay_in_idx = fd_topo_find_tile_in_link( topo, tile, "replay_voter", 0 ); - FD_TEST( ctx->replay_in_idx!=ULONG_MAX ); - fd_topo_link_t * replay_in_link = &topo->links[ tile->in_link_id[ ctx->replay_in_idx ] ]; - ctx->replay_in_mem = topo->workspaces[ topo->objs[ replay_in_link->dcache_obj_id ].wksp_id ].wksp; - ctx->replay_in_chunk0 = fd_dcache_compact_chunk0( ctx->replay_in_mem, replay_in_link->dcache ); - ctx->replay_in_wmark = fd_dcache_compact_wmark( ctx->replay_in_mem, replay_in_link->dcache, replay_in_link->mtu ); - - /* Set up repair request output */ - ctx->gossip_out_idx = fd_topo_find_tile_out_link( topo, tile, "voter_gossip", 0 ); - FD_TEST( ctx->gossip_out_idx!=ULONG_MAX ); - fd_topo_link_t * gossip_out_link = &topo->links[ tile->out_link_id[ ctx->gossip_out_idx ] ]; - ctx->gossip_out_mcache = gossip_out_link->mcache; - ctx->gossip_out_sync = fd_mcache_seq_laddr( ctx->gossip_out_mcache ); - ctx->gossip_out_depth = fd_mcache_depth( ctx->gossip_out_mcache ); - ctx->gossip_out_seq = fd_mcache_seq_query( ctx->gossip_out_sync ); - ctx->gossip_out_mem = topo->workspaces[ topo->objs[ gossip_out_link->dcache_obj_id ].wksp_id ].wksp; - ctx->gossip_out_chunk0 = fd_dcache_compact_chunk0( ctx->gossip_out_mem, gossip_out_link->dcache ); - ctx->gossip_out_wmark = fd_dcache_compact_wmark ( ctx->gossip_out_mem, gossip_out_link->dcache, gossip_out_link->mtu ); - ctx->gossip_out_chunk = ctx->gossip_out_chunk0; - - /* Set up dedup output */ - ctx->dedup_out_idx = fd_topo_find_tile_out_link( topo, tile, "voter_dedup", 0 ); - FD_TEST( ctx->dedup_out_idx!=ULONG_MAX ); - fd_topo_link_t * dedup_out_link = &topo->links[ tile->out_link_id[ ctx->dedup_out_idx ] ]; - ctx->dedup_out_mcache = dedup_out_link->mcache; - ctx->dedup_out_sync = fd_mcache_seq_laddr( ctx->dedup_out_mcache ); - ctx->dedup_out_depth = fd_mcache_depth( ctx->dedup_out_mcache ); - ctx->dedup_out_seq = fd_mcache_seq_query( ctx->dedup_out_sync ); - ctx->dedup_out_mem = topo->workspaces[ topo->objs[ dedup_out_link->dcache_obj_id ].wksp_id ].wksp; - ctx->dedup_out_chunk0 = fd_dcache_compact_chunk0( ctx->dedup_out_mem, dedup_out_link->dcache ); - ctx->dedup_out_wmark = fd_dcache_compact_wmark ( ctx->dedup_out_mem, dedup_out_link->dcache, dedup_out_link->mtu ); - ctx->dedup_out_chunk = ctx->dedup_out_chunk0; - - /* Set up net output */ - ctx->net_out_idx = fd_topo_find_tile_out_link( topo, tile, "voter_net", 0 ); - FD_TEST( ctx->net_out_idx!=ULONG_MAX ); - fd_topo_link_t * net_out_link = &topo->links[ tile->out_link_id[ ctx->net_out_idx ] ]; - ctx->net_out_mcache = net_out_link->mcache; - ctx->net_out_sync = fd_mcache_seq_laddr( ctx->net_out_mcache ); - ctx->net_out_depth = fd_mcache_depth( ctx->net_out_mcache ); - ctx->net_out_seq = fd_mcache_seq_query( ctx->net_out_sync ); - ctx->net_out_mem = topo->workspaces[ topo->objs[ net_out_link->dcache_obj_id ].wksp_id ].wksp; - ctx->net_out_chunk0 = fd_dcache_compact_chunk0( ctx->net_out_mem, net_out_link->dcache ); - ctx->net_out_wmark = fd_dcache_compact_wmark ( ctx->net_out_mem, net_out_link->dcache, net_out_link->mtu ); - ctx->net_out_chunk = ctx->net_out_chunk0; + setup_input_link( ctx, topo, tile, IN_KIND_GOSSIP, "gossip_send" ); + setup_input_link( ctx, topo, tile, IN_KIND_STAKE, "stake_out" ); + setup_input_link( ctx, topo, tile, IN_KIND_REPLAY, "replay_send" ); + setup_output_link( ctx->gossip_verify_out, topo, tile, "send_txns" ); + setup_output_link( ctx->net_out, topo, tile, "send_net" ); /* Set up keyguard(s) */ - ctx->sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_voter", 0 ); - ctx->sign_out_idx = fd_topo_find_tile_out_link( topo, tile, "voter_sign", 0 ); - FD_TEST( ctx->sign_in_idx==( tile->in_cnt-1 ) ); - fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ ctx->sign_in_idx ] ]; + ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_send", 0 ); + fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ]; + fd_sender_link_in_t * sign_in_desc = &ctx->in_links[ sign_in_idx ]; + /* *** */ sign_in_desc->kind = IN_KIND_SIGN; + + ctx->sign_out_idx = fd_topo_find_tile_out_link( topo, tile, "send_sign", 0 ); fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ ctx->sign_out_idx ] ]; if ( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client, @@ -442,6 +378,10 @@ unprivileged_init( fd_topo_t * topo, sign_in->dcache ) )==NULL ) { FD_LOG_ERR(( "Keyguard join failed" )); } + + /* init metrics */ + fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) ); + ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() ); if( FD_UNLIKELY( scratch_top != (ulong)scratch + scratch_footprint( tile ) ) ) { FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) )); @@ -478,19 +418,31 @@ populate_allowed_fds( fd_topo_t const * topo, return out_cnt; } -#define STEM_BURST (1UL) +static void +metrics_write( fd_sender_tile_ctx_t * ctx ) { + /* Transaction metrics */ + FD_MCNT_SET( SENDER, TXNS_SENT_TO_LEADER, ctx->metrics.txns_sent_to_leader ); + + /* Leader metrics */ + FD_MCNT_SET( SENDER, LEADER_SCHED_NOT_FOUND, ctx->metrics.leader_sched_not_found ); + FD_MCNT_SET( SENDER, LEADER_NOT_FOUND, ctx->metrics.leader_not_found ); + FD_MCNT_SET( SENDER, LEADER_CONTACT_NOT_FOUND, ctx->metrics.leader_contact_not_found ); +} + + +#define STEM_BURST (3UL) #define STEM_CALLBACK_CONTEXT_TYPE fd_sender_tile_ctx_t #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_sender_tile_ctx_t) -#define STEM_CALLBACK_DURING_FRAG during_frag -#define STEM_CALLBACK_AFTER_FRAG after_frag +#define STEM_CALLBACK_DURING_FRAG during_frag +#define STEM_CALLBACK_AFTER_FRAG after_frag +#define STEM_CALLBACK_METRICS_WRITE metrics_write #include "../../disco/stem/fd_stem.c" fd_topo_run_tile_t fd_tile_sender = { .name = "sender", - .loose_footprint = loose_footprint, .populate_allowed_seccomp = populate_allowed_seccomp, .populate_allowed_fds = populate_allowed_fds, .scratch_align = scratch_align,