Skip to content
This repository was archived by the owner on Aug 29, 2018. It is now read-only.

Commit 0171a60

Browse files
committed
Fix input/output deadlock condition
If the rates of input and output are both very high, the message queue size can trend towards zero. In this case, it's possible that the Input will decide to evict from the queue, but by the time the eviction occurs the Output has drained the queue to zero, causing the eviction dequeue to block forever. Input is then blocked trying to evict, and Output is blocked waiting for Input. Fix the deadlock by distinguishing between eviction of old messages and dropping of new messages, and by making those operations non-blocking. Add a test which verifies the fix under these conditions.
1 parent aa81418 commit 0171a60

File tree

3 files changed

+106
-17
lines changed

3 files changed

+106
-17
lines changed

logshifter/core_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestDropOnBlockedOutput(t *testing.T) {
9191
// and the writer is unblocked.
9292
ag.AssertStatsEqual(t, map[string]float64{
9393
"input.read": 1000,
94-
"input.drop": 998,
94+
"input.evict": 998,
9595
"output.write": 2,
9696
})
9797

logshifter/input.go

+36-16
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414
// be placed on the channel:
1515
//
1616
// input.read => number of messages processed during a Read call
17-
// input.drop => number of messages dropped from queue during a Read call
17+
// input.queue => number of messages queued during a Read call
18+
// input.drop => number of new messages dropped during a Read call
19+
// input.evict => number of old messages evicted from the queue during a Read call
1820
// input.read.duration => time taken to process a line from reader (micros)
1921
type Input struct {
2022
bufferSize int
@@ -40,17 +42,20 @@ func (input *Input) Read() *sync.WaitGroup {
4042
return wg
4143
}
4244

45+
func (input *Input) record(name string, val float64) {
46+
if input.statsChannel != nil {
47+
input.statsChannel <- Stat{name: name, value: val}
48+
}
49+
}
50+
4351
// Private synchronous portion of Read.
4452
func (input *Input) read() {
4553
reader := bufio.NewReaderSize(input.reader, input.bufferSize)
4654

4755
for {
4856
line, _, err := reader.ReadLine()
4957

50-
var start time.Time
51-
if input.statsChannel != nil {
52-
start = time.Now()
53-
}
58+
start := time.Now()
5459

5560
if err != nil {
5661
break
@@ -64,24 +69,39 @@ func (input *Input) read() {
6469

6570
copy(cp, line)
6671

67-
if input.statsChannel != nil {
68-
input.statsChannel <- Stat{name: "input.read", value: 1.0}
69-
}
72+
input.record("input.read", 1.0)
7073

7174
select {
7275
case input.queue <- cp:
7376
// queued
77+
input.record("input.queue", 1.0)
7478
default:
75-
// evict the oldest entry to make room
76-
<-input.queue
77-
if input.statsChannel != nil {
78-
input.statsChannel <- Stat{name: "input.drop", value: 1.0}
79+
// try to evict the oldest entry to make room
80+
select {
81+
case <-input.queue:
82+
input.record("input.evict", 1.0)
83+
// try again to queue
84+
select {
85+
case input.queue <- cp:
86+
// queued
87+
input.record("input.queue", 1.0)
88+
default:
89+
// no room, drop it
90+
input.record("input.drop", 1.0)
91+
}
92+
default:
93+
// queue is already empty, try to queue
94+
select {
95+
case input.queue <- cp:
96+
// queued
97+
input.record("input.queue", 1.0)
98+
default:
99+
// no room, drop it
100+
input.record("input.drop", 1.0)
101+
}
79102
}
80-
input.queue <- cp
81103
}
82104

83-
if input.statsChannel != nil {
84-
input.statsChannel <- Stat{name: "input.read.duration", value: float64(time.Now().Sub(start).Nanoseconds()) / float64(1000)}
85-
}
105+
input.record("input.read.duration", float64(time.Now().Sub(start).Nanoseconds())/float64(1000))
86106
}
87107
}

logshifter/input_test.go

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"strings"
6+
"sync"
7+
"testing"
8+
)
9+
10+
// TestInputBlocking ensures that Input never blocks, regardless of the queue
11+
// consumption rate.
12+
func TestInputBlocking(t *testing.T) {
13+
messageCount := int64(10000)
14+
messageSize := 1024
15+
16+
// Use a small queue to help ensure an input backup.
17+
queue := make(chan []byte, 1)
18+
19+
// Set up a fast output sink to help ensure an input backup.
20+
go func() {
21+
for {
22+
<-queue
23+
}
24+
}()
25+
26+
// Collect stats.
27+
stats := make(map[string]float64)
28+
statsCh := make(chan Stat)
29+
stop := make(chan struct{})
30+
statsWg := sync.WaitGroup{}
31+
statsWg.Add(1)
32+
go func() {
33+
for {
34+
select {
35+
case <-stop:
36+
statsWg.Done()
37+
return
38+
case s := <-statsCh:
39+
stats[s.name] = stats[s.name] + s.value
40+
}
41+
}
42+
}()
43+
44+
// Create an input Reader.
45+
var data string
46+
var i int64 = 0
47+
for ; i < messageCount; i++ {
48+
data += strings.Repeat("0", messageSize-1) + "\n"
49+
}
50+
buffer := bytes.NewBufferString(data)
51+
t.Logf("created %d byte test input (%d lines @ %d bytes each)\n", buffer.Len(), messageCount, messageSize)
52+
53+
// Read until the test reader closes.
54+
input := &Input{
55+
reader: buffer,
56+
queue: queue,
57+
bufferSize: messageSize,
58+
statsChannel: statsCh,
59+
}
60+
input.read()
61+
62+
// If execution reaches this point, the Input hasn't wedged and the test is
63+
// successful.
64+
65+
// Shut down the stats collector.
66+
close(stop)
67+
statsWg.Wait()
68+
t.Logf("stats: %#v", stats)
69+
}

0 commit comments

Comments
 (0)