Skip to content

Commit 9cefc2b

Browse files
authored
Add support for a secondary network (#178)
* Add secondary network manager * Add secondary field to log lines * Add support for secondary network to pythd * Publish prices to both primary and secondary networks if possible. * Populate get_product_list from the secondary network if the primary is unavailable. * Populate get_product RPC response from the secondary network if the primary is unavailable. * Populate get_all_products RPC response from secondary manager if necessary. * Add staleness comment * Track is_secondary using an explicit boolean flag * Start staggering price_sched requests at fixed intervals instead of on a new slot * Send notify_price_sched RPC messages even if manager isn't connected * Use slot when upd_price message was received as publish slot for price * Increase PC_PUB_INTERVAL to match slot time
1 parent 99f74b1 commit 9cefc2b

File tree

7 files changed

+177
-37
lines changed

7 files changed

+177
-37
lines changed

pc/manager.cpp

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ using namespace pc;
99
#define PC_RPC_HTTP_PORT 8899
1010
#define PC_RECONNECT_TIMEOUT (120L*1000000000L)
1111
#define PC_BLOCKHASH_TIMEOUT 3
12-
#define PC_PUB_INTERVAL (227L*PC_NSECS_IN_MSEC)
12+
#define PC_PUB_INTERVAL (400L*PC_NSECS_IN_MSEC)
1313
#define PC_RPC_HOST "localhost"
1414
#define PC_MAX_BATCH 8
1515
// Flush partial batches if not completed within 400 ms.
@@ -76,7 +76,9 @@ manager::manager()
7676
is_pub_( false ),
7777
cmt_( commitment::e_confirmed ),
7878
max_batch_( PC_MAX_BATCH ),
79-
sreq_{ { commitment::e_processed } }
79+
sreq_{ { commitment::e_processed } },
80+
secondary_{ nullptr },
81+
is_secondary_( false )
8082
{
8183
tconn_.set_sub( this );
8284
breq_->set_sub( this );
@@ -98,6 +100,9 @@ manager::~manager()
98100
delete ptr;
99101
}
100102
svec_.clear();
103+
if ( has_secondary() ) {
104+
delete secondary_;
105+
}
101106
}
102107

103108
bool manager::tx_parser::parse( const char *, size_t len, size_t& res )
@@ -116,7 +121,7 @@ void manager::del_map_sub()
116121
{
117122
if ( --num_sub_ <= 0 && !has_status( PC_PYTH_HAS_MAPPING ) ) {
118123
set_status( PC_PYTH_HAS_MAPPING );
119-
PC_LOG_INF( "completed_mapping_init" ).end();
124+
PC_LOG_INF( "completed_mapping_init" ).add( "secondary", get_is_secondary() ).end();
120125
// notify user that initialization is complete
121126
if ( sub_ ) {
122127
sub_->on_init( this );
@@ -266,7 +271,7 @@ uint64_t manager::get_slot() const
266271

267272
void manager::teardown()
268273
{
269-
PC_LOG_INF( "pythd_teardown" ).end();
274+
PC_LOG_INF( "pythd_teardown" ).add( "secondary", get_is_secondary() ).end();
270275

271276
// shutdown listener
272277
lsvr_.close();
@@ -287,6 +292,11 @@ void manager::teardown()
287292
wconn_ = nullptr;
288293
clnt_.set_ws_conn( nullptr );
289294
}
295+
296+
// Shutdown secondary messenger
297+
if ( has_secondary() ) {
298+
get_secondary()->teardown();
299+
}
290300
}
291301

292302
bool manager::init()
@@ -299,15 +309,15 @@ bool manager::init()
299309
// log import key names
300310
key_pair *kp = get_publish_key_pair();
301311
if ( kp ) {
302-
PC_LOG_INF( "publish_key" ).add( "key_name", *kp ).end();
312+
PC_LOG_INF( "publish_key" ).add( "key_name", *kp ).add( "secondary", get_is_secondary() ).end();
303313
}
304314
pub_key *mpub = get_mapping_pub_key();
305315
if ( mpub ) {
306-
PC_LOG_INF( "mapping_key" ).add( "key_name", *mpub ).end();
316+
PC_LOG_INF( "mapping_key" ).add( "key_name", *mpub ).add( "secondary", get_is_secondary() ).end();
307317
}
308318
pub_key *gpub = get_program_pub_key();
309319
if ( gpub ) {
310-
PC_LOG_INF( "program_key" ).add( "key_name", *gpub ).end();
320+
PC_LOG_INF( "program_key" ).add( "key_name", *gpub ).add( "secondary", get_is_secondary() ).end();
311321
}
312322

313323
// initialize capture
@@ -365,10 +375,12 @@ bool manager::init()
365375
return set_err_msg( lsvr_.get_err_msg() );
366376
}
367377
PC_LOG_INF("listening").add("port",lsvr_.get_port())
378+
.add( "secondary", get_is_secondary() )
368379
.add( "content_dir", get_content_dir() )
369380
.end();
370381
}
371382
PC_LOG_INF( "initialized" )
383+
.add( "secondary", get_is_secondary() )
372384
.add( "version", PC_VERSION )
373385
.add( "rpc_host", get_rpc_host() )
374386
.add( "tx_host", get_tx_host() )
@@ -377,9 +389,48 @@ bool manager::init()
377389
.add( "publish_interval(ms)", get_publish_interval() )
378390
.end();
379391

