From 6dbcefc07e346272e5c3949255fc0b6c172f2c1c Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 13 May 2024 12:28:55 -0700 Subject: [PATCH 1/4] WiP --- go.mod | 6 +-- go.sum | 8 ++-- localparticipant.go | 103 +++++++++++++++++++++++++++++++++++++++++--- room.go | 2 +- 4 files changed, 104 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index d9a4ef8d..d20f3422 100644 --- a/go.mod +++ b/go.mod @@ -6,12 +6,13 @@ toolchain go1.22.2 require ( github.com/bep/debounce v1.2.1 + github.com/frostbyte73/core v0.0.10 github.com/go-logr/logr v1.4.1 github.com/go-logr/stdr v1.2.2 github.com/gorilla/websocket v1.5.1 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a - github.com/livekit/protocol v1.15.0 + github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11 + github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd github.com/magefile/mage v1.15.0 github.com/pion/dtls/v2 v2.2.10 github.com/pion/interceptor v0.1.27 @@ -34,7 +35,6 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/channels v1.1.0 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/frostbyte73/core v0.0.10 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gammazero/deque v0.2.1 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect diff --git a/go.sum b/go.sum index c787ac3d..eb4c0173 100644 --- a/go.sum +++ b/go.sum @@ -58,10 +58,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a h1:ATbv0x7G5tW2HgiouQ57csFE/G4gekl2oV1cxb2Dy24= -github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.15.0 h1:JAatoWKYdFx3D0U4JBWg25ZlrY+NK26xHabFopS2Jhk= -github.com/livekit/protocol v1.15.0/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= +github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11 h1:NR227C8Xo0/Q8uJMDTuEcsOybMIwRJpRFdHiDr3eOHs= +github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11/go.mod h1:MEYWFuRwniEbshuIyL2K3cFi8km+XGlm1n5cqw6pmsU= +github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd h1:tJcKbNFygIPs0lS0RzwcewM1flVVHGmFBR+A4BDUkf8= +github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0= github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= diff --git a/localparticipant.go b/localparticipant.go index a7ec4129..0046fcd3 100644 --- a/localparticipant.go +++ b/localparticipant.go @@ -16,27 +16,37 @@ package lksdk import ( "sort" + "sync" "time" + "github.com/frostbyte73/core" "github.com/pion/webrtc/v3" "google.golang.org/protobuf/proto" + "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" ) const ( trackPublishTimeout = 10 * time.Second + timeSyncTimeout = 5 * time.Second ) type LocalParticipant struct { baseParticipant engine *RTCEngine + + timeSyncLock sync.Mutex + timeSynchronized *core.Fuse + timeSyncResponseChan chan livekit.TimeSyncResponse + timeSyncInfo *mediatransportutil.TimeSyncInfo } func newLocalParticipant(engine *RTCEngine, roomcallback *RoomCallback) *LocalParticipant { return &LocalParticipant{ - baseParticipant: *newBaseParticipant(roomcallback), - engine: engine, + baseParticipant: *newBaseParticipant(roomcallback), + engine: engine, + timeSyncResponseChan: make(chan livekit.TimeSyncResponse, 10), } } @@ -229,7 +239,7 @@ func (p *LocalParticipant) PublishSimulcastTrack(tracks []*LocalTrack, opts *Tra return pub, nil } -func (p *LocalParticipant) republishTracks() { +func (p *LocalParticipant) republishTracksAndResetTimeSync() { var localPubs []*LocalTrackPublication p.tracks.Range(func(key, value interface{}) bool { track := value.(*LocalTrackPublication) @@ -272,11 +282,86 @@ func (p *LocalParticipant) closeTracks() { } } +func (p *LocalParticipant) syncTime(timeSyncInfo *mediatransportutil.TimeSyncInfo) { + req := p.timeSyncInfo.StartTimeSync() + dataPacket := &livekit.DataPacket{Value: &livekit.DataPacket_TimeSyncRequest{ + TimeSyncRequest: &req, + }} + + for { + select { + case <-time.After(timeSyncTimeout): + logger.Debugw("timeout waiting for time synchronization response") + return + case resp := <-p.timeSyncResponseChan: + err := timeSyncInfo.HandleTimeSyncResponse(resp) + if err == nil { + logger.Debugw("time synchronization successful") + return + } + // Wait for a potential other response to come on the channel + logger.Debugw("error handling time synchronization response", "error", err) + } + } +} + +func (p *LocalParticipant) getSynchronizedTimeSync() *mediatransportutil.TimeSyncInfo { + needToSyncTime := false + p.timeSyncLock.Lock() + if p.timeSyncInfo == nil { + needToSyncTime = true + p.timeSyncInfo = &mediatransportutil.TimeSyncInfo{} + p.timeSynchronized = &core.Fuse{} + } + + timeSyncInfo := p.timeSyncInfo + timeSynchronized := p.timeSynchronized + + p.timeSyncLock.Unlock() + + // Do not hold the lock while synchronizing time to prevent blocking the reconnection callback that resets timeSyncInfo + if needToSyncTime { + // Do not retry time sync if it fails for now as the most likely reason is that the SFU does not support the time sync protocol + p.syncTime(syncTime) + timeSynchronized.Break() + } + + <-timeSynchronized.Watch() + + return timeSyncInfo +} + +func (p *LocalParticipant) convertLocalTimeToSFUTime(ts *uint64, timeSyncInfo *mediatransportutil.TimeSyncInfo) { + if ts == nil { + return + } + + timeTs := mediatransportutil.NtpTime(*userPkt.User.StartTime).Time() + + sfuTs, err := p.timeSyncInfo.GetPeerTimeForLocalTime(timeTs) + if err != nil { + // Leave time as is + return + } + ntpSfuTs := mediatransportutil.ToNtpTime(sfuTs) + + *ts = uint64(ntpSfuTs) +} + func (p *LocalParticipant) publishData(kind livekit.DataPacket_Kind, dataPacket *livekit.DataPacket) error { if err := p.engine.ensurePublisherConnected(true); err != nil { return err } + if userPkt, ok := dataPacket.Value.(*livekit.DataPacket_User); ok { + if userPkt.User.StartTime != nil || userPkt.User.StartTime != nil { + timeSyncInfo := p.getSynchronizedTimeSync() + + p.convertLocalTimeToSFUTime(userPkt.User.StartTime, timeSyncInfo) + p.convertLocalTimeToSFUTime(userPkt.User.EndTime, timeSyncInfo) + } + } + encoded, err := proto.Marshal(dataPacket) if err != nil { return err @@ -318,8 +403,10 @@ func UserData(data []byte) *UserDataPacket { // UserDataPacket is a custom user data that can be sent via WebRTC on a custom topic. type UserDataPacket struct { - Payload []byte - Topic string // optional + Payload []byte + Topic string // optional + StartTime *uint64 // optional + EndTime *uint64 // optional } // ToProto implements DataPacket. @@ -330,8 +417,10 @@ func (p *UserDataPacket) ToProto() *livekit.DataPacket { } return &livekit.DataPacket{Value: &livekit.DataPacket_User{ User: &livekit.UserPacket{ - Payload: p.Payload, - Topic: topic, + Payload: p.Payload, + Topic: topic, + StartTime: p.StartTime, + EndTime: p.EndTime, }, }} } diff --git a/room.go b/room.go index 6e5f8739..06d4a84d 100644 --- a/room.go +++ b/room.go @@ -412,7 +412,7 @@ func (r *Room) handleRestarted(joinRes *livekit.JoinResponse) { r.handleParticipantUpdate(joinRes.OtherParticipants) - r.LocalParticipant.republishTracks() + r.LocalParticipant.republishTracksAndResetTimeSync() r.callback.OnReconnected() } From 8a8a1fad8eadb7b5c912d41f06c20380adff9196 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 13 May 2024 15:49:22 -0700 Subject: [PATCH 2/4] WiP --- localparticipant.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/localparticipant.go b/localparticipant.go index 0046fcd3..454dd986 100644 --- a/localparticipant.go +++ b/localparticipant.go @@ -240,6 +240,10 @@ func (p *LocalParticipant) PublishSimulcastTrack(tracks []*LocalTrack, opts *Tra } func (p *LocalParticipant) republishTracksAndResetTimeSync() { + p.timeSyncLock.Lock() + p.timeSyncInfo = nil + p.timeSyncLock.Unlock() + var localPubs []*LocalTrackPublication p.tracks.Range(func(key, value interface{}) bool { track := value.(*LocalTrackPublication) @@ -288,6 +292,12 @@ func (p *LocalParticipant) syncTime(timeSyncInfo *mediatransportutil.TimeSyncInf TimeSyncRequest: &req, }} + err := p.publishData(livekit.DataPacket_RELIABLE, dataPacket) + if err != nil { + logger.Debugw("time sync data channel request failed sending", "error", err) + return + } + for { select { case <-time.After(timeSyncTimeout): @@ -322,7 +332,7 @@ func (p *LocalParticipant) getSynchronizedTimeSync() *mediatransportutil.TimeSyn // Do not hold the lock while synchronizing time to prevent blocking the reconnection callback that resets timeSyncInfo if needToSyncTime { // Do not retry time sync if it fails for now as the most likely reason is that the SFU does not support the time sync protocol - p.syncTime(syncTime) + p.syncTime(timeSyncInfo) timeSynchronized.Break() } @@ -336,7 +346,7 @@ func (p *LocalParticipant) convertLocalTimeToSFUTime(ts *uint64, timeSyncInfo *m return } - timeTs := mediatransportutil.NtpTime(*userPkt.User.StartTime).Time() + timeTs := mediatransportutil.NtpTime(*ts).Time() sfuTs, err := p.timeSyncInfo.GetPeerTimeForLocalTime(timeTs) if err != nil { From f0c6f5a0f7be0e289495245b2815efdb443ab1c3 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 15 May 2024 13:25:24 -0700 Subject: [PATCH 3/4] WiP --- engine.go | 4 ++++ go.mod | 3 ++- go.sum | 4 ++++ localparticipant.go | 31 ++++++++++++++++++++++++++----- room.go | 2 ++ 5 files changed, 38 insertions(+), 6 deletions(-) diff --git a/engine.go b/engine.go index adcc3a42..2fd9d756 100644 --- a/engine.go +++ b/engine.go @@ -499,6 +499,10 @@ func (e *RTCEngine) handleDataPacket(msg webrtc.DataChannelMessage) { if e.OnDataPacket != nil { e.OnDataPacket(identity, msg.SipDtmf) } + case *livekit.DataPacket_TimeSyncResponse: + if e.OnDataPacket != nil { + e.OnDataPacket(identity, msg.TimeSyncResponse) + } } } diff --git a/go.mod b/go.mod index d20f3422..41a90f65 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11 - github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd + github.com/livekit/protocol v1.15.1-0.20240515200202-fbcdeac2c890 github.com/magefile/mage v1.15.0 github.com/pion/dtls/v2 v2.2.10 github.com/pion/interceptor v0.1.27 @@ -69,6 +69,7 @@ require ( go.uber.org/zap v1.27.0 // indirect go.uber.org/zap/exp v0.2.0 // indirect golang.org/x/crypto v0.22.0 // indirect + golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.19.0 // indirect diff --git a/go.sum b/go.sum index eb4c0173..4dbf2574 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,8 @@ github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11 h1:NR22 github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11/go.mod h1:MEYWFuRwniEbshuIyL2K3cFi8km+XGlm1n5cqw6pmsU= github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd h1:tJcKbNFygIPs0lS0RzwcewM1flVVHGmFBR+A4BDUkf8= github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= +github.com/livekit/protocol v1.15.1-0.20240515200202-fbcdeac2c890 h1:Sx56+lv2bm3Th/txkzOcfKrNOiN1esZKK6OsrlggtEg= +github.com/livekit/protocol v1.15.1-0.20240515200202-fbcdeac2c890/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0= github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -177,6 +179,8 @@ golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJ golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= diff --git a/localparticipant.go b/localparticipant.go index 454dd986..a1ef7338 100644 --- a/localparticipant.go +++ b/localparticipant.go @@ -15,6 +15,7 @@ package lksdk import ( + "fmt" "sort" "sync" "time" @@ -336,6 +337,8 @@ func (p *LocalParticipant) getSynchronizedTimeSync() *mediatransportutil.TimeSyn timeSynchronized.Break() } + fmt.Println("time synced", p.timeSyncInfo) + <-timeSynchronized.Watch() return timeSyncInfo @@ -404,6 +407,7 @@ type DataPacket interface { var ( _ DataPacket = (*UserDataPacket)(nil) _ DataPacket = (*livekit.SipDTMF)(nil) // implemented in the protocol package + _ DataPacket = (*livekit.TimeSyncResponse)(nil) ) // UserData is a custom user data that can be sent via WebRTC. @@ -414,9 +418,9 @@ func UserData(data []byte) *UserDataPacket { // UserDataPacket is a custom user data that can be sent via WebRTC on a custom topic. type UserDataPacket struct { Payload []byte - Topic string // optional - StartTime *uint64 // optional - EndTime *uint64 // optional + Topic string // optional + StartTime *time.Time // optional + EndTime *time.Time // optional } // ToProto implements DataPacket. @@ -425,12 +429,21 @@ func (p *UserDataPacket) ToProto() *livekit.DataPacket { if p.Topic != "" { topic = proto.String(p.Topic) } + + var startTime, endTime *uint64 + if p.StartTime != nil { + startTime = proto.Uint64(uint64(mediatransportutil.ToNtpTime(*p.StartTime))) + } + if p.EndTime != nil { + endTime = proto.Uint64(uint64(mediatransportutil.ToNtpTime(*p.EndTime))) + } + return &livekit.DataPacket{Value: &livekit.DataPacket_User{ User: &livekit.UserPacket{ Payload: p.Payload, Topic: topic, - StartTime: p.StartTime, - EndTime: p.EndTime, + StartTime: startTime, + EndTime: endTime, }, }} } @@ -570,3 +583,11 @@ func (p *LocalParticipant) onTrackMuted(pub *LocalTrackPublication, muted bool) p.roomCallback.OnTrackUnmuted(pub, p) } } + +func (p *LocalParticipant) onTimeSyncResponse(resp livekit.TimeSyncResponse) { + select { + case p.timeSyncResponseChan <- resp: + default: + logger.Infow("time sync response chan full") + } +} diff --git a/room.go b/room.go index 06d4a84d..9c195973 100644 --- a/room.go +++ b/room.go @@ -443,6 +443,8 @@ func (r *Room) handleDataReceived(identity string, dataPacket DataPacket) { p.Callback.OnDataReceived(msg.Payload, params) } r.callback.OnDataReceived(msg.Payload, params) + case *livekit.TimeSyncResponse: + r.LocalParticipant.onTimeSyncResponse(*msg) } if p != nil { p.Callback.OnDataPacket(dataPacket, params) From d16a70933f89b85526facfc7d0d6b1b588a884b1 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 15 May 2024 13:30:09 -0700 Subject: [PATCH 4/4] WiP --- localparticipant.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/localparticipant.go b/localparticipant.go index a1ef7338..94c63de8 100644 --- a/localparticipant.go +++ b/localparticipant.go @@ -15,7 +15,6 @@ package lksdk import ( - "fmt" "sort" "sync" "time" @@ -337,8 +336,6 @@ func (p *LocalParticipant) getSynchronizedTimeSync() *mediatransportutil.TimeSyn timeSynchronized.Break() } - fmt.Println("time synced", p.timeSyncInfo) - <-timeSynchronized.Watch() return timeSyncInfo