Skip to content

draft: test data race and postMessagesOfInterestThread #5412

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions network/requestTracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func TestRateLimiting(t *testing.T) {
if defaultConfig.ConnectionsRateLimitingCount == 0 || defaultConfig.ConnectionsRateLimitingWindowSeconds == 0 {
t.Skip()
}
log := logging.TestingLog(t)
log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel))
log := logging.Base()
log.SetLevel(logging.Error)
testConfig := defaultConfig
// This test is conducted locally, so we want to treat all hosts the same for counting incoming requests.
testConfig.DisableLocalhostConnectionRateLimit = false
Expand Down
12 changes: 8 additions & 4 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,7 @@ func (wn *WebsocketNetwork) Stop() {
wn.messagesOfInterestMu.Lock()
defer wn.messagesOfInterestMu.Unlock()

close(wn.messagesOfInterestRefresh)
wn.messagesOfInterestEncoded = false
wn.messagesOfInterestEnc = nil
wn.messagesOfInterest = nil
Expand Down Expand Up @@ -2414,7 +2415,7 @@ func (wn *WebsocketNetwork) removePeer(peer *wsPeer, reason disconnectReason) {
if peer.outgoing && peer.peerMessageDelay > 0 {
logEntry = logEntry.With("messageDelay", peer.peerMessageDelay)
}
logEntry.Infof("Peer %s disconnected: %s", peer.rootURL, reason)
logEntry.Infof("Peer %s disconnected: %s", peer.rootURL, reason) //xx
peerAddr := peer.OriginAddress()
// we might be able to get addr out of conn, or it might be closed
if peerAddr == "" && peer.conn != nil {
Expand Down Expand Up @@ -2591,23 +2592,26 @@ func (wn *WebsocketNetwork) updateMessagesOfInterestEnc() {
atomic.AddUint32(&wn.messagesOfInterestGeneration, 1)
var peers []*wsPeer
peers, _ = wn.peerSnapshot(peers)
wn.log.Infof("updateMessagesOfInterestEnc maybe sending messagesOfInterest %v", wn.messagesOfInterest)
wn.log.Infof("updateMessagesOfInterestEnc maybe sending messagesOfInterest %v", wn.messagesOfInterest) //xx
for _, peer := range peers {
wn.maybeSendMessagesOfInterest(peer, wn.messagesOfInterestEnc)
}
}

func (wn *WebsocketNetwork) postMessagesOfInterestThread() {
for {
<-wn.messagesOfInterestRefresh
_, open := <-wn.messagesOfInterestRefresh
if !open {
return
}
// if we're not a relay, and not participating, we don't need txn pool
wantTXGossip := wn.nodeInfo.IsParticipating()
if wantTXGossip && (wn.wantTXGossip != wantTXGossipYes) {
wn.log.Infof("postMessagesOfInterestThread: enabling TX gossip")
wn.RegisterMessageInterest(protocol.TxnTag)
atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipYes)
} else if !wantTXGossip && (wn.wantTXGossip != wantTXGossipNo) {
wn.log.Infof("postMessagesOfInterestThread: disabling TX gossip")
wn.log.Infof("postMessagesOfInterestThread: disabling TX gossip") //xx
wn.DeregisterMessageInterest(protocol.TxnTag)
atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipNo)
}
Expand Down
6 changes: 3 additions & 3 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func (wp *wsPeer) reportReadErr(err error) {
// only report error if we haven't already closed the peer
if atomic.LoadInt32(&wp.didInnerClose) == 0 {
_, _, line, _ := runtime.Caller(1)
wp.net.log.Warnf("peer[%s] line=%d read err: %s", wp.conn.RemoteAddr().String(), line, err)
wp.net.log.Warnf("peer[%s] line=%d read err: %s", wp.conn.RemoteAddr().String(), line, err) //xx
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "reader err"})
}
}
Expand Down Expand Up @@ -732,7 +732,7 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
err := wp.conn.WriteMessage(websocket.BinaryMessage, msg.data)
if err != nil {
if atomic.LoadInt32(&wp.didInnerClose) == 0 {
wp.net.log.Warn("peer write error ", err)
wp.net.log.Warn("peer write error ", err) //xx
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "write err"})
}
return disconnectWriteError
Expand Down Expand Up @@ -883,7 +883,7 @@ func (wp *wsPeer) Close(deadline time.Time) {
close(wp.closing)
err := wp.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline)
if err != nil {
wp.net.log.Infof("failed to write CloseMessage to connection for %s", wp.conn.RemoteAddr().String())
wp.net.log.Infof("failed to write CloseMessage to connection for %s", wp.conn.RemoteAddr().String()) //xx
}
err = wp.conn.CloseWithoutFlush()
if err != nil {
Expand Down