ELk+Kafka实现分布式日志文件收集
为什么要搭建分布式日志收集系统在传统项目中,如果在生产环境中,有多台不同的服务器集群,如果生产环境需要通过日志定位项目的Bug的话,需要在每台节点上使用传统的命令方式查询,这样效率非常底下。通常,日志被分散在储存不同的设备上。如果你管理数十上百台服务器,你还在使用依次登录每台机器的传统方法查阅日志。这样是不是感觉很繁琐和效率低下。当务之急我们使用集中化的日志管理,例如:开源的sy...
·
- 为什么要搭建分布式日志收集系统
在传统项目中,如果在生产环境中,有多台不同的服务器集群,如果生产环境需要通过日志定位项目的Bug的话,需要在每台节点上使用传统的命令方式查询,这样效率非常底下。通常,日志被分散在储存不同的设备上。如果你管理数十上百台服务器,你还在使用依次登录每台机器的传统方法查阅日志。这样是不是感觉很繁琐和效率低下。当务之急我们使用集中化的日志管理,例如:开源的syslog,将所有服务器上的日志收集汇总。
集中化管理日志后,日志的统计和检索又成为一件比较麻烦的事情,一般我们使用grep、awk和wc等Linux命令能实现检索和统计,但是对于要求更高的查询、排序和统计等要求和庞大的机器数量依然使用这样的方法难免有点力不从心。
- ELK实现
单纯使用ElK实现分布式日志收集缺点 :
1.每台应用得 服务器都需要安装logstash,太多了,后期扩展不好。
2.读取IO文件,可能会产生日志丢失
3.不能实时读取日志文件 - ELK+Kafka实现
使用SpringAop进行日志收集,然后通过kafka将日志发送给logstash,logstash再将日志写入elasticsearch,这样elasticsearch就有了日志数据了,最后,则使用kibana将存放在elasticsearch中的日志数据显示出来,并且可以做实时的数据图表分析等等。
logstash做日志对接,接受应用系统的log,将其写入到elasticsearch中。
elasticsearch存储日志数据,方便的扩展特效,可以存储足够多的日志数据。
kibana则是对存放在elasticsearch中的log数据进行:数据展现、报表展现,并且是实时的。
logstash支持N种log渠道: kafka渠道、log目录、reids中的log数据进行监控读取,等等。
- Kafka环境搭建
需提前搭建好elk
ELK+Kafka环境搭建
ELK elasticsearch (存放日志)logstash (采集) kinbana(展示数据)
Kafka、Zookeeper(集群)服务器IP: 192.168.2.115
- 关闭防火墙
systemctl stop firewalld.service
- 下载kafka和Zookeeper镜像文件
docker pull wurstmeister/kafka docker pull wurstmeister/zookeeper
- 运行Zookeeper环境
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
- 运行Kafka环境
docker run --name kafka01 \ -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=192.168.2.115:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.115:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ -d wurstmeister/kafka
- 进入Kafka容器
docker exec -it kafka01 /bin/bash
- 创建主题my_log
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.115:2181 --replication-factor 1 --partitions 1 --topic my_log
- 查询创建的主题
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.115:2181
- 項目集成
配置文件、 kafka发送类
//测试error日志public class KafkaSender<T> { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; private final static String MY_TOPTIC_LOG = "goods_mylog"; /** * kafka 发送消息 * * @param obj * 消息对象 */ public void send(T obj) { String jsonObj = JSON.toJSONString(obj); log.info("------------ message = {}", jsonObj); // 发送消息 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(MY_TOPTIC_LOG, jsonObj); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable throwable) { log.info("Produce: The message failed to be sent:" + throwable.getMessage()); } @Override public void onSuccess(SendResult<String, Object> stringObjectSendResult) { // TODO 业务处理 log.info("Produce: The message was sent successfully:"); log.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString()); } }); } }
import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import com.alibaba.fastjson.JSONObject; import com.pitch.base.BaseApiService; import com.pitch.base.BaseResponse; import com.pitch.goods.ProductSearchService; import com.pitch.member.output.dto.ProductDto; import com.pitch.product.es.entity.ProductEntity; import com.pitch.product.es.reposiory.ProductReposiory; import com.pitch.product.kafka.KafkaSender; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.querydsl.QPageRequest; import org.springframework.web.bind.annotation.RestController; import ma.glasnost.orika.MapperFactory; import ma.glasnost.orika.impl.DefaultMapperFactory; @RestController public class ProductSearchServiceImpl extends BaseApiService<List<ProductDto>> implements ProductSearchService { @Autowired private ProductReposiory productReposiory; @Autowired private KafkaSender<JSONObject> kafkaSender; private int i = 0; @Override public BaseResponse<List<ProductDto>> search(String name) { // 1.拼接查询条件 BoolQueryBuilder builder = QueryBuilders.boolQuery(); // 2.模糊查询name\subtitle\detail字段 builder.must(QueryBuilders.multiMatchQuery(name, "name", "subtitle", "detail")); Pageable pageable = new QPageRequest(0, 5); // 3.调用ES接口查询 Page<ProductEntity> page = productReposiory.search(builder, pageable); // 4.获取集合数据 List<ProductEntity> content = page.getContent(); // 5.将entity转换dto MapperFactory mapperFactory = new DefaultMapperFactory.Builder().build(); List<ProductDto> mapAsList = mapperFactory.getMapperFacade().mapAsList(content, ProductDto.class); i++; if (i%3==0){ try { System.out.println(i/0); } catch (Exception e) { JSONObject errorJson = new JSONObject(); JSONObject logJson = new JSONObject(); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式 logJson.put("request_time", df.format(new Date())); logJson.put("error_info", e); errorJson.put("request_error", logJson); kafkaSender.send(errorJson); } } return setResultSuccess(mapAsList); } }
- 准備配置文件goods_mylog.conf
192.168.2.115:9092:kafka地址input { kafka { bootstrap_servers => "192.168.2.115:9092" topics => ["goods_mylog"] } } output { stdout { codec => rubydebug } elasticsearch { hosts => ["192.168.2.115:9200","192.168.2.117:9200"] index => "goods_mylog" } }
topics:主題 和我们前面创建的不一样,没关系,kafka会帮你重新创建。这里只是测试,每个项目topics都会不一样
192.168.2.115:9200,192.168.2.117:9200: elasticsearch集群地址
index: 文檔索引
// 上傳文件到logstash目錄下 - 启动
// 启动elasticsearch cd /usr/local/elasticsearch6.4/bin ./elasticsearch -d // 启动kibana su root cd /usr/local/kibana6.4/bin nohup ./kibana & // 启动logstash 等待几分钟 cd /usr/local/logstash-6.4.3 ./bin/logstash -f goods_mylog.conf
- 启动eureka、启动你的项目
访问: http://127.0.0.1:8500/search?name=888
项目报错,查看 kibana - 如何使用使用kibana的窗口查看日志
设置后查看 - 使用命令搜索
GET goods_mylog/doc/_search { "query":{ "match":{ "message": "request_error" } } }
- demo完成
更多推荐
已为社区贡献1条内容
所有评论(0)