Skip to content

Commit

Permalink
fix(alpacabkfeeder): data schema change & bug fixes (#581)
Browse files Browse the repository at this point in the history
* fix(alpacabkfeeder): data schema change & bug fixes

* fix(alpacabkfeeder): add an example config
  • Loading branch information
dakimura authored Apr 3, 2022
1 parent 19a890f commit e9b0670
Show file tree
Hide file tree
Showing 22 changed files with 355 additions and 162 deletions.
2 changes: 1 addition & 1 deletion catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (d *Directory) GetDataShapes(key *io.TimeBucketKey) (dsv []io.DataShape, er
return fi.GetDataShapes(), nil
}

func (d *Directory) AddFile(newYear int16) (finfo_p *io.TimeBucketInfo, err error) { //d should be a subdirectory
func (d *Directory) AddFile(newYear int16) (finfo_p *io.TimeBucketInfo, err error) { // d should be a subdirectory
// Must be thread-safe for WRITE access
/*
Adds a new primary storage file for the provided year to this directory
Expand Down
2 changes: 1 addition & 1 deletion contrib/alpaca/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func flatten(lists ...[]string) []string {
func prefixStrings(list []string, prefix string) []string {
res := make([]string, len(list))
for i, s := range list {
res[i] = string(prefix) + s
res[i] = prefix + s
}
return res
}
Expand Down
23 changes: 20 additions & 3 deletions contrib/alpacabkfeeder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,30 @@ bgworkers:
- NASDAQ
# - NYSEARCA
# - OTC
# time when the list of target symbols (tradable stocks) are updated every day.
# (optional) time when the list of target symbols (tradable stocks) are updated every day.
# This config can be manually overridden by "ALPACA_BROKER_FEEDER_SYMBOLS_UPDATE_TIME" environmental variable.
symbols_update_time: "13:00:00" # (UTC). = every day at 08:00:00 (EST)
#symbols_update_time: "13:00:00" # (UTC). = every day at 08:00:00 (EST)
# time when the historical data back-fill is run.
# This config can be manually overridden by "ALPACA_BROKER_FEEDER_UPDATE_TIME" environmental variable.
update_time: "13:30:00" # (UTC). = every day at 08:30:00 (EST)
# Alpava Broker API Feeder writes data to "{symbol}/{timeframe}/TICK" TimeBucketKey
# (optional) When stocks_json_url is specified, Alpaca Broker feeder retrieves data for only the stocks
# written in the json file.
# The example json file structure:
# {
# "data": {
# "AAPL": {...},
# "ACN": {...},
# "ADBE": {...}
# }
# }
# In this case, Alpaca broker feeder gets "AAPL", "ACN", and "ADBE"'s data only.
# Any object under each symbol name is ignored.
stocks_json_url: "https://example.com/tradable_stocks.json"
# (optional) If the stocks_json file is authorized by basic auth,
# please specify "{user}:{pass}" (e.g. "john:mypassword").
# This config can be manually overridden by "ALPACA_BROKER_FEEDER_STOCKS_JSON_BASIC_AUTH"
stocks_json_basic_auth: "user:pass"
# Alpaca Broker API Feeder writes data to "{symbol}/{timeframe}/TICK" TimeBucketKey
timeframe: "1Sec"
# API Key ID and Secret for Alpaca Broker API
# This config can be manually overridden by "ALPACA_BROKER_FEEDER_API_KEY_ID" and "ALPACA_BROKER_FEEDER_API_SECRET_KEY"
Expand Down
14 changes: 11 additions & 3 deletions contrib/alpacabkfeeder/alpacav2.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,18 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {

ctx := context.Background()
// init symbols Manager to update symbols in the target exchanges
sm := symbols.NewJSONFileManager(&http.Client{Timeout: getJSONFileTimeout},
config.StocksJSONURL, config.StocksJSONBasicAuth,
)
var sm symbols.Manager
sm = symbols.NewManager(apiCli, config.Exchanges)
if config.StocksJSONURL != "" {
// use a remote JSON file instead of the config.Exchanges to list up the symbols
sm = symbols.NewJSONFileManager(&http.Client{Timeout: getJSONFileTimeout},
config.StocksJSONURL, config.StocksJSONBasicAuth,
)
}
sm.UpdateSymbols()
if config.SymbolsUpdateTime.IsZero() {
config.SymbolsUpdateTime = config.UpdateTime
}
timer.RunEveryDayAt(ctx, config.SymbolsUpdateTime, sm.UpdateSymbols)
log.Info("updated symbols using a remote json file.")

Expand Down
137 changes: 109 additions & 28 deletions contrib/alpacabkfeeder/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"net/http"
"net/url"
"os"
"strconv"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -149,44 +149,125 @@ func (c *Client) GetSnapshots(symbols []string) (map[string]*Snapshot, error) {
return snapshots, nil
}

// ListBars returns a list of bar lists corresponding to the provided
// symbol list, and filtered by the provided parameters.
func (c *Client) ListBars(symbols []string, opts v1.ListBarParams) (map[string][]v1.Bar, error) {
vals := url.Values{}
vals.Add("symbols", strings.Join(symbols, ","))

if opts.Timeframe == "" {
return nil, fmt.Errorf("timeframe is required for the bars endpoint")
// GetMultiBars returns bars for the given symbols.
func (c *Client) GetMultiBars(
symbols []string, params GetBarsParams,
) (map[string][]Bar, error) {
bars := make(map[string][]Bar, len(symbols))
for item := range c.GetMultiBarsAsync(symbols, params) {
if err := item.Error; err != nil {
return nil, err
}
bars[item.Symbol] = append(bars[item.Symbol], item.Bar)
}
return bars, nil
}

if opts.StartDt != nil {
vals.Set("start", opts.StartDt.Format(time.RFC3339))
}
// GetMultiBarsAsync returns a channel that will be populated with the bars for the requested symbols.
func (c *Client) GetMultiBarsAsync(symbols []string, params GetBarsParams) <-chan MultiBarItem {
ch := make(chan MultiBarItem)

if opts.EndDt != nil {
vals.Set("end", opts.EndDt.Format(time.RFC3339))
}
go func() {
defer close(ch)

if opts.Limit != nil {
vals.Set("limit", strconv.FormatInt(int64(*opts.Limit), 10))
}
u, err := url.Parse(fmt.Sprintf("%s/v2/stocks/bars", dataURL))
if err != nil {
ch <- MultiBarItem{Error: err}
return
}

u, err := url.Parse(fmt.Sprintf("%s/v1/bars/%s?%v", dataURL, opts.Timeframe, vals.Encode()))
if err != nil {
return nil, err
q := u.Query()
q.Set("symbols", strings.Join(symbols, ","))
setQueryBarParams(q, params, "")

received := 0
for params.TotalLimit == 0 || received < params.TotalLimit {
setQueryLimit(q, params.TotalLimit, params.PageLimit, received)
u.RawQuery = q.Encode()

resp, err := c.get(u)
if err != nil {
ch <- MultiBarItem{Error: err}
return
}

var barResp multiBarResponse
if err = unmarshal(resp, &barResp); err != nil {
ch <- MultiBarItem{Error: err}
return
}

sortedSymbols := make([]string, 0, len(barResp.Bars))
for symbol := range barResp.Bars {
sortedSymbols = append(sortedSymbols, symbol)
}
sort.Strings(sortedSymbols)

for _, symbol := range sortedSymbols {
bars := barResp.Bars[symbol]
for _, bar := range bars {
ch <- MultiBarItem{Symbol: symbol, Bar: bar}
}
received += len(bars)
}
if barResp.NextPageToken == nil {
return
}
q.Set("page_token", *barResp.NextPageToken)
}
}()

return ch
}

func setQueryBarParams(q url.Values, params GetBarsParams, feed string) {
setBaseQuery(q, params.Start, params.End, params.Feed, feed)
adjustment := Raw
if params.Adjustment != "" {
adjustment = params.Adjustment
}
q.Set("adjustment", string(adjustment))
timeframe := OneDay
if params.TimeFrame.N != 0 {
timeframe = params.TimeFrame
}
q.Set("timeframe", timeframe.String())
}

resp, err := c.get(u)
if err != nil {
return nil, err
func setBaseQuery(q url.Values, start, end time.Time, feed, defaultFeed string) {
if !start.IsZero() {
q.Set("start", start.Format(time.RFC3339))
}
if !end.IsZero() {
q.Set("end", end.Format(time.RFC3339))
}
if feed != "" {
q.Set("feed", feed)
} else {
if defaultFeed != "" {
q.Set("feed", feed)
}
}
var bars map[string][]v1.Bar
}

if err = unmarshal(resp, &bars); err != nil {
return nil, err
func setQueryLimit(q url.Values, totalLimit int, pageLimit int, received int) {
limit := 0 // use server side default if unset
if pageLimit != 0 {
limit = pageLimit
}
if totalLimit != 0 {
remaining := totalLimit - received
if remaining <= 0 { // this should never happen
return
}
if (limit == 0 || limit > remaining) && remaining <= v2MaxLimit {
limit = remaining
}
}

return bars, nil
if limit != 0 {
q.Set("limit", fmt.Sprintf("%d", limit))
}
}

// ListAssets returns the list of assets, filtered by
Expand Down
97 changes: 89 additions & 8 deletions contrib/alpacabkfeeder/api/entities.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package api

import "time"
import (
"fmt"
"time"
)

// Trade is a stock trade that happened on the market.
type Trade struct {
Expand All @@ -26,14 +29,28 @@ type Quote struct {
Tape string `json:"z"`
}

// Bar is an aggregate of trades.
// Bar is an aggregate of trades
type Bar struct {
Open float64 `json:"o"`
High float64 `json:"h"`
Low float64 `json:"l"`
Close float64 `json:"c"`
Volume uint64 `json:"v"`
Timestamp time.Time `json:"t"`
Timestamp time.Time `json:"t"`
Open float64 `json:"o"`
High float64 `json:"h"`
Low float64 `json:"l"`
Close float64 `json:"c"`
Volume uint64 `json:"v"`
TradeCount uint64 `json:"n"`
VWAP float64 `json:"vw"`
}

// MultiBarItem contains a single bar for a symbol or an error
type MultiBarItem struct {
Symbol string
Bar Bar
Error error
}

type multiBarResponse struct {
NextPageToken *string `json:"next_page_token"`
Bars map[string][]Bar `json:"bars"`
}

// Snapshot is a snapshot of a symbol.
Expand All @@ -44,3 +61,67 @@ type Snapshot struct {
DailyBar *Bar `json:"dailyBar"`
PrevDailyBar *Bar `json:"prevDailyBar"`
}

// GetBarsParams contains optional parameters for getting bars
type GetBarsParams struct {
// TimeFrame is the aggregation size of the bars
TimeFrame TimeFrame
// Adjustment tells if the bars should be adjusted for corporate actions
Adjustment Adjustment
// Start is the inclusive beginning of the interval
Start time.Time
// End is the inclusive end of the interval
End time.Time
// TotalLimit is the limit of the total number of the returned bars.
// If missing, all bars between start end end will be returned.
TotalLimit int
// PageLimit is the pagination size. If empty, the default page size will be used.
PageLimit int
// Feed is the source of the data: sip or iex.
// If provided, it overrides the client's Feed option.
Feed string
}

// TimeFrameUnite is the base unit of the timeframe.
type TimeFrameUnit string

// List of timeframe units
const (
Min TimeFrameUnit = "Min"
Hour TimeFrameUnit = "Hour"
Day TimeFrameUnit = "Day"
)

// TimeFrame is the resolution of the bars
type TimeFrame struct {
N int
Unit TimeFrameUnit
}

func NewTimeFrame(n int, unit TimeFrameUnit) TimeFrame {
return TimeFrame{
N: n,
Unit: unit,
}
}

func (tf TimeFrame) String() string {
return fmt.Sprintf("%d%s", tf.N, tf.Unit)
}

var (
OneMin TimeFrame = NewTimeFrame(1, Min)
OneHour TimeFrame = NewTimeFrame(1, Hour)
OneDay TimeFrame = NewTimeFrame(1, Day)
)

// Adjustment specifies the corporate action adjustment(s) for the bars
type Adjustment string

// List of adjustments
const (
Raw Adjustment = "raw"
Split Adjustment = "split"
Dividend Adjustment = "dividend"
All Adjustment = "all"
)
11 changes: 1 addition & 10 deletions contrib/alpacabkfeeder/api/v1/entities.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package v1

import "time"

type Asset struct {
ID string `json:"id"`
Name string `json:"name"`
Expand All @@ -10,7 +8,7 @@ type Asset struct {
Symbol string `json:"symbol"`
Status string `json:"status"`
Tradable bool `json:"tradable"`
Marginal bool `json:"marginal"`
Marginal bool `json:"marginal"`
Shortable bool `json:"shortable"`
EasyToBorrow bool `json:"easy_to_borrow"`
}
Expand All @@ -23,10 +21,3 @@ type Bar struct {
Close float32 `json:"c"`
Volume int32 `json:"v"`
}

type ListBarParams struct {
Timeframe string `url:"timeframe,omitempty"`
StartDt *time.Time `url:"start_dt,omitempty"`
EndDt *time.Time `url:"end_dt,omitempty"`
Limit *int `url:"limit,omitempty"`
}
Loading

0 comments on commit e9b0670

Please sign in to comment.