Skip to content

Commit d49c0a5

Browse files
authored
Merge pull request #5113 from IntersectMBO/karknu/conf_egre_int
Make egress poll interval configurable
2 parents b90ae58 + afd196a commit d49c0a5

File tree

18 files changed

+96
-67
lines changed

18 files changed

+96
-67
lines changed

decentralized-message-queue/src/DMQ/Configuration.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ mkDiffusionConfiguration
214214
, Diffusion.dcBulkChurnInterval = dmqcChurnInterval
215215
, Diffusion.dcMuxForkPolicy = Diffusion.noBindForkPolicy -- TODO: Make option flag for responderForkPolicy
216216
, Diffusion.dcLocalMuxForkPolicy = Diffusion.noBindForkPolicy -- TODO: Make option flag for responderForkPolicy
217+
, Diffusion.dcEgressPollInterval = 0 -- TODO: Make option flag for egress poll interval
217218
}
218219
where
219220
hints = defaultHints {

network-mux/bench/socket_read_write/Main.hs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,11 @@ startServerMany sndSizeV ad = forever $ do
201201
-- It will send streams of data over the 41 and 42 miniprotocol.
202202
-- Multiplexing is done with a separate thread running
203203
-- the Egress.muxer function.
204-
startServerEgresss :: StrictTMVar IO Int64 -> Socket -> IO ()
205-
startServerEgresss sndSizeV ad = forever $ do
204+
startServerEgresss :: DiffTime -> StrictTMVar IO Int64 -> Socket -> IO ()
205+
startServerEgresss pollInterval sndSizeV ad = forever $ do
206206
(sd, _) <- Socket.accept ad
207207
withReadBufferIO (\buffer -> do
208-
bearer <-getBearer makeSocketBearer sduTimeout activeTracer sd buffer
208+
bearer <- getBearer (makeSocketBearer' pollInterval) sduTimeout activeTracer sd buffer
209209
sndSize <- atomically $ takeTMVar sndSizeV
210210
eq <- atomically $ newTBQueue 100
211211
w42 <- newTVarIO BL.empty
@@ -273,25 +273,28 @@ main = do
273273
ad1 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
274274
ad2 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
275275
ad3 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
276+
ad4 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
276277

277-
return (ad1, ad2, ad3)
278+
return (ad1, ad2, ad3, ad4)
278279
)
279-
(\(ad1, ad2, ad3) -> do
280+
(\(ad1, ad2, ad3, ad4) -> do
280281
Socket.close ad1
281282
Socket.close ad2
282283
Socket.close ad3
284+
Socket.close ad4
283285
)
284-
(\(ad1, ad2, ad3) -> do
286+
(\(ad1, ad2, ad3, ad4) -> do
285287
sndSizeV <- newEmptyTMVarIO
286288
sndSizeMV <- newEmptyTMVarIO
287289
sndSizeEV <- newEmptyTMVarIO
288290
addr <- setupServer ad1
289291
addrM <- setupServer ad2
290292
addrE <- setupServer ad3
293+
addrF <- setupServer ad4
291294

292295
withAsync (startServer sndSizeV ad1) $ \said -> do
293296
withAsync (startServerMany sndSizeMV ad2) $ \saidM -> do
294-
withAsync (startServerEgresss sndSizeEV ad3) $ \saidE -> do
297+
withAsync (startServerEgresss 0.001 sndSizeEV ad3) $ \saidE -> withAsync (startServerEgresss 0 sndSizeEV ad4) $ \saidF -> do
295298
defaultMain [
296299
-- Suggested Max SDU size for Socket bearer
297300
bench "Read/Write Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeV 12288 addr
@@ -305,9 +308,13 @@ main = do
305308
, bench "Read/Write-Many Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 914 addrM
306309
, bench "Read/Write-Many Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 10 addrM
307310

308-
-- Use standard muxer and demuxer
309-
, bench "Read/Write Mux Benchmark 800+10 byte SDUs" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrE
310-
, bench "Read/Write Mux Benchmark 12288+10 byte SDUs" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrE
311+
-- Use standard muxer and demuxer, 1ms poll
312+
, bench "Read/Write Mux Benchmark 800+10 byte SDUs, 1ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrE
313+
, bench "Read/Write Mux Benchmark 12288+10 byte SDUs, 1ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrE
314+
315+
-- Use standard muxer and demuxer, 0ms poll
316+
, bench "Read/Write Mux Benchmark 800+10 byte SDUs, 0ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrF
317+
, bench "Read/Write Mux Benchmark 12288+10 byte SDUs, 0ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrF
311318

312319
-- Use standard demuxer
313320
, bench "Read/Write Demuxer Queuing Benchmark 10 byte SDUs" $ nfIO $ readDemuxerQueueBenchmark sndSizeV 10 addr
@@ -316,4 +323,5 @@ main = do
316323
cancel said
317324
cancel saidM
318325
cancel saidE
326+
cancel saidF
319327
)

network-mux/src/Network/Mux/Bearer.hs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Network.Mux.Bearer
1010
( Bearer (..)
1111
, MakeBearer (..)
1212
, makeSocketBearer
13+
, makeSocketBearer'
1314
, makePipeChannelBearer
1415
, makeQueueChannelBearer
1516
#if defined(mingw32_HOST_OS)
@@ -61,8 +62,11 @@ pureBearer f = \sduTimeout rb tr fd -> pure (f sduTimeout rb tr fd)
6162

6263

6364
makeSocketBearer :: MakeBearer IO Socket
64-
makeSocketBearer = MakeBearer $ (\sduTimeout tr fd rb -> do
65-
return $ socketAsBearer size batch rb sduTimeout tr fd)
65+
makeSocketBearer = makeSocketBearer' 0
66+
67+
makeSocketBearer' :: DiffTime -> MakeBearer IO Socket
68+
makeSocketBearer' pt = MakeBearer $ (\sduTimeout tr fd rb -> do
69+
return $ socketAsBearer size batch rb sduTimeout pt tr fd)
6670
where
6771
size = SDUSize 12_288
6872
batch = 131_072
@@ -89,13 +93,13 @@ makeQueueChannelBearer :: ( MonadSTM m
8993
, MonadThrow m
9094
)
9195
=> MakeBearer m (QueueChannel m)
92-
makeQueueChannelBearer = MakeBearer $ pureBearer (\_ tr q _-> queueChannelAsBearer size tr q)
96+
makeQueueChannelBearer = MakeBearer $ pureBearer (\_ tr q _ -> queueChannelAsBearer size tr q)
9397
where
9498
size = SDUSize 1_280
9599

96100
#if defined(mingw32_HOST_OS)
97101
makeNamedPipeBearer :: MakeBearer IO HANDLE
98-
makeNamedPipeBearer = MakeBearer $ pureBearer (\_ tr fd _-> namedPipeAsBearer size tr fd)
102+
makeNamedPipeBearer = MakeBearer $ pureBearer (\_ tr fd _ -> namedPipeAsBearer size tr fd)
99103
where
100104
size = SDUSize 24_576
101105
#endif

network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,13 @@ attenuationChannelAsBearer :: forall m.
273273
-> Bearer m
274274
attenuationChannelAsBearer sduSize sduTimeout muxTracer chan =
275275
Bearer {
276-
read = readMux,
277-
write = writeMux,
278-
writeMany = writeMuxMany,
276+
read = readMux,
277+
write = writeMux,
278+
writeMany = writeMuxMany,
279279
sduSize,
280-
batchSize = fromIntegral $ getSDUSize sduSize,
281-
name = "attenuation-channel"
280+
batchSize = fromIntegral $ getSDUSize sduSize,
281+
name = "attenuation-channel",
282+
egressInterval = 0
282283
}
283284
where
284285
readMux :: TimeoutFn m -> m (SDU, Time)

network-mux/src/Network/Mux/Bearer/NamedPipe.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ namedPipeAsBearer :: Mx.SDUSize
3333
-> Mx.Bearer IO
3434
namedPipeAsBearer sduSize tracer h =
3535
Mx.Bearer {
36-
Mx.read = readNamedPipe,
37-
Mx.write = writeNamedPipe,
38-
Mx.writeMany = writeNamedPipeMany,
39-
Mx.sduSize = sduSize,
40-
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize,
41-
Mx.name = "named-pipe"
36+
Mx.read = readNamedPipe,
37+
Mx.write = writeNamedPipe,
38+
Mx.writeMany = writeNamedPipeMany,
39+
Mx.sduSize = sduSize,
40+
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize,
41+
Mx.name = "named-pipe",
42+
Mx.egressInterval = 0
4243
}
4344
where
4445
readNamedPipe :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time)

network-mux/src/Network/Mux/Bearer/Pipe.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,13 @@ pipeAsBearer
7575
-> Bearer IO
7676
pipeAsBearer sduSize tracer channel =
7777
Mx.Bearer {
78-
Mx.read = readPipe,
79-
Mx.write = writePipe,
80-
Mx.writeMany = writePipeMany,
81-
Mx.sduSize = sduSize,
82-
Mx.name = "pipe",
83-
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize
78+
Mx.read = readPipe,
79+
Mx.write = writePipe,
80+
Mx.writeMany = writePipeMany,
81+
Mx.sduSize = sduSize,
82+
Mx.name = "pipe",
83+
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize,
84+
Mx.egressInterval = 0
8485
}
8586
where
8687
readPipe :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time)

network-mux/src/Network/Mux/Bearer/Queues.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,13 @@ queueChannelAsBearer
4040
-> Bearer m
4141
queueChannelAsBearer sduSize tracer QueueChannel { writeQueue, readQueue } = do
4242
Mx.Bearer {
43-
Mx.read = readMux,
44-
Mx.write = writeMux,
45-
Mx.writeMany = writeMuxMany,
46-
Mx.sduSize = sduSize,
47-
Mx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize),
48-
Mx.name = "queue-channel"
43+
Mx.read = readMux,
44+
Mx.write = writeMux,
45+
Mx.writeMany = writeMuxMany,
46+
Mx.sduSize = sduSize,
47+
Mx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize),
48+
Mx.name = "queue-channel",
49+
Mx.egressInterval = 0
4950
}
5051
where
5152
readMux :: Mx.TimeoutFn m -> m (Mx.SDU, Time)

network-mux/src/Network/Mux/Bearer/Socket.hs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,19 @@ socketAsBearer
5252
-> Int
5353
-> Maybe (Mx.ReadBuffer IO)
5454
-> DiffTime
55+
-> DiffTime
5556
-> Tracer IO Mx.Trace
5657
-> Socket.Socket
5758
-> Bearer IO
58-
socketAsBearer sduSize batchSize readBuffer_m sduTimeout tracer sd =
59+
socketAsBearer sduSize batchSize readBuffer_m sduTimeout pollInterval tracer sd =
5960
Mx.Bearer {
60-
Mx.read = readSocket,
61-
Mx.write = writeSocket,
62-
Mx.writeMany = writeSocketMany,
63-
Mx.sduSize = sduSize,
64-
Mx.batchSize = batchSize,
65-
Mx.name = "socket-bearer"
61+
Mx.read = readSocket,
62+
Mx.write = writeSocket,
63+
Mx.writeMany = writeSocketMany,
64+
Mx.sduSize = sduSize,
65+
Mx.batchSize = batchSize,
66+
Mx.name = "socket-bearer",
67+
Mx.egressInterval = pollInterval
6668
}
6769
where
6870
readSocket :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time)

