Elasticsearch实践:ELK+Kafka+Beats对日志收集平台的实现

可以在短时间内搜索和分析大量数据。

Elasticsearch 不仅仅是一个全文搜索引擎,它还提供了分布式的多用户能力,实时的分析,以及对复杂搜索语句的处理能力,使其在众多场景下,如企业搜索,日志和事件数据分析等,都有广泛的应用。

本文将介绍 ELK+Kafka+Beats 对日志收集平台的实现。



1、关于ELK与BKELK
1.1、ELK架构及其影响

当我们在开源日志分析系统的领域,谈及 ELK 架构可谓是家喻户晓。然而,这个生态系统并非 Elastic 有意为之,毕竟 Elasticsearch 的初衷是作为一个分布式搜索引擎。其广泛应用于日志系统,实则是一种意料之外,这是社区用户的推动所致。如今,众多云服务厂商在推广自己的日志服务时,往往以 ELK 作为参照标准,由此可见,ELK 的影响力之深远。

ELK 是 Elasticsearch、Logstash 和 Kibana 的首字母缩写,这三个产品都是 Elastic 公司的开源项目,通常一起使用以实现数据的搜索、分析和可视化。

  1. Elasticsearch:一个基于 Lucene 的搜索服务器。它提供了一个分布式、多租户的全文搜索引擎,具有 HTTP 网络接口和无模式 JSON 文档。

  2. Logstash:是一个服务器端数据处理管道,它可以同时从多个来源接收数据,转换数据,然后将数据发送到你选择的地方。

  3. Kibana:是一个用于 Elasticsearch 的开源数据可视化插件。它提供了查找、查看和交互存储在 Elasticsearch 索引中的数据的方式。你可以使用它进行高级数据分析和可视化你的数据等。

这三个工具通常一起使用,以便从各种来源收集、搜索、分析和可视化数据。

1.2、基于BKLEK架构的日志分析系统实现

实际上,在流行的架构中并非只有 ELKB。当我们利用 ELKB 构建一套日志系统时,除了 Elasticsearch、Logstash、Kibana、beats 之外,还有一个被广泛应用的工具 —— Kafka。在这个体系中,Kafka 的角色尤为重要。作为一个中间件和缓冲区,它能够提升吞吐量,隔离峰值影响,缓存日志数据,快速落盘。同时,通过 producer/consumer 模式,使得 Logstash 能够进行横向扩展,还能用于数据的多路分发。因此,大多数情况下,我们看到的实际架构,按照数据流转的顺序排列,应该是 BKLEK 架构。

image-20231021004441222

BKLEK 架构即 ELK+Kafka+Beats ,这是一种常见的大数据处理和分析架构。在这个架构中:

  1. Beats:是一种轻量级的数据采集器,用于从各种源(如系统日志、网络流量等)收集数据,并将数据发送到 Kafka 或 Logstash。

  2. Kafka:是一个分布式流处理平台,用于处理和存储实时数据。在这个架构中,Kafka 主要用于作为一个缓冲区,接收来自 Beats 的数据,并将数据传输到 Logstash。

  3. Logstash:是一个强大的日志管理工具,可以从 Kafka 中接收数据,对数据进行过滤和转换,然后将数据发送到 Elasticsearch。

  4. Elasticsearch:是一个分布式的搜索和分析引擎,用于存储、搜索和分析大量数据。

  5. Kibana:是一个数据可视化工具,用于在 Elasticsearch 中搜索和查看存储的数据。

这种架构的优点是:

  • 可以处理大量的实时数据。
  • Kafka 提供了一个强大的缓冲区,可以处理高速流入的数据,保证数据的完整性。
  • Logstash 提供了强大的数据处理能力,可以对数据进行各种复杂的过滤和转换。
  • Elasticsearch 提供了强大的数据搜索和分析能力。
  • Kibana 提供了直观的数据可视化界面。

这种架构通常用于日志分析、实时数据处理和分析、系统监控等场景。


2、利用ELK+Kafka+Beats来实现一个统一日志平台
2.1、应用场景

