1. 概述
-
目前apiserver默认的限流方式太过简单
-
目前k8s缺少客户端业务请求隔离,一个错误的客户端发送大量请求可能造成其他客户端请求异常,也不支持突发流量。
2. 开启APF
APF测试 开启APF,需要在apiserver配置
--feature-gates=APIPriorityAndFairness=true --runtime-config=flowcontrol.apiserver.k8s.io/v1alpha1=true
开启后,获取默认的FlowSchemas
API 优先级和公平性(APF)是MaxInFlightLimit限流的一种替代方案,设计文档见提案。
API 优先级和公平性(1.15以上,alpha版本), 以更细粒度(byUser,byNamespace)对请求进行分类和隔离。 支持突发流量,通过使用公平排队技术从队列中分发请求从而避免饥饿。
APF限流通过两种资源,PriorityLevelConfigurations
定义隔离类型和可处理的并发预算量,还可以调整排队行为。 FlowSchemas
用于对每个入站请求进行分类,并与一个 PriorityLevelConfigurations
相匹配。
可对用户或用户组或全局进行某些资源某些请求的限制,如限制default namespace写services put/patch请求。
优点
- 考虑情况较全面,支持优先级,白名单等
- 可支持server/namespace/user/resource等细粒度级别的限流
缺点
- 配置复杂,不直观,需要对APF原理深入了解
- 功能较新,缺少生产环境验证
整体设计:
- APF 的实现依赖两个非常重要的资源 FlowSchema, PriorityLevelConfiguration
- APF 对请求进行更细粒度的分类,每一个请求分类对应一个 FlowSchema (FS)
- FS 内的请求又会根据 distinguisher 进一步划分为不同的 Flow
- FS 会设置一个优先级 (Priority Level, PL),不同优先级的并发资源是隔离的。所以不同优先级的资源不会相互排挤。特定优先级的请求可以被高优处理。
- 一个 PL 可以对应多个 FS,PL 中维护了一个 QueueSet,用于缓存不能及时处理的请求,请求不会因为超出 PL 的并发限制而被丢弃。
- FS 中的每个 Flow 通过 shuffle sharding 算法从 QueueSet 选取特定的 queues 缓存请求。
- 每次从 QueueSet 中取请求执行时,会先应用 fair queuing 算法从 QueueSet 中选中一个 queue,然后从这个 queue 中取出 oldest 请求执行。所以即使是同一个 PL 内的请求,也不会出现一个 Flow 内的请求一直占用资源的不公平现象。
注意: 属于 “长时间运行” 类型的某些请求(例如远程命令执行或日志拖尾)不受 API 优先级和公平性过滤器的约束。 如果未启用 APF 特性,即便设置
--max-requests-inflight
标志,该类请求也不受约束。 APF 适用于 watch 请求。当 APF 被禁用时,watch 请求不受--max-requests-inflight
限制。
3. FlowSchema
3.1 对请求进行分类
- 用户可以通过创建 FlowSchema 资源对象自定义分类方式。
FS 代表一个请求分类,包含多条匹配规则,如果某个请求能匹配其中任意一条规则就认为这个请求属于这个 FS (只匹配第一个匹配的 FS)。
// FlowSchemaSpec describes how the FlowSchema's specification looks like.
type FlowSchemaSpec struct {...Rules []PolicyRulesWithSubjects `json:"rules,omitempty" protobuf:"bytes,4,rep,name=rules"`
}
请求与 FS 规则匹配:同时满足以下条件,就认为请求与该 FS 规则匹配
- 匹配请求主体 subject
- 对资源的请求,匹配 ResourceRules 中任意一条规则
- 对非资源的请求, 匹配 NonResourceRules 中任意一条规则
type PolicyRulesWithSubjects struct {Subjects []SubjectResourceRules []ResourcePolicyRuleNonResourceRules []NonResourcePolicyRule
}type Subject struct {Kind SubjectKind `json:"kind" protobuf:"bytes,1,opt,name=kind"`User *UserSubject `json:"user,omitempty" protobuf:"bytes,2,opt,name=user"`Group *GroupSubject `json:"group,omitempty" protobuf:"bytes,3,opt,name=group"`ServiceAccount *ServiceAccountSubject `json:"serviceAccount,omitempty" protobuf:"bytes,4,opt,name=serviceAccount"`
}type ResourcePolicyRule struct {Verbs []string `json:"verbs" protobuf:"bytes,1,rep,name=verbs"`APIGroups []string `json:"apiGroups" protobuf:"bytes,2,rep,name=apiGroups"`Resources []string `json:"resources" protobuf:"bytes,3,rep,name=resources"`ClusterScope bool `json:"clusterScope,omitempty" protobuf:"varint,4,opt,name=clusterScope"`Namespaces []string `json:"namespaces" protobuf:"bytes,5,rep,name=namespaces"`
}type NonResourcePolicyRule struct {Verbs []string `json:"verbs" protobuf:"bytes,1,rep,name=verbs"`NonResourceURLs []string `json:"nonResourceURLs" protobuf:"bytes,6,rep,name=nonResourceURLs"`
}
通过 FS,可以根据请求的主体 (User, Group, ServiceAccout)、动作 (Get, List, Create, Delete …)、资源类型 (pod, deployment …)、namespace、url 对请求进行分类。
-
FS 内的请求进一步划分 Flow
有两种方式对请求进行 Flow 划分:- distinguisher = ByUser, 根据请求的 User 划分不同 Flow;可以让来自不同用户的请求平等使用 PL 内的资源。
- distinguisher = ByNamespace, 根据请求的 namespace 划分不同的 Flow;可以让来自不同 namespace 的请求平等使用 PL 内的资源。
- distinguisher = nil,表示不划分
type FlowSchemaSpec struct {...DistinguisherMethod *FlowDistinguisherMethod `json:"distinguisherMethod,omitempty" protobuf:"bytes,3,opt,name=distinguisherMethod"`... }type FlowDistinguisherMethod struct {Type FlowDistinguisherMethodType `json:"type" protobuf:"bytes,1,opt,name=type"` }type FlowDistinguisherMethodType string const (FlowDistinguisherMethodByUserType FlowDistinguisherMethodType = "ByUser"FlowDistinguisherMethodByNamespaceType FlowDistinguisherMethodType = "ByNamespace" )
-
如何给请求分配优先级
FS 通过 FlowSchema.Spec.PriorityLevelConfiguration.Name 指定 PL,从属于这个 FS 的所有请求都划分到这个优先级中。
3.2 Priority Level
如果 api-sever 启动了 APF,它的总并发数为 --max-requests-inflight
和 --max-mutating-requests-inflight
两个配置值之和。这些并发数被分配给各个 PL,分配方式是根据 PriorityLevelConfiguration.Spec.Limited.AssuredConcurrencyShares 的数值按比例分配。 PL 的 AssuredConcurrencyShare 越大,分配到的并发份额越大 。
每个 PL 都对应维护了一个 QueueSet,其中包含多个 queue ,当 PL 达到并发限制时,收到的请求会被缓存在 QueueSet 中,不会丢弃,除非 queue 也达到了容量限制。
当入站请求的数量大于分配的 PriorityLevelConfiguration 中允许的并发级别时, type
字段将确定对额外请求的处理方式。 Reject
类型,表示多余的流量将立即被 HTTP 429(请求过多)错误所拒绝。 Queue
类型,表示对超过阈值的请求进行排队,将使用阈值分片和公平排队技术来平衡请求流之间的进度。
QueueSet 中 queue 数量由PriorityLevelConfiguration.Spec.Limited.LimitResponse.Queuing.Queues 指定;每个 queue 的长度由 PriorityLevelConfiguration.Spec.Limited.LimitResponse.Queuing.QueueLengthLimit 指定。
例如,默认配置包括针对领导者选举请求、内置控制器请求和 Pod 请求都单独设置优先级。 这表示即使异常的 Pod 向 API 服务器发送大量请求,也无法阻止领导者选举或内置控制器的操作执行成功。
优先级的并发限制会被定期调整,允许利用率较低的优先级将并发度临时借给利用率很高的优先级。 这些限制基于一个优先级可以借出多少个并发度以及可以借用多少个并发度的额定限制和界限, 所有这些均源自下述配置对象。
请求占用的席位
- 6个推荐的 PL
PL | request |
---|---|
global-default | 所有其他的请求 |
leader-election | 优先级用于内置控制器的领导选举的请求 (特别是来自 kube-system 名字空间中 system:kube-controller-manager 和 system:kube-scheduler 用户和服务账号,针对 endpoints 、configmaps 或 leases 的请求) |
node-high | 优先级用于来自节点的健康状态更新。 |
system | 优先级用于 system:nodes 组(即 kubelet)的与健康状态更新相关的请求; kubelets 必须能连上 API 服务器,以便工作负载能够调度到其上 |
workload-high | 优先级用于内置控制器的其他请求 |
workload-low | 优先级用于来自所有其他服务帐户的请求,通常包括来自 Pod 中运行的控制器的所有请求。 |
$ kubectl get flowschemas.flowcontrol.apiserver.k8s.io
NAME PRIORITYLEVEL MATCHINGPRECEDENCE DISTINGUISHERMETHOD AGE MISSINGPL
system-leader-election leader-election 100 ByUser 152m False
workload-leader-election leader-election 200 ByUser 152m False
system-nodes system 500 ByUser 152m False
kube-controller-manager workload-high 800 ByNamespace 152m False
kube-scheduler workload-high 800 ByNamespace 152m False
kube-system-service-accounts workload-high 900 ByNamespace 152m False
health-for-strangers exempt 1000 <none> 151m False
service-accounts workload-low 9000 ByUser 152m False
global-default global-default 9900 ByUser 152m False
catch-all catch-all 10000 ByUser
- 两对内建的 PL 和 FS(强制配置)
Exempt PL (PriorityLevelConfigurations.Spec.Type = exempt):这个 PL 内的请求完全不受限制,且被立即执行。
Exempt FS:将来自 system:master group 的所有请求划分到 Exempt PL。用户可以自定义 FS 将一些特殊的请求划分到 Exempt PL 中。
catch-all PL:只有一个并发配额,没有 queue。一般会返回 HTTP 429 错误。
catch-all FS:说有未能匹配其他 FS 的请求,最终会被这个 FS 匹配上。保证所有的请求都有一个分类。
由于 request 只匹配第一个符合条件的 FS,所以 APF 会对 FS 进行排序,将 Exempt FS 排第一位,catch-all FS 排最后一位。
// sort into the order to be used for matching
sort.Sort(fsSeq)// Supply missing mandatory FlowSchemas, in correct position
if !haveExemptFS {// 放在第一位fsSeq = append(apihelpers.FlowSchemaSequence{fcboot.MandatoryFlowSchemaExempt}, fsSeq...)
}
if !haveCatchAllFS {// 放在最后一位fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll)
}
自定义PL和FS
- FlowShema配置
apiVersion: flowcontrol.apiserver.k8s.io/v1alpha1
kind: FlowSchema
metadata:name: health-for-strangers
spec:matchingPrecedence: 1000 #匹配优先级,1~1000,越小优先级越高priorityLevelConfiguration: #关联的PriorityLevelConfigurationsname: exempt #排除rules,即不限制当前flowshema的rulesrules: #请求规则- nonResourceRules: #非资源- nonResourceURLs:- "/healthz"- "/livez"- "/readyz"verbs:- "*"subjects: #对应的用户或用户组- kind: Groupgroup:name: system:unauthenticated
- PriorityLevelConfiguration配置
apiVersion: flowcontrol.apiserver.k8s.io/v1alpha1
kind: PriorityLevelConfiguration
metadata:name: leader-election
spec:limited: #限制策略assuredConcurrencyShares: 10 limitResponse: #如何处理被限制的请求queuing: #类型为Queue时,列队的设置handSize: 4 #队列queueLengthLimit: 50 #队列长度queues: 16 #队列数type: Queue #Queue或者Reject,Reject直接返回429,Queue将请求加入队列type: Limited #类型,Limited或Exempt, Exempt即不限制
4. 问题诊断
启用了 APF 的 API 服务器,它每个 HTTP 响应都有两个额外的 HTTP 头: X-Kubernetes-PF-FlowSchema-UID
和 X-Kubernetes-PF-PriorityLevel-UID
, 注意与请求匹配的 FlowSchema 和已分配的优先级。 如果请求用户没有查看这些对象的权限,则这些 HTTP 头中将不包含 API 对象的名称, 因此在调试时,你可以使用类似如下的命令:
kubectl get flowschemas -o custom-columns="uid:{metadata.uid},name:{metadata.name}"
kubectl get prioritylevelconfigurations -o custom-columns="uid:{metadata.uid},name:{metadata.name}"
5. 源码分析
5.1 请求流程
API Server 接收到请求后,先按照前面提到的方式,找到与之匹配的 FS,实现分类,并根据 FS 确定请求的所属的Flow 和 PL。
APF 利用 FS 的 name 和请求的 userName 或 namespace 计算一个 hashFlowID 标识 Flow。
var hashValue uint64
if numQueues > 1 {// 1. DistinguisherMethod = ByUser, flowDistinguisher = rd.User.Name// 2. DistinguisherMethod = ByNamespace, flowDistinguisher = rd.RequestInfo.Namespace// 3. DistinguisherMethod = nil, flowDistinguisher = ""flowDistinguisher := computeFlowDistinguisher(rd, fs.Spec.DistinguisherMethod)hashValue = hashFlowID(fs.Name, flowDistinguisher)
}
然后利用这个 hashFlowID 通过 Shuttle Sharding 算法,从请求所属的 PL 的 QueueSet 中选取指定数目的 queues (PriorityLevelConfiguration.Spec.Limited.LimitResponse.Queuing.HandSize),然后从这些候选的 queues 中,选择其中 length 最小 queue. 并移出 queue 中超时的请求。
func (d *Dealer) Deal(hashValue uint64, pick func(int)) {// 15 is the largest possible value of handSizevar remainders [15]intfor i := 0; i < d.handSize; i++ {hashValueNext := hashValue / uint64(d.deckSize-i)remainders[i] = int(hashValue - uint64(d.deckSize-i)*hashValueNext)hashValue = hashValueNext}// 防止重复:正反馈机制,大者更大for i := 0; i < d.handSize; i++ {card := remainders[i]for j := i; j > 0; j-- {if card >= remainders[j-1] {// 不会出现 card > deckSize// 因为 hashValue % uint64(d.deckSize-i) <= d.deckSize-i-1,而第 i 个 card 最多自增 i 次card++}}pick(card)}
}
判断是否入队这个请求:如果队列已满且 PL 中正在执行的请求数达到 PL 的并发限制,就会拒绝这个请求,否则入队这个请求。
此处保证了不同 Flow 的请求不会挤掉其他 Flow 的请求。Flow 是按照用户或 namespace 划分的,它的实际意义就是来自不同用户或 namespace 的请求不会挤掉同优先级的其他用户或 namespace 的请求。
5.2 分发请求
为了保证同一个 PL 中缓存的不同 Flow 的请求被处理机会平等,每次分发请求时,都会先应用 fair queuing 算法从 PL 的 QueueSet 中选中一个 queue:
// selectQueueLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal.
func (qs *queueSet) selectQueueLocked() *queue {minVirtualFinish := math.Inf(1)var minQueue *queuevar minIndex intnq := len(qs.queues)for range qs.queues {qs.robinIndex = (qs.robinIndex + 1) % nqqueue := qs.queues[qs.robinIndex]if len(queue.requests) != 0 {currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)if currentVirtualFinish < minVirtualFinish {minVirtualFinish = currentVirtualFinishminQueue = queueminIndex = qs.robinIndex}}}// we set the round robin indexing to start at the chose queue// for the next round. This way the non-selected queues// win in the case that the virtual finish times are the sameqs.robinIndex = minIndexreturn minQueue
}
fiar queuing 选 queue 的基本思路是:
- 每一个 queue 都维护了一个 virtualStart: oldest 请求的虚拟开始执行时间
type queue struct {requests []*request// 如果队列中没有 request 且没有 request 在执行 (requestsExecuting = 0), virtualStart = queueSet.virtualTime// 每分发一个 request, virtualStart = virtualStart + queueSet.estimatedServiceTime// 每执行完一个 request, virtualStart = virtualStart - queueSet.estimatedServiceTime + actualServiceTime,用真实的执行时间,校准 virtualStart// 计算第 J 个 request 的 virtualFinishTime = virtualStart + (J+1) * serviceTimevirtualStart float64requestsExecuting intindex int
}
virtualStart 初始化是直接设置为 QueueSet 中维护的 virtualTime。而 QueueSet.virtualTime 是在这个 PL 初始化的时候赋值为 0。此后,如果 QueueSet 中的 queue 如有任何状态变化,都要执行更新,根据自身两次变更历经的 realTime 按比例增加:
func (qs *queueSet) syncTimeLocked() {
realNow := qs.clock.Now()
timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds()
qs.lastRealTime = realNow
qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked()
}其中,这个比例计算方式为:min(QueueSet 中正字执行的请求数, PL 的并发配额) / QueueSet 中活跃的 queue 数目。
virtualTime 实际对应于 bit-by-bit round-robin 算法中的 R(t),当前时间 round-robin 轮数。具体可以参考文后第4个链接。
- 选 queue 时,会估计每个 queue 中 oldest 请求的虚拟执行完毕时间,选择这个虚拟执行完毕时间最小的 queue
选中 queue 之后,从 queue 中取出 oldest 请求,设置执行标记。重复执行以上选 queue 给 oldest 请求设置执行标志,直到 PL 所有的 Queue 中都没有缓存的请求或达到 PL 的并发限制。
注:此处是尽可能多的分发 PL 中缓存的请求,有可能当前新加入的请求不会被分发。
5.3 请求阻塞监听执行
完成以上操作之后,该请求会进入阻塞监听状态,直到被分发。
func (req *request) wait() (bool, bool) {qs := req.qsqs.lock.Lock()defer qs.lock.Unlock()...// 里面包含一个条件锁,阻塞,等待唤醒decisionAny := req.decision.GetLocked()...decision, isDecision := decisionAny.(requestDecision)if !isDecision {panic(fmt.Sprintf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2))}switch decision {case decisionReject:klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)metrics.AddReject(qs.qCfg.Name, req.fsName, "time-out")return false, qs.isIdleLocked()case decisionCancel:// TODO(aaron-prindle) add metrics for this caseklog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)return false, qs.isIdleLocked()case decisionExecute:klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)return true, falsedefault:// This can not happen, all possible values are handled abovepanic(decision)}
}
5.4 请求执行
如果这个请求被唤醒,并收到了 decisionExecute 标记,便会开始执行。
func (req *request) Finish(execFn func()) bool {exec, idle := req.wait()if !exec {return idle}// 请求执行execFn()// 分发请求return req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
}
执行完毕后,便会释放一个并发资源,于是会触发新一轮的请求分发。