Skip to content

Commit

Permalink
[esutil.Client] Allow for passing routing values to Get, Delete, and …
Browse files Browse the repository at this point in the history
…MultiGet (#85)
  • Loading branch information
alexashley committed May 28, 2021
1 parent 52d9c8e commit 0bb1825
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 10 deletions.
33 changes: 26 additions & 7 deletions go/v1beta1/storage/esutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"net/http"
"net/url"
)

//go:generate counterfeiter -generate
Expand Down Expand Up @@ -67,11 +68,13 @@ type MultiSearchRequest struct {
type GetRequest struct {
Index string
DocumentId string
Routing string
}

type MultiGetRequest struct {
Index string
DocumentIds []string
Items []*EsMultiGetItem
}

type SearchRequest struct {
Expand Down Expand Up @@ -102,6 +105,7 @@ type DeleteRequest struct {
Index string
Search *EsSearch
Refresh string // TODO: use RefreshOption type
Routing string
}

const defaultPitKeepAlive = "5m"
Expand Down Expand Up @@ -414,11 +418,18 @@ func (c *client) MultiSearch(ctx context.Context, request *MultiSearchRequest) (

func (c *client) Get(ctx context.Context, request *GetRequest) (*EsGetResponse, error) {
log := c.logger.Named("Get").With(zap.String("index", request.Index), zap.String("documentId", request.DocumentId))
getOpts := []func(*esapi.GetRequest){
c.esClient.Get.WithContext(ctx),
}

if request.Routing != "" {
getOpts = append(getOpts, c.esClient.Get.WithRouting(url.QueryEscape(request.Routing)))
}

res, err := c.esClient.Get(
request.Index,
url.QueryEscape(request.DocumentId),
c.esClient.Get.WithContext(ctx),
getOpts...,
)
if err != nil {
return nil, err
Expand All @@ -439,9 +450,9 @@ func (c *client) Get(ctx context.Context, request *GetRequest) (*EsGetResponse,

func (c *client) MultiGet(ctx context.Context, request *MultiGetRequest) (*EsMultiGetResponse, error) {
log := c.logger.Named("MultiGet")

encodedBody, requestJson := EncodeRequest(&EsMultiGetRequest{
IDs: request.DocumentIds,
IDs: request.DocumentIds,
Docs: request.Items,
})
log = log.With(zap.String("request", requestJson))

Expand Down Expand Up @@ -511,11 +522,19 @@ func (c *client) Delete(ctx context.Context, request *DeleteRequest) error {
request.Refresh = "true"
}

deleteOpts := []func(queryRequest *esapi.DeleteByQueryRequest){
c.esClient.DeleteByQuery.WithContext(ctx),
c.esClient.DeleteByQuery.WithRefresh(withRefreshBool(request.Refresh)),
}

if request.Routing != "" {
deleteOpts = append(deleteOpts, c.esClient.DeleteByQuery.WithRouting(request.Routing))
}

res, err := c.esClient.DeleteByQuery(
[]string{request.Index},
encodedBody,
c.esClient.DeleteByQuery.WithContext(ctx),
c.esClient.DeleteByQuery.WithRefresh(withRefreshBool(request.Refresh)),
deleteOpts...,
)
if err != nil {
return err
Expand Down
56 changes: 54 additions & 2 deletions go/v1beta1/storage/esutil/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ import (
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
protov1 "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/grafeas/grafeas/proto/v1beta1/common_go_proto"
pb "github.com/grafeas/grafeas/proto/v1beta1/grafeas_go_proto"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/rode/grafeas-elasticsearch/go/v1beta1/storage/filtering"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

var _ = Describe("elasticsearch client", func() {
Expand Down Expand Up @@ -794,6 +794,7 @@ var _ = Describe("elasticsearch client", func() {
It("should send the get request to ES", func() {
Expect(transport.ReceivedHttpRequests[0].Method).To(Equal(http.MethodGet))
Expect(transport.ReceivedHttpRequests[0].URL.Path).To(Equal(fmt.Sprintf("/%s/_doc/%s", expectedIndex, expectedDocumentId)))
Expect(transport.ReceivedHttpRequests[0].URL.Query().Get("routing")).To(BeEmpty())
})

It("should return the response and no error", func() {
Expand Down Expand Up @@ -850,6 +851,19 @@ var _ = Describe("elasticsearch client", func() {
Expect(transport.ReceivedHttpRequests[0].URL.RawPath).To(ContainSubstring(url.QueryEscape(expectedDocumentId)))
})
})

When("the document routing is specified", func() {
var expectedRouting string

BeforeEach(func() {
expectedRouting = fake.UUID()
expectedGetRequest.Routing = expectedRouting
})

It("should include the routing value", func() {
Expect(transport.ReceivedHttpRequests[0].URL.Query().Get("routing")).To(Equal(expectedRouting))
})
})
})

Context("MultiGet", func() {
Expand Down Expand Up @@ -908,6 +922,30 @@ var _ = Describe("elasticsearch client", func() {
Expect(actualMultiGetResponse).To(BeEquivalentTo(expectedMultiGetResponse))
})

When("multiget items are specified", func() {
var expectedItems []*EsMultiGetItem

BeforeEach(func() {
expectedItems = []*EsMultiGetItem{}
for i := 0; i < len(expectedDocumentIds); i++ {
expectedItems = append(expectedItems, &EsMultiGetItem{
Id: expectedDocumentIds[i],
Routing: fake.LetterN(10),
})
}
expectedMultiGetRequest.DocumentIds = nil
expectedMultiGetRequest.Items = expectedItems
})

It("should send the items instead of document ids", func() {
requestBody := &EsMultiGetRequest{}
ReadRequestBody(transport.ReceivedHttpRequests[0], &requestBody)

Expect(requestBody.IDs).To(BeNil())
Expect(requestBody.Docs).To(ConsistOf(expectedItems))
})
})

When("the multiget operation fails", func() {
BeforeEach(func() {
transport.PreparedHttpResponses = []*http.Response{
Expand Down Expand Up @@ -1048,6 +1086,7 @@ var _ = Describe("elasticsearch client", func() {

It("should delete the document in ES", func() {
Expect(transport.ReceivedHttpRequests[0].URL.Path).To(Equal(fmt.Sprintf("/%s/_delete_by_query", expectedDeleteRequest.Index)))
Expect(transport.ReceivedHttpRequests[0].URL.Query().Get("routing")).To(BeEmpty())

searchRequest := &EsSearch{}
ReadRequestBody(transport.ReceivedHttpRequests[0], &searchRequest)
Expand Down Expand Up @@ -1096,6 +1135,19 @@ var _ = Describe("elasticsearch client", func() {
Expect(actualErr).To(HaveOccurred())
})
})

When("the document routing is specified", func() {
var expectedRouting string

BeforeEach(func() {
expectedRouting = fake.UUID()
expectedDeleteRequest.Routing = expectedRouting
})

It("should include the routing value", func() {
Expect(transport.ReceivedHttpRequests[0].URL.Query().Get("routing")).To(Equal(expectedRouting))
})
})
})
})

Expand All @@ -1109,7 +1161,7 @@ func createRandomOccurrence() *pb.Occurrence {
Kind: common_go_proto.NoteKind_NOTE_KIND_UNSPECIFIED,
Remediation: fake.LetterN(10),
Details: nil,
CreateTime: ptypes.TimestampNow(),
CreateTime: timestamppb.Now(),
}
}

Expand Down
9 changes: 8 additions & 1 deletion go/v1beta1/storage/esutil/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package esutil

import (
"encoding/json"

jsonpatch "github.com/evanphx/json-patch"
"github.com/rode/grafeas-elasticsearch/go/v1beta1/storage/filtering"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -158,8 +159,14 @@ type ESPitResponse struct {
Id string `json:"id"`
}

type EsMultiGetItem struct {
Id string `json:"_id"`
Routing string `json:"routing,omitempty"`
}

type EsMultiGetRequest struct {
IDs []string `json:"ids"`
IDs []string `json:"ids,omitempty"`
Docs []*EsMultiGetItem `json:"docs,omitempty"`
}

type EsGetResponse struct {
Expand Down

0 comments on commit 0bb1825

Please sign in to comment.