Skip to content

Synchronize publisher time to the SFU when sending a UserData message over the data channel with timing information #469

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,10 @@ func (e *RTCEngine) handleDataPacket(msg webrtc.DataChannelMessage) {
if e.OnDataPacket != nil {
e.OnDataPacket(identity, msg.SipDtmf)
}
case *livekit.DataPacket_TimeSyncResponse:
if e.OnDataPacket != nil {
e.OnDataPacket(identity, msg.TimeSyncResponse)
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ toolchain go1.22.2

require (
github.com/bep/debounce v1.2.1
github.com/frostbyte73/core v0.0.10
github.com/go-logr/logr v1.4.1
github.com/go-logr/stdr v1.2.2
github.com/gorilla/websocket v1.5.1
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a
github.com/livekit/protocol v1.15.0
github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11
github.com/livekit/protocol v1.15.1-0.20240515200202-fbcdeac2c890
github.com/magefile/mage v1.15.0
github.com/pion/dtls/v2 v2.2.10
github.com/pion/interceptor v0.1.27
Expand All @@ -34,7 +35,6 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/channels v1.1.0 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/frostbyte73/core v0.0.10 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
Expand Down Expand Up @@ -69,6 +69,7 @@ require (
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap/exp v0.2.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a h1:ATbv0x7G5tW2HgiouQ57csFE/G4gekl2oV1cxb2Dy24=
github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.15.0 h1:JAatoWKYdFx3D0U4JBWg25ZlrY+NK26xHabFopS2Jhk=
github.com/livekit/protocol v1.15.0/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w=
github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11 h1:NR227C8Xo0/Q8uJMDTuEcsOybMIwRJpRFdHiDr3eOHs=
github.com/livekit/mediatransportutil v0.0.0-20240511004033-f08cbc684b11/go.mod h1:MEYWFuRwniEbshuIyL2K3cFi8km+XGlm1n5cqw6pmsU=
github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd h1:tJcKbNFygIPs0lS0RzwcewM1flVVHGmFBR+A4BDUkf8=
github.com/livekit/protocol v1.15.1-0.20240508201813-6afe4c3491fd/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w=
github.com/livekit/protocol v1.15.1-0.20240515200202-fbcdeac2c890 h1:Sx56+lv2bm3Th/txkzOcfKrNOiN1esZKK6OsrlggtEg=
github.com/livekit/protocol v1.15.1-0.20240515200202-fbcdeac2c890/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w=
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0=
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
Expand Down Expand Up @@ -177,6 +179,8 @@ golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJ
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
Expand Down
131 changes: 124 additions & 7 deletions localparticipant.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,37 @@ package lksdk

import (
"sort"
"sync"
"time"

"github.com/frostbyte73/core"
"github.com/pion/webrtc/v3"
"google.golang.org/protobuf/proto"

"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
)

const (
trackPublishTimeout = 10 * time.Second
timeSyncTimeout = 5 * time.Second
)

type LocalParticipant struct {
baseParticipant
engine *RTCEngine

timeSyncLock sync.Mutex
timeSynchronized *core.Fuse
timeSyncResponseChan chan livekit.TimeSyncResponse
timeSyncInfo *mediatransportutil.TimeSyncInfo
}

func newLocalParticipant(engine *RTCEngine, roomcallback *RoomCallback) *LocalParticipant {
return &LocalParticipant{
baseParticipant: *newBaseParticipant(roomcallback),
engine: engine,
baseParticipant: *newBaseParticipant(roomcallback),
engine: engine,
timeSyncResponseChan: make(chan livekit.TimeSyncResponse, 10),
}
}

Expand Down Expand Up @@ -229,7 +239,11 @@ func (p *LocalParticipant) PublishSimulcastTrack(tracks []*LocalTrack, opts *Tra
return pub, nil
}

func (p *LocalParticipant) republishTracks() {
func (p *LocalParticipant) republishTracksAndResetTimeSync() {
p.timeSyncLock.Lock()
p.timeSyncInfo = nil
p.timeSyncLock.Unlock()

var localPubs []*LocalTrackPublication
p.tracks.Range(func(key, value interface{}) bool {
track := value.(*LocalTrackPublication)
Expand Down Expand Up @@ -272,11 +286,92 @@ func (p *LocalParticipant) closeTracks() {
}
}

func (p *LocalParticipant) syncTime(timeSyncInfo *mediatransportutil.TimeSyncInfo) {
req := p.timeSyncInfo.StartTimeSync()
dataPacket := &livekit.DataPacket{Value: &livekit.DataPacket_TimeSyncRequest{
TimeSyncRequest: &req,
}}

err := p.publishData(livekit.DataPacket_RELIABLE, dataPacket)
if err != nil {
logger.Debugw("time sync data channel request failed sending", "error", err)
return
}

for {
select {
case <-time.After(timeSyncTimeout):
logger.Debugw("timeout waiting for time synchronization response")
return
case resp := <-p.timeSyncResponseChan:
err := timeSyncInfo.HandleTimeSyncResponse(resp)
if err == nil {
logger.Debugw("time synchronization successful")
return
}
// Wait for a potential other response to come on the channel
logger.Debugw("error handling time synchronization response", "error", err)
}
}
}

func (p *LocalParticipant) getSynchronizedTimeSync() *mediatransportutil.TimeSyncInfo {
needToSyncTime := false
p.timeSyncLock.Lock()
if p.timeSyncInfo == nil {
needToSyncTime = true
p.timeSyncInfo = &mediatransportutil.TimeSyncInfo{}
p.timeSynchronized = &core.Fuse{}
}

timeSyncInfo := p.timeSyncInfo
timeSynchronized := p.timeSynchronized

p.timeSyncLock.Unlock()

// Do not hold the lock while synchronizing time to prevent blocking the reconnection callback that resets timeSyncInfo
if needToSyncTime {
// Do not retry time sync if it fails for now as the most likely reason is that the SFU does not support the time sync protocol
p.syncTime(timeSyncInfo)
timeSynchronized.Break()
}

<-timeSynchronized.Watch()

return timeSyncInfo
}

func (p *LocalParticipant) convertLocalTimeToSFUTime(ts *uint64, timeSyncInfo *mediatransportutil.TimeSyncInfo) {
if ts == nil {
return
}

timeTs := mediatransportutil.NtpTime(*ts).Time()

sfuTs, err := p.timeSyncInfo.GetPeerTimeForLocalTime(timeTs)
if err != nil {
// Leave time as is
return
}
ntpSfuTs := mediatransportutil.ToNtpTime(sfuTs)

*ts = uint64(ntpSfuTs)
}

func (p *LocalParticipant) publishData(kind livekit.DataPacket_Kind, dataPacket *livekit.DataPacket) error {
if err := p.engine.ensurePublisherConnected(true); err != nil {
return err
}

if userPkt, ok := dataPacket.Value.(*livekit.DataPacket_User); ok {
if userPkt.User.StartTime != nil || userPkt.User.StartTime != nil {
timeSyncInfo := p.getSynchronizedTimeSync()

p.convertLocalTimeToSFUTime(userPkt.User.StartTime, timeSyncInfo)
p.convertLocalTimeToSFUTime(userPkt.User.EndTime, timeSyncInfo)
}
}

encoded, err := proto.Marshal(dataPacket)
if err != nil {
return err
Expand Down Expand Up @@ -309,6 +404,7 @@ type DataPacket interface {
var (
_ DataPacket = (*UserDataPacket)(nil)
_ DataPacket = (*livekit.SipDTMF)(nil) // implemented in the protocol package
_ DataPacket = (*livekit.TimeSyncResponse)(nil)
)

// UserData is a custom user data that can be sent via WebRTC.
Expand All @@ -318,8 +414,10 @@ func UserData(data []byte) *UserDataPacket {

// UserDataPacket is a custom user data that can be sent via WebRTC on a custom topic.
type UserDataPacket struct {
Payload []byte
Topic string // optional
Payload []byte
Topic string // optional
StartTime *time.Time // optional
EndTime *time.Time // optional
}

// ToProto implements DataPacket.
Expand All @@ -328,10 +426,21 @@ func (p *UserDataPacket) ToProto() *livekit.DataPacket {
if p.Topic != "" {
topic = proto.String(p.Topic)
}

var startTime, endTime *uint64
if p.StartTime != nil {
startTime = proto.Uint64(uint64(mediatransportutil.ToNtpTime(*p.StartTime)))
}
if p.EndTime != nil {
endTime = proto.Uint64(uint64(mediatransportutil.ToNtpTime(*p.EndTime)))
}

return &livekit.DataPacket{Value: &livekit.DataPacket_User{
User: &livekit.UserPacket{
Payload: p.Payload,
Topic: topic,
Payload: p.Payload,
Topic: topic,
StartTime: startTime,
EndTime: endTime,
},
}}
}
Expand Down Expand Up @@ -471,3 +580,11 @@ func (p *LocalParticipant) onTrackMuted(pub *LocalTrackPublication, muted bool)
p.roomCallback.OnTrackUnmuted(pub, p)
}
}

func (p *LocalParticipant) onTimeSyncResponse(resp livekit.TimeSyncResponse) {
select {
case p.timeSyncResponseChan <- resp:
default:
logger.Infow("time sync response chan full")
}
}
4 changes: 3 additions & 1 deletion room.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (r *Room) handleRestarted(joinRes *livekit.JoinResponse) {

r.handleParticipantUpdate(joinRes.OtherParticipants)

r.LocalParticipant.republishTracks()
r.LocalParticipant.republishTracksAndResetTimeSync()

r.callback.OnReconnected()
}
Expand Down Expand Up @@ -443,6 +443,8 @@ func (r *Room) handleDataReceived(identity string, dataPacket DataPacket) {
p.Callback.OnDataReceived(msg.Payload, params)
}
r.callback.OnDataReceived(msg.Payload, params)
case *livekit.TimeSyncResponse:
r.LocalParticipant.onTimeSyncResponse(*msg)
}
if p != nil {
p.Callback.OnDataPacket(dataPacket, params)
Expand Down