@@ -268,6 +268,8 @@ enum IpfsEvent {
268
268
Disconnect ( MultiaddrWithPeerId , Channel < ( ) > ) ,
269
269
/// Request background task to return the listened and external addresses
270
270
GetAddresses ( OneshotSender < Vec < Multiaddr > > ) ,
271
+ PubsubAddPeer ( PeerId , OneshotSender < ( ) > ) ,
272
+ PubsubRemovePeer ( PeerId , OneshotSender < ( ) > ) ,
271
273
PubsubSubscribe ( String , OneshotSender < Option < SubscriptionStream > > ) ,
272
274
PubsubUnsubscribe ( String , OneshotSender < bool > ) ,
273
275
PubsubPublish ( String , Vec < u8 > , OneshotSender < ( ) > ) ,
@@ -755,6 +757,42 @@ impl<Types: IpfsTypes> Ipfs<Types> {
755
757
. await
756
758
}
757
759
760
+ /// Add a peer to list of nodes to propagate messages to.
761
+ ///
762
+ /// Unless a peer is added to the list in this way it will not receive any pubsub messages from this node.
763
+ pub async fn pubsub_add_peer ( & self , peer_id : PeerId ) -> Result < ( ) , Error > {
764
+ async move {
765
+ let ( tx, rx) = oneshot_channel :: < ( ) > ( ) ;
766
+
767
+ self . to_task
768
+ . clone ( )
769
+ . send ( IpfsEvent :: PubsubAddPeer ( peer_id, tx) )
770
+ . await ?;
771
+
772
+ Ok ( rx. await ?)
773
+ }
774
+ . instrument ( self . span . clone ( ) )
775
+ . await
776
+ }
777
+
778
+ /// Remove a peer from the list of nodes that messages are propagated to.
779
+ ///
780
+ /// This will not stop messages being sent to the specified peers for subscribed topics which have already been communicated.
781
+ pub async fn pubsub_remove_peer ( & self , peer_id : PeerId ) -> Result < ( ) , Error > {
782
+ async move {
783
+ let ( tx, rx) = oneshot_channel :: < ( ) > ( ) ;
784
+
785
+ self . to_task
786
+ . clone ( )
787
+ . send ( IpfsEvent :: PubsubRemovePeer ( peer_id, tx) )
788
+ . await ?;
789
+
790
+ Ok ( rx. await ?)
791
+ }
792
+ . instrument ( self . span . clone ( ) )
793
+ . await
794
+ }
795
+
758
796
/// Subscribes to a given topic. Can be done at most once without unsubscribing in the between.
759
797
/// The subscription can be unsubscribed by dropping the stream or calling
760
798
/// [`Ipfs::pubsub_unsubscribe`].
@@ -1431,6 +1469,20 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
1431
1469
// ignore error, perhaps caller went away already
1432
1470
let _ = ret. send ( addresses) ;
1433
1471
}
1472
+ IpfsEvent :: PubsubAddPeer ( peer_id, ret) => {
1473
+ self . swarm
1474
+ . behaviour_mut ( )
1475
+ . pubsub ( )
1476
+ . add_node_to_partial_view ( peer_id) ;
1477
+ let _ = ret. send ( ( ) ) ;
1478
+ }
1479
+ IpfsEvent :: PubsubRemovePeer ( peer_id, ret) => {
1480
+ self . swarm
1481
+ . behaviour_mut ( )
1482
+ . pubsub ( )
1483
+ . remove_node_from_partial_view ( & peer_id) ;
1484
+ let _ = ret. send ( ( ) ) ;
1485
+ }
1434
1486
IpfsEvent :: PubsubSubscribe ( topic, ret) => {
1435
1487
let _ = ret. send ( self . swarm . behaviour_mut ( ) . pubsub ( ) . subscribe ( topic) ) ;
1436
1488
}
@@ -1780,8 +1832,11 @@ mod node {
1780
1832
1781
1833
#[ cfg( test) ]
1782
1834
mod tests {
1835
+ use std:: time:: Duration ;
1836
+
1783
1837
use super :: * ;
1784
1838
use crate :: make_ipld;
1839
+ use futures:: { stream:: poll_immediate, StreamExt } ;
1785
1840
use multihash:: Sha2_256 ;
1786
1841
1787
1842
#[ tokio:: test]
@@ -1819,4 +1874,50 @@ mod tests {
1819
1874
ipfs. remove_pin ( & cid, false ) . await . unwrap ( ) ;
1820
1875
assert ! ( !ipfs. is_pinned( & cid) . await . unwrap( ) ) ;
1821
1876
}
1877
+
1878
+ #[ tokio:: test]
1879
+ async fn test_pubsub_send_and_receive ( ) {
1880
+ let alice = Node :: new ( "alice" ) . await ;
1881
+ let alice_addr: Multiaddr = "/ip4/127.0.0.1/tcp/10001" . parse ( ) . unwrap ( ) ;
1882
+ alice. add_listening_address ( alice_addr) . await . unwrap ( ) ;
1883
+ let bob = Node :: new ( "bob" ) . await ;
1884
+ let bob_addr: Multiaddr = "/ip4/127.0.0.1/tcp/10002" . parse ( ) . unwrap ( ) ;
1885
+ bob. add_listening_address ( bob_addr. clone ( ) ) . await . unwrap ( ) ;
1886
+
1887
+ let topic = String :: from ( "test_topic" ) ;
1888
+ alice
1889
+ . connect ( bob_addr. with ( Protocol :: P2p ( bob. id . into ( ) ) ) )
1890
+ . await
1891
+ . expect ( "alice failed to connect to bob" ) ;
1892
+ let _alice_messages = alice. pubsub_subscribe ( topic. clone ( ) ) . await . unwrap ( ) ;
1893
+ let mut bob_messages = poll_immediate ( bob. pubsub_subscribe ( topic. clone ( ) ) . await . unwrap ( ) ) ;
1894
+
1895
+ let data = vec ! [ 1 , 2 , 3 ] ;
1896
+
1897
+ alice
1898
+ . pubsub_publish ( topic. clone ( ) , data. clone ( ) )
1899
+ . await
1900
+ . unwrap ( ) ;
1901
+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
1902
+
1903
+ assert_eq ! ( bob_messages. next( ) . await , Some ( Poll :: Pending ) ) ;
1904
+
1905
+ bob. pubsub_add_peer ( alice. id ) . await . unwrap ( ) ;
1906
+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
1907
+
1908
+ assert_eq ! ( bob_messages. next( ) . await , Some ( Poll :: Pending ) ) ;
1909
+
1910
+ alice
1911
+ . pubsub_publish ( topic. clone ( ) , data. clone ( ) )
1912
+ . await
1913
+ . unwrap ( ) ;
1914
+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
1915
+
1916
+ let received_data = bob_messages
1917
+ . next ( )
1918
+ . await
1919
+ . expect ( "unexpected end of stream" )
1920
+ . map ( |msg| msg. data . clone ( ) ) ;
1921
+ assert_eq ! ( received_data, Poll :: Ready ( data. clone( ) ) ) ;
1922
+ }
1822
1923
}
0 commit comments