Skip to content

Commit 4fb029c

Browse files
Introduce support for pipelined requests, key prefix stripping, custom IPs, and Unix sockets
1 parent 49cf40c commit 4fb029c

File tree

5 files changed

+130
-80
lines changed

5 files changed

+130
-80
lines changed

cmd/basic/main.go

+19-10
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ package main
33
import (
44
"flag"
55
"fmt"
6-
pcgr "github.com/dgryski/go-pcgr"
7-
mct "github.com/dormando/mctester"
86
"math/rand"
97
"os"
108
"runtime/pprof"
119
"time"
10+
11+
"github.com/dgryski/go-pcgr"
12+
mct "github.com/memcached/mctester"
1213
)
1314

1415
var cpuprofile = flag.String("cpuprofile", "", "dump cpu profile to file")
@@ -31,6 +32,10 @@ func main() {
3132
zipfV := flag.Float64("zipfV", float64(*keySpace/2), "zipf V value (pull below this number")
3233
valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss")
3334
clientFlags := flag.Uint("clientflags", 0, "(32bit unsigned) client flag bits to set on miss")
35+
pipelines := flag.Uint("pipelines", 1, "(32bit unsigned) stack this many GET requests into the same syscall.")
36+
server := flag.String("server", "127.0.0.1:11211", "ip and port to connect to")
37+
socket := flag.String("socket", "", "domain socket to connect to")
38+
stripKeyPrefix := flag.Bool("stripkeyprefix", false, "strip key prefix before comparing with response.")
3439

3540
flag.Parse()
3641

@@ -53,7 +58,10 @@ func main() {
5358
*/
5459

5560
bl := &BasicLoader{
56-
servers: []string{"127.0.0.1:11211"},
61+
servers: []string{*server},
62+
socket: *socket,
63+
pipelines: *pipelines,
64+
stripKeyPrefix: *stripKeyPrefix,
5765
desiredConnCount: *connCount,
5866
requestsPerSleep: *reqPerSleep,
5967
requestBundlesPerConn: *reqBundlePerConn,
@@ -100,6 +108,9 @@ func main() {
100108
// - variances: how often to change item sizes
101109
type BasicLoader struct {
102110
servers []string
111+
socket string
112+
pipelines uint
113+
stripKeyPrefix bool
103114
stopAfter time.Time
104115
desiredConnCount int
105116
requestsPerSleep int
@@ -141,8 +152,8 @@ func (l *BasicLoader) Run() {
141152

142153
func (l *BasicLoader) Timer(tag string, start time.Time) {
143154
duration := time.Since(start)
144-
if duration > time.Millisecond * 10 {
145-
fmt.Printf("%s [%d]\n", tag, int64(time.Since(start) / time.Microsecond))
155+
if duration > time.Millisecond*10 {
156+
fmt.Printf("%s [%d]\n", tag, int64(time.Since(start)/time.Microsecond))
146157
}
147158
}
148159

@@ -152,7 +163,7 @@ func (l *BasicLoader) Timer(tag string, start time.Time) {
152163
func (l *BasicLoader) Worker(doneChan chan<- int) {
153164
// FIXME: selector.
154165
host := l.servers[0]
155-
mc := mct.NewClient(host)
166+
mc := mct.NewClient(host, l.socket, l.pipelines, l.keyPrefix, l.stripKeyPrefix)
156167
bundles := l.requestBundlesPerConn
157168

158169
rs := pcgr.New(time.Now().UnixNano(), 0)
@@ -185,15 +196,13 @@ func (l *BasicLoader) Worker(doneChan chan<- int) {
185196
// Could also re-seed it twice, pull once Intn for length,
186197
// re-seed, then again for key space.
187198

188-
keyLen := l.keyLength
189199
if l.useZipf {
190200
subRS.Seed(int64(zipRS.Uint64()))
191201
} else {
192202
subRS.Seed(int64(randR.Intn(l.keySpace)))
193203
}
194-
// TODO: might be nice to pass (by ref?) prefix in here to make
195-
// use of string.Builder.
196-
key := l.keyPrefix + mct.RandString(&subRS, keyLen)
204+
205+
key := mct.RandString(&subRS, l.keyLength, l.keyPrefix)
197206
// chance we issue a delete instead.
198207
delChance := randR.Intn(1000)
199208
if l.deletePercent != 0 && delChance < l.deletePercent {

cmd/server/loader_basic.go

+27-22
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,42 @@ package main
22

33
import (
44
"fmt"
5-
pcgr "github.com/dgryski/go-pcgr"
6-
mct "github.com/dormando/mctester"
75
"math/rand"
86
"time"
7+
8+
"github.com/dgryski/go-pcgr"
9+
mct "github.com/memcached/mctester"
910
)
1011

1112
// Basic persistent load test, using text protocol:
1213
type BasicLoader struct {
13-
Servers []string `json:"servers"`
14-
stopAfter time.Time
15-
DesiredConnCount int `json:"conncount"`
16-
RequestsPerSleep int `json:"reqpersleep"`
17-
RequestBundlesPerConn int `json:"reqbundlesperconn"`
14+
Servers []string `json:"servers"`
15+
Socket string `json:"socket"`
16+
Pipelines uint `json:"pipelines"`
17+
StripKeyPrefix bool `json:"stripkeyprefix"`
18+
DesiredConnCount int `json:"conncount"`
19+
RequestsPerSleep int `json:"reqpersleep"`
20+
RequestBundlesPerConn int `json:"reqbundlesperconn"`
1821
SleepPerBundle time.Duration `json:"sleepperbundle"`
19-
DeletePercent int `json:"deletepercent"`
20-
KeyLength int `json:"keylength"`
21-
KeyPrefix string `json:"keyprefix"`
22-
KeySpace int `json:"keyspace"`
23-
KeyTTL uint `json:"keyttl"`
24-
UseZipf bool `json:"zipf"`
25-
ZipfS float64 `json:"zipfS"` // (> 1, generally 1.01-2) pulls the power curve toward 0)
26-
ZipfV float64 `json:"zipfV"` // v (< KeySpace) puts the main part of the curve before this number
27-
ValueSize uint `json:"valuesize"`
28-
ClientFlags uint `json:"clientflags"`
22+
DeletePercent int `json:"deletepercent"`
23+
KeyLength int `json:"keylength"`
24+
KeyPrefix string `json:"keyprefix"`
25+
KeySpace int `json:"keyspace"`
26+
KeyTTL uint `json:"keyttl"`
27+
UseZipf bool `json:"zipf"`
28+
ZipfS float64 `json:"zipfS"` // (> 1, generally 1.01-2) pulls the power curve toward 0)
29+
ZipfV float64 `json:"zipfV"` // v (< KeySpace) puts the main part of the curve before this number
30+
ValueSize uint `json:"valuesize"`
31+
ClientFlags uint `json:"clientflags"`
32+
stopAfter time.Time
2933
}
3034

3135
func newBasicLoader() *BasicLoader {
3236
return &BasicLoader{
3337
Servers: []string{"127.0.0.1:11211"},
38+
Socket: "",
39+
Pipelines: 1,
40+
StripKeyPrefix: false,
3441
DesiredConnCount: 1,
3542
RequestsPerSleep: 1,
3643
RequestBundlesPerConn: 1,
@@ -116,7 +123,7 @@ func runBasicLoader(Update <-chan interface{}, worker interface{}) {
116123
func basicWorker(id int, doneChan chan<- int, updateChan <-chan *BasicLoader, l *BasicLoader) {
117124
// TODO: server selector.
118125
host := l.Servers[0]
119-
mc := mct.NewClient(host)
126+
mc := mct.NewClient(host, l.Socket, l.Pipelines, l.KeyPrefix, l.StripKeyPrefix)
120127
bundles := l.RequestBundlesPerConn
121128

122129
rs := pcgr.New(time.Now().UnixNano(), 0)
@@ -149,15 +156,13 @@ func basicWorker(id int, doneChan chan<- int, updateChan <-chan *BasicLoader, l
149156
// Could also re-seed it twice, pull once Intn for length,
150157
// re-seed, then again for key space.
151158

152-
keyLen := l.KeyLength
153159
if l.UseZipf {
154160
subRS.Seed(int64(zipRS.Uint64()))
155161
} else {
156162
subRS.Seed(int64(randR.Intn(l.KeySpace)))
157163
}
158-
// TODO: might be nice to pass (by ref?) prefix in here to make
159-
// use of string.Builder.
160-
key := l.KeyPrefix + mct.RandString(&subRS, keyLen)
164+
165+
key := mct.RandString(&subRS, l.KeyLength, l.KeyPrefix)
161166
// chance we issue a delete instead.
162167
if l.DeletePercent != 0 && randR.Intn(1000) < l.DeletePercent {
163168
_, err := mc.Delete(key)

protocol.go

+76-45
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"io"
1818
"net"
1919
"strconv"
20+
"strings"
2021
"time"
2122
)
2223

@@ -36,7 +37,13 @@ type mcConn struct {
3637
}
3738

3839
func (c *Client) connectToMc() (*mcConn, error) {
39-
conn, err := net.DialTimeout("tcp", c.Host, c.ConnectTimeout)
40+
var conn net.Conn
41+
var err error
42+
if c.socket != "" {
43+
conn, err = net.DialTimeout("unix", c.socket, c.ConnectTimeout)
44+
} else {
45+
conn, err = net.DialTimeout("tcp", c.Host, c.ConnectTimeout)
46+
}
4047
if err != nil {
4148
return nil, err
4249
}
@@ -50,19 +57,27 @@ type Client struct {
5057
// read or write timeout
5158
NetTimeout time.Duration
5259
Host string
60+
socket string
5361
cn *mcConn
5462
WBufSize int
5563
RBufSize int
5664
// any necessary locks? channels?
5765
// binprot structure cache.
58-
binpkt *packet
59-
opaque uint32 // just for binprot?
66+
binpkt *packet
67+
opaque uint32 // just for binprot?
68+
pipelines int
69+
keyPrefix string
70+
stripKeyPrefix bool
6071
}
6172

62-
func NewClient(host string) (client *Client) {
73+
func NewClient(host string, socket string, pipelines uint, keyPrefix string, stripKeyPrefix bool) (client *Client) {
6374
client = &Client{
64-
Host: host,
65-
binpkt: &packet{},
75+
Host: host,
76+
socket: socket,
77+
pipelines: int(pipelines),
78+
keyPrefix: keyPrefix,
79+
stripKeyPrefix: stripKeyPrefix,
80+
binpkt: &packet{},
6681
}
6782
//client.rs = rand.NewSource(time.Now().UnixNano())
6883
return client
@@ -319,56 +334,72 @@ func (c *Client) MetaDebug(key string) (err error) {
319334
//////////////////////////////////////////////
320335

321336
func (c *Client) Get(key string) (flags uint64, value []byte, code McCode, err error) {
337+
pipelines := c.pipelines
338+
// Expected key from response
339+
respKey := key
340+
if c.stripKeyPrefix {
341+
respKey = strings.TrimPrefix(key, c.keyPrefix)
342+
}
343+
322344
err = c.runNow(key, len(key)+6, func() error {
323345
b := c.cn.b
324-
b.WriteString("get ")
325-
b.WriteString(key)
326-
b.WriteString("\r\n")
327-
err = b.Flush()
328-
329-
if err != nil {
330-
return err
346+
for i := 0; i < pipelines; i++ {
347+
b.WriteString("get ")
348+
b.WriteString(key)
349+
b.WriteString("\r\n")
331350
}
332-
333-
line, err := b.ReadBytes('\n')
351+
err = b.Flush()
334352
if err != nil {
335353
return err
336354
}
337355

338-
if bytes.Equal(line, []byte("END\r\n")) {
339-
code = McMISS
340-
} else {
341-
parts := bytes.Split(line[:len(line)-2], []byte(" "))
342-
if !bytes.Equal(parts[0], []byte("VALUE")) {
343-
// TODO: This should look for ERROR/SERVER_ERROR/etc
344-
return ErrUnexpectedResponse
345-
}
346-
if len(parts) != 4 {
347-
return ErrUnexpectedResponse
348-
}
349-
if !bytes.Equal(parts[1], []byte(key)) {
350-
// FIXME: how do we embed the received vs expected in here?
351-
// use the brand-new golang error wrapping thing?
352-
return ErrKeyDoesNotMatch
353-
}
354-
flags, _ = ParseUint(parts[2])
355-
size, _ := ParseUint(parts[3])
356-
357-
value = make([]byte, size+2)
358-
_, err := io.ReadFull(b, value)
356+
for i := 0; i < pipelines; i++ {
357+
line, err := b.ReadBytes('\n')
359358
if err != nil {
360359
return err
361360
}
362361

363-
if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) {
364-
return ErrCorruptValue
365-
}
366-
code = McHIT
367-
value = value[:size]
368-
369-
line, err = b.ReadBytes('\n')
370-
if !bytes.Equal(line, []byte("END\r\n")) {
371-
return ErrUnexpectedResponse
362+
if bytes.Equal(line, []byte("END\r\n")) {
363+
code = McMISS
364+
} else {
365+
parts := bytes.Split(line[:len(line)-2], []byte(" "))
366+
if !bytes.Equal(parts[0], []byte("VALUE")) {
367+
// TODO: This should look for ERROR/SERVER_ERROR/etc
368+
fmt.Print("Unexpected Response: ", string(line), "\n")
369+
continue
370+
}
371+
if len(parts) != 4 {
372+
fmt.Print("Unexpected Response: ", "parts not 4", "\n")
373+
continue
374+
}
375+
if !bytes.Equal(parts[1], []byte(respKey)) {
376+
fmt.Print("Unmatched Key: ", string(parts[1]), " and ", respKey, "\n")
377+
// FIXME: how do we embed the received vs expected in here?
378+
// use the brand-new golang error wrapping thing?
379+
continue
380+
}
381+
flags, _ = ParseUint(parts[2])
382+
size, _ := ParseUint(parts[3])
383+
384+
value = make([]byte, size+2)
385+
_, err := io.ReadFull(b, value)
386+
if err != nil {
387+
fmt.Print("io ReadFull error, return", "\n")
388+
return err
389+
}
390+
391+
if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) {
392+
fmt.Print("Unmatched Value", "\n")
393+
continue
394+
}
395+
code = McHIT
396+
value = value[:size]
397+
398+
line, err = b.ReadBytes('\n')
399+
if !bytes.Equal(line, []byte("END\r\n")) {
400+
fmt.Print("Unmatched Reponse: ", string(line), " is not END\r\n")
401+
continue
402+
}
372403
}
373404
}
374405

protocol_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@ import (
1111

1212
// tests expect a recent memcached to be running at this address.
1313
const hostname = "127.0.0.1:11211"
14+
const socket = ""
15+
const pipelines = 1
16+
const keyPrefix = "mctester:"
17+
const stripKeyPrefix = false
1418

1519
func newcli() *Client {
16-
mc := NewClient(hostname)
20+
mc := NewClient(hostname, socket, pipelines, keyPrefix, stripKeyPrefix)
1721
mc.ConnectTimeout = 3 * time.Second
1822
mc.NetTimeout = time.Second
1923
mc.WBufSize = 64 * 1024

support.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ const (
2525

2626
// randomized keys!
2727
// TODO: is sb reusable?
28-
func RandString(src rand.Source, n int) string {
28+
func RandString(src rand.Source, n int, prefix string) string {
2929
sb := strings.Builder{}
30-
sb.Grow(n)
30+
sb.Grow(len(prefix) + n)
31+
sb.WriteString(prefix)
3132
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
3233
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
3334
if remain == 0 {

0 commit comments

Comments
 (0)