带截止日期的gRPC UDP

如何解决带截止日期的gRPC UDP

我已经在gRPC存储库中基于one of the tests创建了一个客户端-服务器程序。

gRPC中的UDP代码不是建立在其RPC层之上的,因此没有存根等概念。

我的代码有效,尽管我注意到在轻度的压力下,很大一部分消息都被丢弃了,而且我不确定这是否完全是由于UDP的有损特性还是与我的代码有关。

我有两个问题:

  1. 主要问题:是否存在用于设置UDP消息期限的gRPC方法?我熟悉ClientContext及其截止日期功能,但是我不知道如何在非TCP-less-less代码中使用它。如果没有,实现此目标的最佳方法是什么?

  2. 对于UDP本地主机通信,下降率是否为%50?

我的代码(它很长,因此请附加它以供参考。我的主要问题不需要阅读代码):

#include <netdb.h>
#include <string>
#include <thread>
#include <vector>

// grpc headers
#include <grpcpp/grpcpp.h>
#include "src/core/lib/iomgr/udp_server.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"

using namespace std;


int client_port = 6666;
int server_port = 5555;
int num_of_msgs = 1000;


int listening_port;
int remote_port;

int fd;

int received_msgs_cnt = 0;
vector<bool> is_received(num_of_msgs,false);


enum Role {
    CLIENT,SERVER
};

struct Request {
    int id;
};

struct Response {
    int id;
};

Role role;

bool udpServerFinished = false;


void sendUdp(const char *hostname,int port,const char* payload,size_t size) {
    auto transferred = write(fd,(void*)payload,size);
    assert(size == transferred);
}


/***************************************
 * UDP Handler class
 * (will be generated by factory class)
 * upon receiving a new message,the Read()
 * function is invoked
 ***************************************/
class UdpHandler : public GrpcUdpHandler {

public:
    UdpHandler(grpc_fd *emfd,void *user_data):
        GrpcUdpHandler(emfd,user_data),emfd_(emfd) {
    }
    virtual ~UdpHandler() {}
    static void startLoop(volatile bool &udpServerFinished) {
        grpc_core::ExecCtx exec_ctx;
        grpc_millis deadline;

        gpr_mu_lock(g_mu);

        while (!udpServerFinished) {
            deadline = grpc_timespec_to_millis_round_up(gpr_time_add(
                    gpr_now(GPR_CLOCK_MONOTONIC),gpr_time_from_millis(10000,GPR_TIMESPAN)));

            grpc_pollset_worker *worker = nullptr;
            GPR_ASSERT(GRPC_LOG_IF_ERROR(
                    "pollset_work",grpc_pollset_work(UdpHandler::g_pollset,&worker,deadline)));

            gpr_mu_unlock(UdpHandler::g_mu);
            grpc_core::ExecCtx::Get()->Flush();
            gpr_mu_lock(UdpHandler::g_mu);

        }
        gpr_mu_unlock(g_mu);
    }

    static grpc_pollset *g_pollset;
    static gpr_mu *g_mu;

public:
    static int g_num_listeners;

protected:
    bool Read() override {
        char read_buffer[512];
        ssize_t byte_count;

        gpr_mu_lock(UdpHandler::g_mu);
        byte_count = recv(grpc_fd_wrapped_fd(emfd()),read_buffer,sizeof(read_buffer),0);

        processIncomingMsg((void*)read_buffer,byte_count);

        GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",grpc_pollset_kick(UdpHandler::g_pollset,nullptr)));
        gpr_mu_unlock(UdpHandler::g_mu);
        return false;
    }

    void processIncomingMsg(void* msg,ssize_t size) {
        received_msgs_cnt++;
        (void)size;
        int id;
        if (role == Role::CLIENT) {
            Response res;
            assert(size == sizeof(Response));
            memcpy((void*)&res,(void*)msg,size);
            id = res.id;
            cout << "Msg: response for request " << res.id  << endl;
        }
        else {
            Request req;
            assert(size == sizeof(Request));
            memcpy((void*)&req,size);
            id = req.id;
            cout << "Msg: request " << req.id << endl;

            // send response
            Response res;
            res.id = req.id;
            sendUdp("127.0.0.1",remote_port,(const char*)&res,sizeof(Response));
        }

        // check for termination condition (both for client and server)
        if (received_msgs_cnt == num_of_msgs) {
            cout << "This is the last msg" << endl;
            udpServerFinished = true;
        }

        // mark the id of the current message
        is_received[id] = true;

        // if this was the last message,print the missing msg ids
        if (id == num_of_msgs - 1) {
            cout << "missing ids: ";
            for (int i = 0; i < num_of_msgs; i++) {
                if (is_received[i] == false)
                    cout << i << ",";
            }
            cout << endl;
            cout << "% of missing messages: "
                    << 1.0 - ((double)received_msgs_cnt / num_of_msgs) << endl;
        }
    }
    void OnCanWrite(void* /*user_data*/,grpc_closure* /*notify_on_write_closure*/) override {
        gpr_mu_lock(g_mu);

        GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",nullptr)));
        gpr_mu_unlock(g_mu);
    }
    void OnFdAboutToOrphan(grpc_closure *orphan_fd_closure,void* /*user_data*/) override {
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,orphan_fd_closure,GRPC_ERROR_NONE);
    }

    grpc_fd *emfd() { return emfd_; }

