Skip to content

Commit

Permalink
fix(alpacabkfeeder): support basic auth (#577)
Browse files Browse the repository at this point in the history
  • Loading branch information
dakimura authored Mar 16, 2022
1 parent d382167 commit f594c2c
Show file tree
Hide file tree
Showing 19 changed files with 456 additions and 91 deletions.
9 changes: 4 additions & 5 deletions contrib/alpacabkfeeder/alpacav2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"fmt"
"time"

"github.com/alpacahq/alpaca-trade-api-go/alpaca"
"github.com/alpacahq/alpaca-trade-api-go/common"
"github.com/pkg/errors"

"github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api"
"github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/configs"
"github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/feed"
"github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/symbols"
Expand All @@ -30,17 +29,17 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
log.Info("loaded Alpaca Broker Feeder config...")

// init Alpaca API client
cred := &common.APIKey{
cred := &api.APIKey{
ID: config.APIKeyID,
PolygonKeyID: config.APIKeyID,
Secret: config.APISecretKey,
// OAuth: os.Getenv(EnvApiOAuth),
}
if config.APIKeyID == "" || config.APISecretKey == "" {
// if empty, get from env vars
cred = common.Credentials()
cred = api.Credentials()
}
apiClient := alpaca.NewClient(cred)
apiClient := api.NewClient(cred)

// init Market Time Checker
var timeChecker feed.MarketTimeChecker
Expand Down
268 changes: 268 additions & 0 deletions contrib/alpacabkfeeder/api/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
package api

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"

v1 "github.com/alpacahq/marketstore/v4/contrib/alpacabkfeeder/api/v1"
)

const (
rateLimitRetryCount = 3
rateLimitRetryDelay = time.Second
)

var (
// DefaultClient is the default Alpaca client using the
// environment variable set credentials
DefaultClient = NewClient(Credentials())
base = "https://api.alpaca.markets"
dataURL = "https://data.alpaca.markets"
apiVersion = "v2"
clientTimeout = 10 * time.Second
do = defaultDo
)

func defaultDo(c *Client, req *http.Request) (*http.Response, error) {
if c.credentials.OAuth != "" {
req.Header.Set("Authorization", "Bearer "+c.credentials.OAuth)
} else {
req.Header.Set("APCA-API-KEY-ID", c.credentials.ID)
req.Header.Set("APCA-API-SECRET-KEY", c.credentials.Secret)
// Add Basic Auth
req.SetBasicAuth(c.credentials.ID, c.credentials.Secret)
}

client := &http.Client{
Timeout: clientTimeout,
}
var resp *http.Response
var err error
for i := 0; ; i++ {
resp, err = client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusTooManyRequests {
break
}
if i >= rateLimitRetryCount {
break
}
time.Sleep(rateLimitRetryDelay)
}

if err = verify(resp); err != nil {
return nil, err
}

return resp, nil
}

const (
// v2MaxLimit is the maximum allowed limit parameter for all v2 endpoints
v2MaxLimit = 10000
)

func init() {
if s := os.Getenv("APCA_API_BASE_URL"); s != "" {
base = s
} else if s := os.Getenv("ALPACA_BASE_URL"); s != "" {
// legacy compatibility...
base = s
}
if s := os.Getenv("APCA_DATA_URL"); s != "" {
dataURL = s
}
// also allow APCA_API_DATA_URL to be consistent with the python SDK
if s := os.Getenv("APCA_API_DATA_URL"); s != "" {
dataURL = s
}
if s := os.Getenv("APCA_API_VERSION"); s != "" {
apiVersion = s
}
if s := os.Getenv("APCA_API_CLIENT_TIMEOUT"); s != "" {
d, err := time.ParseDuration(s)
if err != nil {
log.Fatal("invalid APCA_API_CLIENT_TIMEOUT: " + err.Error())
}
clientTimeout = d
}
}

// APIError wraps the detailed code and message supplied
// by Alpaca's API for debugging purposes
type APIError struct {
Code int `json:"code"`
Message string `json:"message"`
}

func (e *APIError) Error() string {
return e.Message
}

// Client is an Alpaca REST API client
type Client struct {
credentials *APIKey
}

func SetBaseUrl(baseUrl string) {
base = baseUrl
}

// NewClient creates a new Alpaca client with specified
// credentials
func NewClient(credentials *APIKey) *Client {
return &Client{credentials: credentials}
}

// GetSnapshots returns the snapshots for multiple symbol
func (c *Client) GetSnapshots(symbols []string) (map[string]*Snapshot, error) {
u, err := url.Parse(fmt.Sprintf("%s/v2/stocks/snapshots?symbols=%s",
dataURL, strings.Join(symbols, ",")))
if err != nil {
return nil, err
}

resp, err := c.get(u)
if err != nil {
return nil, err
}

var snapshots map[string]*Snapshot

if err = unmarshal(resp, &snapshots); err != nil {
return nil, err
}

return snapshots, nil
}

// ListBars returns a list of bar lists corresponding to the provided
// symbol list, and filtered by the provided parameters.
func (c *Client) ListBars(symbols []string, opts v1.ListBarParams) (map[string][]v1.Bar, error) {
vals := url.Values{}
vals.Add("symbols", strings.Join(symbols, ","))

if opts.Timeframe == "" {
return nil, fmt.Errorf("timeframe is required for the bars endpoint")
}

if opts.StartDt != nil {
vals.Set("start", opts.StartDt.Format(time.RFC3339))
}

if opts.EndDt != nil {
vals.Set("end", opts.EndDt.Format(time.RFC3339))
}

if opts.Limit != nil {
vals.Set("limit", strconv.FormatInt(int64(*opts.Limit), 10))
}

u, err := url.Parse(fmt.Sprintf("%s/v1/bars/%s?%v", dataURL, opts.Timeframe, vals.Encode()))
if err != nil {
return nil, err
}

resp, err := c.get(u)
if err != nil {
return nil, err
}
var bars map[string][]v1.Bar

if err = unmarshal(resp, &bars); err != nil {
return nil, err
}

return bars, nil
}

// ListAssets returns the list of assets, filtered by
// the input parameters.
func (c *Client) ListAssets(status *string) ([]v1.Asset, error) {
// TODO: add tests
apiVer := apiVersion
if strings.Contains(base, "broker"){
apiVer = "v1"
}

// TODO: support different asset classes
u, err := url.Parse(fmt.Sprintf("%s/%s/assets", base, apiVer))
if err != nil {
return nil, err
}

q := u.Query()

if status != nil {
q.Set("status", *status)
}

u.RawQuery = q.Encode()

resp, err := c.get(u)
if err != nil {
return nil, err
}

assets := []v1.Asset{}

if err = unmarshal(resp, &assets); err != nil {
return nil, err
}

return assets, nil
}

func (c *Client) get(u *url.URL) (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}

return do(c, req)
}