network-mux/src/Network/Mux/Egress.hs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ muxer
144144
=> EgressQueue m
145145
-> Bearer m
146146
-> m void
147-
muxer egressQueue Bearer { writeMany, sduSize, batchSize } =
147+
muxer egressQueue Bearer { writeMany, sduSize, batchSize, egressInterval } =
148148
withTimeoutSerial $ \timeout ->
149149
forever $ do
150150
start <- getMonotonicTime
@@ -156,12 +156,9 @@ muxer egressQueue Bearer { writeMany, sduSize, batchSize } =
156156
empty <- atomically $ isEmptyTBQueue egressQueue
157157
when (empty) $ do
158158
let delta = diffTime end start
159-
threadDelay (loopInterval - delta)
159+
threadDelay (egressInterval - delta)
160160

161161
where
162-
loopInterval :: DiffTime
163-
loopInterval = 0.001
164-
165162
maxSDUsPerBatch :: Int
166163
maxSDUsPerBatch = 100
167164

network-mux/src/Network/Mux/Types.hs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,17 +244,19 @@ msHeaderLength = 8
244244
--
245245
data Bearer m = Bearer {
246246
-- | Timestamp and send SDU.
247-
write :: TimeoutFn m -> SDU -> m Time
247+
write :: TimeoutFn m -> SDU -> m Time
248248
-- | Timestamp and send many SDUs.
249-
, writeMany :: TimeoutFn m -> [SDU] -> m Time
249+
, writeMany :: TimeoutFn m -> [SDU] -> m Time
250250
-- | Read a SDU
251-
, read :: TimeoutFn m -> m (SDU, Time)
251+
, read :: TimeoutFn m -> m (SDU, Time)
252252
-- | Return a suitable SDU payload size.
253-
, sduSize :: SDUSize
253+
, sduSize :: SDUSize
254254
-- | Return a suitable batch size
255-
, batchSize :: Int
255+
, batchSize :: Int
256256
-- | Name of the bearer
257-
, name :: String
257+
, name :: String
258+
-- | Egress poll interval
259+
, egressInterval :: DiffTime
258260
}
259261

