Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refs #81 Updated the following behaviors: #82

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/hnz.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,17 @@ class HNZ {
*/
void m_sendAllTIQualityReadings(const ReadingParameters& paramsTemplate, const vector<unsigned int>& rejectFilter = {});

/**
* At the end of a CG request returns:
* - The list of TS addresses that are missing
* - The list of TS adresses that were received but not expected
*/
struct AddressesDiff {
std::vector<unsigned int> missingAddresses;
std::vector<unsigned int> extraAddresses;
};
AddressesDiff m_getMismatchingTSCGAddresses() const;

};

#endif
56 changes: 49 additions & 7 deletions src/hnz.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,19 @@ void HNZ::receive(std::shared_ptr<HNZPath> hnz_path_in_use) {
}
}

/* Helper function used to get a printable version of an address list */
std::string formatAddresses(const std::vector<unsigned int>& addresses) {
std::string out = "[";
for(auto address: addresses) {
if(out.size() > 1) {
out += ", ";
}
out += std::to_string(address);
}
out += "]";
return out;
}

void HNZ::m_handle_message(const vector<unsigned char>& data) {
std::lock_guard<std::recursive_mutex> guard(m_configMutex);
std::string beforeLog = HnzUtility::NamePlugin + " - HNZ::m_handle_message -";
Expand Down Expand Up @@ -235,13 +248,18 @@ void HNZ::m_handle_message(const vector<unsigned char>& data) {
}
// Check if GI is complete (make sure we only update the end of GI status after creating readings for all TS received)
if ((t == TSCG_CODE) && !m_gi_addresses_received.empty()) {
// All expected TS received: GI succeeded
if (m_gi_addresses_received.size() == m_hnz_conf->getNumberCG()) {
// Last expected TS received: GI succeeded
if (m_gi_addresses_received.back() == m_hnz_conf->getLastTSAddress()) {
auto nbTSCG = m_hnz_conf->getNumberCG();
// Mismatch in the number of TS received: Log CG as incomplete
if (m_gi_addresses_received.size() != nbTSCG) {
AddressesDiff TSAddressesDiff = m_getMismatchingTSCGAddresses();
HnzUtility::log_warn("%s Received last TSCG but %lu TS received when %lu were expected: Missing %s, Extra %s",
beforeLog.c_str(), m_gi_addresses_received.size(), nbTSCG,
formatAddresses(TSAddressesDiff.missingAddresses).c_str(),
formatAddresses(TSAddressesDiff.extraAddresses).c_str());
}
m_hnz_connection->checkGICompleted(true);
// Last expected TS received but some other TS were missing: GI failed
}
else if (m_gi_addresses_received.back() == m_hnz_conf->getLastTSAddress()) {
m_hnz_connection->checkGICompleted(false);
}
}
}
Expand Down Expand Up @@ -554,7 +572,7 @@ static bool endsWith(const std::string& str, const std::string& suffix)

bool HNZ::operation(const std::string& operation, int count, PLUGIN_PARAMETER** params) {
std::string beforeLog = HnzUtility::NamePlugin + " - HNZ::operation -";
HnzUtility::log_info("%s Operation %s", beforeLog.c_str(), operation.c_str());
HnzUtility::log_info("%s Operation %s: %s", beforeLog.c_str(), operation.c_str(), paramsToStr(params, count).c_str());

// Workaround until the following ticket is fixed: https://github.com/fledge-iot/fledge/issues/1239
// if (operation == "HNZCommand") {
Expand Down Expand Up @@ -805,6 +823,7 @@ void HNZ::GICompleted(bool success) {
m_hnz_connection->onGICompleted();
if (success) {
HnzUtility::log_info("%s General Interrogation completed.", beforeLog.c_str());
m_sendAllTSQualityReadings(true, false, m_gi_addresses_received);
updateGiStatus(GiStatus::FINISHED);
}
else {
Expand Down Expand Up @@ -861,4 +880,27 @@ void HNZ::m_sendAllTIQualityReadings(const ReadingParameters& paramsTemplate, co
if (!readings.empty()) {
m_sendToFledge(readings);
}
}

HNZ::AddressesDiff HNZ::m_getMismatchingTSCGAddresses() const {
std::set<unsigned int> missingAddresses;
std::set<unsigned int> extraAddresses;
// Fill missingAddresses with all known addresses
const auto& allMessages = m_hnz_conf->get_all_messages();
const auto& allTSs = allMessages.at("TS").at(m_remote_address);
for (auto const& kvp : allTSs) {
unsigned int msg_address = kvp.first;
missingAddresses.insert(msg_address);
}
// Remove addresses received in missingAddresses / store unknown addresses in extraAddresses
for(auto address: m_gi_addresses_received) {
if(missingAddresses.count(address) == 0) {
extraAddresses.insert(address);
} else {
missingAddresses.erase(address);
}
}
return AddressesDiff{
std::vector<unsigned int>(missingAddresses.begin(), missingAddresses.end()),
std::vector<unsigned int>(extraAddresses.begin(), extraAddresses.end())};
}
2 changes: 2 additions & 0 deletions src/hnzconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void HNZConnection::checkGICompleted(bool success) {
// GI not completed in time or last TS received with other missing TS
if (m_active_path->gi_repeat > gi_repeat_count_max) {
// GI failed
HnzUtility::log_warn("%s Maximum GI repeat reached (%d)", beforeLog.c_str(), gi_repeat_count_max);
m_hnz_fledge->GICompleted(false);
} else {
HnzUtility::log_warn("%s General Interrogation Timeout, repeat GI", beforeLog.c_str());
Expand Down Expand Up @@ -182,6 +183,7 @@ void HNZConnection::m_check_GI() {
// Check the status of an ongoing GI
if (m_active_path->gi_repeat != 0) {
if (m_active_path->gi_start_time + gi_time_max < m_current) {
HnzUtility::log_warn("%s GI timeout (%d ms)", beforeLog.c_str(), gi_time_max);
checkGICompleted(false);
}
}
Expand Down
32 changes: 30 additions & 2 deletions src/hnzpath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,31 @@ vector<unsigned char> convertPayloadToVector(unsigned char* data, int size) {
/**
* Helper method to convert payload into something readable for logs.
*/
string convert_data_to_str(unsigned char* data, int len) {
std::string convert_data_to_str(unsigned char* data, int len) {
if (data == nullptr) {
return "";
}
std::stringstream stream;
for (int i = 0; i < len; i++) {
if (i > 0) {
stream << " ";
}
stream << std::setfill ('0') << std::setw(2) << std::hex << static_cast<unsigned int>(data[i]);
if (i < len - 1) stream << " ";
}
return stream.str();
}

/**
* Helper method to convert message into something readable for logs.
*/
std::string convert_message_to_str(const Message& message) {
std::stringstream stream;
auto len = message.payload.size();
for (int i = 0; i < len; i++) {
if (i > 0) {
stream << " ";
}
stream << std::setfill ('0') << std::setw(2) << std::hex << static_cast<unsigned int>(message.payload[i]);
}
return stream.str();
}
Expand Down Expand Up @@ -524,12 +541,23 @@ void HNZPath::m_sendRR(bool repetition, int ns, int nr) {
}

void HNZPath::m_sendInfo(unsigned char* msg, unsigned long size) {
std::string beforeLog = HnzUtility::NamePlugin + " - HNZPath::m_sendInfo - " + m_name_log;
Message message;
message.payload = vector<unsigned char>(msg, msg + size);

if (msg_sent.size() < m_anticipation_ratio) {
m_sendInfoImmediately(message);
} else {
std::string waitingMsgStr;
for(const Message& waitingMsg: msg_sent) {
if (waitingMsgStr.size() > 0){
waitingMsgStr += ", ";
}
waitingMsgStr += "[" + convert_message_to_str(waitingMsg) + "]";
}
HnzUtility::log_debug(beforeLog + " Anticipation ratio reached (" + std::to_string(m_anticipation_ratio) + "), message ["
+ convert_data_to_str(msg, static_cast<int>(size)) + "] will be delayed. Messages waiting: "
+ waitingMsgStr);
msg_waiting.push_back(message);
}
}
Expand Down
119 changes: 75 additions & 44 deletions tests/test_hnz.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ string protocol_stack_generator(int port, int port2) {
to_string(port2) + "}"
: "") +
" ] } , \"application_layer\" : { \"repeat_timeout\" : 3000, \"repeat_path_A\" : 3,"
"\"remote_station_addr\" : 1, \"max_sarm\" : 5, \"gi_time\" : 1, \"gi_repeat_count\" : 2 },"
"\"south_monitoring\" : { \"asset\" : \"TEST_STATUS\" } } }";
"\"remote_station_addr\" : 1, \"max_sarm\" : 5, \"gi_time\" : 1, \"gi_repeat_count\" : 2,"
"\"anticipation_ratio\" : 5 }, \"south_monitoring\" : { \"asset\" : \"TEST_STATUS\" } } }";
}

class HNZTestComp : public HNZ {
Expand Down Expand Up @@ -889,7 +889,7 @@ TEST_F(HNZTest, ReceivingTSCGMessages) {
if(HasFatalFailure()) return;

///////////////////////////////////////
// Send TS1 + TS2 only, then TS1 + TS2 + TS3 as CG answer
// Send TS1 + TS2 only, then TS3 only as CG answer
///////////////////////////////////////
hnz->sendCG();
debug_print("[HNZ south plugin] CG request 2 sent");
Expand Down Expand Up @@ -924,17 +924,18 @@ TEST_F(HNZTest, ReceivingTSCGMessages) {
}

// Extra CG messages should have been sent automatically because some TS are missing and gi_time was reached
frames = server->popLastFramesReceived();
CGframe = findFrameWithId(frames, 0x13);
ASSERT_NE(CGframe.get(), nullptr) << "Cound not find CG in frames received: " << BasicHNZServer::framesToStr(frames);
validateFrame(server->popLastFramesReceived(), {0x13, 0x01});
if(HasFatalFailure()) return;

// Send only second of the two expected TS (new CG was sent so the TS received earlier are ignored)
server->sendFrame({0x16, 0x39, 0x00, 0x01, 0x00, 0x00}, false);
debug_print("[HNZ Server] TSCG 2 sent");
this_thread::sleep_for(chrono::milliseconds(500)); // must be < gi_time
waitUntil(dataObjectsReceived, 3, 1000);

// Only second of the 2 TS CG messages were sent, it contains data for TS3
ASSERT_EQ(dataObjectsReceived, 1);
// CG is incomplete, but as last TS was received, it is still considered a finished CG
// Then quality update were sent for all missing TS (TS1 + TS2)
ASSERT_EQ(dataObjectsReceived, 3);
resetCounters();
currentReading = popFrontReadingsUntil("TS3");
validateReading(currentReading, "TS3", {
Expand All @@ -948,10 +949,26 @@ TEST_F(HNZTest, ReceivingTSCGMessages) {
});
if(HasFatalFailure()) return;

// Extra CG messages should have been sent automatically because some TS are missing and last expected TS was received
// Validate quality update for TS messages that were not sent
validateMissingTSCGQualityUpdate({"TS1", "TS2"}, false);
if(HasFatalFailure()) return;

// As CG is finished, no more CG should be sent automatically any more
this_thread::sleep_for(chrono::milliseconds(1200)); // gi_time + 200ms
frames = server->popLastFramesReceived();
CGframe = findFrameWithId(frames, 0x13);
ASSERT_NE(CGframe.get(), nullptr) << "Cound not find CG in frames received: " << BasicHNZServer::framesToStr(frames);
ASSERT_EQ(CGframe.get(), nullptr) << "No CG frame should be sent after last TS was received, but found: " << BasicHNZServer::frameToStr(CGframe);

///////////////////////////////////////
// Send TS1 + TS2 + TS3 as CG answer
///////////////////////////////////////
hnz->sendCG();
debug_print("[HNZ south plugin] CG request 3 sent");
this_thread::sleep_for(chrono::milliseconds(500)); // must not be too close to a multiple of gi_time

// Find the CG frame in the list of frames received by server and validate it
validateFrame(server->popLastFramesReceived(), {0x13, 0x01});
if(HasFatalFailure()) return;

// Send both TS this time (new CG was sent so the TS received earlier are ignored)
server->sendFrame({0x16, 0x33, 0x10, 0x00, 0x04, 0x00}, false);
Expand Down Expand Up @@ -986,7 +1003,7 @@ TEST_F(HNZTest, ReceivingTSCGMessages) {
// Send TS1 + TS2 + TS3 as CG answer with invalid flag for TS3
///////////////////////////////////////
hnz->sendCG();
debug_print("[HNZ south plugin] CG request 3 sent");
debug_print("[HNZ south plugin] CG request 4 sent");
this_thread::sleep_for(chrono::milliseconds(500)); // must not be too close to a multiple of gi_time

// Find the CG frame in the list of frames received by server and validate it
Expand Down Expand Up @@ -1018,31 +1035,61 @@ TEST_F(HNZTest, ReceivingTSCGMessages) {
}

///////////////////////////////////////
// Send TS3 only as CG answer
// Validate missing TS sent as invalid in case all CG attempts failed
///////////////////////////////////////
hnz->sendCG();
debug_print("[HNZ south plugin] CG request 4 sent");
debug_print("[HNZ south plugin] CG request 6 sent");
this_thread::sleep_for(chrono::milliseconds(500)); // must not be too close to a multiple of gi_time

// Find the CG frame in the list of frames received by server and validate it
validateFrame(server->popLastFramesReceived(), {0x13, 0x01});
if(HasFatalFailure()) return;

// Abort the first two of the 3 CG requests by sending the last TS only
// Abort the first two of the 3 CG requests by sending the first TS only
for(int i=0 ; i<2 ; i++) {
// Send only second of the two expected TS
server->sendFrame({0x16, 0x39, 0x00, 0x01, 0x00, 0x00}, false);
// Send only first of the two expected TS
server->sendFrame({0x16, 0x33, 0x10, 0x00, 0x04, 0x00}, false);
debug_print("[HNZ Server] TSCG %d sent", (5+i));
waitUntil(dataObjectsReceived, 1, 1000);
this_thread::sleep_for(chrono::milliseconds(1200)); // gi_time + 200ms

// Check that ingestCallback had been called
ASSERT_EQ(dataObjectsReceived, 1);
// Check that ingestCallback had been called for TS1 and TS2 only
ASSERT_EQ(dataObjectsReceived, 2);
resetCounters();
currentReading = popFrontReadingsUntil("TS3");
validateReading(currentReading, "TS3", {
for (int j = 0; j < 2; j++) {
std::string label("TS" + to_string(j + 1));
currentReading = popFrontReadingsUntil(label);
validateReading(currentReading, label, {
{"do_type", {"string", "TS"}},
{"do_station", {"int64_t", "1"}},
{"do_addr", {"int64_t", addrByTS[label]}},
{"do_value", {"int64_t", "1"}},
{"do_valid", {"int64_t", "0"}},
{"do_cg", {"int64_t", "1"}},
{"do_outdated", {"int64_t", "0"}},
});
if(HasFatalFailure()) return;
}

// Extra CG messages should have been sent automatically because some TS are missing and gi_time was reached
validateFrame(server->popLastFramesReceived(), {0x13, 0x01});
if(HasFatalFailure()) return;
}

// Send only first of the two expected TS on the final CG attempt
server->sendFrame({0x16, 0x33, 0x10, 0x00, 0x04, 0x00}, false);
debug_print("[HNZ Server] TSCG 7 sent");
waitUntil(dataObjectsReceived, 3, 1200); // gi_time + 200ms

// Check that ingestCallback had been called for TS1 and TS2 only
ASSERT_EQ(dataObjectsReceived, 3);
resetCounters();
for (int j = 0; j < 2; j++) {
std::string label("TS" + to_string(j + 1));
currentReading = popFrontReadingsUntil(label);
validateReading(currentReading, label, {
{"do_type", {"string", "TS"}},
{"do_station", {"int64_t", "1"}},
{"do_addr", {"int64_t", "577"}},
{"do_addr", {"int64_t", addrByTS[label]}},
{"do_value", {"int64_t", "1"}},
{"do_valid", {"int64_t", "0"}},
{"do_cg", {"int64_t", "1"}},
Expand All @@ -1051,30 +1098,14 @@ TEST_F(HNZTest, ReceivingTSCGMessages) {
if(HasFatalFailure()) return;
}

// Send only second of the two expected TS on the final CG attempt
server->sendFrame({0x16, 0x39, 0x00, 0x01, 0x00, 0x00}, false);
debug_print("[HNZ Server] TSCG 7 sent");
waitUntil(dataObjectsReceived, 3, 1000);

// Check that ingestCallback had been called
ASSERT_EQ(dataObjectsReceived, 3);
resetCounters();
// Only TS3 was received, the other two were sent as invalid quality update
currentReading = popFrontReadingsUntil("TS3");
validateReading(currentReading, "TS3", {
{"do_type", {"string", "TS"}},
{"do_station", {"int64_t", "1"}},
{"do_addr", {"int64_t", "577"}},
{"do_value", {"int64_t", "1"}},
{"do_valid", {"int64_t", "0"}},
{"do_cg", {"int64_t", "1"}},
{"do_outdated", {"int64_t", "0"}},
});
if(HasFatalFailure()) return;

// Validate quality update for TS messages that were not sent
validateMissingTSCGQualityUpdate({"TS1", "TS2"}, false);
validateMissingTSCGQualityUpdate({"TS3"}, false);
if(HasFatalFailure()) return;

// Send a few extra CG requests to trigger the anticipation ratio message
hnz->sendCG();
hnz->sendCG();
hnz->sendCG();
}

TEST_F(HNZTest, ReceivingTMAMessages) {
Expand Down
Loading