直接上代码吧,首先是我们的class based view,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
class K8sApiView(APIView): authentication_classes = (BasicAuthentication, JWTAuthentication) permission_classes = [IsAuthenticated] # load k8s集群token信息 namespace = "test-env" config.load_kube_config('/data/kube-config.yaml') v1 = client.CoreV1Api() def request_and_limits_extraction(self, cpu_range: List, mem_range: List) -> Tuple: """ 这个函数是为了从没有顺序的pod内存和cpu的限制大小中生成一个从左到右从大到小的范围的 """ cpu_left = cpu_range[0] cpu_right = cpu_range[1] if 'm' in cpu_left: cpu_left_num = float(re.search('\d+', cpu_left).group(0)) / 1000 else: cpu_left_num = int(re.search('\d+', cpu_left).group(0)) if 'm' in cpu_right: cpu_right_num = float(re.search('\d+', cpu_right).group(0)) / 1000 else: cpu_right_num = int(re.search('\d+', cpu_right).group(0)) if cpu_left_num > cpu_right_num: cpu_limits = str(cpu_right_num) + '核' + '~' + str(cpu_left_num) + '核' else: cpu_limits = str(cpu_left_num) + '核' + '~' + str(cpu_right_num) + '核' mem_left = mem_range[0] mem_right = mem_range[1] if 'G' in mem_left: mem_left_num = int(re.search('\d+', mem_left).group(0)) mem_left_num = mem_left_num * 1024 elif 'M' in mem_left: mem_left_num = int(re.search('\d+', mem_left).group(0)) if 'G' in mem_right: mem_right_num = int(re.search('\d+', mem_right).group(0)) mem_right_num = mem_right_num * 1024 elif 'M' in mem_right: mem_right_num = int(re.search('\d+', mem_right).group(0)) if mem_left_num > mem_right_num: mem_limits = mem_right.strip() + '~' + mem_left.strip() else: mem_limits = mem_left.strip() + '~' + mem_right.strip() return cpu_limits, mem_limits def list_pod_by_labels(self,app_name) -> List[str]: """ 这个方法是通过label标签把pod的名字过滤出来成一个列表用的 """ # load k8s集群token信息 pod_list = [] ret = self.v1.list_namespaced_pod(namespace=self.namespace, label_selector="k8s-app={0}".format(app_name)) for item in ret.items: pod_list.append(item.to_dict()['metadata']['name']) return pod_list def describe_namespaced_pod(self, pod_name:str) -> dict: """ 这个方法是通过pod名字获取pod的详细内容的 """ # 获取pod详情 ret = self.v1.read_namespaced_pod(namespace=self.namespace, name=pod_name) data = ret.to_dict() return data def get_container_status(self, status_dict: dict) -> str: """ 这是一个简单的helper函数用来提取容器的运行状态的 """ for s in status_dict: if status_dict[s] is not None: return s return 'None' @swagger_auto_schema(operation_summary='查询tke pod信息') def post(self, request): app_name=request.data.get("app_name") pod_details_list = [] pod_details = {} pods_list = self.list_pod_by_labels(app_name) for pod in pods_list: container_info = {} pod_info=self.describe_namespaced_pod(pod) pod_details['instance_name'] = pod pod_details['pod_status'] = pod_info['status']['phase'] pod_details['host_ip'] = pod_info['status']['host_ip'] pod_details['instance_ip'] = pod_info['status']['pod_ip'] pod_resources = pod_info['spec']['containers'][0]['resources'] cpu_request_and_limits = [pod_resources['requests']['cpu'], pod_resources['limits']['cpu']] mem_request_and_limits = [pod_resources['requests']['memory'], pod_resources['limits']['memory']] request_and_limits_list = self.request_and_limits_extraction(cpu_request_and_limits,mem_request_and_limits) pod_details['cpu_and_mem_request_and_limits'] = "CPU: "+ request_and_limits_list[0] + ', ' + '内存: ' + request_and_limits_list[1] pod_details['create_date'] = pod_info['status']['start_time'].strftime("%Y-%m-%d %H:%M:%S") # container info container_info['container_name'] = pod_info['status']['container_statuses'][0]['name'] container_info['container_image_url'] = pod_info['status']['container_statuses'][0]['image'] container_info['container_cpu_and_mem_request_and_limits'] = "CPU: "+ request_and_limits_list[0] + ', ' + '内存: ' + request_and_limits_list[1] container_info['container_status'] = self.get_container_status(pod_info['status']['container_statuses'][0]['state']) pod_details['containers_info'] = container_info pod_details_list.append(pod_details) return Response(pod_details_list) |
解释一下上述代码:
上述代码中你只需要修改如下内容即可:
1 2 3 |
namespace = "test-env" config.load_kube_config('/data/kube-config.yaml') |
修改一下上述的命名空间和k8s的kube config的绝对路径为你们公司实际业务的真实路径即可,其它的看一下代码中的注释即可,这个代码很好理解,没啥难的。
最后去你的Django app中的urls.py中添加一个路由即可,比如下面的:
1 2 3 4 5 6 |
from django.urls import path from .views import K8sApiView urlpatterns = [ path('k8s/fetch/pod', K8sApiView.as_view()) ] |
接口的body示例如下:
1 2 |
{"app_name": "test-k8s-app"} |
接口返回的示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
{ "code": 200, "data": [ { "instance_name": "test-k8s-app-787sgg5-s7ujh4h", "pod_status": "Running", "host_ip": "192.168.0.1", "instance_ip": "192.168.0.2", "cpu_and_mem_request_and_limits": "CPU:0.25核~0.5核,内存:256Mi~2300Mi", "create_date": "2022-01-16 17:23:05", "containers_info": { "container_name": "test-k8s-app", "container_image_url": "docker-hub.com/test-env/test-k8s-app", "container_cpu_and_mem_request_and_limits": "CPU:0.25核~0.5核,内存:256Mi~2300Mi", "container status": "running" } } ], "msg": null } |
说明:以上代码只适用于业务的pod只有一个,pod里面只有一个容器的情况,要想支持多pod多容器的话也很简单,只需要对上面的demo代码稍加调整就行哈~