如何做企业网站及费用问题网站推广软件免费版下载
1. 背景
client-go工具下的tools/cache.indexer为informer提供缓存与索引的能力。可以实现快速通过索引找到对应的对象(pod, deployment,secret,configmap等)。
indexer再informer机制中的使用图示:

indexer包括2部分: 一部分是store用于实际数据的存储,也就是图中thread safe store部分。另一部分是index,用于为数据制作索引。 建立索引后,就可以实现从store中快速检索出向要的数据。否则
从store中检索数据,需要依次对store中的数据进行全遍历查询。 indexer简单图示就是大概这样:

这个机制可以类比mysql数据库的索引机制。mysql的利用B+树建立索引,之后利用索引就能快速检索数据,而不需要全表遍历查询。
当然indexer的索引机制不是使用B+数,那具体是如何实现呢?接下来我们分析下indexer的索引实现方法。
2. indexer的源码分析
找到indexer源码,位于k8s.io/client-go/tools/cach/index.go
type Indexer interface {Store// Index returns the stored objects whose set of indexed values// intersects the set of indexed values of the given object, for// the named indexIndex(indexName string, obj interface{}) ([]interface{}, error)// IndexKeys returns the storage keys of the stored objects whose// set of indexed values for the named index includes the given// indexed valueIndexKeys(indexName, indexedValue string) ([]string, error)// ListIndexFuncValues returns all the indexed values of the given indexListIndexFuncValues(indexName string) []string// ByIndex returns the stored objects whose set of indexed values// for the named index includes the given indexed valueByIndex(indexName, indexedValue string) ([]interface{}, error)// GetIndexer return the indexersGetIndexers() Indexers// AddIndexers adds more indexers to this store. If you call this after you already have data// in the store, the results are undefined.AddIndexers(newIndexers Indexers) error
}
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String
// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
type Indices map[string]Index
indexer是一个接口,包括二部分,一部分是存储数据的store,后面可以进一步看到store也是一个接口。另一部分是实现索引的几个方Index(),IndexKeys(),ListIndexFuncValues,ByIndex(),GetIndexers(),AddIndexers(),以及三个map结构类型Index,Indexers,Indices。
可以看到涉及的名称和概念很多,需要一步一步的拆解分析,才能搞清楚各个概念的含义与作用,这里先简单介绍下。
(1) IndexFunc, 是一个函数,输入obj对象,输出对象在这个索引函数匹配后字段。
例如,一个pod对象,给pod 打个label, "city"="shenzhen"。可以定义一个IndexFunc,输入一个pod对象,输出这个pod已经定义的label: "city"的值:"shenzhen"
func cityIndexFunc(obj interface{}) ([]string, error) {pod := obj.(*corev1.Pod)psaId := pod.Labels["city"]return []string{psaId}, nil
}
(2) Index, map结构,这里我们姑且称为"索引表",索引机制用到的索引就存储在index类型里面。key是索引值,value是索引出的对象名(默认是<ns>/<meta.name>格式的对象)
(3) Indexers, map结构,可能IndexFunc索引函数有很多,那么可以给每个indexFunc起一个名字indexName,再把indexName: indexFunc的映射关系用一个map结构保存 。
(4) Indices, map结构,indexFunc有很多,每一个indexFunc都对应一个index索引表,所以indices就是indexName与index的映射关系表
indexer的定义简单图示下,便于理解

