apiserver是k8s控制面的一个组件,在众多组件中唯一一个对接etcd,对外暴露http服务的形式为k8s中各种资源提供增删改查等服务。它是RESTful风格,每个资源的URI都会形如
/apis/{apiGroup}/{version}/namsspaces/{ns-name}/{resource-kind}/{resource-name}
或
/apis/{apiGroup}/{version}/{resource-kind}/{resource-name}
apiserver中包含3个server组件,apiserver依靠这3个组件来对不同类型的请求提供处理
本篇阅读源码版本1.19
apiserver同样使用了corbra命令行框架处理启动命令,它从命令行的RunE回调函数来到了Run函数,开始执行启动流程。Run函数做3件事
代码位于 /cmd/kube-apiserver/app/server.go
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error { //注册三个server的路由 server, err := CreateServerChain(completeOptions, stopCh) if err != nil { return err } //注册健康检查,就绪,存活探针的地址 prepared, err := server.PrepareRun() if err != nil { return err } //运行http server return prepared.Run(stopCh) }
CreateServerChain函数的调用如下
func CreateServerChain(...)(...){ kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport) if err != nil { return nil, err } // If additional API servers are added, they should be gated. apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount, serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)) if err != nil { return nil, err } //创建APIExtensionsServer并注册路由 apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate()) if err != nil { return nil, err } //创建KubeAPIServer并注册路由 kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer) if err != nil { return nil, err } // aggregator comes last in the chain aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer) if err != nil { return nil, err } //创建aggregatorServer并注册路由 aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers) if err != nil { // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines return nil, err } }
创建每个server都要有对应它的config。apiExtensionServer和aggregatorServer的Config需要依赖kubeAPIServerConfig,而这几个ServerConfig都需要依赖GenericConfig,CreateKubeAPIServerConfig创建kubeAPIServerConfig,而CreateKubeAPIServerConfig调用buildGenericConfig创建GenericConfig。
func buildGenericConfig( s *options.ServerRunOptions, proxyTransport *http.Transport, )(...){ //创建一个genericConfig对象 genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs) //设置genericConfig的字段,代码不展示 //创建认证实例 if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil { return } //创建鉴权实例 genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers) //准入控制器 err = s.Admission.ApplyTo( genericConfig, versionedInformers, kubeClientConfig, feature.DefaultFeatureGate, pluginInitializers...) }
APIExtensionServer的创建流程大致包含以下几个步骤
三种类型的Server底层都需要依赖GeneriAPIServer。第二步创建的CustomResourceDefinitions是本类型Server的对象,用于后续进行路由注册。APIGroupInfo是用于每个版本、每个资源类型对应的存储对象。最后调用InstallAPIGroup进行路由注册,把每一个资源的版本,类型映射到一个URI地址中。代码如下所示
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) { return apiextensionsConfig.Complete().New(delegateAPIServer) } //代码位于 /vendor/k8s.io/apiextensions-apiserver/apiserver.go func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) { //创建Generic genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget) //实例化 CustomResourceDefinitions s := &CustomResourceDefinitions{ GenericAPIServer: genericServer, } //实例化APIGroupInfo apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs) if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) { storage := map[string]rest.Storage{} // customresourcedefinitions customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter) if err != nil { return nil, err } storage["customresourcedefinitions"] = customResourceDefinitionStorage storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage) apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage } //另一个版本的类似,不作展示 //InstallAPIGroup注册 if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil { return nil, err } }
KubeAPIServer处理k8s内置资源请求,它的创建流程与APIExtensionServer类似,包含下面几个步骤
其中Instance是KubeAPIServer的Server对象。KubeAPIServer创建和Install的APIGroup需要调用两个方法,一个是installLegacyAPI,另一个是installAPI,原因在于k8s的apiGroup分了/api和/apis两种。初期的资源其实没有apiGroup这个概念,而后期引入了groupVersion为了兼容原有的才把旧的资源类型的URI地址都归属于/api这个路径下的,新的全部在/apis这个路径下,因此在创建注册APIGroup时都分了两类。代码如下
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) { kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer) if err != nil { return nil, err } return kubeAPIServer, nil } //代码位于 /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) { //创建Generic s, err := c.GenericConfig.New("kube-apiserver", delegationTarget) //1.14版本的是Master,当前版本是Instance m := &Instance{ GenericAPIServer: s, ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo, } //实例化核心API if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) { legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{ StorageFactory: c.ExtraConfig.StorageFactory, ProxyTransport: c.ExtraConfig.ProxyTransport, KubeletClientConfig: c.ExtraConfig.KubeletClientConfig, EventTTL: c.ExtraConfig.EventTTL, ServiceIPRange: c.ExtraConfig.ServiceIPRange, SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange, ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange, LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig, ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer, ExtendExpiration: c.ExtraConfig.ExtendExpiration, ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration, APIAudiences: c.GenericConfig.Authentication.APIAudiences, } if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil { return nil, err } } restStorageProviders := []RESTStorageProvider{...} //InstallAPIs,内部包含InstallAPIGroup if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil { return nil, err } }
进入m.InstallLegacyAPI,这个方法包含了实例化ApiGroupInfo和InstalAPIGroup两个操作,这部分资源是k8s的核心资源
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error { //实例化ApiGroupInfo legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter) if err != nil { return fmt.Errorf("error building core storage: %v", err) } controllerName := "bootstrap-controller" coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient()) m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook) m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook) //相当于调用InstallAPIGroup if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { return fmt.Errorf("error in registering group versions: %v", err) } return nil }
实例化APIGroupInfo的代码局部如下,代码篇幅较长,只摘取pod一部分的源码展示,代码位于/pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) { apiGroupInfo := genericapiserver.APIGroupInfo{ PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""), VersionedResourcesStorageMap: map[string]map[string]rest.Storage{}, Scheme: legacyscheme.Scheme, ParameterCodec: legacyscheme.ParameterCodec, NegotiatedSerializer: legacyscheme.Codecs, } podStorage, err := podstore.NewStorage( restOptionsGetter, nodeStorage.KubeletConnectionInfo, c.ProxyTransport, podDisruptionClient, ) restStorageMap := map[string]rest.Storage{ "pods": podStorage.Pod, "pods/attach": podStorage.Attach, "pods/status": podStorage.Status, "pods/log": podStorage.Log, "pods/exec": podStorage.Exec, "pods/portforward": podStorage.PortForward, "pods/proxy": podStorage.Proxy, "pods/binding": podStorage.Binding, "bindings": podStorage.LegacyBinding, ..... } }
m.GenericAPIServer.InstallLegacyAPIGroup的第一个参数是apiPrefix,值是/api;第二个参数是上面创建好的,包含资源存储方式的apiGroupInfo
与InstallAPIGroup类似地,InstallLegacyAPIGroup需要经过两层调用才会到达InstallREST,调用链如下
m.GenericAPIServer.InstallLegacyAPIGroup |--s.installAPIResources |--apiGroupVersion.InstallREST
InstallREST的入参是restful.Container,他是golang http框架go-restful里面的一个重要对象,在InstallREST里面构造出installer,installer包含资源的存储方法和资源对应api的前缀,利用installer.Install()来创建出go-restful的webservice,webservice加入到传入得container,即完成api的注册。
代码位于/vendor/k8s.io/apiserver/pkg/endpoints/groupversion.go
func (g *APIGroupVersion) InstallREST(container *restful.Container) error { prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version) installer := &APIInstaller{ group: g, prefix: prefix, minRequestTimeout: g.MinRequestTimeout, } apiResources, ws, registrationErrors := installer.Install() versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources}) versionDiscoveryHandler.AddToWebService(ws) container.Add(ws) return utilerrors.NewAggregate(registrationErrors) }
installer.Install()方法的边幅很长,它既然创建了webservice,api中各个URL的路由注册,handler的绑定也会在里面实现。由于这部分代码也涉及到apiserver如何响应处理一个http请求,本篇先不探讨
不过上面提及到go-restful框架的几个概念,在这里进行一个简单的科普
他们的层次结构如下
container |--webservice |--Route
用于处理聚合进来的api请求,实际是做七层转发,它的创建流程与APIExtensionServer的最为相似
实际创建AggregratorServer的代码位于/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) { //创建GeneriAPIServer genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget) //实例化Aggregrator s := &APIAggregator{...} //实例化APIGroupInfo apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter) //InstallAPIGroup if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil { return nil, err } }
api的路由绑定完毕,最后就是要把http server跑起来,prepared.Run调用的是由preparedGenericAPIServer实现的Run方法,经过多层调用最终把server跑起来,调用链如下
prepared.Run /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go |--s.runnable.Run(stopCh) |==preparedGenericAPIServer.Run /vendor/k8s.io/apiserver/pkg/server/genericapiserver.go |--s.NonBlockingRun |--s.SecureServingInfo.Serve /vendor/k8s.io/kube-aggregrator/pkg/server/secure_serving.go |--&http.Server{} |--RunServer
http server的对象是在SecureServingInfo.Serve创建的,即调用链的s.SecureServingInfo.Serve,最终让server开始监听是在RunServer处。它接收了http.Server作为参数。至此apiserver运行起来,接收来自各个组件或客户端的请求。
本篇讲述了k8s-apiserver的启动流程,介绍了apiserver包含了3个server组件,apiserver的服务实际上由这三个组件提供,讲述了他们创建流程,实例化底层的GenericServer,实例化各自的Server类,实例化ApiGroupInfo来建立资源与存储操作间的映射关系,最后InstallAPI。还专门挑了k8s核心资源类型的ApiGroup注册过程介绍。整个启动过程的调用链如下
Run /cmd/kube-apiserver/app/server.go |--CreateServerChain | |--CreateKubeAPIServerConfig | | |--buildGenericConfig | | |--genericapiserver.NewConfig | | |--s.Authentication.ApplyTo | | |--BuildAuthorizer | | |--s.Admission.ApplyTo | |--createAPIExtensionsConfig | |--createAPIExtensionsServer | | |--apiextensionsConfig.Complete().New /vendor/k8s.io/apiextensions-apiserver/apiserver.go | | |--c.GenericConfig.New | | |--&CustomResourceDefinitions{} | | |--genericapiserver.NewDefaultAPIGroupInfo | | |--s.GenericAPIServer.InstallAPIGroup | |--CreateKubeAPIServer | | |--kubeAPIServerConfig.Complete().New /pkg/controlplane/instance.go | | | |--c.GenericConfig.New | | | |--&Instance{} | | | |--m.InstallLegacyAPI /pkg/controlplane/instance.go | | | | |--legacyRESTStorageProvider.NewLegacyRESTStorage /pkg/registry/core/rest/storage_core.go | | | | |--m.GenericAPIServer.InstallLegacyAPIGroup ##相当于新版本的InstallAPIGroup | | | | | |--s.installAPIResources | | | | | | |--apiGroupVersion.InstallREST | | | |--m.InstallAPIs | |--createAggregatorConfig | |--createAggregatorServer | |--aggregatorConfig.Complete().NewWithDelegate /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go | |--c.GenericConfig.New | |--&APIAggregator{} | |--apiservicerest.NewRESTStorage | |--s.GenericAPIServer.InstallAPIGroup |--server.PrepareRun /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go | |--s.GenericAPIServer.PrepareRun /vendor/k8s.io/kube-aggregrator/pkg/server/genericapiserver.go | |--s.installHealthz() | |--s.installLivez() | |--s.installReadyz() |--prepared.Run /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go |--s.runnable.Run(stopCh) |==preparedGenericAPIServer.Run /vendor/k8s.io/apiserver/pkg/server/genericapiserver.go |--s.NonBlockingRun |--s.SecureServingInfo.Serve /vendor/k8s.io/kube-aggregrator/pkg/server/secure_serving.go |--&http.Server{} |--RunServer
如有兴趣可阅读鄙人“k8s源码之旅”系列的其他文章
kubelet源码分析——kubelet简介与启动
kubelet源码分析——启动Pod
kubelet源码分析——关闭Pod
scheduler源码分析——调度流程
apiserver源码分析——启动流程