diff --git a/engine.go b/engine.go index adcc3a42..2fd9d756 100644 --- a/engine.go +++ b/engine.go @@ -499,6 +499,10 @@ func (e *RTCEngine) handleDataPacket(msg webrtc.DataChannelMessage) { if e.OnDataPacket != nil { e.OnDataPacket(identity, msg.SipDtmf) } + case *livekit.DataPacket_TimeSyncResponse: + if e.OnDataPacket != nil { + e.OnDataPacket(identity, msg.TimeSyncResponse) + } } } diff --git a/go.mod b/go.mod index d9a4ef8d..41a90f65 100644 --- a/go.mod +++ b/go.mod @@ -6,12 +6,13 @@ toolchain go1.22.2 require ( github.com/bep/debounce v1.2.1 + github.com/frostbyte73/core v0.0.10 github.com/go-logr/logr v1.4.1 github.com/go-logr/stdr v1.2.2 github.com/gorilla/websocket v1.5.1 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a - github.com/livekit/protocol v1.15.0 + github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11 + github.com/livekit/protocol v1.15.1-0.20240515200202-fbcdeac2c890 github.com/magefile/mage v1.15.0 github.com/pion/dtls/v2 v2.2.10 github.com/pion/interceptor v0.1.27 @@ -34,7 +35,6 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/channels v1.1.0 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/frostbyte73/core v0.0.10 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gammazero/deque v0.2.1 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect @@ -69,6 +69,7 @@ require ( go.uber.org/zap v1.27.0 // indirect go.uber.org/zap/exp v0.2.0 // indirect golang.org/x/crypto v0.22.0 // indirect + golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.19.0 // indirect diff --git a/go.sum b/go.sum index c787ac3d..4dbf2574 100644 --- a/go.sum +++ b/go.sum @@ -58,10 +58,12 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a h1:ATbv0x7G5tW2HgiouQ57csFE/G4gekl2oV1cxb2Dy24= -github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.15.0 h1:JAatoWKYdFx3D0U4JBWg25ZlrY+NK26xHabFopS2Jhk= -github.com/livekit/protocol v1.15.0/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= +github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11 h1:NR227C8Xo0/Q8uJMDTuEcsOybMIwRJpRFdHiDr3eOHs= +github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11/go.mod h1:MEYWFuRwniEbshuIyL2K3cFi8km+XGlm1n5cqw6pmsU= +github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd h1:tJcKbNFygIPs0lS0RzwcewM1flVVHGmFBR+A4BDUkf8= +github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= +github.com/livekit/protocol v1.15.1-0.20240515200202-fbcdeac2c890 h1:Sx56+lv2bm3Th/txkzOcfKrNOiN1esZKK6OsrlggtEg= +github.com/livekit/protocol v1.15.1-0.20240515200202-fbcdeac2c890/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0= github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -177,6 +179,8 @@ golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJ golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= diff --git a/localparticipant.go b/localparticipant.go index a7ec4129..94c63de8 100644 --- a/localparticipant.go +++ b/localparticipant.go @@ -16,27 +16,37 @@ package lksdk import ( "sort" + "sync" "time" + "github.com/frostbyte73/core" "github.com/pion/webrtc/v3" "google.golang.org/protobuf/proto" + "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" ) const ( trackPublishTimeout = 10 * time.Second + timeSyncTimeout = 5 * time.Second ) type LocalParticipant struct { baseParticipant engine *RTCEngine + + timeSyncLock sync.Mutex + timeSynchronized *core.Fuse + timeSyncResponseChan chan livekit.TimeSyncResponse + timeSyncInfo *mediatransportutil.TimeSyncInfo } func newLocalParticipant(engine *RTCEngine, roomcallback *RoomCallback) *LocalParticipant { return &LocalParticipant{ - baseParticipant: *newBaseParticipant(roomcallback), - engine: engine, + baseParticipant: *newBaseParticipant(roomcallback), + engine: engine, + timeSyncResponseChan: make(chan livekit.TimeSyncResponse, 10), } } @@ -229,7 +239,11 @@ func (p *LocalParticipant) PublishSimulcastTrack(tracks []*LocalTrack, opts *Tra return pub, nil } -func (p *LocalParticipant) republishTracks() { +func (p *LocalParticipant) republishTracksAndResetTimeSync() { + p.timeSyncLock.Lock() + p.timeSyncInfo = nil + p.timeSyncLock.Unlock() + var localPubs []*LocalTrackPublication p.tracks.Range(func(key, value interface{}) bool { track := value.(*LocalTrackPublication) @@ -272,11 +286,92 @@ func (p *LocalParticipant) closeTracks() { } } +func (p *LocalParticipant) syncTime(timeSyncInfo *mediatransportutil.TimeSyncInfo) { + req := p.timeSyncInfo.StartTimeSync() + dataPacket := &livekit.DataPacket{Value: &livekit.DataPacket_TimeSyncRequest{ + TimeSyncRequest: &req, + }} + + err := p.publishData(livekit.DataPacket_RELIABLE, dataPacket) + if err != nil { + logger.Debugw("time sync data channel request failed sending", "error", err) + return + } + + for { + select { + case <-time.After(timeSyncTimeout): + logger.Debugw("timeout waiting for time synchronization response") + return + case resp := <-p.timeSyncResponseChan: + err := timeSyncInfo.HandleTimeSyncResponse(resp) + if err == nil { + logger.Debugw("time synchronization successful") + return + } + // Wait for a potential other response to come on the channel + logger.Debugw("error handling time synchronization response", "error", err) + } + } +} + +func (p *LocalParticipant) getSynchronizedTimeSync() *mediatransportutil.TimeSyncInfo { + needToSyncTime := false + p.timeSyncLock.Lock() + if p.timeSyncInfo == nil { + needToSyncTime = true + p.timeSyncInfo = &mediatransportutil.TimeSyncInfo{} + p.timeSynchronized = &core.Fuse{} + } + + timeSyncInfo := p.timeSyncInfo + timeSynchronized := p.timeSynchronized + + p.timeSyncLock.Unlock() + + // Do not hold the lock while synchronizing time to prevent blocking the reconnection callback that resets timeSyncInfo + if needToSyncTime { + // Do not retry time sync if it fails for now as the most likely reason is that the SFU does not support the time sync protocol + p.syncTime(timeSyncInfo) + timeSynchronized.Break() + } + + <-timeSynchronized.Watch() + + return timeSyncInfo +} + +func (p *LocalParticipant) convertLocalTimeToSFUTime(ts *uint64, timeSyncInfo *mediatransportutil.TimeSyncInfo) { + if ts == nil { + return + } + + timeTs := mediatransportutil.NtpTime(*ts).Time() + + sfuTs, err := p.timeSyncInfo.GetPeerTimeForLocalTime(timeTs) + if err != nil { + // Leave time as is + return + } + ntpSfuTs := mediatransportutil.ToNtpTime(sfuTs) + + *ts = uint64(ntpSfuTs) +} + func (p *LocalParticipant) publishData(kind livekit.DataPacket_Kind, dataPacket *livekit.DataPacket) error { if err := p.engine.ensurePublisherConnected(true); err != nil { return err } + if userPkt, ok := dataPacket.Value.(*livekit.DataPacket_User); ok { + if userPkt.User.StartTime != nil || userPkt.User.StartTime != nil { + timeSyncInfo := p.getSynchronizedTimeSync() + + p.convertLocalTimeToSFUTime(userPkt.User.StartTime, timeSyncInfo) + p.convertLocalTimeToSFUTime(userPkt.User.EndTime, timeSyncInfo) + } + } + encoded, err := proto.Marshal(dataPacket) if err != nil { return err @@ -309,6 +404,7 @@ type DataPacket interface { var ( _ DataPacket = (*UserDataPacket)(nil) _ DataPacket = (*livekit.SipDTMF)(nil) // implemented in the protocol package + _ DataPacket = (*livekit.TimeSyncResponse)(nil) ) // UserData is a custom user data that can be sent via WebRTC. @@ -318,8 +414,10 @@ func UserData(data []byte) *UserDataPacket { // UserDataPacket is a custom user data that can be sent via WebRTC on a custom topic. type UserDataPacket struct { - Payload []byte - Topic string // optional + Payload []byte + Topic string // optional + StartTime *time.Time // optional + EndTime *time.Time // optional } // ToProto implements DataPacket. @@ -328,10 +426,21 @@ func (p *UserDataPacket) ToProto() *livekit.DataPacket { if p.Topic != "" { topic = proto.String(p.Topic) } + + var startTime, endTime *uint64 + if p.StartTime != nil { + startTime = proto.Uint64(uint64(mediatransportutil.ToNtpTime(*p.StartTime))) + } + if p.EndTime != nil { + endTime = proto.Uint64(uint64(mediatransportutil.ToNtpTime(*p.EndTime))) + } + return &livekit.DataPacket{Value: &livekit.DataPacket_User{ User: &livekit.UserPacket{ - Payload: p.Payload, - Topic: topic, + Payload: p.Payload, + Topic: topic, + StartTime: startTime, + EndTime: endTime, }, }} } @@ -471,3 +580,11 @@ func (p *LocalParticipant) onTrackMuted(pub *LocalTrackPublication, muted bool) p.roomCallback.OnTrackUnmuted(pub, p) } } + +func (p *LocalParticipant) onTimeSyncResponse(resp livekit.TimeSyncResponse) { + select { + case p.timeSyncResponseChan <- resp: + default: + logger.Infow("time sync response chan full") + } +} diff --git a/room.go b/room.go index 6e5f8739..9c195973 100644 --- a/room.go +++ b/room.go @@ -412,7 +412,7 @@ func (r *Room) handleRestarted(joinRes *livekit.JoinResponse) { r.handleParticipantUpdate(joinRes.OtherParticipants) - r.LocalParticipant.republishTracks() + r.LocalParticipant.republishTracksAndResetTimeSync() r.callback.OnReconnected() } @@ -443,6 +443,8 @@ func (r *Room) handleDataReceived(identity string, dataPacket DataPacket) { p.Callback.OnDataReceived(msg.Payload, params) } r.callback.OnDataReceived(msg.Payload, params) + case *livekit.TimeSyncResponse: + r.LocalParticipant.onTimeSyncResponse(*msg) } if p != nil { p.Callback.OnDataPacket(dataPacket, params)