diff --git a/transports/via-kubectl-raw.ts b/transports/via-kubectl-raw.ts index 2708f14..40876bd 100644 --- a/transports/via-kubectl-raw.ts +++ b/transports/via-kubectl-raw.ts @@ -1,5 +1,5 @@ import { readableStreamFromReader, TextLineStream } from '../deps.ts'; -import { RestClient, RequestOptions, JSONValue } from '../lib/contract.ts'; +import { RestClient, RequestOptions, JSONValue, KubernetesTunnel } from '../lib/contract.ts'; import { JsonParsingTransformer } from '../lib/stream-transformers.ts'; const isVerbose = Deno.args.includes('--verbose'); @@ -32,6 +32,7 @@ export class KubectlRawRestClient implements RestClient { bodyRaw?: Uint8Array; bodyJson?: JSONValue; bodyStream?: ReadableStream; + bodyPassthru?: boolean; }) { const hasReqBody = opts.bodyJson !== undefined || !!opts.bodyRaw || !!opts.bodyStream; @@ -41,14 +42,13 @@ export class KubectlRawRestClient implements RestClient { '--context', this.contextName, ] : []; - const kubectl = new Deno.Command('kubectl', { + const p = new Deno.Command('kubectl', { args: [...ctxArgs, ...args], - stdin: hasReqBody ? 'piped' : 'null', + stdin: (hasReqBody || opts.bodyPassthru) ? 'piped' : 'null', stdout: 'piped', stderr: 'inherit', signal: opts.abortSignal, - }); - const p = kubectl.spawn(); + }).spawn(); if (hasReqBody) { if (opts.bodyStream) { @@ -82,6 +82,21 @@ export class KubectlRawRestClient implements RestClient { if (opts.abortSignal?.aborted) throw new Error(`Given AbortSignal is already aborted`); + if (opts.expectTunnel) { + if (opts.expectTunnel.includes('v4.channel.k8s.io')) { + // We can implement PodExec with `kubectl exec`, for this specific route: + const match = new URLPattern({ + pathname: '/api/:version/namespaces/:namespace/pods/:podName/exec', + }).exec({ pathname: opts.path }); + if (match) { + const { namespace, podName } = match.pathname.groups; + return await this.emulateExecTunnel(namespace!, podName!, opts.querystring ?? new URLSearchParams(), opts.abortSignal); + } + } + if (opts.expectTunnel) throw new Error( + `That socket-based API (via ${opts.expectTunnel[0]}) is not implemented by this client.`); + } + let path = opts.path || '/'; const query = opts.querystring?.toString() ?? ''; if (query) { @@ -91,9 +106,6 @@ export class KubectlRawRestClient implements RestClient { const hasReqBody = opts.bodyJson !== undefined || !!opts.bodyRaw || !!opts.bodyStream; isVerbose && console.error(opts.method, path, hasReqBody ? '(w/ body)' : ''); - if (opts.expectTunnel) throw new Error( - `Channel-based APIs are not currently implemented by this client.`); - let rawArgs = [command, ...(hasReqBody ? ['-f', '-'] : []), "--raw", path]; if (command === 'patch') { @@ -139,6 +151,68 @@ export class KubectlRawRestClient implements RestClient { } } + private async emulateExecTunnel(namespace: string, podName: string, querystring: URLSearchParams, signal?: AbortSignal): Promise { + const wantsStdin = querystring.get('stdin') == '1'; + const wantsTty = querystring.get('tty') == '1'; + const wantsContainer = querystring.get('container'); + + // upstream feature request: https://github.com/denoland/deno/issues/3994 + if (wantsTty) throw new Error( + `This Kubernetes client (${this.constructor.name} does not support opening TTYs. Try a Kubeconfig-based client if you need TTY.`); + + const [p, status] = await this.runKubectl([ + 'exec', + ...(querystring.get('stdin') == '1' ? ['--stdin'] : []), + ...(querystring.get('tty') == '1' ? ['--tty'] : []), + ...(namespace ? ['-n', namespace] : []), + `--quiet`, + podName, + ...(wantsContainer ? ['-c', wantsContainer] : []), + `--`, // disable non-positional arguments after here, for safety + ...querystring.getAll('command'), + ], { + abortSignal: signal, + bodyPassthru: true, // lets us use the raw streams + }); + + return { + transportProtocol: 'Opaque', + subProtocol: 'v4.channel.k8s.io', + ready: () => Promise.resolve(), // we don't actually know! + stop: () => Promise.resolve(p.kill()), + getChannel: (opts) => { + if (opts.streamIndex == 0 && wantsStdin) { + return Promise.resolve({ writable: p.stdin } as any); + } + if (opts.streamIndex == 1) { + return Promise.resolve({ readable: p.stdout } as any); + } + if (opts.streamIndex == 2) { + // We don't pipe stderr, but we don't block it either + // Just provide a dummy stream for compatibility + const readable = new ReadableStream({ + start(ctlr) { ctlr.close() }, + }); + return Promise.resolve({ readable } as any); + } + if (opts.streamIndex == 3) { + // Invent a JSON stream and give a limited ExecStatus + const readable = new ReadableStream({ + async start(ctlr) { + const stat = await status; + ctlr.enqueue(new TextEncoder().encode(JSON.stringify({ + status: stat.success ? 'Success' : 'Failure', + message: `kubectl exited with ${stat.code}`, + }))); + ctlr.close(); + }, + }); + return Promise.resolve({ readable } as any); + } + throw new Error(`BUG: Unmocked stream ${opts.streamIndex} in kubectl client!`); + }, + }; + } } // `kubectl patch` doesn't have --raw so we convert the HTTP request into a non-raw `kubectl patch` command