diff --git a/pkg/kanx/client.go b/pkg/kanx/client.go index 2de1d851947..80f996aaf89 100644 --- a/pkg/kanx/client.go +++ b/pkg/kanx/client.go @@ -4,6 +4,7 @@ import ( "context" "io" "net" + "net/url" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -16,6 +17,11 @@ func newGRPCConnection(addr string) (*grpc.ClientConn, error) { var opts []grpc.DialOption opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) opts = append(opts, grpc.WithContextDialer(unixDialer)) + // Add passthrough scheme if there is no scheme defined in the address + u, err := url.Parse(addr) + if err == nil && u.Scheme == "" { + addr = "passthrough:///" + addr + } return grpc.NewClient(addr, opts...) } diff --git a/pkg/kanx/kanx_test.go b/pkg/kanx/kanx_test.go index 97b7289ece4..b4b9b654785 100644 --- a/pkg/kanx/kanx_test.go +++ b/pkg/kanx/kanx_test.go @@ -40,16 +40,18 @@ func newTestServer(dir string) *Server { } } -func serverReady(ctx context.Context, addr string) { +func serverReady(ctx context.Context, addr string, c *C) { ctx, can := context.WithTimeout(ctx, 10*time.Second) defer can() for { select { case <-ctx.Done(): + c.Error("Timeout waiting for server to be ready") + c.Fail() return default: } - _, err := ListProcesses(ctx, addr) + l, err := ListProcesses(ctx, addr) if err == nil { return } @@ -61,7 +63,7 @@ func (s *KanXSuite) TestServerCancellation(c *C) { addr := path.Join(d, "kanx.sock") ctx, can := context.WithCancel(context.Background()) go func() { - serverReady(ctx, addr) + serverReady(ctx, addr, c) can() }() err := newTestServer(d).Serve(ctx, addr) @@ -77,7 +79,7 @@ func (s *KanXSuite) TestShortProcess(c *C) { err := newTestServer(d).Serve(ctx, addr) c.Assert(err, IsNil) }() - serverReady(ctx, addr) + serverReady(ctx, addr, c) p, err := CreateProcess(ctx, addr, "echo", []string{"hello"}) c.Assert(err, IsNil) @@ -115,7 +117,7 @@ func (s *KanXSuite) TestLongProcess(c *C) { err := server.Serve(ctx, addr) c.Assert(err, IsNil) }() - serverReady(ctx, addr) + serverReady(ctx, addr, c) p, err := CreateProcess(ctx, addr, "tail", []string{"-f", "/dev/null"}) c.Assert(err, IsNil) @@ -168,7 +170,7 @@ func (s *KanXSuite) TestError(c *C) { err := server.Serve(ctx, addr) c.Assert(err, IsNil) }() - serverReady(ctx, addr) + serverReady(ctx, addr, c) p, err := CreateProcess(ctx, addr, "tail", []string{"-f", "/dev/null"}) c.Assert(err, IsNil) @@ -225,7 +227,7 @@ func (s *KanXSuite) TestParallelStdout(c *C) { err := server.Serve(ctx, addr) c.Assert(err, IsNil) }() - serverReady(ctx, addr) + serverReady(ctx, addr, c) p, err := CreateProcess(ctx, addr, "yes", nil) c.Assert(err, IsNil)