利用 ELK+Kafka+Beats 来实现一个统一日志平台,这是一个专门针对大规模分布式系统日志进行统一采集、存储和分析的 APM 工具。在分布式系统中,众多服务部署在不同的服务器上,一个客户端的请求可能会触发后端多个服务的调用,这些服务可能会互相调用或者一个服务会调用其他服务,最终将请求结果返回并在前端页面上展示。如果在这个过程中的任何环节出现异常,开发和运维人员可能会很难准确地确定问题是由哪个服务调用引起的。统一日志平台的作用就在于追踪每个请求的完整调用链路,收集链路上每个服务的性能和日志数据,从而使开发和运维人员能够快速发现并定位问题。

统一日志平台通过采集模块、传输模块、存储模块、分析模块实现日志数据的统一采集、存储和分析,结构图如下:

img

为了实现海量日志数据的收集和分析,首先需要解决的是如何处理大量的数据信息。在这个案例中,我们使用 Kafka、Beats 和 Logstash 构建了一个分布式消息队列平台。具体来说,我们使用 Beats 采集日志数据,这相当于在 Kafka 消息队列中扮演生产者的角色,生成消息并发送到 Kafka。然后,这些日志数据被发送到 Logstash 进行分析和过滤,Logstash 在这里扮演消费者的角色。处理后的数据被存储在 Elasticsearch 中,最后我们使用 Kibana 对日志数据进行可视化展示。

2.2、环境准备

本地

  • Kafka
  • ES
  • Kibana
  • filebeat
  • Java Demo 项目

我们使用 Docker 创建以一个 名为 es-net 的网络

在 Docker 中,网络是连接和隔离 Docker 容器的方式。当你创建一个网络,我们定义一个可以相互通信的容器的网络环境。

docker network create es-net

docker network create 是 Docker 命令行界面的一个命令,用于创建一个新的网络。在这个命令后面,你需要指定你想要创建的网络的名称,在这个例子中,网络的名称是 “es-net”。

所以,docker network create es-net 这句命令的意思就是创建一个名为 “es-net” 的 Docker 网络。

2.3、基于Docker的ES部署

加载镜像:

docker pull elasticsearch:7.12.1

运行容器:

docker run -d \
	--name es \
    -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
    -e "discovery.type=single-node" \
    --privileged \
    --network es-net \
    -p 9200:9200 \
    -p 9300:9300 \
    
    
    
elasticsearch:7.12.1
    -v es-data:/Users/lizhengi/elasticsearch/data \
    -v es-plugins:/Users/lizhengi/elasticsearch/plugins \

这个命令是使用 Docker 运行一个名为 “es” 的 Elasticsearch 容器。具体参数的含义如下:

  • docker run -d:使用 Docker 运行一个新的容器,并且在后台模式(detached mode)下运行。

  • --name es:设置容器的名称为 “es”。

  • -e "ES_JAVA_OPTS=-Xms512m -Xmx512m":设置环境变量 ES_JAVA_OPTS,这是 JVM 的参数,用于控制 Elasticsearch 使用的最小和最大内存。这里设置的是最小和最大内存都为 512MB。

  • -e "discovery.type=single-node":设置环境变量 discovery.type,这是 Elasticsearch 的参数,用于设置集群发现类型。这里设置的是单节点模式。

  • -v es-data:/Users/lizhengi/elasticsearch/data-v es-plugins:/Users/lizhengi/elasticsearch/plugins:挂载卷(volume)。这两个参数将主机上的 es-dataes-plugins 目录挂载到容器的 /Users/lizhengi/elasticsearch/data/Users/lizhengi/elasticsearch/plugins 目录。

  • --privileged:以特权模式运行容器。这将允许容器访问宿主机的所有设备,并且容器中的进程可以获取任何 AppArmor 或 SELinux 的权限。

  • --network es-net:将容器连接到 es-net 网络。

  • -p 9200:9200-p 9300:9300:端口映射。这两个参数将容器的 9200 和 9300 端口映射到主机的 9200 和 9300 端口。

  • elasticsearch:7.12.1:要运行的 Docker 镜像的名称和标签。这里使用的是版本为 7.12.1 的 Elasticsearch 镜像。

