diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..1dc4a2f --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,29 @@ +name: CI + +on: + # Triggers the workflow on push or pull request events but only for the "main" branch + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +env: + GO_VERSION: '1.18.5' + +jobs: + + golang-lint: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + with: + submodules: true + + - name: Set up Golang + uses: actions/setup-go@v2 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Run unit test + run: go test -v ./... \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9eb16b2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,27 @@ + + +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib +bin +testbin/* + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Kubernetes Generated files - skip generated files, except for vendored files + +!vendor/**/zz_generated.* + +# editor and IDE paraphernalia +.idea +*.swp +*.swo +*~ + diff --git a/example/example.go b/example/example.go new file mode 100644 index 0000000..8a4f1a5 --- /dev/null +++ b/example/example.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + "github.com/practice/shell_extender/pkg/command" +) + +func main() { + fmt.Println("==============ExecShellCommand=================") + out, i, err := command.ExecShellCommand("kubectl get node") + if err != nil { + fmt.Println(err) + return + } + + fmt.Printf("i: %v, out: %s", i, out) + fmt.Println("===============================") + + fmt.Println("==============ExecShellCommandWithResult=================") + stdout, stderr, code, err := command.ExecShellCommandWithResult("kubectl get node") + fmt.Printf("stdout: %v, stderr: %s, code: %v, err: %v\n", stdout, stderr, code, err) + fmt.Println("===============================") + + fmt.Println("==============ExecShellCommandWithTimeout=================") + stdout, stderr, code, err = command.ExecShellCommandWithTimeout("sleep 10; kubectl get node", 3) + fmt.Printf("stdout: %v, stderr: %s, code: %v, err: %v\n", stdout, stderr, code, err) + fmt.Println("===============================") + + fmt.Println("==============ExecShellCommandWithChan=================") + outputC := make(chan string, 10) + + go func() { + for i := range outputC { + fmt.Println("output line: ", i) + } + }() + + err = command.ExecShellCommandWithChan("kubectl get node", outputC) + if err != nil { + fmt.Println(err) + return + } + fmt.Println("===============================") +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5c07265 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/practice/shell_extender + +go 1.18 diff --git a/pkg/command/command.go b/pkg/command/command.go new file mode 100644 index 0000000..ba1c840 --- /dev/null +++ b/pkg/command/command.go @@ -0,0 +1,116 @@ +package command + +import ( + "bufio" + "bytes" + "errors" + "io" + "os/exec" + "time" +) + +var ( + ErrTimeout = errors.New("exec command timeout") +) + +// ExecShellCommand 执行命令 +// 输出:1.命令行结果 2.进程输出code 3.错误 +func ExecShellCommand(cmd string) (string, int, error) { + executor := exec.Command("bash", "-c", cmd) + outByte, err := executor.CombinedOutput() + out := string(outByte) + return out, executor.ProcessState.ExitCode(), err +} + +// ExecShellCommandWithResult 执行命令并输出所有结果 +// 输出:1.命令行结果 2.命令行错误 3.进程输出code 4.错误 +func ExecShellCommandWithResult(cmd string) (string, string, int, error) { + + executor := exec.Command("bash", "-c", cmd) + var ( + stdout, stderr bytes.Buffer + err error + ) + executor.Stdout = &stdout + executor.Stderr = &stderr + err = executor.Start() + if err != nil { + return string(stdout.Bytes()), string(stderr.Bytes()), executor.ProcessState.ExitCode(), err + } + + err = executor.Wait() + return string(stdout.Bytes()), string(stderr.Bytes()), executor.ProcessState.ExitCode(), err +} + +// ExecShellCommandWithTimeout 执行命令并超时时间 +// 输出:1.命令行结果 2.命令行错误 3.进程输出code 4.错误 +func ExecShellCommandWithTimeout(cmd string, timeout int64) (string, string, int, error) { + executor := exec.Command("bash", "-c", cmd) + // executor.Run() 会阻塞,因此开一个goroutine异步执行, + // 当执行结束时,使用chan通知 + notifyC := make(chan struct{}) + var err error + execFunc := func() { + err = executor.Run() + close(notifyC) + } + go execFunc() + + var ( + stdout, stderr bytes.Buffer + ) + executor.Stdout = &stdout + executor.Stderr = &stderr + + if err != nil { + return string(stdout.Bytes()), string(stderr.Bytes()), executor.ProcessState.ExitCode(), err + } + + // 超时执行返回 + t := time.Duration(timeout) * time.Second + select { + case <-notifyC: + return string(stdout.Bytes()), string(stderr.Bytes()), executor.ProcessState.ExitCode(), err + case <-time.After(t): + return string(stdout.Bytes()), string(stderr.Bytes()), executor.ProcessState.ExitCode(), ErrTimeout + } +} + + +// ExecShellCommandWithChan 执行命令并使用管道输出 +// 输入:chan 输出:错误 +func ExecShellCommandWithChan(cmd string, queue chan string) error { + executor := exec.Command("bash", "-c", cmd) + stdout, err := executor.StdoutPipe() + if err != nil { + return err + } + + stderr, err := executor.StderrPipe() + if err != nil { + return err + } + + executor.Start() + + callbackFunc := func(in io.ReadCloser) { + reader := bufio.NewReader(in) + for { + line, _, err := reader.ReadLine() + if err != nil || io.EOF == err { + break + } + + select { + case queue <- string(line): + } + } + } + + go callbackFunc(stdout) + go callbackFunc(stderr) + + executor.Wait() + close(queue) + return nil +} diff --git a/pkg/command/command_test.go b/pkg/command/command_test.go new file mode 100644 index 0000000..331cddd --- /dev/null +++ b/pkg/command/command_test.go @@ -0,0 +1,43 @@ +package command + +import ( + "fmt" + "testing" +) + +func TestExecShellCommand(t *testing.T) { + out, i, err := ExecShellCommand("echo TestExecShellCommand") + if err != nil { + fmt.Println(err) + return + } + + fmt.Printf("i: %v, out: %s", i, out) + +} + +func TestExecShellCommandWithResult(t *testing.T) { + stdout, stderr, code, err := ExecShellCommandWithResult("echo TestExecShellCommandWithResult") + fmt.Printf("stdout: %v, stderr: %s, code: %v, err: %v\n", stdout, stderr, code, err) +} + +func TestExecShellCommandWithTimeout(t *testing.T) { + stdout, stderr, code, err := ExecShellCommandWithTimeout("sleep 15; echo TestExecShellCommandWithTimeout; kubectl get pods", 20) + fmt.Printf("stdout: %v, stderr: %s, code: %v, err: %v\n", stdout, stderr, code, err) +} + +func TestExecShellCommandWithChan(t *testing.T) { + outputC := make(chan string, 10) + + go func() { + for i := range outputC { + fmt.Println("output line: ", i) + } + }() + + err := ExecShellCommandWithChan("echo TestExecShellCommandWithChan ;sleep 1;kubectl get node", outputC) + if err != nil { + fmt.Println(err) + return + } +} \ No newline at end of file diff --git a/pkg/output/out_byte.go b/pkg/output/out_byte.go new file mode 100644 index 0000000..9726a35 --- /dev/null +++ b/pkg/output/out_byte.go @@ -0,0 +1,40 @@ +package output + +import ( + "bufio" + "bytes" + "sync" +) + +type OutputBuffer struct { + buf *bytes.Buffer + lines []string + *sync.Mutex +} + +func NewOutputBuffer() *OutputBuffer { + out := &OutputBuffer{ + buf: &bytes.Buffer{}, + lines: []string{}, + Mutex: &sync.Mutex{}, + } + return out +} + +func (rw *OutputBuffer) Write(p []byte) (n int, err error) { + rw.Lock() + n, err = rw.buf.Write(p) // and bytes.Buffer implements io.Writer + rw.Unlock() + return +} + +func (rw *OutputBuffer) Lines() []string { + rw.Lock() + s := bufio.NewScanner(rw.buf) + for s.Scan() { + rw.lines = append(rw.lines, s.Text()) + } + rw.Unlock() + return rw.lines +} + diff --git a/pkg/output/out_stream.go b/pkg/output/out_stream.go new file mode 100644 index 0000000..cce284c --- /dev/null +++ b/pkg/output/out_stream.go @@ -0,0 +1,98 @@ +package output + +import ( + "bytes" + "errors" +) + +var ( + ErrLineBufferOverflow = errors.New("line buffer overflow") + + ErrAlreadyFinished = errors.New("already finished") + ErrNotFoundCommand = errors.New("command not found") + ErrNotExecutePermission = errors.New("not execute permission") + ErrInvalidArgs = errors.New("Invalid argument to exit") + ErrProcessTimeout = errors.New("throw process timeout") + ErrProcessCancel = errors.New("active cancel process") + + DefaultExitCode = 2 +) + +type OutputStream struct { + streamChan chan string + bufSize int + buf []byte + lastChar int +} + +// NewOutputStream creates a new streaming output on the given channel. +func NewOutputStream(streamChan chan string) *OutputStream { + out := &OutputStream{ + streamChan: streamChan, + bufSize: 16384, + buf: make([]byte, 16384), + lastChar: 0, + } + return out +} + +// Write makes OutputStream implement the io.Writer interface. +func (rw *OutputStream) Write(p []byte) (n int, err error) { + n = len(p) // end of buffer + firstChar := 0 + + for { + newlineOffset := bytes.IndexByte(p[firstChar:], '\n') + if newlineOffset < 0 { + break // no newline in stream, next line incomplete + } + + // End of line offset is start (nextLine) + newline offset. Like bufio.Scanner, + // we allow \r\n but strip the \r too by decrementing the offset for that byte. + lastChar := firstChar + newlineOffset // "line\n" + if newlineOffset > 0 && p[newlineOffset-1] == '\r' { + lastChar -= 1 // "line\r\n" + } + + // Send the line, prepend line buffer if set + var line string + if rw.lastChar > 0 { + line = string(rw.buf[0:rw.lastChar]) + rw.lastChar = 0 // reset buffer + } + line += string(p[firstChar:lastChar]) + rw.streamChan <- line // blocks if chan full + + // Next line offset is the first byte (+1) after the newline (i) + firstChar += newlineOffset + 1 + } + + if firstChar < n { + remain := len(p[firstChar:]) + bufFree := len(rw.buf[rw.lastChar:]) + if remain > bufFree { + var line string + if rw.lastChar > 0 { + line = string(rw.buf[0:rw.lastChar]) + } + line += string(p[firstChar:]) + err = ErrLineBufferOverflow + n = firstChar + return // implicit + } + copy(rw.buf[rw.lastChar:], p[firstChar:]) + rw.lastChar += remain + } + + return // implicit +} + +func (rw *OutputStream) Lines() <-chan string { + return rw.streamChan +} + +func (rw *OutputStream) SetLineBufferSize(n int) { + rw.bufSize = n + rw.buf = make([]byte, rw.bufSize) +} + diff --git a/pkg/output/out_test.go b/pkg/output/out_test.go new file mode 100644 index 0000000..a307575 --- /dev/null +++ b/pkg/output/out_test.go @@ -0,0 +1,38 @@ +package output + +import ( + "fmt" + "os/exec" + "testing" +) + +func TestOut(t *testing.T) { + stdoutChan := make(chan string, 100) + incr := 0 + go func() { + for line := range stdoutChan { + incr++ + fmt.Println(incr, line) + } + }() + + cmd := exec.Command("bash", "-c", "echo 123;sleep 1;echo 456; echo 789") + stdout := NewOutputStream(stdoutChan) + cmd.Stdout = stdout + cmd.Run() + + select { + + } +} + +func TestCheckBuffer(t *testing.T) { + cmd := exec.Command("bash", "-c", "echo 123") + stdout := NewOutputBuffer() + cmd.Stdout = stdout + cmd.Run() + + fmt.Println(stdout.buf.String()) + + +}