使用UDP协议实现简单的分布式日志服务, java和python

使用UDP协议实现简单的分布式日志服务, java和python

这几天系统出现问题, 需要查原因. 日志分散在各个服务器上, 查起来很要命.
网上百度了好久, 最后发现, 各种日志的处理方式都略显纷繁复杂了.

  1. 有的是syslog
  2. 有的是用 influxdb 实现日志存储和查询.
  3. 收费的则还有阿里云的SLS https://www.aliyun.com/product/sls/
  4. 有的是用工具链. ELK .其中 Elasticsearch 又是相当巨大的性能开销. 和各种安装配置. 为了一个简单的日志 , 实在是有点浪费资源.

在我看来,日志最主要的作用还是在出错的时候查找错误信息, 跟踪和分析系统性能. 对于分布式系统来讲又增加了一个 “集中” 日志管理的功能需求.方便查日志. 系统7-8个, 查日志一个文件一个文件的翻很慢的. 对于日志信息, 实际上并无太高要求.当查错误的时候, 能够方便的查询到信息即可. 对于日志的丢失和实时,以及查询性能都无太高要求.

如果要想基于日志系统进行系统业务逻辑层面得扩展, 那么我认为这种想法是不科学的. 毕竟日志是不严谨的. 而且日志的正则分析和匹配都是相当大的性能开销.

所以日志就让它回归到日志本身.

我的设计是, 各个分布式服务, 通过UDP协议(很简单易编程)发送到日志服务. . 日志服务器则直接保存到本地文件中.这个日志服务器只起到了收集日志的作用, 如果要想查询和检索, 则将日志文件通过ssh 复制到本地.然后用vscode进行搜索和查询也是非常方便的(现在谁的电脑上还没有个vscode?), 如果不用则留在服务器上不用管. 只是存在硬盘上. 也不会占用太多资源,

日志服务端代码

日志服务本身很简单. 几句话就可以实现.代码如下.
用的是python语言. java语言的还没实现, 有精力的小伙伴可以实现以下. 实现后联系我.分享一下.

import logging
from logging import handlers 
import asyncio
import time

# from socket import *

import socket 


class LogServer_UDP(object):
      def __init__(self, host, port):
            # PORT = 9000
            ADDR = (host, port) #地址与端口
            self.BUFSIZ = 4096 #接收数据缓冲大小
            self.UDPServeSock=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建udp服务器套接字
            self.UDPServeSock.bind(ADDR) #套接字与地址绑定-服务端特有
  
      def run(self):
            while True:
                try: 
                    while True: 
                        msg, addr = self.UDPServeSock.recvfrom(self.BUFSIZ) #接收客户端发来的字节数组,data.decode()='char',data.upper()='bytes'
                        logging.info(addr[0] + ":"+str(addr[1])+"=>" + msg.decode("utf8"))
                        pass

                except Exception as e:
                    time.sleep(1)
                    print(e)

            self.UDPServeSock.close() #关闭服务端socket

def namer(filename): 
      return filename