运行结果验证:随后便可以去访问 IP:9200,结果如图:

image-20231021103821702

2.4、基于Docker的kibana部署

加载镜像:

docker pull kibana:7.12.1

运行容器:

docker run -d \
		--name kibana \
		-e ELASTICSEARCH_HOSTS=http://es:9200 \
		--network=es-net \
		-p 5601:5601  \
kibana:7.12.1

这是一个 Docker 命令,用于运行一个 Kibana 容器。下面是每个参数的解释:

  • docker run -d:使用 Docker 运行一个新的容器,并且在后台模式(detached mode)下运行。

  • --name kibana:设置容器的名称为 “kibana”。

  • -e ELASTICSEARCH_HOSTS=http://es:9200:设置环境变量 ELASTICSEARCH_HOSTS,这是 Kibana 的参数,用于指定 Elasticsearch 服务的地址。这里设置的是 http://es:9200,表示 Kibana 将连接到同一 Docker 网络中名为 “es” 的容器的 9200 端口。

  • --network=es-net:将容器连接到 es-net 网络。

  • -p 5601:5601:端口映射。这个参数将容器的 5601 端口映射到主机的 5601 端口。

  • kibana:7.12.1:要运行的 Docker 镜像的名称和标签。这里使用的是版本为 7.12.1 的 Kibana 镜像。

kibana启动一般比较慢,需要多等待一会,可以通过命令:

docker logs -f kibana

查看运行日志,当查看到下面的日志,说明成功:

image-20231021104654758

运行结果验证:随后便可以去访问 IP:9200,结果如图:

也可以浏览器访问:

image-20231021104755229

2.5、基于Docker的Zookeeper部署

加载镜像:

docker pull zookeeper:latest

运行容器:

以下是一个基本的 Docker 命令,用于运行一个 Zookeeper 容器:

docker run -d \
    --name zookeeper \
    --network=es-net \
    -p 2181:2181 \
zookeeper:latest

这个命令的参数解释如下:

  • docker run -d:使用 Docker 运行一个新的容器,并且在后台模式(detached mode)下运行。
  • --name zookeeper:设置容器的名称为 “zookeeper”。
  • --network=es-net:将容器连接到 es-net 网络。
  • -p 2181:2181:端口映射。这个参数将容器的 2181 端口映射到主机的 2181 端口。
  • zookeeper:latest:要运行的 Docker 镜像的名称和标签。这里使用的是最新版本的 Zookeeper 镜像。
2.6、基于Docker的Kafka部署

加载镜像:

docker pull confluentinc/cp-kafka:latest

运行容器:

以下是一个基本的 Docker 命令,用于运行一个 Kafka 容器:

docker run -d \
    --name kafka \
    --network=es-net \
    -p 9092:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
confluentinc/cp-kafka:latest

这个命令的参数解释如下:

  • docker run -d:使用 Docker 运行一个新的容器,并且在后台模式(detached mode)下运行。
  • --name kafka:设置容器的名称为 “kafka”。
  • --network=es-net:将容器连接到 es-net 网络。
  • -p 9092:9092:端口映射。这个参数将容器的 9092 端口映射到主机的 9092 端口。
  • -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181:设置环境变量 KAFKA_ZOOKEEPER_CONNECT,这是 Kafka 的参数,用于指定 Zookeeper 服务的地址。这里设置的是 zookeeper:2181,表示 Kafka 将连接到同一 Docker 网络中名为 “zookeeper” 的容器的 2181 端口。
  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092:设置环境变量 KAFKA_ADVERTISED_LISTENERS,这是 Kafka 的参数,用于指定 Kafka 服务对外公布的地址和端口。这里设置的是 PLAINTEXT://localhost:9092
  • confluentinc/cp-kafka:latest:要运行的 Docker 镜像的名称和标签。这里使用的是最新版本的 Confluent 平台的 Kafka 镜像。
2.7、基于Docker的Logstash部署

加载镜像:

docker pull docker.elastic.co/logstash/logstash:7.12.1

创建配置文件:

首先,你需要创建一个 Logstash 配置文件,例如 logstash.conf,内容如下:

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["logs_topic"]
  }
}

