Skip to content

Commit

Permalink
Adding k8s injector
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Dec 1, 2023
1 parent dcc017a commit 01ac6a9
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 12 deletions.
49 changes: 40 additions & 9 deletions duetector/injectors/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
from collections import namedtuple
from typing import Any

try:
from functools import cache
except ImportError:
from functools import lru_cache as cache


import docker
from duetector.extension.injector import hookimpl
from duetector.injectors.base import ProcInjector
from duetector.injectors.inspector import Inspector, ProcInfo
from duetector.injectors.inspector import Inspector
from duetector.log import logger


class DockerInjector(ProcInjector, Inspector):
Expand Down Expand Up @@ -43,31 +50,55 @@ def _inspect(self, model: dict[str, Any]) -> dict[str, Any]:
if not cgroups:
return {}
maybe_container_id = None
for cg in cgroups:
maybe_container_id = cg.split(":")[-1].split("/")[-1].lstrip("docker-").split(".")[0]
break
try:
for cg in cgroups:
# FIXME: Need a more compatible way to get container_id
maybe_container_id = (
cg.split(":")[-1].split("/")[-1].lstrip("docker-").split(".")[0]
)
break

except IndexError:
logger.info("Cann't parse container id.")
logger.debug(f"{cgroups}")
return {}
if not maybe_container_id:
return {}

if not self.client:
return {"maybe_container_id": maybe_container_id}

container_info = {}
try:
container_info = self.client.inspect_container(maybe_container_id)
container_info = self._query_container_info(maybe_container_id)
except Exception as e:
if "docker" not in cgroups[0]:
return {"maybe_container_id": maybe_container_id}
else:
return {
"container_id": maybe_container_id,
}
return {"container_id": maybe_container_id}

# TODO: More info from container_info
return {
"container_id": maybe_container_id,
**container_info,
}

def _query_container_info(self, container_id: str) -> dict[str, Any]:
# TODO: More info from container_info
container_inspect = self.client.inspect_container(container_id)
return {}


@hookimpl
def init_injector(config=None):
return DockerInjector(config=config)


if __name__ == "__main__":
pid = 1
i = DockerInjector()
model = {
"pid": pid,
}
data_t = namedtuple("T", ("pid",))
print(i.get_patch_kwargs(data_t(**model)))
i.shutdown()
118 changes: 115 additions & 3 deletions duetector/injectors/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,131 @@
from collections import namedtuple
from typing import Any

try:
from functools import cache
except ImportError:
from functools import lru_cache as cache

from kubernetes import client
from kubernetes import config as k8s_config

from duetector.extension.injector import hookimpl
from duetector.injectors.base import ProcInjector
from duetector.injectors.inspector import Inspector
from duetector.log import logger


class K8SInjector(ProcInjector, Inspector):
name = "k8s"

def __init__(self, config: dict[str, Any] = None, *args, **kwargs):
super().__init__(config, *args, **kwargs)
try:
try:
# TODO: Config kube config path
k8s_config.load_kube_config()
except k8s_config.config_exception.ConfigException:
k8s_config.load_incluster_config()
except Exception:
self.client = None
else:
self.client = client

class K8SInjector(ProcInjector):
def get_patch_kwargs(
self, data: namedtuple, extra: dict[str, Any] | None = None
) -> dict[str, Any]:
extra.update(super().get_patch_kwargs(data, extra))
if not extra:
extra = {}

super_patch_kwargs = super().get_patch_kwargs(data, extra)

extra.update(super_patch_kwargs)
param = self.as_dict(data, extra)
return {}
return {
**super_patch_kwargs,
**self.inspect(param),
}

def _inspect(self, model: dict[str, Any]) -> dict[str, Any]:
cgroups: list[str] | None = self.cgroup_inspector.get(model, "cgroups")

if not cgroups:
return {}
maybe_pod_id = None
maybe_container_id = None
try:
for cg in cgroups:
maybe_pod_id_str_list = cg.split(":")[-1].split("/")[-2].split(".")[0].split("-")
for i, s in enumerate(maybe_pod_id_str_list):
if s.startswith("pod"):
maybe_pod_id_str_list[i] = maybe_pod_id_str_list[i].strip("pod")
maybe_pod_id = "-".join(maybe_pod_id_str_list[i:]).replace("_", "-")
break
maybe_container_id = (
cg.split(":")[-1].split("/")[-1].lstrip("docker-").split(".")[0]
)
break
except IndexError:
logger.info("Cann't parse container id and pod id.")
logger.debug(f"{cgroups}")
return {}
if not (maybe_pod_id and maybe_container_id):
return {}

