Skip to content

Commit

Permalink
[sc-145480] Fix race condition on stop during reconnection. (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
gwhelanLD authored Mar 11, 2022
1 parent eb10a5b commit 0d914ee
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 150 deletions.
2 changes: 1 addition & 1 deletion LDSwiftEventSource.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "echo `pwd`\nif which mint >/dev/null; then\n /usr/bin/xcrun --sdk macosx mint run realm/SwiftLint\nelse\n echo \"warning: mint not installed, available from https://github.com/yonaskolb/Mint\"\nfi\n";
shellScript = "# Adds support for Apple Silicon brew directory\nexport PATH=\"$PATH:/opt/homebrew/bin\"\n\nif which mint >/dev/null; then\n /usr/bin/xcrun --sdk macosx mint run realm/SwiftLint\nelse\n echo \"warning: mint not installed, available from https://github.com/yonaskolb/Mint\"\nfi\n";
showEnvVarsInLog = 0;
};
/* End PBXShellScriptBuildPhase section */
Expand Down
27 changes: 15 additions & 12 deletions Source/EventParser.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import Foundation

typealias ConnectionHandler = (setReconnectionTime: (TimeInterval) -> Void, setLastEventId: (String) -> Void)

class EventParser {
private struct Constants {
static let dataLabel: Substring = "data"
Expand All @@ -11,15 +9,17 @@ class EventParser {
}

private let handler: EventHandler
private let connectionHandler: ConnectionHandler

private var data: String = ""
private var lastEventId: String?
private var eventType: String = ""
private var lastEventIdBuffer: String?
private var lastEventId: String?
private var currentRetry: TimeInterval

init(handler: EventHandler, connectionHandler: ConnectionHandler) {
init(handler: EventHandler, initialEventId: String?, initialRetry: TimeInterval) {
self.handler = handler
self.connectionHandler = connectionHandler
self.lastEventId = initialEventId
self.currentRetry = initialRetry
}

func parse(line: String) {
Expand All @@ -35,9 +35,13 @@ class EventParser {
}
}

func reset() {
func getLastEventId() -> String? { lastEventId }

func reset() -> TimeInterval {
data = ""
eventType = ""
lastEventIdBuffer = nil
return currentRetry
}

private func dropLeadingSpace(str: Substring) -> Substring {
Expand All @@ -56,23 +60,22 @@ class EventParser {
// See https://github.com/whatwg/html/issues/689 for reasoning on not setting lastEventId if the value
// contains a null code point.
if !value.contains("\u{0000}") {
lastEventId = String(value)
lastEventIdBuffer = String(value)
}
case Constants.eventLabel:
eventType = String(value)
case Constants.retryLabel:
if value.allSatisfy(("0"..."9").contains), let reconnectionTime = Int64(value) {
connectionHandler.setReconnectionTime(Double(reconnectionTime) * 0.001)
currentRetry = Double(reconnectionTime) * 0.001
}
default:
break
}
}

private func dispatchEvent() {
if let lastEventId = lastEventId {
connectionHandler.setLastEventId(lastEventId)
}
lastEventId = lastEventIdBuffer ?? lastEventId
lastEventIdBuffer = nil
guard !data.isEmpty
else {
eventType = ""
Expand Down
145 changes: 73 additions & 72 deletions Source/LDSwiftEventSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,30 @@ public class EventSource {
}
}

class ReconnectionTimer {
private let maxDelay: TimeInterval
private let resetInterval: TimeInterval

var backoffCount: Int = 0
var connectedTime: Date?

init(maxDelay: TimeInterval, resetInterval: TimeInterval) {
self.maxDelay = maxDelay
self.resetInterval = resetInterval
}

func reconnectDelay(baseDelay: TimeInterval) -> TimeInterval {
backoffCount += 1
if let connectedTime = connectedTime, Date().timeIntervalSince(connectedTime) >= resetInterval {
backoffCount = 0
}
self.connectedTime = nil
let maxSleep = min(maxDelay, baseDelay * pow(2.0, Double(backoffCount)))
return maxSleep / 2 + Double.random(in: 0...(maxSleep / 2))
}
}

// MARK: EventSourceDelegate
class EventSourceDelegate: NSObject, URLSessionDataDelegate {
private let delegateQueue: DispatchQueue = DispatchQueue(label: "ESDelegateQueue")
private let logger = Logs()
Expand All @@ -120,22 +144,19 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate {
}
}

private var lastEventId: String?
private var reconnectTime: TimeInterval
private var connectedTime: Date?

private var reconnectionAttempts: Int = 0
private var errorHandlerAction: ConnectionErrorAction = .proceed
private let utf8LineParser: UTF8LineParser = UTF8LineParser()
// swiftlint:disable:next implicitly_unwrapped_optional
private var eventParser: EventParser!
private let eventParser: EventParser
private let reconnectionTimer: ReconnectionTimer
private var urlSession: URLSession?
private var sessionTask: URLSessionDataTask?

init(config: EventSource.Config) {
self.config = config
self.lastEventId = config.lastEventId
self.reconnectTime = config.reconnectTime
self.eventParser = EventParser(handler: config.handler,
initialEventId: config.lastEventId,
initialRetry: config.reconnectTime)
self.reconnectionTimer = ReconnectionTimer(maxDelay: config.maxReconnectTime,
resetInterval: config.backoffResetThreshold)
}

func start() {
Expand All @@ -153,19 +174,24 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate {
}

func stop() {
let previousState = readyState
readyState = .shutdown
sessionTask?.cancel()
if previousState == .open {
config.handler.onClosed()
delegateQueue.async {
let previousState = self.readyState
self.readyState = .shutdown
self.sessionTask?.cancel()
if previousState == .open {
self.config.handler.onClosed()
}
self.urlSession?.invalidateAndCancel()
self.urlSession = nil
}
urlSession?.invalidateAndCancel()
}

func getLastEventId() -> String? { lastEventId }
func getLastEventId() -> String? { eventParser.getLastEventId() }

func createSession() -> URLSession {
URLSession(configuration: config.urlSessionConfiguration, delegate: self, delegateQueue: nil)
let opQueue = OperationQueue()
opQueue.underlyingQueue = self.delegateQueue
return URLSession(configuration: config.urlSessionConfiguration, delegate: self, delegateQueue: opQueue)
}

func createRequest() -> URLRequest {
Expand All @@ -174,7 +200,7 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate {
timeoutInterval: self.config.idleTimeout)
urlRequest.httpMethod = self.config.method
urlRequest.httpBody = self.config.body
urlRequest.setValue(self.lastEventId, forHTTPHeaderField: "Last-Event-Id")
urlRequest.setValue(eventParser.getLastEventId(), forHTTPHeaderField: "Last-Event-Id")
urlRequest.allHTTPHeaderFields = self.config.headerTransform(
urlRequest.allHTTPHeaderFields?.merging(self.config.headers) { $1 } ?? self.config.headers
)
Expand All @@ -183,11 +209,6 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate {

private func connect() {
logger.log(.info, "Starting EventSource client")
let connectionHandler: ConnectionHandler = (
setReconnectionTime: { [weak self] reconnectionTime in self?.reconnectTime = reconnectionTime },
setLastEventId: { [weak self] eventId in self?.lastEventId = eventId }
)
self.eventParser = EventParser(handler: self.config.handler, connectionHandler: connectionHandler)
let task = urlSession?.dataTask(with: createRequest())
task?.resume()
sessionTask = task
Expand All @@ -201,67 +222,44 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate {
return action
}

private func afterComplete() {
guard readyState != .shutdown
else { return }

var nextState: ReadyState = .closed
let currentState: ReadyState = readyState
if errorHandlerAction == .shutdown {
logger.log(.info, "Connection has been explicitly shut down by error handler")
nextState = .shutdown
}
readyState = nextState

if currentState == .open {
config.handler.onClosed()
}

if nextState != .shutdown {
reconnect()
}
}

private func reconnect() {
reconnectionAttempts += 1

if let connectedTime = connectedTime, Date().timeIntervalSince(connectedTime) >= config.backoffResetThreshold {
reconnectionAttempts = 0
}
self.connectedTime = nil

let maxSleep = min(config.maxReconnectTime, reconnectTime * pow(2.0, Double(reconnectionAttempts)))
let sleep = maxSleep / 2 + Double.random(in: 0...(maxSleep / 2))

logger.log(.info, "Waiting %.3f seconds before reconnecting...", sleep)
delegateQueue.asyncAfter(deadline: .now() + sleep) { [weak self] in
self?.connect()
}
}

// MARK: URLSession Delegates

// Tells the delegate that the task finished transferring data.
public func urlSession(_ session: URLSession,
task: URLSessionTask,
didCompleteWithError error: Error?) {
utf8LineParser.closeAndReset()
eventParser.reset()
let currentRetry = eventParser.reset()

guard readyState != .shutdown
else { return }

if let error = error {
// Ignore cancelled error
if (error as NSError).code == NSURLErrorCancelled {
} else if readyState != .shutdown && errorHandlerAction != .shutdown {
if (error as NSError).code != NSURLErrorCancelled {
logger.log(.info, "Connection error: %@", error.localizedDescription)
errorHandlerAction = dispatchError(error: error)
} else {
errorHandlerAction = .shutdown
if dispatchError(error: error) == .shutdown {
logger.log(.info, "Connection has been explicitly shut down by error handler")
if readyState == .open {
config.handler.onClosed()
}
readyState = .shutdown
return
}
}
} else {
logger.log(.info, "Connection unexpectedly closed.")
}

afterComplete()
if readyState == .open {
config.handler.onClosed()
}

readyState = .closed
let sleep = reconnectionTimer.reconnectDelay(baseDelay: currentRetry)
logger.log(.info, "Waiting %.3f seconds before reconnecting...", sleep)
delegateQueue.asyncAfter(deadline: .now() + sleep) { [weak self] in
self?.connect()
}
}

// Tells the delegate that the data task received the initial reply (headers) from the server.
Expand All @@ -280,13 +278,16 @@ class EventSourceDelegate: NSObject, URLSessionDataDelegate {
// swiftlint:disable:next force_cast
let httpResponse = response as! HTTPURLResponse
if (200..<300).contains(httpResponse.statusCode) {
connectedTime = Date()
reconnectionTimer.connectedTime = Date()
readyState = .open
config.handler.onOpened()
completionHandler(.allow)
} else {
logger.log(.info, "Unsuccessful response: %d", httpResponse.statusCode)
errorHandlerAction = dispatchError(error: UnsuccessfulResponseError(responseCode: httpResponse.statusCode))
if dispatchError(error: UnsuccessfulResponseError(responseCode: httpResponse.statusCode)) == .shutdown {
logger.log(.info, "Connection has been explicitly shut down by error handler")
readyState = .shutdown
}
completionHandler(.cancel)
}
}
Expand Down
Loading

0 comments on commit 0d914ee

Please sign in to comment.