Skip to content

Commit

Permalink
implement BatchCreateNotes (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrparkers committed Dec 22, 2020
1 parent a9910d2 commit 0681f8b
Show file tree
Hide file tree
Showing 6 changed files with 614 additions and 10 deletions.
142 changes: 140 additions & 2 deletions go/v1beta1/storage/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,146 @@ func (es *ElasticsearchStorage) CreateNote(ctx context.Context, projectId, noteI
}

// BatchCreateNotes batch creates the specified notes in memstore.
func (es *ElasticsearchStorage) BatchCreateNotes(ctx context.Context, pID, uID string, notes map[string]*pb.Note) ([]*pb.Note, []error) {
return nil, nil
func (es *ElasticsearchStorage) BatchCreateNotes(ctx context.Context, projectId, uID string, notesWithNoteIds map[string]*pb.Note) ([]*pb.Note, []error) {
log := es.logger.Named("BatchCreateNotes").With(zap.String("projectId", projectId))
log.Debug("creating notes")

searchMetadata, _ := json.Marshal(&esMultiSearchQueryFragment{
Index: notesIndex(projectId),
})
searchMetadata = append(searchMetadata, "\n"...)

var (
notes []*pb.Note
searchRequestBody bytes.Buffer
)
for noteId, note := range notesWithNoteIds {
note.Name = fmt.Sprintf("projects/%s/notes/%s", projectId, noteId)
notes = append(notes, note)

searchBody := &esSearch{
Query: &filtering.Query{
Term: &filtering.Term{
"name": note.Name,
},
},
}
data, _ := json.Marshal(searchBody)
dataBytes := append(data, "\n"...)

searchRequestBody.Grow(len(searchMetadata) + len(dataBytes))
searchRequestBody.Write(searchMetadata)
searchRequestBody.Write(dataBytes)
}

log.Debug("attempting ES multisearch", zap.String("payload", string(searchRequestBody.Bytes())))

res, err := es.client.Msearch(
bytes.NewReader(searchRequestBody.Bytes()),
es.client.Msearch.WithContext(ctx),
)

if err != nil {
return nil, []error{
createError(log, "failed while sending request to ES", err),
}
}
if res.IsError() {
return nil, []error{
createError(log, "unexpected response from ES", nil, zap.Any("response", res.String()), zap.Int("status", res.StatusCode)),
}
}

searchResponse := &esMultiSearchResponse{}
err = decodeResponse(res.Body, searchResponse)
if err != nil {
return nil, []error{
createError(log, "error decoding ES response", nil),
}
}

var (
notesToCreate []*pb.Note
errs []error
)
for i, res := range searchResponse.Responses {
if res.Hits.Total.Value != 0 {
errs = append(errs, status.Errorf(codes.AlreadyExists, "note with the name %s already exists", notes[i].Name))
} else {
notesToCreate = append(notesToCreate, notes[i])
}
}

if len(notesToCreate) == 0 {
log.Error("all notes already exist")
return nil, errs
}

indexMetadata, _ := json.Marshal(&esBulkQueryFragment{
Index: &esBulkQueryIndexFragment{
Index: notesIndex(projectId),
},
})
indexMetadata = append(indexMetadata, "\n"...)

// build the request body using newline delimited JSON (ndjson)
// each note is represented by two JSON structures:
// the first is the metadata that represents the ES operation, in this case "index"
// the second is the source payload to index
// in total, this body will consist of (len(notes) * 2) JSON structures, separated by newlines, with a trailing newline at the end
var indexBody bytes.Buffer
for _, note := range notesToCreate {
data, _ := protojson.Marshal(proto.MessageV2(note))

dataBytes := append(data, "\n"...)
indexBody.Grow(len(indexMetadata) + len(dataBytes))
indexBody.Write(indexMetadata)
indexBody.Write(dataBytes)
}

log.Debug("attempting ES bulk index", zap.String("payload", string(indexBody.Bytes())))

res, err = es.client.Bulk(
bytes.NewReader(indexBody.Bytes()),
es.client.Bulk.WithContext(ctx),
es.client.Bulk.WithRefresh(es.config.Refresh.String()),
)
if err != nil {
return nil, append(errs, createError(log, "failed while sending request to ES", err))
}
if res.IsError() {
return nil, append(errs, createError(log, "unexpected response from ES", nil, zap.Any("response", res.String()), zap.Int("status", res.StatusCode)))
}

bulkResponse := &esBulkResponse{}
err = decodeResponse(res.Body, bulkResponse)
if err != nil {
return nil, append(errs, createError(log, "error decoding ES response", nil))
}

// each indexing operation in this bulk request has its own status
// we need to iterate over each of the items in the response to know whether or not that particular note was created successfully
var createdNotes []*pb.Note
for i, note := range notesToCreate {
indexItem := bulkResponse.Items[i].Index
if indexDocError := indexItem.Error; indexDocError != nil {
errs = append(errs, createError(log, "error creating note in ES", fmt.Errorf("[%d] %s: %s", indexItem.Status, indexDocError.Type, indexDocError.Reason), zap.Any("note", note)))
continue
}

createdNotes = append(createdNotes, note)
log.Debug(fmt.Sprintf("note %s created", note.Name))
}

if len(errs) > 0 {
log.Info("errors while creating notes", zap.Any("errors", errs))

return createdNotes, errs
}

log.Debug("notes created successfully")

return createdNotes, nil
}

// UpdateNote updates the existing note with the given pID and nID
Expand Down
Loading

0 comments on commit 0681f8b

Please sign in to comment.