From ad8dd7787f7323daf82c6b1f1853a80ad15a9dd1 Mon Sep 17 00:00:00 2001 From: Christopher Ryan Date: Tue, 27 Nov 2018 12:20:40 -0800 Subject: [PATCH] start + end + limit bug --- cmd/connect/session/gap.go | 2 +- cmd/connect/session/show.go | 10 +- contrib/binancefeeder/binancefeeder.go | 3 +- contrib/bitmexfeeder/bitmexfeeder.go | 2 +- contrib/candler/candlecandler/all_test.go | 2 +- contrib/candler/tickcandler/all_test.go | 2 +- contrib/gdaxfeeder/gdaxfeeder.go | 2 +- contrib/ondiskagg/aggtrigger/aggtrigger.go | 2 +- .../ondiskagg/aggtrigger/aggtrigger_test.go | 4 +- contrib/polygon/polygon.go | 2 +- contrib/stream/streamtrigger/streamtrigger.go | 2 +- executor/all_test.go | 70 +++++++----- executor/scanner.go | 100 +++++------------- frontend/query.go | 20 ++-- frontend/query_test.go | 1 + planner/planner.go | 3 +- sqlparser/selectrelation.go | 2 +- utils/io/columnseries.go | 4 +- utils/io/rowseries.go | 16 ++- 19 files changed, 110 insertions(+), 139 deletions(-) diff --git a/cmd/connect/session/gap.go b/cmd/connect/session/gap.go index 4a6290adf..65d025baf 100644 --- a/cmd/connect/session/gap.go +++ b/cmd/connect/session/gap.go @@ -36,7 +36,7 @@ func (c *Client) findGaps(line string) { log.Error("Error return from query scanner: %v", err) return } - csm, _, err := scanner.Read() + csm, err := scanner.Read() if err != nil { log.Error("Error return from query scanner: %v", err) return diff --git a/cmd/connect/session/show.go b/cmd/connect/session/show.go index e8de8705d..c604f7fc7 100644 --- a/cmd/connect/session/show.go +++ b/cmd/connect/session/show.go @@ -28,8 +28,12 @@ func (c *Client) show(line string) { } timeStart := time.Now() - var csm io.ColumnSeriesMap - var err error + + var ( + csm io.ColumnSeriesMap + err error + ) + if c.mode == local { csm, err = processShowLocal(tbk, start, end) if err != nil { @@ -94,7 +98,7 @@ func processShowLocal(tbk *io.TimeBucketKey, start, end *time.Time) (csm io.Colu log.Error("Error return from query scanner: %v", err) return } - csm, _, err = scanner.Read() + csm, err = scanner.Read() if err != nil { log.Error("Error return from query scanner: %v", err) return diff --git a/contrib/binancefeeder/binancefeeder.go b/contrib/binancefeeder/binancefeeder.go index 8c972a177..62a19c89d 100755 --- a/contrib/binancefeeder/binancefeeder.go +++ b/contrib/binancefeeder/binancefeeder.go @@ -96,6 +96,7 @@ func recast(config map[string]interface{}) *FetcherConfig { data, _ := json.Marshal(config) ret := FetcherConfig{} json.Unmarshal(data, &ret) + return &ret } @@ -207,7 +208,7 @@ func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time { return time.Time{} } reader, err := executor.NewReader(parsed) - csm, _, err := reader.Read() + csm, err := reader.Read() cs := csm[*tbk] if cs == nil || cs.Len() == 0 { return time.Time{} diff --git a/contrib/bitmexfeeder/bitmexfeeder.go b/contrib/bitmexfeeder/bitmexfeeder.go index 22b2acfea..d9b43bfe1 100644 --- a/contrib/bitmexfeeder/bitmexfeeder.go +++ b/contrib/bitmexfeeder/bitmexfeeder.go @@ -96,7 +96,7 @@ func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time { return time.Time{} } reader, err := executor.NewReader(parsed) - csm, _, err := reader.Read() + csm, err := reader.Read() cs := csm[*tbk] if cs == nil || cs.Len() == 0 { return time.Time{} diff --git a/contrib/candler/candlecandler/all_test.go b/contrib/candler/candlecandler/all_test.go index 64bfa70ec..6fad23318 100644 --- a/contrib/candler/candlecandler/all_test.go +++ b/contrib/candler/candlecandler/all_test.go @@ -68,7 +68,7 @@ func (s *TestSuite) TestCandleCandler(c *C) { parsed, _ := q.Parse() scanner, err := executor.NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, _ := scanner.Read() + csm, _ := scanner.Read() for _, cs := range csm { epoch := cs.GetEpoch() c.Assert(time.Unix(epoch[0], 0).UTC(), Equals, startDate) diff --git a/contrib/candler/tickcandler/all_test.go b/contrib/candler/tickcandler/all_test.go index a9c18e313..1d35879b6 100644 --- a/contrib/candler/tickcandler/all_test.go +++ b/contrib/candler/tickcandler/all_test.go @@ -77,7 +77,7 @@ func (s *TestSuite) TestTickCandler(c *C) { parsed, _ := q.Parse() reader, err := executor.NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, err := reader.Read() + csm, err := reader.Read() c.Assert(err == nil, Equals, true) c.Assert(len(csm), Equals, 1) for _, cs := range csm { diff --git a/contrib/gdaxfeeder/gdaxfeeder.go b/contrib/gdaxfeeder/gdaxfeeder.go index 2457a8749..af7dce9cb 100644 --- a/contrib/gdaxfeeder/gdaxfeeder.go +++ b/contrib/gdaxfeeder/gdaxfeeder.go @@ -100,7 +100,7 @@ func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time { return time.Time{} } reader, err := executor.NewReader(parsed) - csm, _, err := reader.Read() + csm, err := reader.Read() cs := csm[*tbk] if cs == nil || cs.Len() == 0 { return time.Time{} diff --git a/contrib/ondiskagg/aggtrigger/aggtrigger.go b/contrib/ondiskagg/aggtrigger/aggtrigger.go index 79001c80c..6944fd7f6 100644 --- a/contrib/ondiskagg/aggtrigger/aggtrigger.go +++ b/contrib/ondiskagg/aggtrigger/aggtrigger.go @@ -344,7 +344,7 @@ func (s *OnDiskAggTrigger) query( return nil, err } - csm, _, err := scanner.Read() + csm, err := scanner.Read() if err != nil { return nil, err } diff --git a/contrib/ondiskagg/aggtrigger/aggtrigger_test.go b/contrib/ondiskagg/aggtrigger/aggtrigger_test.go index 1134362f3..eb48be3a5 100644 --- a/contrib/ondiskagg/aggtrigger/aggtrigger_test.go +++ b/contrib/ondiskagg/aggtrigger/aggtrigger_test.go @@ -185,7 +185,7 @@ func (t *TestSuite) TestFire(c *C) { c.Check(err, IsNil) scanner, err := executor.NewReader(parsed) c.Check(err, IsNil) - csm5, _, err := scanner.Read() + csm5, err := scanner.Read() c.Check(err, IsNil) cs5 := csm5[*tbk5] c.Check(cs5, NotNil) @@ -200,7 +200,7 @@ func (t *TestSuite) TestFire(c *C) { c.Check(err, IsNil) scanner, err = executor.NewReader(parsed) c.Check(err, IsNil) - csm1D, _, err := scanner.Read() + csm1D, err := scanner.Read() c.Check(err, IsNil) cs1D := csm1D[*tbk1D] c.Check(cs1D, NotNil) diff --git a/contrib/polygon/polygon.go b/contrib/polygon/polygon.go index ac6fd60b7..924ded7a0 100644 --- a/contrib/polygon/polygon.go +++ b/contrib/polygon/polygon.go @@ -174,7 +174,7 @@ func (pf *PolygonFetcher) backfill(symbol string, endEpoch int64) { return } - csm, _, err := scanner.Read() + csm, err := scanner.Read() if err != nil { fmt.Printf("scanner read error (%v)\n", err) return diff --git a/contrib/stream/streamtrigger/streamtrigger.go b/contrib/stream/streamtrigger/streamtrigger.go index 1e6327824..a109a66c0 100644 --- a/contrib/stream/streamtrigger/streamtrigger.go +++ b/contrib/stream/streamtrigger/streamtrigger.go @@ -102,7 +102,7 @@ func (s *StreamTrigger) Fire(keyPath string, records []trigger.Record) { return } - csm, _, err := scanner.Read() + csm, err := scanner.Read() if err != nil { log.Error("[streamtrigger] scanner read failure (%v)", err) return diff --git a/executor/all_test.go b/executor/all_test.go index 48236ca02..68a1281e1 100644 --- a/executor/all_test.go +++ b/executor/all_test.go @@ -24,10 +24,12 @@ import ( // Hook up gocheck into the "go test" runner. func Test(t *testing.T) { TestingT(t) } -var _ = Suite(&TestSuite{nil, "", nil, nil}) -var _ = Suite(&DestructiveWALTests{nil, "", nil, nil}) -var _ = Suite(&DestructiveWALTest2{nil, "", nil, nil}) -var _ = Suite(&CGOTests{}) +var ( + _ = Suite(&TestSuite{nil, "", nil, nil}) + _ = Suite(&DestructiveWALTests{nil, "", nil, nil}) + _ = Suite(&DestructiveWALTest2{nil, "", nil, nil}) + _ = Suite(&CGOTests{}) +) type TestSuite struct { DataDirectory *Directory @@ -130,7 +132,7 @@ func (s *TestSuite) TestQueryMulti(c *C) { q.SetRowLimit(LAST, 5) parsed, _ := q.Parse() reader, _ := NewReader(parsed) - csm, _, _ := reader.Read() + csm, _ := reader.Read() c.Assert(len(csm) >= 4, Equals, true) for _, cs := range csm { c.Assert(cs.Len() <= 5, Equals, true) @@ -183,7 +185,7 @@ func (s *TestSuite) TestWriteVariable(c *C) { */ reader, err := NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, err := reader.Read() + csm, err := reader.Read() c.Assert(err == nil, Equals, true) c.Assert(len(csm), Equals, 1) for _, cs := range csm { @@ -215,7 +217,7 @@ func (s *TestSuite) TestWriteVariable(c *C) { s.WALFile.flushToWAL(tgc) s.WALFile.createCheckpoint() - csm, _, err = reader.Read() + csm, err = reader.Read() c.Assert(err == nil, Equals, true) c.Assert(len(csm), Equals, 1) for _, cs := range csm { @@ -266,7 +268,7 @@ func (s *TestSuite) TestFileRead(c *C) { } } c.Assert(minYear, Equals, int16(2001)) - csm, _, _ := scanner.Read() + csm, _ := scanner.Read() /* for _, cs := range csm { epoch := cs.GetEpoch() @@ -310,7 +312,7 @@ func (s *TestSuite) TestSortedFiles(c *C) { c.Assert(sortedFiles[0].File.Year, Equals, int16(2000)) c.Assert(sortedFiles[1].File.Year, Equals, int16(2001)) c.Assert(sortedFiles[2].File.Year, Equals, int16(2002)) - csm, _, err := scanner.Read() + csm, err := scanner.Read() for _, cs := range csm { epoch := cs.GetEpoch() c.Assert(len(epoch), Equals, nitems) @@ -330,7 +332,7 @@ func (s *TestSuite) TestSortedFiles(c *C) { } scanner, err = NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, err = scanner.Read() + csm, err = scanner.Read() for _, cs := range csm { epoch := cs.GetEpoch() @@ -355,7 +357,7 @@ func (s *TestSuite) TestSortedFiles(c *C) { } scanner, err = NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, err = scanner.Read() + csm, err = scanner.Read() for _, cs := range csm { epoch := cs.GetEpoch() c.Assert(len(epoch) == 200, Equals, true) @@ -373,7 +375,7 @@ func (s *TestSuite) TestSortedFiles(c *C) { parsed, err = q.Parse() scanner, err = NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, err = scanner.Read() + csm, err = scanner.Read() for _, cs := range csm { epoch := cs.GetEpoch() //printoutCandles(cs, -1, -1) @@ -393,7 +395,7 @@ func (s *TestSuite) TestCrossYear(c *C) { parsed, _ := q.Parse() scanner, err := NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, _ := scanner.Read() + csm, _ := scanner.Read() for _, cs := range csm { epoch := cs.GetEpoch() //printoutCandles(cs, -1, 1) @@ -418,7 +420,7 @@ func (s *TestSuite) TestLastN(c *C) { parsed, _ := q.Parse() scanner, err := NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, _ := scanner.Read() + csm, _ := scanner.Read() for _, cs := range csm { epoch := cs.GetEpoch() // printoutCandles(OHLCSlice, 0, -1) @@ -458,10 +460,30 @@ func (s *TestSuite) TestLastN(c *C) { parsed, _ = q.Parse() scanner, err = NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, _ = scanner.Read() + csm, _ = scanner.Read() for _, cs := range csm { epoch := cs.GetEpoch() - //printoutCandles(cs, 0, -1) + c.Log(epoch) + c.Assert(len(epoch), Equals, 1) + } + + // Query data with an end date of 12/31 asking for last 10 rows + q = NewQuery(s.DataDirectory) + q.AddRestriction("Symbol", "NZDUSD") + q.AddRestriction("AttributeGroup", "OHLC") + q.AddRestriction("Timeframe", "1Min") + q.SetRange( + time.Date(1999, time.January, 1, 0, 0, 0, 0, time.UTC).Unix(), + time.Date(1999, time.December, 23, 59, 0, 0, 0, time.UTC).Unix(), + ) + q.SetRowLimit(LAST, 10) + parsed, _ = q.Parse() + scanner, err = NewReader(parsed) + c.Assert(err == nil, Equals, true) + csm, _ = scanner.Read() + for _, cs := range csm { + epoch := cs.GetEpoch() + c.Log(epoch) c.Assert(len(epoch), Equals, 0) } @@ -478,13 +500,14 @@ func (s *TestSuite) TestLastN(c *C) { parsed, _ = q.Parse() scanner, err = NewReader(parsed) c.Assert(err, IsNil) - csm, _, err = scanner.Read() + csm, err = scanner.Read() c.Assert(err, IsNil) c.Assert(csm.IsEmpty(), Equals, false) for _, cs := range csm { epoch := cs.GetEpoch() + c.Log(epoch) //printoutCandles(cs, 0, -1) - c.Assert(len(epoch), Equals, 1) + c.Assert(len(epoch), Equals, 2) } } @@ -522,10 +545,9 @@ func (s *TestSuite) TestAddSymbolThenWrite(c *C) { pr, _ = q.Parse() rd, err := NewReader(pr) c.Assert(err == nil, Equals, true) - columnSeries, tprevMap, err := rd.Read() + columnSeries, err := rd.Read() c.Assert(err == nil, Equals, true) c.Assert(len(columnSeries) != 0, Equals, true) - c.Assert(len(tprevMap) != 0, Equals, true) for _, cs := range columnSeries { open := cs.GetByName("Open").([]float32) high := cs.GetByName("High").([]float32) @@ -883,7 +905,7 @@ func forwardBackwardScan(numRecs int, d *Directory, c *C) { parsed, _ := q.Parse() scanner, err := NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, err := scanner.Read() + csm, err := scanner.Read() for key, cs := range csm { c.Assert(err == nil, Equals, true) RefColumnSet[key] = cs @@ -905,7 +927,7 @@ func forwardBackwardScan(numRecs int, d *Directory, c *C) { parsed, _ = q.Parse() scanner, err = NewReader(parsed) c.Assert(err == nil, Equals, true) - csm, _, err = scanner.Read() + csm, err = scanner.Read() for key, cs := range csm { c.Assert(err == nil, Equals, true) epoch := cs.GetEpoch() @@ -962,9 +984,9 @@ func addTGData(root *Directory, tgc *TransactionPipe, number int, mixup bool) (q return nil, err } - csmSym, tPrev, err := scanner.Read() + csmSym, err := scanner.Read() if err != nil { - fmt.Printf("scanner.Read failed: tPrev: %v Err: %s", tPrev, err) + fmt.Printf("scanner.Read failed: Err: %s", err) return nil, err } diff --git a/executor/scanner.go b/executor/scanner.go index a3a25d796..6c990f870 100644 --- a/executor/scanner.go +++ b/executor/scanner.go @@ -40,7 +40,6 @@ func (iofp *ioFilePlan) GetFileYear() int16 { type ioplan struct { FilePlan []*ioFilePlan - PrevFilePlan []*ioFilePlan RecordLen int32 RecordType EnumRecordType VariableRecordLen int @@ -49,10 +48,10 @@ type ioplan struct { } func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err error) { - iop = new(ioplan) - iop.FilePlan = make([]*ioFilePlan, 0) - iop.PrevFilePlan = make([]*ioFilePlan, 0) - iop.Limit = pr.Limit + iop = &ioplan{ + FilePlan: make([]*ioFilePlan, 0), + Limit: pr.Limit, + } /* At this point we have a date unconstrained group of sorted files We will do two things here: @@ -102,13 +101,17 @@ func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err err */ // Set the starting and ending indices based on the range if file.File.Year == pr.Range.StartYear { + // log.Info("range start: %v", pr.Range.Start) startOffset = EpochToOffset( pr.Range.Start, file.File.GetTimeframe(), file.File.GetRecordLength(), ) + // log.Info("start offset: %v", startOffset) } if file.File.Year == pr.Range.EndYear { + // log.Info("range end: %v", pr.Range.End) + endOffset = EpochToOffset( pr.Range.End, file.File.GetTimeframe(), @@ -155,11 +158,9 @@ func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err err } } } - // Reverse the prevPath filelist order - for i := len(prevPaths) - 1; i >= 0; i-- { - iop.PrevFilePlan = append(iop.PrevFilePlan, prevPaths[i]) - } + iop.TimeQuals = pr.TimeQuals + return iop, nil } @@ -204,9 +205,8 @@ func NewReader(pr *planner.ParseResult) (r *reader, err error) { return r, nil } -func (r *reader) Read() (csm ColumnSeriesMap, tPrevMap map[TimeBucketKey]int64, err error) { +func (r *reader) Read() (csm ColumnSeriesMap, err error) { csm = NewColumnSeriesMap() - tPrevMap = make(map[TimeBucketKey]int64) catMap := r.pr.GetCandleAttributes() rtMap := r.pr.GetRowType() dsMap := r.pr.GetDataShapes() @@ -215,16 +215,15 @@ func (r *reader) Read() (csm ColumnSeriesMap, tPrevMap map[TimeBucketKey]int64, cat := catMap[key] rt := rtMap[key] rlen := rlMap[key] - buffer, tPrev, err := r.read(iop) + buffer, err := r.read(iop) if err != nil { - return nil, nil, err + return nil, err } - tPrevMap[key] = tPrev - rs := NewRowSeries(key, tPrev, buffer, dsMap[key], rlen, cat, rt) + rs := NewRowSeries(key, buffer, dsMap[key], rlen, cat, rt) key, cs := rs.ToColumnSeries() csm[key] = cs } - return csm, tPrevMap, err + return csm, err } /* @@ -239,8 +238,7 @@ type bufferMeta struct { // Reads the data from files, removing holes. The resulting buffer will be packed // Uses the index that prepends each row to identify filled rows versus holes -func (r *reader) read(iop *ioplan) (resultBuffer []byte, tPrev int64, err error) { - const GatherTprev = true +func (r *reader) read(iop *ioplan) (resultBuffer []byte, err error) { // Number of bytes to buffer, some multiple of record length // This should be at least bigger than 4096 and be better multiple of 4KB, // which is the common io size on most of the storage/filesystem. @@ -256,7 +254,7 @@ func (r *reader) read(iop *ioplan) (resultBuffer []byte, tPrev int64, err error) } else { limitBytes = math.MaxInt32 if direction == LAST { - return nil, 0, fmt.Errorf("Reverse scan only supported with a limited result set") + return nil, fmt.Errorf("Reverse scan only supported with a limited result set") } } @@ -265,8 +263,6 @@ func (r *reader) read(iop *ioplan) (resultBuffer []byte, tPrev int64, err error) /* if direction == FIRST Read Forward to fill final buffer - Read Backward to get previous record (for Tprev overlap) - Strip Tprev from previous record if direction == LAST Read Backward to fill final buffer Strip Tprev from first record @@ -307,41 +303,7 @@ func (r *reader) read(iop *ioplan) (resultBuffer []byte, tPrev int64, err error) break } } - if GatherTprev { - // Set the default tPrev to the base time of the oldest file in the PrevPlan minus one minute - prevCount := len(iop.PrevFilePlan) - if prevCount > 0 { - tPrev = time.Unix(iop.PrevFilePlan[prevCount-1].BaseTime, 0).Add(-time.Duration(time.Minute)).UTC().Unix() - } - // Scan backward until we find the first previous time - // Scan the file at the beginning of the date range unless the range started at the file begin - finished = false - for _, fp := range iop.PrevFilePlan { - var tPrevBuff []byte - tPrevBuff, finished, bytesRead, err := ex.readBackward( - tPrevBuff, - fp, - iop.RecordLen, - iop.RecordLen, - readBuffer, - r.fileBuffer) - if finished { - if bytesRead != 0 { - // We found a record, let's grab the tPrev time from it - tPrev = int64(binary.LittleEndian.Uint64(tPrevBuff[0:])) - } - break - } else if err != nil { - // We did not finish the scan and have an error, return the error - return nil, 0, err - } - } - } } else if direction == LAST { - if GatherTprev { - // Add one more record to the results in order to obtain the previous time - limitBytes += iop.RecordLen - } // This is safe because we know limitBytes is a sane value for reverse scans bytesLeftToFill := limitBytes fp := iop.FilePlan @@ -376,7 +338,7 @@ func (r *reader) read(iop *ioplan) (resultBuffer []byte, tPrev int64, err error) break } else if err != nil { // We did not finish the scan and have an error, return the error - return nil, 0, err + return nil, err } } @@ -394,22 +356,6 @@ func (r *reader) read(iop *ioplan) (resultBuffer []byte, tPrev int64, err error) bufMeta[(lenOF-1)-i] = bufMeta[i] } } - - if GatherTprev { - if len(resultBuffer) > 0 { - tPrev = int64(binary.LittleEndian.Uint64(resultBuffer[0:])) - // Chop off the first record - resultBuffer = resultBuffer[iop.RecordLen:] - if iop.RecordType == VARIABLE { - /* - Chop the first record off of the buffer map as well - */ - bufMeta[0].Data = bufMeta[0].Data[iop.RecordLen:] - } - } else { - tPrev = 0 - } - } } /* @@ -418,11 +364,11 @@ func (r *reader) read(iop *ioplan) (resultBuffer []byte, tPrev int64, err error) if iop.RecordType == VARIABLE { resultBuffer, err = r.readSecondStage(bufMeta) if err != nil { - return nil, 0, err + return nil, err } } - return resultBuffer, tPrev, err + return resultBuffer, err } type ioExec struct { @@ -497,6 +443,7 @@ func (ex *ioExec) packingReader(packedBuffer *[]byte, f io.ReadSeeker, buffer [] func (ex *ioExec) readForward(finalBuffer []byte, fp *ioFilePlan, recordLen, bytesToRead int32, readBuffer []byte) ( resultBuffer []byte, finished bool, err error) { + // log.Info("reading forward [recordLen: %v bytesToRead: %v]", recordLen, bytesToRead) filePath := fp.FullPath if finalBuffer == nil { @@ -523,8 +470,7 @@ func (ex *ioExec) readForward(finalBuffer []byte, fp *ioFilePlan, recordLen, byt // fmt.Printf("Length of final buffer: %d\n",len(finalBuffer)) if int32(len(finalBuffer)) >= bytesToRead { // fmt.Printf("Clipping final buffer: %d\n",limitBytes) - finalBuffer = finalBuffer[:bytesToRead] - return finalBuffer, true, nil + return finalBuffer[:bytesToRead], true, nil } return finalBuffer, false, nil } @@ -533,6 +479,8 @@ func (ex *ioExec) readBackward(finalBuffer []byte, fp *ioFilePlan, recordLen, bytesToRead int32, readBuffer []byte, fileBuffer []byte) ( result []byte, finished bool, bytesRead int32, err error) { + // log.Info("reading backward [recordLen: %v bytesToRead: %v offset: %v]", recordLen, bytesToRead, fp.Offset) + filePath := fp.FullPath beginPos := fp.Offset diff --git a/frontend/query.go b/frontend/query.go index 07fc57e05..f16905b12 100644 --- a/frontend/query.go +++ b/frontend/query.go @@ -157,7 +157,7 @@ func (s *DataService) Query(r *http.Request, reqs *MultiQueryRequest, response * start := io.ToSystemTimezone(time.Unix(epochStart, 0)) stop := io.ToSystemTimezone(time.Unix(epochEnd, 0)) - csm, tpm, err := executeQuery( + csm, err := executeQuery( dest, start, stop, limitRecordCount, limitFromStart, @@ -201,10 +201,7 @@ func (s *DataService) Query(r *http.Request, reqs *MultiQueryRequest, response * /* Append the NumpyMultiDataset to the MultiResponse */ - tpmStr := make(map[string]int64) - for key, val := range tpm { - tpmStr[key.String()] = val - } + response.Responses = append(response.Responses, QueryResponse{ nmds, @@ -236,13 +233,14 @@ Utility functions */ func executeQuery(tbk *io.TimeBucketKey, start, end time.Time, LimitRecordCount int, - LimitFromStart bool) (io.ColumnSeriesMap, map[io.TimeBucketKey]int64, error) { + LimitFromStart bool) (io.ColumnSeriesMap, error) { query := planner.NewQuery(executor.ThisInstance.CatalogDir) /* Alter timeframe inside key to ensure it matches a queryable TF */ + tf := tbk.GetItemInCategory("Timeframe") cd := utils.CandleDurationFromString(tf) queryableTimeframe := cd.QueryableTimeframe() @@ -273,19 +271,19 @@ func executeQuery(tbk *io.TimeBucketKey, start, end time.Time, LimitRecordCount } else { log.Error("Parsing query: %s\n", err) } - return nil, nil, err + return nil, err } scanner, err := executor.NewReader(parseResult) if err != nil { log.Error("Unable to create scanner: %s\n", err) - return nil, nil, err + return nil, err } - csm, tPrevMap, err := scanner.Read() + csm, err := scanner.Read() if err != nil { log.Error("Error returned from query scanner: %s\n", err) - return nil, nil, err + return nil, err } - return csm, tPrevMap, err + return csm, err } func runAggFunctions(callChain []string, csInput *io.ColumnSeries) (cs *io.ColumnSeries, err error) { diff --git a/frontend/query_test.go b/frontend/query_test.go index 200ec61b4..f00e43c39 100644 --- a/frontend/query_test.go +++ b/frontend/query_test.go @@ -156,6 +156,7 @@ func (s *ServerTestSuite) TestQueryRange(c *C) { } cs, _ := response.Responses[0].Result.ToColumnSeries() index := cs.GetEpoch() + c.Logf("EPOCH: %v", index) c.Assert(time.Unix(index[0], 0), Equals, time.Unix(*args.Requests[0].EpochStart, 0)) } diff --git a/planner/planner.go b/planner/planner.go index 36fa30a80..2adb8ea1b 100644 --- a/planner/planner.go +++ b/planner/planner.go @@ -1,7 +1,6 @@ package planner import ( - "errors" "fmt" "math" "time" @@ -236,7 +235,7 @@ func (q *query) Parse() (pr *ParseResult, err error) { CatList := q.DataDir.GatherCategoriesFromCache() for key := range q.Restriction.GetRestrictionMap() { if _, ok := CatList[key]; !ok { - return nil, errors.New(fmt.Sprintf("Category: %s not in catalog\n", key)) + return nil, fmt.Errorf("category: %s not in catalog", key) } } diff --git a/sqlparser/selectrelation.go b/sqlparser/selectrelation.go index 5b64b4c07..2e703be70 100644 --- a/sqlparser/selectrelation.go +++ b/sqlparser/selectrelation.go @@ -172,7 +172,7 @@ func (sr *SelectRelation) Materialize() (outputColumnSeries *io.ColumnSeries, er if err != nil { return nil, err } - csm, _, err := scanner.Read() + csm, err := scanner.Read() if err != nil { return nil, err } diff --git a/utils/io/columnseries.go b/utils/io/columnseries.go index 1756b74e2..f45378bd3 100644 --- a/utils/io/columnseries.go +++ b/utils/io/columnseries.go @@ -230,7 +230,7 @@ func (cs *ColumnSeries) GetEpoch() []int64 { func (cs *ColumnSeries) ToRowSeries(itemKey TimeBucketKey) (rs *RowSeries) { dsv := cs.GetDataShapes() data, recordLen := SerializeColumnsToRows(cs, dsv, true) - rs = NewRowSeries(itemKey, 0, data, dsv, recordLen, cs.GetCandleAttributes(), NOTYPE) + rs = NewRowSeries(itemKey, data, dsv, recordLen, cs.GetCandleAttributes(), NOTYPE) return rs } @@ -419,7 +419,7 @@ func (csm ColumnSeriesMap) ToRowSeriesMap(dataShapesMap map[TimeBucketKey][]Data for key, columns := range csm { dataShapes := dataShapesMap[key] data, recordLen := SerializeColumnsToRows(columns, dataShapes, true) - rsMap[key] = NewRowSeries(key, 0, data, dataShapes, recordLen, columns.GetCandleAttributes(), NOTYPE) + rsMap[key] = NewRowSeries(key, data, dataShapes, recordLen, columns.GetCandleAttributes(), NOTYPE) } return rsMap } diff --git a/utils/io/rowseries.go b/utils/io/rowseries.go index 6c2994c00..ff02b9958 100644 --- a/utils/io/rowseries.go +++ b/utils/io/rowseries.go @@ -16,7 +16,6 @@ type RowsInterface interface { type RowSeriesInterface interface { GetMetadataKey() string // The filesystem metadata key for this data - GetTPrev() time.Time // The first timestamp of data just prior to the first row } type Rows struct { @@ -144,11 +143,15 @@ type RowSeries struct { ColumnInterface rows *Rows metadataKey TimeBucketKey - tPrev time.Time } -func NewRowSeries(key TimeBucketKey, tPrev int64, data []byte, dataShape []DataShape, rowLen int, cat *CandleAttributes, - rowType EnumRecordType) *RowSeries { +func NewRowSeries( + key TimeBucketKey, + data []byte, + dataShape []DataShape, + rowLen int, cat *CandleAttributes, + rowType EnumRecordType, +) *RowSeries { /* We have to add a column named _nanoseconds_ to the datashapes for a variable record type This is true because the read() function for variable types inserts a 32-bit nanoseconds column @@ -156,13 +159,11 @@ func NewRowSeries(key TimeBucketKey, tPrev int64, data []byte, dataShape []DataS if rowType == VARIABLE { dataShape = append(dataShape, DataShape{"Nanoseconds", INT32}) } - timePrev := time.Unix(tPrev, 0).UTC() rows := NewRows(dataShape, data) rows.SetCandleAttributes(cat) rows.SetRowLen(rowLen) return &RowSeries{ metadataKey: key, - tPrev: timePrev, rows: rows, } } @@ -170,9 +171,6 @@ func NewRowSeries(key TimeBucketKey, tPrev int64, data []byte, dataShape []DataS func (rs *RowSeries) GetMetadataKey() TimeBucketKey { return rs.metadataKey } -func (rs *RowSeries) GetTPrev() time.Time { - return rs.tPrev -} func (rs *RowSeries) SetCandleAttributes(ca *CandleAttributes) { rs.rows.SetCandleAttributes(ca)