def main_logger():
    # 日志集中处理区,在主程序中调用一次
    
    # handlers配置区,filter可选
    # formatter = logging.Formatter("%(name)s - %(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s")
    formatter = logging.Formatter("%(message)s")
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)

      # info日志处理器
    # filename:日志文件名
    # when:日志文件按什么维度切分。'S'-秒;'M'-分钟;'H'-小时;'D'-天;'W'-周
    #       这里需要注意,如果选择 D-天,那么这个不是严格意义上的'天',而是从你
    #       项目启动开始,过了24小时,才会从新创建一个新的日志文件,
    #       如果项目重启,这个时间就会重置。所以这里选择'MIDNIGHT'-是指过了午夜
    #       12点,就会创建新的日志。
    # interval:是指等待多少个单位 when 的时间后,Logger会自动重建文件。
    # backupCount:是保留日志个数。默认的0是不会自动删除掉日志。
    logfile = handlers.TimedRotatingFileHandler(
        './log/udp.log',
        when='D',
        backupCount=10,
        encoding='utf-8')
    logfile.suffix =  "%Y_%m_%d.log"
    logfile.namer = namer
    logfile.setLevel(logging.INFO)
    logfile.setFormatter(formatter)  # add formatter to ch
    

    rootlog = logging.getLogger()
    rootlog.setLevel(level=logging.INFO)
    # rootlog.addHandler(log_file_handler)
    # rootlog.addHandler(errorlog_file_handler)
    rootlog.addHandler(console)
    rootlog.addHandler(logfile)

    # 设置监听的端口,并传递handlers
    # loggerListener = ZMQListener("tcp://127.0.0.1:6666",*(ch,console))
    
    # loggerListener.start()   # 开启一个子线程处理记录器监听
    # zq_listener = LogServer_UDP("127.0.0.1", 6666)
    udp_listener = LogServer_UDP("0.0.0.0", 11385)
    print("日志服务运行中,监听端口 11385")
    udp_listener.run()
    
    
# 主进程调用一次,非阻塞
main_logger()

日志输出端 java语言

用的是logback的扩展.UDPLogAppender
在UDPLogAppender 中实现了数据上传到日志服务端的功能.

package com.qcd.DDD;

import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.Scanner;
 
 
@Slf4j
public class UDPLogAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
    
    
 
    // Scanner input = new Scanner(System.in);
    DatagramSocket socket;
    DatagramPacket sendPacked;

    // DatagramSocket datagramSocket = new DatagramSocket(); // 创建DatagramSocket
	// 	DatagramPacket datagramPacket = new DatagramPacket(str.getBytes(),
	// 			str.getBytes().length, InetAddress.getByName("127.0.0.1"), 1111); // 创建DatagramPacket(要发送的数据,数据的长度,Ip地址,端口)
	// 	datagramSocket.send(datagramPacket); // 发送
	// 	datagramSocket.close(); // 关闭
// ————————————————
// 版权声明:本文为CSDN博主「梅开二度%」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
// 原文链接:https://blog.csdn.net/s990420/article/details/119335819
  
    @Override
    public void start() {
    
    
        super.start();
        try {
    
    
            this.socket = new DatagramSocket();            
            String hostaddr  = InetAddress.getByName(this.host).getHostAddress(); 
            
            sendPacked = new DatagramPacket(
                new byte[2048], 2048,
                new InetSocketAddress(hostaddr, this.port));
        } 
        catch (UnknownHostException e){
    
    
            e.printStackTrace();
        }
        catch (SocketException e) {
    
    
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        // encoder.setContext(context);
        encoder.start();
        encoder.setCharset(Charset.forName("utf8"));
    }

    String host;

    public void setHost(String host) {
    
    
        this.host = host;
    }

    int port;

    public void setPort(int port) {
    
    
        this.port = port;
    }

      
    PatternLayoutEncoder  encoder;

    public  PatternLayoutEncoder  getEncoder() {
    
    
        return encoder;
    }
      
    public  void setEncoder(PatternLayoutEncoder encoder) {
    
    
        this.encoder = encoder;
    }

    @Override
    protected void append(ILoggingEvent eventObject)
    {
    
      
        byte[] bytemsg = this.encoder.encode(eventObject);

        //必须先转成字符串, 再转成utf8编码发出去,
        // 注释原因, 后来在上面加了个 encoder.setCharset(Charset.forName("utf8"));
        //String msg =  new String(bytemsg); 
        //String msg =  eventObject.getFormattedMessage();  
		try {
    
     
            // sendPacked.setData(msg.getBytes("utf8"));
            sendPacked.setData(bytemsg);
            socket.send(sendPacked); // 发送
            // socket.close(); // 关闭
		} catch (Exception e) {
    
    
			e.printStackTrace();
		} 
    }
 
} 

logback-spring.xml 的配置

