Skip to content

Commit a764ad1

Browse files
karknucoot
authored andcommitted
tx-submission: TX peer ranking
Use hashWithSalt to breaks ties when ranking peers with equal score. Track rejections in a bucket that slowly drains over time.
1 parent 764e98d commit a764ad1

File tree

8 files changed

+135
-43
lines changed

8 files changed

+135
-43
lines changed

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node/Kernel.hs

+9-6
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ import Data.Monoid.Synchronisation
5252
import Data.Void (Void)
5353
import GHC.Generics (Generic)
5454
import Numeric.Natural (Natural)
55-
import System.Random (RandomGen, StdGen, randomR, split)
55+
56+
import System.Random (RandomGen, StdGen, mkStdGen, random, randomR, split)
5657

5758
import Network.Socket (PortNumber)
5859

@@ -302,23 +303,24 @@ newNodeKernel :: ( MonadSTM m
302303
, Eq txid
303304
)
304305
=> s
306+
-> Int
305307
-> [Tx txid]
306308
-> m (NodeKernel header block s txid m)
307-
newNodeKernel rng txs = do
309+
newNodeKernel psRng txSeed txs = do
308310
publicStateVar <- makePublicPeerSelectionStateVar
309311
NodeKernel
310312
<$> newTVarIO Map.empty
311313
<*> newTVarIO (ChainProducerState Chain.Genesis Map.empty 0)
312314
<*> newFetchClientRegistry
313315
<*> newPeerSharingRegistry
314316
<*> ChainDB.newChainDB
315-
<*> newPeerSharingAPI publicStateVar rng
317+
<*> newPeerSharingAPI publicStateVar psRng
316318
ps_POLICY_PEER_SHARE_STICKY_TIME
317319
ps_POLICY_PEER_SHARE_MAX_PEERS
318320
<*> pure publicStateVar
319321
<*> newMempool txs
320322
<*> Strict.newMVar (TxChannels Map.empty)
321-
<*> newSharedTxStateVar
323+
<*> newSharedTxStateVar (mkStdGen txSeed)
322324

323325
-- | Register a new upstream chain-sync client.
324326
--
@@ -407,11 +409,12 @@ withNodeKernelThread
407409
withNodeKernelThread BlockGeneratorArgs { bgaSlotDuration, bgaBlockGenerator, bgaSeed }
408410
txs
409411
k = do
410-
kernel <- newNodeKernel psSeed txs
412+
kernel <- newNodeKernel psSeed txSeed txs
411413
withSlotTime bgaSlotDuration $ \waitForSlot ->
412414
withAsync (blockProducerThread kernel waitForSlot) (k kernel)
413415
where
414-
(bpSeed, psSeed) = split bgaSeed
416+
(bpSeed, rng) = split bgaSeed
417+
(txSeed, psSeed) = random rng
415418

416419
blockProducerThread :: NodeKernel header block seed txid m
417420
-> (SlotNo -> STM m SlotNo)

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/AppV2.hs

+5-1
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ import Data.ByteString.Lazy (ByteString)
3434
import Data.ByteString.Lazy qualified as BSL
3535
import Data.Foldable (traverse_)
3636
import Data.Function (on)
37+
import Data.Hashable
3738
import Data.List (nubBy)
3839
import Data.Map.Strict (Map)
3940
import Data.Map.Strict qualified as Map
4041
import Data.Maybe (fromMaybe)
4142
import Data.Void (Void)
43+
import System.Random (mkStdGen)
4244

4345
import Ouroboros.Network.Channel
4446
import Ouroboros.Network.ControlMessage (ControlMessage (..), ControlMessageSTM)
@@ -129,6 +131,7 @@ runTxSubmission
129131
, NoThunks (Tx txid)
130132
, Show peeraddr
131133
, Ord peeraddr
134+
, Hashable peeraddr
132135

133136
, txid ~ Int
134137
)
@@ -150,9 +153,10 @@ runTxSubmission tracer tracerTxLogic state txDecisionPolicy = do
150153
) state
151154

152155
inboundMempool <- emptyMempool
156+
let txRng = mkStdGen 42 -- TODO
153157

154158
txChannelsMVar <- newMVar (TxChannels Map.empty)
155-
sharedTxStateVar <- newSharedTxStateVar
159+
sharedTxStateVar <- newSharedTxStateVar txRng
156160
labelTVarIO sharedTxStateVar "shared-tx-state"
157161
gsvVar <- newTVarIO Map.empty
158162
labelTVarIO gsvVar "gsv"

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/TxLogic.hs