反复说,Indexer由两部分组成,"存储"+"索引"。我们先看看第一步分存储数据的store如何实现.
3. store.go 源码分析
找到store源码,位于k8s.io/client-go/tools/cach/store.go
store接口的定义
type Store interface {Add(obj interface{}) error // 往存储里面添加一个对象Update(obj interface{}) error // 更新存储里面的一个对象Delete(obj interface{}) error // 删除存储里面的一个对象List() []interface{} // 提取存储里面所有对象ListKeys() []string // 提取存储里面所有对象的keyGet(obj interface{}) (item interface{}, exists bool, err error) // 获取存储里面的一个对象GetByKey(key string) (item interface{}, exists bool, err error) // 通过key来获取存储里面的一个对象// Replace will delete the contents of the store, using instead the// given list. Store takes ownership of the list, you should not reference// it after calling this function.Replace([]interface{}, string) error // 替换存储里面的所有对象Resync() error
}
store的构造函数与indexer的构造函数:
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {return &cache{cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),keyFunc: keyFunc,}
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {return &cache{cacheStorage: NewThreadSafeStore(indexers, Indices{}),keyFunc: keyFunc,}
}
NewStore()返回的是一个Store对象,只能存储数据,不具有索引能力。NewIndexer()返回的是一个Indexer,通过上分析知道Indexer是既有"存储"能力,也有"索引"能力的类型。
另外,从构造函数可以看到NewStore()与NewIndexer()都返回的是一个&cache{}对象,那还得继续看看cache类型的定义
cache类型的定义
// cache responsibilities are limited to:
// 1. Computing keys for objects via keyFunc
// 2. Invoking methods of a ThreadSafeStorage interface
type cache struct {// cacheStorage bears the burden of thread safety for the cachecacheStorage ThreadSafeStore // ThreadSafeStore 是存数据的地方// keyFunc is used to make the key for objects stored in and retrieved from items, and// should be deterministic.keyFunc KeyFunc // 作用把一个object计算出一个key出来
}
var _ Store = &cache{}
从cache类型的定义看,包括2部分,一个是ThreadSafeStore类型,一个keyFunc. 其中keyFunc,这个方法的作用把object计算出一个key,通常用的keyFunc是MetaNamespaceKeyFunc, 可以输入一个obj,返回结果是<ns>/<name>,如果没有namespace的对象,返回<name>.
// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
//
// TODO: replace key-as-string with a key-as-struct so that this
// packing/unpacking won't be necessary.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {if key, ok := obj.(ExplicitKey); ok {return string(key), nil}meta, err := meta.Accessor(obj)if err != nil {return "", fmt.Errorf("object has no meta: %v", err)}if len(meta.GetNamespace()) > 0 {return meta.GetNamespace() + "/" + meta.GetName(), nil // 如果有namespace的资源类型,返回ns+name,比如pod,configmap等}return meta.GetName(), nil // 如果没有namespace的资源类型,返回ns,比如node,pv等
}
而且cache类型实现了store接口定义的所有方法 Add(), Update(), Delete(), List(), ListKeys(), Get(), GetByKey(), Replace(), Resync() ,具体代码略。
store是抽象接口类,cache是store的具体实现类型。
接下来看看cache类型定义包括的另一部分,ThreadSafeStore的分析
4. ThreadSafeStore.go 源码分析
找到ThreadSafeStore源码,位于k8s.io/client-go/tools/cach/ThreadSafeStore.go
type ThreadSafeStore interface {Add(key string, obj interface{})Update(key string, obj interface{})Delete(key string)Get(key string) (item interface{}, exists bool)List() []interface{}ListKeys() []stringReplace(map[string]interface{}, string)Index(indexName string, obj interface{}) ([]interface{}, error)IndexKeys(indexName, indexKey string) ([]string, error)ListIndexFuncValues(name string) []stringByIndex(indexName, indexKey string) ([]interface{}, error)GetIndexers() Indexers// AddIndexers adds more indexers to this store. If you call this after you already have data// in the store, the results are undefined.AddIndexers(newIndexers Indexers) errorResync() error
}
定义了一个ThreadSafeStore的接口类型。可以看到ThreadSafeStore定义的方法,包括"存储"能力部分对应的函数,与"索引"能力部分对应的函数。
threadSafeMap类型的定义
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {lock sync.RWMutex // 保证对items map表操作的线程安全items map[string]interface{} // 真正存储数据的map表结构// indexers maps a name to an IndexFuncindexers Indexers // 保存IndexFunc索引函数的map结构// indices maps a name to an Indexindices Indices // 保存Index索引表的map结构
}
threadSafeMap类型实现了ThreadSafeStore接口类型。
threadSafeMap类型定义中的lock为了保证操作map时线程安全。items是一个map,用于真正存储数据的结构; indexers 是一个map,保存IndexFunc与这个函数的命名(即indexName); indices也是一个map,map表里面保存了很多的index索引表.
threadSafeMap类型定义的方法包括 Add(), Update(), Delete(), List(), ListKeys(), Get() , Replace(), Resync(), Index() , IndexKeys(), ListIndexFuncValues(), ByIndex(), GetIndexers(), AddIndexers().
大致分为实现存储数据的方法函数: Add(), Update(), Delete(), List(), ListKeys(), Get() , Replace(), Resync()
与实现索引的方法函数: Index() , IndexKeys(), ListIndexFuncValues(), ByIndex(), GetIndexers(), AddIndexers()

