-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool_test.go
155 lines (142 loc) · 3.89 KB
/
pool_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package goloadbalancer
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"log"
"math/rand"
"net"
"sync"
"testing"
"time"
c "github.com/smartystreets/goconvey/convey"
) // goconvey's convey package is imported under the alias "c"
// testServer runs a blocking TCP server on 127.0.0.1:9090 for the pool tests.
// Every chunk read from a client (at most 256 bytes per read) is logged and
// answered with the literal reply "ok". Each accepted connection is served on
// its own goroutine until the peer closes it or an I/O error occurs.
//
// The accept loop never returns, so callers start it with `go`. If the
// listener cannot be created, log.Fatalf terminates the process.
func testServer() {
	listen, err := net.Listen("tcp", "127.0.0.1:9090")
	if err != nil {
		log.Fatalf("listen failed, err:%v\n", err)
	}
	for {
		// Wait for a client to establish a connection.
		conn, err := listen.Accept()
		if err != nil {
			log.Printf("accept failed, err:%v\n", err)
			continue
		}
		// Handle each connection on its own goroutine.
		go func(conn net.Conn) {
			defer conn.Close()
			// Use a single reader for the whole connection: bufio may pull
			// more bytes off the socket than one Read call returns, and
			// recreating the reader per iteration would discard them.
			reader := bufio.NewReader(conn)
			var buf [256]byte
			for {
				n, err := reader.Read(buf[:])
				if err != nil {
					// EOF means the client closed the connection; only
					// other errors are worth reporting.
					if !errors.Is(err, io.EOF) {
						fmt.Printf("read from conn failed, err:%v\n", err)
					}
					return
				}
				log.Printf("接收到的数据:%v", string(buf[:n]))
				// Acknowledge the received chunk to the client.
				if _, err := conn.Write([]byte("ok")); err != nil {
					log.Printf("write to client from conn failed, err:%v\n", err)
					return
				}
			}
		}(conn)
	}
}
// TestPool exercises the connection pool under contention. It starts the
// local test server, builds a pool that dials it, and launches 30 goroutines
// that each wait a random time, borrow a connection, hold it for a random
// time, exchange one message with the server and release the connection.
//
// Goroutines whose Get fails record the error, how long Get blocked, and the
// idle/total connection counts reported by the pool at that moment. Once all
// goroutines have finished, the test asserts that every failure was
// ErrPoolTimeout, that each failed Get blocked between 6 and 7 seconds, that
// the pool had no idle connections and was at PoolSize when it failed, and
// that InvalidConns is at least the number of connections held for more than
// 10 seconds.
func TestPool(t *testing.T) {
	c.Convey("TestPool", t, func() {
		// workers is the number of concurrent borrowers. The result channels
		// are buffered to this size because they are only drained after
		// wg.Wait(); a smaller buffer would deadlock as soon as more
		// goroutines fail than the buffer can hold.
		const workers = 30

		go testServer()
		// Give the server goroutine time to start listening.
		time.Sleep(1 * time.Second)

		opts := NewPoolOptions(func(ctx context.Context) (Connection, error) {
			conn, err := net.Dial("tcp", "127.0.0.1:9090")
			if err != nil {
				return nil, err
			}
			return NewConnectionImpl(conn, 30*time.Second, 10*time.Second), nil
		})
		opts.ConnectionUsedHook = append(opts.ConnectionUsedHook, ConnUseAt)
		pool := NewConnPool(opts)

		var (
			wg              sync.WaitGroup
			mu              sync.Mutex // guards shouldBeRemoved
			shouldBeRemoved []string
		)
		errChan := make(chan error, workers)
		elapsedChan := make(chan float64, workers)
		idleConnsChan := make(chan uint32, workers)
		connCountChan := make(chan uint32, workers)

		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				ctx := context.Background()
				// Each worker uses its own time-seeded generator.
				r := rand.New(rand.NewSource(time.Now().UnixNano()))

				// Stagger the workers: wait a random 2-9 seconds before borrowing.
				time.Sleep(time.Duration(r.Intn(8)+2) * time.Second)

				start := time.Now()
				cn, err := pool.Get(ctx)
				if err != nil {
					elapsed := time.Since(start)
					errChan <- err
					elapsedChan <- elapsed.Seconds()
					idleConnsChan <- pool.Stats().(*StatsImpl).IdleConns
					connCountChan <- pool.Stats().(*StatsImpl).TotalConns
					if errors.Is(err, ErrPoolTimeout) {
						log.Printf("get conn failed, err:%v,%f,%d\n", err, elapsed.Seconds(), pool.Stats().(*StatsImpl).IdleConns)
					}
					return
				}
				// Register the release immediately so the connection goes
				// back to the pool on every exit path below.
				defer pool.Release(ctx, cn)

				// Hold the connection for a random 4-11 seconds.
				holdSeconds := r.Intn(8) + 4
				time.Sleep(time.Duration(holdSeconds) * time.Second)

				conn := cn.ConnInstance().(net.Conn)
				// Connections held for more than 10 seconds are expected to be
				// counted as invalid by the pool. NOTE(review): presumably this
				// matches the 10s passed to NewConnectionImpl above — confirm.
				if holdSeconds > 10 {
					mu.Lock()
					shouldBeRemoved = append(shouldBeRemoved, conn.LocalAddr().String())
					mu.Unlock()
				}

				if _, err := conn.Write([]byte("hello")); err != nil {
					log.Printf("write to server failed, err:%v\n", err)
					return
				}
				var buf [1024]byte
				if _, err := conn.Read(buf[:]); err != nil && !errors.Is(err, io.EOF) {
					// Report the actual error rather than a boolean.
					fmt.Printf("read failed, err:%v\n", err)
					return
				}
			}()
		}

		wg.Wait()
		close(errChan)
		close(elapsedChan)
		close(idleConnsChan)
		close(connCountChan)

		for err := range errChan {
			c.So(err, c.ShouldEqual, ErrPoolTimeout)
		}
		for elapsed := range elapsedChan {
			c.So(elapsed, c.ShouldBeBetween, 6, 7)
		}
		for idleConns := range idleConnsChan {
			c.So(idleConns, c.ShouldEqual, 0)
		}
		for connCount := range connCountChan {
			c.So(connCount, c.ShouldEqual, opts.PoolSize)
		}
		// All workers have finished, so shouldBeRemoved can be read without the lock.
		c.So(pool.Stats().(*StatsImpl).InvalidConns, c.ShouldBeGreaterThanOrEqualTo, len(shouldBeRemoved))
	})
}