Kubernetes Informers
使用 client-go 时,有时需要高频率地查询 pod 的状态,这会给 api-server 造成压力。有什么办法可以在本地缓存一份数据呢?k8s.io/client-go/tools/cache 就是这样一个缓存库。
先介绍一个和缓存库相关联的包:Informer(k8s.io/client-go/informers)。
它可以为 k8s 的所有资源创建一个工厂,其结构镜像了 kubernetes.Interface 类型。
代码示例如下
package main
import (
"log"
"os"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
// main builds a clientset from the kubeconfig file named by the KUBECONFIG
// environment variable, creates a shared Pod informer, logs the name of every
// Pod passed to the add handler, and then blocks in the informer's Run call.
// It panics if the config or the clientset cannot be created.
func main() {
	restCfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err.Error())
	}

	client, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		panic(err.Error())
	}

	// The factory is created with 0 as its default resync period.
	podInformer := informers.NewSharedInformerFactory(client, 0).Core().V1().Pods().Informer()

	// The v1.Object interface from "k8s.io/apimachinery/pkg/apis/meta/v1"
	// exposes the object's metadata accessors, so the name can be read
	// without knowing the concrete type.
	onAdd := func(obj interface{}) {
		meta := obj.(v1.Object)
		log.Printf("New Pod Added to Store: %s", meta.GetName())
	}
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: onAdd})

	// Run blocks on the stop channel; the deferred close fires when main unwinds.
	stop := make(chan struct{})
	defer close(stop)
	podInformer.Run(stop)
}
Dynamic Informer
package main
import (
	"os"
	"os/signal"
	"syscall"

	"github.com/sirupsen/logrus"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)
// main watches Deployments through a dynamic shared informer. It starts the
// informer in a goroutine, waits for an interrupt or termination signal,
// then closes the stop channel and waits for the informer goroutine to
// return before exiting. Configuration or client errors are fatal.
func main() {
	cfg, err := restConfig()
	if err != nil {
		logrus.WithError(err).Fatal("could not get config")
	}

	// Grab a dynamic interface that we can create informers from
	dc, err := dynamic.NewForConfig(cfg)
	if err != nil {
		logrus.WithError(err).Fatal("could not generate dynamic client for config")
	}

	// Create a factory object that we can say "hey, I need to watch this resource"
	// and it will give us back an informer for it
	f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dc, 0, v1.NamespaceAll, nil)

	// Retrieve a "GroupVersionResource" type that we need when generating our
	// informer from our dynamic factory. ParseResourceArg has no error return;
	// it signals an unparsable argument with a nil pointer, so guard the
	// dereference below.
	gvr, _ := schema.ParseResourceArg("deployments.v1.apps")
	if gvr == nil {
		logrus.Fatal("could not parse group/version/resource for deployments")
	}

	// Finally, create our informer for deployments!
	i := f.ForResource(*gvr)

	stopCh := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		startWatching(stopCh, i.Informer())
	}()

	// signal.Notify does not block when delivering, so the channel must be
	// buffered or a signal arriving before the receive below is dropped.
	// SIGKILL (os.Kill) cannot be trapped, so listen for SIGTERM instead.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	<-sigCh

	// Ask the informer to stop and wait until its goroutine has returned.
	close(stopCh)
	<-done
}
// restConfig returns the REST client configuration to use. When the
// KUBECONFIG environment variable is non-empty, the configuration is built
// from that kubeconfig file; otherwise rest.InClusterConfig is used. Only
// the loader that is actually needed is invoked. On failure it returns a nil
// config and the loader's error.
func restConfig() (*rest.Config, error) {
	var (
		kubeCfg *rest.Config
		err     error
	)

	// An explicit kubeconfig takes precedence, so check it first rather
	// than probing the in-cluster environment and discarding the result.
	if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" {
		kubeCfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
	} else {
		kubeCfg, err = rest.InClusterConfig()
	}
	if err != nil {
		return nil, err
	}
	return kubeCfg, nil
}
// startWatching registers add, update and delete handlers on s and then
// calls s.Run with stopCh, blocking until Run returns.
//
// The add handler logs the object's name, namespace and labels; the update
// and delete handlers only log that the event was received.
func startWatching(stopCh <-chan struct{}, s cache.SharedIndexInformer) {
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// Use the two-value assertion so an unexpected object type is
			// logged instead of panicking inside the handler.
			u, ok := obj.(*unstructured.Unstructured)
			if !ok {
				logrus.Warnf("received add event for unexpected type %T", obj)
				return
			}
			logrus.WithFields(logrus.Fields{
				"name":      u.GetName(),
				"namespace": u.GetNamespace(),
				"labels":    u.GetLabels(),
			}).Info("received add event!")
		},
		UpdateFunc: func(oldObj, obj interface{}) {
			logrus.Info("received update event!")
		},
		DeleteFunc: func(obj interface{}) {
			// Previously logged "update" here, which made deletes
			// indistinguishable from updates in the output.
			logrus.Info("received delete event!")
		},
	}
	s.AddEventHandler(handlers)
	s.Run(stopCh)
}
Last updated
Was this helpful?