Как автоматизировать операции Kubernetes посредством Python
В последние годы Kubernetes (К8s) прочно закрепился в повседневной деятельности многих разработчиков и DevOps-инженеров. Однако большинство задач, которые приходится выполнять, однообразны, монотонны и легко поддаются автоматизации.
Зачастую довольно просто набросать быстрый shell-скрипт с командами kubectl
. Но для более сложных задач автоматизации требуется что-то более мощное, чем bash, например возможности языка программирования Python.
В данной статье научимся работать с клиентской библиотекой Python для Kubernetes (kubernetes-client/python
) и автоматизировать любые скучные задачи K8s, стоящие перед нами!
Создание экспериментального кластера
Перед началом работы с kubernetes-client
создадим экспериментальный кластер для тестирования. Воспользуемся KinD (Kubernetes в Docker), который можно установить по ссылке.
Потребуется следующая конфигурация кластера:
# kind.yaml
# https://kind.sigs.k8s.io/docs/user/configuration/
apiVersion: kind.x-k8s.io/v1alpha4
kind: Cluster
name: api-playground
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
Для создания кластера с указанной конфигурацией выполняем:
kind create cluster --image kindest/node:v1.23.5 --config=kind.yamlkubectl cluster-info --context kind-api-playground
# Плоскость управления (Сontrol plane) Kubernetes работает по адресу https://127.0.0.1:36599
# CoreDNS работает по адресу https://127.0.0.1:36599/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
kubectl get nodes
# NAME STATUS ROLES AGE VERSION
# api-playground-control-plane Ready control-plane,master 58s v1.23.5
# api-playground-worker Ready <none> 27s v1.23.5
# api-playground-worker2 NotReady <none> 27s v1.23.5
# api-playground-worker3 NotReady <none> 27s v1.23.5
По факту готовности и запуска кластера устанавливаем клиентскую библиотеку (по желанию — внутри виртуальной среды):
python3 -m venv venv
source venv/bin/activate
pip install kubernetes
Аутентификация
Перед выполнением любых действий в кластере Kubernetes необходимо пройти процесс аутентификации.
Во избежание повторных процедур аутентификации воспользуемся долгосрочными токенами. Для их получения создаем ServiceAccount
:
kubectl create sa playground
kubectl describe sa playgroundName: playground
Namespace: default
Labels: <none>
Annotations: <none>
Image pull secrets: <none>
Mountable secrets: playground-token-v8bq7
Tokens: playground-token-v8bq7
Events: <none>
export KIND_TOKEN=$(kubectl get secret playground-token-v8bq7 -o json | jq -r .data.token | base64 --decode)
Еще одно преимущество ServiceAccount
состоит в том, что он не привязан к одному человеку. Это обстоятельство как нельзя лучше отвечает целям автоматизации.
Полученный токен можно задействовать в запросах:
curl -k -X GET -H "Authorization: Bearer $KIND_TOKEN" https://127.0.0.1:36599/apis
Процедура аутентификации завершена. Теперь пройдем авторизацию, дающую право на выполнение действий. Для этого создаем роль Role
и связываем ее с ServiceAccount
. В результате у нас появляется возможность управлять ресурсами. Рассмотрим соответствующий код:
kubectl create clusterrole manage-pods \
--verb=get --verb=list --verb=watch --verb=create --verb=update --verb=patch --verb=delete \
--resource=pods kubectl -n default create rolebinding sa-manage-pods \
--clusterrole=manage-pods \
--serviceaccount=default:playground
Данный код разрешает ServiceAccount
выполнять действия с подами, ограниченными пространством имен default
.
Роли должны быть точными и конкретными. При работе в KinD целесообразно выбрать роль администратора всего кластера cluster-admin
, как показано ниже:
kubectl create clusterrolebinding sa-cluster-admin \
--clusterrole=cluster-admin \
--serviceaccount=default:playground
Необработанные запросы
Чтобы лучше понять внутренний механизм работы kubectl
и kubernetes-client
, начнем с необработанных HTTP-запросов и воспользуемся curl
.
Есть самый простой способ узнать, какие запросы выполняются “под капотом”. Для этого необходимо выполнить нужную команду kubectl
с -v 10
. На выводе мы получаем полную команду curl
, как показано ниже:
kubectl get pods -v 10
# <snip>
curl -k -v -XGET -H "Accept: application/json;as=Table;v=v1;g=meta.k8s.io,application/json..." \
'https://127.0.0.1:36599/api/v1/namespaces/default/pods?limit=500'
# <snip>
Вывод с loglevel 10
будет очень подробным, но в нем вы найдете вышеуказанную команду curl
.
Добавляем заголовок токена Bearer
в команду curl
с долгосрочным токеном. Вы сможете выполнять те же действия, что и kubectl
. Например:
curl -s -k -XGET -H "Authorization: Bearer $KIND_TOKEN" -H "Accept: application/json, */*" -H "Content-Type: application/json" \
-H "kubernetes/$Format" 'https://127.0.0.1:36599/api/v1/namespaces/default/pods/example' | jq .status.phase
# "Running"
В случае потребности в теле запроса посмотрите, какие поля нужно включить в запрос. Например, при создании пода можно использовать API, описанный по данной ссылке. Сделав это, получаем следующий запрос:
curl -k -XPOST -H "Authorization: Bearer $KIND_TOKEN" -H "Accept: application/json, */*" -H "Content-Type: application/json" \
-H "kubernetes/$Format" https://127.0.0.1:36599/api/v1/namespaces/default/pods -d@pod.json# Подтверждение
kubectl get pods
NAME READY STATUS RESTARTS AGE
example 0/1 Running 0 7s
За атрибутами объекта обратимся к справочной документации по API Kubernetes. Кроме того, можно посмотреть определение OpenAPI с помощью этой команды:
curl -k -X GET -H "Authorization: Bearer $KIND_TOKEN" https://127.0.0.1:36599/apis
Прямое взаимодействие с Kubernetes посредством REST API может оказаться неудобным, но в ряде ситуаций такой шаг оправдан. Так, некоторые случаи включают взаимодействие с API, у которых нет эквивалентной команды kubectl
при работе с другим дистрибутивом K8s, например OpenShift. Этот дистрибутив предоставляет дополнительные API, не предусмотренные kubectl
или клиентским SDK.
Клиентская библиотека Python
Переходим к самой клиентской библиотеке Python. Выполняем те же шаги, что и в случае с kubectl
и curl
. Начнем с аутентификации:
from kubernetes import client
import osconfiguration = client.Configuration()
configuration.api_key_prefix["authorization"] = "Bearer"
configuration.host = "https://127.0.0.1:36599"
configuration.api_key["authorization"] = os.getenv("KIND_TOKEN", None)
configuration.verify_ssl = False # Только для тестирования с KinD!
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)
ret = v1.list_namespaced_pod(namespace="default", watch=False)
for pod in ret.items:
print(f"Name: {pod.metadata.name}, Namespace: {pod.metadata.namespace} IP: {pod.status.pod_ip}")
# Name: example, Namespace: default IP: 10.244.2.2
Сначала определяем объект конфигурации, который сообщает клиенту о процедуре аутентификации с использованием токена Bearer
. С учетом того, что кластер KinD не применяет SSL, мы его отключаем в реальном кластере. Но вам так делать не надо.
Проверяем конфигурацию с помощью метода list_namespaced_pod
клиента API и тем самым получаем все поды в пространстве имен default
. Затем выводим их name
, namespace
и IP
.
Предлагаю поработать с более реалистичной задачей. Для этого создадим Deployment
:
deployment_name = "my-deploy"
deployment_manifest = {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {"name": deployment_name, "namespace": "default"},
"spec": {"replicas": 3,
"selector": {
"matchLabels": {
"app": "nginx"
}},
"template": {"metadata": {"labels": {"app": "nginx"}},
"spec": {"containers": [
{"name": "nginx", "image": "nginx:1.21.6", "ports": [{"containerPort": 80}]}]
}
},
}
}import time
from kubernetes.client.rest import ApiException
v1 = client.AppsV1Api(api_client)
response = v1.create_namespaced_deployment(body=deployment_manifest, namespace="default")
while True:
try:
response = v1.read_namespaced_deployment_status(name=deployment_name, namespace="default")
if response.status.available_replicas != 3:
print("Waiting for Deployment to become ready...")
time.sleep(5)
else:
break
except ApiException as e:
print(f"Exception when calling AppsV1Api -> read_namespaced_deployment_status: {e}\n")
Помимо создания Deployment
мы также ожидаем получения доступа ко всем его подам. С этой целью запрашиваем состояние Deployment
и проверяем количество доступных реплик.
Обратите внимание на структуру имен функций, например create_namespaced_deployment
. Поясним на дополнительных примерах:
replace_namespaced_cron_job
;patch_namespaced_stateful_set
;list_namespaced_horizontal_pod_autoscaler
;read_namespaced_daemon_set
;read_custom_resource_definition
.
Все они представлены в формате operation_namespaced_resource
или просто operation_resource
для глобальных ресурсов. В конце к ним можно добавить _status
или _scale
для методов, которые выполняют операции с состоянием (read_namespaced_deployment_status
) или масштабированием ресурсов (patch_namespaced_stateful_set_scale
).
Отметим еще один важный момент в вышеуказанном примере. Мы выполняли действия с помощью client.AppsV1Api
. Он позволяет работать со всеми ресурсами, принадлежащими apiVersion: apps/v1
. Потребуйся нам CronJob, мы бы выбрали BatchV1Api
(он же apiVersion: batch/v1
в формате YAML
), а для PVC предпочли бы CoreV1Api
из-за apiVersion: v1
. Суть понятна.
Как видно, предоставляется большой выбор функций. К счастью, все они перечислены в документации. Нажав на любую из них, вы получаете пример ее использования.
Помимо основных операций CRUD, можно постоянно следить за изменениями объектов, например за Events
:
from kubernetes import client, watchv1 = client.CoreV1Api(api_client)
count = 10
w = watch.Watch()
for event in w.stream(partial(v1.list_namespaced_event, namespace="default"), timeout_seconds=10):
print(f"Event - Message: {event['object']['message']} at {event['object']['metadata']['creationTimestamp']}")
count -= 1
if not count:
w.stop()
print("Finished namespace stream.")
# Event - Сообщение: Успешно назначен default/my-deploy-cb69f686c-2dspd для api-playground-worker2 2022-04-19T11:18:25Z
# Event - Сообщение: Образ контейнера "nginx:1.21.6" уже присутствует в компьютере 2022-04-19T11:18:26Z
# Event - Сообщение: Контейнер nginx создан 2022-04-19T11:18:26Z
# Event - Сообщение: Контейнер nginx запущен 2022-04-19T11:18:26Z
Здесь мы отслеживаем события в пространстве имен default
: берем 10 событий и затем закрываем поток. В случае необходимости постоянного мониторинга ресурсов просто убираем timeout_seconds
и вызов w.stop()
.
Как видно, в первом примере мы воспользовались простым dict
Python для определения объекта Deployment
, переданного клиенту. Как вариант, можно прибегнуть к более выраженному стилю ООП и применить модели API (классы), предоставляемые библиотекой:
v1 = client.AppsV1Api(api_client)deployment_manifest = client.V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=client.V1ObjectMeta(name=deployment_name),
spec=client.V1DeploymentSpec(
replicas=3,
selector=client.V1LabelSelector(match_labels={
"app": "nginx"
}),
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
spec=client.V1PodSpec(
containers=[client.V1Container(name="nginx",
image="nginx:1.21.6",
ports=[client.V1ContainerPort(container_port=80)]
)]))
)
)
response = v1.create_namespaced_deployment(body=deployment_manifest, namespace="default")
Отбросьте попытки выяснить, какая модель применяется для каждого аргумента — это бесполезно. Генерируя ресурсы вышеуказанным способом, вы всегда должны работать с документацией по моделям и переходить по ссылкам при создании отдельных подобъектов. Так вы поймете, какие значения/типы ожидаются в каждом поле.
Полезные примеры
Теперь у вас есть базовое представление о работе клиентской библиотеки Python. Рассмотрим несколько примеров и фрагментов кода, позволяющих автоматизировать рутинные операции Kubernetes.
Довольно часто мы выполняем непрерывный перезапуск развертывания с помощью команды kubectl rollout restart
, но API для этого нет. kubectl
проводит данную операцию, обновляя аннотации развертывания. Если быть более точным, то kubectl.kubernetes.io/restartedAt
устанавливается на текущее время. Такой прием срабатывает, поскольку любое изменение в спецификации пода приводит к перезапуску.
Рассмотрим код для выполнения перезапуска с помощью клиентской библиотеки Python:
from kubernetes import dynamic
from kubernetes.client import api_client # Внимание: другой импорт, отличается от предыдущей клиентской библиотеки!
import datetimeclient = dynamic.DynamicClient(api_client.ApiClient(configuration=configuration))
api = client.resources.get(api_version="apps/v1", kind="Deployment")
# Несмотря на то, что манифест развертывания ранее был создан с помощью модели класса, он по-прежнему ведет себя как словарь:
deployment_manifest["spec"]["template"]["metadata"]["annotations"] = {
"kubectl.kubernetes.io/restartedAt": datetime.datetime.utcnow().isoformat()
}
deployment_patched = api.patch(body=deployment_manifest, name=deployment_name, namespace="default")
Еще одна распространенная операция — масштабирование развертывания. К счастью, на этот случай есть функция API. Рассмотрим ее применение:
from kubernetes import clientapi_client = client.ApiClient(configuration)
apps_v1 = client.AppsV1Api(api_client)
# Тело может включать разные типы patch - https://github.com/kubernetes-client/python/issues/1206#issuecomment-668118057
api_response = apps_v1.patch_namespaced_deployment_scale(deployment_name, "default", {"spec": {"replicas": 5}})
Для устранения неполадок целесообразно выполнить команду exec
внутри пода, изучить и, возможно, получить переменную среды, чтобы проверить правильность конфигурации. Пример соответствующего кода:
from kubernetes.stream import streamdef pod_exec(name, namespace, command, api_instance):
exec_command = ["/bin/sh", "-c", command]
resp = stream(api_instance.connect_get_namespaced_pod_exec,
name,
namespace,
command=exec_command,
stderr=True, stdin=False,
stdout=True, tty=False,
_preload_content=False)
while resp.is_open():
resp.update(timeout=1)
if resp.peek_stdout():
print(f"STDOUT: \n{resp.read_stdout()}")
if resp.peek_stderr():
print(f"STDERR: \n{resp.read_stderr()}")
resp.close()
if resp.returncode != 0:
raise Exception("Script failed")
pod = "example"
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)
pod_exec(pod, "default", "env", v1)
# STDOUT:
# KUBERNETES_SERVICE_PORT=443
# KUBERNETES_PORT=tcp://10.96.0.1:443
# HOSTNAME=example
# HOME=/root
# ...
Данный фрагмент кода также позволяет при необходимости запускать полные shell-скрипты.
Допустим, нужно установить ограничение Taint
на проблемный узел. Можно сконцентрироваться на задачах, ориентированных на администрирование кластера. Поскольку прямого API для ограничений узлов (Node Taints) нет, найдем обходной способ. Вот код, который поможет:
from kubernetes import clientapi_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)
# kubectl taint nodes api-playground-worker some-taint=1:NoSchedule
v1.patch_node("api-playground-worker", {"spec": {"taints": [{"effect": "NoSchedule", "key": "some-taint", "value": "1"}]}})
# kubectl get nodes -o custom-columns=NAME:.metadata.name,TAINTS:.spec.taints --no-headers
# api-playground-control-plane [map[effect:NoSchedule key:node-role.kubernetes.io/master]]
# api-playground-worker [map[effect:NoSchedule key:some-taint value:1]]
# api-playground-worker2 <none>
# api-playground-worker3 <none>
Для автоматизации масштабирования кластера проводится контроль за использованием его ресурсов. Как правило, команда kubectl top
позволяет получить информацию в интерактивном режиме. С помощью же клиентской библиотеки мы можем сделать так:
# https://github.com/kubernetes-sigs/kind/issues/398
# kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.5.0/components.yaml
# kubectl patch -n kube-system deployment metrics-server --type=json \
# -p '[{"op":"add","path":"/spec/template/spec/containers/0/args/-","value":"--kubelet-insecure-tls"}]'from kubernetes import client
api_client = client.ApiClient(configuration)
custom_api = client.CustomObjectsApi(api_client)
response = custom_api.list_cluster_custom_object("metrics.k8s.io", "v1beta1", "nodes") # also works with "pods" instead of "nodes"
for node in response["items"]:
print(f"{node['metadata']['name']: <30} CPU: {node['usage']['cpu']: <10} Memory: {node['usage']['memory']}")
# api-playground-control-plane CPU: 148318488n Memory: 2363504Ki
# api-playground-worker CPU: 91635913n Memory: 1858680Ki
# api-playground-worker2 CPU: 75473747n Memory: 1880860Ki
# api-playground-worker3 CPU: 105692650n Memory: 1881560Ki
Рассмотренный пример предусматривает предварительную установку metrics-server
в кластере. Проверить его наличие можно командой kubectl top
. Воспользуйтесь комментариями в коде для его установки, если вы работаете с KinD.
Изучим последний, но не менее востребованный случай. Допустим, у вас есть набор файлов YAML
и JSON
. Вы планируете задействовать их либо для развертывания или изменения объектов в кластере, либо для экспорта или резервных копий объектов, созданных с помощью клиентской библиотеки. Рассмотрим простой способ преобразования файлов YAML
/JSON
в объекты Kubernetes и обратно в файлы:
# pip install kopf # (Python 3.7+)
import kopfapi_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)
pods = []
# https://stackoverflow.com/questions/59977058/clone-kubernetes-objects-programmatically-using-the-python-api/59977059#59977059
ret = v1.list_namespaced_pod(namespace="default")
for pod in ret.items:
# Простое преобразование в Dict/JSON
print(api_client.sanitize_for_serialization(pod))
# Преобразование с очисткой полей
pods.append(kopf.AnnotationsDiffBaseStorage()
.build(body=kopf.Body(api_client.sanitize_for_serialization(pod))))
# Преобразование из Dict обратно в объект Client
class FakeKubeResponse:
def __init__(self, obj):
import json
self.data = json.dumps(obj)
for pod in pods:
pod_manifest = api_client.deserialize(FakeKubeResponse(pod), "V1Pod")
...
Первый способ преобразования существующего объекта в словарь Python (JSON
) предусматривает применение sanitize_for_serialization
. Он выдает необработанный вывод со всеми сгенерированными/предустановленными полями. Второй и более предпочтительный способ обращается к вспомогательным методам библиотеки kopf
, которые удаляют все ненужные поля. Таким образом, упрощается процесс преобразования словаря в правильный файл YAML
или JSON
.
Если необходимо осуществить обратный процесс и перейти от словаря к клиентской объектной модели, можно воспользоваться методом deserialize
клиента API. Однако он требует наличия атрибута data
у своего аргумента. Поэтому мы передаем ему экземпляр класса контейнера с таким атрибутом.
Если у вас уже есть файлы YAML
для работы с клиентской библиотекой Python, воспользуйтесь вспомогательной функцией kubernetes.utils.create_from_yaml
.
Чтобы получить полное представление обо всех возможностях библиотеки, ознакомьтесь с каталогом примеров в репозитории.
Настоятельно рекомендую изучить ишью (англ. issue) в репозитории библиотеки. Здесь вы найдете множество отличных практических примеров, например параллельная обработка событий и отслеживание обновлений ConfigMaps.