output {
  elasticsearch {
    hosts => ["es:9200"]
    index => "logs_index"
  }
}

这个配置文件定义了 Logstash 的输入和输出。输入是 Kafka,连接到 kafka:9092,订阅的主题是 your_topic。输出是 Elasticsearch,地址是 es:9200,索引名是 logs_index

运行容器:

然后,我们使用以下命令运行 Logstash 容器:

docker run -d \
    --name logstash \
    --network=es-net \
    -v /Users/lizhengi/test/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
docker.elastic.co/logstash/logstash:7.12.1

这个命令的参数解释如下:

  • docker run -d:使用 Docker 运行一个新的容器,并且在后台模式(detached mode)下运行。
  • --name logstash:设置容器的名称为 “logstash”。
  • --network=es-net:将容器连接到 es-net 网络。
  • -v /path/to/your/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:挂载卷(volume)。这个参数将主机上的 logstash.conf 文件挂载到容器的 /usr/share/logstash/pipeline/logstash.conf
  • docker.elastic.co/logstash/logstash:latest:要运行的 Docker 镜像的名称和标签。这里使用的是最新版本的 Logstash 镜像。

请注意,你需要将 /path/to/your/logstash.conf 替换为你的 logstash.conf 文件所在的实际路径。

2.8、基于Docker的Filebeat部署

加载镜像:

docker pull docker.elastic.co/beats/filebeat:7.12.1

运行容器:

