Skip to content

Add Datagram transport #4283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
3 changes: 3 additions & 0 deletions common/crypto/chunk.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package crypto

import (
"context"
"encoding/binary"
"io"

"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
)

// ChunkSizeDecoder is a utility class to decode size value from bytes.
@@ -117,6 +119,7 @@ func (r *ChunkStreamReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
}
r.leftOverSize = size

errors.LogInfo(context.Background(), "StreamReader read ", size)
mb, err := r.reader.ReadAtMost(size)
if !mb.IsEmpty() {
r.leftOverSize -= mb.Len()
2 changes: 2 additions & 0 deletions common/mux/reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mux

import (
"context"
"io"

"github.com/xtls/xray-core/common/buf"
@@ -33,6 +34,7 @@ func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
}

size, err := serial.ReadUint16(r.reader)
errors.LogInfo(context.Background(), "PacketReader read ", size, r.dest)
if err != nil {
return nil, err
}
6 changes: 4 additions & 2 deletions common/mux/server_test.go
Original file line number Diff line number Diff line change
@@ -90,7 +90,8 @@ func TestRegressionOutboundLeak(t *testing.T) {
}

{
b := buf.FromBytes([]byte("hello"))
b := buf.New()
b.Write([]byte("hello"))
common.Must(muxClientDownlink.Writer.WriteMultiBuffer(buf.MultiBuffer{b}))
}

@@ -102,7 +103,8 @@ func TestRegressionOutboundLeak(t *testing.T) {
}

{
b := buf.FromBytes([]byte("world"))
b := buf.New()
b.Write([]byte("world"))
common.Must(websiteUplink.Writer.WriteMultiBuffer(buf.MultiBuffer{b}))
}

6 changes: 5 additions & 1 deletion common/mux/writer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package mux

import (
"context"

"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/serial"
@@ -75,10 +78,10 @@ func writeMetaWithFrame(writer buf.Writer, meta FrameMetadata, data buf.MultiBuf
if _, err := serial.WriteUint16(frame, uint16(data.Len())); err != nil {
return err
}

mb2 := make(buf.MultiBuffer, 0, len(data)+1)
mb2 = append(mb2, frame)
mb2 = append(mb2, data...)
mb2 = buf.Compact(mb2)
return writer.WriteMultiBuffer(mb2)
}

@@ -106,6 +109,7 @@ func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
mb = mb2
chunk = buf.MultiBuffer{b}
}
errors.LogInfo(context.Background(), "MuxWriter write ", chunk.Len(), w.dest)
if err := w.writeData(chunk); err != nil {
return err
}
32 changes: 30 additions & 2 deletions infra/conf/transport_internet.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ import (
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/httpupgrade"
"github.com/xtls/xray-core/transport/internet/kcp"
"github.com/xtls/xray-core/transport/internet/quic"
"github.com/xtls/xray-core/transport/internet/reality"
"github.com/xtls/xray-core/transport/internet/splithttp"
"github.com/xtls/xray-core/transport/internet/tcp"
@@ -332,6 +333,22 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
return config, nil
}

type QUICConfig struct {
// Header json.RawMessage `json:"header"`
// Security string `json:"security"`
// Key string `json:"key"`

Fec bool `json:"fec"`
}

// Build implements Buildable.
func (c *QUICConfig) Build() (proto.Message, error) {
config := &quic.Config{
Fec: c.Fec,
}
return config, nil
}

func readFileOrString(f string, s []string) ([]byte, error) {
if len(f) > 0 {
return filesystem.ReadFile(f)
@@ -683,8 +700,8 @@ func (p TransportProtocol) Build() (string, error) {
return "httpupgrade", nil
case "h2", "h3", "http":
return "", errors.PrintRemovedFeatureError("HTTP transport (without header padding, etc.)", "XHTTP stream-one H2 & H3")
case "quic":
return "", errors.PrintRemovedFeatureError("QUIC transport (without web service, etc.)", "XHTTP stream-one H3")
case "quic", "datagram":
return "quic", nil
default:
return "", errors.New("Config: unknown transport protocol: ", p)
}
@@ -839,6 +856,7 @@ type StreamConfig struct {
XHTTPSettings *SplitHTTPConfig `json:"xhttpSettings"`
SplitHTTPSettings *SplitHTTPConfig `json:"splithttpSettings"`
KCPSettings *KCPConfig `json:"kcpSettings"`
QUICSettings *QUICConfig `json:"quicSettings"`
GRPCSettings *GRPCConfig `json:"grpcSettings"`
WSSettings *WebSocketConfig `json:"wsSettings"`
HTTPUPGRADESettings *HttpUpgradeConfig `json:"httpupgradeSettings"`
@@ -930,6 +948,16 @@ func (c *StreamConfig) Build() (*internet.StreamConfig, error) {
Settings: serial.ToTypedMessage(ts),
})
}
if c.QUICSettings != nil {
qs, err := c.QUICSettings.Build()
if err != nil {
return nil, errors.New("Failed to build QUIC config").Base(err)
}
config.TransportSettings = append(config.TransportSettings, &internet.TransportConfig{
ProtocolName: "quic",
Settings: serial.ToTypedMessage(qs),
})
}
if c.GRPCSettings != nil {
gs, err := c.GRPCSettings.Build()
if err != nil {
1 change: 1 addition & 0 deletions main/distro/all/all.go
Original file line number Diff line number Diff line change
@@ -53,6 +53,7 @@ import (
_ "github.com/xtls/xray-core/transport/internet/grpc"
_ "github.com/xtls/xray-core/transport/internet/httpupgrade"
_ "github.com/xtls/xray-core/transport/internet/kcp"
_ "github.com/xtls/xray-core/transport/internet/quic"
_ "github.com/xtls/xray-core/transport/internet/reality"
_ "github.com/xtls/xray-core/transport/internet/splithttp"
_ "github.com/xtls/xray-core/transport/internet/tcp"
137 changes: 137 additions & 0 deletions transport/internet/quic/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions transport/internet/quic/config.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package xray.transport.internet.quic;
option csharp_namespace = "Xray.Transport.Internet.Quic";
option go_package = "github.com/xtls/xray-core/transport/internet/quic";
option java_package = "com.xray.transport.internet.quic";
option java_multiple_files = true;

message Config {
// string key = 1;
// xray.common.protocol.SecurityConfig security = 2;
// xray.common.serial.TypedMessage header = 3;
bool fec = 4;
}
247 changes: 247 additions & 0 deletions transport/internet/quic/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package quic

import (
"context"
"time"

"github.com/quic-go/quic-go"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/mux"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/serial"
"github.com/xtls/xray-core/common/signal/done"
)

var MaxIncomingStreams = 16
var currentStream = 0

type interConn struct {
ctx context.Context
quicConn quic.Connection // small udp packet can be sent with Datagram directly
streams []quic.Stream // other packets can be sent via steam, it offer mux, reliability, fragmentation and ordering
readChannel chan readResult
reader buf.MultiBufferContainer
done *done.Instance
local net.Addr
remote net.Addr
}

type readResult struct {
buffer []byte
err error
}

func NewConnInitReader(ctx context.Context, quicConn quic.Connection, done *done.Instance, remote net.Addr) *interConn {
c := &interConn{
ctx: ctx,
quicConn: quicConn,
readChannel: make(chan readResult),
reader: buf.MultiBufferContainer{},
done: done,
local: quicConn.LocalAddr(),
remote: remote,
}
go func() {
for {
received, e := c.quicConn.ReceiveDatagram(c.ctx)
errors.LogInfo(c.ctx, "Read ReceiveDatagram ", len(received))
c.readChannel <- readResult{buffer: received, err: e}
}
}()
go c.acceptStreams()
return c
}

func (c *interConn) acceptStreams() {
for {
stream, err := c.quicConn.AcceptStream(context.Background())
errors.LogInfo(c.ctx, "Read AcceptStream ", err)
if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to accept stream")
select {
case <-c.quicConn.Context().Done():
return
case <-c.done.Wait():
if err := c.quicConn.CloseWithError(0, ""); err != nil {
errors.LogInfoInner(context.Background(), err, "failed to close connection")
}
return
default:
time.Sleep(time.Second)
continue
}
}
go c.readMuxCoolPacket(stream)
c.streams = append(c.streams, stream)
}
}

func (c *interConn) readMuxCoolPacket(stream quic.Stream) {
for {
received := make([]byte, buf.Size)
i, e := stream.Read(received)
if e != nil {
errors.LogErrorInner(c.ctx, e, "Error read stream, drop this buffer ", i)
c.readChannel <- readResult{buffer: nil, err: e}
continue;
}
errors.LogInfo(c.ctx, "Read stream ", i)

buffer := buf.New()
buffer.Write(received[:i])
muxCoolReader := &buf.MultiBufferContainer{}
muxCoolReader.MultiBuffer = append(muxCoolReader.MultiBuffer, buffer)
var meta mux.FrameMetadata
err := meta.Unmarshal(muxCoolReader)
if err != nil {
errors.LogInfo(c.ctx, "Not a Mux Cool packet beginning, copy directly ", i)
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
c.readChannel <- readResult{buffer: received[:i], err: e}
continue;
}
if !meta.Option.Has(mux.OptionData) {
errors.LogInfo(c.ctx, "No option data, copy directly ", i)
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
c.readChannel <- readResult{buffer: received[:i], err: e}
continue;
}
size, err := serial.ReadUint16(muxCoolReader)
remaining := uint16(muxCoolReader.MultiBuffer.Len())
errors.LogInfo(c.ctx, "Read stream ", i, " option size ", size, " remaining size ", remaining)
if err != nil || size <= remaining || size > remaining + 1500 {
errors.LogInfo(c.ctx, "do not wait for second part of UDP packet ", i)
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
c.readChannel <- readResult{buffer: received[:i], err: e}
continue;
}

i2, e := stream.Read(received[i:])
if e != nil {
errors.LogErrorInner(c.ctx, e, "Error read stream, drop this buffer ", i2)
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
c.readChannel <- readResult{buffer: nil, err: e}
continue;
}
errors.LogInfo(c.ctx, "Read stream i2 size ", i2)
buf.ReleaseMulti(muxCoolReader.MultiBuffer)
c.readChannel <- readResult{buffer: received[:(i + i2)], err: e}
}
}

func (c *interConn) Read(b []byte) (int, error) {
if c.reader.MultiBuffer.Len() > 0 {
return c.reader.Read(b)
}
received := <- c.readChannel
if received.err != nil {
return 0, received.err
}
buffer := buf.New()
buffer.Write(received.buffer)
c.reader.MultiBuffer = append(c.reader.MultiBuffer, buffer)
errors.LogInfo(c.ctx, "Read copy ", len(received.buffer))
return c.reader.Read(b)
}

func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error {
mb = buf.Compact(mb)
mb, err := buf.WriteMultiBuffer(c, mb)
buf.ReleaseMulti(mb)
return err
}

func (c *interConn) Write(b []byte) (int, error) {
if len(b) > 1240 { // TODO: why quic-go increase internal MTU causing packet loss?
if len(c.streams) < MaxIncomingStreams {
stream, err := c.quicConn.OpenStream()
errors.LogInfo(c.ctx, "Write OpenStream ", err)
if err == nil {
c.streams = append(c.streams, stream)
} else {
errors.LogInfoInner(c.ctx, err, "failed to openStream: ")
}
}
currentStream++;
if currentStream > len(c.streams) - 1 {
currentStream = 0;
}
errors.LogInfo(c.ctx, "Write stream ", len(b), currentStream, len(c.streams))
return c.streams[currentStream].Write(b)
}
var err = c.quicConn.SendDatagram(b)
errors.LogInfo(c.ctx, "Write SendDatagram ", len(b), err)
if _, ok := err.(*quic.DatagramTooLargeError); ok {
if len(c.streams) < MaxIncomingStreams {
stream, err := c.quicConn.OpenStream()
errors.LogInfo(c.ctx, "Write OpenStream ", err)
if err == nil {
c.streams = append(c.streams, stream)
} else {
errors.LogInfoInner(c.ctx, err, "failed to openStream: ")
}
}
currentStream++;
if currentStream > len(c.streams) - 1 {
currentStream = 0;
}
errors.LogInfo(c.ctx, "Write stream ", len(b), currentStream, len(c.streams))
return c.streams[currentStream].Write(b)
}
if err != nil {
return 0, err
}
return len(b), nil
}

func (c *interConn) Close() error {
var err error
for _, s := range c.streams {
e := s.Close()
if e != nil {
err = e
}
}
return err
}

func (c *interConn) LocalAddr() net.Addr {
return c.local
}

func (c *interConn) RemoteAddr() net.Addr {
return c.remote
}

func (c *interConn) SetDeadline(t time.Time) error {
var err error
for _, s := range c.streams {
e := s.SetDeadline(t)
if e != nil {
err = e
}
}
return err
}

func (c *interConn) SetReadDeadline(t time.Time) error {
var err error
for _, s := range c.streams {
e := s.SetReadDeadline(t)
if e != nil {
err = e
}
}
return err
}

func (c *interConn) SetWriteDeadline(t time.Time) error {
var err error
for _, s := range c.streams {
e := s.SetWriteDeadline(t)
if e != nil {
err = e
}
}
return err
}
95 changes: 95 additions & 0 deletions transport/internet/quic/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package quic

import (
"context"
"time"

"github.com/quic-go/quic-go"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
)

func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
if tlsConfig == nil {
tlsConfig = &tls.Config{
ServerName: internalDomain,
AllowInsecure: true,
}
}

var destAddr *net.UDPAddr
if dest.Address.Family().IsIP() {
destAddr = &net.UDPAddr{
IP: dest.Address.IP(),
Port: int(dest.Port),
}
} else {
dialerIp := internet.DestIpAddress()
if dialerIp != nil {
destAddr = &net.UDPAddr{
IP: dialerIp,
Port: int(dest.Port),
}
errors.LogInfo(ctx, "quic Dial use dialer dest addr: ", destAddr)
} else {
addr, err := net.ResolveUDPAddr("udp", dest.NetAddr())
if err != nil {
return nil, err
}
destAddr = addr
}
}

config := streamSettings.ProtocolSettings.(*Config)

return openConnection(ctx, destAddr, config, tlsConfig, streamSettings.SocketSettings)
}

func openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) {
dest := net.DestinationFromAddr(destAddr)
errors.LogInfo(ctx, "dialing quic to ", dest)
rawConn, err := internet.DialSystem(ctx, dest, sockopt)
if err != nil {
return nil, errors.New("failed to dial to dest: ", err).AtWarning().Base(err)
}

quicConfig := &quic.Config{
KeepAlivePeriod: 0,
HandshakeIdleTimeout: time.Second * 8,
MaxIdleTimeout: time.Second * 300,
EnableDatagrams: true,
}

var udpConn *net.UDPConn
switch conn := rawConn.(type) {
case *net.UDPConn:
udpConn = conn
case *internet.PacketConnWrapper:
udpConn = conn.Conn.(*net.UDPConn)
default:
rawConn.Close()
return nil, errors.New("QUIC with sockopt is unsupported").AtWarning()
}

tr := quic.Transport{
ConnectionIDLength: 12,
Conn: udpConn,
}
conn, err := tr.Dial(context.Background(), destAddr, tlsConfig.GetTLSConfig(tls.WithDestination(dest)), quicConfig)
if err != nil {
udpConn.Close()
return nil, err
}

return NewConnInitReader(ctx, conn, done.New(), destAddr), nil
}

func init() {
common.Must(internet.RegisterTransportDialer(protocolName, Dial))
}
108 changes: 108 additions & 0 deletions transport/internet/quic/hub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package quic

import (
"context"
"time"

"github.com/quic-go/quic-go"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol/tls/cert"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/tls"
)

// Listener is an internet.Listener that listens for TCP connections.
type Listener struct {
rawConn *net.UDPConn
listener *quic.Listener
done *done.Instance
addConn internet.ConnHandler
}

func (l *Listener) keepAccepting(ctx context.Context) {
for {
conn, err := l.listener.Accept(context.Background())
if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to accept QUIC connection")
if l.done.Done() {
break
}
time.Sleep(time.Second)
continue
}
l.addConn(NewConnInitReader(ctx, conn, l.done, conn.RemoteAddr()))
}
}

// Addr implements internet.Listener.Addr.
func (l *Listener) Addr() net.Addr {
return l.listener.Addr()
}

// Close implements internet.Listener.Close.
func (l *Listener) Close() error {
l.done.Close()
l.listener.Close()
l.rawConn.Close()
return nil
}

// Listen creates a new Listener based on configurations.
func Listen(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
if address.Family().IsDomain() {
return nil, errors.New("domain address is not allows for listening quic")
}

tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
if tlsConfig == nil {
tlsConfig = &tls.Config{
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.DNSNames(internalDomain), cert.CommonName(internalDomain)))},
}
}