到此为止我们总结下:
经过层层剥解,我们知道indexer接口类型,调用了store接口类型,store接口类型调用了cache普通类型,cache类型的定义中包括threadSafeStore接口类型, 而threadSafeMap普通类型是threadSafeStore接口类型的实现。 所以indexer接口类型的的存储与索引能力,是利用底层的是threadSafeMap类型实现的.
threadSafeMap类型包括2部分,一部分是一个名叫到items的map,是存储数据的结构。另一部分是2个map,分别是indexers、indices,其中indexers是用于存放indexFunc即索引函数的集合,indices是存放index的集合,index保存的是索引后的值。

5. 举例说明
我们参考应官方文档的测试用例k8s.io/client-go/tools/cache/index_test.go,写一个测试代码.
为了更好的理解源码与下面的测试用例,这里再重申下几个概念,源码中会多次涉及到。
obj: object表示k8s中的一个资源对象runtime.object,比如命名空间为"public"下,名为"one"的pod
key: 是由函数MetaNamespaceKeyFunc(obj)对某个obj资源对象处理后,返回的一个string,是有<ns>/<name>组成,比如上面那个pod对象,对应的Key是"public/one"
indexer: 可以理解为是一个能实现"存储"+“索引”能力的对象。
indexFunc: 称为"索引函数"
indexName: 索引函数也需要一个名字,就叫indexName
indexValue: 由索引函数处理Obj后,返回的值。比如下面例子中的“shenzhen”,"chengdu","beijing"
index: map结构,也叫索引表,真正存储索引的地方
indices: 每一个indexFunc都有一个对应的index,indices用于保存indexName与index的对应关系。可以通过indexName找到对应的Index
测试的完整代码
func cityIndexFunc(obj interface{}) ([]string, error) {pod := obj.(*corev1.Pod)psaId := pod.Labels["city"]return []string{psaId}, nil
}
func TestIndexer(t *testing.T) {// 用NewIndexer构造函数,创建一个indexer对象indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"cityIndex": cityIndexFunc,})// 造数据,添加pods到indexer中pod1 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Namespace: "public", Labels: map[string]string{"city": "shenzhen"}}}pod2 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Namespace: "public", Labels: map[string]string{"city": "chengdu"}}}pod3 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Namespace: "public", Labels: map[string]string{"city": "beijing"}}}pod4 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "for", Namespace: "public", Labels: map[string]string{"city": "shenzhen"}}}indexer.Add(pod1)indexer.Add(pod2)indexer.Add(pod3)indexer.Add(pod4)fmt.Println("显示索引表的所有数据: ")for k, v := range indexer.List() {fmt.Println(k, v.(*corev1.Pod).Name, v.(*corev1.Pod).Labels)}// 显示indexer中的所有索引值values := indexer.ListIndexFuncValues("cityIndex")fmt.Println("values: ", values) // values: [chengdu beijing shenzhen]// 查询索引值为shenzhen的pod// ByIndex 根据索引函数名与索引值,检索出匹配的obj对象foundPods2, err := indexer.ByIndex("cityIndex", "shenzhen")if err != nil {fmt.Printf("unexpected error: %v\n", err)}fmt.Println("pod have label shenzhen: ")for _, pod2 := range foundPods2 {fmt.Println(pod2.(*corev1.Pod).Namespace, pod2.(*corev1.Pod).Name) // 结果是 public for; public one}// IndexKeys 根据索引名与索引值,检索出匹配的obj的key(key是由ns/name组成)keys, err := indexer.IndexKeys("cityIndex", "shenzhen")if err != nil {t.Error(err)}for _, key := range keys {fmt.Println("key: ", key) // 结果是: public/one;public/for}// 查询所有obj中,用索引函数匹配的索引值ss := indexer.ListIndexFuncValues("cityIndex")fmt.Println("indexFuncValue: ", ss) // indexFuncValue: [chengdu beijing shenzhen]// 返回与输入obj有同样索引的objress, err := indexer.Index("cityIndex", pod1)if err != nil {return}fmt.Println(len(ress))for _, pod := range ress {fmt.Println(pod.(*corev1.Pod).Name, pod.(*corev1.Pod).Namespace) // one public,for public}
}
测试功能说明: 需要通过label快速检索出对应对象obj
创建一个索引函数: cityIndexFunc,函数输入是一个对象obj, 返回label是"city"的值。
加入一个pod,打上label: "city"="shenzhen". 将这个pod作为参数输入到这个函数,输出就是"city"对应的值"shenzhen"
func cityIndexFunc(obj interface{}) ([]string, error) {pod := obj.(*corev1.Pod)psaId := pod.Labels["city"]return []string{psaId}, nil
}
使用NewIndexer()构造函数, 创建一个Indexer对象出来。构造函数入参需要keyFunc函数 与 indexers类型。
keyFunc使用常用的MetaNamespaceKeyFunc函数,Indexers类型,需要一个indexFunc,这里就使用上面的cityIndexFunc函数。 索引函数名是"cityIndex",索引函数是"cityIndexFunc"
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"cityIndex": cityIndexFunc,})
进一步看下NewIndexer()函数,返回的cache对象中,用NewThreadSafeStore(indexers, Indices{},构造了一个NewThreadSafeStore对象
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {return &cache{cacheStorage: NewThreadSafeStore(indexers, Indices{}),keyFunc: keyFunc,}
}
再进一步看下NewThreadSafeStore()构造函数,返回了一个threadSafeMap对象,该对象中又创建了一个名"items"的map[string]interface{},它就是正是的索引表;indexers是索引函数的集合; indices是索引表index的集合。
// NewThreadSafeStore creates a new instance of ThreadSafeStore.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {return &threadSafeMap{items: map[string]interface{}{},indexers: indexers,indices: indices,}
}
通过构造4个pod对象,再用Add()方法,将pod对象添加到存储。Indexer的Add()方法。
// 造数据,添加pods到indexer中pod1 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Namespace: "public", Labels: map[string]string{"city": "shenzhen"}}}pod2 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Namespace: "public", Labels: map[string]string{"city": "chengdu"}}}pod3 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Namespace: "public", Labels: map[string]string{"city": "beijing"}}}pod4 := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "for", Namespace: "public", Labels: map[string]string{"city": "shenzhen"}}}indexer.Add(pod1)indexer.Add(pod2)indexer.Add(pod3)indexer.Add(pod4)
Indexer的Add()方法,是用store接口类型的Add()实现的。 方法定义如下:
// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {key, err := c.keyFunc(obj) // 现有keyFunc也就是MetaNamespaceKeyFunc方法,计算出obj的key(由<ns>/<name>表示)if err != nil {return KeyError{obj, err}}c.cacheStorage.Add(key, obj) // 再调用ThreadSafeStore接口类型的Add()方法return nil
}
ThreadSafeStore接口类型的Add()方法,调用的是ThreadSafeMap的Add()方法
ThreadSafeMap的Add()方法的实现如下,包括2部分,一是存储数据,二是更新索引。
func (c *threadSafeMap) Add(key string, obj interface{}) {c.lock.Lock()defer c.lock.Unlock()oldObject := c.items[key] // 通过key获取存储内原来的obj对象即oldObjectc.items[key] = obj // 新的obj存到items表中c.updateIndices(oldObject, obj, key) // 使用updateIndices()更新索引
}
那么c.updateIndices(oldObject, obj, key)是如何更新索引的呢?
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {// if we got an old object, we need to remove it before we add it againif oldObj != nil {c.deleteFromIndices(oldObj, key) // 如存储里面,已经有obj的老数据,先把老数据的索引删除}for name, indexFunc := range c.indexers {indexValues, err := indexFunc(newObj) // 通过indexFunc获取到newObj的索引值indexValuesif err != nil {panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))}index := c.indices[name] // 通过indexName索引函数名,找到对应index索引表if index == nil { // 如果indexName索引函数名,还没有对应的索引表Index,就index{}新创建一个索引表index = Index{}c.indices[name] = index // 把新创建的索引表index,加到indices表中}for _, indexValue := range indexValues {set := index[indexValue] // 在index索引表中,用indexValue值找对应的值,值是一个set.string{}类型if set == nil { // 如果在index索引表,没有找到indexValue值时,就新建一个set.string{}类型set = sets.String{} index[indexValue] = set // indexValue与set对应的数据,存放到index索引表}set.Insert(key) // 如果index表中,已经有indexValue值的set.string{}数据,就将key加到这个set.string{}集合中去}}
}
简单的理解updateIndices ()函数的逻辑,如何oldObj已经在存储里面,就先删除oldObj对应的索引。 从indexers从遍历出,索引函数名name与索引函数indexFunc,用name从indices找到对应index索引表. 最后处理索引值indexValue,如果indexValue已经存储在与index索引表中,就将indexValue添加到index表中去。如果不存在,就新加一个键值对应健为index[indexValue],值为sets.string{}.
set.string{}是一个map结果类型,利用map建的唯一性,实现一个集合set类型, set的特点就是元素无重复。具体set.string{}的实现这里就不展开了,详情可以查看源码/k8s.io/apimachinery/pkg/util/sets
// sets.String is a set of strings, implemented via map[string]struct{} for minimal memory consumption.
type String map[string]Empty
type Empty struct{}
接下来我们看看indexer的几个主要函数
indexer.List()返回indexer中存储的所有obj对象
fmt.Println("显示索引表的所有数据: ")for k, v := range indexer.List() {fmt.Println(k, v.(*corev1.Pod).Name, v.(*corev1.Pod).Labels)}
indexer.ListIndexFuncValues("cityIndex") 用于返回匹配了indexFunc索引函数为"cityIndex"对应的索引值
// 显示indexer中的所有索引值
values := indexer.ListIndexFuncValues("cityIndex")
fmt.Println("values: ", values) // values: [chengdu beijing shenzhen]
indexer.ByIndex("cityIndex", "shenzhen"),根据索引函数名与索引值,检索出匹配的对象obj
// 查询索引值为shenzhen的pod
// ByIndex 根据索引函数名与索引值,检索出匹配的obj对象
foundPods2, err := indexer.ByIndex("cityIndex", "shenzhen")
if err != nil {fmt.Printf("unexpected error: %v\n", err)
}
fmt.Println("pod have label shenzhen: ")
for _, pod2 := range foundPods2 {fmt.Println(pod2.(*corev1.Pod).Namespace, pod2.(*corev1.Pod).Name) // 结果是 public for; public one
}
indexer.IndexKeys("cityIndex", "shenzhen"),根据索引函数名与索引值,检索出匹配的key(这里key默认是<ns>/<name>组合的字符串).
// IndexKeys 根据索引名与索引值,检索出匹配的obj的key(key是由ns/name组成)
keys, err := indexer.IndexKeys("cityIndex", "shenzhen")
if err != nil {t.Error(err)
}
for _, key := range keys {fmt.Println("key: ", key) // 结果是: public/one;public/for
}
再补充下,ByIndex(),IndexKeys()很类似,输入都是一样,但一个返回的是对象obj,一个是返回的对象obj的key.
indexer.Index("cityIndex", pod1), 先通过obj对象"pod1"找到indexkeys: "shenzhen", 再通过indexName: "cityIndex"与indexKey: "shenzhen", 接下来的逻辑同IndexKeys()函数
// 返回与输入obj有同样索引的obj
ress, err := indexer.Index("cityIndex", pod1)
if err != nil {return
}
fmt.Println(len(ress))
for _, pod := range ress {fmt.Println(pod.(*corev1.Pod).Name, pod.(*corev1.Pod).Namespace) // one public,for public
}
最后为了更深入理解index,indices,items三个map中具体存储的什么类型,默认可以在源码k8s.io/client-go/tools/cach/ThreadSafeStore.go中,找到ByIndex()函数下添加打印信息。