首先,你需要创建一个 Filebeat 配置文件,例如 filebeat.yml,内容如下:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /usr/share/filebeat/logs/*.log

output.kafka:
  enabled: true
  hosts: ["kafka:9092"]
  topic: "logs_topic"

这个配置文件定义了 Filebeat 的输入和输出。输入是文件 /usr/share/filebeat/Javalog.log,输出是 Kafka,连接到 kafka:9092,主题是 logs_topic

然后,你可以使用以下命令运行 Filebeat 容器:

docker run -d \
    --name filebeat \
    --network=es-net \
    -v /Users/lizhengi/test/logs:/usr/share/filebeat/logs \
    -v /Users/lizhengi/test/filebeat.yml:/usr/share/filebeat/filebeat.yml \
docker.elastic.co/beats/filebeat:7.12.1

这个命令的参数解释如下:

  • docker run -d:使用 Docker 运行一个新的容器,并且在后台模式(detached mode)下运行。

  • --name filebeat:设置容器的名称为 “filebeat”。

  • --network=es-net:将容器连接到 es-net 网络。

  • -v /Users/lizhengi/test/Javalog.log:/usr/share/filebeat/Javalog.log:挂载卷(volume)。这个参数将主机上的 /Users/lizhengi/test/Javalog.log 文件挂载到容器的 /usr/share/filebeat/Javalog.log

  • -v /path/to/your/filebeat.yml:/usr/share/filebeat/filebeat.yml:挂载卷(volume)。这个参数将主机上的 filebeat.yml 文件挂载到容器的 /usr/share/filebeat/filebeat.yml

  • docker.elastic.co/beats/filebeat:latest:要运行的 Docker 镜像的名称和标签。这里使用的是最新版本的 Filebeat 镜像。

请注意,你需要将 /path/to/your/filebeat.yml 替换为你的 filebeat.yml 文件所在的实际路径。

原文地址:https://blog.csdn.net/weixin_45187434/article/details/133960571

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读4.1k次。kafka认证_kafka认证
文章浏览阅读4.8k次,点赞4次,收藏11次。kafka常用参数_kafka配置
文章浏览阅读1.4k次,点赞25次,收藏10次。Kafka 生产者发送消息的流程涉及多个步骤,从消息的创建到成功存储在 Kafka 集群中。_kafka发送消息流程
文章浏览阅读854次,点赞22次,收藏24次。点对点模型:适用于一对一的消息传递,具有高可靠性。发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。
文章浏览阅读1.5k次,点赞2次,收藏3次。kafka 自动配置在KafkaAutoConfiguration
文章浏览阅读1.3w次,点赞6次,收藏33次。Offset Explorer(以前称为Kafka Tool)是一个用于管理和使Apache Kafka ®集群的GUI应用程序。它提供了一个直观的UI,允许人们快速查看Kafka集群中的对象以及存储在集群主题中的消息。它包含面向开发人员和管理员的功能。二、环境信息系统环境:windows 10版本:2.2Kafka版本:Kafka2.0.0三、安装和使用3.1 下载Offset Explorer 和安装下载到本地的 .exe文件Next安装路径 ,Next。_offset explorer
文章浏览阅读1.3k次,点赞12次,收藏19次。kafka broker 在启动的时候,会根据你配置的listeners 初始化它的网络组件,用来接收外界的请求,这个listeners你可能没配置过,它默认的配置是listeners=PLAINTEXT://:9092就是告诉kafka使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。
文章浏览阅读1.3k次,点赞2次,收藏2次。Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。_博客系统怎么使用kafka
文章浏览阅读3.5k次,点赞42次,收藏56次。对于Java开发者而言,关于 Spring ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。本期 Spring 源码解析系列文章,将带你领略 Spring 源码的奥秘。本期源码文章吸收了之前 Kafka 源码文章的错误,将不再一行一行的带大家分析源码,我们将一些不重要的分当做黑盒处理,以便我们更快、更有效的阅读源码。废话不多说,发车!
文章浏览阅读1.1k次,点赞14次,收藏16次。一、自动提交offset1、概念Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前主题-分区消费的偏移量2、自动提交offset和手动提交offset流程图3、在Java中实现配置4、自动提交offset问题自动提交会丢消息。因为如果消费者还没有消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已经提交的offset的下一个位置开始消费消息。_kafka中自动提交offsets
文章浏览阅读1.6k次。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。在默认情况下,生产者发送的消息是未经压缩的。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。_kafka producer 参数
文章浏览阅读2.9k次,点赞3次,收藏10次。kafka解决通信问题_kafka3.6
文章浏览阅读1.5k次,点赞9次,收藏11次。上面都配置完了之后可以先验证下,保证数据最终到ck,如果有问题,需要再每个节点调试,比如先调试nginx->rsyslog ,可以先不配置kafka 输出,配置为console或者文件输出都可以,具体这里就不写了。这里做了一个类型转换,因为nginx,request-time 单位是s,我想最终呈现在grafana 中是ms,所以这里做了转换,当然grafana中也可以做。kafka 相关部署这里不做赘述,只要创建一个topic 就可以。
文章浏览阅读1.4k次,点赞22次,收藏16次。Kafka中的enable-auto-commit和auto-commit-interval配置_auto-commit-interval
文章浏览阅读742次。thingsboard规则链调用外部 kafka_thingsboard kafka
文章浏览阅读1.3k次,点赞18次,收藏22次。Kafka_简介
文章浏览阅读1.1k次,点赞16次,收藏14次。在数据库系统中有个概念叫事务,事务的作用是为了保证数据的一致性,意思是要么数据成功,要么数据失败,不存在数据操作了一半的情况,这就是数据的一致性。在很多系统或者组件中,很多场景都需要保证数据的一致性,有的是高度的一致性。特别是在交易系统等这样场景。有些组件的数据不一定需要高度保证数据的一致性,比如日志系统。本节从从kafka如何保证数据一致性看通常数据一致性设计。
文章浏览阅读1.4k次。概述介绍架构发展架构原理类型系统介绍类型hive_table类型介绍DataSet类型定义Asset类型定义Referenceable类型定义Process类型定义Entities(实体)Attributes(属性)安装安装环境准备安装Solr-7.7.3安装Atlas2.1.0Atlas配置Atlas集成HbaseAtlas集成SolrAtlas集成KafkaAtlas Server配置Kerberos相关配置Atlas集成HiveAtlas启动Atlas使用Hive元数据初次导入Hive元数据增量同步。_atlas元数据管理
文章浏览阅读659次。Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再 ZooKeeper 上注册的服务几点状态给客户端。
文章浏览阅读1.4k次。Kafka-Kraft 模式架构部署_kafka kraft部署