//config := streamSettings.ProtocolSettings.(*Config)
rawConn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
IP: address.IP(),
Port: int(port),
}, streamSettings.SocketSettings)
if err != nil {
return nil, err
}

quicConfig := &quic.Config{
KeepAlivePeriod: 0,
HandshakeIdleTimeout: time.Second * 8,
MaxIdleTimeout: time.Second * 300,
MaxIncomingStreams: 16,
MaxIncomingUniStreams: -1,
EnableDatagrams: true,
}

tr := quic.Transport{
ConnectionIDLength: 12,
Conn: rawConn.(*net.UDPConn),
}
qListener, err := tr.Listen(tlsConfig.GetTLSConfig(), quicConfig)
if err != nil {
rawConn.Close()
return nil, err
}

listener := &Listener{
done: done.New(),
rawConn: rawConn.(*net.UDPConn),
listener: qListener,
addConn: handler,
}

go listener.keepAccepting(ctx)

return listener, nil
}

func init() {
common.Must(internet.RegisterTransportListener(protocolName, Listen))
}
17 changes: 17 additions & 0 deletions transport/internet/quic/quic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package quic

import (
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/transport/internet"
)

const (
protocolName = "quic"
internalDomain = "quic.internal.example.com"
)

func init() {
common.Must(internet.RegisterProtocolConfigCreator(protocolName, func() interface{} {
return new(Config)
}))
}
105 changes: 105 additions & 0 deletions transport/internet/quic/quic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package quic_test

