diff --git a/notifications/log.go b/notifications/log.go new file mode 100644 index 000000000..5fb30f100 --- /dev/null +++ b/notifications/log.go @@ -0,0 +1,26 @@ +package notifications + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// Subsystem defines the sub system name of this package. +const Subsystem = "NTFNS" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger(Subsystem, nil)) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/notifications/manager.go b/notifications/manager.go new file mode 100644 index 000000000..983968b4f --- /dev/null +++ b/notifications/manager.go @@ -0,0 +1,205 @@ +package notifications + +import ( + "context" + "time" + + "github.com/lightninglabs/loop/swapserverrpc" + "google.golang.org/grpc" +) + +type NotificationType int + +const ( + NotificationTypeUnknown NotificationType = iota + NotificationTypeReservation +) + +type NotificationsClient interface { + SubscribeNotifications(ctx context.Context, + in *swapserverrpc.SubscribeNotificationsRequest, + opts ...grpc.CallOption) ( + swapserverrpc.SwapServer_SubscribeNotificationsClient, error) +} + +// Config contains all the services that the notification manager needs to +// operate. +type Config struct { + // Client is the client used to communicate with the swap server. + Client NotificationsClient + + // FetchL402 is the function used to fetch the l402 token. + FetchL402 func(context.Context) error +} + +// Manager is a manager for notifications that the swap server sends to the +// client. +type Manager struct { + cfg *Config + + hasL402 bool + + subscribers map[NotificationType][]subscriber +} + +// NewManager creates a new notification manager. +func NewManager(cfg *Config) *Manager { + return &Manager{ + cfg: cfg, + subscribers: make(map[NotificationType][]subscriber), + } +} + +type subscriber struct { + subCtx context.Context + recvChan interface{} +} + +// SubscribeReservations subscribes to the reservation notifications. +func (m *Manager) SubscribeReservations(ctx context.Context, +) <-chan *swapserverrpc.ServerReservationNotification { + + // We'll create a channel that we'll use to send the notifications to the + // caller. + notifChan := make( + chan *swapserverrpc.ServerReservationNotification, //nolint:lll + ) + sub := subscriber{ + subCtx: ctx, + recvChan: notifChan, + } + + m.subscribers[NotificationTypeReservation] = append( + m.subscribers[NotificationTypeReservation], + sub, + ) + + return notifChan +} + +// Run starts the notification manager. +func (n *Manager) Run(ctx context.Context) error { + err := n.registerNotifications(ctx) + if err != nil { + return err + } + + return nil +} + +// registerNotifications registers a new server notification stream. +func (n *Manager) registerNotifications(mainCtx context.Context) error { + // In order to create a valid l402 we first are going to call + // the FetchL402 method. As a client might not have outbound capacity + // yet, we'll retry until we get a valid response. + if !n.hasL402 { + n.fetchL402(mainCtx) + } + + callCtx, cancel := context.WithCancel(mainCtx) + + // We'll now subscribe to the swap server notifications. + notifStream, err := n.cfg.Client.SubscribeNotifications( + callCtx, &swapserverrpc.SubscribeNotificationsRequest{}, + ) + if err != nil { + cancel() + return err + } + + log.Debugf("Successfully subscribed to server notifications") + + // We'll now start a goroutine that will handle all the incoming + // notifications. + go func() { + for { + notification, err := notifStream.Recv() + if err == nil && notification != nil { + log.Debugf("Received notification: %v", notification) + n.handleNotification(notification) + continue + } + log.Errorf("Error receiving "+ + "notification: %v", err) + + cancel() + + // If we encounter an error, we'll + // try to reconnect. + for { + select { + case <-mainCtx.Done(): + return + + case <-time.After(time.Second * 10): + log.Debugf("Reconnecting to server notifications") + err = n.registerNotifications(mainCtx) + if err != nil { + log.Errorf("Error reconnecting: %v", err) + continue + } + + // If we were able to reconnect, we'll + // return. + return + } + } + } + }() + + return nil +} + +// fetchL402 fetches the L402 from the server. This method will keep on +// retrying until it gets a valid response. +func (m *Manager) fetchL402(ctx context.Context) { + // Add a 0 timer so that we initially fetch the L402 immediately. + timer := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return + + case <-timer.C: + err := m.cfg.FetchL402(ctx) + if err != nil { + log.Warnf("Error fetching L402: %v", err) + timer.Reset(time.Second * 10) + continue + } + m.hasL402 = true + + return + } + } +} + +// handleNotification handles an incoming notification from the server, +// forwarding it to the appropriate subscribers. +func (n *Manager) handleNotification(notification *swapserverrpc. + SubscribeNotificationsResponse) { + + switch notification.Notification.(type) { + case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification: + // We'll forward the reservation notification to all subscribers. + // Cleaning up any subscribers that have been canceled. + newSubs := []subscriber{} + for _, sub := range n.subscribers[NotificationTypeReservation] { + recvChan := sub.recvChan.(chan *swapserverrpc. + ServerReservationNotification) + + select { + case <-sub.subCtx.Done(): + continue + case recvChan <- notification.GetReservationNotification(): + newSubs = append(newSubs, sub) + } + } + + n.subscribers[NotificationTypeReservation] = newSubs + + default: + log.Warnf("Received unknown notification type: %v", + notification) + } +} diff --git a/notifications/manager_test.go b/notifications/manager_test.go new file mode 100644 index 000000000..861e1f903 --- /dev/null +++ b/notifications/manager_test.go @@ -0,0 +1,157 @@ +package notifications + +import ( + "context" + "io" + "testing" + "time" + + "github.com/lightninglabs/loop/swapserverrpc" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +var ( + testReservationId = []byte{0x01, 0x02} + testReservationId2 = []byte{0x01, 0x02} +) + +// mockNotificationsClient implements the NotificationsClient interface for testing. +type mockNotificationsClient struct { + mockStream swapserverrpc.SwapServer_SubscribeNotificationsClient + subscribeErr error + timesCalled int +} + +func (m *mockNotificationsClient) SubscribeNotifications(ctx context.Context, + in *swapserverrpc.SubscribeNotificationsRequest, + opts ...grpc.CallOption) ( + swapserverrpc.SwapServer_SubscribeNotificationsClient, error) { + + m.timesCalled++ + if m.subscribeErr != nil { + return nil, m.subscribeErr + } + return m.mockStream, nil +} + +// mockSubscribeNotificationsClient simulates the server stream. +type mockSubscribeNotificationsClient struct { + grpc.ClientStream + recvChan chan *swapserverrpc.SubscribeNotificationsResponse + recvErrChan chan error +} + +func (m *mockSubscribeNotificationsClient) Recv() ( + *swapserverrpc.SubscribeNotificationsResponse, error) { + + select { + case err := <-m.recvErrChan: + return nil, err + case notif, ok := <-m.recvChan: + if !ok { + return nil, io.EOF + } + return notif, nil + } +} + +func (m *mockSubscribeNotificationsClient) Header() (metadata.MD, error) { + return nil, nil +} + +func (m *mockSubscribeNotificationsClient) Trailer() metadata.MD { + return nil +} + +func (m *mockSubscribeNotificationsClient) CloseSend() error { + return nil +} + +func (m *mockSubscribeNotificationsClient) Context() context.Context { + return context.TODO() +} + +func (m *mockSubscribeNotificationsClient) SendMsg(interface{}) error { + return nil +} + +func (m *mockSubscribeNotificationsClient) RecvMsg(interface{}) error { + return nil +} + +func TestManager_ReservationNotification(t *testing.T) { + // Create a mock notification client + recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse, 1) + errChan := make(chan error, 1) + mockStream := &mockSubscribeNotificationsClient{ + recvChan: recvChan, + recvErrChan: errChan, + } + mockClient := &mockNotificationsClient{ + mockStream: mockStream, + } + + // Create a Manager with the mock client + mgr := NewManager(&Config{ + Client: mockClient, + FetchL402: func(ctx context.Context) error { + // Simulate successful fetching of L402 + return nil + }, + }) + + // Subscribe to reservation notifications. + subCtx, subCancel := context.WithCancel(context.Background()) + subChan := mgr.SubscribeReservations(subCtx) + + // Run the manager. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := mgr.Run(ctx) + require.NoError(t, err) + + // Wait a bit to ensure manager is running and has subscribed + time.Sleep(100 * time.Millisecond) + require.Equal(t, 1, mockClient.timesCalled) + + // Send a test notification + testNotif := getTestNotification(testReservationId) + + // Send the notification to the recvChan + recvChan <- testNotif + + // Collect the notification in the callback + receivedNotification := <-subChan + + // Now, check that the notification received in the callback matches the one sent + require.NotNil(t, receivedNotification) + require.Equal(t, testReservationId, receivedNotification.ReservationId) + + // Cancel the subscription + subCancel() + + // Send another test notification` + testNotif2 := getTestNotification(testReservationId2) + recvChan <- testNotif2 + + // Wait a bit to ensure the notification is not received + time.Sleep(100 * time.Millisecond) + + require.Len(t, mgr.subscribers[NotificationTypeReservation], 0) + + // Close the recvChan to stop the manager's receive loop + close(recvChan) +} + +func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResponse { + return &swapserverrpc.SubscribeNotificationsResponse{ + Notification: &swapserverrpc.SubscribeNotificationsResponse_ReservationNotification{ + ReservationNotification: &swapserverrpc.ServerReservationNotification{ + ReservationId: resId, + }, + }, + } +}