diff --git a/.env.example b/.env.example index e0e7c094..5aa2ed4f 100644 --- a/.env.example +++ b/.env.example @@ -2,6 +2,7 @@ DATABASE_URI=file:nwc.db NOSTR_PRIVKEY= COOKIE_SECRET=secretsecret RELAY=wss://relay.getalby.com/v1 +#RELAY=ws://localhost:7447/v1 PUBLIC_RELAY= PORT=8080 diff --git a/service.go b/service.go index f6def421..8707bdf4 100644 --- a/service.go +++ b/service.go @@ -54,86 +54,91 @@ func (svc *Service) GetUser(c echo.Context) (user *User, err error) { } func (svc *Service) StartSubscription(ctx context.Context, sub *nostr.Subscription) error { - for { - if sub.Relay.ConnectionError != nil { - return sub.Relay.ConnectionError - } - select { - case <-ctx.Done(): - svc.Logger.Info("Exiting subscription.") - return nil - case <-sub.EndOfStoredEvents: - if !svc.ReceivedEOS { - svc.Logger.Info("Received EOS") + go func() { + <-sub.EndOfStoredEvents + svc.ReceivedEOS = true + svc.Logger.Info("Received EOS") + }() + + go func() { + for event := range sub.Events { + resp, err := svc.HandleEvent(ctx, event) + if err != nil { + svc.Logger.WithFields(logrus.Fields{ + "eventId": event.ID, + "eventKind": event.Kind, + }).Errorf("Failed to process event: %v", err) } - svc.ReceivedEOS = true - case event := <-sub.Events: - go func() { - resp, err := svc.HandleEvent(ctx, event) + if resp != nil { + status, err := sub.Relay.Publish(ctx, *resp) if err != nil { svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "eventKind": event.Kind, - }).Errorf("Failed to process event: %v", err) + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + }).Errorf("Failed to publish reply: %v", err) + return } - if resp != nil { - status, err := sub.Relay.Publish(ctx, *resp) - if err != nil { - svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - }).Errorf("Failed to publish reply: %v", err) - return - } - nostrEvent := NostrEvent{} - result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent) - if result.Error != nil { - svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - }).Error(result.Error) - return - } - nostrEvent.ReplyId = resp.ID + nostrEvent := NostrEvent{} + result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent) + if result.Error != nil { + svc.Logger.WithFields(logrus.Fields{ + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + }).Error(result.Error) + return + } + nostrEvent.ReplyId = resp.ID - if status == nostr.PublishStatusSucceeded { - nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED - nostrEvent.RepliedAt = time.Now() - svc.db.Save(&nostrEvent) - svc.Logger.WithFields(logrus.Fields{ - "nostrEventId": nostrEvent.ID, - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - "appId": nostrEvent.AppId, - }).Info("Published reply") - } else if status == nostr.PublishStatusFailed { - nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED - svc.db.Save(&nostrEvent) - svc.Logger.WithFields(logrus.Fields{ - "nostrEventId": nostrEvent.ID, - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - "appId": nostrEvent.AppId, - }).Info("Failed to publish reply") - } else { - nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED - svc.db.Save(&nostrEvent) - svc.Logger.WithFields(logrus.Fields{ - "nostrEventId": nostrEvent.ID, - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - "appId": nostrEvent.AppId, - }).Info("Reply sent but no response from relay (timeout)") - } + if status == nostr.PublishStatusSucceeded { + nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED + nostrEvent.RepliedAt = time.Now() + svc.db.Save(&nostrEvent) + svc.Logger.WithFields(logrus.Fields{ + "nostrEventId": nostrEvent.ID, + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + "appId": nostrEvent.AppId, + }).Info("Published reply") + } else if status == nostr.PublishStatusFailed { + nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED + svc.db.Save(&nostrEvent) + svc.Logger.WithFields(logrus.Fields{ + "nostrEventId": nostrEvent.ID, + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + "appId": nostrEvent.AppId, + }).Info("Failed to publish reply") + } else { + nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED + svc.db.Save(&nostrEvent) + svc.Logger.WithFields(logrus.Fields{ + "nostrEventId": nostrEvent.ID, + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + "appId": nostrEvent.AppId, + }).Info("Reply sent but no response from relay (timeout)") } - }() + } + } + }() + + select { + case <-sub.Relay.Context().Done(): + svc.Logger.Errorf("Relay error %v", sub.Relay.ConnectionError) + return sub.Relay.ConnectionError + case <-ctx.Done(): + if ctx.Err() != context.Canceled { + svc.Logger.Errorf("Subscription error %v", ctx.Err()) + return ctx.Err() } + svc.Logger.Info("Exiting subscription.") + return nil } }