392+
// Initialize secondary network manager
393+
if ( has_secondary() ) {
394+
PC_LOG_INF("initializing secondary manager").end();
395+
secondary_->init();
396+
PC_LOG_INF("initialized secondary manager").end();
397+
}
398+
380399
return true;
381400
}
382401

402+
void manager::add_secondary( const std::string& rpc_host, const std::string& key_dir )
403+
{
404+
405+
manager *mgr = new manager;
406+
mgr->set_dir( key_dir );
407+
mgr->set_rpc_host( rpc_host );
408+
mgr->set_tx_host( thost_ );
409+
mgr->set_do_tx( do_tx_ );
410+
mgr->set_do_ws( do_ws_ );
411+
mgr->set_commitment( cmt_ );
412+
mgr->set_is_secondary( true );
413+
414+
secondary_ = mgr;
415+
416+
}
417+
418+
bool manager::has_secondary() const {
419+
return secondary_ != nullptr;
420+
}
421+
422+
void manager::set_is_secondary(bool is_secondary) {
423+
is_secondary_ = is_secondary;
424+
}
425+
426+
bool manager::get_is_secondary() const {
427+
return is_secondary_;
428+
}
429+
430+
manager *manager::get_secondary() {
431+
return secondary_;
432+
}
433+
383434
bool manager::get_is_tx_send() const
384435
{
385436
return tconn_.get_is_send();
@@ -527,6 +578,9 @@ void manager::poll( bool do_wait )
527578
}
528579
}
529580

581+
// request quotes from the publishers
582+
poll_schedule();
583+
530584
// try to (re)connect to tx proxy
531585
if ( do_tx_ && ( !tconn_.get_is_connect() || tconn_.get_is_err() ) ) {
532586
tconn_.reconnect();
@@ -535,17 +589,28 @@ void manager::poll( bool do_wait )
535589
if ( has_status( PC_PYTH_RPC_CONNECTED ) &&
536590
!hconn_.get_is_err() &&
537591
( !wconn_ || !wconn_->get_is_err() ) ) {
538-
// request product quotes from pythd's clients while connected
539-
poll_schedule();
540-
541592
send_pending_ups();
542593
} else {
543594
reconnect_rpc();
544595
}
596+
597+
// Call the secondary manager's poll loop if necessary
598+
if ( has_secondary() ) {
599+
secondary_->poll();
600+
}
545601
}
546602

