使用 BlobClient 类将文件上传到 A
911 2023-04-03 04:07:32
自2021年6月21号由玩云原生的运维转玩云原生的开发至今,已有5月有余,除去中间的一些其他工作任务,实际参与(实际是一个人负责开发)多厂商容器平台开发应有3月有余。个人开发并规划的多厂商容器平台是根据此张由我个人设计的规划图进行的(ps:部门没有架构师级别的,能提供可行的架构图或者一点指导,所以只能根据4年的kubernetes使用经验和逛各大网站和大厂的相关文档)。
也算是我心目中的多厂商容器平台的开发思路吧,下面将重点围绕上图展开。
注:此篇为我心目中的"多厂商容器管理平台"。
主要
采取模块分层开发:以适配层为分界线
1、上层为适配层,根据不同的厂商进行API封装,统一返回字段,需要根据厂商的字段进行不同的方法开发。
2、下层为核心层,直接调kubernetes接口,统一返回字段。
各容器厂商会根据实际需求,对容器平台进行适当的整改,如:
集群的创建方案[1、在创建集群的同时按照参数创建云厂商节点资源 2、导入已有的节点资源创建集群 3、等等等]
节点特性[1、根据地域就近调度 2、控制节点是否可见 3、等等等]
kubernetes附加组件[1、网络插件的按需选择或二开 2、节点的pod最大数 3、等等等]
导入第三方kubernetes集群的方案[1、导入config 2、导入secret 3、导入agent 4、等等等]
......
根据以上的考虑:
1、集群和节点的增删改采取适配器的方案开发
2、集群和节点的查询走kubernetes:其一,查询是个频繁的操作,而厂家平台有API调用次数限制;其二,集群和节点的状态应以kubernetes原生为依据,而不应该以厂家的容器平台为依据;其三,减少代码的无用重复,便于维护。
3、每个厂家的仓库(helm仓库和image仓库)都有各自的特性且不属于kubernetes核心资源且无法像kubernetes那样获取到最底层的API文档,所以增删改查都走厂家似乎没有不妥之处。
4、数据库需要维护的有:集群信息、节点信息、仓库的有关权控信息。尽可能的减少此服务的维护复杂度,能交给厂家和kubernetes的etcd维护的最好。
4.1 如没有"集群没创建成功即可查看集群的个别信息和集群没创建成功即可查看节点的有关信息"的需求,那其实集群和节点也没有数据库维护的必要性,我认为。
5、kubernetes的核心资源,即可以被
kubectl api-resources
查到的资源,通过kubernetes的api即可。
6、因为服务在可预见的时期是部署于虚机上,所以暂没有准备进行kubernetes 聚合api开发,但api最好为声明式的,留下可行性。
7、requests和response采取常规的json格式,没有选择采用yaml的json格式,主要是为了减轻前端的不便,且当下大厂也是各有格式,最主要的是目前还没有进行api聚合到kubernetes。
8、认为开发的重点应该在中期规划部分,这样才能有所创新,有别于众多厂家。
目录结构:
.├── Makefile├── README.md├── cmd│ └── container│ └── container.go├── config│ └── container.yaml├── deploy│ ├── docker│ │ └── Dockerfile│ └── vmware├── docs├── go.mod├── go.sum├── internal│ └── container│ ├── bootstrap│ ├── controller│ ├── dao│ ├── dto│ ├── ecode│ ├── initialize│ ├── job│ ├── middleware│ ├── models│ ├── pkg│ │ ├── kubernetes│ │ │ ├── dto│ │ │ └── service│ │ ├── registry│ │ │ ├── registry.go│ │ │ └── tke│ │ │ └── acr│ │ │ └── harbor│ │ └── rancher│ │ ├── dto│ │ └── service│ │ └── ack│ │ ├── dto│ │ └── service│ │ └── tke│ │ ├── dto│ │ └── service│ ├── routers│ └── service├── pkg├── scripts│ ├── db.sh│ └── deploy.sh└── test
封装httpClient
// HTTPClient is the subset of *http.Client used by Request. Keeping it as an
// interface lets callers and tests swap in a fake transport.
type HTTPClient interface {
	Do(req *http.Request) (*http.Response, error)
}

var (
	// Client is the shared client used by Request. It is assigned in init and
	// may be replaced (for example in tests).
	Client HTTPClient
)

