Skip to content

Commit

Permalink
feat(zRPCClient): add option to initiate a server request upon timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
seif hadrich committed May 3, 2024
1 parent 7f082ee commit fdd5379
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ target_compile_options(${PROJECT_NAME} PUBLIC ${compile_options})
# Test applications
###############################################################################
if (ZRPC_BUILD_TESTS)
add_executable(clientDetached tests/clientDetached.cpp)
target_link_libraries(clientDetached zRPC)
target_compile_options(clientDetached PUBLIC ${compile_options})

add_executable(client tests/client.cpp)
target_link_libraries(client zRPC)
target_compile_options(client PUBLIC ${compile_options})
Expand Down
21 changes: 17 additions & 4 deletions include/zRPC.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

namespace zRPC
{

/**
* @class Server zRPC.hpp "zRPC.hpp"
*
Expand Down Expand Up @@ -233,20 +232,34 @@ class Client
* @param[in] identity Identity string to use for the client.
* @param[in] uri Zero-MQ address:port to bind listening socket to.
*/
explicit Client(const std::string &identity,
const std::string &uri);
explicit Client(const std::string &identity, const std::string &uri);

/**
* @brief Call the RPC with the given name and given arguments
*
* @tparam A Variadic argument list
* @param[in] name Name of the RPC to call on the remote serveer
* @param[in] name Name of the RPC to call on the remote server
* @param[in] args Variadic argument list to pass to the remote server
* @return msgpack::object_handle MessagePack'd object handle containing
* server response (if any)
*/
template <typename... A>
msgpack::object_handle call(const std::string &name, A... args);

/**
* @brief Call the RPC with the given name and given arguments
*
* @tparam A Variadic argument list
* @param[in] timeout Timeout in ms before dropping the request
* @param[in] name Name of the RPC to call on the remote server
* @param[in] args Variadic argument list to pass to the remote server
* @return msgpack::object_handle MessagePack'd object handle containing
* server response (if any)
*/
template <typename... A>
msgpack::object_handle call(const int timeout,
const std::string &name,
A... args);
};

/**
Expand Down
61 changes: 38 additions & 23 deletions include/zRPCClient.inl
Original file line number Diff line number Diff line change
Expand Up @@ -31,46 +31,61 @@ namespace zRPC
{
template <typename... A>
msgpack::object_handle Client::call(const std::string &name, A... args)
{
return call(-1, name, args...);
}

template <typename... A>
msgpack::object_handle Client::call(int timeout,
const std::string &name,
A... args)
{
try
{
// Ensure socket is connected to the server
zmq::socket_t l_sock(m_ctx, zmq::socket_type::dealer);
l_sock.set(zmq::sockopt::rcvtimeo, timeout);
// Clean out the memory after the socket is closed
l_sock.set(zmq::sockopt::linger, timeout);
l_sock.set(zmq::sockopt::routing_id, m_idBase + std::to_string(m_idx++));
l_sock.connect(m_uri);

if (l_sock)
{
// Create a tuple with the RPC name and arguments
auto args_tuple = std::make_tuple(args...);
auto call_tuple = std::make_tuple(name, args_tuple);
// Create a tuple with the RPC name and arguments
auto args_tuple = std::make_tuple(args...);
auto call_tuple = std::make_tuple(name, args_tuple);

// Pack the tuple into a stringstream and calculate CRC
std::stringstream cbuf;
msgpack::pack(cbuf, call_tuple);
std::uint32_t crc =
CRC::Calculate(cbuf.str().data(), cbuf.str().size(), m_crcTable);
auto crc_tuple = std::make_tuple(cbuf.str(), crc);
// Pack the tuple into a stringstream and calculate CRC
std::stringstream cbuf;
msgpack::pack(cbuf, call_tuple);
std::uint32_t crc =
CRC::Calculate(cbuf.str().data(), cbuf.str().size(), m_crcTable);
auto crc_tuple = std::make_tuple(cbuf.str(), crc);

// Pack the new tuple into an object and send to the server
auto sbuf = std::make_shared<msgpack::sbuffer>();
msgpack::pack(*sbuf, crc_tuple);
(void)l_sock.send(zmq::const_buffer(sbuf->data(), sbuf->size()));
// Pack the new tuple into an object and send to the server
auto sbuf = std::make_shared<msgpack::sbuffer>();
msgpack::pack(*sbuf, crc_tuple);
(void)l_sock.send(zmq::const_buffer(sbuf->data(), sbuf->size()));

// Wait for response
zmq::message_t msg;
auto rxres = l_sock.recv(msg);
if (rxres && (rxres.value() > 0))
{
auto obj = msgpack::unpack(static_cast<char *>(msg.data()), msg.size());
return obj;
}
// Wait for response or timeout event

zmq::message_t msg;
auto rxres = l_sock.recv(msg);
if (rxres && (rxres.value() > 0))
{
auto obj = msgpack::unpack(static_cast<char *>(msg.data()), msg.size());
return obj;
}
else
{
std::cout << " ! ZMQ Warning server is not responding, request <" << name
<< "> is dropped !" << std::endl;
}
}
catch (const zmq::error_t &e)
{
std::cerr << " !! ZMQ Error " << e.num() << ": " << e.what() << std::endl;
}

return msgpack::object_handle();
}
} // namespace zRPC
74 changes: 74 additions & 0 deletions tests/clientDetached.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* @file clientDetached.cpp
* @author Seif Hadrich
* @date 3-May-2021 10:30:00 am
*
* @brief demo application for running a client in detached non blocking thread
*
* @copyright Jonathan Haws -- 2021
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include "zRPC.hpp"

class Message
{
public:
int v{-1};

MSGPACK_DEFINE(v)
};

void f4(zRPC::Client &client, int timeout)
{
try
{
Message m;
m.v = 1;
auto res = client.call(timeout, "f4", m);
std::cout << "f4 result = " << res.get().as<Message>().v << std::endl;
}
catch (const std::exception &e)
{
std::cerr << e.what() << '\n';
}
}

int main(void)
{
zRPC::Client client("TEST-CLIENT-Detached", "tcp://localhost:12345");

int i = 0;
while (i < 100)
{
// Run f4 client with timeout enabled
int timeout = 100; // 100ms
std::cout << "Call detached f4 thread number " << i << std::endl;
std::thread(f4, std::ref(client), timeout).detach();
std::this_thread::sleep_for(std::chrono::seconds(1));
i++;
}

client.call("terminate"); // shutdown the server
sleep(1);
}

0 comments on commit fdd5379

Please sign in to comment.