Skip to content

Commit f840cd1

Browse files
committed
Experiment: pingpong seed
1 parent 5d252bf commit f840cd1

File tree

11 files changed

+155
-298
lines changed

11 files changed

+155
-298
lines changed

common/buf/copy.go

+14-15
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111

1212
type dataHandler func(MultiBuffer)
1313

14-
type copyHandler struct {
15-
onData []dataHandler
14+
type CopyHandler struct {
15+
OnData []dataHandler
1616
}
1717

1818
// SizeCounter is for counting bytes copied by Copy().
@@ -21,30 +21,30 @@ type SizeCounter struct {
2121
}
2222

2323
// CopyOption is an option for copying data.
24-
type CopyOption func(*copyHandler)
24+
type CopyOption func(*CopyHandler)
2525

2626
// UpdateActivity is a CopyOption to update activity on each data copy operation.
2727
func UpdateActivity(timer signal.ActivityUpdater) CopyOption {
28-
return func(handler *copyHandler) {
29-
handler.onData = append(handler.onData, func(MultiBuffer) {
28+
return func(handler *CopyHandler) {
29+
handler.OnData = append(handler.OnData, func(MultiBuffer) {
3030
timer.Update()
3131
})
3232
}
3333
}
3434

3535
// CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter.
3636
func CountSize(sc *SizeCounter) CopyOption {
37-
return func(handler *copyHandler) {
38-
handler.onData = append(handler.onData, func(b MultiBuffer) {
37+
return func(handler *CopyHandler) {
38+
handler.OnData = append(handler.OnData, func(b MultiBuffer) {
3939
sc.Size += int64(b.Len())
4040
})
4141
}
4242
}
4343

4444
// AddToStatCounter a CopyOption add to stat counter
4545
func AddToStatCounter(sc stats.Counter) CopyOption {
46-
return func(handler *copyHandler) {
47-
handler.onData = append(handler.onData, func(b MultiBuffer) {
46+
return func(handler *CopyHandler) {
47+
handler.OnData = append(handler.OnData, func(b MultiBuffer) {
4848
if sc != nil {
4949
sc.Add(int64(b.Len()))
5050
}
@@ -88,18 +88,17 @@ func IsWriteError(err error) bool {
8888
return ok
8989
}
9090

91-
func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
91+
func copyInternal(reader Reader, writer Writer, handler *CopyHandler) error {
9292
for {
9393
buffer, err := reader.ReadMultiBuffer()
9494
if !buffer.IsEmpty() {
95-
for _, handler := range handler.onData {
96-
handler(buffer)
97-
}
98-
9995
if werr := writer.WriteMultiBuffer(buffer); werr != nil {
10096
return writeError{werr}
10197
}
10298
}
99+
for _, handler := range handler.OnData {
100+
handler(buffer)
101+
}
103102

104103
if err != nil {
105104
return readError{err}
@@ -109,7 +108,7 @@ func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
109108

110109
// Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF.
111110
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
112-
var handler copyHandler
111+
var handler CopyHandler
113112
for _, option := range options {
114113
option(&handler)
115114
}

proxy/addons.pb.go

+29-20
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proxy/addons.proto

+1
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,6 @@ message DelayConfig {
3838

3939
message SchedulerConfig {
4040
uint32 TimeoutMillis = 1; // original traffic will not be sent right away but when scheduler want to send or pending buffer times out
41+
bool PingPong = 2;
4142
// Other TBD
4243
}

proxy/freedom/freedom.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
231231
inTimer = inbound.Timer
232232
}
233233
if !isTLSConn(conn) { // it would be tls conn in special use case of MITM, we need to let link handle traffic
234-
return proxy.CopyRawConnIfExist(ctx, conn, writeConn, link.Writer, timer, inTimer)
234+
return proxy.CopyRawConnIfExist(ctx, conn, writeConn, link.Writer, timer, inTimer, nil)
235235
}
236236
}
237237
var reader buf.Reader

proxy/proxy.go

+24-9
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ type VisionWriter struct {
207207
trafficState *TrafficState
208208
ctx context.Context
209209
writeOnceUserUUID *[]byte
210-
scheduler *Scheduler
210+
Scheduler *Scheduler
211211
}
212212

213213
func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, context context.Context) *VisionWriter {
@@ -219,7 +219,7 @@ func NewVisionWriter(writer buf.Writer, addon *Addons, state *TrafficState, cont
219219
trafficState: state,
220220
ctx: context,
221221
writeOnceUserUUID: &w,
222-
scheduler: NewScheduler(writer, addon, state, &w, context),
222+
Scheduler: NewScheduler(writer, addon, state, &w, context),
223223
}
224224
}
225225

@@ -272,12 +272,24 @@ func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
272272
if w.trafficState.StartTime.IsZero() {
273273
w.trafficState.StartTime = time.Now()
274274
}
275-
w.scheduler.Buffer <- mb
276-
if w.addons.Scheduler == nil {
277-
w.scheduler.Trigger <- -1 // send all buffers
275+
w.Scheduler.Buffer <- mb
276+
w.Scheduler.Trigger <- -1 // send all buffers if no independent scheduler
277+
if w.addons.Scheduler != nil {
278+
w.Scheduler.TimeoutLock.Lock()
279+
w.Scheduler.TimeoutCounter++
280+
w.Scheduler.TimeoutLock.Unlock()
281+
go func() {
282+
time.Sleep(time.Duration(w.addons.Scheduler.TimeoutMillis) * time.Millisecond)
283+
w.Scheduler.TimeoutLock.Lock()
284+
w.Scheduler.TimeoutCounter--
285+
if w.Scheduler.TimeoutCounter == 0 {
286+
w.Scheduler.Trigger <- 0 // send when the latest buffer timeout
287+
}
288+
w.Scheduler.TimeoutLock.Unlock()
289+
}()
278290
}
279-
if len(w.scheduler.Error) > 0 {
280-
return <-w.scheduler.Error
291+
if len(w.Scheduler.Error) > 0 {
292+
return <-w.Scheduler.Error
281293
}
282294
return nil
283295
}
@@ -509,7 +521,7 @@ func UnwrapRawConn(conn net.Conn) (net.Conn, stats.Counter, stats.Counter) {
509521
// CopyRawConnIfExist use the most efficient copy method.
510522
// - If caller don't want to turn on splice, do not pass in both reader conn and writer conn
511523
// - writer are from *transport.Link
512-
func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net.Conn, writer buf.Writer, timer *signal.ActivityTimer, inTimer *signal.ActivityTimer) error {
524+
func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net.Conn, writer buf.Writer, timer *signal.ActivityTimer, inTimer *signal.ActivityTimer, scheduler *Scheduler) error {
513525
readerConn, readCounter, _ := UnwrapRawConn(readerConn)
514526
writerConn, _, writeCounter := UnwrapRawConn(writerConn)
515527
reader := buf.NewReader(readerConn)
@@ -572,10 +584,13 @@ func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net
572584
if readCounter != nil {
573585
readCounter.Add(int64(buffer.Len()))
574586
}
575-
timer.Update()
576587
if werr := writer.WriteMultiBuffer(buffer); werr != nil {
577588
return werr
578589
}
590+
timer.Update()
591+
}
592+
if scheduler != nil {
593+
scheduler.Trigger <- 2
579594
}
580595
if err != nil {
581596
return err

proxy/scheduler.go

+23-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ type Scheduler struct {
1515
Buffer chan buf.MultiBuffer
1616
Trigger chan int
1717
Error chan error
18+
TimeoutCounter int
19+
TimeoutLock *sync.Mutex
1820
closed chan int
1921
bufferReadLock *sync.Mutex
2022
writer buf.Writer
@@ -24,11 +26,21 @@ type Scheduler struct {
2426
ctx context.Context
2527
}
2628

29+
func TriggerScheduler(scheduler *Scheduler) buf.CopyOption {
30+
return func(handler *buf.CopyHandler) {
31+
handler.OnData = append(handler.OnData, func(buf.MultiBuffer) {
32+
scheduler.Trigger <- 2 // send fake buffer if no pending
33+
})
34+
}
35+
}
36+
2737
func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, userUUID *[]byte, context context.Context) *Scheduler {
2838
var s = Scheduler{
2939
Buffer: make(chan buf.MultiBuffer, 100),
3040
Trigger: make(chan int),
3141
Error: make(chan error, 100),
42+
TimeoutCounter: 0,
43+
TimeoutLock: new(sync.Mutex),
3244
closed: make(chan int),
3345
bufferReadLock: new(sync.Mutex),
3446
writer: w,
@@ -37,18 +49,27 @@ func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, userUUID *[]
3749
writeOnceUserUUID: userUUID,
3850
ctx: context,
3951
}
52+
return &s
53+
}
54+
55+
func(s *Scheduler) Start() {
4056
go s.mainLoop()
41-
if s.addons.Scheduler != nil {
57+
if s.addons.Scheduler != nil && !s.addons.Scheduler.PingPong {
4258
go s.exampleIndependentScheduler()
4359
}
44-
return &s
4560
}
4661

4762
func(s *Scheduler) mainLoop() {
4863
for trigger := range s.Trigger {
4964
if len(s.closed) > 0 {
5065
return
5166
}
67+
if trigger == -1 && s.addons.Scheduler != nil {
68+
continue
69+
}
70+
if trigger == 2 && (s.addons.Scheduler == nil || !s.addons.Scheduler.PingPong) {
71+
continue
72+
}
5273
go func() { // each trigger has independent delay, trigger does not block
5374
var d = 0 * time.Millisecond
5475
if s.addons.Delay != nil {

proxy/vless/encoding/addons.go

+5-11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"io"
7+
"strings"
78

89
"github.com/xtls/xray-core/common/buf"
910
"github.com/xtls/xray-core/common/errors"
@@ -54,15 +55,6 @@ func DecodeHeaderAddons(buffer *buf.Buffer, reader io.Reader) (*proxy.Addons, er
5455
return addons, nil
5556
}
5657

57-
// EncodeBodyAddons returns a Writer that auto-encrypt content written by caller.
58-
func EncodeBodyAddons(writer buf.Writer, request *protocol.RequestHeader, addons *proxy.Addons, state *proxy.TrafficState, context context.Context) buf.Writer {
59-
w := proxy.NewVisionWriter(writer, addons, state, context)
60-
if request.Command == protocol.RequestCommandUDP {
61-
return NewMultiLengthPacketWriter(w)
62-
}
63-
return w
64-
}
65-
6658
// DecodeBodyAddons returns a Reader from which caller can fetch decrypted body.
6759
func DecodeBodyAddons(reader io.Reader, request *protocol.RequestHeader, addons *proxy.Addons, state *proxy.TrafficState, context context.Context) buf.Reader {
6860
r := proxy.NewVisionReader(buf.NewReader(reader), addons, state, context)
@@ -181,7 +173,7 @@ func (r *LengthPacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
181173
func PopulateSeed(seed string, addons *proxy.Addons) {
182174
if len(seed) > 0 {
183175
addons.Seed = []byte {1} // only turn on, more TBD
184-
addons.Mode = proxy.SeedMode_PaddingPlusDelay
176+
addons.Mode = proxy.SeedMode_IndependentScheduler
185177
addons.Duration = "0-8"
186178
addons.Padding = &proxy.PaddingConfig{
187179
RegularMin: 0,
@@ -196,6 +188,7 @@ func PopulateSeed(seed string, addons *proxy.Addons) {
196188
// }
197189
addons.Scheduler = &proxy.SchedulerConfig{
198190
TimeoutMillis: 600,
191+
PingPong: strings.Contains(seed, "pingpong"),
199192
}
200193
} else if addons.Flow == vless.XRV {
201194
addons.Seed = []byte {1} // only turn on, more TBD
@@ -244,7 +237,8 @@ func CheckSeed(requestAddons *proxy.Addons, responseAddons *proxy.Addons) error
244237
return errors.New("Delay of one is nil but the other is not nil")
245238
}
246239
if requestAddons.Scheduler != nil && responseAddons.Scheduler != nil {
247-
if requestAddons.Scheduler.TimeoutMillis != responseAddons.Scheduler.TimeoutMillis {
240+
if requestAddons.Scheduler.TimeoutMillis != responseAddons.Scheduler.TimeoutMillis ||
241+
requestAddons.Scheduler.PingPong != responseAddons.Scheduler.PingPong {
248242
return errors.New("Scheduler not match")
249243
}
250244
} else if requestAddons.Scheduler != nil || responseAddons.Scheduler != nil {

0 commit comments

Comments
 (0)