diff --git a/pkg/galaxy/galaxy.go b/pkg/galaxy/galaxy.go index 19d3a3b6..91b458b1 100644 --- a/pkg/galaxy/galaxy.go +++ b/pkg/galaxy/galaxy.go @@ -22,11 +22,16 @@ import ( "io/ioutil" "time" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + corev1Lister "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/clientcmd" glog "k8s.io/klog" "tkestack.io/galaxy/pkg/api/docker" + "tkestack.io/galaxy/pkg/api/k8s" "tkestack.io/galaxy/pkg/galaxy/options" "tkestack.io/galaxy/pkg/gc" "tkestack.io/galaxy/pkg/network/kernel" @@ -44,6 +49,8 @@ type Galaxy struct { pmhandler *portmapping.PortMappingHandler client kubernetes.Interface pm *policy.PolicyManager + factory informers.SharedInformerFactory + podLister corev1Lister.PodLister } type JsonConf struct { @@ -163,6 +170,14 @@ func (g *Galaxy) initk8sClient() { glog.Fatalf("Can not generate client from config: error(%v)", err) } glog.Infof("apiserver address %s", clientConfig.Host) + g.factory = informers.NewSharedInformerFactoryWithOptions(g.client, time.Minute, + informers.WithTweakListOptions( + func(listOptions *v1.ListOptions) { + listOptions.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", k8s.GetHostname()).String() + })) + g.podLister = g.factory.Core().V1().Pods().Lister() + g.factory.Start(g.quitChan) + g.factory.WaitForCacheSync(g.quitChan) } func (g *Galaxy) SetClient(cli kubernetes.Interface) { diff --git a/pkg/galaxy/server.go b/pkg/galaxy/server.go index e4b12201..47fa0642 100644 --- a/pkg/galaxy/server.go +++ b/pkg/galaxy/server.go @@ -31,9 +31,8 @@ import ( t020 "github.com/containernetworking/cni/pkg/types/020" "github.com/emicklei/go-restful" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" glog "k8s.io/klog" "tkestack.io/galaxy/pkg/api/cniutil" @@ -285,14 +284,12 @@ func parseExtendedCNIArgs(pod *corev1.Pod) (map[string]json.RawMessage, error) { func (g *Galaxy) setupIPtables() error { // filter all running pods on node - pods, err := g.client.CoreV1().Pods(v1.NamespaceAll).List(v1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", k8s.GetHostname()).String()}) + pods, err := g.podLister.List(labels.Everything()) if err != nil { return fmt.Errorf("failed to get pods on node: %v", err) } var allPorts []k8s.Port - for i := range pods.Items { - pod := &pods.Items[i] + for _, pod := range pods { if len(pod.Status.PodIP) == 0 || pod.Spec.HostNetwork { continue } @@ -409,25 +406,7 @@ func (g *Galaxy) cleanIPtables(containerID string) error { } func (g *Galaxy) getPod(name, namespace string) (*corev1.Pod, error) { - var pod *corev1.Pod - printOnce := false - if err := wait.PollImmediate(time.Millisecond*500, 5*time.Second, func() (done bool, err error) { - pod, err = g.client.CoreV1().Pods(namespace).Get(name, v1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - if printOnce == false { - printOnce = true - glog.Warningf("can't find pod %s_%s, retring", name, namespace) - } - return false, nil - } - return false, err - } - return true, nil - }); err != nil { - return nil, fmt.Errorf("failed to get pod %s_%s: %v", name, namespace, err) - } - return pod, nil + return g.podLister.Pods(namespace).Get(name) } func convertResult(result types.Result) (*t020.Result, error) {