547603
void manager::poll_schedule()
548604
{
605+
// Enable publishing mode if enough time has elapsed since last time.
606+
int64_t time_since_last_stagger = curr_ts_ - pub_ts_;
607+
if ( !is_pub_ && ( time_since_last_stagger > pub_int_ ) ) {
608+
is_pub_ = true;
609+
kidx_ = 0;
610+
pub_ts_ = curr_ts_;
611+
}
612+
613+
// Schedule the price_sched requests in a staggered fashion.
549614
while ( is_pub_ && kidx_ < kvec_.size() ) {
550615
price_sched *kptr = kvec_[kidx_];
551616
int64_t pub_ts = pub_ts_ + static_cast< int64_t >(
@@ -578,15 +643,12 @@ void manager::reconnect_rpc()
578643

579644
// check for successful (re)connect
580645
if ( !hconn_.get_is_err() && ( !wconn_ || !wconn_->get_is_err() ) ) {
581-
PC_LOG_INF( "rpc_connected" ).end();
646+
PC_LOG_INF( "rpc_connected" ).add( "secondary", get_is_secondary() ).end();
582647
set_status( PC_PYTH_RPC_CONNECTED );
583648

584649
// reset state
585650
wait_conn_ = false;
586-
is_pub_ = false;
587-
kidx_ = 0;
588651
ctimeout_ = PC_NSECS_IN_SEC;
589-
pub_ts_ = 0L;
590652
slot_ = 0L;
591653
slot_cnt_ = 0UL;
592654
slot_ts_ = 0L;
@@ -685,6 +747,7 @@ void manager::log_disconnect()
685747
{
686748
if ( hconn_.get_is_err() ) {
687749
PC_LOG_ERR( "rpc_http_reset")
750+
.add( "secondary", get_is_secondary() )
688751
.add( "error", hconn_.get_err_msg() )
689752
.add( "host", rhost_ )
690753
.add( "port", hconn_.get_port() )
@@ -693,6 +756,7 @@ void manager::log_disconnect()
693756
}
694757
if ( wconn_ && wconn_->get_is_err() ) {
695758
PC_LOG_ERR( "rpc_websocket_reset" )
759+
.add( "secondary", get_is_secondary() )
696760
.add( "error", wconn_->get_err_msg() )
697761
.add( "host", rhost_ )
698762
.add( "port", wconn_->get_port() )
@@ -770,20 +834,14 @@ void manager::on_response( rpc::get_slot *res )
770834
PC_LOG_DBG( "received get_slot" )
771835
.add( "slot", slot_ )
772836
.add( "round_trip_time(ms)", 1e-6*ack_ts )
837+
.add( "secondary", get_is_secondary() )
773838
.end();
774839

775840
// submit block hash every N slots
776841
if ( slot_cnt_++ % PC_BLOCKHASH_TIMEOUT == 0 ) {
777842
clnt_.send( breq_ );
778843
}
779844

780-
// reset submit
781-
if ( !is_pub_ ) {
782-
kidx_ = 0;
783-
pub_ts_ = ts;
784-
is_pub_ = true;
785-
}
786-
787845
// flush capture
788846
if ( do_cap_ ) {
789847
cap_.flush();
@@ -812,6 +870,7 @@ void manager::on_response( rpc::get_recent_block_hash *m )
812870
// set initialized status for block hash
813871
set_status( PC_PYTH_HAS_BLOCK_HASH );
814872
PC_LOG_INF( "received_recent_block_hash" )
873+
.add( "secondary", get_is_secondary() )
815874
.add( "curr_slot", slot_ )
816875
.add( "hash_slot", m->get_slot() )
817876
.add( "round_trip_time(ms)", 1e-6*ack_ts )
@@ -830,13 +889,15 @@ void manager::on_response( rpc::account_update *m )
830889
if ( m->get_is_http() ) {
831890
int64_t ack_ts = m->get_recv_time() - m->get_sent_time();
832891
PC_LOG_DBG( "received account_update" )
892+
.add( "secondary", get_is_secondary() )
833893
.add( "account", *m->get_account() )
834894
.add( "slot", slot_ )
835895
.add( "round_trip_time(ms)", 1e-6*ack_ts )
836896
.end();
837897
}
838898
else {
839899
PC_LOG_DBG( "received account_update" )
900+
.add( "secondary", get_is_secondary() )
840901
.add( "account", *m->get_account() )
841902
.add( "slot", slot_ )
842903
.end();
@@ -883,12 +944,14 @@ bool manager::submit_poll( request *req )
883944
}
884945
if ( req->get_is_err() ) {
885946
PC_LOG_ERR( "request error")
947+
.add( "secondary", get_is_secondary() )
886948
.add( "error", req->get_err_msg() )
887949
.end();
888950
return false;
889951
}
890952
if ( get_is_err() ) {
891953
PC_LOG_ERR( "request error")
954+
.add( "secondary", get_is_secondary() )
892955
.add( "error", get_err_msg() )
893956
.end();
894957
return false;
@@ -908,7 +971,7 @@ void manager::on_connect()
908971
void manager::on_disconnect()
909972
{
910973
// callback user with connection status
911-
PC_LOG_INF( "pyth_tx_reset" ).end();
974+
PC_LOG_INF( "pyth_tx_reset" ).add( "secondary", get_is_secondary() ).end();
912975
if ( sub_ ) {
913976
sub_->on_tx_disconnect( this );
914977
}

pc/manager.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ namespace pc
164164
// accept new pyth client apps
165165
void accept( int fd ) override;
166166

167+
// add secondary network manager
168+
void add_secondary( const std::string& rpc_host, const std::string& key_dir );
169+
167170
// shut-down server
168171
void teardown();
169172

@@ -189,6 +192,11 @@ namespace pc
189192
get_mapping *get_last_mapping() const;
190193
bool get_is_rpc_send() const;
191194

195+
bool has_secondary() const;
196+
void set_is_secondary(bool is_secondary);
197+
bool get_is_secondary() const;
198+
manager *get_secondary();
199+
192200
private:
193201

194202
struct trait_account {
@@ -279,6 +287,9 @@ namespace pc
279287

280288
// Timestamp of the last batch
281289
int64_t last_upd_ts_= 0;
290+
291+
manager *secondary_; // manager for secondary network
292+
bool is_secondary_; // flag tracking whether we are a secondary manager
282293
};
283294

284295
inline bool manager::get_is_tx_connect() const

0 commit comments

Comments
 (0)