260262
newtype SDUSize = SDUSize { getSDUSize :: Word16 }

network-mux/test/Test/Mux.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,7 @@ runWithSocket cap clientBuf_m serverBuf_m initApps respApps = withIOManager (\io
919919
)
920920
)
921921
where
922-
mkBearer buf_m sock tr = getBearer makeSocketBearer (-1) tr sock buf_m
922+
mkBearer buf_m sock tr = getBearer (makeSocketBearer' 0.001) (-1) tr sock buf_m
923923
clientTracer = contramap (Mx.WithBearer "client") activeTracer
924924
serverTracer = contramap (Mx.WithBearer "server") activeTracer
925925

ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/ConnectionManager.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -347,12 +347,13 @@ makeFDBearer :: MonadDelay m
347347
=> MakeBearer m (FD m)
348348
makeFDBearer = MakeBearer $ \_ _ _ _ ->
349349
return Mx.Bearer {
350-
Mx.write = \_ _ -> getMonotonicTime,
351-
Mx.writeMany = \_ _ -> getMonotonicTime,
352-
Mx.read = \_ -> forever (threadDelay 3600),
353-
Mx.sduSize = Mx.SDUSize 1500,
354-
Mx.batchSize = 1500,
355-
Mx.name = "FD"
350+
Mx.write = \_ _ -> getMonotonicTime,
351+
Mx.writeMany = \_ _ -> getMonotonicTime,
352+
Mx.read = \_ -> forever (threadDelay 3600),
353+
Mx.sduSize = Mx.SDUSize 1500,
354+
Mx.batchSize = 1500,
355+
Mx.name = "FD",
356+
Mx.egressInterval = 0
356357
}
357358

358359
-- | We only keep exceptions here which should not be handled by the test

ouroboros-network-framework/src/Ouroboros/Network/Snocket.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ module Ouroboros.Network.Snocket
2121
, AddressFamily (..)
2222
, Snocket (..)
2323
, makeSocketBearer
24+
, makeSocketBearer'
2425
, makeLocalRawBearer
2526
-- ** Socket based Snockets
2627
, SocketSnocket

ouroboros-network/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
- Renamed `Arguments` to `DiffusionConfiguration`
3838
- Renamed `Applications` to `DiffusionApplications`
3939
- `runM` function now receives `ExtraParameters` as an argument
40+
- Configurable Mux Egress Poll Interval
4041

4142
## 0.20.1.0 -- 2025-03-13
4243

ouroboros-network/cardano-diffusion/Cardano/Network/Diffusion.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ run lpci tracerChurnMode localConfig metrics tracers args apps = do
136136
(\e -> traceWith tracer (Diffusion.DiffusionErrored e)
137137
>> throwIO (Diffusion.DiffusionError e))
138138
$ withIOManager $ \iocp -> do
139-
interfaces <- Diffusion.mkInterfaces iocp tracer
139+
interfaces <- Diffusion.mkInterfaces iocp tracer (Diffusion.dcEgressPollInterval args)
140140
Diffusion.runM
141141
interfaces
142142
tracers

ouroboros-network/src/Ouroboros/Network/Diffusion.hs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ import Ouroboros.Network.Protocol.Handshake
7878
import Ouroboros.Network.RethrowPolicy
7979
import Ouroboros.Network.Server qualified as Server
8080
import Ouroboros.Network.Snocket (LocalAddress, LocalSocket (..),
81-
localSocketFileDescriptor, makeLocalBearer, makeSocketBearer)
81+
localSocketFileDescriptor, makeLocalBearer, makeSocketBearer')
8282
import Ouroboros.Network.Snocket qualified as Snocket
8383
import Ouroboros.Network.Socket (configureSocket, configureSystemdSocket)
8484

@@ -870,7 +870,7 @@ run extraParams tracers args apps = do
870870
>> throwIO (DiffusionError e))
871871
$ withIOManager $ \iocp -> do
872872

