Skip to content

Commit

Permalink
Merge pull request #207 from getAlby/fix/high-cpu
Browse files Browse the repository at this point in the history
fix: remove infinite loop in subscription causing high cpu
  • Loading branch information
rolznz authored Jan 9, 2024
2 parents 1b76186 + dd59887 commit 3289026
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 72 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ DATABASE_URI=file:nwc.db
NOSTR_PRIVKEY=
COOKIE_SECRET=secretsecret
RELAY=wss://relay.getalby.com/v1
#RELAY=ws://localhost:7447/v1
PUBLIC_RELAY=
PORT=8080

Expand Down
149 changes: 77 additions & 72 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,86 +54,91 @@ func (svc *Service) GetUser(c echo.Context) (user *User, err error) {
}

func (svc *Service) StartSubscription(ctx context.Context, sub *nostr.Subscription) error {
for {
if sub.Relay.ConnectionError != nil {
return sub.Relay.ConnectionError
}
select {
case <-ctx.Done():
svc.Logger.Info("Exiting subscription.")
return nil
case <-sub.EndOfStoredEvents:
if !svc.ReceivedEOS {
svc.Logger.Info("Received EOS")
go func() {
<-sub.EndOfStoredEvents
svc.ReceivedEOS = true
svc.Logger.Info("Received EOS")
}()

go func() {
for event := range sub.Events {
resp, err := svc.HandleEvent(ctx, event)
if err != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"eventKind": event.Kind,
}).Errorf("Failed to process event: %v", err)
}
svc.ReceivedEOS = true
case event := <-sub.Events:
go func() {
resp, err := svc.HandleEvent(ctx, event)
if resp != nil {
status, err := sub.Relay.Publish(ctx, *resp)
if err != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"eventKind": event.Kind,
}).Errorf("Failed to process event: %v", err)
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
}).Errorf("Failed to publish reply: %v", err)
return
}
if resp != nil {
status, err := sub.Relay.Publish(ctx, *resp)
if err != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
}).Errorf("Failed to publish reply: %v", err)
return
}

nostrEvent := NostrEvent{}
result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent)
if result.Error != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
}).Error(result.Error)
return
}
nostrEvent.ReplyId = resp.ID
nostrEvent := NostrEvent{}
result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent)
if result.Error != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
}).Error(result.Error)
return
}
nostrEvent.ReplyId = resp.ID

if status == nostr.PublishStatusSucceeded {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED
nostrEvent.RepliedAt = time.Now()
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Published reply")
} else if status == nostr.PublishStatusFailed {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Failed to publish reply")
} else {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Reply sent but no response from relay (timeout)")
}
if status == nostr.PublishStatusSucceeded {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED
nostrEvent.RepliedAt = time.Now()
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Published reply")
} else if status == nostr.PublishStatusFailed {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Failed to publish reply")
} else {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Reply sent but no response from relay (timeout)")
}
}()
}
}
}()

select {
case <-sub.Relay.Context().Done():
svc.Logger.Errorf("Relay error %v", sub.Relay.ConnectionError)
return sub.Relay.ConnectionError
case <-ctx.Done():
if ctx.Err() != context.Canceled {
svc.Logger.Errorf("Subscription error %v", ctx.Err())
return ctx.Err()
}
svc.Logger.Info("Exiting subscription.")
return nil
}
}

Expand Down

0 comments on commit 3289026

Please sign in to comment.