这个示例通过 svcInformer := cache.NewSharedIndexInformer 手动创建 informer，而不是直接使用系统已有的 coreinformers.ServiceInformer。
package main import ( "flag" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "sync" rt "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "k8s.io/klog" "k8s.io/kubernetes/pkg/controller" "math/rand" "path/filepath" "time" ) // Interval of synchronizing service status from apiserver const ( LocalServiceSyncPeriod = 30 * time.Second ) func main() { var wg sync.WaitGroup wg.Add(1) rand.Seed(time.Now().UnixNano()) klog.InitFlags(nil) var kubeconfigTemp *string if home := homedir.HomeDir(); home != "" { kubeconfigPath := filepath.Join(home, ".kube", "config") kubeconfigTemp = flag.String("kubeconfig1", kubeconfigPath, "(optional) absolute path to the kubeconfig file") } else { kubeconfigTemp = flag.String("kubeconfig1", "", "absolute path to the kubeconfig file") } flag.Parse() config, err := clientcmd.BuildConfigFromFlags("", *kubeconfigTemp) klog.Infof("test02") CheckErr(err) clientset, err := kubernetes.NewForConfig(config) CheckErr(err) //sharedInformers := informers.NewSharedInformerFactory(clientset, 0) stopCh := make(chan struct{}) defer close(stopCh) fieldSelectorNotSys1 := fields.OneTermEqualSelector("metadata.namespace", "default") //fieldSelectorNotSys2 := fields.OneTermNotEqualSelector("metadata.namespace", "polar") selectorStr := fields.AndSelectors(fieldSelectorNotSys1).String() klog.Infof("selectorStr:%s", selectorStr) svcInformer := cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (rt.Object, error) { //options.FieldSelector = selectorStr return clientset.CoreV1().Services("").List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { //options.FieldSelector = selectorStr return 
clientset.CoreV1().Services("").Watch(options) }, }, &v1.Service{}, LocalServiceSyncPeriod, cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }, ) svcInformer.AddEventHandlerWithResyncPeriod( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { newSvc := obj.(*v1.Service) if newSvc.Namespace != "default" { klog.Infof("svc: filter svc, skip svc [%s/%s]\n", newSvc.Namespace, newSvc.Name) return false } else { klog.Infof("svc: filter svc, svc [%s/%s]\n", newSvc.Namespace, newSvc.Name) return true } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { newSvc := obj.(*v1.Service) klog.Infof("controller: add svc, svc [%s/%s]\n", newSvc.Namespace, newSvc.Name) }, UpdateFunc: func(oldObj, newObj interface{}) { newSvc := newObj.(*v1.Service) klog.Infof("controller: Update svc, pod [%s/%s]\n", newSvc.Namespace, newSvc.Name) }, DeleteFunc: func(obj interface{}) { delSvc := obj.(*v1.Service) klog.Infof("controller: Delete svc, pod [%s/%s]\n", delSvc.Namespace, delSvc.Name) }, }, }, LocalServiceSyncPeriod, ) klog.Infof("svcInformer start of run") go svcInformer.Run(stopCh) if !controller.WaitForCacheSync("service", stopCh, svcInformer.HasSynced) { klog.Infof("svcInformer start of run") return } //time.Sleep(5 * time.Minute) wg.Wait() } func initInformer(clientset *kubernetes.Clientset) coreinformers.PodInformer { generate a shared informerFactory sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) // create pod informer and start it podInformer := sharedInformerFactory.Core().V1().Pods() return podInformer } func CheckErr(err error) { if err != nil { panic(err) } }