func init() {
	Client = &http.Client{
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			// NOTE(review): InsecureSkipVerify disables TLS certificate
			// verification for every request made through Client. Confirm
			// this is acceptable outside of test environments.
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
			// NOTE(review): with keep-alives disabled, connections are not
			// reused, so the idle-connection limits below have no effect.
			DisableKeepAlives: true,
			Proxy:             http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				Timeout:   30 * time.Second, // TCP connect timeout
				KeepAlive: 60 * time.Second, // TCP keep-alive probe interval
				DualStack: true,
			}).DialContext, // dial parameters
			MaxIdleConns:          50, // max idle connections overall
			MaxConnsPerHost:       100,
			MaxIdleConnsPerHost:   100,              // max idle connections kept per host
			ExpectContinueTimeout: 30 * time.Second, // wait for the server's first response headers
			IdleConnTimeout:       60 * time.Second, // how long an idle connection is kept
		},
	}
}

// CheckRespStatus reads the whole response body and classifies the response
// by status code.
//
// It returns the body when the status code is in [200, 400). For any other
// status it returns an error whose message is the raw response body. A failure
// to read the body is returned as-is instead of being silently dropped, so a
// truncated payload is never reported as a success.
//
// The caller remains responsible for closing resp.Body.
func CheckRespStatus(resp *http.Response) ([]byte, error) {
	bodyBytes, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode >= 200 && resp.StatusCode < 400 {
		return bodyBytes, nil
	}
	return nil, errors.New(string(bodyBytes))
}