container_info = self._query_container_info(maybe_pod_id, maybe_container_id)

return {
"pod_id": maybe_pod_id,
"container_id": maybe_container_id,
**container_info,
}

@cache
def _query_container_info(self, pod_id: str, maybe_container_id: str):
if not self.client:
return {}
pod = None
container_status = None
container_info = {}
try:
for p in self.client.CoreV1Api().list_pod_for_all_namespaces().items:
if p.metadata.uid == pod_id:
pod = p
container_info["pod_name"] = p.metadata.name
container_info["namespace"] = p.metadata.namespace
break
if not pod:
logger.info(f"Pod not found: {pod_id}")
return container_info
for cs in pod.status.container_statuses:
if maybe_container_id in cs.container_id:
container_status = cs
container_info["container_name"] = cs.name
container_info["container_runtime"] = cs.container_id.split(":")[0]
break
if not container_status:
logger.info(f"Container not found: {maybe_container_id}")
return container_info

except Exception as e:
logger.error("Exception when query container info from k8s api.")
logger.exception(e)
return container_info

return container_info


@hookimpl
def init_injector(config=None):
return K8SInjector(config=config)


if __name__ == "__main__":
pid = 105846
i = K8SInjector()
model = {
"pid": pid,
}
data_t = namedtuple("T", ("pid",))
print(i.get_patch_kwargs(data_t(**model)))
i.shutdown()
111 changes: 111 additions & 0 deletions tests/base/injector/test_k8s.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,115 @@
import glob
import time
from collections import namedtuple
from pathlib import Path

import pytest
import yaml
from kubernetes import client
from kubernetes import config as k8s_config

from duetector.injectors.docker import DockerInjector
from duetector.injectors.k8s import K8SInjector


@pytest.fixture(scope="session")
def k8s_client():
try:
try:
k8s_config.load_kube_config()
except k8s_config.config_exception.ConfigException:
k8s_config.load_incluster_config()
except Exception:
pytest.skip("K8S Config not avaliable.")

try:
client.CoreV1Api().list_pod_for_all_namespaces()
except Exception:
pytest.skip("K8S API not avaliable.")

return client


@pytest.fixture(scope="session")
def k8s_pod(k8s_client: client):
pod_name = "testpod"
container_name = "testcontainer"

y = f"""
apiVersion: v1
kind: Pod
metadata:
name: {pod_name}
spec:
containers:
- name: {container_name}
image: ubuntu
command:
- sleep
- "181"
"""
try:
k8s_client.CoreV1Api().create_namespaced_pod(
namespace="default",
body=yaml.safe_load(y),
)
except Exception as e:
pytest.skip("Unable to create k8s pod")
# Wait for pod to be running
try:
while True:
pod_status = k8s_client.CoreV1Api().read_namespaced_pod_status(
name=pod_name, namespace="default"
)
if pod_status.status.phase == "Running":
break
time.sleep(1)

for p in glob.glob("/proc/[0-9]*"):
p = Path(p)
if (p / "cmdline").read_text().replace("\x00", " ").strip() == "sleep 181".strip():
pid = p.name
try:
(p / "cgroup").read_text().strip().split("\n")
except PermissionError as e:
pytest.skip(
"Low privileges for the current user to inspect docker container's process"
)

assert pid

yield pod_name, container_name, pid

finally:
k8s_client.CoreV1Api().delete_namespaced_pod(
name=pod_name,
namespace="default",
body=client.V1DeleteOptions(),
)


@pytest.fixture
def k8s_injector():
i = K8SInjector()
try:
yield i
finally:
i.shutdown()


def test_k8s_injector(k8s_pod, k8s_injector):
pod_name, container_name, pid = k8s_pod
model = {
"pid": pid,
}
data_t = namedtuple("T", ("pid",))
patch_args = k8s_injector.get_patch_kwargs(data_t(**model))
assert k8s_injector.is_inspected(patch_args)
assert k8s_injector.get(patch_args, "pod_name") == pod_name
assert k8s_injector.get(patch_args, "namespace") == "default"
assert k8s_injector.get(patch_args, "container_name") == container_name
assert k8s_injector.get(patch_args, "container_runtime")


if __name__ == "__main__":
pytest.main(["-vv", "-s", __file__])

0 comments on commit 01ac6a9

Please sign in to comment.