Nginx gateway monitoring with nginx + rsyslog + kafka + clickhouse + grafana

Requirements

  • I want to build something like Tencent Cloud's gateway log dashboards: metrics such as QPS and p95/p99 response times, presented as charts.

Data flow

Data flows exactly as the title suggests:
nginx ----> rsyslog ----> kafka ----> clickhouse ----> grafana

Deployment

kafka

Deploying Kafka itself is not covered here; all you need is a topic.
In this setup the Kafka broker is 192.168.1.180 and the topic is `rsyslog_nginx`.
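Creating the topic looks roughly like this (a sketch using the stock Kafka CLI; the partition and replication counts are placeholders, not from the original setup):

kafka-topics.sh --bootstrap-server 192.168.1.180:9092 \
    --create --topic rsyslog_nginx \
    --partitions 3 --replication-factor 1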

rsyslog setup

I won't explain what rsyslog actually is either; I'm a backend developer, not an ops person. All I know is that it performs well and can serve as a lightweight logstash alternative.

# install the rsyslog-kafka output plugin
yum install -y rsyslog-kafka
# create a new config file

vim /etc/rsyslog.d/rsyslog_nginx_kafka_cluster.conf

The config:

module(load="imudp")
input(type="imudp" port="514")

# nginx access log ==> rsyslog server(local) ==> kafka
module(load="omkafka")

template(name="nginxLog" type="string" string="%msg%")

if $inputname == "imudp" then {
    # this name must match the tag set in the nginx config below
    if ($programname == "nginx_access_log") then
        action(type="omkafka"
            template="nginxLog"
            # kafka broker address
            broker=["192.168.1.180:9092"]
            # topic name
            topic="rsyslog_nginx"
            partitions.auto="on"
            confParam=[
                "socket.keepalive.enable=true"
            ]
        )
}

# discard the message after forwarding so it is not also written to local log files
:rawmsg,contains,"nginx_access_log" ~

Finally, restart rsyslog.
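A quick sanity check (assuming systemd and the util-linux logger; both are assumptions, adjust to your distro):

# validate the rsyslog config, then restart
rsyslogd -N1
systemctl restart rsyslog
# send a test message over UDP 514 with the tag the filter expects
logger --server 127.0.0.1 --port 514 --udp --tag nginx_access_log '{"test": 1}'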

nginx

Inside the http block, configure the log format and syslog output:

log_format jsonlog '{'
        '"host": "$host",'
        '"server_addr": "$server_addr",'
        '"remote_addr":"$remote_addr",'
        '"time_format":"$time_ms",'
        '"time_sec":$times,'
        '"timestamp":$timestamp,'
        '"method":"$request_method",'
        '"request_url":"$request_uri",'
        '"status":$status,'
        '"upstream_name":"$upstream_name",'
        '"http_user_agent":"$http_user_agent",'
        '"upstream_addr":"$upstream_addr",'
        '"trace_id":"$http_traceid",'
        '"upstream_status":"$upstream_status",'
        '"upstream_response_time":"$upstream_response_time",'
        '"request_time":$request_time,'
        '"nginx_host":"$nginx_host"'
        '}';
    # send access logs to rsyslog; the tag must match the rsyslog config above!!!
    access_log syslog:server=192.168.1.179,facility=local7,tag=nginx_access_log,severity=info jsonlog;
    #access_log  /var/log/nginx/access.log  jsonlog;
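Whenever the nginx config changes, validate and reload (standard nginx workflow, not specific to this post):

nginx -t && nginx -s reload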

Configure the actual upstream being proxied:

upstream testcontrol {
    server 192.168.10.123:8081;
}

server {
    listen       18081;
    server_name  _;
    # set the upstream name recorded in the access log
    set $upstream_name 'testcontrol';
    include /etc/nginx/conf/time.conf;

    location / {
        proxy_set_header traceid $http_traceid;
        proxy_set_header n-t $msec;
        #proxy_set_header X-Real-IP $remote_addr;
        proxy_pass http://testcontrol;
    }

    location ~ /status {
        stub_status   on;
    }
}

Contents of /etc/nginx/conf/time.conf:

set $nginx_host '192.168.8.64';
if ($time_iso8601 ~ "^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})") {
    set $year $1;
    set $month $2;
    set $day $3;
    set $hour $4;
    set $minutes $5;
    set $seconds $6;
    set $time_sec "$1-$2-$3 $4:$5:$6";
}

# grab the millisecond timestamp and append the ms part to $time_sec
if ($msec ~ "(\d+)\.(\d+)") {
    # epoch timestamp in ms
    set $timestamp $1$2;
    # epoch timestamp in s
    set $times $1;
    # formatted time with ms precision
    set $time_ms "$time_sec.$2";
    # traceId, to make log lookups easier
    set $http_traceid "$nginx_host#$timestamp#$request_id";
}
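As a concrete illustration (values are made up; assume $msec = 1700000000.123 and a server clock in Asia/Shanghai), these rules produce:

$times        = 1700000000                  # epoch seconds
$timestamp    = 1700000000123               # epoch milliseconds
$time_sec     = 2023-11-15 06:13:20
$time_ms      = 2023-11-15 06:13:20.123
$http_traceid = 192.168.8.64#1700000000123#<request_id>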

clickhouse

First, create the nginx_gw_log target table:


create table app_logs.nginx_gw_log
(
    timestamp              DateTime64(3, 'Asia/Shanghai'),
    server_addr            String,
    remote_addr            String,
    time_sec               DateTime('Asia/Shanghai'),
    method                 String,
    request_url            String,
    status                 Int16,
    upstream_name          String,
    http_user_agent        String,
    upstream_addr          String,
    trace_id               String,
    upstream_status        String,
    upstream_response_time String,
    request_time           Int32,
    nginx_host              String,
    host                   String
) engine = MergeTree PARTITION BY toYYYYMMDD(timestamp)
      ORDER BY timestamp
      TTL toDateTime(timestamp) + toIntervalDay(1);

The TTL above expires data after one day; adjust it as needed.
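For example, to keep a week of data instead:

clickhouse-client --query "ALTER TABLE app_logs.nginx_gw_log MODIFY TTL toDateTime(timestamp) + toIntervalDay(7)"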
Next, add the Kafka engine table:

create table app_logs.nginx_gw_log_kafka
(
    timestamp              DateTime64(3, 'Asia/Shanghai'),
    server_addr            String,
    remote_addr            String,
    time_sec               DateTime('Asia/Shanghai'),
    method                 String,
    request_url            String,
    status                 Int32,
    upstream_name          String,
    http_user_agent        String,
    upstream_addr          String,
    trace_id               String,
    upstream_status        String,
    upstream_response_time String,
    request_time           Decimal32(3),
    nginx_host             String,
    host                   String
) engine = Kafka()
      SETTINGS kafka_broker_list = '192.168.1.180:9092',
          kafka_topic_list = 'rsyslog_nginx',
          kafka_group_name = 'nginx_ck',
          kafka_format = 'JSONEachRow',
          kafka_skip_broken_messages = 100000,
          kafka_num_consumers = 1;
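To peek at what the Kafka engine table sees, recent ClickHouse versions allow a direct select (note: this consumes offsets for the nginx_ck consumer group, so only do it while debugging; the setting is version-dependent):

clickhouse-client --query "SELECT * FROM app_logs.nginx_gw_log_kafka LIMIT 5 SETTINGS stream_like_engine_allow_direct_select = 1"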

Finally, create the materialized view:

CREATE MATERIALIZED VIEW app_logs.nginx_gw_log_mv TO app_logs.nginx_gw_log AS
SELECT timestamp,
       server_addr,
       remote_addr,
       time_sec,
       method,
       request_url,
       status,
       upstream_name,
       http_user_agent,
       upstream_addr,
       trace_id,
       upstream_status,
       upstream_response_time,
       toInt32(multiply(request_time, 1000)) as request_time,
       nginx_host,
       host
FROM app_logs.nginx_gw_log_kafka;

There is a type conversion here: nginx reports request_time in seconds, but I want Grafana to display milliseconds, so the view multiplies by 1000. The conversion could also be done in Grafana.
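Once rows are flowing, request_time in the target table is already in ms, so percentiles can be checked directly (a quick sanity query, not from the original post):

clickhouse-client --query "SELECT quantile(0.95)(request_time) AS p95_ms, quantile(0.99)(request_time) AS p99_ms FROM app_logs.nginx_gw_log WHERE timestamp >= now() - INTERVAL 5 MINUTE"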

Once everything above is configured, verify that data actually reaches ClickHouse. If it does not, debug hop by hop; for example, to test nginx -> rsyslog on its own, temporarily replace the Kafka output with a console or file output. I won't spell out every step here.
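A hop-by-hop check might look like this (illustrative commands; the addresses and ports follow the examples above):

# 1. generate some traffic through the gateway
curl -s http://192.168.8.64:18081/ > /dev/null
# 2. confirm messages reach the topic
kafka-console-consumer.sh --bootstrap-server 192.168.1.180:9092 \
    --topic rsyslog_nginx --from-beginning --max-messages 5
# 3. confirm the materialized view fills the target table
clickhouse-client --query "SELECT count() FROM app_logs.nginx_gw_log"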

grafana

The dashboard JSON (import it via Dashboards -> Import):

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": {
          "type": "grafana",
          "uid": "-- Grafana --"
        },
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0,211,255,1)",
        "name": "Annotations & Alerts",
        "target": {
          "limit": 100,
          "matchAny": false,
          "tags": [],
          "type": "dashboard"
        },
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 19,
  "links": [],
  "liveNow": false,
  "panels": [
    {
      "datasource": {
        "type": "grafana-clickhouse-datasource",
        "uid": "${datasource}"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "palette-classic"
          },
          "custom": {
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "barAlignment": 0,
            "drawStyle": "line",
            "fillOpacity": 0,
            "gradientMode": "none",
            "hideFrom": {
              "legend": false,
              "tooltip": false,
              "viz": false
            },
            "lineInterpolation": "linear",
            "lineWidth": 1,
            "pointSize": 5,
            "scaleDistribution": {
              "type": "linear"
            },
            "showPoints": "auto",
            "spanNulls": false,
            "stacking": {
              "group": "A",
              "mode": "none"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": null
              },
              {
                "color": "red",
                "value": 80
              }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": {
        "h": 9,
        "w": 12,
        "x": 0,
        "y": 0
      },
      "id": 2,
      "options": {
        "legend": {
          "calcs": [],
          "displayMode": "list",
          "placement": "bottom",
          "showLegend": true
        },
        "tooltip": {
          "mode": "single",
          "sort": "none"
        }
      },
      "pluginVersion": "9.1.1",
      "targets": [
        {
          "builderOptions": {
            "database": "app_logs",
            "fields": [],
            "filters": [
              {
                "condition": "AND",
                "filterType": "custom",
                "key": "timestamp",
                "operator": "WITH IN DASHBOARD TIME RANGE",
                "restrictToFields": [
                  {
                    "label": "timestamp",
                    "name": "timestamp",
                    "picklistValues": [],
                    "type": "DateTime64(3,'Asia/Shanghai')"
                  },
                  {
                    "label": "startTime",
                    "name": "startTime",
                    "picklistValues": [],
                    "type": "DateTime64(3,'Asia/Shanghai')"
                  }
                ],
                "type": "datetime"
              }
            ],
            "groupBy": [],
            "limit": 100,
            "metrics": [
              {
                "aggregation": "count",
                "field": ""
              }
            ],
            "mode": "list",
            "orderBy": [],
            "table": "tk_wx_control_req_kafka",
            "timeField": "timestamp",
            "timeFieldType": "DateTime64(3,'Asia/Shanghai')"
          },
          "datasource": {
            "type": "grafana-clickhouse-datasource",
            "uid": "${datasource}"
          },
          "format": 1,
          "meta": {
            "builderOptions": {
              "database": "app_logs",
              "fields": [],
              "filters": [
                {
                  "condition": "AND",
                  "filterType": "custom",
                  "key": "timestamp",
                  "operator": "WITH IN DASHBOARD TIME RANGE",
                  "restrictToFields": [
                    {
                      "label": "timestamp",
                      "name": "timestamp",
                      "picklistValues": [],
                      "type": "DateTime64(3,'Asia/Shanghai')"
                    },
                    {
                      "label": "startTime",
                      "name": "startTime",
                      "picklistValues": [],
                      "type": "DateTime64(3,'Asia/Shanghai')"
                    }
                  ],
                  "type": "datetime"
                }
              ],
              "groupBy": [],
              "limit": 100,
              "metrics": [
                {
                  "aggregation": "count",
                  "field": ""
                }
              ],
              "mode": "list",
              "orderBy": [],
              "table": "tk_wx_control_req_kafka",
              "timeField": "timestamp",
              "timeFieldType": "DateTime64(3,'Asia/Shanghai')"
            }
          },
          "queryType": "sql",
          "rawSql": "SELECT time_sec as a,count() as num FROM \"app_logs\".${table_name} where ( timestamp  >= $__fromTime AND timestamp <= $__toTime )\r\ngroup by a order by a desc ;\r\n",
          "refId": "A",
          "selectedFormat": 4
        }
      ],
      "title": "qps",
      "type": "timeseries"
    },
    {
      "datasource": { "type": "grafana-clickhouse-datasource", "uid": "${datasource}" },
      "gridPos": { "h": 9, "w": 12, "x": 12, "y": 0 },
      "id": 3,
      "targets": [
        {
          "queryType": "sql",
          "rawSql": "SELECT time_sec as a,toInt32(quantile(0.95)(request_time)) as num FROM \"app_logs\".${table_name} where ( timestamp >= $__fromTime AND timestamp <= $__toTime ) group by a order by a desc;",
          "refId": "p95",
          "selectedFormat": 4
        },
        {
          "hide": false,
          "queryType": "sql",
          "rawSql": "SELECT time_sec as a,toInt32(quantile(0.99)(request_time)) as num FROM \"app_logs\".${table_name} where ( timestamp >= $__fromTime AND timestamp <= $__toTime ) group by a order by a desc;",
          "refId": "p99",
          "selectedFormat": 4
        }
      ],
      "title": "response time (p95/p99)",
      "type": "timeseries"
    },
    {
      "datasource": { "type": "grafana-clickhouse-datasource", "uid": "${datasource}" },
      "fieldConfig": {
        "defaults": {
          "color": { "mode": "thresholds" },
          "custom": { "align": "auto", "displayMode": "auto", "inspect": false }
        },
        "overrides": []
      },
      "gridPos": { "h": 9, "w": 12, "x": 0, "y": 9 },
      "id": 4,
      "options": {
        "footer": { "fields": "", "reducer": [ "sum" ], "show": false },
        "showHeader": true
      },
      "targets": [
        {
          "queryType": "sql",
          "rawSql": "SELECT request_time as k,request_url,trace_id FROM \"app_logs\".${table_name} where ( timestamp >= $__fromTime AND timestamp <= $__toTime ) order by k desc limit 10;",
          "refId": "A",
          "selectedFormat": 4
        }
      ],
      "title": "slowest requests",
      "type": "table"
    },
    {
      "datasource": { "type": "grafana-clickhouse-datasource", "uid": "${datasource}" },
      "gridPos": { "h": 9, "w": 12, "x": 12, "y": 9 },
      "id": 5,
      "targets": [
        {
          "queryType": "sql",
          "rawSql": "SELECT request_url,count() as num FROM \"app_logs\".${table_name} where ( timestamp >= $__fromTime AND timestamp <= $__toTime ) group by request_url order by request_url desc;",
          "refId": "A",
          "selectedFormat": 4
        }
      ],
      "title": "requests per endpoint",
      "type": "table"
    },
    {
      "datasource": { "type": "grafana-clickhouse-datasource", "uid": "${datasource}" },
      "gridPos": { "h": 9, "w": 12, "x": 0, "y": 18 },
      "id": 6,
      "targets": [
        {
          "queryType": "sql",
          "rawSql": "SELECT DISTINCT host FROM \"app_logs\".${table_name} where ( timestamp >= $__fromTime AND timestamp <= $__toTime );",
          "refId": "A",
          "selectedFormat": 4
        }
      ],
      "title": "upstream",
      "type": "table"
    },
    {
      "datasource": { "type": "grafana-clickhouse-datasource", "uid": "${datasource}" },
      "gridPos": { "h": 9, "w": 12, "x": 12, "y": 18 },
      "id": 8,
      "targets": [
        {
          "queryType": "sql",
          "rawSql": "SELECT upstream_status,count() as num FROM \"app_logs\".${table_name} where ( timestamp >= $__fromTime AND timestamp <= $__toTime ) group by upstream_status;",
          "refId": "A",
          "selectedFormat": 4
        }
      ],
      "title": "upstream_status",
      "type": "table"
    },
    {
      "datasource": { "type": "grafana-clickhouse-datasource", "uid": "${datasource}" },
      "fieldConfig": {
        "defaults": {
          "mappings": [],
          "thresholds": {
            "mode": "percentage",
            "steps": [
              { "color": "green", "value": null },
              { "color": "orange", "value": 70 },
              { "color": "red", "value": 85 }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": { "h": 9, "w": 12, "x": 0, "y": 27 },
      "id": 7,
      "options": {
        "orientation": "auto",
        "reduceOptions": { "calcs": [ "lastNotNull" ], "fields": "", "values": false },
        "showThresholdLabels": false,
        "showThresholdMarkers": true
      },
      "targets": [
        {
          "queryType": "sql",
          "rawSql": "SELECT count() FROM \"app_logs\".${table_name} where ( timestamp >= $__fromTime AND timestamp <= $__toTime );",
          "refId": "A",
          "selectedFormat": 4
        }
      ],
      "title": "total requests",
      "type": "gauge"
    }
  ],
  "schemaVersion": 37,
  "style": "dark",
  "tags": [],
  "templating": {
    "list": [
      {
        "current": {
          "selected": false,
          "text": "nginx_gw_log",
          "value": "nginx_gw_log"
        },
        "datasource": {
          "type": "grafana-clickhouse-datasource",
          "uid": "${datasource}"
        },
        "definition": "show  tables from app_logs where name = 'nginx_gw_log' ",
        "hide": 0,
        "includeAll": false,
        "label": "table_name",
        "multi": false,
        "name": "table_name",
        "options": [],
        "query": "show  tables from app_logs where name = 'nginx_gw_log' ",
        "refresh": 1,
        "regex": "",
        "skipUrlSync": false,
        "sort": 0,
        "type": "query"
      },
      {
        "current": {
          "selected": false,
          "text": "ClickHouse_1.178",
          "value": "ClickHouse_1.178"
        },
        "label": "datasource",
        "name": "datasource",
        "query": "grafana-clickhouse-datasource",
        "queryValue": "",
        "type": "datasource"
      },
      {
        "current": {
          "selected": true,
          "text": "atscontrol",
          "value": "atscontrol"
        },
        "definition": "select DISTINCT upstream_name from app_logs.nginx_gw_log",
        "label": "upstream_name",
        "name": "upstream_name",
        "query": "select DISTINCT upstream_name from app_logs.nginx_gw_log",
        "refresh": 2,
      {
        "current": {
          "isNone": true,
          "selected": false,
          "text": "None",
          "value": ""
        },
        "definition": "select DISTINCT nginx_host from app_logs.nginx_gw_log where upstream_name = \"$upstream_name\"",
        "label": "nginx_host",
        "name": "nginx_host",
        "query": "select DISTINCT nginx_host from app_logs.nginx_gw_log where upstream_name = \"$upstream_name\"",
        "type": "query"
      }
    ]
  },
  "time": {
    "from": "now-5m",
    "to": "now"
  },
  "timepicker": {},
  "timezone": "",
  "title": "nginx_gw",
  "uid": "lZrbSYOIkA",
  "version": 2,
  "weekStart": ""
}
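To load this dashboard without clicking through the UI, Grafana's HTTP API works too (a sketch; the host and token are placeholders, and "id" should be set to null in the JSON before importing into a fresh instance):

# save the JSON above as nginx_gw.json, then:
curl -s -X POST http://<grafana-host>:3000/api/dashboards/db \
    -H "Authorization: Bearer $GRAFANA_API_TOKEN" \
    -H "Content-Type: application/json" \
    -d "{\"dashboard\": $(cat nginx_gw.json), \"overwrite\": true}"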

Final result

[screenshot of the finished Grafana dashboard]

Caveats

  • ClickHouse's query QPS ceiling is fairly low; with large data volumes, consider a cluster or another store (Doris, ES, and so on).
  • I don't know exactly how much overhead nginx+rsyslog adds; in my tests a single nginx instance handled several thousand QPS without issues.

If you have a better approach, leave a comment!

Original post: https://blog.csdn.net/weixin_39660224/article/details/135295407
