Skip to content

Commit

Permalink
Merge pull request #152 from alpacahq/fix/start-end-limit-bug
Browse files Browse the repository at this point in the history
start + end + limit bug
  • Loading branch information
rocketbitz authored Nov 27, 2018
2 parents 66b6b77 + ad8dd77 commit ba46794
Show file tree
Hide file tree
Showing 19 changed files with 110 additions and 139 deletions.
2 changes: 1 addition & 1 deletion cmd/connect/session/gap.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *Client) findGaps(line string) {
log.Error("Error return from query scanner: %v", err)
return
}
csm, _, err := scanner.Read()
csm, err := scanner.Read()
if err != nil {
log.Error("Error return from query scanner: %v", err)
return
Expand Down
10 changes: 7 additions & 3 deletions cmd/connect/session/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ func (c *Client) show(line string) {
}

timeStart := time.Now()
var csm io.ColumnSeriesMap
var err error

var (
csm io.ColumnSeriesMap
err error
)

if c.mode == local {
csm, err = processShowLocal(tbk, start, end)
if err != nil {
Expand Down Expand Up @@ -94,7 +98,7 @@ func processShowLocal(tbk *io.TimeBucketKey, start, end *time.Time) (csm io.Colu
log.Error("Error return from query scanner: %v", err)
return
}
csm, _, err = scanner.Read()
csm, err = scanner.Read()
if err != nil {
log.Error("Error return from query scanner: %v", err)
return
Expand Down
3 changes: 2 additions & 1 deletion contrib/binancefeeder/binancefeeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func recast(config map[string]interface{}) *FetcherConfig {
data, _ := json.Marshal(config)
ret := FetcherConfig{}
json.Unmarshal(data, &ret)

return &ret
}

Expand Down Expand Up @@ -207,7 +208,7 @@ func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time {
return time.Time{}
}
reader, err := executor.NewReader(parsed)
csm, _, err := reader.Read()
csm, err := reader.Read()
cs := csm[*tbk]
if cs == nil || cs.Len() == 0 {
return time.Time{}
Expand Down
2 changes: 1 addition & 1 deletion contrib/bitmexfeeder/bitmexfeeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time {
return time.Time{}
}
reader, err := executor.NewReader(parsed)
csm, _, err := reader.Read()
csm, err := reader.Read()
cs := csm[*tbk]
if cs == nil || cs.Len() == 0 {
return time.Time{}
Expand Down
2 changes: 1 addition & 1 deletion contrib/candler/candlecandler/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *TestSuite) TestCandleCandler(c *C) {
parsed, _ := q.Parse()
scanner, err := executor.NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, _ := scanner.Read()
csm, _ := scanner.Read()
for _, cs := range csm {
epoch := cs.GetEpoch()
c.Assert(time.Unix(epoch[0], 0).UTC(), Equals, startDate)
Expand Down
2 changes: 1 addition & 1 deletion contrib/candler/tickcandler/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *TestSuite) TestTickCandler(c *C) {
parsed, _ := q.Parse()
reader, err := executor.NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, err := reader.Read()
csm, err := reader.Read()
c.Assert(err == nil, Equals, true)
c.Assert(len(csm), Equals, 1)
for _, cs := range csm {
Expand Down
2 changes: 1 addition & 1 deletion contrib/gdaxfeeder/gdaxfeeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time {
return time.Time{}
}
reader, err := executor.NewReader(parsed)
csm, _, err := reader.Read()
csm, err := reader.Read()
cs := csm[*tbk]
if cs == nil || cs.Len() == 0 {
return time.Time{}
Expand Down
2 changes: 1 addition & 1 deletion contrib/ondiskagg/aggtrigger/aggtrigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (s *OnDiskAggTrigger) query(
return nil, err
}

csm, _, err := scanner.Read()
csm, err := scanner.Read()
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions contrib/ondiskagg/aggtrigger/aggtrigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (t *TestSuite) TestFire(c *C) {
c.Check(err, IsNil)
scanner, err := executor.NewReader(parsed)
c.Check(err, IsNil)
csm5, _, err := scanner.Read()
csm5, err := scanner.Read()
c.Check(err, IsNil)
cs5 := csm5[*tbk5]
c.Check(cs5, NotNil)
Expand All @@ -200,7 +200,7 @@ func (t *TestSuite) TestFire(c *C) {
c.Check(err, IsNil)
scanner, err = executor.NewReader(parsed)
c.Check(err, IsNil)
csm1D, _, err := scanner.Read()
csm1D, err := scanner.Read()
c.Check(err, IsNil)
cs1D := csm1D[*tbk1D]
c.Check(cs1D, NotNil)
Expand Down
2 changes: 1 addition & 1 deletion contrib/polygon/polygon.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (pf *PolygonFetcher) backfill(symbol string, endEpoch int64) {
return
}

csm, _, err := scanner.Read()
csm, err := scanner.Read()
if err != nil {
fmt.Printf("scanner read error (%v)\n", err)
return
Expand Down
2 changes: 1 addition & 1 deletion contrib/stream/streamtrigger/streamtrigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *StreamTrigger) Fire(keyPath string, records []trigger.Record) {
return
}

csm, _, err := scanner.Read()
csm, err := scanner.Read()
if err != nil {
log.Error("[streamtrigger] scanner read failure (%v)", err)
return
Expand Down
70 changes: 46 additions & 24 deletions executor/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }

var _ = Suite(&TestSuite{nil, "", nil, nil})
var _ = Suite(&DestructiveWALTests{nil, "", nil, nil})
var _ = Suite(&DestructiveWALTest2{nil, "", nil, nil})
var _ = Suite(&CGOTests{})
var (
_ = Suite(&TestSuite{nil, "", nil, nil})
_ = Suite(&DestructiveWALTests{nil, "", nil, nil})
_ = Suite(&DestructiveWALTest2{nil, "", nil, nil})
_ = Suite(&CGOTests{})
)

type TestSuite struct {
DataDirectory *Directory
Expand Down Expand Up @@ -130,7 +132,7 @@ func (s *TestSuite) TestQueryMulti(c *C) {
q.SetRowLimit(LAST, 5)
parsed, _ := q.Parse()
reader, _ := NewReader(parsed)
csm, _, _ := reader.Read()
csm, _ := reader.Read()
c.Assert(len(csm) >= 4, Equals, true)
for _, cs := range csm {
c.Assert(cs.Len() <= 5, Equals, true)
Expand Down Expand Up @@ -183,7 +185,7 @@ func (s *TestSuite) TestWriteVariable(c *C) {
*/
reader, err := NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, err := reader.Read()
csm, err := reader.Read()
c.Assert(err == nil, Equals, true)
c.Assert(len(csm), Equals, 1)
for _, cs := range csm {
Expand Down Expand Up @@ -215,7 +217,7 @@ func (s *TestSuite) TestWriteVariable(c *C) {
s.WALFile.flushToWAL(tgc)
s.WALFile.createCheckpoint()

csm, _, err = reader.Read()
csm, err = reader.Read()
c.Assert(err == nil, Equals, true)
c.Assert(len(csm), Equals, 1)
for _, cs := range csm {
Expand Down Expand Up @@ -266,7 +268,7 @@ func (s *TestSuite) TestFileRead(c *C) {
}
}
c.Assert(minYear, Equals, int16(2001))
csm, _, _ := scanner.Read()
csm, _ := scanner.Read()
/*
for _, cs := range csm {
epoch := cs.GetEpoch()
Expand Down Expand Up @@ -310,7 +312,7 @@ func (s *TestSuite) TestSortedFiles(c *C) {
c.Assert(sortedFiles[0].File.Year, Equals, int16(2000))
c.Assert(sortedFiles[1].File.Year, Equals, int16(2001))
c.Assert(sortedFiles[2].File.Year, Equals, int16(2002))
csm, _, err := scanner.Read()
csm, err := scanner.Read()
for _, cs := range csm {
epoch := cs.GetEpoch()
c.Assert(len(epoch), Equals, nitems)
Expand All @@ -330,7 +332,7 @@ func (s *TestSuite) TestSortedFiles(c *C) {
}
scanner, err = NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, err = scanner.Read()
csm, err = scanner.Read()
for _, cs := range csm {
epoch := cs.GetEpoch()

Expand All @@ -355,7 +357,7 @@ func (s *TestSuite) TestSortedFiles(c *C) {
}
scanner, err = NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, err = scanner.Read()
csm, err = scanner.Read()
for _, cs := range csm {
epoch := cs.GetEpoch()
c.Assert(len(epoch) == 200, Equals, true)
Expand All @@ -373,7 +375,7 @@ func (s *TestSuite) TestSortedFiles(c *C) {
parsed, err = q.Parse()
scanner, err = NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, err = scanner.Read()
csm, err = scanner.Read()
for _, cs := range csm {
epoch := cs.GetEpoch()
//printoutCandles(cs, -1, -1)
Expand All @@ -393,7 +395,7 @@ func (s *TestSuite) TestCrossYear(c *C) {
parsed, _ := q.Parse()
scanner, err := NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, _ := scanner.Read()
csm, _ := scanner.Read()
for _, cs := range csm {
epoch := cs.GetEpoch()
//printoutCandles(cs, -1, 1)
Expand All @@ -418,7 +420,7 @@ func (s *TestSuite) TestLastN(c *C) {
parsed, _ := q.Parse()
scanner, err := NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, _ := scanner.Read()
csm, _ := scanner.Read()
for _, cs := range csm {
epoch := cs.GetEpoch()
// printoutCandles(OHLCSlice, 0, -1)
Expand Down Expand Up @@ -458,10 +460,30 @@ func (s *TestSuite) TestLastN(c *C) {
parsed, _ = q.Parse()
scanner, err = NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, _ = scanner.Read()
csm, _ = scanner.Read()
for _, cs := range csm {
epoch := cs.GetEpoch()
//printoutCandles(cs, 0, -1)
c.Log(epoch)
c.Assert(len(epoch), Equals, 1)
}

// Query data with an end date of 12/31 asking for last 10 rows
q = NewQuery(s.DataDirectory)
q.AddRestriction("Symbol", "NZDUSD")
q.AddRestriction("AttributeGroup", "OHLC")
q.AddRestriction("Timeframe", "1Min")
q.SetRange(
time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC).Unix(),
time.Date(1999, time.December, 23, 59, 0, 0, 0, time.UTC).Unix(),
)
q.SetRowLimit(LAST, 10)
parsed, _ = q.Parse()
scanner, err = NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _ = scanner.Read()
for _, cs := range csm {
epoch := cs.GetEpoch()
c.Log(epoch)
c.Assert(len(epoch), Equals, 0)
}

Expand All @@ -478,13 +500,14 @@ func (s *TestSuite) TestLastN(c *C) {
parsed, _ = q.Parse()
scanner, err = NewReader(parsed)
c.Assert(err, IsNil)
csm, _, err = scanner.Read()
csm, err = scanner.Read()
c.Assert(err, IsNil)
c.Assert(csm.IsEmpty(), Equals, false)
for _, cs := range csm {
epoch := cs.GetEpoch()
c.Log(epoch)
//printoutCandles(cs, 0, -1)
c.Assert(len(epoch), Equals, 1)
c.Assert(len(epoch), Equals, 2)
}
}

Expand Down Expand Up @@ -522,10 +545,9 @@ func (s *TestSuite) TestAddSymbolThenWrite(c *C) {
pr, _ = q.Parse()
rd, err := NewReader(pr)
c.Assert(err == nil, Equals, true)
columnSeries, tprevMap, err := rd.Read()
columnSeries, err := rd.Read()
c.Assert(err == nil, Equals, true)
c.Assert(len(columnSeries) != 0, Equals, true)
c.Assert(len(tprevMap) != 0, Equals, true)
for _, cs := range columnSeries {
open := cs.GetByName("Open").([]float32)
high := cs.GetByName("High").([]float32)
Expand Down Expand Up @@ -883,7 +905,7 @@ func forwardBackwardScan(numRecs int, d *Directory, c *C) {
parsed, _ := q.Parse()
scanner, err := NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, err := scanner.Read()
csm, err := scanner.Read()
for key, cs := range csm {
c.Assert(err == nil, Equals, true)
RefColumnSet[key] = cs
Expand All @@ -905,7 +927,7 @@ func forwardBackwardScan(numRecs int, d *Directory, c *C) {
parsed, _ = q.Parse()
scanner, err = NewReader(parsed)
c.Assert(err == nil, Equals, true)
csm, _, err = scanner.Read()
csm, err = scanner.Read()
for key, cs := range csm {
c.Assert(err == nil, Equals, true)
epoch := cs.GetEpoch()
Expand Down Expand Up @@ -962,9 +984,9 @@ func addTGData(root *Directory, tgc *TransactionPipe, number int, mixup bool) (q
return nil, err
}

csmSym, tPrev, err := scanner.Read()
csmSym, err := scanner.Read()
if err != nil {
fmt.Printf("scanner.Read failed: tPrev: %v Err: %s", tPrev, err)
fmt.Printf("scanner.Read failed: Err: %s", err)
return nil, err
}

Expand Down
Loading

0 comments on commit ba46794

Please sign in to comment.