import (
"context"
"crypto/rand"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol/tls/cert"
"github.com/xtls/xray-core/testing/servers/udp"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/quic"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
)

func TestShortQuicConnection(t *testing.T) {
testQuicConnection(t, 1024)
}

func TestAroundMTUQuicConnection(t *testing.T) {
testQuicConnection(t, 1247)
}

func TestLongQuicConnection(t *testing.T) {
testQuicConnection(t, 1500)
}

func testQuicConnection(t *testing.T, dataLen int32) {
port := udp.PickPort()

listener, err := quic.Listen(context.Background(), net.LocalHostIP, port, &internet.MemoryStreamConfig{
ProtocolName: "quic",
ProtocolSettings: &quic.Config{},
SecurityType: "tls",
SecuritySettings: &tls.Config{
Certificate: []*tls.Certificate{
tls.ParseCertificate(
cert.MustGenerate(nil,
cert.DNSNames("www.example.com"),
),
),
},
},
}, func(conn stat.Connection) {
go func() {
defer conn.Close()

b := buf.New()
defer b.Release()

for {
b.Clear()
if _, err := b.ReadFrom(conn); err != nil {
return
}
common.Must2(conn.Write(b.Bytes()))
}
}()
})
common.Must(err)

defer listener.Close()

time.Sleep(time.Second)

dctx := context.Background()
conn, err := quic.Dial(dctx, net.TCPDestination(net.LocalHostIP, port), &internet.MemoryStreamConfig{
ProtocolName: "quic",
ProtocolSettings: &quic.Config{},
SecurityType: "tls",
SecuritySettings: &tls.Config{
ServerName: "www.example.com",
AllowInsecure: true,
},
})
common.Must(err)
defer conn.Close()

b1 := make([]byte, dataLen)
common.Must2(rand.Read(b1))
b2 := buf.New()

common.Must2(conn.Write(b1))

b2.Clear()
common.Must2(b2.ReadFullFrom(conn, dataLen))
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
t.Error(r)
}

time.Sleep(1000 * time.Millisecond)

common.Must2(conn.Write(b1))

b2.Clear()
common.Must2(b2.ReadFullFrom(conn, dataLen))
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
t.Error(r)
}
}