func verify(resp *http.Response) (err error) {
if resp.StatusCode >= http.StatusMultipleChoices {
var body []byte
defer resp.Body.Close()

body, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

apiErr := APIError{}

err = json.Unmarshal(body, &apiErr)
if err != nil {
return fmt.Errorf("json unmarshal error: %s", err.Error())
}
if err == nil {
err = &apiErr
}
}

return
}

func unmarshal(resp *http.Response, data interface{}) error {
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

return json.Unmarshal(body, data)
}
42 changes: 42 additions & 0 deletions contrib/alpacabkfeeder/api/credentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package api

import (
"os"
"sync"
)

var (
once sync.Once
key *APIKey
)

const (
EnvApiKeyID = "APCA_API_KEY_ID"
EnvApiSecretKey = "APCA_API_SECRET_KEY"
EnvApiOAuth = "APCA_API_OAUTH"
EnvPolygonKeyID = "POLY_API_KEY_ID"
)

type APIKey struct {
ID string
Secret string
OAuth string
PolygonKeyID string
}

// Credentials returns the user's Alpaca API key ID
// and secret for use through the SDK.
func Credentials() *APIKey {
var polygonKeyID string
if s := os.Getenv(EnvPolygonKeyID); s != "" {
polygonKeyID = s
} else {
polygonKeyID = os.Getenv(EnvApiKeyID)
}
return &APIKey{
ID: os.Getenv(EnvApiKeyID),
PolygonKeyID: polygonKeyID,
Secret: os.Getenv(EnvApiSecretKey),
OAuth: os.Getenv(EnvApiOAuth),
}
}
7 changes: 7 additions & 0 deletions contrib/alpacabkfeeder/api/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package api

/**
This package is the partial copy of https://github.com/alpacahq/alpaca-trade-api-go/tree/v1.9.0/alpaca.
To support Alpaca Broker API with the same implementation, Basic Auth support is added.
https://alpaca.markets/docs/api-references/broker-api/#authentication-and-rate-limit
*/
46 changes: 46 additions & 0 deletions contrib/alpacabkfeeder/api/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package api

import "time"

// Trade is a stock trade that happened on the market
type Trade struct {
ID int64 `json:"i"`
Exchange string `json:"x"`
Price float64 `json:"p"`
Size uint32 `json:"s"`
Timestamp time.Time `json:"t"`
Conditions []string `json:"c"`
Tape string `json:"z"`
}

// Quote is a stock quote from the market
type Quote struct {
BidExchange string `json:"bx"`
BidPrice float64 `json:"bp"`
BidSize uint32 `json:"bs"`
AskExchange string `json:"ax"`
AskPrice float64 `json:"ap"`
AskSize uint32 `json:"as"`
Timestamp time.Time `json:"t"`
Conditions []string `json:"c"`
Tape string `json:"z"`
}

// Bar is an aggregate of trades
type Bar struct {
Open float64 `json:"o"`
High float64 `json:"h"`
Low float64 `json:"l"`
Close float64 `json:"c"`
Volume uint64 `json:"v"`
Timestamp time.Time `json:"t"`
}

// Snapshot is a snapshot of a symbol
type Snapshot struct {
LatestTrade *Trade `json:"latestTrade"`
LatestQuote *Quote `json:"latestQuote"`
MinuteBar *Bar `json:"minuteBar"`
DailyBar *Bar `json:"dailyBar"`
PrevDailyBar *Bar `json:"prevDailyBar"`
}
Loading

0 comments on commit f594c2c

Please sign in to comment.