-
-
Notifications
You must be signed in to change notification settings - Fork 40
Open
Description
We need to investigate whether cleanSession and persistent sessions in MQTTConnection still makes sense as they are implemented right now.
Persistent sessions keep track of QoS 1 and 2 messages and subscriptions across different connections, but if an MQTTConnection disconnects, we lose all MQTTTasks related to QoS 1 and 2 PUBLISH messages and MQTTSubscriptions.
These are the old persistent session tests
mqtt-nio/Tests/MQTTNIOTests/MQTTNIOTests.swift
Lines 439 to 536 in 90f1f64
| func testSessionPresent() throws { | |
| let client = self.createClient(identifier: "testSessionPresent") | |
| defer { XCTAssertNoThrow(try client.syncShutdownGracefully()) } | |
| _ = try client.connect(cleanSession: true).wait() | |
| var connack = try client.connect(cleanSession: false).wait() | |
| XCTAssertEqual(connack, false) | |
| try client.disconnect().wait() | |
| connack = try client.connect(cleanSession: false).wait() | |
| XCTAssertEqual(connack, true) | |
| } | |
| func testPersistentSession() throws { | |
| let expectation = XCTestExpectation(description: "testPersistentSession") | |
| expectation.expectedFulfillmentCount = 2 | |
| expectation.assertForOverFulfill = true | |
| let payloadString = | |
| #"{"from":1000000,"to":1234567,"type":1,"content":"I am a beginner in swift and I am studying hard!!测试\n\n test, message","timestamp":1607243024,"nonce":"pAx2EsUuXrVuiIU3GGOGHNbUjzRRdT5b","sign":"ff902e31a6a5f5343d70a3a93ac9f946adf1caccab539c6f3a6"}"# | |
| let payload = ByteBufferAllocator().buffer(string: payloadString) | |
| let client = self.createClient(identifier: "testPersistentSession_publisher") | |
| defer { XCTAssertNoThrow(try client.syncShutdownGracefully()) } | |
| _ = try client.connect().wait() | |
| let client2 = self.createClient(identifier: "testPersistentSession_subscriber") | |
| defer { XCTAssertNoThrow(try client2.syncShutdownGracefully()) } | |
| client2.addPublishListener(named: "test") { result in | |
| switch result { | |
| case .success(let publish): | |
| var buffer = publish.payload | |
| let string = buffer.readString(length: buffer.readableBytes) | |
| XCTAssertEqual(string, payloadString) | |
| expectation.fulfill() | |
| case .failure(let error): | |
| XCTFail("\(error)") | |
| } | |
| } | |
| _ = try client2.connect(cleanSession: false).wait() | |
| _ = try client2.subscribe(to: [.init(topicFilter: "testPersistentAtLeastOnce", qos: .atLeastOnce)]).wait() | |
| try client.publish(to: "testPersistentAtLeastOnce", payload: payload, qos: .atLeastOnce).wait() | |
| Thread.sleep(forTimeInterval: 1) | |
| try client2.disconnect().wait() | |
| _ = try client2.connect(cleanSession: false).wait() | |
| // client2 should receive this publish as we have reconnected with clean session set to false | |
| try client.publish(to: "testPersistentAtLeastOnce", payload: payload, qos: .atLeastOnce).wait() | |
| Thread.sleep(forTimeInterval: 1) | |
| wait(for: [expectation], timeout: 5.0) | |
| try client.disconnect().wait() | |
| try client2.disconnect().wait() | |
| } | |
| func testNonPersistentSession() throws { | |
| let expectation = XCTestExpectation(description: "testPersistentSession") | |
| expectation.expectedFulfillmentCount = 1 | |
| expectation.assertForOverFulfill = true | |
| let payloadString = | |
| #"{"from":1000000,"to":1234567,"type":1,"content":"I am a beginner in swift and I am studying hard!!测试\n\n test, message","timestamp":1607243024,"nonce":"pAx2EsUuXrVuiIU3GGOGHNbUjzRRdT5b","sign":"ff902e31a6a5f5343d70a3a93ac9f946adf1caccab539c6f3a6"}"# | |
| let payload = ByteBufferAllocator().buffer(string: payloadString) | |
| let client = self.createClient(identifier: "testPersistentSession_publisher") | |
| defer { XCTAssertNoThrow(try client.syncShutdownGracefully()) } | |
| _ = try client.connect().wait() | |
| let client2 = self.createClient(identifier: "testPersistentSession_subscriber") | |
| defer { XCTAssertNoThrow(try client2.syncShutdownGracefully()) } | |
| client2.addPublishListener(named: "test") { result in | |
| switch result { | |
| case .success(let publish): | |
| var buffer = publish.payload | |
| let string = buffer.readString(length: buffer.readableBytes) | |
| XCTAssertEqual(string, payloadString) | |
| expectation.fulfill() | |
| case .failure(let error): | |
| XCTFail("\(error)") | |
| } | |
| } | |
| _ = try client2.connect(cleanSession: false).wait() | |
| _ = try client2.subscribe(to: [.init(topicFilter: "testPersistentAtLeastOnce", qos: .atLeastOnce)]).wait() | |
| try client.publish(to: "testPersistentAtLeastOnce", payload: payload, qos: .atLeastOnce).wait() | |
| Thread.sleep(forTimeInterval: 1) | |
| // disconnect and reconnect with clean session | |
| try client2.disconnect().wait() | |
| _ = try client2.connect(cleanSession: true).wait() | |
| // client2 should not receive this publish as we have reconnected with clean session set to true | |
| try client.publish(to: "testPersistentAtLeastOnce", payload: payload, qos: .atLeastOnce).wait() | |
| Thread.sleep(forTimeInterval: 1) | |
| wait(for: [expectation], timeout: 5.0) | |
| try client.disconnect().wait() | |
| try client2.disconnect().wait() | |
| } |
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels