list-watch,作为k8s系统中统一的异步消息传递方式,对系统的性能、数据一致性起到关键性的作用。今天我想从代码这边探究一下list-watch的实现方式。并看是否能在后面的工作中优化这个过程。
上图是一个典型的Pod创建过程,在这个过程中,每次当kubectl创建了ReplicaSet对象后,controller-manager都是通过list-watch这种方式得到了最新的ReplicaSet对象,并执行自己的逻辑来创建Pod对象。其他的几个组件,Scheduler/Kubelet也是一样,通过list-watch得知变化并进行处理。这是组件的处理端代码:
go c.NodeLister.Store, c.nodePopulator = framework.NewInformer( c.createNodeLW(), ...(1) &api.Node{}, ...(2) 0, ...(3) framework.ResourceEventHandlerFuncs{ ...(4) AddFunc: c.addNodeToCache, ...(5) UpdateFunc: c.updateNodeInCache, DeleteFunc: c.deleteNodeFromCache, }, )
其中(1)是list-watch函数,(4)(5)则是相应事件触发操作的入口。
list-watch操作需要做这么几件事:
既然watch本身是一个apiserver提供的http restful的API,那么就按照API的方式去阅读它的代码,按照apiserver的基础功能实现一文所描述,我们来看它的代码,
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage,... ... lister, isLister := storage.(rest.Lister) watcher, isWatcher := storage.(rest.Watcher) ...(1) ... case "LIST": // List all resources of a kind. ...(2) doc := "list objects of kind " + kind if hasSubresource { doc = "list " + subresource + " of objects of kind " + kind } handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) ...(3)
func ListResource(r rest.Lister, rw rest.Watcher,... { ... if (opts.Watch || forceWatch) && rw != nil { watcher, err := rw.Watch(ctx, &opts) ...(1) .... serveWatch(watcher, scope, req, res, timeout) return } result, err := r.List(ctx, &opts) ...(2) write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
func serveWatch(watcher watch.Interface... { server.ServeHTTP(res.ResponseWriter, req.Request) } func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { for { select { case event, ok := <-s.watching.ResultChan(): obj := event.Object if err := s.embeddedEncoder.EncodeToStream(obj, buf); ... }
这段的操作基本毫无技术含量,就是从watcher的结果channel中读取一个event对象,然后持续不断的编码写入到http response的流当中。
所以,我们的问题就回到了
回到上面的代码追踪过程来看,watcher(watch.Interface)对象是被Rest.Storage对象创建出来的。Rest.Storage分两层,一层是每个对象自己的逻辑,另一层则是通过通用的操作来搞定,像watch这样的操作应该是通用的,所以我们看这个源代码
/pkg/registry/generic/registry/store.go
func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) { ... return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion) } func (e *Store) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { return e.Storage.Watch(ctx, key, resourceVersion, filterFunc) ...(1) return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc) }
果然,我们在(1)这里找到了生成Watch的函数,但这个工作是由e.Storage来完成的,所以我们需要找一个具体的Storage的生成过程,以Pod为例子
/pkg/registry/pod/etcd/etcd.go
func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage { prefix := "/pods" storageInterface := opts.Decorator( opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Pods), &api.Pod{}, prefix, pod.Strategy, newListFunc) ...(1) store := ®istry.Store{ ... Storage: storageInterface, ...(2) } return PodStorage{ Pod: &REST{ store, proxyTransport}, ...(3)
这(1)就是Storage的生成现场,传入的参数包括了一个缓存Pod的数量。(2)(3)是和上面代码的连接点。那么现在问题就转化为追寻Decorator这个东西具体是怎么生成的,需要重复刚才的过程,往上搜索opts是怎么搞进来的。
/pkg/master/master.go - GetRESTOptionsOrDie() /pkg/genericapiserver/genericapiserver.go - StorageDecorator() /pkg/registry/generic/registry/storage_factory.go - StorageWithCacher() /pkg/storage/cacher.go
OK,这样我们就来到正题,一个具体的watch缓存的实现了!
把上面这个过程用一幅图表示:
看代码,首要看的是数据结构,以及考虑这个数据结构和需要解决的问题之间的关系。
对于cacher这结构来说,我们从外看需求,可以知道这是一个Storage,用于提供某个类型的数据,例如Pod的增删改查请求,同时它又用于watch,用于在client端需要对某个key的变化感兴趣时,创建一个watcher来源源不断的提供新的数据给客户端。
那么cacher是怎么满足这些需求的呢?答案就在它的结构里面:
type Cacher struct { // Underlying storage.Interface. storage Interface // "sliding window" of recent changes of objects and the current state. watchCache *watchCache reflector *cache.Reflector // Registered watchers. watcherIdx int watchers map[int]*cacheWatcher }
略去里面的锁(在看代码的时候一开始要忽略锁的存在,锁是后期为了避免破坏数据再加上去的,不影响数据流),略去里面的一些非关键的成员,现在我们剩下这3段重要的成员,其中
当然,这3个成员的作用是我看了所有代码后,总结出来的,一开始读代码时不妨先在脑子里面有个定位,然后在看下面的方法时不断修正这个定位。那么,接下来就看看具体的方法是怎么让数据在这些结构里面流动的吧!
func NewCacherFromConfig(config CacherConfig) *Cacher { ... cacher.startCaching(stopCh) } func (c *Cacher) startCaching(stopChannel <-chan struct{}) { ... if err := c.reflector.ListAndWatch(stopChannel); err != nil { glog.Errorf("unexpected ListAndWatch error: %v", err) } }
其他的部分都是陈词滥调,只有startCaching()这段有点意思,这里启动一个go协程,最后启动了c.reflector.ListAndWatch()这个方法,如果对k8s的基本有了解的话,这个其实就是一个把远端数据源源不断的同步到本地的方法,那么数据落在什么地方呢?往上看可以看到
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
也就是说从创建cacher的实例开始,就会从etcd中把所有Pod的数据同步到watchCache里面来。这也就印证了watchCache是数据从etcd过来的第一站。
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { return c.storage.Create(ctx, key, obj, out, ttl) } 大部分方法都很无聊,就是短路到底层的storage直接执行。
// Implements storage.Interface. func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) { initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV) watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx)) c.watchers[c.watcherIdx] = watcher c.watcherIdx++ return watcher, nil }
这里的逻辑就比较清晰,首先从watchCache中拿到从某个resourceVersion以来的所有数据——initEvents,然后用这个数据创建了一个watcher返回出去为某个客户端提供服务。
// Implements storage.Interface. func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error { filterFunc := filterFunction(key, c.keyFunc, filter) objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV) if err != nil { return fmt.Errorf("failed to wait for fresh list: %v", err) } for _, obj := range objs { if filterFunc(object) { listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem())) } } }
从这段代码中我们可以看出2件事,一是list的数据都是从watchCache中获取的,二是获取后通过filterFunc过滤了一遍然后返回出去。
这个结构应该是缓存的核心结构,从上一层的代码分析中我们已经知道了对这个结构的需求,包括存储所有这个类型的数据,包括当有新的数据过来时把数据扔到cacheWatcher里面去,总之,提供List和Watch两大输出。
type watchCache struct { // cache is used a cyclic buffer - its first element (with the smallest // resourceVersion) is defined by startIndex, its last element is defined // by endIndex (if cache is full it will be startIndex + capacity). // Both startIndex and endIndex can be greater than buffer capacity - // you should always apply modulo capacity to get an index in cache array. cache []watchCacheElement startIndex int endIndex int // store will effectively support LIST operation from the "end of cache // history" i.e. from the moment just after the newest cached watched event. // It is necessary to effectively allow clients to start watching at now. store cache.Store }
这里的关键数据结构依然是2个
那么继续看看方法是怎么运转的吧~
func (w *watchCache) Update(obj interface{}) error { event := watch.Event{ Type: watch.Modified, Object: object} f := func(obj runtime.Object) error { return w.store.Update(obj) } return w.processEvent(event, resourceVersion, f) } func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { previous, exists, err := w.store.Get(event.Object) watchCacheEvent := watchCacheEvent{ event.Type, event.Object, prevObject, resourceVersion} w.onEvent(watchCacheEvent) w.updateCache(resourceVersion, watchCacheEvent) } // Assumes that lock is already held for write. func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) { w.cache[w.endIndex%w.capacity] = watchCacheElement{ resourceVersion, event} w.endIndex++ }
所有的增删改方法做的事情都差不多,就是在store里面存具体的数据,然后调用processEvent()去增加环形队列里面的数据,如果详细看一下onEvent的操作,就会发现这个操作的本质是落在cacher.go里面:
func (c *Cacher) processEvent(event watchCacheEvent) { for _, watcher := range c.watchers {
watcher.add(event)
}
}
往所有的watcher里面挨个添加数据。总体来说,我们可以从上面的代码中得出一个结论:cache里面存储的是Event,也就是有prevObject的,对于所有操作都会在cache里面保存,但对于store来说,只存储当下的数据,删了就删了,改了就改了。
这里本来应该讨论List()方法的,但在cacher里面的List()实际上使用的是这个,所以我们看这个方法。
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) { startTime := w.clock.Now() go func() { w.cond.Broadcast() }() for w.resourceVersion < resourceVersion { w.cond.Wait() } return w.store.List(), w.resourceVersion, nil }
这个方法比较绕,前面使用了一堆cond通知来和其他协程通信,最后还是调用了store.List()把数据返回出去。后面来具体分析这里的协调机制。
这个方法在cacher的创建cacheWatcher里面使用,把当前store里面的所有数据都搞出来,然后把store里面的数据都转换为AddEvent,配上cache里面的Event,全部返回出去。
这个结构是每个watch的client都会拥有一个的,从上面的分析中我们也能得出这个结构的需求,就是从watchCache里面搞一些数据,然后写到客户端那边。
// cacherWatch implements watch.Interface type cacheWatcher struct { sync.Mutex input chan watchCacheEvent result chan watch.Event filter FilterFunc stopped bool forget func(bool) }
这段代码比较简单,就不去分析方法了,简单说就是数据在增加的时候放到input这个channel里面去,通过filter然后输出到result这个channel里面去。
这里的代码分析比较冗长,但从中可以得出看代码的一般逻辑:
这里我看完代码后有这些问题:
一个程序员的情书(超好看)
ROP-Ret2Shellcode-64位实例/usr/include/x86_64-linux-gnu/asm/unisted_64.h编写64位shellcode,思路和32位是一样的(1)想办法调用execve("/bin/sh",null,null)(2)借助栈来传入字符串/bin/sh(3)系统调用execverax = 0x3b(64bit)rdi = bin_sh_addrrsi = 0rdx = 0实例代码如下:setvbuf函数的作用是优化io流,在服务器上时,或
端午节终于放假了,大家可算是看到工作的尽头了,盼到了欢天喜地的端午节假日!可以回家了高速路好堵端午节放假除了吃粽子还是粽子纪念屈原so,你的端午节是怎么安排的普通的程序员一觉睡到大中午,...
一、设备介绍:型号:LMS511-10100(DC 24v)品牌:SICK操作环境:Windows 10 64bit软件:SOPAS ET连接线:串口转网口线(1根/4针 子头),电源线(1根/5针 母头)软件/文档下载地址:https://www.sick.com/cn/zh/detection-and-ranging-solutions/2d-lidar-/lms5xx/lms5...
处理方法 包含结束时间00.00.00 Date createDate = boxCodeLog.getCreateDate(); if (createDate != null){ LocalDate localDate=createDate.toInstant().atZone(ZoneId.syste...
我正在尝试将压缩字节发送到另一台服务器,然后让该服务器接收它们并写出压缩文件。当我在同一台服务器上进行压缩和编写时,它的效果非常好。本地版本看起来像这样:ZipOutputStream zout = new ZipOutputStream(FileOutputStream);zout.write(byteBuffer, 0, len);zout.flush()FileOutputStream.fl...
前言一开始,我只是想把一个AWD下的批量写马工具升级改造一下,记录一下期间的心得体会,本以为现在mysql弱口令连接的漏洞很少。但当最后工具完成后,一测试扫描外国网段,半天时间竟然就成功连接了上千台数据库服务器。起因这个脚本最开始的构思是在AWD比赛的情景下,因为所有服务器的环境都相同,只要查看本地的MySql用户名密码就知道了所有服务器的MySql用户名密码。若服务器开放了3306端口,那么利用...
因为google为了扩大chrome浏览器的范围还有影响,所以在很早以前就开始充许客户订制主页和书签,这个其中主要使用了ContentProvider的特性进行数据的分享。在google提供的源码中有定制主页和书签的apk,但是在高通和mtk分发的时候这两个apk已经做了一定的处理,不在进行编译。其中两个项目都位置在:packages/providers下一,定制主页定制主页googl
DFP算法是本科数学系中最优化方法的知识,也是无约束最优化方法中非常重要的两个拟Newton算法之一,上一周写了一周的数学软件课程论文,姑且将DFP算法的实现细节贴出来分享给学弟学妹参考吧,由于博客不支持数学公式,所以就不累述算法原理及推导公式了。DFP算法流程图先给出DFP算法迭代流程图,总体上是拟Newton方法的通用迭代步骤,唯独在校正...
0.目录目录文件系统概念索引式文件系统1 superblockinodeblock2 读取文件Ext系列文件系统1 Ext文件系统结构2 查看文件系统命令3 Ext文件系统如何存储和读取文件31 存储文件32 读取文件过程4 创建文件过程与日志文件系统41 创建文件过程42 数据不一致状态43 日志文件系统5 链接文件51 硬链接52 软链接1.文件系统概念格式化
Problem Description在每年的校赛里,所有进入决赛的同学都会获得一件很漂亮的t-shirt。但是每当我们的工作人员把上百件的衣服从商店运回到赛场的时候,却是非常累的!所以现在他们想要寻找最短的从商店到赛场的路线,你可以帮助他们吗? Input输入包括多组数据。每组数据第一行是两个整数N、M(N输入保证至少存在1条商店到赛场的路线。
1.什么是不可变类型不可变对象是指一个对象的状态在对象被创建之后就不再变化。这里的不可变化是指不可以修改这个类的内容,这样的设计有很多的好处,不可变的对象可以复用,是共享的,同时还设计到了一个线程安全的问题,不可变类的不变性确保了多个线程在访问同一个对象的时候,是线程安全的。这里有兴趣的可以看一下《Effective Java》这本书,很经典的书,一般别人问我推荐什么java程序员必读的书