<?xml version="1.0" encoding="UTF-8"?>  
<configuration>  
  
    <!-- 控制台 appender -->  
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">  
        <encoder>  
            <pattern>%d{HH:mm:ss} %-5level %logger{12}:%-5line - %msg%n</pattern>
            <!--charset>UTF-8</charset -->
        </encoder>
    </appender>  

    <appender name="udp" class="com.qcd.DDD.UDPLogAppender">
        <encoder>  
            <pattern>%d{HH:mm:ss} %-5level %logger{12}:%-5line - %msg%n</pattern>
        </encoder>
        <host>wuxi.ai.px82.com</host>
        <port>11385</port>
    </appender>
      
    <!--控制台打印资源加载信息-->  
    <root level="info">   
        <appender-ref ref="udp"/>        
        <appender-ref ref="STDOUT"/>
         <!--  <appender-ref ref="ERROR"/>  
        <appender-ref ref="WARN"/>  
        <appender-ref ref="INFO" />   -->
    </root>  
</configuration>

日志输出端, python语言,

用的是logging库.
LogConfig.py 文件内容如下

import logging
from logging.handlers import TimedRotatingFileHandler
from logging.handlers import RotatingFileHandler
import os 
import colorlog
import traceback
import socket
import Config 

log_colors_config = {
    
    
    # 终端输出日志颜色配置
    'DEBUG': 'white',
    'INFO': 'cyan',
    'WARNING': 'yellow',
    'ERROR': 'red',
    'CRITICAL': 'bold_red',
}
 

class UDPHandler(logging.Handler):
     def __init__(self, host, port):
        super().__init__() 
        self.host = host
        self.port = port 
        self.ADDR = (host, port)
        # print(ADDR)
        self.UDPCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建客户端套接字
  
     def emit(self, record):
         try:  
             msg = self.format(record) 
             #  self.zq.send_string(msg, flags=zmq.NOBLOCK)
             self.UDPCliSock.sendto(bytes(msg,'utf8'), self.ADDR) 
         except Exception:
            traceback.print_exc() 

 
# 滚动日志 
def backrollLog():
    def namer(filename): 
      return filename

    LOG_FORMAT = "%(asctime)s %(levelname)-8s[%(name)s %(filename)s:%(lineno)d] %(message)s"
    formatter = logging.Formatter(LOG_FORMAT)

    color_format ='%(log_color)s%(asctime)s-%(name)s-%(filename)s:%(lineno)d-%(levelname)s-[msg]: %(message)s',
 
    color_formater = colorlog.ColoredFormatter(
              "%(asctime)s %(log_color)s%(levelname)-8s%(reset)s [%(name)s %(filename)s :%(lineno)d] %(message_log_color)s%(message)s",
              secondary_log_colors={
    
    
                'message': {
    
    
                'ERROR': 'red',
                'CRITICAL': 'red'
                }
              })


 

    # 总开关
    rootlog = logging.getLogger()
    rootlog.setLevel(level=logging.INFO) 
  
    # pid = str(os.getpid())
    #控制台输出INFO级别的信息
    stream_handler = logging.StreamHandler()  # 日志控制台输出 
    stream_handler.setFormatter(formatter)
    stream_handler.setLevel(level=logging.INFO) 
    rootlog.addHandler(stream_handler)
 

    IP = socket.gethostbyname(Config.Seting.LogHost)
    udphandler = UDPHandler(IP, Config.Seting.LogPort)
    udphandler.setFormatter(color_formater)
    udphandler.setLevel(logging.INFO)
    rootlog.addHandler(udphandler)
 
    logging.error(" log error test ")
    logging.info(" log info test ")
    logging.warning(" log warning test ")
    logging.debug(" log debug test ")




backrollLog()

python 语言要求在主程序中import 一次 上面的代码. …
正常使用

logtest.py

import Config
import LogConfig
import logging
import time


# LogConfig.backrollLog() , 不需要单独执行, import LogConfig的时候已经执行过

while(True):
    logging.info("hahaha中文")
    time.sleep(1)

猜你喜欢

转载自blog.csdn.net/phker/article/details/127861969