Skip to content

Commit

Permalink
Add pagination (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrparkers committed Apr 8, 2021
1 parent 8342ca9 commit 4a0e69e
Show file tree
Hide file tree
Showing 10 changed files with 715 additions and 25 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ currently implemented features, along with the features that have not been imple
- [x] `.startsWith` function (ex: `"resource.uri".startsWith("gcr.io")`)
- [x] `.contains` function (ex: `"resource.uri".contains("alpine")`)
- [ ] `.endsWith` function
- [ ] Pagination
- [x] Pagination
- [ ] Elasticsearch config
- [x] URL
- [x] Index refresh behavior
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.15
require (
github.com/Jeffail/gabs/v2 v2.6.0
github.com/brianvoe/gofakeit/v6 v6.0.0
github.com/elastic/go-elasticsearch/v7 v7.5.1-0.20201104130636-152864b47d96
github.com/elastic/go-elasticsearch/v7 v7.10.0
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.2
github.com/google/cel-go v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dchest/safefile v0.0.0-20151022103144-855e8d98f185/go.mod h1:cFRxtTwTOJkz2x3rQUNCYKWC93yP1VKjR8NUhqFxZNU=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/elastic/go-elasticsearch/v7 v7.5.1-0.20201104130636-152864b47d96 h1:lYCctiMAT/uTRLTwPJnGoBeGKdgbmbKIN++ekoUrjuE=
github.com/elastic/go-elasticsearch/v7 v7.5.1-0.20201104130636-152864b47d96/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/go-elasticsearch/v7 v7.10.0 h1:vYRwqgFM46ZUHFMRdvKr+y1WA4ehJO6WqAGV9Btbl2o=
github.com/elastic/go-elasticsearch/v7 v7.10.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
100 changes: 84 additions & 16 deletions go/v1beta1/storage/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/rode/grafeas-elasticsearch/go/v1beta1/storage/migration"

"github.com/elastic/go-elasticsearch/v7"
Expand All @@ -43,6 +44,7 @@ import (

const grafeasMaxPageSize = 1000
const sortField = "createTime"
const pitKeepAlive = "1m"

type ElasticsearchStorage struct {
client *elasticsearch.Client
Expand Down Expand Up @@ -162,7 +164,7 @@ func (es *ElasticsearchStorage) ListProjects(ctx context.Context, filter string,
var projects []*prpb.Project
log := es.logger.Named("ListProjects")

res, err := es.genericList(ctx, log, es.indexManager.ProjectsAlias(), filter, false)
res, nextPageToken, err := es.genericList(ctx, log, es.indexManager.ProjectsAlias(), filter, false, pageToken, int32(pageSize))
if err != nil {
return nil, "", err
}
Expand All @@ -182,7 +184,7 @@ func (es *ElasticsearchStorage) ListProjects(ctx context.Context, filter string,
projects = append(projects, project)
}

return projects, "", nil
return projects, nextPageToken, nil
}

// DeleteProject deletes the project with the given projectId from Elasticsearch
Expand Down Expand Up @@ -251,7 +253,7 @@ func (es *ElasticsearchStorage) ListOccurrences(ctx context.Context, projectId,
projectName := fmt.Sprintf("projects/%s", projectId)
log := es.logger.Named("ListOccurrences").With(zap.String("project", projectName))

res, err := es.genericList(ctx, log, es.indexManager.OccurrencesAlias(projectId), filter, true)
res, nextPageToken, err := es.genericList(ctx, log, es.indexManager.OccurrencesAlias(projectId), filter, true, pageToken, pageSize)
if err != nil {
return nil, "", err
}
Expand All @@ -272,7 +274,7 @@ func (es *ElasticsearchStorage) ListOccurrences(ctx context.Context, projectId,
occurrences = append(occurrences, occurrence)
}

return occurrences, "", nil
return occurrences, nextPageToken, nil
}

// CreateOccurrence adds the specified occurrence to Elasticsearch
Expand Down Expand Up @@ -486,7 +488,7 @@ func (es *ElasticsearchStorage) ListNotes(ctx context.Context, projectId, filter
projectName := fmt.Sprintf("projects/%s", projectId)
log := es.logger.Named("ListNotes").With(zap.String("project", projectName))

res, err := es.genericList(ctx, log, es.indexManager.NotesAlias(projectId), filter, true)
res, nextPageToken, err := es.genericList(ctx, log, es.indexManager.NotesAlias(projectId), filter, true, pageToken, pageSize)
if err != nil {
return nil, "", err
}
Expand All @@ -507,7 +509,7 @@ func (es *ElasticsearchStorage) ListNotes(ctx context.Context, projectId, filter
notes = append(notes, note)
}

return notes, "", nil
return notes, nextPageToken, nil
}

// CreateNote adds the specified note
Expand Down Expand Up @@ -866,13 +868,13 @@ func (es *ElasticsearchStorage) genericDelete(ctx context.Context, log *zap.Logg
return nil
}

func (es *ElasticsearchStorage) genericList(ctx context.Context, log *zap.Logger, index, filter string, sort bool) (*esutil.EsSearchResponseHits, error) {
func (es *ElasticsearchStorage) genericList(ctx context.Context, log *zap.Logger, index, filter string, sort bool, pageToken string, pageSize int32) (*esutil.EsSearchResponseHits, string, error) {
body := &esutil.EsSearch{}
if filter != "" {
log = log.With(zap.String("filter", filter))
filterQuery, err := es.filterer.ParseExpression(filter)
if err != nil {
return nil, createError(log, "error while parsing filter expression", err)
return nil, "", createError(log, "error while parsing filter expression", err)
}

body.Query = filterQuery
Expand All @@ -884,29 +886,95 @@ func (es *ElasticsearchStorage) genericList(ctx context.Context, log *zap.Logger
}
}

searchOptions := []func(*esapi.SearchRequest){
es.client.Search.WithContext(ctx),
}

var nextPageToken string
if pageToken != "" || pageSize != 0 { // handle pagination
next, extraSearchOptions, err := es.handlePagination(ctx, log, body, index, pageToken, pageSize)
if err != nil {
return nil, "", createError(log, "error while handling pagination", err)
}

nextPageToken = next
searchOptions = append(searchOptions, extraSearchOptions...)
} else {
searchOptions = append(searchOptions,
es.client.Search.WithIndex(index),
es.client.Search.WithSize(grafeasMaxPageSize),
)
}

encodedBody, requestJson := esutil.EncodeRequest(body)
log = log.With(zap.String("request", requestJson))
log.Debug("performing search")

res, err := es.client.Search(
es.client.Search.WithContext(ctx),
es.client.Search.WithIndex(index),
es.client.Search.WithBody(encodedBody),
es.client.Search.WithSize(grafeasMaxPageSize),
append(searchOptions, es.client.Search.WithBody(encodedBody))...,
)
if err != nil {
return nil, createError(log, "error sending request to elasticsearch", err)
return nil, "", createError(log, "error sending request to elasticsearch", err)
}
if res.IsError() {
return nil, createError(log, "unexpected response from elasticsearch", nil, zap.String("response", res.String()), zap.Int("status", res.StatusCode))
return nil, "", createError(log, "unexpected response from elasticsearch", nil, zap.String("response", res.String()), zap.Int("status", res.StatusCode))
}

var searchResults esutil.EsSearchResponse
if err := esutil.DecodeResponse(res.Body, &searchResults); err != nil {
return nil, createError(log, "error decoding elasticsearch response", err)
return nil, "", createError(log, "error decoding elasticsearch response", err)
}

return searchResults.Hits, nextPageToken, nil
}

func (es *ElasticsearchStorage) handlePagination(ctx context.Context, log *zap.Logger, body *esutil.EsSearch, index, pageToken string, pageSize int32) (string, []func(*esapi.SearchRequest), error) {
log = log.With(zap.String("pageToken", pageToken), zap.Int32("pageSize", pageSize))

var (
pit string
from int
err error
)

// if no pageToken is specified, we need to create a new PIT
if pageToken == "" {
res, err := es.client.OpenPointInTime(
es.client.OpenPointInTime.WithContext(ctx),
es.client.OpenPointInTime.WithIndex(index),
es.client.OpenPointInTime.WithKeepAlive(pitKeepAlive),
)
if err != nil {
return "", nil, createError(log, "error sending request to elasticsearch", err)
}
if res.IsError() {
return "", nil, createError(log, "unexpected response from elasticsearch", nil, zap.String("response", res.String()), zap.Int("status", res.StatusCode))
}

var pitResponse esutil.ESPitResponse
if err = esutil.DecodeResponse(res.Body, &pitResponse); err != nil {
return "", nil, createError(log, "error decoding elasticsearch response", err)
}

pit = pitResponse.Id
from = 0
} else {
// get the PIT from the provided pageToken
pit, from, err = esutil.ParsePageToken(pageToken)
if err != nil {
return "", nil, createError(log, "error parsing page token", err)
}
}

body.Pit = &esutil.EsSearchPit{
Id: pit,
KeepAlive: pitKeepAlive,
}

return searchResults.Hits, nil
return esutil.CreatePageToken(pit, from+int(pageSize)), []func(*esapi.SearchRequest){
es.client.Search.WithSize(int(pageSize)),
es.client.Search.WithFrom(from),
}, err
}

// createError is a helper function that allows you to easily log an error and return a gRPC formatted error.
Expand Down
Loading

0 comments on commit 4a0e69e

Please sign in to comment.