Commit 982ee7a

fix: Don't try watching items without resourceVersion
Trying to watch items without a resourceVersion will fail. Instead of failing and retrying every second, wait until the next resync interval and try again then. This reduces load and log spam.

Signed-off-by: Daniel Vassdal <[email protected]>
1 parent d65e9d9 commit 982ee7a
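
For context, the interval involved here is the cluster cache's watch resync timeout. Below is a minimal sketch of how a consumer of pkg/cache would set it; the host URL and the 5-minute value are illustrative assumptions, not part of this commit, while NewClusterCache, SetWatchResyncTimeout and EnsureSynced are existing functions also used by the new test further down.

package main

import (
	"time"

	"k8s.io/client-go/rest"

	"github.com/argoproj/gitops-engine/pkg/cache"
)

func main() {
	// Example values only: the host and the timeout are placeholders.
	clusterCache := cache.NewClusterCache(
		&rest.Config{Host: "https://example.invalid"},
		// With this fix, a resource whose list response carries no resourceVersion
		// is skipped and re-checked at the next resync instead of being retried every second.
		cache.SetWatchResyncTimeout(5*time.Minute),
	)
	if err := clusterCache.EnsureSynced(); err != nil {
		panic(err)
	}
}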

File tree

2 files changed, +244 -0 lines changed

pkg/cache/cluster.go

Lines changed: 23 additions & 0 deletions
@@ -666,6 +666,29 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 		}
 	}
 
+	// If the resourceVersion is still missing, watchutil.NewRetryWatcher will fail.
+	// https://github.com/kubernetes/client-go/blob/78d2af792babf2dd937ba2e2a8d99c753a5eda89/tools/watch/retrywatcher.go#L68-L71
+	// Instead, let's just check if the resourceVersion exists at the next resync ...
+	if resourceVersion == "" {
+		c.log.V(1).Info(fmt.Sprintf("Ignoring watch for %s on %s due to missing resourceVersion", api.GroupKind, c.config.Host))
+
+		var watchResyncTimeoutCh <-chan time.Time
+		if c.watchResyncTimeout > 0 {
+			shouldResync := time.NewTimer(c.watchResyncTimeout)
+			defer shouldResync.Stop()
+			watchResyncTimeoutCh = shouldResync.C
+		}
+
+		for {
+			select {
+			case <-ctx.Done():
+				return nil
+			case <-watchResyncTimeoutCh:
+				return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
+			}
+		}
+	}
+
 	w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 			res, err := resClient.Watch(ctx, options)
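
A note on the select pattern added above (my illustration, not part of the diff): when watchResyncTimeout is zero, watchResyncTimeoutCh remains nil, and a receive from a nil channel blocks forever, so the loop can only return via ctx.Done(). A self-contained sketch of that behavior, using the hypothetical helper name waitForResyncOrCancel:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForResyncOrCancel mirrors the pattern above: a nil timer channel never
// fires, so with a zero timeout only context cancellation ends the wait.
func waitForResyncOrCancel(ctx context.Context, resyncTimeout time.Duration) error {
	var resyncCh <-chan time.Time // stays nil when resyncTimeout == 0
	if resyncTimeout > 0 {
		timer := time.NewTimer(resyncTimeout)
		defer timer.Stop()
		resyncCh = timer.C
	}
	select {
	case <-ctx.Done():
		return nil
	case <-resyncCh:
		return fmt.Errorf("resync timeout reached")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(waitForResyncOrCancel(ctx, 10*time.Millisecond)) // resync timeout reached
	fmt.Println(waitForResyncOrCancel(ctx, 0))                   // <nil> once ctx expires
}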

pkg/cache/cluster_test.go

Lines changed: 221 additions & 0 deletions
@@ -1,9 +1,13 @@
 package cache
 
 import (
+	"bufio"
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
+	"io"
+	"regexp"
 	"sort"
 	"strings"
 	"sync"
@@ -29,6 +33,7 @@ import (
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	testcore "k8s.io/client-go/testing"
+	"k8s.io/klog/v2/textlogger"
 	"sigs.k8s.io/yaml"
 
 	"github.com/argoproj/gitops-engine/pkg/utils/kube"
@@ -1393,3 +1398,219 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
 //	})
 //	}
 //}
+
+type syncedBuffer struct {
+	mutex sync.Mutex
+	buf   bytes.Buffer
+}
+
+func (lb *syncedBuffer) Read(p []byte) (n int, err error) {
+	lb.mutex.Lock()
+	defer lb.mutex.Unlock()
+	return lb.buf.Read(p)
+}
+
+func (lb *syncedBuffer) Write(p []byte) (n int, err error) {
+	lb.mutex.Lock()
+	defer lb.mutex.Unlock()
+	return lb.buf.Write(p)
+}
+
+func Test_watchEvents_Missing_resourceVersion(t *testing.T) {
+	objExample := &unstructured.Unstructured{Object: map[string]any{
+		"apiVersion": "apiservice.example.com/v1",
+		"kind":       "Example",
+		"metadata": map[string]any{
+			"name": "example",
+		},
+	}}
+
+	testCases := []struct {
+		name               string
+		objs               []runtime.Object
+		funAssert          func(t *testing.T, logLines []string)
+		waitForLogLines    []string
+		waitForLogExtra    time.Duration
+		watchResyncTimeout time.Duration
+	}{
+		{
+			name:            "Should_ignore_resource_without_resourceVersion",
+			objs:            []runtime.Object{objExample},
+			waitForLogLines: []string{"Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion"},
+			funAssert: func(t *testing.T, logLines []string) {
+				t.Helper()
+				require.NotContains(t, logLines, "Resyncing Example.apiservice.example.com on https://test due to timeout")
+			},
+			watchResyncTimeout: defaultWatchResyncTimeout,
+			waitForLogExtra:    0 * time.Millisecond,
+		},
+		{
+			name:            "Should_not_ignore_resource_with_resourceVersion",
+			objs:            []runtime.Object{testDeploy()},
+			waitForLogLines: []string{"Start watch Deployment.apps on https://test"},
+			funAssert: func(t *testing.T, logLines []string) {
+				t.Helper()
+				require.NotContains(t, logLines, "Ignoring watch for Deployment.apps on https://test due to missing resourceVersion")
+			},
+			watchResyncTimeout: defaultWatchResyncTimeout,
+			waitForLogExtra:    100 * time.Millisecond,
+		},
+		{
+			name:            "Should_retry_ignored_resource_on_next_resync",
+			objs:            []runtime.Object{objExample},
+			waitForLogLines: []string{"Failed to watch Example.apiservice.example.com on https://test: Resyncing Example.apiservice.example.com on https://test due to timeout, retrying in 1s"},
+			funAssert: func(t *testing.T, logLines []string) {
+				t.Helper()
+				require.Contains(t, logLines, "Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion")
+			},
+			watchResyncTimeout: 10 * time.Millisecond,
+			waitForLogExtra:    100 * time.Millisecond,
+		},
+	}
+
+	readLinesUntil := func(ctx context.Context, buf io.Reader, wantedLines []string, readExtra time.Duration) ([]string, error) {
+		wantedStatuses := map[string]bool{}
+		for _, wantedLine := range wantedLines {
+			wantedStatuses[strings.TrimSuffix(wantedLine, "\r\n")] = false
+		}
+
+		var logLines []string
+		readChan := make(chan any)
+		go func() {
+			lineRgx := regexp.MustCompile(`^.+?\s+\d+\s+.+?\.go:(?:\d+?|\d+?)\]\s+"(?P<msg>.+)"$`)
+
+			for {
+				scanner := bufio.NewScanner(buf)
+				for scanner.Scan() {
+					match := lineRgx.FindStringSubmatch(scanner.Text())
+					readChan <- match[1]
+				}
+
+				if scanner.Err() != nil {
+					readChan <- scanner.Err()
+					return
+				}
+
+				// EOF. Waiting for data.
+				time.Sleep(50 * time.Millisecond)
+			}
+		}()
+
+		var readExtraTimer *time.Timer
+		var readExtraTimeoutChan <-chan time.Time
+
+		for {
+			select {
+			case <-readExtraTimeoutChan:
+				return logLines, ctx.Err()
+			case <-ctx.Done():
+				return logLines, ctx.Err()
+			case read := <-readChan:
+				if err, ok := read.(error); ok {
+					return logLines, err
+				}
+
+				// EOF
+				if read == nil {
+					return logLines, nil
+				}

+				logLines = append(logLines, read.(string))
+				if readExtraTimer != nil {
+					continue
+				}
+
+				line := read.(string)
+				if _, ok := wantedStatuses[line]; ok {
+					wantedStatuses[line] = true
+
+					done := true
+					for _, ok := range wantedStatuses {
+						if !ok {
+							done = false
+						}
+					}
+
+					if done {
+						readExtraTimer = time.NewTimer(readExtra)
+						readExtraTimeoutChan = readExtraTimer.C
+					}
+				}
+			}
+		}
+	}
+
+	createCluster := func(opts []UpdateSettingsFunc, objs ...runtime.Object) *clusterCache {
+		client := fake.NewSimpleDynamicClientWithCustomListKinds(scheme.Scheme,
+			map[schema.GroupVersionResource]string{
+				{Group: "apiservice.example.com", Version: "v1", Resource: "examples"}: "ExampleList",
+			},
+			objs...)
+		reactor := client.ReactionChain[0]
+		client.PrependReactor("list", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
+			handled, ret, err = reactor.React(action)
+			if err != nil || !handled {
+				return
+			}
+
+			// The apiservice.example.com group is for testing missing resourceVersion, so we omit setting it for those responses.
+			retList, ok := ret.(*unstructured.UnstructuredList)
+			if ok && len(retList.Items) > 0 && retList.Items[0].GetObjectKind().GroupVersionKind().Group == "apiservice.example.com" {
+				return
+			}
+
+			// make sure retList response have resource version
+			ret.(metav1.ListInterface).SetResourceVersion("123")
+			return
+		})
+
+		apiResources := []kube.APIResourceInfo{{
+			GroupKind:            schema.GroupKind{Group: "apps", Kind: "Deployment"},
+			GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
+			Meta:                 metav1.APIResource{Namespaced: true},
+		}, {
+			GroupKind:            schema.GroupKind{Group: "apiservice.example.com", Kind: "Example"},
+			GroupVersionResource: schema.GroupVersionResource{Group: "apiservice.example.com", Version: "v1", Resource: "examples"},
+			Meta:                 metav1.APIResource{Namespaced: false},
+		}}
+
+		opts = append([]UpdateSettingsFunc{
+			SetKubectl(&kubetest.MockKubectlCmd{APIResources: apiResources, DynamicClient: client}),
+		}, opts...)
+
+		cache := NewClusterCache(
+			&rest.Config{Host: "https://test"},
+			opts...,
+		)
+		return cache
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			ctx, ctxCancel := context.WithTimeout(context.Background(), 1*time.Second)
+			defer ctxCancel()
+
+			var logBuffer syncedBuffer
+			logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&logBuffer), textlogger.Verbosity(1), textlogger.FixedTime(time.Unix(0, 0))))
+
+			cluster := createCluster([]UpdateSettingsFunc{
+				SetLogr(logger),
+				SetWatchResyncTimeout(testCase.watchResyncTimeout),
+			}, testCase.objs...)
+
+			defer func() {
+				cluster.Invalidate()
+			}()
+
+			err := cluster.EnsureSynced()
+			require.NoError(t, err)
+
+			logLines, err := readLinesUntil(ctx, &logBuffer, testCase.waitForLogLines, testCase.waitForLogExtra)
+			require.NoError(t, err)
+			testCase.funAssert(t, logLines)
+			for _, wantedLogLine := range testCase.waitForLogLines {
+				require.Contains(t, logLines, wantedLogLine)
+			}
+		})
+	}
+}
