Skip to content

Commit 4f41229

Browse files
committed
Pass stop channel to informer factory instances
1 parent 98393df commit 4f41229

22 files changed

+79
-61
lines changed

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func main() {
136136
}
137137

138138
// Lookup all the selected sources by names and pass them the desired configuration.
139-
sources, err := source.ByNames(&source.SingletonClientGenerator{
139+
sources, err := source.ByNames(ctx, &source.SingletonClientGenerator{
140140
KubeConfig: cfg.KubeConfig,
141141
APIServerURL: cfg.APIServerURL,
142142
// If update events are enabled, disable timeout.

source/ambassador_host.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"k8s.io/apimachinery/pkg/labels"
3131
"k8s.io/apimachinery/pkg/runtime"
3232
"k8s.io/apimachinery/pkg/runtime/schema"
33-
"k8s.io/apimachinery/pkg/util/wait"
3433
"k8s.io/client-go/dynamic"
3534
"k8s.io/client-go/dynamic/dynamicinformer"
3635
"k8s.io/client-go/informers"
@@ -64,6 +63,7 @@ type ambassadorHostSource struct {
6463

6564
// NewAmbassadorHostSource creates a new ambassadorHostSource with the given config.
6665
func NewAmbassadorHostSource(
66+
ctx context.Context,
6767
dynamicKubeClient dynamic.Interface,
6868
kubeClient kubernetes.Interface,
6969
namespace string) (Source, error) {
@@ -82,8 +82,7 @@ func NewAmbassadorHostSource(
8282
},
8383
)
8484

85-
// TODO informer is not explicitly stopped since controller is not passing in its channel.
86-
informerFactory.Start(wait.NeverStop)
85+
informerFactory.Start(ctx.Done())
8786

8887
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
8988
return nil, err

source/contour_httpproxy.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3030
"k8s.io/apimachinery/pkg/labels"
31-
"k8s.io/apimachinery/pkg/util/wait"
3231
"k8s.io/client-go/dynamic"
3332
"k8s.io/client-go/dynamic/dynamicinformer"
3433
"k8s.io/client-go/informers"
@@ -53,6 +52,7 @@ type httpProxySource struct {
5352

5453
// NewContourHTTPProxySource creates a new contourHTTPProxySource with the given config.
5554
func NewContourHTTPProxySource(
55+
ctx context.Context,
5656
dynamicKubeClient dynamic.Interface,
5757
namespace string,
5858
annotationFilter string,
@@ -78,8 +78,7 @@ func NewContourHTTPProxySource(
7878
},
7979
)
8080

81-
// TODO informer is not explicitly stopped since controller is not passing in its channel.
82-
informerFactory.Start(wait.NeverStop)
81+
informerFactory.Start(ctx.Done())
8382

8483
// wait for the local cache to be populated.
8584
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {

source/contour_httpproxy_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func (suite *HTTPProxySuite) SetupTest() {
8888
var err error
8989

9090
suite.source, err = NewContourHTTPProxySource(
91+
context.TODO(),
9192
fakeDynamicClient,
9293
"default",
9394
"",
@@ -184,6 +185,7 @@ func TestNewContourHTTPProxySource(t *testing.T) {
184185
fakeDynamicClient, _ := newDynamicKubernetesClient()
185186

186187
_, err := NewContourHTTPProxySource(
188+
context.TODO(),
187189
fakeDynamicClient,
188190
"",
189191
ti.annotationFilter,
@@ -1033,6 +1035,7 @@ func testHTTPProxyEndpoints(t *testing.T) {
10331035
}
10341036

10351037
httpProxySource, err := NewContourHTTPProxySource(
1038+
context.TODO(),
10361039
fakeDynamicClient,
10371040
ti.targetNamespace,
10381041
ti.annotationFilter,
@@ -1059,6 +1062,7 @@ func newTestHTTPProxySource() (*httpProxySource, error) {
10591062
fakeDynamicClient, _ := newDynamicKubernetesClient()
10601063

10611064
src, err := NewContourHTTPProxySource(
1065+
context.TODO(),
10621066
fakeDynamicClient,
10631067
"default",
10641068
"",

source/ingress.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
log "github.com/sirupsen/logrus"
2727
networkv1 "k8s.io/api/networking/v1"
2828
"k8s.io/apimachinery/pkg/labels"
29-
"k8s.io/apimachinery/pkg/util/wait"
3029
kubeinformers "k8s.io/client-go/informers"
3130
netinformers "k8s.io/client-go/informers/networking/v1"
3231
"k8s.io/client-go/kubernetes"
@@ -64,7 +63,7 @@ type ingressSource struct {
6463
}
6564

6665
// NewIngressSource creates a new ingressSource with the given config.
67-
func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, ignoreHostnameAnnotation bool, ignoreIngressTLSSpec bool, ignoreIngressRulesSpec bool, labelSelector labels.Selector) (Source, error) {
66+
func NewIngressSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, ignoreHostnameAnnotation bool, ignoreIngressTLSSpec bool, ignoreIngressRulesSpec bool, labelSelector labels.Selector) (Source, error) {
6867
tmpl, err := parseTemplate(fqdnTemplate)
6968
if err != nil {
7069
return nil, err
@@ -83,8 +82,7 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt
8382
},
8483
)
8584

86-
// TODO informer is not explicitly stopped since controller is not passing in its channel.
87-
informerFactory.Start(wait.NeverStop)
85+
informerFactory.Start(ctx.Done())
8886

8987
// wait for the local cache to be populated.
9088
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

source/ingress_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func (suite *IngressSuite) SetupTest() {
5656
suite.NoError(err, "should succeed")
5757

5858
suite.sc, err = NewIngressSource(
59+
context.TODO(),
5960
fakeClient,
6061
"",
6162
"",
@@ -138,6 +139,7 @@ func TestNewIngressSource(t *testing.T) {
138139
t.Parallel()
139140

140141
_, err := NewIngressSource(
142+
context.TODO(),
141143
fake.NewSimpleClientset(),
142144
"",
143145
ti.annotationFilter,
@@ -1225,6 +1227,7 @@ func testIngressEndpoints(t *testing.T) {
12251227
}
12261228

12271229
source, _ := NewIngressSource(
1230+
context.TODO(),
12281231
fakeClient,
12291232
ti.targetNamespace,
12301233
ti.annotationFilter,

source/istio_gateway.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
3131
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3232
"k8s.io/apimachinery/pkg/labels"
33-
"k8s.io/apimachinery/pkg/util/wait"
3433
kubeinformers "k8s.io/client-go/informers"
3534
coreinformers "k8s.io/client-go/informers/core/v1"
3635
"k8s.io/client-go/kubernetes"
@@ -56,6 +55,7 @@ type gatewaySource struct {
5655

5756
// NewIstioGatewaySource creates a new gatewaySource with the given config.
5857
func NewIstioGatewaySource(
58+
ctx context.Context,
5959
kubeClient kubernetes.Interface,
6060
istioClient istioclient.Interface,
6161
namespace string,
@@ -93,9 +93,8 @@ func NewIstioGatewaySource(
9393
},
9494
)
9595

96-
// TODO informer is not explicitly stopped since controller is not passing in its channel.
97-
informerFactory.Start(wait.NeverStop)
98-
istioInformerFactory.Start(wait.NeverStop)
96+
informerFactory.Start(ctx.Done())
97+
istioInformerFactory.Start(ctx.Done())
9998

10099
// wait for the local cache to be populated.
101100
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

source/istio_gateway_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func (suite *GatewaySuite) SetupTest() {
6969
}
7070

7171
suite.source, err = NewIstioGatewaySource(
72+
context.TODO(),
7273
fakeKubernetesClient,
7374
fakeIstioClient,
7475
"",
@@ -142,6 +143,7 @@ func TestNewIstioGatewaySource(t *testing.T) {
142143
t.Parallel()
143144

144145
_, err := NewIstioGatewaySource(
146+
context.TODO(),
145147
fake.NewSimpleClientset(),
146148
istiofake.NewSimpleClientset(),
147149
"",
@@ -1165,6 +1167,7 @@ func testGatewayEndpoints(t *testing.T) {
11651167
}
11661168

11671169
gatewaySource, err := NewIstioGatewaySource(
1170+
context.TODO(),
11681171
fakeKubernetesClient,
11691172
fakeIstioClient,
11701173
ti.targetNamespace,
@@ -1201,6 +1204,7 @@ func newTestGatewaySource(loadBalancerList []fakeIngressGatewayService) (*gatewa
12011204
}
12021205

12031206
src, err := NewIstioGatewaySource(
1207+
context.TODO(),
12041208
fakeKubernetesClient,
12051209
fakeIstioClient,
12061210
"",

source/istio_virtualservice.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3333
"k8s.io/apimachinery/pkg/labels"
34-
"k8s.io/apimachinery/pkg/util/wait"
3534
kubeinformers "k8s.io/client-go/informers"
3635
coreinformers "k8s.io/client-go/informers/core/v1"
3736
"k8s.io/client-go/kubernetes"
@@ -60,6 +59,7 @@ type virtualServiceSource struct {
6059

6160
// NewIstioVirtualServiceSource creates a new virtualServiceSource with the given config.
6261
func NewIstioVirtualServiceSource(
62+
ctx context.Context,
6363
kubeClient kubernetes.Interface,
6464
istioClient istioclient.Interface,
6565
namespace string,
@@ -97,9 +97,8 @@ func NewIstioVirtualServiceSource(
9797
},
9898
)
9999

100-
// TODO informer is not explicitly stopped since controller is not passing in its channel.
101-
informerFactory.Start(wait.NeverStop)
102-
istioInformerFactory.Start(wait.NeverStop)
100+
informerFactory.Start(ctx.Done())
101+
istioInformerFactory.Start(ctx.Done())
103102

104103
// wait for the local cache to be populated.
105104
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

source/istio_virtualservice_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func (suite *VirtualServiceSuite) SetupTest() {
8989
suite.NoError(err, "should succeed")
9090

9191
suite.source, err = NewIstioVirtualServiceSource(
92+
context.TODO(),
9293
fakeKubernetesClient,
9394
fakeIstioClient,
9495
"",
@@ -165,6 +166,7 @@ func TestNewIstioVirtualServiceSource(t *testing.T) {
165166
t.Parallel()
166167

167168
_, err := NewIstioVirtualServiceSource(
169+
context.TODO(),
168170
fake.NewSimpleClientset(),
169171
istiofake.NewSimpleClientset(),
170172
"",
@@ -1482,6 +1484,7 @@ func testVirtualServiceEndpoints(t *testing.T) {
14821484
}
14831485

14841486
virtualServiceSource, err := NewIstioVirtualServiceSource(
1487+
context.TODO(),
14851488
fakeKubernetesClient,
14861489
fakeIstioClient,
14871490
ti.targetNamespace,
@@ -1557,6 +1560,7 @@ func newTestVirtualServiceSource(loadBalancerList []fakeIngressGatewayService, g
15571560
}
15581561

15591562
src, err := NewIstioVirtualServiceSource(
1563+
context.TODO(),
15601564
fakeKubernetesClient,
15611565
fakeIstioClient,
15621566
"",

source/kong_tcpingress.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"k8s.io/apimachinery/pkg/labels"
3030
"k8s.io/apimachinery/pkg/runtime"
3131
"k8s.io/apimachinery/pkg/runtime/schema"
32-
"k8s.io/apimachinery/pkg/util/wait"
3332
"k8s.io/client-go/dynamic"
3433
"k8s.io/client-go/dynamic/dynamicinformer"
3534
"k8s.io/client-go/informers"
@@ -57,7 +56,7 @@ type kongTCPIngressSource struct {
5756
}
5857

5958
// NewKongTCPIngressSource creates a new kongTCPIngressSource with the given config.
60-
func NewKongTCPIngressSource(dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) {
59+
func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) {
6160
var err error
6261

6362
// Use shared informer to listen for add/update/delete of Host in the specified namespace.
@@ -73,8 +72,7 @@ func NewKongTCPIngressSource(dynamicKubeClient dynamic.Interface, kubeClient kub
7372
},
7473
)
7574

76-
// TODO informer is not explicitly stopped since controller is not passing in its channel.
77-
informerFactory.Start(wait.NeverStop)
75+
informerFactory.Start(ctx.Done())
7876

7977
// wait for the local cache to be populated.
8078
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {

source/kong_tcpingress_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func TestKongTCPIngressEndpoints(t *testing.T) {
241241
_, err = fakeDynamicClient.Resource(kongGroupdVersionResource).Namespace(defaultKongNamespace).Create(context.Background(), &tcpi, metav1.CreateOptions{})
242242
assert.NoError(t, err)
243243

244-
source, err := NewKongTCPIngressSource(fakeDynamicClient, fakeKubernetesClient, defaultKongNamespace, "kubernetes.io/ingress.class=kong")
244+
source, err := NewKongTCPIngressSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, defaultKongNamespace, "kubernetes.io/ingress.class=kong")
245245
assert.NoError(t, err)
246246
assert.NotNil(t, source)
247247

source/node.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
v1 "k8s.io/api/core/v1"
2626
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2727
"k8s.io/apimachinery/pkg/labels"
28-
"k8s.io/apimachinery/pkg/util/wait"
2928
kubeinformers "k8s.io/client-go/informers"
3029
coreinformers "k8s.io/client-go/informers/core/v1"
3130
"k8s.io/client-go/kubernetes"
@@ -42,7 +41,7 @@ type nodeSource struct {
4241
}
4342

4443
// NewNodeSource creates a new nodeSource with the given config.
45-
func NewNodeSource(kubeClient kubernetes.Interface, annotationFilter, fqdnTemplate string) (Source, error) {
44+
func NewNodeSource(ctx context.Context, kubeClient kubernetes.Interface, annotationFilter, fqdnTemplate string) (Source, error) {
4645
tmpl, err := parseTemplate(fqdnTemplate)
4746
if err != nil {
4847
return nil, err
@@ -62,8 +61,7 @@ func NewNodeSource(kubeClient kubernetes.Interface, annotationFilter, fqdnTempla
6261
},
6362
)
6463

65-
// TODO informer is not explicitly stopped since controller is not passing in its channel.
66-
informerFactory.Start(wait.NeverStop)
64+
informerFactory.Start(ctx.Done())
6765

6866
// wait for the local cache to be populated.
6967
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

source/node_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func testNodeSourceNewNodeSource(t *testing.T) {
7171
t.Parallel()
7272

7373
_, err := NewNodeSource(
74+
context.TODO(),
7475
fake.NewSimpleClientset(),
7576
ti.annotationFilter,
7677
ti.fqdnTemplate,
@@ -353,6 +354,7 @@ func testNodeSourceEndpoints(t *testing.T) {
353354

354355
// Create our object under test and get the endpoints.
355356
client, err := NewNodeSource(
357+
context.TODO(),
356358
kubernetes,
357359
tc.annotationFilter,
358360
tc.fqdnTemplate,

source/openshift_route.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
log "github.com/sirupsen/logrus"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3131
"k8s.io/apimachinery/pkg/labels"
32-
"k8s.io/apimachinery/pkg/util/wait"
3332
"k8s.io/client-go/tools/cache"
3433

3534
"sigs.k8s.io/external-dns/endpoint"
@@ -54,6 +53,7 @@ type ocpRouteSource struct {
5453

5554
// NewOcpRouteSource creates a new ocpRouteSource with the given config.
5655
func NewOcpRouteSource(
56+
ctx context.Context,
5757
ocpClient versioned.Interface,
5858
namespace string,
5959
annotationFilter string,
@@ -81,8 +81,7 @@ func NewOcpRouteSource(
8181
},
8282
)
8383

84-
// TODO informer is not explicitly stopped since controller is not passing in its channel.
85-
informerFactory.Start(wait.NeverStop)
84+
informerFactory.Start(ctx.Done())
8685

8786
// wait for the local cache to be populated.
8887
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

source/openshift_route_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func (suite *OCPRouteSuite) SetupTest() {
4343
var err error
4444

4545
suite.sc, err = NewOcpRouteSource(
46+
context.TODO(),
4647
fakeClient,
4748
"",
4849
"",
@@ -141,6 +142,7 @@ func testOcpRouteSourceNewOcpRouteSource(t *testing.T) {
141142
t.Parallel()
142143

143144
_, err := NewOcpRouteSource(
145+
context.TODO(),
144146
fake.NewSimpleClientset(),
145147
"",
146148
ti.annotationFilter,
@@ -439,6 +441,7 @@ func testOcpRouteSourceEndpoints(t *testing.T) {
439441
require.NoError(t, err)
440442

441443
source, err := NewOcpRouteSource(
444+
context.TODO(),
442445
fakeClient,
443446
"",
444447
"",

0 commit comments

Comments
 (0)