873-
interfaces <- mkInterfaces iocp tracer
873+
interfaces <- mkInterfaces iocp tracer (dcEgressPollInterval args)
874874

875875
runM interfaces
876876
tracers
@@ -884,21 +884,25 @@ run extraParams tracers args apps = do
884884

885885
mkInterfaces :: IOManager
886886
-> Tracer IO (DiffusionTracer ntnAddr ntcAddr)
887+
-> DiffTime
887888
-> IO (Interfaces Socket
888889
RemoteAddress
889890
LocalSocket
890891
LocalAddress
891892
Resolver
892893
IOException
893894
IO)
894-
mkInterfaces iocp tracer = do
895+
mkInterfaces iocp tracer egressPollInterval = do
895896

896897
diRng <- newStdGen
897898
diConnStateIdSupply <- atomically $ CM.newConnStateIdSupply Proxy
898899

900+
-- Clamp the mux egress poll interval to sane values.
901+
let egressInterval = max 0 $ min 0.200 egressPollInterval
902+
899903
return $ Interfaces {
900904
diNtnSnocket = Snocket.socketSnocket iocp,
901-
diNtnBearer = makeSocketBearer,
905+
diNtnBearer = makeSocketBearer' egressInterval,
902906
diWithBuffer = withReadBufferIO,
903907
diNtnConfigureSocket = configureSocket,
904908
diNtnConfigureSystemdSocket =

ouroboros-network/src/Ouroboros/Network/Diffusion/Types.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,9 @@ data Configuration extraFlags m ntnFd ntnAddr ntcFd ntcAddr = Configuration {
467467
--
468468
, dcLocalMuxForkPolicy :: Mx.ForkPolicy ntcAddr
469469

470+
-- | Mux egress queue's poll interval
471+
, dcEgressPollInterval :: DiffTime
472+
470473
}
471474

472475

ouroboros-network/testlib/Test/Ouroboros/Network/Diffusion/Node.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ run blockGeneratorArgs limits ni na
475475
, Diffusion.dcReadLedgerPeerSnapshot = pure Nothing -- ^ tested independently
476476
, Diffusion.dcMuxForkPolicy = noBindForkPolicy
477477
, Diffusion.dcLocalMuxForkPolicy = noBindForkPolicy
478+
, Diffusion.dcEgressPollInterval = 0.001
478479
}
479480

480481
appArgs :: PeerMetrics m NtNAddr

0 commit comments

Comments
 (0)