Skip to content

Commit

Permalink
[esutil.Client] Add routing paramter to Bulk and Update (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexashley committed Jun 10, 2021
1 parent 7aae484 commit b22f2e1
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 3 deletions.
21 changes: 18 additions & 3 deletions go/v1beta1/storage/esutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type BulkRequestItem struct {
DocumentId string
Join *EsJoin
Operation EsBulkOperation
Routing string
}

type MultiSearchRequest struct {
Expand Down Expand Up @@ -99,6 +100,7 @@ type UpdateRequest struct {
DocumentId string
Refresh string // TODO: use RefreshOption type
Message proto.Message
Routing string
}

type DeleteRequest struct {
Expand Down Expand Up @@ -226,6 +228,10 @@ func (c *client) Bulk(ctx context.Context, request *BulkRequest) (*EsBulkRespons
err error
)
if item.Join != nil {
if item.Routing != "" {
return nil, errors.New("cannot specify a routing key when using a join")
}

// marshal the protobuf message with the custom join patch.
// see the godoc for EsDocWithJoin for more details
data, err = json.Marshal(&EsDocWithJoin{
Expand All @@ -238,6 +244,7 @@ func (c *client) Bulk(ctx context.Context, request *BulkRequest) (*EsBulkRespons

operationFragment.Routing = item.Join.Parent
} else {
operationFragment.Routing = item.Routing
data, err = protojson.MarshalOptions{EmitUnpopulated: true}.Marshal(item.Message)
if err != nil {
return nil, err
Expand Down Expand Up @@ -489,12 +496,20 @@ func (c *client) Update(ctx context.Context, request *UpdateRequest) (*EsIndexDo
request.Refresh = "true"
}

res, err := c.esClient.Index(
request.Index,
bytes.NewReader(str),
indexOpts := []func(*esapi.IndexRequest){
c.esClient.Index.WithDocumentID(request.DocumentId),
c.esClient.Index.WithContext(ctx),
c.esClient.Index.WithRefresh(request.Refresh),
}

if request.Routing != "" {
indexOpts = append(indexOpts, c.esClient.Index.WithRouting(request.Routing))
}

res, err := c.esClient.Index(
request.Index,
bytes.NewReader(str),
indexOpts...,
)
if err != nil {
return nil, err
Expand Down
55 changes: 55 additions & 0 deletions go/v1beta1/storage/esutil/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,48 @@ var _ = Describe("elasticsearch client", func() {
})
})

When("one of the bulk items specifies a routing", func() {
var (
expectedRouting string
randomItemIndex int
)

BeforeEach(func() {
expectedRouting = fake.UUID()
randomItemIndex = fake.Number(0, len(expectedBulkItems)-1)
expectedBulkItems[randomItemIndex].Routing = expectedRouting
})

It("should set the routing value for that item", func() {
var expectedPayloads []interface{}

for i := 0; i < len(expectedOccurrences); i++ {
expectedPayloads = append(expectedPayloads, &EsBulkQueryFragment{}, &pb.Occurrence{})
}

parseNDJSONRequestBodyWithProtobufs(transport.ReceivedHttpRequests[0].Body, expectedPayloads)

metadataIndex := randomItemIndex * 2
metadata := expectedPayloads[metadataIndex].(*EsBulkQueryFragment)

Expect(metadata.Index.Routing).To(Equal(expectedRouting))
})
})

When("one of the item specifies a join and a routing value", func() {
BeforeEach(func() {
randomItemIndex := fake.Number(0, len(expectedBulkItems)-1)
expectedBulkItems[randomItemIndex].Routing = fake.UUID()
expectedBulkItems[randomItemIndex].Join = &EsJoin{
Parent: fake.UUID(),
}
})

It("should return an error", func() {
Expect(actualErr).To(HaveOccurred())
})
})

When("the bulk operation fails", func() {
BeforeEach(func() {
transport.PreparedHttpResponses[0] = &http.Response{
Expand Down Expand Up @@ -1050,6 +1092,19 @@ var _ = Describe("elasticsearch client", func() {
Expect(transport.ReceivedHttpRequests[0].URL.Query().Get("refresh")).To(Equal("false"))
})
})

When("a routing key is specified", func() {
var expectedRouting string

BeforeEach(func() {
expectedRouting = fake.UUID()
expectedUpdateRequest.Routing = expectedRouting
})

It("should include the routing value", func() {
Expect(transport.ReceivedHttpRequests[0].URL.Query().Get("routing")).To(Equal(expectedRouting))
})
})
})

Context("Delete", func() {
Expand Down

0 comments on commit b22f2e1

Please sign in to comment.