天气API推荐:精准获取气象数据的首选
深入解析APIServer机制:开启API代理网关新篇章!
Aggregated APIServer
为了读者能够在了解后面几个开源项目时有足够的知识背景,本节对Aggregated APIServer 做一下简单的原理介绍和源码分析,在分析后面典型开源项目的时候,你会发现大家还是站在 Aggregated APIServer 这个巨人的肩膀上做二次开发。为了方便,后续会用 AA 来表示 Aggregated APIServer 技术。
熟悉K8S开发的同学都知道,K8S除了支持通过CRD的方式扩展 APIResource,还支持本节要介绍的AA的方式扩展,和CRD这种通过一个配置文件(YAML)来描述扩展API不同,AA对增强API的描述是在代码中实现,注册API的模式是通过创建一个APIService的对象来完成。
下面是 K8S 中经典 AA 组件 metrics-server 的 APIService 描述,在 APIService 中指定了对特定GVK(metrics.k8s.io/v1beta1)的请求会被路由到 kube-system 下的 metrics-serve r这个 service。
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
labels:
k8s-app: metrics-server
name: v1beta1.metrics.k8s.io
spec:
# k8s会转发所有发送到 /apis/metrics.k8s.io/v1beta1/ 的请求
group: metrics.k8s.io
version: v1beta1
# 发送到 /apis/metrics.k8s.io/v1beta1/ 的请求会被转发给这个 service
service:
name: metrics-server
namespace: kube-system
而GVK(metrics.k8s.io/v1beta1)下的资源定义是在service背后的服务中定义,代码地址:https://github.com/kubernetes/metrics/blob/master/pkg/apis/metrics/types.go#L31,你甚至可以暂时理解为和k8s的内建类型(Pod、Deployment)的定义模式一致,没有CRD实体。
参阅 https://kubernetes.io/docs/tasks/debug/debug-cluster/resource-metrics-pipeline/ 官方文档的例子,发送到 /apis/metrics.k8s.io/v1beta1 的请求被转发给以AA方式注册到K8S的MetricsServer服务
$ kubectl get --raw "/apis/metrics.k8s.io/v1beta1/nodes/minikube" | jq '.'
{
"kind": "NodeMetrics",
"apiVersion": "metrics.k8s.io/v1beta1",
"metadata": {
"name": "minikube",
"selfLink": "/apis/metrics.k8s.io/v1beta1/nodes/minikube",
"creationTimestamp": "2022-01-27T18:48:43Z"
},
"timestamp": "2022-01-27T18:48:33Z",
"window": "30s",
"usage": {
"cpu": "487558164n",
"memory": "732212Ki"
}
}
其中对 metrics.k8s.io/v1beta1 下的node资源的CURD请求将会交由对象 nodeMetrics 处理,核心代码在 https://github.com/kubernetes-sigs/metrics-server/blob/master/pkg/api/node.go ,从下面的代码可以看到,nodeMetrics 只要实现 k8s.io/apiserver/pkg/registry/rest 包下的几个关键接口,就可以为资源对象 node 赋予GET(实现rest.Getter接口)、LIST(实现rest.Lister接口),类似的,实现了 rest.Creater 接口就可以为资源对象实现CREAT功能,具体细节在下篇文章解释,大家有个大概的概念就行。
var _ rest.KindProvider = &nodeMetrics{}
var _ rest.Storage = &nodeMetrics{}
var _ rest.Getter = &nodeMetrics{}
var _ rest.Lister = &nodeMetrics{}
var _ rest.Scoper = &nodeMetrics{}
var _ rest.TableConvertor = &nodeMetrics{}
var _ rest.SingularNameProvider = &nodeMetrics{}
既然K8S把请求转发到 APIService 指定的 service 中,那么这个service背后的服务需要以K8S的规范来提供API服务,这时候我们的 k8s.io/apiserver登场了,你可以基于 k8s.io/apiserver 包启动一个基于K8S规范的APIServer服务,对外提供的API对象只要实现 k8s.io/apiserver 中提供的接口,就能创建一个符合K8S规范的资源对象,本文不会告诉你如何使用 k8s.io/apiserver开发一个 APIServer,我会在下一篇文章(文章已经写完了,排版好之后就会发布到公众号上)中从源码层面告诉你如何开发一个符合K8S规范的APIServer
下面我们对照着k8s源码来学习k8s是如何实现把请求 代理 到APIService后的服务这个动作,请认真阅读这段代码,在后面的例子中会频繁出现类似的处理逻辑
代码地址:https://github.com/kubernetes/kube-aggregator/blob/5544326a401d11ff3c58aac83b0c60c30e129ae9/pkg/apiserver/handler_proxy.go#L100-L171
整体的流程大概是:
- 根据APIService的配置解析出用户自定义服务的地址
- 构造新的请求,把发送给Kube-APIServer请求的目标地址修改为AA的SVC地址
- 基于APIService中的服务信息,构造代理Handler
⚠️重点,第三步中 proxy.NewUpgradeAwareHandler 创建一个协议升级感知的Handler,因为k8s中的请求中有很多协议升级的场景(例如 pod exec 用到的spdy),NewUpgradeAwareHandler 方法会帮你处理好协议升级的场景,后面会经常看到NewUpgradeAwareHandler这个方法的使用
func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
value := r.handlingInfo.Load()
if value == nil {
r.localDelegate.ServeHTTP(w, req)
return
}
handlingInfo := value.(proxyHandlingInfo)
...
// write a new location based on the existing request pointed at the target service
location := &url.URL{}
location.Scheme = "https"
// 根据APIService的配置解析出用户自定义服务的地址
rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
if err != nil {
klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
return
}
// 构造新的请求,把发送给Kube-APIServer请求的目标地址修改为AA的SVC地址
location.Host = rloc.Host
location.Path = req.URL.Path
location.RawQuery = req.URL.Query().Encode()
// 对原始请求深拷贝构建新的转发请求
newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, req)
defer cancelFn()
if handlingInfo.proxyRoundTripper == nil {
proxyError(w, req, "", http.StatusNotFound)
return
}
proxyRoundTripper := handlingInfo.proxyRoundTripper
upgrade := httpstream.IsUpgradeRequest(req)
proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
// If we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does
// NOT use the proxyRoundTripper. It's a direct dial that bypasses the proxyRoundTripper. This means that we have to
// attach the "correct" user headers to the request ahead of time.
if upgrade {
transport.SetAuthProxyHeaders(newReq, user.GetName(), user.GetGroups(), user.GetExtra())
}
// !!重点,创建一个协议升级感知的Handler,因为k8s中的请求中有很多协议升级的场景(例如 pod exec 用到的spdy),
// NewUpgradeAwareHandler 方法会帮你处理好协议升级的场景,后面会经常看到NewUpgradeAwareHandler这个方法
// 的使用
// 基于APIService中的服务信息,构造代理Handler
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
if r.rejectForwardingRedirects {
handler.RejectForwardingRedirects = true
}
utilflowcontrol.RequestDelegated(req.Context())
// 这里请求就转发到了用户启动的服务上,
handler.ServeHTTP(w, newReq)
}
关于AA的原理介绍就结束了,读者可以基于AA的原理继续看下面的开源项目是如何做定制化开发
代理网关
Cluster-Gateway
一切的源头都要从 https://github.com/oam-dev/cluster-gateway 这个项目开始,该项目作为 KubeVela 多集群功能的核心模块,管控组件对不同集群的请求交由 cluster-gateway 组件代理,我们先通过这个组件来了解市面上的一些实现K8S-API代理的开源组件的实现原理
下面展示了Cluster-Gateway的设计图,我们把项目中关于 Cluster-Gateway 的关键设计提炼为4点:
- 组件是基于 apiserver-aggregation[1] 的方式提供扩展API
- 基于AA模式抽象出 ClusterGateway 资源用来代表不同集群
- 受到 “service/proxy”, “pod/proxy” 这类子资源的设计影响,也为 ClusterGateway 设计了proxy子资源实现请求代理的核心逻辑
- hub集群存储能够访问被纳管集群的访问凭证,你可以理解为kubeconfig,cluster-gateway使用这些kubeconfig完成请求的代理转发
首先我们通过介绍 KubeVela 中的多集群应用管理功能的设计和需求背景来了解 Cluster-Gateway 在多集群管理中发挥的作用。
下图展示了KubeVela的一个多集群应用是如何完成资源多集群的分发,其中多集群应用的配置是存放在Hub集群中,并被被Hub集群的 KubeVela 控制器管理,根据应用的配置中的多集群调度策略,选择不同的集群下下发资源配置。
讲到这里,请大家思考一个这样的问题:KubeVela是如何在控制器内完成把应用配置下发到不同的集群内?
聪明的你可能会想到在控制器内管理多个不同的集群的 client,需要向特定集群下发配置的时候切换到指定的集群client,这种方法会带来很多问题:
- 1. 首先维护多个集群的client的成本就不必多说,
- 2. 当有新集群纳管到多集群的时候,又如何自动化的注册新的client,
- 3. 假设被纳管集群因为网络环境的特殊性不能直接通过管控集群直接访问,如何处理
KubeVela给出了一个相对优雅的解决方案,在管控集群中安装以AA模式部署的Cluster-Gateway,所有发往纳管集群的请求都会被转为请求管控集群 clustergateway/proxy 子资源的请求,所有的请求链路都在管控集群内完成,KubeVela控制器只要持有管控集群的KubeConfig和Client即可,具体如何实现请求的转换,涉及到client-go的一个黑魔法,一会会详细介绍。
请读者注意下图中标红部分的请求路径的转换,
- 首先KubeVela控制器请求cluster0集群创建deploy,该请求是直接发往管控集群的APIServer,经过一些客户端的黑魔法,该请求会被封装为对 clustergateway/proxy 子资源的请求,其中对deploy资源进行操作的API路径(/apis/apps/v1/deployments)会被附加在 clustergateway/proxy 后,所以最终请求被转为 clustergateway/proxy/apis/apps/v1/deployments
- 因为clustergateway资源对象是通过AA的模式注册到管控集群,该请求便会被转到Cluster-Gateway,Cluster-Gateway提取出放在proxy资源后的正常请求,把封装后的请求转为对cluster0集群的Kube-APIServer的正常请求
大家再回去看看上面贴出的 Cluster-Gateway 的原理图,是不是就清晰很多,下面统一回答遗留的几个问题:
1.kubevela是如何在只持有管控集群的client的情况下完成请求路径的转换?
完成请求路径转换的核心代码如下,代码地址:https://github.com/kubevela/kubevela/blob/fdcdf659d89eb81e9b381794b2c72c5e7a6d1e85/pkg/cmd/factory.go#L97
func NewDefaultFactory(cfg *rest.Config) Factory {
copiedCfg := *cfg
copiedCfg.RateLimiter = DefaultRateLimiter
copiedCfg.Wrap(multicluster.NewTransportWrapper())
return &defaultFactory{cfg: &copiedCfg}
}
关键点是调用了rest.Config的Wrap方法来构造 rest.Config 对象,这里你可以把 multicluster.NewTransportWrapper()
理解为一个 Middleware,所有client的请求都会经过这个中间层处理发往 APIServer
// RoundTrip is the main function for the re-write API path logic
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
cluster := t.getClusterFor(req)
If !IsLocal(cluster) {
req = req.Clone(req.Context())
req.URL.Path = formatProxyURL(cluster, req.URL.Path)
}
return t.delegate.RoundTrip(req)
}
经常使用client-go的同学知道,在对资源进行CURD的时候,调用的方法都会传递 context,kubevela在这个ctx里注入了想要请求的集群信息,在 Middleware 处理 http.Request 的时候,可以通过 http.Request 的 Context方法获取到client-go传递的 ctx,并提取集群信息,用来构建转换路径,感兴趣的同学可以看下 t.getClusterFor(req)
里的实现。
2.cluster-gateway是如何完成请求的代理?
cluster-gateway 抽象出了 clustergateways/proxy 子资源实现了 Connecter 接口(关于这个接口的细节会在下一篇文章讨论,如果不理解也没关系),用来代理发往被纳管集群的请求,以pod/proxy为例,这个子资源也是实现了 Connecter 接口用来代理发往Pod内进程的请求,这里思路做一些转换你可以把被纳管的集群当成Pod进程,这里也是简单做了一层代理,只是后面的服务变成了k8s。
// Connecter is a storage object that responds to a connection request.
type Connecter interface {
// Connect returns an http.Handler that will handle the request/response for a given API invocation.
// The provided responder may be used for common API responses. The responder will write both status
// code and body, so the ServeHTTP method should exit after invoking the responder. The Handler will
// be used for a single API request and then discarded. The Responder is guaranteed to write to the
// same http.ResponseWriter passed to ServeHTTP.
Connect(ctx context.Context, id string, options runtime.Object, r Responder) (http.Handler, error)
// NewConnectOptions returns an empty options object that will be used to pass
// options to the Connect method. If nil, then a nil options object is passed to
// Connect. It may return a bool and a string. If true, the value of the request
// path below the object will be included as the named string in the serialization
// of the runtime object.
NewConnectOptions() (runtime.Object, bool, string)
// ConnectMethods returns the list of HTTP methods handled by Connect
ConnectMethods() []string
}
我们来看下 clustergateways/proxy 在 Connect 方法中是如何完成的代理,代码地址:https://github.com/oam-dev/cluster-gateway/blob/48259e095bb66d5a98b4203def038b359e408391/pkg/apis/cluster/v1alpha1/clustergateway_proxy.go#L157
return &proxyHandler{
parentName: id,
path: proxyOpts.Path,
impersonate: proxyOpts.Impersonate,
clusterGateway: clusterGateway,
responder: r,
finishFunc: func(code int) {
metrics.RecordProxiedRequestsByResource(proxyReqInfo.Resource, proxyReqInfo.Verb, code)
metrics.RecordProxiedRequestsByCluster(id, code)
metrics.RecordProxiedRequestsDuration(proxyReqInfo.Resource, proxyReqInfo.Verb, id, code, time.Since(ts))
},
}, nil
Connect方法最终返回了一个实现了http.Handler
接口的 proxyHandler 对象,在这个对象的ServerHTTP方法内完成请求的代理 ,下面列出了ServerHTTP方法的部分实现,请读者回顾一开始我们分析 AA 模式是如何实现请求的代理,和这里的代理功能实现原理基本一致,正如我之前提到的,后面你还会见到类似的代码。唯一的不同点是AA是通过APIService对象里的信息构造转发请求,cluster-gateway则是通过存在secret里的纳管集群的kubeconfig构造转发请求。
下面是通过restconfig构造请求的核心代码,代码地址:https://github.com/oam-dev/cluster-gateway/blob/48259e095bb66d5a98b4203def038b359e408391/pkg/apis/cluster/v1alpha1/clustergateway_proxy.go#L222
....
// 该方法返回一个http.RoundTripper对象,该对象会根据提供的
// rest.Config定义的身份验证信息和k8s-apiserver交互
rt, err := restclient.TransportFor(cfg)
if err != nil {
responsewriters.InternalError(writer, request, errors.Wrapf(err, "failed creating cluster proxy client %s", cluster.Name))
return
}
// 和前面AA的构造代理Handler所调用的NewUpgradeAwareHandler是同一方法
proxy := apiproxy.NewUpgradeAwareHandler(
&url.URL{
Scheme: urlAddr.Scheme,
Path: newReq.URL.Path,
Host: urlAddr.Host,
RawQuery: request.URL.RawQuery,
},
rt,
false,
false,
nil)
...
proxy.UpgradeTransport = apiproxy.NewUpgradeRequestRoundTripper(
upgrading,
RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
newReq := utilnet.CloneRequest(req)
return upgrader.RoundTrip(newReq)
}))
proxy.Transport = rt
proxy.FlushInterval = defaultFlushInterval
proxy.Responder = ErrorResponderFunc(func(w http.ResponseWriter, req *http.Request, err error) {
p.responder.Error(err)
})
proxy.ServeHTTP(writer, newReq)
3.如何实现权限管理?
在cluster-gateway这种多集群管理模式下权限管理是必须要实现的能力,cluster-gateway在转发请求的时候使用的RestConfig在管控集群是具有admin权限的,因此在转发请求的时候不能无脑转发,需要在转发时做到权限收敛。cluster-gateway权限控制的功能是基于k8s的角色扮演机制 https://kubernetes.io/docs/reference/access-authn-authz/authentication/#user-impersonation 详细的原理请读者自己深入研究。
// rest.Config 的 Impersonate用于填充你想扮演的角色信息,比如你是admin的用户
// 但是你可以扮演 其他只有读权限的用户,这样cluster-gateway转发的请求也只有只读权限
// Impersonate is the configuration that RESTClient will use for impersonation.
Impersonate ImpersonationConfig
大概的实现方式是,在通过restconfig构造代理server的时候,配置好 Impersonate 信息,其实最终转发请求的时候,是把角色扮演信息注入在请求的Header中,完成角色扮演信息的传递
Impersonate-User: songyang
Impersonate-Group: developers
当管控集群接收到转发的请求的时候,会用header中指定的角色去校验授权
4.当管控面和被纳管集群之间的网络不通的场景下是如何完成代理的?
cluster-gateway使用了ANP的能力通过隧道打通Hub集群和被纳管集群之间的网络,这部分实践不多,便不再多说,感兴趣的同学可以翻阅 https://kubernetes.io/docs/tasks/extend-kubernetes/setup-konnectivity/文档 和开源项目 https://github.com/kubernetes-sigs/apiserver-network-proxy
关于cluster-gateway的介绍到此为止,本文不会太多聚焦到代码上,感兴趣的读者建议下载代码阅读,总结下来cluster-gateway实现了很优雅的集群请求转发,巧妙的通过AA的方式为Hub集群支持多集群管理能力,是一种在k8s整个框架下的api增强。希望大家了解到原理后,能够举一反三扩宽k8s下增强能力开发的思路。
市面上有很多在API测做增强的开源项目,https://github.com/oam-dev/cluster-gateway 似乎是我见到最早开源的(仅表个人看法),而且开源前已经在蚂蚁内部生产环境使用了很久。
KubeGateway
KubeGateway[2] 是字节跳动在2年前开源出来的项目,当时一同开源的还有 KubeZoo[3],这2个项目都是在 K8S-API 层面实现了功能增强,后面的文章会对 https://github.com/kubewharf/kubezoo 进行深入的分析。
KubeGateway 定位是为字节内部的 kube-apiserver 的 HTTP2 流量专门设计并定制的七层负载均衡代理,可以根据七层流量特性完成定制化的请求转发,比如对pod的读请求转发到某个apiserver,把写请求转发到另一个apiserver完成读写分离,或者限制pod的list的请求速率,限制某个用户的请求。
在介绍原理之前,读者可以根据上文已经了解到的知识背景自己思考一下如何设计出这样一个服务,我们一起来思考下:
这只是一个代理,但是需要承接所有用户的请求,用户需要对这层代理无感知,你会说我直接使用常见的7层代理,但是注意这个代理是面向k8s-apiserver的代理,需要适配k8s的所有请求特征,并不是一个简单的7层代理转发就能完成的,比如
- 1. 用户信息是从kubeconfig的用户凭证里获取到,需要解析下token或者用户证书
- 2. k8s api里很多verb语句,比如 watch,list动作需要从七层的请求信息中分析出
所以聪明的你,可能会想到我们可以用之前AA的思路,基于k8s.io/apiserver 这个构建一个7层代理,利用这个包天然能够兼容k8s的api,通过使用这个包下的增强能力,可以轻松的实现提取用户信息,解析k8s请求的功能,用户对某个集群的请求可以使用我们前面提到的proxy的能力转发到对应的k8s-apiserver上。
没错,KubeGateway的整体设计和我们分析的思路基本差不多。这个项目的搭建也是基于k8s.io/apiserver 这个库,不过字节应该是对部分功能做了增强,自己fork了一版,具体增加的代码细节就不再深究。
下面的官方的设计图,本文稍微做了一些注解,client端请求的域名中会带有请求的集群信息,以clusterA.k8s.kubegateway.io为例,表示这次请求的面向后面上游的Cluster A集群,KubeGateway会通过解析域名来判断该请求是转发给ClusterA的kube-apiserver,在转发请求的过程中会经过基本的鉴权和授权认证,其实就是把用户信息封装向被代理集群发起 Authentication Request (TokenReview) 和 Authorization Request (SubjectReview) 来验证用户身份和授权,完成鉴权和授权之后就到了我们老朋友出马的时候,把请求代理到目标集群。
关于如何基于k8s.io/apiserver这个库搭建一个符合k8s规范的apiserver我们会放在下一篇文章分析,这里大家只要知道基于这个库就能实现非常神奇的功能。
下面我们简单的讲解KubeGateway的核心代码,第一部分是KubeGateway是如何一步步完成请求的解析,鉴权授权和代理转发
代码地址:https://github.com/kubewharf/kubegateway/blob/43b7830e9973f45971655a92564c88e0c9b4caed/cmd/kube-gateway/app/proxy.go#L139
buildProxyHandlerChainFunc 会把每一步骤像洋葱一样一层一层封装起来,用户请求会从最下面的handler一层一层的被解析,有几个关键的handler:
- WithRequestInfo 会把用户请求解析为k8s标准的请求,并存储在ctx中,包括请求的资源(pods),请求动作(list、watch),所在命名空间(ns)等
- WithAuthentication和WithFailedAuthenticationAudit完成用户身份认证和授权
- 最后WithDispatcher完成请求的转发到不同的集群apiserver中
func buildProxyHandlerChainFunc(clusterManager clusters.Manager, enableAccessLog bool) func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
return func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
// new gateway handler chain
handler := gatewayfilters.WithDispatcher(apiHandler, proxydispatcher.NewDispatcher(clusterManager, enableAccessLog))
// without impersonation log
handler = gatewayfilters.WithNoLoggingImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
// new gateway handler chain, add impersonator userInfo
handler = gatewayfilters.WithImpersonator(handler)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
// disabel timeout, let upstream cluster handle it
// handler = gatewayfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
// new gateway handler chain
handler = gatewayfilters.WithPreProcessingMetrics(handler)
handler = gatewayfilters.WithExtraRequestInfo(handler, &request.ExtraRequestInfoFactory{})
handler = gatewayfilters.WithTerminationMetrics(handler)
handler = gatewayfilters.WithRequestInfo(handler, c.RequestInfoResolver)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
handler = genericapifilters.WithCacheControl(handler)
handler = gatewayfilters.WithNoLoggingPanicRecovery(handler)
return handler
}
}
第二部分我们看下我们的老朋友是如何完成请求的代理:
代码地址:https://github.com/kubewharf/kubegateway/blob/43b7830e9973f45971655a92564c88e0c9b4caed/pkg/gateway/proxy/dispatcher/dispatcher.go#L53
- 1. 从 endpointPicker 选择需要代理的kube-apiserver endpoint
- 2. 基于选出的endpoint构造新的代理请求
- 3. 基于被代理的集群的访问凭证,构造代理Handler,再细看下 NewUpgradeAwareHandler 函数的实现,果然见到了亲切的 proxy.NewUpgradeAwareHandler 函数
// 1. 从 endpointPicker 选择需要代理的kube-apiserver endpoint
endpoint, err := endpointPicker.Pop()
if err != nil {
d.responseError(errors.NewServiceUnavailable(err.Error()), w, req, statusReasonNoReadyEndpoints)
return
}
ep, err := url.Parse(endpoint.Endpoint)
if err != nil {
d.responseError(errors.NewInternalError(err), w, req, statusReasonInvalidEndpoint)
return
}
...
// 2. 基于选出的endpoint构造新的代理请求
location := &url.URL{}
location.Scheme = ep.Scheme
location.Host = ep.Host
location.Path = req.URL.Path
location.RawQuery = req.URL.Query().Encode()
...
rw := responsewriter.WrapForHTTP1Or2(delegate)
// 3. 基于被代理的访问凭证构造 proxy handler
proxyHandler := NewUpgradeAwareHandler(location, endpoint.ProxyTransport, endpoint.PorxyUpgradeTransport, false, false, d, endpoint)
proxyHandler.ServeHTTP(rw, newReq)
// NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning
// errors to the caller.
func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, upgradeTransport proxy.UpgradeRequestRoundTripper, wrapTransport, upgradeRequired bool, responder proxy.ErrorResponder, endpoint *clusters.EndpointInfo) *UpgradeAwareHandler {
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, responder)
handler.UpgradeTransport = upgradeTransport
return &UpgradeAwareHandler{
UpgradeAwareHandler: handler,
endpoint: endpoint,
}
}
这里简单的介绍了 KubeGateway 的实现原理,不过这个组件的能力不仅如此,他很多灵活的基于7层请求的转发限流等功能实现也很有意思,这些都依赖读者自己研究了。
Karmada-Aggregated-APIServer
这里简单介绍一下karmada多集群方案中的 karmada-aggregated-apiserver 组件,他是karmada push多集群管理模式下,管控面集群管理被纳管集群的代理服务,他也是抽象了cluster和cluster/proxy的概念,和ocm的cluster-gateway采用同样的思路来代理发往不同集群的请求,实现方式和我们上面讲解的思路也大同小异,不过他加入了一些增强功能,可以支持http和socks5协议的代理,感兴趣的同学也可以下载研究一下,代码入口在 https://github.com/karmada-io/karmada/blob/master/cmd/aggregated-apiserver/main.go。
总结
通过对比几个开源项目的实现,大家会发现这些项目的应用场景和多集群密切相关,cluster-gateway和karamda-aggregate-apiserver作为多集群代理,kubegateway则是支持为多个集群做负载均衡,等介绍完后面2类开源项目 自定义k8s-apiserver 和 多租k8s 后,大家会有更深刻的体会。系列文章的最后,会给大家介绍一种我司已经上线了的基于k8s.io/apiserver构建的服务,也非常有意思,另外本文关于一些实现细节没有深究,这些都会放在下一篇文章内讲解
参考文档
https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/apiserver-aggregation/
https://www.zeng.dev/post/2023-k8s-apiserver-aggregation-internals/
引用链接
[1]
apiserver-aggregation: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/apiserver-aggregation/[2]
KubeGateway: https://github.com/kubewharf/kubegateway/blob/main/README.zh_CN.md[3]
KubeZoo: https://github.com/kubewharf/kubezoo
文章转自微信公众号@云原生AI百宝箱