Skip to content

Commit

Permalink
feat(kubectl): Implement PodExec tunnel emulation
Browse files Browse the repository at this point in the history
  • Loading branch information
danopia committed Aug 13, 2023
1 parent c704b2a commit 9fd40f6
Showing 1 changed file with 82 additions and 8 deletions.
90 changes: 82 additions & 8 deletions transports/via-kubectl-raw.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { readableStreamFromReader, TextLineStream } from '../deps.ts';
import { RestClient, RequestOptions, JSONValue } from '../lib/contract.ts';
import { RestClient, RequestOptions, JSONValue, KubernetesTunnel } from '../lib/contract.ts';
import { JsonParsingTransformer } from '../lib/stream-transformers.ts';

const isVerbose = Deno.args.includes('--verbose');
Expand Down Expand Up @@ -32,6 +32,7 @@ export class KubectlRawRestClient implements RestClient {
bodyRaw?: Uint8Array;
bodyJson?: JSONValue;
bodyStream?: ReadableStream<Uint8Array>;
bodyPassthru?: boolean;
}) {

const hasReqBody = opts.bodyJson !== undefined || !!opts.bodyRaw || !!opts.bodyStream;
Expand All @@ -41,14 +42,13 @@ export class KubectlRawRestClient implements RestClient {
'--context', this.contextName,
] : [];

const kubectl = new Deno.Command('kubectl', {
const p = new Deno.Command('kubectl', {
args: [...ctxArgs, ...args],
stdin: hasReqBody ? 'piped' : 'null',
stdin: (hasReqBody || opts.bodyPassthru) ? 'piped' : 'null',
stdout: 'piped',
stderr: 'inherit',
signal: opts.abortSignal,
});
const p = kubectl.spawn();
}).spawn();

if (hasReqBody) {
if (opts.bodyStream) {
Expand Down Expand Up @@ -82,6 +82,21 @@ export class KubectlRawRestClient implements RestClient {

if (opts.abortSignal?.aborted) throw new Error(`Given AbortSignal is already aborted`);

if (opts.expectTunnel) {
if (opts.expectTunnel.includes('v4.channel.k8s.io')) {
// We can implement PodExec with `kubectl exec`, for this specific route:
const match = new URLPattern({
pathname: '/api/:version/namespaces/:namespace/pods/:podName/exec',
}).exec({ pathname: opts.path });
if (match) {
const { namespace, podName } = match.pathname.groups;
return await this.emulateExecTunnel(namespace!, podName!, opts.querystring ?? new URLSearchParams(), opts.abortSignal);
}
}
if (opts.expectTunnel) throw new Error(
`That socket-based API (via ${opts.expectTunnel[0]}) is not implemented by this client.`);
}

let path = opts.path || '/';
const query = opts.querystring?.toString() ?? '';
if (query) {
Expand All @@ -91,9 +106,6 @@ export class KubectlRawRestClient implements RestClient {
const hasReqBody = opts.bodyJson !== undefined || !!opts.bodyRaw || !!opts.bodyStream;
isVerbose && console.error(opts.method, path, hasReqBody ? '(w/ body)' : '');

if (opts.expectTunnel) throw new Error(
`Channel-based APIs are not currently implemented by this client.`);

let rawArgs = [command, ...(hasReqBody ? ['-f', '-'] : []), "--raw", path];

if (command === 'patch') {
Expand Down Expand Up @@ -139,6 +151,68 @@ export class KubectlRawRestClient implements RestClient {
}
}

private async emulateExecTunnel(namespace: string, podName: string, querystring: URLSearchParams, signal?: AbortSignal): Promise<KubernetesTunnel> {
const wantsStdin = querystring.get('stdin') == '1';
const wantsTty = querystring.get('tty') == '1';
const wantsContainer = querystring.get('container');

// upstream feature request: https://github.com/denoland/deno/issues/3994
if (wantsTty) throw new Error(
`This Kubernetes client (${this.constructor.name} does not support opening TTYs. Try a Kubeconfig-based client if you need TTY.`);

const [p, status] = await this.runKubectl([
'exec',
...(querystring.get('stdin') == '1' ? ['--stdin'] : []),
...(querystring.get('tty') == '1' ? ['--tty'] : []),
...(namespace ? ['-n', namespace] : []),
`--quiet`,
podName,
...(wantsContainer ? ['-c', wantsContainer] : []),
`--`, // disable non-positional arguments after here, for safety
...querystring.getAll('command'),
], {
abortSignal: signal,
bodyPassthru: true, // lets us use the raw streams
});

return {
transportProtocol: 'Opaque',
subProtocol: 'v4.channel.k8s.io',
ready: () => Promise.resolve(), // we don't actually know!
stop: () => Promise.resolve(p.kill()),
getChannel: (opts) => {
if (opts.streamIndex == 0 && wantsStdin) {
return Promise.resolve({ writable: p.stdin } as any);
}
if (opts.streamIndex == 1) {
return Promise.resolve({ readable: p.stdout } as any);
}
if (opts.streamIndex == 2) {
// We don't pipe stderr, but we don't block it either
// Just provide a dummy stream for compatibility
const readable = new ReadableStream({
start(ctlr) { ctlr.close() },
});
return Promise.resolve({ readable } as any);
}
if (opts.streamIndex == 3) {
// Invent a JSON stream and give a limited ExecStatus
const readable = new ReadableStream({
async start(ctlr) {
const stat = await status;
ctlr.enqueue(new TextEncoder().encode(JSON.stringify({
status: stat.success ? 'Success' : 'Failure',
message: `kubectl exited with ${stat.code}`,
})));
ctlr.close();
},
});
return Promise.resolve({ readable } as any);
}
throw new Error(`BUG: Unmocked stream ${opts.streamIndex} in kubectl client!`);
},
};
}
}

// `kubectl patch` doesn't have --raw so we convert the HTTP request into a non-raw `kubectl patch` command
Expand Down

0 comments on commit 9fd40f6

Please sign in to comment.