例如,对于 Ceph 对象存储来说,每个 LIST bucket 请求都需要去多个磁盘中捞出这个 bucket 的全部数据;不仅自身很慢,还影响了同一时间段内的其他普通读写请求,因为 IO 是共享的,导致响应延迟上升乃至超时。如果 bucket 内的对象非常多(例如用作 harbor/docker-registry 的存储后端),LIST 操作甚至都无法在常规时间内完成( 因而依赖 LIST bucket 操作的 registry GC 也就跑不起来)。
又如 KV 存储 etcd。相比于 Ceph,一个实际 etcd 集群存储的数据量可能很小(几个 ~ 几十个 GB),甚至足够缓存到内存中。但与 Ceph 不同的是,它的并发请求数量可能会高几个量级,比如它是一个 ~4000 nodes 的 k8s 集群的 etcd。单个 LIST 请求可能只需要返回几十 MB 到上 GB 的流量,但并发请求一多,etcd 显然也扛不住,所以最好在前面有一层缓存,这就是 apiserver 的功能(之一)。K8s 的 LIST 请求大部分都应该被 apiserver 挡住,从它的本地缓存提供服务,但如果使用不当,就会跳过缓存直接到达 etcd,有很大的稳定性风险。
本文深入研究 k8s apiserver/etcd 的 LIST 操作处理逻辑和性能瓶颈,并提供一些基础服务的 LIST 压力测试、 部署和调优建议,提升大规模 K8s 集群的稳定性。
本文引用的 kube-apiserver LIST 代码基于 v1.24.0,不过 1.19~1.24 的基本逻辑和代码路径是一样的,有需要可对照参考。
1.1 K8s 架构:环形层次视图
从架构层次和组件依赖角度,可以将一个 K8s 集群和一台 Linux 主机做如下类比:
对于 K8s 集群,从内到外的几个组件和功能:
ListWatch、kubelet、*-agent、*-operator、List/ListWatch、apiserver/etcd
以上可以看到,系统路径中存在两级 List/ListWatch(但数据是同一份):
1. apiserver List/ListWatch etcd
2. 基础服务 List/ListWatch apiserver
因此,从最简形式上来说,apiserver 就是挡在 etcd 前面的一个代理(proxy),
+--------+ +---------------+ +------------+
| Client | -----------> | Proxy (cache) | --------------> | Data store |
+--------+ +---------------+ +------------+
infra services apiserver etcd
绝大部分情况下,apiserver 直接从本地缓存提供服务(因为它缓存了集群全量数据);
某些特殊情况,例如,
1. 客户端明确要求从 etcd 读数据(追求最高的数据准确性),
2. apiserver 本地缓存还没建好,
apiserver 就只能将请求转发给 etcd —— 这里就要特别注意了 —— 客户端 LIST 参数设置不当也可能会走到这个逻辑。
apiserver/etcd
1.3.1 请求举例
考虑下面几个 LIST 操作:
1. LIST apis/cilium.io/v2/ciliumendpoints?limit=500&resourceVersion=0:同时指定了 resourceVersion=0 和 limit=500。
2. LIST api/v1/pods?fieldSelector=spec.nodeName%3Dnode1:只获取 node1 上的 pods(%3D 是 = 的 URL 转义)。
3. podList, err := Client().CoreV1().Pods("").List(ctx(), ListOptions{FieldSelector: "spec.nodeName=node1"})
store.List
|-store.ListPredicate
|-if opt == nil
| opt = ListOptions{ResourceVersion: ""}
|-Init SelectionPredicate.Limit/Continue field
|-list := e.NewListFunc() // objects will be stored in this list
|-storageOpts := storage.ListOptions{opt.ResourceVersion, opt.ResourceVersionMatch, Predicate: p}
|
|-if MatchesSingle ok // 1. when "metadata.name" is specified, get single obj
| // Get single obj from cache or etcd
|
|-return e.Storage.List(KeyRootFunc(ctx), storageOpts) // 2. get all objs and perform filtering
|-cacher.List()
| // case 1: list all from etcd and filter in apiserver
|-if shouldDelegateList(opts) // true if resourceVersion == ""
| return c.storage.List // list from etcd
| |- fromRV *int64 = nil
| |- if len(storageOpts.ResourceVersion) > 0
| | rv = ParseResourceVersion
| | fromRV = &rv
| |
| |- for hasMore {
| | objs := etcdclient.KV.Get()
| | filter(objs) // filter by labels or fields
| | }
|
| // case 2: apiserver local cache not ready yet, list from etcd and filter in apiserver
|-if cache.notready()
| return c.storage.List // get from etcd
|
| // case 3: list & filter from apiserver local cache (memory)
|-obj := watchCache.WaitUntilFreshAndGet
|-for elem in obj.(*storeElement)
| listVal.Set() // append results to listObj
|-return // results stored in listObj
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go#L361
// List returns a list of objects filtered by the LabelSelector and FieldSelector
// carried in options; each selector defaults to "everything" when options is nil
// or the selector is unset. (Abridged excerpt: braces and error handling are elided.)
func (e *Store) List(ctx, options *metainternalversion.ListOptions) (runtime.Object, error) {
label := labels.Everything()
if options != nil && options.LabelSelector != nil
label = options.LabelSelector // label filter, e.g. app=nginx
field := fields.Everything()
if options != nil && options.FieldSelector != nil
field = options.FieldSelector // field filter, e.g. spec.nodeName=node1
out := e.ListPredicate(ctx, e.PredicateFunc(label, field), options) // fetch (List) the data and filter it (Predicate)
if e.Decorator != nil
e.Decorator(out)
return out, nil
}
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go#L411
// ListPredicate lists the objects selected by predicate p. It copies Limit and
// Continue from options into p, builds the storage-level ListOptions, and then
// either fetches a single object (when p matches exactly one name) or lists
// everything under the resource's root key. (Abridged excerpt.)
func (e *Store) ListPredicate(ctx , p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
// Step 1: initialization
if options == nil
options = &metainternalversion.ListOptions{ResourceVersion: ""}
p.Limit = options.Limit
p.Continue = options.Continue
list := e.NewListFunc() // the results will be stored in here
storageOpts := storage.ListOptions{ // convert the API-side ListOptions into the storage-side ListOptions; the field differences are shown below
ResourceVersion: options.ResourceVersion,
ResourceVersionMatch: options.ResourceVersionMatch,
Predicate: p,
Recursive: true,
}
// Step 2: if the request specifies metadata.name, fetch that single object; no need to filter the full data set
if name, ok := p.MatchesSingle(); ok { // check whether the metadata.name field is set
// NOTE(review): `err` is never declared in this excerpt; presumably the real code
// binds it from KeyFunc (`key, err := ...`) — confirm against upstream.
if key := e.KeyFunc(ctx, name); err == nil { // the key of this object in etcd (unique, or nonexistent)
storageOpts.Recursive = false
e.Storage.GetList(ctx, key, storageOpts, list)
return list
}
// else branch: reaching here means no lookup key could be obtained from the context, so fall back to listing everything below and filtering
}
// Step 3: filter over the full data set
e.Storage.GetList(ctx, e.KeyRootFunc(), storageOpts, list) // KeyRootFunc() returns the root key of this resource type in etcd (i.e. the prefix, without the trailing /)
return list
}
// staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go
// ListOptions is the query options to a standard REST list call.
type ListOptions struct {
metav1.TypeMeta
LabelSelector labels.Selector // label filter, e.g. app=nginx
FieldSelector fields.Selector // field filter, e.g. spec.nodeName=node1
Watch bool
AllowWatchBookmarks bool
ResourceVersion string
ResourceVersionMatch metav1.ResourceVersionMatch
TimeoutSeconds *int64 // Timeout for the list/watch call.
Limit int64
Continue string // a token returned by the server. return a 410 error if the token has expired.
}
// staging/src/k8s.io/apiserver/pkg/storage/interfaces.go
// ListOptions provides the options that may be provided for storage list operations.
type ListOptions struct {
ResourceVersion string
ResourceVersionMatch metav1.ResourceVersionMatch
Predicate SelectionPredicate // Predicate provides the selection rules for the list operation.
Recursive bool // false: 'key' is an exact match (fetch a single object); true: 'key' is a prefix (fetch everything under it)
ProgressNotify bool // storage-originated bookmark, ignored for non-watch requests.
}
// case 1: fetch a single object by metadata.name; no need to filter the full data set
// NOTE(review): the fuller ListPredicate excerpt earlier in this document also sets
// storageOpts.Recursive = false before calling GetList; that line is not shown here — confirm the omission is intentional.
if name, ok := p.MatchesSingle(); ok { // check whether the metadata.name field is set
if key := e.KeyFunc(ctx, name); err == nil {
e.Storage.GetList(ctx, key, storageOpts, list)
return list
}
// else branch: reaching here means no lookup key could be obtained from the context, so fall back to listing everything below and filtering
}
// staging/src/k8s.io/apiserver/pkg/storage/interfaces.go
// Interface offers a common interface for object marshaling/unmarshaling operations and
// hides all the storage-related operations behind it.
type Interface interface {
Create(ctx , key string, obj, out runtime.Object, ttl uint64) error
Delete(ctx , key string, out runtime.Object, preconditions *Preconditions,...)
Watch(ctx , key string, opts ListOptions) (watch.Interface, error)
Get(ctx , key string, opts GetOptions, objPtr runtime.Object) error
// unmarshall objects found at key into a *List api object (an object that satisfies runtime.IsList definition).
// If 'opts.Recursive' is false, 'key' is used as an exact match; if is true, 'key' is used as a prefix.
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
GetList(ctx , key string, opts ListOptions, listObj runtime.Object) error
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go#L622
// GetList implements storage.Interface
// It delegates to the underlying storage when shouldDelegateList says so, or when
// the parsed resource version is 0 and the cache is not ready; otherwise it serves
// the list from the watch cache and applies the predicate filter. (Abridged excerpt.)
func (c *Cacher) GetList(ctx , key string, opts storage.ListOptions, listObj runtime.Object) error {
recursive := opts.Recursive
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
// Case 1: the ListOptions require reading from etcd
if shouldDelegateList(opts)
return c.storage.GetList(ctx, key, opts, listObj) // c.storage points to etcd
// If resourceVersion is specified, serve it from cache.
listRV := c.versioner.ParseResourceVersion(resourceVersion)
// Case 2: the apiserver cache is not ready yet, so the only option is to read from etcd
if listRV == 0 && !c.ready.check()
return c.storage.GetList(ctx, key, opts, listObj)
// Case 3: the apiserver cache is healthy, serve from it; returned objects are guaranteed to be no older than `listRV`
listPtr := meta.GetItemsPtr(listObj)
listVal := conversion.EnforcePtr(listPtr)
filter := filterWithAttrsFunction(key, pred) // the final filter
objs, readResourceVersion, indexUsed := c.listItems(listRV, key, pred, ...) // pre-filter via index (performance optimization)
for _, obj := range objs {
elem := obj.(*storeElement)
if filter(elem.Key, elem.Labels, elem.Fields) // the actual filtering
// NOTE(review): the parentheses on the next line are unbalanced in this excerpt — confirm against upstream.
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem))
}
// record on the list object the ResourceVersion that was just read
if c.versioner != nil
c.versioner.UpdateList(listObj, readResourceVersion, "", nil)
return nil
}
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go#L591
// shouldDelegateList reports whether a list request must be served by the
// underlying storage (etcd) rather than by the apiserver watch cache.
//
// It returns true when any of the following holds:
//  1. no resourceVersion was specified;
//  2. list chunking is enabled and the request carries a continue token;
//  3. list chunking is enabled, a positive limit is set, and resourceVersion
//     is not "0" (with resourceVersion "0" the limit never triggers delegation);
//  4. the request asks for an exact resourceVersion match.
func shouldDelegateList(opts storage.ListOptions) bool {
	rv := opts.ResourceVersion
	p := opts.Predicate
	// The APIListChunking feature gate is enabled by default.
	chunking := DefaultFeatureGate.Enabled(features.APIListChunking)

	switch {
	case rv == "":
		// Unspecified resourceVersion: fetch from the underlying storage.
		return true
	case chunking && len(p.Continue) > 0:
		// Continue is an opaque token; continuations are served by the underlying storage.
		return true
	case chunking && p.Limit > 0 && rv != "0":
		// A limit is only forwarded to storage when resourceVersion is not "0".
		return true
	default:
		return opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact
	}
}
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L563
// GetList implements storage.Interface.
// It reads key/value pairs from etcd under the prefixed key, appends the items
// that pass the predicate to listObj, and sets a continue token on the list
// when more results remain. (Abridged excerpt; several locals are not declared here.)
func (s *store) GetList(ctx , key string, opts storage.ListOptions, listObj runtime.Object) error {
listPtr := meta.GetItemsPtr(listObj)
v := conversion.EnforcePtr(listPtr)
key = path.Join(s.pathPrefix, key)
keyPrefix := key // append '/' if needed
newItemFunc := getNewItemFunc(listObj, v)
var fromRV *uint64
// NOTE(review): `resourceVersion`, `recursive` and `pred` are not declared in this
// excerpt; presumably they are taken from opts as in Cacher.GetList — confirm.
if len(resourceVersion) > 0 { // if RV is non-empty (it defaults to the empty string when the client does not pass one)
parsedRV := s.versioner.ParseResourceVersion(resourceVersion)
fromRV = &parsedRV
}
// handling of ResourceVersion, ResourceVersionMatch, etc.
switch {
case recursive && s.pagingEnabled && len(pred.Continue) > 0: ...
case recursive && s.pagingEnabled && pred.Limit > 0 : ...
default : ...
}
// loop until we have filled the requested limit from etcd or there are no more results
// NOTE(review): no exit from this outer loop is shown in the excerpt — the
// terminating conditions are elided; confirm against upstream.
for {
getResp = s.client.KV.Get(ctx, key, options...) // fetch data from etcd
numFetched += len(getResp.Kvs)
hasMore = getResp.More
for i, kv := range getResp.Kvs {
if limitOption != nil && int64(v.Len()) >= pred.Limit {
hasMore = true
break
}
lastKey = kv.Key
data := s.transformer.TransformFromStorage(ctx, kv.Value, kv.Key)
appendListItem(v, data, kv.ModRevision, pred, s.codec, s.versioner, newItemFunc) // the filtering happens inside this call
numEvald++
}
key = string(lastKey) + "\x00"
}
// instruct the client to begin querying from immediately after the last key we returned
if hasMore {
// we want to start immediately after the last key
next := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
return s.versioner.UpdateList(listObj, uint64(returnedRV), next, remainingItemCount)
}
// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "", nil)
}
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go#L622
// GetList implements storage.Interface
// This excerpt elides the two delegate-to-etcd cases and shows only the path
// that serves the list from the apiserver cache.
func (c *Cacher) GetList(ctx , key string, opts storage.ListOptions, listObj runtime.Object) error {
// Case 1: the ListOptions require reading from etcd
...
// Case 2: the apiserver cache is not ready yet, so the only option is to read from etcd
...
// Case 3: the apiserver cache is healthy, serve from it; returned objects are guaranteed to be no older than `listRV`
listPtr := meta.GetItemsPtr(listObj) // List elements with at least 'listRV' from cache.
listVal := conversion.EnforcePtr(listPtr)
filter := filterWithAttrsFunction(key, pred) // the final filter
objs, readResourceVersion, indexUsed := c.listItems(listRV, key, pred, ...) // pre-filter via index (performance optimization)
for _, obj := range objs {
elem := obj.(*storeElement)
if filter(elem.Key, elem.Labels, elem.Fields) // the actual filtering
// NOTE(review): the parentheses on the next line are unbalanced in this excerpt — confirm against upstream.
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem))
}
if c.versioner != nil
c.versioner.UpdateList(listObj, readResourceVersion, "", nil)
return nil
}
$ cat curl-k8s-apiserver.sh
# Thin wrapper around curl that authenticates to kube-apiserver with the admin
# client certificate and forwards every argument to curl.
# "$@" is quoted so that URLs containing '?', '*' or spaces are passed through
# verbatim instead of being glob-expanded or word-split by the shell.
curl -s --cert /etc/kubernetes/pki/admin.crt --key /etc/kubernetes/pki/admin.key --cacert /etc/kubernetes/pki/ca.crt "$@"
$ ./curl-k8s-apiserver.sh "https://localhost:6443/api/v1/pods?limit=2"
{
"kind": "PodList",
"metadata": {
"resourceVersion": "2127852936",
"continue": "eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJ...",
},
"items": [ {pod1 data }, {pod2 data}]
}
$ ./curl-k8s-apiserver.sh "https://localhost:6443/api/v1/pods?limit=2"
{
"kind": "PodList",
"metadata": {
"resourceVersion": "2127852936",
"continue": "eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJ...",
},
"items": [ {pod1 data }, {pod2 data}]
}
$ kubectl get pods --all-namespaces --v=10
# 以下都是 log 输出,做了适当调整
# curl -k -v -XGET -H "User-Agent: kubectl/v1.xx" -H "Accept: application/json;as=Table;v=v1;g=meta.k8s.io,application/json;as=Table;v=v1beta1;g=meta.k8s.io,application/json"
# 'http://localhost:8080/api/v1/pods?limit=500'
# GET http://localhost:8080/api/v1/pods?limit=500 200 OK in 202 milliseconds
# Response Body: {"kind":"Table","metadata":{"continue":"eyJ2Ijoib...","remainingItemCount":54},"columnDefinitions":[...],"rows":[...]}
#
# curl -k -v -XGET -H "Accept: application/json;as=Table;v=v1;g=meta.k8s.io,application/json;as=Table;v=v1beta1;g=meta.k8s.io,application/json" -H "User-Agent: kubectl/v1.xx"
# 'http://localhost:8080/api/v1/pods?continue=eyJ2Ijoib&limit=500'
# GET http://localhost:8080/api/v1/pods?continue=eyJ2Ijoib&limit=500 200 OK in 44 milliseconds
# Response Body: {"kind":"Table","metadata":{"resourceVersion":"2122644698"},"columnDefinitions":[],"rows":[...]}
$ ./curl-k8s-apiserver.sh "https://localhost:6443/api/v1/pods?limit=2&resourceVersion=0"
{
"kind": "PodList",
"metadata": {
"resourceVersion": "2127852936",
"continue": "eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJ...",
},
"items": [ {pod1 data }, {pod2 data}, ...]
}
$ ./curl-k8s-apiserver.sh "https://localhost:6443/api/v1/namespaces/default/pods?fieldSelector=spec.nodeName%3Dnode1" | jq '.items[].spec.nodeName'
"node1"
"node1"
"node1"
...
$ ./curl-k8s-apiserver.sh "https://localhost:6443/api/v1/namespaces/default/pods?fieldSelector=spec.nodeName%3Dnode1&resourceVersion=0" | jq '.items[].spec.nodeName'
"node1"
"node1"
"node1"
...
$ time ./curl-k8s-apiserver.sh <url> > result
1. api/v1/namespaces?resourceVersion=0
2. api/v1/pods?fieldSelector=spec.nodeName%3Dnode1&resourceVersion=0
3. api/v1/nodes?fieldSelector=metadata.name%3Dnode1&resourceVersion=0
4. api/v1/services?labelSelector=%21service.kubernetes.io%2Fheadless%2C%21service.kubernetes.io%2Fservice-proxy-name
5. apis/discovery.k8s.io/v1beta1/endpointslices?resourceVersion=0
6. apis/networking.k8s.io/networkpolicies?resourceVersion=0
7. apis/cilium.io/v2/ciliumnodes?resourceVersion=0
8. apis/cilium.io/v2/ciliumnetworkpolicies?resourceVersion=0
9. apis/cilium.io/v2/ciliumclusterwidenetworkpolicies?resourceVersion=0
$ cat benchmark-list-overheads.sh
apiserver_url="https://localhost:6443"
# List k8s core resources (e.g. pods, services)
# API: GET/LIST /api/v1/<resources>?<field/label selector>&resourceVersion=0
#
# Arguments:
#   $1  resource name under /api/v1 (e.g. "pods")
#   $2  optional selector query string (e.g. "fieldSelector=..."); may be empty
#
# Writes the unfiltered response to ./listed-<resource> and, when a selector is
# given, the filtered response to ./listed-<resource>-filtered; times each
# request and prints the sizes of the resulting files.
# Reads the global $apiserver_url.
function benchmark_list_core_resource() {
    local resource=$1
    local selectors=$2
    local listed_file="listed-$resource"
    local listed_file2="$listed_file-filtered"
    local url="$apiserver_url/api/v1/$resource?resourceVersion=0"

    echo "----------------------------------------------------"
    # Label the run with the resource being listed (not with the selector string).
    echo "Benchmarking list $resource"

    # first perform a request without selectors, this is the size apiserver really handles
    echo "curl $url"
    time ./curl-k8s-apiserver.sh "$url" > "$listed_file"

    # perform another request if selectors are provided, this is the size client receives
    if [ -n "$selectors" ]; then
        url="$url&$selectors"
        echo "curl $url"
        time ./curl-k8s-apiserver.sh "$url" > "$listed_file2"
    fi

    # The filtered file may not exist (no selector given); suppress ls's complaint about it.
    ls -ahl "$listed_file" "$listed_file2" 2>/dev/null

    echo "----------------------------------------------------"
    echo ""
}
# List k8s resources served under /apis/<api group> (e.g. endpointslices, ciliumnodes)
# API: GET/LIST /apis/<api group>/<resources>?<field/label selector>&resourceVersion=0
#
# Arguments:
#   $1  API group path, normally including the version (e.g. "cilium.io/v2")
#   $2  resource name (e.g. "ciliumnodes")
#   $3  optional selector query string; may be empty
#
# Writes the response to ./listed-<group with '/' replaced by '-'>-<resource>,
# times the request and prints the size of the resulting file.
# Reads the global $apiserver_url.
function benchmark_list_apiexternsion_resource() {
    local api_group=$1
    local resource=$2
    local selectors=$3
    # Flatten "group/version" into "group-version" so it can be used in a file name.
    local api_group_flatten_name=${api_group//\//-}
    local listed_file="listed-$api_group_flatten_name-$resource"
    local url="$apiserver_url/apis/$api_group/$resource?resourceVersion=0"

    echo "----------------------------------------------------"
    echo "Benchmarking list $api_group/$resource"

    if [ -n "$selectors" ]; then
        url="$url&$selectors"
    fi

    echo "curl $url"
    time ./curl-k8s-apiserver.sh "$url" > "$listed_file"

    ls -ahl "$listed_file"

    echo "----------------------------------------------------"
    echo ""
}
# Core (/api/v1) resources: <resource> <optional selector query string>
benchmark_list_core_resource "namespaces" ""
benchmark_list_core_resource "pods" "fieldSelector=spec.nodeName%3Dnode1"
benchmark_list_core_resource "nodes" "fieldSelector=metadata.name%3Dnode1"
benchmark_list_core_resource "services" "labelSelector=%21service.kubernetes.io%2Fheadless%2C%21service.kubernetes.io%2Fservice-proxy-name"
# Grouped (/apis/<group>) resources: <api group> <resource> <optional selector query string>
benchmark_list_apiexternsion_resource "discovery.k8s.io/v1beta1" "endpointslices" ""
benchmark_list_apiexternsion_resource "apiextensions.k8s.io/v1" "customresourcedefinitions" ""
# NOTE(review): unlike the other groups, no API version is given for networking.k8s.io — confirm the path is valid.
benchmark_list_apiexternsion_resource "networking.k8s.io" "networkpolicies" ""
benchmark_list_apiexternsion_resource "cilium.io/v2" "ciliumnodes" ""
benchmark_list_apiexternsion_resource "cilium.io/v2" "ciliumendpoints" ""
benchmark_list_apiexternsion_resource "cilium.io/v2" "ciliumnetworkpolicies" ""
benchmark_list_apiexternsion_resource "cilium.io/v2" "ciliumclusterwidenetworkpolicies" ""
$ benchmark-list-overheads.sh
----------------------------------------------------
Benchmarking list
curl https://localhost:6443/api/v1/namespaces?resourceVersion=0
real 0m0.090s
user 0m0.038s
sys 0m0.044s
-rw-r--r-- 1 root root 69K listed-namespaces
----------------------------------------------------
Benchmarking list fieldSelector=spec.nodeName%3Dnode1
curl https://localhost:6443/api/v1/pods?resourceVersion=0
real 0m18.332s
user 0m1.355s
sys 0m1.822s
curl https://localhost:6443/api/v1/pods?resourceVersion=0&fieldSelector=spec.nodeName%3Dnode1
real 0m0.242s
user 0m0.044s
sys 0m0.188s
-rw-r--r-- 1 root root 2.0G listed-pods
-rw-r--r-- 1 root root 526K listed-pods-filtered
----------------------------------------------------
...
$ ls -ahl listed-*
-rw-r--r-- 1 root root 222 listed-apiextensions.k8s.io-v1-customeresourcedefinitions
-rw-r--r-- 1 root root 5.8M listed-apiextensions.k8s.io-v1-customresourcedefinitions
-rw-r--r-- 1 root root 2.0M listed-cilium.io-v2-ciliumclusterwidenetworkpolicies
-rw-r--r-- 1 root root 193M listed-cilium.io-v2-ciliumendpoints
-rw-r--r-- 1 root root 185 listed-cilium.io-v2-ciliumnetworkpolicies
-rw-r--r-- 1 root root 6.6M listed-cilium.io-v2-ciliumnodes
-rw-r--r-- 1 root root 42M listed-discovery.k8s.io-v1beta1-endpointslices
-rw-r--r-- 1 root root 69K listed-namespaces
-rw-r--r-- 1 root root 222 listed-networking.k8s.io-networkpolicies
-rw-r--r-- 1 root root 70M listed-nodes # 仅用于评估 apiserver 需要处理的数据量
-rw-r--r-- 1 root root 25K listed-nodes-filtered
-rw-r--r-- 1 root root 2.0G listed-pods # 仅用于评估 apiserver 需要处理的数据量
-rw-r--r-- 1 root root 526K listed-pods-filtered
-rw-r--r-- 1 root root 23M listed-services # 仅用于评估 apiserver 需要处理的数据量
-rw-r--r-- 1 root root 23M listed-services-filtered
{
"level":"warn",
"msg":"apply request took too long",
"took":"5357.87304ms",
"expected-duration":"100ms",
"prefix":"read-only range ",
"request":"key:\"/registry/pods/\" range_end:\"/registry/pods0\" ",
"response":"range_response_count:60077 size:602251227"
}