Skip to content

Commit

Permalink
Improve handling of disconnections
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin-Molinero committed Jul 18, 2024
1 parent d3af65e commit a4cdd52
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public class AlpacaBrokerageAdditionalTests
[OneTimeSetUp]
public void OneTimeSetUp()
{
var (apiKey, apiKeySecret, isPaperTrading) = AlpacaBrokerageTestHelpers.GetConfigParameters();
var (apiKey, apiKeySecret, isPaperTrading, accessToken) = AlpacaBrokerageTestHelpers.GetConfigParameters();

var algorithmMock = new Mock<IAlgorithm>();
_alpacaBrokerage = new TestAlpacaBrokerage(apiKey, apiKeySecret, isPaperTrading, algorithmMock.Object);
_alpacaBrokerage = new TestAlpacaBrokerage(apiKey, apiKeySecret, isPaperTrading, algorithmMock.Object, accessToken);
}

[Test]
Expand Down Expand Up @@ -72,13 +72,13 @@ public void GetLatestQuote(Symbol symbol)

internal class TestAlpacaBrokerage : AlpacaBrokerage
{
public TestAlpacaBrokerage(string apiKey, string apiKeySecret, bool isPaperTrading, IAlgorithm algorithm)
: base(apiKey, apiKeySecret, null, isPaperTrading, algorithm)
public TestAlpacaBrokerage(string apiKey, string apiKeySecret, bool isPaperTrading, IAlgorithm algorithm, string accessToken)
: base(apiKey, apiKeySecret, accessToken, isPaperTrading, algorithm)
{
}

public TestAlpacaBrokerage(string apiKey, string apiKeySecret, bool isPaperTrading, IOrderProvider orderProvider, ISecurityProvider securityProvider)
: base(apiKey, apiKeySecret, null, isPaperTrading, orderProvider, securityProvider)
public TestAlpacaBrokerage(string apiKey, string apiKeySecret, bool isPaperTrading, IOrderProvider orderProvider, ISecurityProvider securityProvider, string accessToken)
: base(apiKey, apiKeySecret, accessToken, isPaperTrading, orderProvider, securityProvider)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class AlpacaBrokerageHistoryProviderTests
[OneTimeSetUp]
public void OneTimeSetUp()
{
var (apiKey, apiKeySecret, isPaperTrading) = AlpacaBrokerageTestHelpers.GetConfigParameters();
_alpacaBrokerage = new AlpacaBrokerage(apiKey, apiKeySecret, null, isPaperTrading, new Mock<IAlgorithm>().Object);
var (apiKey, apiKeySecret, isPaperTrading, accessToken) = AlpacaBrokerageTestHelpers.GetConfigParameters();
_alpacaBrokerage = new AlpacaBrokerage(apiKey, apiKeySecret, accessToken, isPaperTrading, new Mock<IAlgorithm>().Object);
}

private static IEnumerable<TestCaseData> TestParameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
using System;
using Alpaca.Markets;
using NUnit.Framework;
using QuantConnect.Configuration;

namespace QuantConnect.Brokerages.Alpaca.Tests
{
Expand All @@ -29,7 +28,7 @@ public class AlpacaBrokerageSymbolMapperTests
[OneTimeSetUp]
public void OneTimeSetUp()
{
var (apiKey, apiKeySecret, _) = AlpacaBrokerageTestHelpers.GetConfigParameters();
var (apiKey, apiKeySecret, _, _) = AlpacaBrokerageTestHelpers.GetConfigParameters();

var secretKey = new SecretKey(apiKey, apiKeySecret);
var alpacaTradingClient = Environments.Paper.GetAlpacaTradingClient(secretKey);
Expand Down
10 changes: 5 additions & 5 deletions QuantConnect.AlpacaBrokerage.Tests/AlpacaBrokerageTestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,21 @@ public static class AlpacaBrokerageTestHelpers
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="isValidateOnEmpty"/> is <c>true</c> and any configuration parameter is null or empty.
/// </exception>
public static (string ApiKey, string ApiKeySecret, bool IsPapperTrading) GetConfigParameters(bool isValidateOnEmpty = true)
public static (string ApiKey, string ApiKeySecret, bool IsPapperTrading, string accessToken) GetConfigParameters(bool isValidateOnEmpty = true)
{
var (apiKey, apiKeySecret, isPaperTrading) = (Config.Get("alpaca-api-key"), Config.Get("alpaca-api-secret"), Config.GetBool("alpaca-paper-trading"));
var (apiKey, apiKeySecret, isPaperTrading, accessToken) = (Config.Get("alpaca-api-key"), Config.Get("alpaca-api-secret"), Config.GetBool("alpaca-paper-trading"), Config.Get("alpaca-access-token"));

if (!isValidateOnEmpty)
{
return (apiKey, apiKeySecret, isPaperTrading);
return (apiKey, apiKeySecret, isPaperTrading, accessToken);
}

if (string.IsNullOrEmpty(apiKey) || string.IsNullOrEmpty(apiKeySecret))
if (string.IsNullOrEmpty(accessToken) && (string.IsNullOrEmpty(apiKey) || string.IsNullOrEmpty(apiKeySecret)))
{
throw new ArgumentNullException("'API Key' or 'Secret Key' or 'Data Feed Provider' cannot be null or empty. Please check your configuration.");
}

return (apiKey, apiKeySecret, isPaperTrading);
return (apiKey, apiKeySecret, isPaperTrading, accessToken);

}
}
4 changes: 2 additions & 2 deletions QuantConnect.AlpacaBrokerage.Tests/AlpacaBrokerageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public partial class AlpacaBrokerageTests : BrokerageTests

protected override IBrokerage CreateBrokerage(IOrderProvider orderProvider, ISecurityProvider securityProvider)
{
var (apiKey, apiKeySecret, isPaperTrading) = AlpacaBrokerageTestHelpers.GetConfigParameters();
var (apiKey, apiKeySecret, isPaperTrading, accessToken) = AlpacaBrokerageTestHelpers.GetConfigParameters();

return new TestAlpacaBrokerage(apiKey, apiKeySecret, isPaperTrading, orderProvider, securityProvider);
return new TestAlpacaBrokerage(apiKey, apiKeySecret, isPaperTrading, orderProvider, securityProvider, accessToken);
}
protected override bool IsAsync() => false;
protected override decimal GetAskPrice(Symbol symbol)
Expand Down
35 changes: 35 additions & 0 deletions QuantConnect.AlpacaBrokerage/AlpacaBrokerage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.Net;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using QuantConnect.Configuration;
using QuantConnect.Brokerages.CrossZero;

Expand Down Expand Up @@ -203,6 +204,13 @@ private void StreamingClient_OnError(IStreamingClient client, Exception obj)
private void StreamingClient_SocketClosed(IStreamingClient client)
{
Log.Trace($"{nameof(StreamingClient_SocketClosed)}({client.GetType().Name}): SocketClosed");
if (_connected)
{
_connected = false;
// let consumers know, we will try to reconnect internally, if we can't lean will kill us
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Disconnect, "Disconnected", "Brokerage Disconnected"));
RunReconnectionLogic(5);
}
}

private void StreamingClient_SocketOpened(IStreamingClient client)
Expand Down Expand Up @@ -559,6 +567,33 @@ public override void Connect()
_connected = true;
}

private void RunReconnectionLogic(int secondsDelay)
{
Task.Delay(TimeSpan.FromSeconds(secondsDelay)).ContinueWith(_ =>
{
try
{
Connect();
// resubscribe required? should/can we unsubscribe? TODO sanity test
Subscribe(_subscriptionManager.GetSubscribedSymbols());
}
catch (Exception ex)
{
Log.Error(ex);
}
if (!IsConnected)
{
RunReconnectionLogic(60);
}
else
{
// let consumers know we are reconnected, avoid lean killing us
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Reconnect, "Reconnected", "Brokerage Reconnected"));
}
});
}

/// <summary>
/// Disconnects the client from the broker's remote servers
/// </summary>
Expand Down

0 comments on commit a4cdd52

Please sign in to comment.