diff --git a/network/addr.go b/network/addr.go index 694a707717..19208c8b50 100644 --- a/network/addr.go +++ b/network/addr.go @@ -34,6 +34,6 @@ func (wn *WebsocketNetwork) addrToGossipAddr(a string) (string, error) { if parsedURL.Scheme == "" { parsedURL.Scheme = "ws" } - parsedURL.Path = strings.Replace(path.Join(parsedURL.Path, GossipNetworkPath), "{genesisID}", wn.GenesisID, -1) + parsedURL.Path = strings.Replace(path.Join(parsedURL.Path, GossipNetworkPath), "{genesisID}", wn.genesisID, -1) return parsedURL.String(), nil } diff --git a/network/msgp_gen.go b/network/msgp_gen.go index df423c8746..125fa61660 100644 --- a/network/msgp_gen.go +++ b/network/msgp_gen.go @@ -3,6 +3,8 @@ package network // Code generated by github.com/algorand/msgp DO NOT EDIT. import ( + "sort" + "github.com/algorand/msgp/msgp" "github.com/algorand/go-algorand/crypto" @@ -89,6 +91,26 @@ import ( // |-----> (*) MsgIsZero // |-----> IdentityVerificationMessageSignedMaxSize() // +// peerMetaHeaders +// |-----> MarshalMsg +// |-----> CanMarshalMsg +// |-----> (*) UnmarshalMsg +// |-----> (*) UnmarshalMsgWithState +// |-----> (*) CanUnmarshalMsg +// |-----> Msgsize +// |-----> MsgIsZero +// |-----> PeerMetaHeadersMaxSize() +// +// peerMetaValues +// |-----> MarshalMsg +// |-----> CanMarshalMsg +// |-----> (*) UnmarshalMsg +// |-----> (*) UnmarshalMsgWithState +// |-----> (*) CanUnmarshalMsg +// |-----> Msgsize +// |-----> MsgIsZero +// |-----> PeerMetaValuesMaxSize() +// // MarshalMsg implements msgp.Marshaler func (z disconnectReason) MarshalMsg(b []byte) (o []byte) { @@ -1230,3 +1252,237 @@ func IdentityVerificationMessageSignedMaxSize() (s int) { s += 4 + crypto.SignatureMaxSize() return } + +// MarshalMsg implements msgp.Marshaler +func (z peerMetaHeaders) MarshalMsg(b []byte) (o []byte) { + o = msgp.Require(b, z.Msgsize()) + if z == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendMapHeader(o, uint32(len(z))) + } + za0005_keys := make([]string, 0, len(z)) + for za0005 := range z { + za0005_keys = append(za0005_keys, za0005) + } + sort.Sort(SortString(za0005_keys)) + for _, za0005 := range za0005_keys { + za0006 := z[za0005] + _ = za0006 + o = msgp.AppendString(o, za0005) + if za0006 == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendArrayHeader(o, uint32(len(za0006))) + } + for za0007 := range za0006 { + o = msgp.AppendString(o, za0006[za0007]) + } + } + return +} + +func (_ peerMetaHeaders) CanMarshalMsg(z interface{}) bool { + _, ok := (z).(peerMetaHeaders) + if !ok { + _, ok = (z).(*peerMetaHeaders) + } + return ok +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *peerMetaHeaders) UnmarshalMsgWithState(bts []byte, st msgp.UnmarshalState) (o []byte, err error) { + if st.AllowableDepth == 0 { + err = msgp.ErrMaxDepthExceeded{} + return + } + st.AllowableDepth-- + var zb0004 int + var zb0005 bool + zb0004, zb0005, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0004 > maxHeaderKeys { + err = msgp.ErrOverflow(uint64(zb0004), uint64(maxHeaderKeys)) + err = msgp.WrapError(err) + return + } + if zb0005 { + (*z) = nil + } else if (*z) == nil { + (*z) = make(peerMetaHeaders, zb0004) + } + for zb0004 > 0 { + var zb0001 string + var zb0002 peerMetaValues + zb0004-- + zb0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + var zb0006 int + var zb0007 bool + zb0006, zb0007, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + if zb0006 > maxHeaderValues { + err = msgp.ErrOverflow(uint64(zb0006), uint64(maxHeaderValues)) + err = msgp.WrapError(err, zb0001) + return + } + if zb0007 { + zb0002 = nil + } else if zb0002 != nil && cap(zb0002) >= zb0006 { + zb0002 = (zb0002)[:zb0006] + } else { + zb0002 = make(peerMetaValues, zb0006) + } + for zb0003 := range zb0002 { + zb0002[zb0003], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001, zb0003) + return + } + } + (*z)[zb0001] = zb0002 + } + o = bts + return +} + +func (z *peerMetaHeaders) UnmarshalMsg(bts []byte) (o []byte, err error) { + return z.UnmarshalMsgWithState(bts, msgp.DefaultUnmarshalState) +} +func (_ *peerMetaHeaders) CanUnmarshalMsg(z interface{}) bool { + _, ok := (z).(*peerMetaHeaders) + return ok +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z peerMetaHeaders) Msgsize() (s int) { + s = msgp.MapHeaderSize + if z != nil { + for za0005, za0006 := range z { + _ = za0005 + _ = za0006 + s += 0 + msgp.StringPrefixSize + len(za0005) + msgp.ArrayHeaderSize + for za0007 := range za0006 { + s += msgp.StringPrefixSize + len(za0006[za0007]) + } + } + } + return +} + +// MsgIsZero returns whether this is a zero value +func (z peerMetaHeaders) MsgIsZero() bool { + return len(z) == 0 +} + +// MaxSize returns a maximum valid message size for this message type +func PeerMetaHeadersMaxSize() (s int) { + s += msgp.MapHeaderSize + // Adding size of map keys for z + s += maxHeaderKeys + panic("Unable to determine max size: String type za0005 is unbounded") + // Adding size of map values for z + s += maxHeaderKeys + // Calculating size of slice: za0006 + s += msgp.ArrayHeaderSize + panic("Unable to determine max size: String type is unbounded for za0006[za0007]") + return +} + +// MarshalMsg implements msgp.Marshaler +func (z peerMetaValues) MarshalMsg(b []byte) (o []byte) { + o = msgp.Require(b, z.Msgsize()) + if z == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendArrayHeader(o, uint32(len(z))) + } + for za0001 := range z { + o = msgp.AppendString(o, z[za0001]) + } + return +} + +func (_ peerMetaValues) CanMarshalMsg(z interface{}) bool { + _, ok := (z).(peerMetaValues) + if !ok { + _, ok = (z).(*peerMetaValues) + } + return ok +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *peerMetaValues) UnmarshalMsgWithState(bts []byte, st msgp.UnmarshalState) (o []byte, err error) { + if st.AllowableDepth == 0 { + err = msgp.ErrMaxDepthExceeded{} + return + } + st.AllowableDepth-- + var zb0002 int + var zb0003 bool + zb0002, zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0002 > maxHeaderValues { + err = msgp.ErrOverflow(uint64(zb0002), uint64(maxHeaderValues)) + err = msgp.WrapError(err) + return + } + if zb0003 { + (*z) = nil + } else if (*z) != nil && cap((*z)) >= zb0002 { + (*z) = (*z)[:zb0002] + } else { + (*z) = make(peerMetaValues, zb0002) + } + for zb0001 := range *z { + (*z)[zb0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + } + o = bts + return +} + +func (z *peerMetaValues) UnmarshalMsg(bts []byte) (o []byte, err error) { + return z.UnmarshalMsgWithState(bts, msgp.DefaultUnmarshalState) +} +func (_ *peerMetaValues) CanUnmarshalMsg(z interface{}) bool { + _, ok := (z).(*peerMetaValues) + return ok +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z peerMetaValues) Msgsize() (s int) { + s = msgp.ArrayHeaderSize + for za0001 := range z { + s += msgp.StringPrefixSize + len(z[za0001]) + } + return +} + +// MsgIsZero returns whether this is a zero value +func (z peerMetaValues) MsgIsZero() bool { + return len(z) == 0 +} + +// MaxSize returns a maximum valid message size for this message type +func PeerMetaValuesMaxSize() (s int) { + // Calculating size of slice: z + s += msgp.ArrayHeaderSize + panic("Unable to determine max size: String type is unbounded for z[za0001]") + return +} diff --git a/network/msgp_gen_test.go b/network/msgp_gen_test.go index 1046371a1a..0144dea16e 100644 --- a/network/msgp_gen_test.go +++ b/network/msgp_gen_test.go @@ -433,3 +433,123 @@ func BenchmarkUnmarshalidentityVerificationMessageSigned(b *testing.B) { } } } + +func TestMarshalUnmarshalpeerMetaHeaders(t *testing.T) { + partitiontest.PartitionTest(t) + v := peerMetaHeaders{} + bts := v.MarshalMsg(nil) + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func TestRandomizedEncodingpeerMetaHeaders(t *testing.T) { + protocol.RunEncodingTest(t, &peerMetaHeaders{}) +} + +func BenchmarkMarshalMsgpeerMetaHeaders(b *testing.B) { + v := peerMetaHeaders{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgpeerMetaHeaders(b *testing.B) { + v := peerMetaHeaders{} + bts := make([]byte, 0, v.Msgsize()) + bts = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalpeerMetaHeaders(b *testing.B) { + v := peerMetaHeaders{} + bts := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalpeerMetaValues(t *testing.T) { + partitiontest.PartitionTest(t) + v := peerMetaValues{} + bts := v.MarshalMsg(nil) + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func TestRandomizedEncodingpeerMetaValues(t *testing.T) { + protocol.RunEncodingTest(t, &peerMetaValues{}) +} + +func BenchmarkMarshalMsgpeerMetaValues(b *testing.B) { + v := peerMetaValues{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgpeerMetaValues(b *testing.B) { + v := peerMetaValues{} + bts := make([]byte, 0, v.Msgsize()) + bts = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalpeerMetaValues(b *testing.B) { + v := peerMetaValues{} + bts := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 8d5f3e4f38..9b7156cf9f 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -18,7 +18,6 @@ package p2p import ( "context" - "encoding/base32" "fmt" "net" "net/http" @@ -89,12 +88,13 @@ type serviceImpl struct { topicsMu deadlock.RWMutex } -// AlgorandWsProtocol defines a libp2p protocol name for algorand's websockets messages -const AlgorandWsProtocol = "/algorand-ws/1.0.0" +// AlgorandWsProtocolV1 defines a libp2p protocol name for algorand's websockets messages +// as the very initial release +const AlgorandWsProtocolV1 = "/algorand-ws/1.0.0" -// algorandGUIDProtocolPrefix defines a libp2p protocol name for algorand node telemetry GUID exchange -const algorandGUIDProtocolPrefix = "/algorand-telemetry/1.0.0/" -const algorandGUIDProtocolTemplate = algorandGUIDProtocolPrefix + "%s/%s" +// AlgorandWsProtocolV22 defines a libp2p protocol name for algorand's websockets messages +// as a version supporting peer metadata and wsnet v2.2 protocol features +const AlgorandWsProtocolV22 = "/algorand-ws/2.2.0" const dialTimeout = 30 * time.Second @@ -180,18 +180,24 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error) return rm, err } +// StreamHandlerPair is a struct that contains a protocol ID and a StreamHandler +type StreamHandlerPair struct { + ProtoID protocol.ID + Handler StreamHandler +} + +// StreamHandlers is an ordered list of StreamHandlerPair +type StreamHandlers []StreamHandlerPair + // MakeService creates a P2P service instance -func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, metricsTracer pubsub.RawTracer) (*serviceImpl, error) { +func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandlers StreamHandlers, metricsTracer pubsub.RawTracer) (*serviceImpl, error) { - sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService) + sm := makeStreamManager(ctx, log, h, wsStreamHandlers, cfg.EnableGossipService) h.Network().Notify(sm) - h.SetStreamHandler(AlgorandWsProtocol, sm.streamHandler) - // set an empty handler for telemetryID/telemetryInstance protocol in order to allow other peers to know our telemetryID - telemetryID := log.GetTelemetryGUID() - telemetryInstance := log.GetInstanceName() - telemetryProtoInfo := formatPeerTelemetryInfoProtocolName(telemetryID, telemetryInstance) - h.SetStreamHandler(protocol.ID(telemetryProtoInfo), func(s network.Stream) { s.Close() }) + for _, pair := range wsStreamHandlers { + h.SetStreamHandler(pair.ProtoID, sm.streamHandler) + } ps, err := makePubSub(ctx, cfg, h, metricsTracer) if err != nil { @@ -320,35 +326,6 @@ func netAddressToListenAddress(netAddress string) (string, error) { return fmt.Sprintf("/ip4/%s/tcp/%s", ip, parts[1]), nil } -// GetPeerTelemetryInfo returns the telemetry ID of a peer by looking at its protocols -func GetPeerTelemetryInfo(peerProtocols []protocol.ID) (telemetryID string, telemetryInstance string) { - for _, protocol := range peerProtocols { - if strings.HasPrefix(string(protocol), algorandGUIDProtocolPrefix) { - telemetryInfo := string(protocol[len(algorandGUIDProtocolPrefix):]) - telemetryInfoParts := strings.Split(telemetryInfo, "/") - if len(telemetryInfoParts) == 2 { - telemetryIDBytes, err := base32.StdEncoding.DecodeString(telemetryInfoParts[0]) - if err == nil { - telemetryID = string(telemetryIDBytes) - } - telemetryInstanceBytes, err := base32.StdEncoding.DecodeString(telemetryInfoParts[1]) - if err == nil { - telemetryInstance = string(telemetryInstanceBytes) - } - return telemetryID, telemetryInstance - } - } - } - return "", "" -} - -func formatPeerTelemetryInfoProtocolName(telemetryID string, telemetryInstance string) string { - return fmt.Sprintf(algorandGUIDProtocolTemplate, - base32.StdEncoding.EncodeToString([]byte(telemetryID)), - base32.StdEncoding.EncodeToString([]byte(telemetryInstance)), - ) -} - var private6 = parseCIDR([]string{ "100::/64", "2001:2::/48", diff --git a/network/p2p/p2p_test.go b/network/p2p/p2p_test.go index 9202dd9c74..9e255f7f33 100644 --- a/network/p2p/p2p_test.go +++ b/network/p2p/p2p_test.go @@ -17,15 +17,10 @@ package p2p import ( - "context" "fmt" "net" "testing" - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" @@ -86,95 +81,6 @@ func TestNetAddressToListenAddress(t *testing.T) { } } -// TestP2PGetPeerTelemetryInfo tests the GetPeerTelemetryInfo function -func TestP2PGetPeerTelemetryInfo(t *testing.T) { - partitiontest.PartitionTest(t) - - testCases := []struct { - name string - peerProtocols []protocol.ID - expectedTelemetryID string - expectedTelemetryInstance string - }{ - { - name: "Valid Telemetry Info", - peerProtocols: []protocol.ID{protocol.ID(formatPeerTelemetryInfoProtocolName("telemetryID", "telemetryInstance"))}, - expectedTelemetryID: "telemetryID", - expectedTelemetryInstance: "telemetryInstance", - }, - { - name: "Partial Telemetry Info 1", - peerProtocols: []protocol.ID{protocol.ID(formatPeerTelemetryInfoProtocolName("telemetryID", ""))}, - expectedTelemetryID: "telemetryID", - expectedTelemetryInstance: "", - }, - { - name: "Partial Telemetry Info 2", - peerProtocols: []protocol.ID{protocol.ID(formatPeerTelemetryInfoProtocolName("", "telemetryInstance"))}, - expectedTelemetryID: "", - expectedTelemetryInstance: "telemetryInstance", - }, - { - name: "No Telemetry Info", - peerProtocols: []protocol.ID{protocol.ID("/some-other-protocol/1.0.0/otherID/otherInstance")}, - expectedTelemetryID: "", - expectedTelemetryInstance: "", - }, - { - name: "Invalid Telemetry Info Format", - peerProtocols: []protocol.ID{protocol.ID("/algorand-telemetry/1.0.0/invalidFormat")}, - expectedTelemetryID: "", - expectedTelemetryInstance: "", - }, - { - name: "Special Characters Telemetry Info Format", - peerProtocols: []protocol.ID{protocol.ID(formatPeerTelemetryInfoProtocolName("telemetry/ID", "123-//11-33"))}, - expectedTelemetryID: "telemetry/ID", - expectedTelemetryInstance: "123-//11-33", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - telemetryID, telemetryInstance := GetPeerTelemetryInfo(tc.peerProtocols) - if telemetryID != tc.expectedTelemetryID || telemetryInstance != tc.expectedTelemetryInstance { - t.Errorf("Expected telemetry ID: %s, telemetry instance: %s, but got telemetry ID: %s, telemetry instance: %s", - tc.expectedTelemetryID, tc.expectedTelemetryInstance, telemetryID, telemetryInstance) - } - }) - } -} - -func TestP2PProtocolAsMeta(t *testing.T) { - partitiontest.PartitionTest(t) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - h1, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) - require.NoError(t, err) - defer h1.Close() - - h1TID := "telemetryID1" - h1Inst := "telemetryInstance2" - telemetryProtoInfo := formatPeerTelemetryInfoProtocolName(h1TID, h1Inst) - h1.SetStreamHandler(protocol.ID(telemetryProtoInfo), func(s network.Stream) { s.Close() }) - - h2, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) - require.NoError(t, err) - defer h2.Close() - - err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}) - require.NoError(t, err) - - protos, err := h2.Peerstore().GetProtocols(h1.ID()) - require.NoError(t, err) - - tid, inst := GetPeerTelemetryInfo(protos) - require.Equal(t, h1TID, tid) - require.Equal(t, h1Inst, inst) -} - func TestP2PPrivateAddresses(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() diff --git a/network/p2p/streams.go b/network/p2p/streams.go index f271f00ffa..b44c48c6e3 100644 --- a/network/p2p/streams.go +++ b/network/p2p/streams.go @@ -18,6 +18,7 @@ package p2p import ( "context" + "fmt" "io" "github.com/algorand/go-algorand/logging" @@ -25,6 +26,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" ) @@ -33,7 +35,7 @@ type streamManager struct { ctx context.Context log logging.Logger host host.Host - handler StreamHandler + handlers StreamHandlers allowIncomingGossip bool streams map[peer.ID]network.Stream @@ -43,12 +45,12 @@ type streamManager struct { // StreamHandler is called when a new bidirectional stream for a given protocol and peer is opened. type StreamHandler func(ctx context.Context, pid peer.ID, s network.Stream, incoming bool) -func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handler StreamHandler, allowIncomingGossip bool) *streamManager { +func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handlers StreamHandlers, allowIncomingGossip bool) *streamManager { return &streamManager{ ctx: ctx, log: log, host: h, - handler: handler, + handlers: handlers, allowIncomingGossip: allowIncomingGossip, streams: make(map[peer.ID]network.Stream), } @@ -83,7 +85,10 @@ func (n *streamManager) streamHandler(stream network.Stream) { n.streams[stream.Conn().RemotePeer()] = stream incoming := stream.Conn().Stat().Direction == network.DirInbound - n.handler(n.ctx, remotePeer, stream, incoming) + if err1 := n.dispatch(n.ctx, remotePeer, stream, incoming); err1 != nil { + n.log.Errorln(err1.Error()) + _ = stream.Reset() + } return } // otherwise, the old stream is still open, so we can close the new one @@ -93,51 +98,71 @@ func (n *streamManager) streamHandler(stream network.Stream) { // no old stream n.streams[stream.Conn().RemotePeer()] = stream incoming := stream.Conn().Stat().Direction == network.DirInbound - n.handler(n.ctx, remotePeer, stream, incoming) + if err := n.dispatch(n.ctx, remotePeer, stream, incoming); err != nil { + n.log.Errorln(err.Error()) + _ = stream.Reset() + } +} + +// dispatch the stream to the appropriate handler +func (n *streamManager) dispatch(ctx context.Context, remotePeer peer.ID, stream network.Stream, incoming bool) error { + for _, pair := range n.handlers { + if pair.ProtoID == stream.Protocol() { + pair.Handler(ctx, remotePeer, stream, incoming) + return nil + } + } + n.log.Errorf("No handler for protocol %s, peer %s", stream.Protocol(), remotePeer) + return fmt.Errorf("%s: no handler for protocol %s, peer %s", n.host.ID().String(), stream.Protocol(), remotePeer) } // Connected is called when a connection is opened // for both incoming (listener -> addConn) and outgoing (dialer -> addConn) connections. func (n *streamManager) Connected(net network.Network, conn network.Conn) { - if conn.Stat().Direction == network.DirInbound && !n.allowIncomingGossip { - n.log.Debugf("ignoring incoming connection from %s", conn.RemotePeer().String()) - return - } remotePeer := conn.RemotePeer() localPeer := n.host.ID() + if conn.Stat().Direction == network.DirInbound && !n.allowIncomingGossip { + n.log.Debugf("%s: ignoring incoming connection from %s", localPeer.String(), remotePeer.String()) + return + } + // ensure that only one of the peers initiates the stream if localPeer > remotePeer { + n.log.Debugf("%s: ignoring a lesser peer ID %s", localPeer.String(), remotePeer.String()) return } - needUnlock := true n.streamsLock.Lock() - defer func() { - if needUnlock { - n.streamsLock.Unlock() - } - }() _, ok := n.streams[remotePeer] if ok { + n.streamsLock.Unlock() + n.log.Debugf("%s: already have a stream to/from %s", localPeer.String(), remotePeer.String()) return // there's already an active stream with this peer for our protocol } - stream, err := n.host.NewStream(n.ctx, remotePeer, AlgorandWsProtocol) + protos := []protocol.ID{} + for _, pair := range n.handlers { + protos = append(protos, pair.ProtoID) + } + stream, err := n.host.NewStream(n.ctx, remotePeer, protos...) if err != nil { - n.log.Infof("Failed to open stream to %s (%s): %v", remotePeer, conn.RemoteMultiaddr().String(), err) + n.log.Infof("%s: failed to open stream to %s (%s): %v", localPeer.String(), remotePeer, conn.RemoteMultiaddr().String(), err) + n.streamsLock.Unlock() return } n.streams[remotePeer] = stream - - // release the lock to let handler do its thing - // otherwise reading/writing to the stream will deadlock - needUnlock = false n.streamsLock.Unlock() + n.log.Infof("%s: using protocol %s with peer %s", localPeer.String(), stream.Protocol(), remotePeer.String()) + incoming := stream.Conn().Stat().Direction == network.DirInbound - n.handler(n.ctx, remotePeer, stream, incoming) + err = n.dispatch(n.ctx, remotePeer, stream, incoming) + if err != nil { + n.log.Errorln(err.Error()) + _ = stream.Reset() + } } // Disconnected is called when a connection is closed diff --git a/network/p2pMetainfo.go b/network/p2pMetainfo.go new file mode 100644 index 0000000000..02a931d259 --- /dev/null +++ b/network/p2pMetainfo.go @@ -0,0 +1,126 @@ +// Copyright (C) 2019-2025 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package network + +import ( + "encoding/binary" + "fmt" + "io" + "math" + "net/http" + + "github.com/algorand/go-algorand/data/basics" + "github.com/libp2p/go-libp2p/core/peer" +) + +// peerMetaHeaders holds peer metadata headers similar to wsnet http.Header +// due to msgp allocbound enforcement we need to limit the number of headers and values +// but it is cannot be done without msgp modification to accept both map and slice allocbound +// so that introduce a service peerMetaValues type to have allocbound set and msgp generator satisfied. + +const maxHeaderKeys = 64 +const maxHeaderValues = 16 + +// SortString is a type that implements sort.Interface for sorting strings +type SortString = basics.SortString + +//msgp:allocbound peerMetaValues maxHeaderValues +type peerMetaValues []string + +//msgp:allocbound peerMetaHeaders maxHeaderKeys +type peerMetaHeaders map[string]peerMetaValues + +func peerMetaHeadersToHTTPHeaders(headers peerMetaHeaders) http.Header { + httpHeaders := make(http.Header, len(headers)) + for k, v := range headers { + httpHeaders[k] = v + } + return httpHeaders +} + +func peerMetaHeadersFromHTTPHeaders(headers http.Header) peerMetaHeaders { + pmh := make(peerMetaHeaders, len(headers)) + for k, v := range headers { + pmh[k] = v + } + return pmh +} + +type peerMetaInfo struct { + telemetryID string + instanceName string + version string + features string +} + +func readPeerMetaHeaders(stream io.ReadWriter, p2pPeer peer.ID, netProtoSupportedVersions []string) (peerMetaInfo, error) { + var msgLenBytes [2]byte + rn, err := stream.Read(msgLenBytes[:]) + if rn != 2 || err != nil { + err0 := fmt.Errorf("error reading response message length from peer %s: %w", p2pPeer, err) + return peerMetaInfo{}, err0 + } + + msgLen := binary.BigEndian.Uint16(msgLenBytes[:]) + msgBytes := make([]byte, msgLen) + rn, err = stream.Read(msgBytes[:]) + if rn != int(msgLen) || err != nil { + err0 := fmt.Errorf("error reading response message from peer %s: %w, expected: %d, read: %d", p2pPeer, err, msgLen, rn) + return peerMetaInfo{}, err0 + } + var responseHeaders peerMetaHeaders + _, err = responseHeaders.UnmarshalMsg(msgBytes[:]) + if err != nil { + err0 := fmt.Errorf("error unmarshaling response message from peer %s: %w", p2pPeer, err) + return peerMetaInfo{}, err0 + } + headers := peerMetaHeadersToHTTPHeaders(responseHeaders) + matchingVersion, _ := checkProtocolVersionMatch(headers, netProtoSupportedVersions) + if matchingVersion == "" { + err0 := fmt.Errorf("peer %s does not support any of the supported protocol versions: %v", p2pPeer, netProtoSupportedVersions) + return peerMetaInfo{}, err0 + } + return peerMetaInfo{ + telemetryID: headers.Get(TelemetryIDHeader), + instanceName: headers.Get(InstanceNameHeader), + version: matchingVersion, + features: headers.Get(PeerFeaturesHeader), + }, nil +} + +func writePeerMetaHeaders(stream io.ReadWriter, p2pPeer peer.ID, networkProtoVersion string, pmp peerMetadataProvider) error { + header := make(http.Header) + setHeaders(header, networkProtoVersion, pmp) + meta := peerMetaHeadersFromHTTPHeaders(header) + data := meta.MarshalMsg(nil) + length := len(data) + if length > math.MaxUint16 { + // 64k is enough for everyone + // current headers size is 250 bytes + msg := fmt.Sprintf("error writing initial message, too large: %v, peer %s", header, p2pPeer) + panic(msg) + } + metaMsg := make([]byte, 2+length) + binary.BigEndian.PutUint16(metaMsg, uint16(length)) + copy(metaMsg[2:], data) + _, err := stream.Write(metaMsg) + if err != nil { + err0 := fmt.Errorf("error sending initial message: %w", err) + return err0 + } + return nil +} diff --git a/network/p2pMetainfo_test.go b/network/p2pMetainfo_test.go new file mode 100644 index 0000000000..08e8ada943 --- /dev/null +++ b/network/p2pMetainfo_test.go @@ -0,0 +1,177 @@ +// Copyright (C) 2019-2025 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package network + +import ( + "encoding/binary" + "fmt" + "net/http" + "testing" + + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/test/partitiontest" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +// MockStream is a io.ReaderWriter testing mock +type MockStream struct { + mock.Mock +} + +func (m *MockStream) Read(p []byte) (n int, err error) { + args := m.Called(p) + arg0 := args.Get(0).([]byte) + copy(p, arg0) + return len(arg0), args.Error(1) +} + +func (m *MockStream) Write(p []byte) (n int, err error) { + args := m.Called(p) + return args.Int(0), args.Error(1) +} + +func TestReadPeerMetaHeaders(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + mockStream := new(MockStream) + p2pPeer := peer.ID("mockPeer") + n := &P2PNetwork{ + log: logging.Base(), + supportedProtocolVersions: []string{"1.0", "2.2"}, + } + + httpHeaders := make(http.Header) + httpHeaders.Set(TelemetryIDHeader, "mockTelemetryID") + httpHeaders.Set(InstanceNameHeader, "mockInstanceName") + httpHeaders.Set(ProtocolVersionHeader, "1.0") + httpHeaders.Set(ProtocolAcceptVersionHeader, "1.0") + httpHeaders.Set(PeerFeaturesHeader, "mockFeatures") + headers := peerMetaHeadersFromHTTPHeaders(httpHeaders) + data := headers.MarshalMsg(nil) + length := uint16(len(data)) + lengthBytes := make([]byte, 2) + binary.BigEndian.PutUint16(lengthBytes, length) + + mockStream.On("Read", mock.Anything).Return(lengthBytes, nil).Once() + mockStream.On("Read", mock.Anything).Return(data, nil).Once() + + metaInfo, err := readPeerMetaHeaders(mockStream, p2pPeer, n.supportedProtocolVersions) + assert.NoError(t, err) + assert.Equal(t, "mockTelemetryID", metaInfo.telemetryID) + assert.Equal(t, "mockInstanceName", metaInfo.instanceName) + assert.Equal(t, "1.0", metaInfo.version) + assert.Equal(t, "mockFeatures", metaInfo.features) + mockStream.AssertExpectations(t) + + // Error case: incomplete length read + mockStream = new(MockStream) + mockStream.On("Read", mock.Anything).Return([]byte{1}, nil).Once() + _, err = readPeerMetaHeaders(mockStream, p2pPeer, n.supportedProtocolVersions) + assert.Error(t, err) + assert.Contains(t, err.Error(), "error reading response message length") + mockStream.AssertExpectations(t) + + // Error case: error reading length + mockStream = new(MockStream) + mockStream.On("Read", mock.Anything).Return([]byte{}, fmt.Errorf("read error")).Once() + _, err = readPeerMetaHeaders(mockStream, p2pPeer, n.supportedProtocolVersions) + assert.Error(t, err) + assert.Contains(t, err.Error(), "error reading response message length") + mockStream.AssertExpectations(t) + + // Error case: incomplete message read + mockStream = new(MockStream) + mockStream.On("Read", mock.Anything).Return(lengthBytes, nil).Once() + mockStream.On("Read", mock.Anything).Return(data[:len(data)/2], nil).Once() // Return only half the data + _, err = readPeerMetaHeaders(mockStream, p2pPeer, n.supportedProtocolVersions) + assert.Error(t, err) + assert.Contains(t, err.Error(), "error reading response message") + mockStream.AssertExpectations(t) + + // Error case: error reading message + mockStream = new(MockStream) + mockStream.On("Read", mock.Anything).Return(lengthBytes, nil).Once() + mockStream.On("Read", mock.Anything).Return([]byte{}, fmt.Errorf("read error")).Once() + _, err = readPeerMetaHeaders(mockStream, p2pPeer, n.supportedProtocolVersions) + assert.Error(t, err) + assert.Contains(t, err.Error(), "error reading response message") + mockStream.AssertExpectations(t) + + // Error case: invalid messagepack (unmarshaling error) + mockStream = new(MockStream) + corruptedMsgpLength := make([]byte, 2) + binary.BigEndian.PutUint16(corruptedMsgpLength, uint16(3)) + mockStream.On("Read", mock.Anything).Return(corruptedMsgpLength, nil).Once() + mockStream.On("Read", mock.Anything).Return([]byte{0x99, 0x01, 0x02}, nil).Once() + _, err = readPeerMetaHeaders(mockStream, p2pPeer, n.supportedProtocolVersions) + assert.Error(t, err) + assert.Contains(t, err.Error(), "error unmarshaling response message") + mockStream.AssertExpectations(t) + + // Error case: no matching protocol version + mockStream = new(MockStream) + incompatibleHeaders := make(http.Header) + incompatibleHeaders.Set(ProtocolVersionHeader, "99.0") // Unsupported version + incompatibleHeaders.Set(ProtocolAcceptVersionHeader, "99.0") + incompatibleData := peerMetaHeadersFromHTTPHeaders(incompatibleHeaders).MarshalMsg(nil) + incompatibleLength := uint16(len(incompatibleData)) + incompatibleLengthBytes := make([]byte, 2) + binary.BigEndian.PutUint16(incompatibleLengthBytes, incompatibleLength) + + mockStream.On("Read", mock.Anything).Return(incompatibleLengthBytes, nil).Once() + mockStream.On("Read", mock.Anything).Return(incompatibleData, nil).Once() + _, err = readPeerMetaHeaders(mockStream, p2pPeer, n.supportedProtocolVersions) + assert.Error(t, err) + assert.Contains(t, err.Error(), "does not support any of the supported protocol versions") + mockStream.AssertExpectations(t) +} + +func TestWritePeerMetaHeaders(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + mockStream := new(MockStream) + p2pPeer := peer.ID("mockPeer") + n := &P2PNetwork{ + log: logging.Base(), + } + + header := make(http.Header) + setHeaders(header, "1.0", n) + meta := peerMetaHeadersFromHTTPHeaders(header) + data := meta.MarshalMsg(nil) + length := uint16(len(data)) + lengthBytes := make([]byte, 2) + binary.BigEndian.PutUint16(lengthBytes, length) + + mockStream.On("Write", append(lengthBytes, data...)).Return(len(lengthBytes)+len(data), nil).Once() + + err := writePeerMetaHeaders(mockStream, p2pPeer, "1.0", n) + assert.NoError(t, err) + mockStream.AssertExpectations(t) + + // Error case: write error + mockStream = new(MockStream) + mockStream.On("Write", mock.Anything).Return(0, fmt.Errorf("write error")).Once() + err = writePeerMetaHeaders(mockStream, p2pPeer, "1.0", n) + assert.Error(t, err) + assert.Contains(t, err.Error(), "error sending initial message") + mockStream.AssertExpectations(t) +} diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 3e19fecd92..f6408e75ec 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -48,6 +48,9 @@ import ( // some arbitrary number TODO: figure out a better value based on peerSelector/fetcher algorithm const numArchivalPeersToFind = 4 +// disableV22Protocol is a flag for testing in order to test v1 node can communicate with v1 + v22 node +var disableV22Protocol = false + // P2PNetwork implements the GossipNode interface type P2PNetwork struct { service p2p.Service @@ -87,6 +90,13 @@ type P2PNetwork struct { httpServer *p2p.HTTPServer identityTracker identityTracker + + // supportedProtocolVersions defines versions supported by this network. + // Should be used instead of a global network.SupportedProtocolVersions for network/peers configuration + supportedProtocolVersions []string + + // protocolVersion is an actual version announced as ProtocolVersionHeader + protocolVersion string } type bootstrapper struct { @@ -258,6 +268,16 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo net.identityTracker = noopIdentityTracker{} } + // set our supported versions + if net.config.NetworkProtocolVersion != "" { + net.supportedProtocolVersions = []string{net.config.NetworkProtocolVersion} + } else { + net.supportedProtocolVersions = SupportedProtocolVersions + } + + // set our actual version + net.protocolVersion = ProtocolVersion + err = p2p.EnableP2PLogging(log, logging.Level(cfg.BaseLoggerDebugLevel)) if err != nil { return nil, err @@ -269,7 +289,21 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo } log.Infof("P2P host created: peer ID %s addrs %s", h.ID(), h.Addrs()) - net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler, pubsubMetricsTracer{}) + // TODO: remove after consensus v41 takes effect. + // ordered list of supported protocol versions + hm := p2p.StreamHandlers{} + if !disableV22Protocol { + hm = append(hm, p2p.StreamHandlerPair{ + ProtoID: p2p.AlgorandWsProtocolV22, + Handler: net.wsStreamHandlerV22, + }) + } + hm = append(hm, p2p.StreamHandlerPair{ + ProtoID: p2p.AlgorandWsProtocolV1, + Handler: net.wsStreamHandlerV1, + }) + // END TODO + net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, hm, pubsubMetricsTracer{}) if err != nil { return nil, err } @@ -754,11 +788,47 @@ func (n *P2PNetwork) OnNetworkAdvance() { } } +// TelemetryGUID returns the telemetry GUID of this node. +func (n *P2PNetwork) TelemetryGUID() string { + return n.log.GetTelemetryGUID() +} + +// InstanceName returns the instance name of this node. +func (n *P2PNetwork) InstanceName() string { + return n.log.GetInstanceName() +} + +// GenesisID returns the genesis ID of this node. +func (n *P2PNetwork) GenesisID() string { + return n.genesisID +} + +// SupportedProtoVersions returns the supported protocol versions of this node. +func (n *P2PNetwork) SupportedProtoVersions() []string { + return n.supportedProtocolVersions +} + +// RandomID satisfies the interface but is not used in P2PNetwork. +func (n *P2PNetwork) RandomID() string { + return "" +} + +// PublicAddress satisfies the interface but is not used in P2PNetwork. +func (n *P2PNetwork) PublicAddress() string { + return "" +} + +// Config returns the configuration of this node. +func (n *P2PNetwork) Config() config.Local { + return n.config +} + // wsStreamHandler is a callback that the p2p package calls when a new peer connects and establishes a // stream for the websocket protocol. -func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, stream network.Stream, incoming bool) { - if stream.Protocol() != p2p.AlgorandWsProtocol { - n.log.Warnf("unknown protocol %s from peer%s", stream.Protocol(), p2pPeer) +// TODO: remove after consensus v41 takes effect. +func (n *P2PNetwork) wsStreamHandlerV1(ctx context.Context, p2pPeer peer.ID, stream network.Stream, incoming bool) { + if stream.Protocol() != p2p.AlgorandWsProtocolV1 { + n.log.Warnf("unknown protocol %s from peer %s", stream.Protocol(), p2pPeer) return } @@ -766,17 +836,60 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea var initMsg [1]byte rn, err := stream.Read(initMsg[:]) if rn == 0 || err != nil { - n.log.Warnf("wsStreamHandler: error reading initial message: %s, peer %s (%s)", err, p2pPeer, stream.Conn().RemoteMultiaddr().String()) + n.log.Warnf("wsStreamHandlerV1: error reading initial message from peer %s (%s): %v", p2pPeer, stream.Conn().RemoteMultiaddr().String(), err) return } } else { _, err := stream.Write([]byte("1")) if err != nil { - n.log.Warnf("wsStreamHandler: error sending initial message: %s", err) + n.log.Warnf("wsStreamHandlerV1: error sending initial message: %v", err) return } } + n.baseWsStreamHandler(ctx, p2pPeer, stream, incoming, peerMetaInfo{}) +} + +func (n *P2PNetwork) wsStreamHandlerV22(ctx context.Context, p2pPeer peer.ID, stream network.Stream, incoming bool) { + if stream.Protocol() != p2p.AlgorandWsProtocolV22 { + n.log.Warnf("unknown protocol %s from peer%s", stream.Protocol(), p2pPeer) + return + } + + var err error + var pmi peerMetaInfo + if incoming { + pmi, err = readPeerMetaHeaders(stream, p2pPeer, n.supportedProtocolVersions) + if err != nil { + n.log.Warnf("wsStreamHandlerV22: error reading peer meta headers response from peer %s (%s): %v", p2pPeer, stream.Conn().RemoteMultiaddr().String(), err) + _ = stream.Reset() + return + } + err = writePeerMetaHeaders(stream, p2pPeer, pmi.version, n) + if err != nil { + n.log.Warnf("wsStreamHandlerV22: error writing peer meta headers response to peer %s (%s): %v", p2pPeer, stream.Conn().RemoteMultiaddr().String(), err) + _ = stream.Reset() + return + } + } else { + err = writePeerMetaHeaders(stream, p2pPeer, n.protocolVersion, n) + if err != nil { + n.log.Warnf("wsStreamHandlerV22: error writing peer meta headers response to peer %s (%s): %v", p2pPeer, stream.Conn().RemoteMultiaddr().String(), err) + _ = stream.Reset() + return + } + // read the response + pmi, err = readPeerMetaHeaders(stream, p2pPeer, n.supportedProtocolVersions) + if err != nil { + n.log.Warnf("wsStreamHandlerV22: error reading peer meta headers response from peer %s (%s): %v", p2pPeer, stream.Conn().RemoteMultiaddr().String(), err) + _ = stream.Reset() + return + } + } + n.baseWsStreamHandler(ctx, p2pPeer, stream, incoming, pmi) +} + +func (n *P2PNetwork) baseWsStreamHandler(ctx context.Context, p2pPeer peer.ID, stream network.Stream, incoming bool, pmi peerMetaInfo) { // get address for peer ID ma := stream.Conn().RemoteMultiaddr() addr := ma.String() @@ -803,17 +916,15 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea } peerCore := makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, client, addr) wsp := &wsPeer{ - wsPeerCore: peerCore, - conn: &wsPeerConnP2P{stream: stream}, - outgoing: !incoming, - identity: netIdentPeerID, - peerType: peerTypeP2P, - } - protos, err := n.pstore.GetProtocols(p2pPeer) - if err != nil { - n.log.Warnf("Error getting protocols for peer %s: %v", p2pPeer, err) + wsPeerCore: peerCore, + conn: &wsPeerConnP2P{stream: stream}, + outgoing: !incoming, + identity: netIdentPeerID, + peerType: peerTypeP2P, + TelemetryGUID: pmi.telemetryID, + InstanceName: pmi.instanceName, + features: decodePeerFeatures(pmi.version, pmi.features), } - wsp.TelemetryGUID, wsp.InstanceName = p2p.GetPeerTelemetryInfo(protos) localAddr, has := n.Address() if !has { diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index d78b9a7573..0429ecdaa1 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -40,6 +40,7 @@ import ( "github.com/algorand/go-algorand/network/phonebook" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" + "github.com/algorand/go-algorand/util/uuid" "github.com/algorand/go-deadlock" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -106,7 +107,7 @@ func TestP2PSubmitTX(t *testing.T) { ) require.Eventually(t, func() bool { return netA.hasPeers() && netB.hasPeers() && netC.hasPeers() - }, 2*time.Second, 50*time.Millisecond) + }, 5*time.Second, 50*time.Millisecond) // for some reason the above check is not enough in race builds on CI time.Sleep(time.Second) // give time for peers to connect. @@ -199,7 +200,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { require.Eventually(t, func() bool { return netA.hasPeers() && netB.hasPeers() && netC.hasPeers() - }, 2*time.Second, 50*time.Millisecond) + }, 5*time.Second, 50*time.Millisecond) time.Sleep(time.Second) // give time for peers to connect. @@ -282,7 +283,7 @@ func TestP2PSubmitWS(t *testing.T) { require.Eventually(t, func() bool { return netA.hasPeers() && netB.hasPeers() && netC.hasPeers() - }, 2*time.Second, 50*time.Millisecond) + }, 5*time.Second, 50*time.Millisecond) time.Sleep(time.Second) // give time for peers to connect. @@ -1460,3 +1461,110 @@ func TestGetPeersFiltersSelf(t *testing.T) { } } } + +// TestP2PMetainfoExchange checks that the metainfo exchange works correctly +func TestP2PMetainfoExchange(t *testing.T) { + partitiontest.PartitionTest(t) + + cfg := config.GetDefaultLocal() + cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses + cfg.NetAddress = "127.0.0.1:0" + cfg.EnableVoteCompression = true + log := logging.TestingLog(t) + err := log.EnableTelemetryContext(context.Background(), logging.TelemetryConfig{Enable: true, SendToLog: true, GUID: uuid.New()}) + require.NoError(t, err) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + err = netA.Start() + require.NoError(t, err) + defer netA.Stop() + + peerInfoA := netA.service.AddrInfo() + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsA[0]) + + cfg2 := cfg + cfg2.EnableVoteCompression = false + cfg.NetAddress = "" + multiAddrStr := addrsA[0].String() + phoneBookAddresses := []string{multiAddrStr} + netB, err := NewP2PNetwork(log, cfg2, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + err = netB.Start() + require.NoError(t, err) + defer netB.Stop() + + require.Eventually(t, func() bool { + return len(netA.service.Conns()) > 0 && len(netB.service.Conns()) > 0 + }, 2*time.Second, 50*time.Millisecond) + + peers := netA.GetPeers(PeersConnectedIn) + require.Len(t, peers, 1) + peer := peers[0].(*wsPeer) + require.True(t, peer.features&pfCompressedProposal != 0) + require.False(t, peer.vpackVoteCompressionSupported()) + + peers = netB.GetPeers(PeersConnectedOut) + require.Len(t, peers, 1) + peer = peers[0].(*wsPeer) + require.True(t, peer.features&pfCompressedProposal != 0) + require.True(t, peer.vpackVoteCompressionSupported()) +} + +// TestP2PMetainfoV1vsV22 checks v1 and v22 nodes works together. +// It is done with setting disableV22Protocol=true for the second node, +// and it renders EnableVoteCompression options to have no effect. +func TestP2PMetainfoV1vsV22(t *testing.T) { + partitiontest.PartitionTest(t) + + cfg := config.GetDefaultLocal() + cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses + cfg.NetAddress = "127.0.0.1:0" + cfg.EnableVoteCompression = true + log := logging.TestingLog(t) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + err = netA.Start() + require.NoError(t, err) + defer netA.Stop() + + peerInfoA := netA.service.AddrInfo() + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsA[0]) + + cfg2 := cfg + cfg2.EnableVoteCompression = true + cfg.NetAddress = "" + multiAddrStr := addrsA[0].String() + phoneBookAddresses := []string{multiAddrStr} + disableV22Protocol = true + defer func() { + disableV22Protocol = false + }() + netB, err := NewP2PNetwork(log, cfg2, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + err = netB.Start() + require.NoError(t, err) + defer netB.Stop() + + require.Eventually(t, func() bool { + return len(netA.service.Conns()) > 0 && len(netB.service.Conns()) > 0 + }, 2*time.Second, 50*time.Millisecond) + + var peers []Peer + require.Eventually(t, func() bool { + peers = netA.GetPeers(PeersConnectedIn) + return len(peers) > 0 + }, 2*time.Second, 50*time.Millisecond) + peer := peers[0].(*wsPeer) + require.False(t, peer.features&pfCompressedProposal != 0) + require.False(t, peer.vpackVoteCompressionSupported()) + + peers = netB.GetPeers(PeersConnectedOut) + require.Len(t, peers, 1) + peer = peers[0].(*wsPeer) + require.False(t, peer.features&pfCompressedProposal != 0) + require.False(t, peer.vpackVoteCompressionSupported()) +} diff --git a/network/requestLogger_test.go b/network/requestLogger_test.go index c5f3967197..04e288bca6 100644 --- a/network/requestLogger_test.go +++ b/network/requestLogger_test.go @@ -53,7 +53,7 @@ func TestRequestLogger(t *testing.T) { log: dl, config: defaultConfig, phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: "go-test-network-genesis", + genesisID: "go-test-network-genesis", NetworkID: config.Devtestnet, peerStater: peerConnectionStater{log: log}, identityTracker: noopIdentityTracker{}, diff --git a/network/requestTracker_test.go b/network/requestTracker_test.go index a73646ae45..8ef12ad714 100644 --- a/network/requestTracker_test.go +++ b/network/requestTracker_test.go @@ -90,7 +90,7 @@ func TestRateLimiting(t *testing.T) { log: log, config: testConfig, phonebook: phonebook.MakePhonebook(1, 1), - GenesisID: "go-test-network-genesis", + genesisID: "go-test-network-genesis", NetworkID: config.Devtestnet, peerStater: peerConnectionStater{log: log}, identityTracker: noopIdentityTracker{}, diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 3dc8b0849a..38bdad2b8f 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -176,9 +176,9 @@ type WebsocketNetwork struct { phonebook phonebook.Phonebook - GenesisID string + genesisID string NetworkID protocol.NetworkID - RandomID string + randomID string ready atomic.Int32 readyChan chan struct{} @@ -611,7 +611,7 @@ func (wn *WebsocketNetwork) setup() { var rbytes [10]byte crypto.RandBytes(rbytes[:]) - wn.RandomID = base64.StdEncoding.EncodeToString(rbytes[:]) + wn.randomID = base64.StdEncoding.EncodeToString(rbytes[:]) if wn.config.EnableIncomingMessageFilter { wn.incomingMsgFilter = makeMessageFilter(wn.config.IncomingMessageFilterBucketCount, wn.config.IncomingMessageFilterBucketSize) @@ -719,7 +719,7 @@ func (wn *WebsocketNetwork) Start() error { go wn.postMessagesOfInterestThread() - wn.log.Infof("serving genesisID=%s on %#v with RandomID=%s", wn.GenesisID, wn.PublicAddress(), wn.RandomID) + wn.log.Infof("serving genesisID=%s on %#v with RandomID=%s", wn.genesisID, wn.PublicAddress(), wn.randomID) return nil } @@ -813,40 +813,99 @@ func (wn *WebsocketNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageVa func (wn *WebsocketNetwork) ClearValidatorHandlers() { } -func (wn *WebsocketNetwork) setHeaders(header http.Header) { - localTelemetryGUID := wn.log.GetTelemetryGUID() - localInstanceName := wn.log.GetInstanceName() - header.Set(TelemetryIDHeader, localTelemetryGUID) - header.Set(InstanceNameHeader, localInstanceName) - header.Set(AddressHeader, wn.PublicAddress()) - header.Set(NodeRandomHeader, wn.RandomID) +type peerMetadataProvider interface { + TelemetryGUID() string + InstanceName() string + GenesisID() string + PublicAddress() string + RandomID() string + SupportedProtoVersions() []string + Config() config.Local +} + +// TelemetryGUID returns the telemetry GUID of this node. +func (wn *WebsocketNetwork) TelemetryGUID() string { + return wn.log.GetTelemetryGUID() +} + +// InstanceName returns the instance name of this node. +func (wn *WebsocketNetwork) InstanceName() string { + return wn.log.GetInstanceName() +} + +// GenesisID returns the genesis ID of this node. +func (wn *WebsocketNetwork) GenesisID() string { + return wn.genesisID +} + +// RandomID returns the random ID of this node. +func (wn *WebsocketNetwork) RandomID() string { + return wn.randomID +} + +// SupportedProtoVersions returns the supported protocol versions of this node. +func (wn *WebsocketNetwork) SupportedProtoVersions() []string { + return wn.supportedProtocolVersions +} + +// Config returns the configuration of this node. +func (wn *WebsocketNetwork) Config() config.Local { + return wn.config +} + +func setHeaders(header http.Header, netProtoVer string, meta peerMetadataProvider) { + header.Set(TelemetryIDHeader, meta.TelemetryGUID()) + header.Set(InstanceNameHeader, meta.InstanceName()) + if pa := meta.PublicAddress(); pa != "" { + header.Set(AddressHeader, pa) + } + if rid := meta.RandomID(); rid != "" { + header.Set(NodeRandomHeader, rid) + } + header.Set(GenesisHeader, meta.GenesisID()) + + // set the features header (comma-separated list) + header.Set(PeerFeaturesHeader, PeerFeatureProposalCompression) + features := []string{PeerFeatureProposalCompression} + if meta.Config().EnableVoteCompression { + features = append(features, PeerFeatureVoteVpackCompression) + } + header.Set(PeerFeaturesHeader, strings.Join(features, ",")) + + if netProtoVer != "" { + // for backward compatibility, include the ProtocolVersion header in request as well. + header.Set(ProtocolVersionHeader, netProtoVer) + } + for _, v := range meta.SupportedProtoVersions() { + header.Add(ProtocolAcceptVersionHeader, v) + } } // checkServerResponseVariables check that the version and random-id in the request headers matches the server ones. // it returns true if it's a match, and false otherwise. func (wn *WebsocketNetwork) checkServerResponseVariables(otherHeader http.Header, addr string) (bool, string) { - matchingVersion, otherVersion := wn.checkProtocolVersionMatch(otherHeader) + matchingVersion, otherVersion := checkProtocolVersionMatch(otherHeader, wn.supportedProtocolVersions) if matchingVersion == "" { wn.log.Info(filterASCII(fmt.Sprintf("new peer %s version mismatch, mine=%v theirs=%s, headers %#v", addr, wn.supportedProtocolVersions, otherVersion, otherHeader))) return false, "" } otherRandom := otherHeader.Get(NodeRandomHeader) - if otherRandom == wn.RandomID || otherRandom == "" { + if otherRandom == wn.randomID || otherRandom == "" { // This is pretty harmless and some configurations of phonebooks or DNS records make this likely. Quietly filter it out. if otherRandom == "" { // missing header. - wn.log.Warn(filterASCII(fmt.Sprintf("new peer %s did not include random ID header in request. mine=%s headers %#v", addr, wn.RandomID, otherHeader))) + wn.log.Warn(filterASCII(fmt.Sprintf("new peer %s did not include random ID header in request. mine=%s headers %#v", addr, wn.randomID, otherHeader))) } else { - wn.log.Debugf("new peer %s has same node random id, am I talking to myself? %s", addr, wn.RandomID) + wn.log.Debugf("new peer %s has same node random id, am I talking to myself? %s", addr, wn.randomID) } return false, "" } otherGenesisID := otherHeader.Get(GenesisHeader) - if wn.GenesisID != otherGenesisID { + if wn.genesisID != otherGenesisID { if otherGenesisID != "" { - wn.log.Warn(filterASCII(fmt.Sprintf("new peer %#v genesis mismatch, mine=%#v theirs=%#v, headers %#v", addr, wn.GenesisID, otherGenesisID, otherHeader))) + wn.log.Warn(filterASCII(fmt.Sprintf("new peer %#v genesis mismatch, mine=%#v theirs=%#v, headers %#v", addr, wn.genesisID, otherGenesisID, otherHeader))) } else { - wn.log.Warnf("new peer %#v did not include genesis header in response. mine=%#v headers %#v", addr, wn.GenesisID, otherHeader) + wn.log.Warnf("new peer %#v did not include genesis header in response. mine=%#v headers %#v", addr, wn.genesisID, otherHeader) } return false, "" } @@ -896,17 +955,17 @@ func (wn *WebsocketNetwork) checkIncomingConnectionLimits(response http.Response } // checkProtocolVersionMatch test ProtocolAcceptVersionHeader and ProtocolVersionHeader headers from the request/response and see if it can find a match. -func (wn *WebsocketNetwork) checkProtocolVersionMatch(otherHeaders http.Header) (matchingVersion string, otherVersion string) { +func checkProtocolVersionMatch(otherHeaders http.Header, ourSupportedProtocolVersions []string) (matchingVersion string, otherVersion string) { otherAcceptedVersions := otherHeaders[textproto.CanonicalMIMEHeaderKey(ProtocolAcceptVersionHeader)] for _, otherAcceptedVersion := range otherAcceptedVersions { // do we have a matching version ? - if slices.Contains(wn.supportedProtocolVersions, otherAcceptedVersion) { + if slices.Contains(ourSupportedProtocolVersions, otherAcceptedVersion) { return otherAcceptedVersion, "" } } otherVersion = otherHeaders.Get(ProtocolVersionHeader) - if slices.Contains(wn.supportedProtocolVersions, otherVersion) { + if slices.Contains(ourSupportedProtocolVersions, otherVersion) { return otherVersion, otherVersion } @@ -926,8 +985,8 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo return http.StatusNotFound } - if wn.GenesisID != otherGenesisID { - wn.log.Warn(filterASCII(fmt.Sprintf("new peer %#v genesis mismatch, mine=%#v theirs=%#v, headers %#v", remoteAddrForLogging, wn.GenesisID, otherGenesisID, request.Header))) + if wn.genesisID != otherGenesisID { + wn.log.Warn(filterASCII(fmt.Sprintf("new peer %#v genesis mismatch, mine=%#v theirs=%#v, headers %#v", remoteAddrForLogging, wn.genesisID, otherGenesisID, request.Header))) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "mismatching genesis-id"}) response.WriteHeader(http.StatusPreconditionFailed) n, err := response.Write([]byte("mismatching genesis ID")) @@ -942,7 +1001,7 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo // This is pretty harmless and some configurations of phonebooks or DNS records make this likely. Quietly filter it out. var message string // missing header. - wn.log.Warn(filterASCII(fmt.Sprintf("new peer %s did not include random ID header in request. mine=%s headers %#v", remoteAddrForLogging, wn.RandomID, request.Header))) + wn.log.Warn(filterASCII(fmt.Sprintf("new peer %s did not include random ID header in request. mine=%s headers %#v", remoteAddrForLogging, wn.randomID, request.Header))) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "missing random ID header"}) message = fmt.Sprintf("Request was missing a %s header", NodeRandomHeader) response.WriteHeader(http.StatusPreconditionFailed) @@ -951,10 +1010,10 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo wn.log.Warnf("ws failed to write response '%s' : n = %d err = %v", message, n, err) } return http.StatusPreconditionFailed - } else if otherRandom == wn.RandomID { + } else if otherRandom == wn.randomID { // This is pretty harmless and some configurations of phonebooks or DNS records make this likely. Quietly filter it out. var message string - wn.log.Debugf("new peer %s has same node random id, am I talking to myself? %s", remoteAddrForLogging, wn.RandomID) + wn.log.Debugf("new peer %s has same node random id, am I talking to myself? %s", remoteAddrForLogging, wn.randomID) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "matching random ID header"}) message = fmt.Sprintf("Request included matching %s=%s header", NodeRandomHeader, otherRandom) response.WriteHeader(http.StatusLoopDetected) @@ -981,7 +1040,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt return } - matchingVersion, otherVersion := wn.checkProtocolVersionMatch(request.Header) + matchingVersion, otherVersion := checkProtocolVersionMatch(request.Header, wn.supportedProtocolVersions) if matchingVersion == "" { wn.log.Info(filterASCII(fmt.Sprintf("new peer %s version mismatch, mine=%v theirs=%s, headers %#v", trackedRequest.remoteHost, wn.supportedProtocolVersions, otherVersion, request.Header))) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "mismatching protocol version"}) @@ -1000,15 +1059,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt } responseHeader := make(http.Header) - wn.setHeaders(responseHeader) - responseHeader.Set(ProtocolVersionHeader, matchingVersion) - responseHeader.Set(GenesisHeader, wn.GenesisID) - // set the features we support - features := []string{PeerFeatureProposalCompression} - if wn.config.EnableVoteCompression { - features = append(features, PeerFeatureVoteVpackCompression) - } - responseHeader.Set(PeerFeaturesHeader, strings.Join(features, ",")) + setHeaders(responseHeader, matchingVersion, wn) var challenge string if wn.prioScheme != nil { challenge = wn.prioScheme.NewPrioChallenge() @@ -1855,13 +1906,13 @@ func (wn *WebsocketNetwork) getDNSAddrs(dnsBootstrap string) (relaysAddresses [] return } -// ProtocolVersionHeader HTTP header for protocol version. +// ProtocolVersionHeader HTTP header for network protocol version. const ProtocolVersionHeader = "X-Algorand-Version" -// ProtocolAcceptVersionHeader HTTP header for accept protocol version. Client use this to advertise supported protocol versions. +// ProtocolAcceptVersionHeader HTTP header for accept network protocol version. Client use this to advertise supported protocol versions. const ProtocolAcceptVersionHeader = "X-Algorand-Accept-Version" -// SupportedProtocolVersions contains the list of supported protocol versions by this node ( in order of preference ). +// SupportedProtocolVersions contains the list of supported network protocol versions by this node ( in order of preference ). var SupportedProtocolVersions = []string{"2.2"} // ProtocolVersion is the current version attached to the ProtocolVersionHeader header @@ -2004,9 +2055,9 @@ func (t *HTTPPAddressBoundTransport) RoundTrip(req *http.Request) (*http.Respons // control character, new lines, deletion, etc. All the alpha numeric and punctuation characters // are included in this range. func filterASCII(unfilteredString string) (filteredString string) { - for i, r := range unfilteredString { + for _, r := range unfilteredString { if int(r) >= 0x20 && int(r) <= 0x7e { - filteredString += string(unfilteredString[i]) + filteredString += string(r) } else { filteredString += unprintableCharacterGlyph } @@ -2026,10 +2077,7 @@ func (wn *WebsocketNetwork) tryConnect(netAddr, gossipAddr string) { defer wn.wg.Done() requestHeader := make(http.Header) - wn.setHeaders(requestHeader) - for _, supportedProtocolVersion := range wn.supportedProtocolVersions { - requestHeader.Add(ProtocolAcceptVersionHeader, supportedProtocolVersion) - } + setHeaders(requestHeader, wn.protocolVersion, wn) var idChallenge identityChallengeValue if wn.identityScheme != nil { @@ -2037,17 +2085,7 @@ func (wn *WebsocketNetwork) tryConnect(netAddr, gossipAddr string) { idChallenge = wn.identityScheme.AttachChallenge(requestHeader, theirAddr) } - // for backward compatibility, include the ProtocolVersion header as well. - requestHeader.Set(ProtocolVersionHeader, wn.protocolVersion) - // set the features header (comma-separated list) - features := []string{PeerFeatureProposalCompression} - if wn.config.EnableVoteCompression { - features = append(features, PeerFeatureVoteVpackCompression) - } - requestHeader.Set(PeerFeaturesHeader, strings.Join(features, ",")) SetUserAgentHeader(requestHeader) - myInstanceName := wn.log.GetInstanceName() - requestHeader.Set(InstanceNameHeader, myInstanceName) var websocketDialer = websocket.Dialer{ Proxy: http.ProxyFromEnvironment, HandshakeTimeout: 45 * time.Second, @@ -2245,7 +2283,7 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddre log: log, config: config, phonebook: pb, - GenesisID: genesisID, + genesisID: genesisID, NetworkID: networkID, nodeInfo: nodeInfo, resolveSRVRecords: tools_network.ReadFromSRV, @@ -2493,4 +2531,4 @@ func (wn *WebsocketNetwork) postMessagesOfInterestThread() { } // GetGenesisID returns the network-specific genesisID. -func (wn *WebsocketNetwork) GetGenesisID() string { return wn.GenesisID } +func (wn *WebsocketNetwork) GetGenesisID() string { return wn.genesisID } diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 4950f62bba..218b2687bd 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -131,7 +131,7 @@ func makeTestWebsocketNodeWithConfig(t testing.TB, conf config.Local, opts ...te log: log, config: conf, phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, + genesisID: genesisID, NetworkID: config.Devtestnet, peerStater: peerConnectionStater{log: log}, identityTracker: NewIdentityTracker(), @@ -855,7 +855,7 @@ func TestAddrToGossipAddr(t *testing.T) { partitiontest.PartitionTest(t) wn := &WebsocketNetwork{} - wn.GenesisID = "test genesisID" + wn.genesisID = "test genesisID" wn.log = logging.Base() addrtest(t, wn, "ws://r7.algodev.network.:4166/v1/test%20genesisID/gossip", "r7.algodev.network.:4166") addrtest(t, wn, "ws://r7.algodev.network.:4166/v1/test%20genesisID/gossip", "http://r7.algodev.network.:4166") @@ -1128,7 +1128,7 @@ func makeTestFilterWebsocketNode(t *testing.T, nodename string) *WebsocketNetwor log: logging.TestingLog(t).With("node", nodename), config: dc, phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, + genesisID: genesisID, NetworkID: config.Devtestnet, peerStater: peerConnectionStater{log: logging.TestingLog(t).With("node", nodename)}, identityTracker: noopIdentityTracker{}, @@ -2535,39 +2535,39 @@ func TestWebsocketNetwork_checkServerResponseVariables(t *testing.T) { partitiontest.PartitionTest(t) wn := makeTestWebsocketNode(t) - wn.GenesisID = "genesis-id1" - wn.RandomID = "random-id1" + wn.genesisID = "genesis-id1" + wn.randomID = "random-id1" header := http.Header{} header.Set(ProtocolVersionHeader, ProtocolVersion) - header.Set(NodeRandomHeader, wn.RandomID+"tag") - header.Set(GenesisHeader, wn.GenesisID) + header.Set(NodeRandomHeader, wn.randomID+"tag") + header.Set(GenesisHeader, wn.genesisID) responseVariableOk, matchingVersion := wn.checkServerResponseVariables(header, "addressX") require.Equal(t, true, responseVariableOk) require.Equal(t, matchingVersion, ProtocolVersion) noVersionHeader := http.Header{} - noVersionHeader.Set(NodeRandomHeader, wn.RandomID+"tag") - noVersionHeader.Set(GenesisHeader, wn.GenesisID) + noVersionHeader.Set(NodeRandomHeader, wn.randomID+"tag") + noVersionHeader.Set(GenesisHeader, wn.genesisID) responseVariableOk, _ = wn.checkServerResponseVariables(noVersionHeader, "addressX") require.Equal(t, false, responseVariableOk) noRandomHeader := http.Header{} noRandomHeader.Set(ProtocolVersionHeader, ProtocolVersion) - noRandomHeader.Set(GenesisHeader, wn.GenesisID) + noRandomHeader.Set(GenesisHeader, wn.genesisID) responseVariableOk, _ = wn.checkServerResponseVariables(noRandomHeader, "addressX") require.Equal(t, false, responseVariableOk) sameRandomHeader := http.Header{} sameRandomHeader.Set(ProtocolVersionHeader, ProtocolVersion) - sameRandomHeader.Set(NodeRandomHeader, wn.RandomID) - sameRandomHeader.Set(GenesisHeader, wn.GenesisID) + sameRandomHeader.Set(NodeRandomHeader, wn.randomID) + sameRandomHeader.Set(GenesisHeader, wn.genesisID) responseVariableOk, _ = wn.checkServerResponseVariables(sameRandomHeader, "addressX") require.Equal(t, false, responseVariableOk) differentGenesisIDHeader := http.Header{} differentGenesisIDHeader.Set(ProtocolVersionHeader, ProtocolVersion) - differentGenesisIDHeader.Set(NodeRandomHeader, wn.RandomID+"tag") - differentGenesisIDHeader.Set(GenesisHeader, wn.GenesisID+"tag") + differentGenesisIDHeader.Set(NodeRandomHeader, wn.randomID+"tag") + differentGenesisIDHeader.Set(GenesisHeader, wn.genesisID+"tag") responseVariableOk, _ = wn.checkServerResponseVariables(differentGenesisIDHeader, "addressX") require.Equal(t, false, responseVariableOk) } @@ -2636,7 +2636,7 @@ func TestSlowPeerDisconnection(t *testing.T) { log: log, config: defaultConfig, phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, + genesisID: genesisID, NetworkID: config.Devtestnet, peerStater: peerConnectionStater{log: log}, identityTracker: noopIdentityTracker{}, @@ -2713,7 +2713,7 @@ func TestForceMessageRelaying(t *testing.T) { log: log, config: defaultConfig, phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, + genesisID: genesisID, NetworkID: config.Devtestnet, peerStater: peerConnectionStater{log: log}, identityTracker: noopIdentityTracker{}, @@ -2809,7 +2809,7 @@ func TestCheckProtocolVersionMatch(t *testing.T) { log: log, config: defaultConfig, phonebook: phonebook.MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, + genesisID: genesisID, NetworkID: config.Devtestnet, peerStater: peerConnectionStater{log: log}, identityTracker: noopIdentityTracker{}, @@ -2820,7 +2820,7 @@ func TestCheckProtocolVersionMatch(t *testing.T) { header1 := make(http.Header) header1.Add(ProtocolAcceptVersionHeader, "1") header1.Add(ProtocolVersionHeader, "3") - matchingVersion, otherVersion := wn.checkProtocolVersionMatch(header1) + matchingVersion, otherVersion := checkProtocolVersionMatch(header1, wn.supportedProtocolVersions) require.Equal(t, "1", matchingVersion) require.Equal(t, "", otherVersion) @@ -2828,19 +2828,19 @@ func TestCheckProtocolVersionMatch(t *testing.T) { header2.Add(ProtocolAcceptVersionHeader, "3") header2.Add(ProtocolAcceptVersionHeader, "4") header2.Add(ProtocolVersionHeader, "1") - matchingVersion, otherVersion = wn.checkProtocolVersionMatch(header2) + matchingVersion, otherVersion = checkProtocolVersionMatch(header2, wn.supportedProtocolVersions) require.Equal(t, "1", matchingVersion) require.Equal(t, "1", otherVersion) header3 := make(http.Header) header3.Add(ProtocolVersionHeader, "3") - matchingVersion, otherVersion = wn.checkProtocolVersionMatch(header3) + matchingVersion, otherVersion = checkProtocolVersionMatch(header3, wn.supportedProtocolVersions) require.Equal(t, "", matchingVersion) require.Equal(t, "3", otherVersion) header4 := make(http.Header) header4.Add(ProtocolVersionHeader, "5\n") - matchingVersion, otherVersion = wn.checkProtocolVersionMatch(header4) + matchingVersion, otherVersion = checkProtocolVersionMatch(header4, wn.supportedProtocolVersions) require.Equal(t, "", matchingVersion) require.Equal(t, "5"+unprintableCharacterGlyph, otherVersion) } @@ -3642,8 +3642,8 @@ func TestMaliciousCheckServerResponseVariables(t *testing.T) { partitiontest.PartitionTest(t) wn := makeTestWebsocketNode(t) - wn.GenesisID = "genesis-id1" - wn.RandomID = "random-id1" + wn.genesisID = "genesis-id1" + wn.randomID = "random-id1" wn.log = callbackLogger{ Logger: wn.log, InfoCallback: func(args ...interface{}) { @@ -3666,8 +3666,8 @@ func TestMaliciousCheckServerResponseVariables(t *testing.T) { header1 := http.Header{} header1.Set(ProtocolVersionHeader, ProtocolVersion+"א") - header1.Set(NodeRandomHeader, wn.RandomID+"tag") - header1.Set(GenesisHeader, wn.GenesisID) + header1.Set(NodeRandomHeader, wn.randomID+"tag") + header1.Set(GenesisHeader, wn.genesisID) responseVariableOk, matchingVersion := wn.checkServerResponseVariables(header1, "addressX") require.Equal(t, false, responseVariableOk) require.Equal(t, "", matchingVersion) @@ -3675,15 +3675,15 @@ func TestMaliciousCheckServerResponseVariables(t *testing.T) { header2 := http.Header{} header2.Set(ProtocolVersionHeader, ProtocolVersion) header2.Set("א", "א") - header2.Set(GenesisHeader, wn.GenesisID) + header2.Set(GenesisHeader, wn.genesisID) responseVariableOk, matchingVersion = wn.checkServerResponseVariables(header2, "addressX") require.Equal(t, false, responseVariableOk) require.Equal(t, "", matchingVersion) header3 := http.Header{} header3.Set(ProtocolVersionHeader, ProtocolVersion) - header3.Set(NodeRandomHeader, wn.RandomID+"tag") - header3.Set(GenesisHeader, wn.GenesisID+"א") + header3.Set(NodeRandomHeader, wn.randomID+"tag") + header3.Set(GenesisHeader, wn.genesisID+"א") responseVariableOk, matchingVersion = wn.checkServerResponseVariables(header3, "addressX") require.Equal(t, false, responseVariableOk) require.Equal(t, "", matchingVersion) diff --git a/node/node_test.go b/node/node_test.go index 77d23e0ae5..b39c114a5d 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -1022,15 +1022,15 @@ func TestNodeP2PRelays(t *testing.T) { switch i { case 0: // node R1 connects to R2 - t.Logf("Node%d phonebook: %s", i, ni[1].p2pMultiAddr()) + t.Logf("Node%d %s phonebook: %s", i, ni[0].p2pID, ni[1].p2pMultiAddr()) return []string{ni[1].p2pMultiAddr()} case 1: // node R2 connects to none one - t.Logf("Node%d phonebook: empty", i) + t.Logf("Node%d %s phonebook: empty", i, ni[1].p2pID) return []string{} case 2: - // node N only connects to R1 - t.Logf("Node%d phonebook: %s", i, ni[1].p2pMultiAddr()) + // node N only connects to R2 + t.Logf("Node%d %s phonebook: %s", i, ni[2].p2pID, ni[1].p2pMultiAddr()) return []string{ni[1].p2pMultiAddr()} default: t.Errorf("not expected number of nodes: %d", i) @@ -1262,3 +1262,91 @@ func TestNodeHybridP2PGossipSend(t *testing.T) { require.Fail(t, fmt.Sprintf("no block notification for wallet: %v.", wallets[0])) } } + +// TestNodeP2P_NetProtoVersions makes sure two p2p nodes with different network protocol versions +// can communicate and produce blocks. +func TestNodeP2P_NetProtoVersions(t *testing.T) { + partitiontest.PartitionTest(t) + + const consensusTest0 = protocol.ConsensusVersion("test0") + + configurableConsensus := make(config.ConsensusProtocols) + + testParams0 := config.Consensus[protocol.ConsensusCurrentVersion] + testParams0.AgreementFilterTimeoutPeriod0 = 500 * time.Millisecond + configurableConsensus[consensusTest0] = testParams0 + + maxMoneyAtStart := 100_000_000_000 + + const numAccounts = 2 + acctStake := make([]basics.MicroAlgos, numAccounts) + acctStake[0] = basics.MicroAlgos{Raw: uint64(maxMoneyAtStart / numAccounts)} + acctStake[1] = basics.MicroAlgos{Raw: uint64(maxMoneyAtStart / numAccounts)} + + configHook := func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) { + cfg = config.GetDefaultLocal() + cfg.BaseLoggerDebugLevel = uint32(logging.Debug) + cfg.EnableP2P = true + cfg.NetAddress = "" + + cfg.P2PPersistPeerID = true + privKey, err := p2p.GetPrivKey(cfg, ni.rootDir) + require.NoError(t, err) + ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic()) + require.NoError(t, err) + + switch ni.idx { + case 0: + cfg.NetAddress = ni.p2pNetAddr() + cfg.EnableVoteCompression = true + case 1: + cfg.EnableVoteCompression = false + default: + } + return ni, cfg + } + + phonebookHook := func(nodes []nodeInfo, nodeIdx int) []string { + phonebook := make([]string, 0, len(nodes)-1) + for i := range nodes { + if i != nodeIdx { + phonebook = append(phonebook, nodes[i].p2pMultiAddr()) + } + } + return phonebook + } + nodes, wallets := setupFullNodesEx(t, consensusTest0, configurableConsensus, acctStake, configHook, phonebookHook) + require.Len(t, nodes, numAccounts) + require.Len(t, wallets, numAccounts) + for i := 0; i < len(nodes); i++ { + defer os.Remove(wallets[i]) + defer nodes[i].Stop() + } + + startAndConnectNodes(nodes, nodelayFirstNodeStartDelay) + + require.Eventually(t, func() bool { + connectPeers(nodes) + return len(nodes[0].net.GetPeers(network.PeersConnectedIn, network.PeersConnectedOut)) >= 1 && + len(nodes[1].net.GetPeers(network.PeersConnectedIn, network.PeersConnectedOut)) >= 1 + }, 60*time.Second, 1*time.Second) + + const initialRound = 1 + const maxRounds = 3 + for tests := basics.Round(0); tests < maxRounds; tests++ { + blocks := make([]bookkeeping.Block, len(wallets)) + for i := range wallets { + select { + case <-nodes[i].ledger.Wait(initialRound + tests): + blk, err := nodes[i].ledger.Block(initialRound + tests) + if err != nil { + panic(err) + } + blocks[i] = blk + case <-time.After(60 * time.Second): + require.Fail(t, fmt.Sprintf("no block notification for account: %v. Iteration: %v", wallets[i], tests)) + return + } + } + } +}