如何解决带截止日期的gRPC UDP
我已经在gRPC存储库中基于one of the tests创建了一个客户端-服务器程序。
gRPC中的UDP代码不是建立在其RPC层之上的,因此没有存根等概念。
我的代码有效,尽管我注意到在轻度的压力下,很大一部分消息都被丢弃了,而且我不确定这是否完全是由于UDP的有损特性还是与我的代码有关。
我有两个问题:
-
主要问题:是否存在用于设置UDP消息期限的gRPC方法?我熟悉ClientContext及其截止日期功能,但是我不知道如何在非TCP-less-less代码中使用它。如果没有,实现此目标的最佳方法是什么?
-
对于UDP本地主机通信,下降率是否为%50?
我的代码(它很长,因此请附加它以供参考。我的主要问题不需要阅读代码):
#include <netdb.h>
#include <string>
#include <thread>
#include <vector>
// grpc headers
#include <grpcpp/grpcpp.h>
#include "src/core/lib/iomgr/udp_server.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
using namespace std;
int client_port = 6666;
int server_port = 5555;
int num_of_msgs = 1000;
int listening_port;
int remote_port;
int fd;
int received_msgs_cnt = 0;
vector<bool> is_received(num_of_msgs,false);
enum Role {
CLIENT,SERVER
};
struct Request {
int id;
};
struct Response {
int id;
};
Role role;
bool udpServerFinished = false;
void sendUdp(const char *hostname,int port,const char* payload,size_t size) {
auto transferred = write(fd,(void*)payload,size);
assert(size == transferred);
}
/***************************************
* UDP Handler class
* (will be generated by factory class)
* upon receiving a new message,the Read()
* function is invoked
***************************************/
class UdpHandler : public GrpcUdpHandler {
public:
UdpHandler(grpc_fd *emfd,void *user_data):
GrpcUdpHandler(emfd,user_data),emfd_(emfd) {
}
virtual ~UdpHandler() {}
static void startLoop(volatile bool &udpServerFinished) {
grpc_core::ExecCtx exec_ctx;
grpc_millis deadline;
gpr_mu_lock(g_mu);
while (!udpServerFinished) {
deadline = grpc_timespec_to_millis_round_up(gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC),gpr_time_from_millis(10000,GPR_TIMESPAN)));
grpc_pollset_worker *worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",grpc_pollset_work(UdpHandler::g_pollset,&worker,deadline)));
gpr_mu_unlock(UdpHandler::g_mu);
grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(UdpHandler::g_mu);
}
gpr_mu_unlock(g_mu);
}
static grpc_pollset *g_pollset;
static gpr_mu *g_mu;
public:
static int g_num_listeners;
protected:
bool Read() override {
char read_buffer[512];
ssize_t byte_count;
gpr_mu_lock(UdpHandler::g_mu);
byte_count = recv(grpc_fd_wrapped_fd(emfd()),read_buffer,sizeof(read_buffer),0);
processIncomingMsg((void*)read_buffer,byte_count);
GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",grpc_pollset_kick(UdpHandler::g_pollset,nullptr)));
gpr_mu_unlock(UdpHandler::g_mu);
return false;
}
void processIncomingMsg(void* msg,ssize_t size) {
received_msgs_cnt++;
(void)size;
int id;
if (role == Role::CLIENT) {
Response res;
assert(size == sizeof(Response));
memcpy((void*)&res,(void*)msg,size);
id = res.id;
cout << "Msg: response for request " << res.id << endl;
}
else {
Request req;
assert(size == sizeof(Request));
memcpy((void*)&req,size);
id = req.id;
cout << "Msg: request " << req.id << endl;
// send response
Response res;
res.id = req.id;
sendUdp("127.0.0.1",remote_port,(const char*)&res,sizeof(Response));
}
// check for termination condition (both for client and server)
if (received_msgs_cnt == num_of_msgs) {
cout << "This is the last msg" << endl;
udpServerFinished = true;
}
// mark the id of the current message
is_received[id] = true;
// if this was the last message,print the missing msg ids
if (id == num_of_msgs - 1) {
cout << "missing ids: ";
for (int i = 0; i < num_of_msgs; i++) {
if (is_received[i] == false)
cout << i << ",";
}
cout << endl;
cout << "% of missing messages: "
<< 1.0 - ((double)received_msgs_cnt / num_of_msgs) << endl;
}
}
void OnCanWrite(void* /*user_data*/,grpc_closure* /*notify_on_write_closure*/) override {
gpr_mu_lock(g_mu);
GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",nullptr)));
gpr_mu_unlock(g_mu);
}
void OnFdAboutToOrphan(grpc_closure *orphan_fd_closure,void* /*user_data*/) override {
grpc_core::ExecCtx::Run(DEBUG_LOCATION,orphan_fd_closure,GRPC_ERROR_NONE);
}
grpc_fd *emfd() { return emfd_; }
private:
grpc_fd *emfd_;
};
int UdpHandler::g_num_listeners = 1;
grpc_pollset *UdpHandler::g_pollset;
gpr_mu *UdpHandler::g_mu;
/****************************************
* Factory class (generated UDP handler)
****************************************/
class UdpHandlerFactory : public GrpcUdpHandlerFactory {
public:
GrpcUdpHandler *CreateUdpHandler(grpc_fd *emfd,void *user_data) override {
UdpHandler *handler = new UdpHandler(emfd,user_data);
return handler;
}
void DestroyUdpHandler(GrpcUdpHandler *handler) override {
delete reinterpret_cast<UdpHandler *>(handler);
}
};
/****************************************
* Main function
****************************************/
int main(int argc,char *argv[]) {
if (argc != 2) {
cerr << "Usage: './run client' or './run server' " << endl;
return 1;
}
string r(argv[1]);
if (r == "client") {
cout << "Client is initializing to send requests!" << endl;
role = Role::CLIENT;
listening_port = client_port;
remote_port = server_port;
}
else if (r == "server") {
cout << "Server is initializing to accept requests!" << endl;
role = Role::SERVER;
listening_port = server_port;
remote_port = client_port;
}
else {
cerr << "Usage: './run client' or './run server' " << endl;
return 1;
}
/********************************************************
* Initialize UDP Listener
********************************************************/
/* Initialize the grpc library. After it's called,* a matching invocation to grpc_shutdown() is expected. */
grpc_init();
grpc_core::ExecCtx exec_ctx;
UdpHandler::g_pollset = static_cast<grpc_pollset *>(
gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(UdpHandler::g_pollset,&UdpHandler::g_mu);
grpc_resolved_address resolved_addr;
struct sockaddr_storage *addr =
reinterpret_cast<struct sockaddr_storage *>(resolved_addr.addr);
int svrfd;
grpc_udp_server *s = grpc_udp_server_create(nullptr);
grpc_pollset *pollsets[1];
memset(&resolved_addr,sizeof(resolved_addr));
resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
addr->ss_family = AF_INET;
grpc_sockaddr_set_port(&resolved_addr,listening_port);
/* setup UDP server */
UdpHandlerFactory handlerFactory;
int rcv_buf_size = 1024;
int snd_buf_size = 1024;
GPR_ASSERT(grpc_udp_server_add_port(s,&resolved_addr,rcv_buf_size,snd_buf_size,&handlerFactory,UdpHandler::g_num_listeners) > 0);
svrfd = grpc_udp_server_get_fd(s,0);
GPR_ASSERT(svrfd >= 0);
GPR_ASSERT(getsockname(svrfd,(struct sockaddr *) addr,(socklen_t *) &resolved_addr.len) == 0);
GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage));
pollsets[0] = UdpHandler::g_pollset;
grpc_udp_server_start(s,pollsets,1,nullptr);
string addr_str = grpc_sockaddr_to_string(&resolved_addr,1);
cout << "UDP Server listening on: " << addr_str << endl;
thread udpPollerThread(
UdpHandler::startLoop,ref(udpServerFinished));
/********************************************************
* Establish connection to the other side
********************************************************/
struct sockaddr_in serv_addr;
struct hostent *server = gethostbyname("127.0.0.1");
bzero((char *) &serv_addr,sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
bcopy((char *) server->h_addr,(char *) &serv_addr.sin_addr.s_addr,server->h_length);
serv_addr.sin_port = htons(remote_port);
fd = socket(serv_addr.sin_family,SOCK_DGRAM,0);
GPR_ASSERT(fd >= 0);
GPR_ASSERT(connect(fd,(struct sockaddr *) &serv_addr,sizeof(serv_addr)) == 0);
/********************************************************
* Send requests
********************************************************/
if (role == Role::CLIENT) {
static int counter = 0;
for (int i = 0; i < num_of_msgs; i++) {
Request req;
req.id = counter++;
cout << "Sending request " << req.id << endl;
sendUdp("127.0.0.1",(char*)&req,sizeof(Request));
}
}
/********************************************************
* wait for client to finish
********************************************************/
udpPollerThread.join();
/********************************************************
* cleanup
********************************************************/
close(fd);
gpr_free(UdpHandler::g_pollset);
grpc_shutdown();
cout << "finished successfully!" << endl;
return 0;
}
编译于:-std=c++17 -I$(GRPC_DIR) -I$(GRPC_DIR)/third_party/abseil-cpp
。
链接:pkg-config --libs grpc++
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。