client-go gin的简单整合四-list-watch初探

背景:

完成了client-go gin的简单整合三(list列表相关再进阶关于Pods),恩如果有代理是可以看到每次的请求都要访问后端服务的,如何避免频繁调用后端apiserver呢?list-watch监听机制可以使用一下?

关于list-watch:

参照:https://blog.51cto.com/u_15127559/3377812(错别字好多?最后还是引用了沈老师的ppt上面的概念!),

  • list http短链接调用资源的api,获取列表。
  • 使用http长连接持续监听资源,有变化则返回一个WatchEventclient-go informerclient-go k8s.io/client-go/tools/cache包informer对象对list-watch机制进行了封装
  • 初始化调用List api获得全量list 缓存(本地缓存)
  • 调用watch api watch资源,当资源发生变更通过一定机制维护缓存,减少访问apiserver的压力

个人觉得不错的文章Client-go源码分析之Reflector,华为云不错的视频list-watch机制原理详解,

client-go(kubernetes)的ListerWatcher解析.

client-go gin的简单整合四-list-watch

以deployment简单例子的开始

文件名 /src/service/test.go,监控deployment的变化 开始其实是不是可以跟java是的弄一个单独的测试包?这里就简单操作了偷懒......

cache.NewInformer()开始:

cache.NewInformer()

image.png

image.png

goland查看源码功能:

image.png

image.png

image.png

实现Handler方法:

只实现了OnUpdate方法(仅仅打印deployment名字),OnAdd,OnDelete是空的:

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}

最终test.go如下:

package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}
func main() {
	s, c := cache.NewInformer(cache.NewListWatchFromClient(lib.K8sClient.AppsV1().RESTClient(),
		"deployments", "default", fields.Everything()),
		&v1.Deployment{},
		0,
		&DepHandler{},
	)
	c.Run(wait.NeverStop)
	s.List()
}

关于s c 源码中Store, Controller

image.png

运行test.go

go run test.go

手动修改nginx deployment副本数量,查看 goland输出:

[zhangpeng@zhangpeng ~]$ kubectl get deployments
NAME    READY   UP-TO-DATE   AVAILABLE   AGE
nginx   2/2     2            2           10d
[zhangpeng@zhangpeng ~]$ kubectl get deployment
NAME    READY   UP-TO-DATE   AVAILABLE   AGE
nginx   2/2     2            2           10d
[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=3
deployment.apps/nginx scaled
[zhangpeng@zhangpeng ~]$ 

image.png

注:resource 是deployments,不能是deployment, *DepHandler 为什么加指针运算符?

实现一个pod的list-watch?

首先停止test.go,并注释掉test.go中代码。

image.png

创建了一个/src/service/test1.go,照着test.go来一遍:

package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {
}
func main() {
	s, c := cache.NewInformer(cache.NewListWatchFromClient(lib.K8sClient.CoreV1().RESTClient(),
		"pods", "default", fields.Everything()),
		&corev1.Pod{},
		0,
		&PodHandler{},
	)
	c.Run(wait.NeverStop)
	s.List()
}

注意:pod 的api是 corev1。参照:https://github.com/kubernetes/client-go/blob/release-1.23/informers/core/v1/pod.go#L58

运行test1.go,修改default 下nginx deployment副本数:

[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=3
deployment.apps/nginx scaled

image.png

SharedInformerFactory工厂模式

思考一下为什么要使用工厂模式呢?

关于SharedInformerFactory参考:https://stackoverflow.com/questions/40975307/how-to-watch-events-on-a-kubernetes-service-using-its-go-client,https://qiankunli.github.io/2020/07/20/client_go.html

先test1.go的pod开始:

image.png

image.png

src/service/test1.go

package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {
}
func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podinformer := factory.Core().V1().Pods()
	podinformer.Informer().AddEventHandler(&PodHandler{})
	factory.Start(wait.NeverStop)
	select {}
}

写的时候以为直接corev1......发现是core().v1(),为什么要用select{}呢?参照:https://blog.csdn.net/cbmljs/article/details/93497415。阻塞,防止程序退出!运行test1.go程序,修改nginx deployment副本:

[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=4
deployment.apps/nginx scaled

image.png

可不可以deployment pod list-watch搞在一起呢?现在写了一个test.go,test1.go?貌似也是可以的......先搞在一起,如下:

package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {
}

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}
func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podinformer := factory.Core().V1().Pods()
	podinformer.Informer().AddEventHandler(&PodHandler{})
	depinformer := factory.Apps().V1().Deployments()
	depinformer.Informer().AddEventHandler(&DepHandler{})
	factory.Start(wait.NeverStop)
	select {}
}

运行test1.go ,更改nginx deployment副本数量,打印了deployment name 和pod name!

image.png

Handler OnAdd

补全一下OnAdd方法,打印一下pod deployment列表:

func (p *PodHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*corev1.Pod).Name)
}
func (d *DepHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*v1.Deployment).Name)
}
package main

import (
	"fmt"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*corev1.Pod).Name)
}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {
}

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*v1.Deployment).Name)
}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}
func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podinformer := factory.Core().V1().Pods()
	podinformer.Informer().AddEventHandler(&PodHandler{})
	depinformer := factory.Apps().V1().Deployments()
	depinformer.Informer().AddEventHandler(&DepHandler{})
	factory.Start(wait.NeverStop)
	select {}
}

image.png

sync.Map

为什么引用sync.Map呢?Go语言sync.Map(在并发环境中使用的map),还是考虑并发原因!

拿deployment为例,打印一下develop namespace命名空间下的deployment列表:

test1.go

package main

import (
	"context"
	"fmt"
	"github.com/gin-gonic/gin"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"log"
	"sync"
	"time"
)

type DeploymentMap struct {
	data sync.Map
}

func (depmap *DeploymentMap) Add(dep *v1.Deployment) {
	if list, ok := depmap.data.Load(dep.Namespace); ok {
		list = append(list.([]*v1.Deployment), dep)
		depmap.data.Store(dep.Namespace, list)
	} else {
		depmap.data.Store(dep.Namespace, []*v1.Deployment{dep})
	}
}

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {
	//fmt.Println(obj.(*v1.Deployment).Name)
	DepMap.Add(obj.(*v1.Deployment))
}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {
}

var DepMap *DeploymentMap

func init() {
	DepMap = &DeploymentMap{}
}
func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	depinformer := factory.Apps().V1().Deployments()
	depinformer.Informer().AddEventHandler(&DepHandler{})
	factory.Start(wait.NeverStop)
	c, _ := context.WithTimeout(context.Background(), time.Second*3)
	select {
	case <-c.Done():
		log.Fatal("time out")
	default:
		r := gin.New()
		r.GET("/", func(c *gin.Context) {
			var res []string
			DepMap.data.Range(func(key, value interface{}) bool {
				if key == "develop" {
					for _, item := range value.([]*v1.Deployment) {
						res = append(res, item.Name)
					}
				}
				return true

			})
			c.JSON(200, res)

		})
		r.Run(":8080")
	}
}

没有作具体的路由,直接测试一下访问:http://127.0.0.1:8080/

image.png

总结

  1. list-watch机制
  2. cache informer,informer工厂模式。
  3. handler实现
  4. sync.map
  5. 断言......

原文地址:https://cloud.tencent.com/developer/article/2006389

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340