+14-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
{-# LANGUAGE BangPatterns #-}
22
{-# LANGUAGE BlockArguments #-}
33
{-# LANGUAGE CPP #-}
4+
{-# LANGUAGE DataKinds #-}
45
{-# LANGUAGE DeriveGeneric #-}
6+
{-# LANGUAGE DerivingVia #-}
57
{-# LANGUAGE FlexibleContexts #-}
68
{-# LANGUAGE FlexibleInstances #-}
79
{-# LANGUAGE GADTs #-}
810
{-# LANGUAGE LambdaCase #-}
911
{-# LANGUAGE NamedFieldPuns #-}
1012
{-# LANGUAGE ScopedTypeVariables #-}
13+
{-# LANGUAGE StandaloneDeriving #-}
1114
{-# LANGUAGE TupleSections #-}
1215
{-# LANGUAGE TypeOperators #-}
1316

@@ -17,6 +20,7 @@ module Test.Ouroboros.Network.TxSubmission.TxLogic where
1720

1821
import Prelude hiding (seq)
1922

23+
import Control.Monad.Class.MonadTime.SI (Time (..))
2024
import Control.Exception (assert)
2125

2226
import Data.Foldable (
@@ -36,6 +40,7 @@ import Data.Sequence.Strict qualified as StrictSeq
3640
import Data.Set (Set)
3741
import Data.Set qualified as Set
3842
import Data.Typeable
43+
import System.Random (mkStdGen, StdGen)
3944

4045
import NoThunks.Class
4146

@@ -304,6 +309,7 @@ mkArbPeerTxState mempoolHasTxFun txIdsInflight unacked txMaskMap =
304309
requestedTxsInflightSize,
305310
unknownTxs,
306311
rejectedTxs = 0,
312+
rejectedTxsTs = Time 0,
307313
fetchedTxs = Set.empty }
308314
(Set.fromList $ Map.elems inflightMap)
309315
bufferedMap
@@ -384,6 +390,7 @@ genSharedTxState maxTxIdsInflight = do
384390
_mempoolHasTxFun@(Fun (_, _, x) _) <- arbitrary :: Gen (Fun Bool Bool)
385391
let mempoolHasTxFun = Fun (function (const False), False, x) (const False)
386392
pss <- listOf1 (genArbPeerTxState mempoolHasTxFun maxTxIdsInflight)
393+
seed <- arbitrary
387394

388395
let pss' :: [(PeerAddr, ArbPeerTxState txid (Tx txid))]
389396
pss' = [0..] `zip` pss
@@ -410,7 +417,8 @@ genSharedTxState maxTxIdsInflight = do
410417
| ArbPeerTxState { arbBufferedMap }
411418
<- pss
412419
],
413-
referenceCounts = Map.empty
420+
referenceCounts = Map.empty,
421+
peerRng = mkStdGen seed
414422
}
415423

416424
return ( mempoolHasTxFun
@@ -637,7 +645,7 @@ prop_acknowledgeTxIds :: ArbDecisionContextWithReceivedTxIds
637645
-> Property
638646
prop_acknowledgeTxIds (ArbDecisionContextWithReceivedTxIds policy SharedDecisionContext { sdcSharedTxState = st } ps _ _ _) =
639647
case TXS.acknowledgeTxIds policy st ps of
640-
(numTxIdsToAck, txIdsToRequest, txs, TXS.RefCountDiff { TXS.txIdsToAck }, ps') | txIdsToRequest > 0 ->
648+
(numTxIdsToAck, txIdsToRequest, txIdsTxs, TXS.RefCountDiff { TXS.txIdsToAck }, ps') | txIdsToRequest > 0 ->
641649
counterexample "number of tx ids to ack must agree with RefCountDiff"
642650
( fromIntegral numTxIdsToAck
643651
===
@@ -660,7 +668,7 @@ prop_acknowledgeTxIds (ArbDecisionContextWithReceivedTxIds policy SharedDecision
660668
| txid <- take (fromIntegral numTxIdsToAck) (toList $ unacknowledgedTxIds ps)
661669
, Just _ <- maybeToList $ txid `Map.lookup` bufferedTxs st
662670
]
663-
in getTxId `map` txs === acked)
671+
in map (getTxId . snd) txIdsTxs === acked)
664672
_otherwise -> property True
665673
where
666674
stripSuffix :: Eq a => [a] -> [a] -> Maybe [a]
@@ -913,6 +921,9 @@ prop_collectTxsImpl (ArbCollectTxs _mempoolHasTxFun txidsRequested txsReceived p
913921
Nothing -> error "impossible happened! Is the test still using `TxId` for `txid`?"
914922

915923

924+
925+
deriving via OnlyCheckWhnfNamed "StdGen" StdGen instance NoThunks StdGen
926+
916927
-- | Verify that `SharedTxState` returned by `collectTxsImpl` if evaluated to
917928
-- WHNF, it doesn't contain any thunks.
918929
--

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Decision.hs

+16-8
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ import Control.Arrow ((>>>))
2020
import Control.Exception (assert)
2121

2222
import Data.Bifunctor (second)
23+
import Data.Hashable
2324
import Data.List (mapAccumR, sortOn)
2425
import Data.Map.Merge.Strict qualified as Map
2526
import Data.Map.Strict (Map)
2627
import Data.Map.Strict qualified as Map
2728
import Data.Maybe (mapMaybe)
2829
import Data.Set (Set)
2930
import Data.Set qualified as Set
31+
import System.Random (random)
3032

3133
import Data.Sequence.Strict qualified as StrictSeq
3234
import Ouroboros.Network.DeltaQ (PeerGSV (..), defaultGSV,
@@ -43,6 +45,7 @@ makeDecisions
4345
:: forall peeraddr txid tx.
4446
( Ord peeraddr
4547
, Ord txid
48+
, Hashable peeraddr
4649
)
4750
=> TxDecisionPolicy
4851
-- ^ decision policy
@@ -62,9 +65,11 @@ makeDecisions policy SharedDecisionContext {
6265
sdcPeerGSV = _peerGSV,
6366
sdcSharedTxState = st
6467
}
65-
= fn
66-
. pickTxsToDownload policy st
67-
. orderByRejections
68+
= let (salt, rng') = random (peerRng st)
69+
st' = st { peerRng = rng' } in
70+
fn
71+
. pickTxsToDownload policy st'
72+
. orderByRejections salt
6873
where
6974
fn :: forall a.
7075
(a, [(peeraddr, TxDecision txid tx)])
@@ -76,13 +81,16 @@ makeDecisions policy SharedDecisionContext {
7681
--
7782
-- TXs delivered late will fail to apply because they where included in
7883
-- a recently adopted block. Peers can race against each other by setting
79-
-- `txInflightMultiplicity` to > 1.
84+
-- `txInflightMultiplicity` to > 1. In case of a tie a hash of the peeraddr
85+
-- is used as a tie breaker. Since every invocation use a new salt a given
86+
-- peeraddr does not have an advantage over time.
8087
--
81-
-- TODO: Should not depend on plain `peeraddr` as a tie breaker.
82-
orderByRejections :: Map peeraddr (PeerTxState txid tx)
88+
orderByRejections :: Hashable peeraddr
89+
=> Int
90+
-> Map peeraddr (PeerTxState txid tx)
8391
-> [ (peeraddr, PeerTxState txid tx)]
84-
orderByRejections =
85-
sortOn (\(_peeraddr, ps) -> rejectedTxs ps)
92+
orderByRejections salt =
93+
sortOn (\(peeraddr, ps) -> (rejectedTxs ps, hashWithSalt salt peeraddr))
8694
. Map.toList
8795

8896
-- | Order peers by `DeltaQ`.

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Registry.hs

+62-10
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ module Ouroboros.Network.TxSubmission.Inbound.Registry
1313
, newTxChannelsVar
1414
, PeerTxAPI (..)
1515
, decisionLogicThread
16+
, drainRejectionThread
1617
, withPeer
1718
) where
1819

@@ -21,20 +22,22 @@ import Control.Concurrent.Class.MonadSTM.Strict
2122
import Control.Monad.Class.MonadFork
2223
import Control.Monad.Class.MonadThrow
2324
import Control.Monad.Class.MonadTimer.SI
25+
import Control.Monad.Class.MonadTime.SI
2426

2527
import Data.Foldable (traverse_
2628
#if !MIN_VERSION_base(4,20,0)
2729
, foldl'
2830
#endif
2931
)
30-
import Data.Typeable (Typeable)
32+
import Data.Hashable
3133
import Data.Map.Strict (Map)
3234
import Data.Map.Strict qualified as Map
3335
import Data.Maybe (fromMaybe)
3436
import Data.Sequence.Strict (StrictSeq)
3537
import Data.Sequence.Strict qualified as StrictSeq
3638
import Data.Set (Set)
3739
import Data.Set qualified as Set
40+
import Data.Typeable (Typeable)
3841
import Data.Void (Void)
3942

4043
import Control.Tracer (Tracer, traceWith)
@@ -79,8 +82,11 @@ data PeerTxAPI m txid tx = PeerTxAPI {
7982
-> m (Maybe TxSubmissionProtocolError),
8083
-- ^ handle received txs
8184

82-
countRejectedTxs :: Int
83-
-> m Int,
85+
countRejectedTxs :: Time
86+
-> Double
87+
-> m Double,
88+
-- ^ updated score. The `Double` is difference between accepted and
89+
-- rejected transactions.
8490

8591
consumeFetchedTxs :: Set txid
8692
-> m (Set txid)
@@ -166,6 +172,7 @@ withPeer tracer
166172
unacknowledgedTxIds = StrictSeq.empty,
167173
unknownTxs = Set.empty,
168174
rejectedTxs = 0,
175+
rejectedTxsTs = Time 0,
169176
fetchedTxs = Set.empty }
170177
peerTxStates
171178
}
@@ -238,9 +245,11 @@ withPeer tracer
238245
peeraddr peerTxStates
239246
in st {peerTxStates = peerTxStates' }
240247

241-
countRejectedTxs :: Int
242-
-> m Int
243-
countRejectedTxs n = atomically $ do
248+
249+
countRejectedTxs :: Time
250+
-> Double
251+
-> m Double
252+
countRejectedTxs now n = atomically $ do
244253
modifyTVar sharedStateVar cntRejects
245254
st <- readTVar sharedStateVar
246255
case Map.lookup peeraddr (peerTxStates st) of
@@ -251,10 +260,10 @@ withPeer tracer
251260
-> SharedTxState peeraddr txid tx
252261
cntRejects st@SharedTxState { peerTxStates } =
253262
let peerTxStates' =
254-
Map.update
255-
(\ps -> Just $! ps { rejectedTxs = min 42 (max (-42) (rejectedTxs ps + n)) })
256-
peeraddr peerTxStates
257-
in st { peerTxStates = peerTxStates' }
263+
Map.update (\ps -> Just $! (updateRejects now n ps))
264+
peeraddr peerTxStates
265+
in st {peerTxStates = peerTxStates'}
266+
258267

259268
consumeFetchedTxs :: Set txid
260269
-> m (Set txid)
@@ -274,6 +283,48 @@ withPeer tracer
274283
return o
275284

276285

286+
updateRejects
287+
:: Time
288+
-> Double
289+
-> PeerTxState txid tx
290+
-> PeerTxState txid tx
291+
updateRejects now 0 pts | rejectedTxs pts == 0
292+
= pts {rejectedTxsTs = now}
293+
updateRejects now n pts@PeerTxState { rejectedTxs, rejectedTxsTs } =
294+
let duration = diffTime now rejectedTxsTs
295+
rate = 0.1 -- 0.1 rejected tx/s
296+
maxTokens = 15 * 60 * rate -- 15 minutes worth of rejections
297+
!drain = realToFrac duration * rate
298+
!drained = max 0 $ rejectedTxs - drain in
299+
pts { rejectedTxs = max 0 $ min maxTokens $ drained + n
300+
, rejectedTxsTs = now }
301+
302+
303+
drainRejectionThread
304+
:: forall m peeraddr txid tx.
305+
( MonadDelay m
306+
, MonadSTM m
307+
, MonadThread m
308+
)
309+
=> SharedTxStateVar m peeraddr txid tx
310+
-> m Void
311+
drainRejectionThread sharedStateVar = do
312+
labelThisThread "tx-rejection-drain"
313+
go
314+
where
315+
go :: m Void
316+
go = do
317+
threadDelay 7
318+
319+
!now <- getMonotonicTime
320+
atomically $ do
321+
st <- readTVar sharedStateVar
322+
let ptss = Map.map (\pts -> updateRejects now 0 pts) (peerTxStates st)
323+
writeTVar sharedStateVar (st { peerTxStates = ptss })
324+
325+
go
326+
327+
277328
decisionLogicThread
278329
:: forall m peeraddr txid tx.
279330
( MonadDelay m
@@ -283,6 +334,7 @@ decisionLogicThread
283334
, MonadFork m
284335
, Ord peeraddr
285336
, Ord txid
337+
, Hashable peeraddr
286338
)
287339
=> Tracer m (TraceTxLogic peeraddr txid tx)
288340
-> TxDecisionPolicy

0 commit comments

Comments
 (0)