private:
    grpc_fd *emfd_;
};


int UdpHandler::g_num_listeners = 1;

grpc_pollset *UdpHandler::g_pollset;
gpr_mu *UdpHandler::g_mu;



/****************************************
 * Factory class (generated UDP handler)
 ****************************************/
class UdpHandlerFactory : public GrpcUdpHandlerFactory {
public:
    GrpcUdpHandler *CreateUdpHandler(grpc_fd *emfd,void *user_data) override {
        UdpHandler *handler = new UdpHandler(emfd,user_data);
        return handler;
    }

    void DestroyUdpHandler(GrpcUdpHandler *handler) override {
        delete reinterpret_cast<UdpHandler *>(handler);
    }
};




/****************************************
 * Main function
 ****************************************/
int main(int argc,char *argv[]) {
    if (argc != 2) {
        cerr << "Usage: './run client'  or  './run server' " << endl;
        return 1;
    }

    string r(argv[1]);
    if (r == "client") {
        cout << "Client is initializing to send requests!" << endl;
        role = Role::CLIENT;
        listening_port = client_port;
        remote_port = server_port;
    }
    else if (r == "server") {
        cout << "Server is initializing to accept requests!" << endl;
        role = Role::SERVER;
        listening_port = server_port;
        remote_port = client_port;
    }
    else {
        cerr << "Usage: './run client'  or  './run server' " << endl;
        return 1;
    }

    /********************************************************
     * Initialize UDP Listener
     ********************************************************/
    /* Initialize the grpc library. After it's called,* a matching invocation to grpc_shutdown() is expected. */
    grpc_init();

    grpc_core::ExecCtx exec_ctx;
    UdpHandler::g_pollset = static_cast<grpc_pollset *>(
            gpr_zalloc(grpc_pollset_size()));
    grpc_pollset_init(UdpHandler::g_pollset,&UdpHandler::g_mu);

    grpc_resolved_address resolved_addr;
    struct sockaddr_storage *addr =
            reinterpret_cast<struct sockaddr_storage *>(resolved_addr.addr);
    int svrfd;

    grpc_udp_server *s = grpc_udp_server_create(nullptr);
    grpc_pollset *pollsets[1];

    memset(&resolved_addr,sizeof(resolved_addr));
    resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
    addr->ss_family = AF_INET;

    grpc_sockaddr_set_port(&resolved_addr,listening_port);

    /* setup UDP server */
    UdpHandlerFactory handlerFactory;

    int rcv_buf_size = 1024;
    int snd_buf_size = 1024;

    GPR_ASSERT(grpc_udp_server_add_port(s,&resolved_addr,rcv_buf_size,snd_buf_size,&handlerFactory,UdpHandler::g_num_listeners) > 0);

    svrfd = grpc_udp_server_get_fd(s,0);
    GPR_ASSERT(svrfd >= 0);
    GPR_ASSERT(getsockname(svrfd,(struct sockaddr *) addr,(socklen_t *) &resolved_addr.len) == 0);
    GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage));

    pollsets[0] = UdpHandler::g_pollset;
    grpc_udp_server_start(s,pollsets,1,nullptr);

    string addr_str = grpc_sockaddr_to_string(&resolved_addr,1);
    cout << "UDP Server listening on: " << addr_str << endl;
    thread udpPollerThread(
            UdpHandler::startLoop,ref(udpServerFinished));



    /********************************************************
     * Establish connection to the other side
     ********************************************************/
    struct sockaddr_in serv_addr;
    struct hostent *server = gethostbyname("127.0.0.1");

    bzero((char *) &serv_addr,sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    bcopy((char *) server->h_addr,(char *) &serv_addr.sin_addr.s_addr,server->h_length);
    serv_addr.sin_port = htons(remote_port);

    fd = socket(serv_addr.sin_family,SOCK_DGRAM,0);
    GPR_ASSERT(fd >= 0);
    GPR_ASSERT(connect(fd,(struct sockaddr *) &serv_addr,sizeof(serv_addr)) == 0);


    /********************************************************
     * Send requests
     ********************************************************/
    if (role == Role::CLIENT) {
        static int counter = 0;

        for (int i = 0; i < num_of_msgs; i++) {
            Request req;
            req.id = counter++;
            cout << "Sending request " << req.id << endl;
            sendUdp("127.0.0.1",(char*)&req,sizeof(Request));
        }
    }

    /********************************************************
     * wait for client to finish
     ********************************************************/
    udpPollerThread.join();

    /********************************************************
     * cleanup
     ********************************************************/
    close(fd);
    gpr_free(UdpHandler::g_pollset);
    grpc_shutdown();

    cout << "finished successfully!" << endl;

    return 0;
}

编译于:
-std=c++17 -I$(GRPC_DIR) -I$(GRPC_DIR)/third_party/abseil-cpp
链接:
pkg-config --libs grpc++

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-