Skip to content

Commit df213ee

Browse files
committed
fix EvtPeerIdentificationCompleted handler
1 parent 0d674e2 commit df213ee

File tree

3 files changed

+22
-19
lines changed

3 files changed

+22
-19
lines changed

network/p2p/p2p.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho
201201
sm := makeStreamManager(ctx, log, h, wsStreamHandlers, cfg.EnableGossipService)
202202
sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted))
203203
if err != nil {
204-
err0 := fmt.Errorf("Failed to subscribe to peer identification events: %v", err)
204+
err0 := fmt.Errorf("failed to subscribe to peer identification events: %v", err)
205205
return nil, err0
206206
}
207207
go sm.peerWatcher(ctx, sub)

network/p2p/streams.go

+17-14
Original file line numberDiff line numberDiff line change
@@ -109,34 +109,32 @@ func (n *streamManager) dispatch(ctx context.Context, remotePeer peer.ID, stream
109109
}
110110
}
111111

112-
// Connected is called when a connection is opened
113-
// for both incoming (listener -> addConn) and outgoing (dialer -> addConn) connections.
114-
func (n *streamManager) Connected(net network.Network, conn network.Conn) {
115-
}
116-
117112
func (n *streamManager) peerWatcher(ctx context.Context, sub event.Subscription) {
118113
defer sub.Close()
119114
for e := range sub.Out() {
120115
evt := e.(event.EvtPeerIdentificationCompleted)
121116
conn := evt.Conn
122-
if conn.Stat().Direction == network.DirInbound && !n.allowIncomingGossip {
123-
n.log.Debugf("ignoring incoming connection from %s", conn.RemotePeer().String())
124-
continue
125-
}
126117

127118
remotePeer := conn.RemotePeer()
128119
localPeer := n.host.ID()
129120

121+
if conn.Stat().Direction == network.DirInbound && !n.allowIncomingGossip {
122+
n.log.Debugf("%s: ignoring incoming connection from %s", localPeer.String(), remotePeer.String())
123+
continue
124+
}
125+
130126
// ensure that only one of the peers initiates the stream
131127
if localPeer > remotePeer {
132-
return
128+
n.log.Debugf("%s: ignoring a lesser peer ID %s", localPeer.String(), remotePeer.String())
129+
continue
133130
}
134131

135132
n.streamsLock.Lock()
136133
_, ok := n.streams[remotePeer]
137134
if ok {
138135
n.streamsLock.Unlock()
139-
return // there's already an active stream with this peer for our protocol
136+
n.log.Debugf("%s: already have a stream to/from %s", localPeer.String(), remotePeer.String())
137+
continue // there's already an active stream with this peer for our protocol
140138
}
141139

142140
protos := evt.Protocols
@@ -147,9 +145,9 @@ func (n *streamManager) peerWatcher(ctx context.Context, sub event.Subscription)
147145

148146
stream, err := n.host.NewStream(n.ctx, remotePeer, targetProto)
149147
if err != nil {
150-
n.log.Infof("Failed to open stream to %s (%s): %v", remotePeer, conn.RemoteMultiaddr().String(), err)
148+
n.log.Infof("%s: failed to open stream to %s (%s): %v", localPeer.String(), remotePeer, conn.RemoteMultiaddr().String(), err)
151149
n.streamsLock.Unlock()
152-
return
150+
continue
153151
}
154152
n.streams[remotePeer] = stream
155153
n.streamsLock.Unlock()
@@ -158,7 +156,7 @@ func (n *streamManager) peerWatcher(ctx context.Context, sub event.Subscription)
158156
if handler, ok := n.handlers[targetProto]; ok {
159157
handler(n.ctx, remotePeer, stream, incoming)
160158
} else {
161-
n.log.Errorf("No handler for protocol %s, peer %s", targetProto, remotePeer)
159+
n.log.Errorf("%s: no handler for protocol %s, peer %s", localPeer.String(), targetProto, remotePeer)
162160
_ = stream.Reset()
163161
}
164162

@@ -170,6 +168,11 @@ func (n *streamManager) peerWatcher(ctx context.Context, sub event.Subscription)
170168
}
171169
}
172170

171+
// Connected is called when a connection is opened
172+
// for both incoming (listener -> addConn) and outgoing (dialer -> addConn) connections.
173+
func (n *streamManager) Connected(net network.Network, conn network.Conn) {
174+
}
175+
173176
// Disconnected is called when a connection is closed
174177
func (n *streamManager) Disconnected(net network.Network, conn network.Conn) {
175178
n.streamsLock.Lock()

node/node_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1022,15 +1022,15 @@ func TestNodeP2PRelays(t *testing.T) {
10221022
switch i {
10231023
case 0:
10241024
// node R1 connects to R2
1025-
t.Logf("Node%d phonebook: %s", i, ni[1].p2pMultiAddr())
1025+
t.Logf("Node%d %s phonebook: %s", i, ni[0].p2pID, ni[1].p2pMultiAddr())
10261026
return []string{ni[1].p2pMultiAddr()}
10271027
case 1:
10281028
// node R2 connects to none one
1029-
t.Logf("Node%d phonebook: empty", i)
1029+
t.Logf("Node%d %s phonebook: empty", i, ni[1].p2pID)
10301030
return []string{}
10311031
case 2:
1032-
// node N only connects to R1
1033-
t.Logf("Node%d phonebook: %s", i, ni[1].p2pMultiAddr())
1032+
// node N only connects to R2
1033+
t.Logf("Node%d %s phonebook: %s", i, ni[2].p2pID, ni[1].p2pMultiAddr())
10341034
return []string{ni[1].p2pMultiAddr()}
10351035
default:
10361036
t.Errorf("not expected number of nodes: %d", i)

0 commit comments

Comments
 (0)