在运行测试用例"go test -run "^TestIndexer" -v ",输出如下:
items的内容: map[public/for:&Pod{ObjectMeta:{for public 此次输出省略},} public/one:&Pod{ObjectMeta:{one public 此次输出省略},} public/tre:&Pod{ObjectMeta:{tre public 此次输出省略},} public/two:&Pod{ObjectMeta:{two public 此次输出省略}]
index的内容: map[beijing:map[public/tre:{}] chengdu:map[public/two:{}] shenzhen:map[public/for:{} public/one:{}]]
indices的内容: map[cityIndex:map[beijing:map[public/tre:{}] chengdu:map[public/two:{}] shenzhen:map[public/for:{} public/one:{}]]]
为了便于理解再整理下格式:
items的内容:
map["public/for": &pod{},"public/one": &pod{},"public/tre": &pod{},"public/two": &pod{},
]
items的内容是key与obj的对应,也就是实际存储的数据. 重述下,key的格式是ns/name
index的内容:
map["shenzhen": map["public/for": {},"public/one": {}]"beijing": map["public/tre":{}]"chengdu": map["public/two":{}]
]
可以看出index,的键是"",value是一个map, 这个map里面,只有键,没有value,这里不用list,我猜想是为了能保证数据唯一
indices的内容
map["cityIndex": index]map["cityIndex": map["beijing": map["public/tre": {}]"chengdu": map["public/two": {}]"shenzhen": map["public/for":{} "public/one":{}]
]
indices的键是indexFunc的名称即: indexName,value为一个Index