diff --git a/CMakeLists.txt b/CMakeLists.txt index 4c5a76d..491add2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -84,6 +84,10 @@ target_compile_options(${PROJECT_NAME} PUBLIC ${compile_options}) # Test applications ############################################################################### if (ZRPC_BUILD_TESTS) + add_executable(clientDetached tests/clientDetached.cpp) + target_link_libraries(clientDetached zRPC) + target_compile_options(clientDetached PUBLIC ${compile_options}) + add_executable(client tests/client.cpp) target_link_libraries(client zRPC) target_compile_options(client PUBLIC ${compile_options}) diff --git a/include/zRPC.hpp b/include/zRPC.hpp index ae75130..aa0e23d 100644 --- a/include/zRPC.hpp +++ b/include/zRPC.hpp @@ -45,7 +45,6 @@ namespace zRPC { - /** * @class Server zRPC.hpp "zRPC.hpp" * @@ -233,20 +232,34 @@ class Client * @param[in] identity Identity string to use for the client. * @param[in] uri Zero-MQ address:port to bind listening socket to. */ - explicit Client(const std::string &identity, - const std::string &uri); + explicit Client(const std::string &identity, const std::string &uri); /** * @brief Call the RPC with the given name and given arguments * * @tparam A Variadic argument list - * @param[in] name Name of the RPC to call on the remote serveer + * @param[in] name Name of the RPC to call on the remote server * @param[in] args Variadic argument list to pass to the remote server * @return msgpack::object_handle MessagePack'd object handle containing * server response (if any) */ template msgpack::object_handle call(const std::string &name, A... args); + + /** + * @brief Call the RPC with the given name and given arguments + * + * @tparam A Variadic argument list + * @param[in] timeout Timeout in ms before dropping the request + * @param[in] name Name of the RPC to call on the remote server + * @param[in] args Variadic argument list to pass to the remote server + * @return msgpack::object_handle MessagePack'd object handle containing + * server response (if any) + */ + template + msgpack::object_handle call(const int timeout, + const std::string &name, + A... args); }; /** diff --git a/include/zRPCClient.inl b/include/zRPCClient.inl index 8b0346a..ffb23e6 100644 --- a/include/zRPCClient.inl +++ b/include/zRPCClient.inl @@ -31,46 +31,61 @@ namespace zRPC { template msgpack::object_handle Client::call(const std::string &name, A... args) +{ + return call(-1, name, args...); +} + +template +msgpack::object_handle Client::call(int timeout, + const std::string &name, + A... args) { try { // Ensure socket is connected to the server zmq::socket_t l_sock(m_ctx, zmq::socket_type::dealer); + l_sock.set(zmq::sockopt::rcvtimeo, timeout); + // Clean out the memory after the socket is closed + l_sock.set(zmq::sockopt::linger, timeout); l_sock.set(zmq::sockopt::routing_id, m_idBase + std::to_string(m_idx++)); l_sock.connect(m_uri); - if (l_sock) - { - // Create a tuple with the RPC name and arguments - auto args_tuple = std::make_tuple(args...); - auto call_tuple = std::make_tuple(name, args_tuple); + // Create a tuple with the RPC name and arguments + auto args_tuple = std::make_tuple(args...); + auto call_tuple = std::make_tuple(name, args_tuple); - // Pack the tuple into a stringstream and calculate CRC - std::stringstream cbuf; - msgpack::pack(cbuf, call_tuple); - std::uint32_t crc = - CRC::Calculate(cbuf.str().data(), cbuf.str().size(), m_crcTable); - auto crc_tuple = std::make_tuple(cbuf.str(), crc); + // Pack the tuple into a stringstream and calculate CRC + std::stringstream cbuf; + msgpack::pack(cbuf, call_tuple); + std::uint32_t crc = + CRC::Calculate(cbuf.str().data(), cbuf.str().size(), m_crcTable); + auto crc_tuple = std::make_tuple(cbuf.str(), crc); - // Pack the new tuple into an object and send to the server - auto sbuf = std::make_shared(); - msgpack::pack(*sbuf, crc_tuple); - (void)l_sock.send(zmq::const_buffer(sbuf->data(), sbuf->size())); + // Pack the new tuple into an object and send to the server + auto sbuf = std::make_shared(); + msgpack::pack(*sbuf, crc_tuple); + (void)l_sock.send(zmq::const_buffer(sbuf->data(), sbuf->size())); - // Wait for response - zmq::message_t msg; - auto rxres = l_sock.recv(msg); - if (rxres && (rxres.value() > 0)) - { - auto obj = msgpack::unpack(static_cast(msg.data()), msg.size()); - return obj; - } + // Wait for response or timeout event + + zmq::message_t msg; + auto rxres = l_sock.recv(msg); + if (rxres && (rxres.value() > 0)) + { + auto obj = msgpack::unpack(static_cast(msg.data()), msg.size()); + return obj; + } + else + { + std::cout << " ! ZMQ Warning server is not responding, request <" << name + << "> is dropped !" << std::endl; } } catch (const zmq::error_t &e) { std::cerr << " !! ZMQ Error " << e.num() << ": " << e.what() << std::endl; } + return msgpack::object_handle(); } } // namespace zRPC \ No newline at end of file diff --git a/tests/clientDetached.cpp b/tests/clientDetached.cpp new file mode 100644 index 0000000..aa81aa6 --- /dev/null +++ b/tests/clientDetached.cpp @@ -0,0 +1,74 @@ +/* + * @file clientDetached.cpp + * @author Seif Hadrich + * @date 3-May-2021 10:30:00 am + * + * @brief demo application for running a client in detached non blocking thread + * + * @copyright Jonathan Haws -- 2021 + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include +#include "zRPC.hpp" + +class Message +{ +public: + int v{-1}; + + MSGPACK_DEFINE(v) +}; + +void f4(zRPC::Client &client, int timeout) +{ + try + { + Message m; + m.v = 1; + auto res = client.call(timeout, "f4", m); + std::cout << "f4 result = " << res.get().as().v << std::endl; + } + catch (const std::exception &e) + { + std::cerr << e.what() << '\n'; + } +} + +int main(void) +{ + zRPC::Client client("TEST-CLIENT-Detached", "tcp://localhost:12345"); + + int i = 0; + while (i < 100) + { + // Run f4 client with timeout enabled + int timeout = 100; // 100ms + std::cout << "Call detached f4 thread number " << i << std::endl; + std::thread(f4, std::ref(client), timeout).detach(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + i++; + } + + client.call("terminate"); // shutdown the server + sleep(1); +} \ No newline at end of file