// Request sends an HTTP request through Client and returns the status code
// and body.
//
// url and method are passed to http.NewRequest; body is sent as the request
// payload. When token is non-empty it is sent as "Authorization: Bearer
// <token>". Every entry of headerSet is then set on the request, so a caller
// supplied "Authorization" header overrides the token. A nil headerSet is
// allowed.
//
// When the request cannot be built or sent, the returned status code is 401
// and respBytes is nil; no HTTP response was received in that case, so callers
// should rely on err rather than on the code. Otherwise respStatusCode is the
// server's status code and respBytes/err come from CheckRespStatus.
func Request(url, token, body string, headerSet map[string]string, method string) (respStatusCode int, respBytes []byte, err error) {
	request, err := http.NewRequest(method, url, strings.NewReader(body))
	if err != nil {
		return 401, nil, err
	}

	// Attach the bearer token.
	if token != "" {
		request.Header.Set("Authorization", "Bearer "+token)
	}

	// Extra headers; ranging over a nil map is a no-op.
	for k, v := range headerSet {
		request.Header.Set(k, v)
	}

	resp, err := Client.Do(request)
	if err != nil {
		return 401, nil, err
	}
	defer resp.Body.Close()

	respBytes, err = CheckRespStatus(resp)
	return resp.StatusCode, respBytes, err
}
封装clusterManager
type ClusterManager struct {ClientSet *kubernetes.ClientsetMetrics *metrics.ClientsetDynamicClient dynamic.Interface}const (//DefaultQPS High enough QPS to fit all expected use cases.DefaultQPS = 1e6//DefaultBurst High enough Burst to fit all expected use cases.DefaultBurst = 1e6)func buildConfig(clusterName string) (*rest.Config, error) {var clientConfig *rest.Configvar configV1 *clientcmdapiv1.Configvar dbCluster models.Clustervar err errorvar host stringrows := bootstrap.DB.Where("cluster_name = ?", clusterName).Find(&dbCluster).RowsAffectedif rows == 0 {return nil, errors.New("the database does not have this information")}if dbCluster.KubeConfigSecret != "" {kubeConfigBytes, err := base64.StdEncoding.DecodeString(dbCluster.KubeConfigSecret)kubeConfigJson, err := yaml.YAMLToJSON(kubeConfigBytes)err = json.Unmarshal(kubeConfigJson, &configV1)if err != nil {logrus.Error(err.Error())}// 切换匹配的版本configObject, err := clientcmdlatest.Scheme.ConvertToVersion(configV1, clientcmdapi.SchemeGroupVersion)configInternal := configObject.(*clientcmdapi.Config)// 实例化配置信息clientConfig, err = clientcmd.NewDefaultClientConfig(*configInternal, &clientcmd.ConfigOverrides{}).ClientConfig()clientConfig.QPS = DefaultQPSclientConfig.Burst = DefaultBurst} else if dbCluster.Token != "" {var addresses []dto.Addresseserr := json.Unmarshal([]byte(dbCluster.APIServer), &addresses)for _, address := range addresses {if address.Type == "Real" {host = fmt.Sprintf("https://") + address.Host + fmt.Sprintf(":") + strconv.Itoa(address.Port)break}}if err != nil {return nil, errors.New("request connection cluster failed")}clientConfig = &rest.Config{Host: host,APIPath: "",ContentConfig: rest.ContentConfig{},Username: "",Password: "",BearerToken: dbCluster.Token,BearerTokenFile: "",Impersonate: rest.ImpersonationConfig{},AuthProvider: nil,AuthConfigPersister: nil,ExecProvider: nil,TLSClientConfig: rest.TLSClientConfig{Insecure: true,},UserAgent: "",DisableCompression: false,Transport: nil,WrapTransport: 
nil,QPS: DefaultQPS,Burst: DefaultBurst,RateLimiter: nil,WarningHandler: nil,Timeout: 0,Dial: nil,Proxy: nil,}} else {return nil, errors.New("build client config error")}return clientConfig, nil}func BuildApiServerClient(clusterName string) (*ClusterManager, error) {clientConfig, err := buildConfig(clusterName)if err != nil {return nil, err}if clientConfig == nil {return nil, errors.New("err: error BuildApiServerClient")}clientSet, err := kubernetes.NewForConfig(clientConfig)if err != nil {return nil, err}// 这里一定要调用Discovery().ServerVersion(),探测Kube apiServer是否可用,因为kubernetes.NewForConfig(restConfig)不会去检查服务是否可用,当服务不可用时,该方法不会返回错误的_, err = clientSet.Discovery().ServerVersion()if err != nil {return nil, err}m, err := metrics.NewForConfig(clientConfig)if err != nil {return nil, err}d, err := dynamic.NewForConfig(clientConfig)if err != nil {return nil, err}clusterManager := &ClusterManager{clientSet,m,d,}return clusterManager, nil}
封装responseApi
type ApiResponse struct {Code int `json:"code"`Msg string `json:"message"`Data interface{} `json:"data"`}// PaginateResponse 显然这个结构体可以复用 ApiResponse, 但是 swagger 不认识!type PaginateResponse struct {Code int `json:"code"`Msg string `json:"message"`Data Paginate `json:"data"`}type Paginate struct {CurPage int `json:"cur_page"` // 当前页CurPageSize int `json:"cur_page_size"` // 每页展示数据量Total int `json:"total"` // 总共数据量TotalPage int `json:"total_page"` // 总共页数Data interface{} `json:"data"` // 数据}// SuccessResponse API成功返回func SuccessResponse(c *gin.Context, data interface{}) {response(c, ecode.Success, data)}// SuccessPaginateResponse 分页返回func SuccessPaginateResponse(c *gin.Context, data interface{}, total int, curPage int, curPageSize int) {c.JSON(http.StatusOK, PaginateResponse{Code: int(ecode.Success),Msg: ecode.ErrMsg[ecode.Success],Data: Paginate{CurPage: curPage, CurPageSize: curPageSize, Total: total, TotalPage: int(math.Ceil(float64(total) / float64(curPageSize))), Data: data},})c.Abort()}// ErrorResponse API失败返回func ErrorResponse(c *gin.Context, code ecode.ErrCode, data interface{}) {response(c, code, data)}func NotFoundResponse(c *gin.Context) {c.JSON(http.StatusNotFound, gin.H{"code": 404,"message": "页面未找到","data": "",})}func response(c *gin.Context, code ecode.ErrCode, data interface{}) {c.JSON(http.StatusOK, ApiResponse{Code: int(code),Msg: ecode.ErrMsg[code],Data: data,})c.Abort()}
几个调kubernetes的例子【认为比较有趣的例子】
# 倒序输出事件events, err := w.workloadKubernetes.FindEvents(clientSet, namespace, name)if err != nil {return nil, err}var eventsWorkloadReps []*dto.EventsWorkloadRept := time.Time{}for i := len(events) - 1; i >= 0; i-- {if events[i].FirstTimestamp.Time == t {events[i].FirstTimestamp.Time = events[i].EventTime.Time}if events[i].LastTimestamp.Time == t {events[i].LastTimestamp.Time = events[i].EventTime.Time}eventsWorkloadReps = append(eventsWorkloadReps, &dto.EventsWorkloadRep{WorkloadUUID: id,FirstTimestamp: events[i].FirstTimestamp.Time,LastTimestamp: events[i].LastTimestamp.Time,Type: events[i].Type,Kind: events[i].InvolvedObject.Kind,Name: events[i].Name,Reason: events[i].Reason,Message: events[i].Message,Count: events[i].Count,})}if len(eventsWorkloadReps) == 0 {return make([]*dto.EventsWorkloadRep, 0), nil}return eventsWorkloadReps, nil# pod log limit, _ := strconv.ParseInt(tailLines, 10, 64) req := clientSet.CoreV1().Pods(namespace).GetLogs(name, &coreV1.PodLogOptions{Container: container, Timestamps: true, TailLines: &limit}) podLogs, err := req.Stream(context.TODO()) if err != nil { return "error in opening stream" } defer podLogs.Close() buf := new(bytes.Buffer) _, err = io.Copy(buf, podLogs) if err != nil { return "error in copy information from podLogs to buf" } str := buf.String() return str# 根据kubeconfig 获取ApiServer\CertFile\Token decoded, err := base64.StdEncoding.DecodeString(kubeConfig) decodestr := string(decoded) // 认证方式为kubeConfig // 通过kubeConfig获取 api / token / certFile kubeConfigJson, err := syaml.YAMLToJSON([]byte(decodestr)) var configV1 *clientcmdapiv1.Config err = json.Unmarshal(kubeConfigJson, &configV1) if err != nil { return nil, nil, err } c, err := clientcmd.RESTConfigFromKubeConfig(decoded) if err != nil { return nil, nil, err } clientSet, err := kubernetes.NewForConfig(c) if err != nil { return nil, nil, err } sa, err := clientSet.CoreV1().ServiceAccounts("kube-system").Get(context.TODO(), "admin-user", metaV1.GetOptions{}) secrets, err 
:= clientSet.CoreV1().Secrets("kube-system").Get(context.TODO(), sa.Secrets[0].Name, metaV1.GetOptions{}) if err != nil {return nil, err } importClusterReq.ApiServer = configV1.Clusters[0].Cluster.Server encoded := base64.StdEncoding.EncodeToString(configV1.Clusters[0].Cluster.CertificateAuthorityData) importClusterReq.CertFile = encoded importClusterReq.Token = string(secrets.Data["token"])# 实现apply yaml 【......写成了x了】func (y *Yaml) ApplyYaml(dynamicClient dynamic.Interface, clientSet *kubernetes.Clientset, yamlBody []byte) (interface{}, error) { data, err := yamlutil.ToJSON(yamlBody) var applyYaml dto.ApplyYaml if err = json.Unmarshal(data, &applyYaml); err != nil { return nil, err } var applyYamlRep string decoder := yamlutil.NewYAMLOrJSONDecoder(bytes.NewReader(yamlBody), len(yamlBody)) var rawObj runtime.RawExtension if err := decoder.Decode(&rawObj); err != nil { return nil, err } obj, gvk, err := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, nil) unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return nil, err }unstructuredObj := &unstructured.Unstructured{Object: unstructuredMap}// 获取支持的资源类型列表gr, err := restmapper.GetAPIGroupResources(clientSet.Discovery())if err != nil {return nil, err}// 创建 'Discovery REST Mapper',获取查询的资源的类型mapper := restmapper.NewDiscoveryRESTMapper(gr)// 查找 Group/Version/Kind 的 REST 映射mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)if err != nil {return nil, err}var dri dynamic.ResourceInterface// 需要为 namespace 范围内的资源提供不同的接口if mapping.Scope.Name() == meta.RESTScopeNameNamespace {if unstructuredObj.GetNamespace() == "" {unstructuredObj.SetNamespace("default")}dri = dynamicClient.Resource(mapping.Resource).Namespace(unstructuredObj.GetNamespace())} else {dri = dynamicClient.Resource(mapping.Resource)}if applyYaml.Metadata.Namespace == "" {applyYaml.Metadata.Namespace = "default"}// 查询k8s是否有该资源类型switch applyYaml.Kind {case 
"Deployment":_, err = clientSet.AppsV1().Deployments(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "StatefulSet":_, err = clientSet.AppsV1().StatefulSets(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "DaemonSet":_, err = clientSet.AppsV1().DaemonSets(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "ReplicaSet":_, err = clientSet.AppsV1().ReplicaSets(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "CronJob":_, err = clientSet.BatchV1().CronJobs(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "Job":_, err = clientSet.BatchV1().Jobs(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "Service":_, err = clientSet.CoreV1().Services(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "ConfigMap":_, err = clientSet.CoreV1().ConfigMaps(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "Ingress":_, err = clientSet.ExtensionsV1beta1().Ingresses(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "ServiceAccount":_, err = clientSet.CoreV1().ServiceAccounts(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "ClusterRole":_, err = clientSet.RbacV1().ClusterRoles().Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "RoleBinding":_, err = clientSet.RbacV1().RoleBindings(applyYaml.Metadata.Namespace).Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "ClusterRoleBinding":_, err = clientSet.RbacV1().ClusterRoleBindings().Get(context.TODO(), applyYaml.Metadata.Name, metaV1.GetOptions{})case "APIService":_, err := dri.Create(context.Background(), 
unstructuredObj, metaV1.CreateOptions{})if err != nil {return nil, err}}if err != nil {if !errors.IsNotFound(err) {return nil, err}// 不存在则创建obj2, err := dri.Create(context.Background(), unstructuredObj, metaV1.CreateOptions{})fmt.Println("obj2", obj2)if err != nil {return nil, err}applyYamlRep = fmt.Sprintf("%s/%s/%s created", obj2.GetNamespace(), obj2.GetKind(), obj2.GetName())} else { // 已存在则更新obj2, err := dri.Update(context.Background(), unstructuredObj, metaV1.UpdateOptions{})if err != nil {return nil, err}applyYamlRep = fmt.Sprintf("%s/%s/%s update", obj2.GetNamespace(), obj2.GetKind(), obj2.GetName())}return applyYamlRep, nil}
以registry的命名空间新建为例 来个适配器的demo 【伪代码】
# controller层func CreateContainerRegistryNamespace(c *gin.Context) { var param dto.CreateContainerRegistryNamespaceReq if err := c.ShouldBindJSON(¶m); err != nil { controller.ErrorResponse(c, ecode.PARAMETER_ERR, err.Error()) return } cred, err := service.NewCredentialService().GetPlainTextCredential(param.CredentialID) if err != nil { ErrorResponse(c, ecode.ParameterErr, err.Error()) return } param.Token = cred.Token if data, err := service.NewRegistryService(param.Source,param.Token).CreateContainerRegistryNamespace(param); err != nil { controller.ErrorResponse(c, ecode.PARAMETER_ERR, err.Error()) } else { controller.SuccessResponse(c, data) }}# service层type registryService struct { cli registry.Registry registryDao dao.RegistryDao}func NewRegistryService(source,token string) *registryService { return ®istryService{ cli: registry.NewRegistryCli(source,token), }}func (r *registryService) CreateContainerRegistryNamespace(param dto.CreateContainerRegistryNamespaceReq) (interface{}, error) { rep, err := r.cli.CreateContainerRegistryNamespace(param.DisplayName, param.Describe, param.Visibility) return rep, err}# interface层type Registry interface { CreateContainerRegistryNamespace(name, describe, visibility, uuid string) (*dto.ListContainerRegistryNamespacesRep, error)}func NewRegistryCli(source,token string) Registry { var cli Registry switch source { case "tke": cli = tke.NewTkeClient(token) } return cli}# 方法层type tkeClient struct { token string }func NewTkeClient(token) *tkeClient { return &tkeClient{ token: token }}func (t *tkeClient) CreateContainerRegistryNamespace(name, describe, visibility, uuid string) (*dto.ListContainerRegistryNamespacesRep, error) { url := viper.GetString("url") + "/apis/registry.tkestack.io/v1/namespaces/" createJson := tDto.CreateContainerRegistryReq{ APIVersion: "registry.tkestack.io/v1", Kind: "Namespace", Spec: tDto.CreateContainerRegistryReqSpec{ Name: name, DisplayName: describe, Visibility: visibility, }, } jsonData, errs := 
json.Marshal(createJson) if errs != nil { return nil, errs } _, rep, err := pkg.Request(url, token, string(jsonData), nil, http.MethodPost) if err != nil { errRep := gojsonq.New().FromString(err.Error()).Find("message") return nil, errors.New(errRep.(string)) } var listRep *tDto.ListRepItems if err = json.NewDecoder(strings.NewReader(string(rep))).Decode(&listRep); err != nil { errRep := gojsonq.New().FromString(err.Error()).Find("message") return nil, errors.New(errRep.(string)) } var createRep dto.ListContainerRegistryNamespacesRep createRep.DisplayName = listRep.Spec.Name createRep.Describe = listRep.Spec.DisplayName createRep.Visibility = listRep.Spec.Visibility createRep.Name = listRep.Metadata.Name createRep.RepoCount = listRep.Status.RepoCount return &createRep, nil}
注:由上所述 似乎并没有牵扯到多么高深的操作,甚至是单纯的api调用、封装,也未涉及到中间件类的应用,随着开发的不断深入,此服务应逐渐复杂化
kubernetes 源码分析:https://jeffdingzone.com/category/k8s/
kubernetes api文档:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/
图解kubernetes中API聚合机制的实现: https://juejin.cn/post/6844904081438277640
单体仓库与多仓库都有哪些优势劣势,如何确定微服务落地的最佳实践?:https://ssoor.github.io/2020/03/24/mono-repo-vs-multi-repo/
kubernetes Events介绍:https://www.kubernetes.org.cn/1031.html