consul-02: Implementing Consul service registration (Java)

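I. Continued

The previous article gave a brief introduction to Consul and walked through setting up a cluster.
This article shows how to register a service with the Consul cluster and how to package that logic as a shared jar: when a Spring MVC or Spring Boot application starts, the service is registered with the Consul cluster, and when the application exits, the service is removed from the cluster.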

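II. Implementation approach

1. How the Consul cluster is used

We rely on Consul's health checks to make sure that the services discovered by consumers are healthy and available.
Service providers are usually our applications, such as a member service or an order service; the service consumer is usually our gateway.
Suppose the member service runs on three machines, and the IP and port of each machine are registered with the Consul cluster. When one of those machines goes down, the cluster's health check marks that instance as critical. When the gateway then asks the cluster for the list of healthy services, the instance on that machine is no longer returned, so the gateway naturally stops forwarding requests to the failed machine.
In addition, the tags supplied at registration time can be used for version management: consumers can query services by tag during discovery.
The same idea also supports grayscale (canary) releases. If the gateway is reasonably complete, then as long as every service has at least one machine serving normally, the whole system keeps running, and machines can be added to or removed from any service at any time.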
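To make the gateway side more concrete, here is a minimal sketch of the selection step in plain Java. ServiceInstance and GatewayRouting are hypothetical names invented for this illustration; they are not part of Consul or of the jar described in this article. The status strings follow Consul's check states (passing, warning, critical), and the sketch assumes the gateway has already fetched the instance list from the cluster.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified view of one registered instance as the gateway sees it. */
final class ServiceInstance {
    final String address;    // e.g. "http://10.0.0.93"
    final int port;
    final List<String> tags; // the version is registered as a tag, e.g. "v1"
    final String status;     // Consul check state: "passing", "warning" or "critical"

    ServiceInstance(String address, int port, List<String> tags, String status) {
        this.address = address;
        this.port = port;
        this.tags = tags;
        this.status = status;
    }
}

final class GatewayRouting {
    /** Keeps only the instances that are passing and carry the requested version tag. */
    static List<ServiceInstance> routable(List<ServiceInstance> all, String versionTag) {
        List<ServiceInstance> result = new ArrayList<ServiceInstance>();
        for (ServiceInstance instance : all) {
            if ("passing".equals(instance.status) && instance.tags.contains(versionTag)) {
                result.add(instance);
            }
        }
        return result;
    }
}
```

A gateway built this way would call GatewayRouting.routable(instances, "v1") before picking a target, so an instance marked critical never receives traffic.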

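Pitfall: a service registered with one node of the cluster is tied to that node. If that node goes down, the service can no longer be discovered, even though the application itself is still running. My workaround is fairly brute-force: register the service with every node in the cluster, then deduplicate at discovery time. Entries with the same IP, port, and version are treated as the same service, and only one of them is kept.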
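The deduplication step could look roughly like the sketch below. DiscoveredService and ServiceDeduplicator are hypothetical names for this illustration only; the assumption is that the consumer has already collected the entries returned by all nodes into one list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical holder for one discovered entry. */
final class DiscoveredService {
    final String address;
    final int port;
    final String version;

    DiscoveredService(String address, int port, String version) {
        this.address = address;
        this.port = port;
        this.version = version;
    }

    /** Two entries with the same IP, port and version are the same service. */
    String key() {
        return address + ":" + port + "#" + version;
    }
}

final class ServiceDeduplicator {
    /** Keeps the first entry for each IP + port + version, preserving the original order. */
    static List<DiscoveredService> deduplicate(List<DiscoveredService> found) {
        Map<String, DiscoveredService> unique = new LinkedHashMap<String, DiscoveredService>();
        for (DiscoveredService service : found) {
            unique.putIfAbsent(service.key(), service);
        }
        return new ArrayList<DiscoveredService>(unique.values());
    }
}
```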

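2. Technical side

On the technical side, the entry class implements two Spring interfaces: InitializingBean, so that registration runs when the application starts, and DisposableBean, so that the service is deregistered when the application exits.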
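The lifecycle can be illustrated without a Spring container. In the sketch below, the two interfaces are local stand-ins that only mirror the method names of Spring's InitializingBean and DisposableBean, so that the snippet compiles on its own; RegistrationLifecycle and LifecycleDemo are hypothetical names, and the "container" is simulated by calling the two callbacks by hand.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins mirroring the method names of Spring's interfaces,
// declared here only so the sketch compiles without Spring on the classpath.
interface InitializingBean {
    void afterPropertiesSet() throws Exception;
}

interface DisposableBean {
    void destroy() throws Exception;
}

/** Hypothetical bean that records what it would do at each lifecycle step. */
final class RegistrationLifecycle implements InitializingBean, DisposableBean {
    final List<String> events = new ArrayList<String>();

    @Override
    public void afterPropertiesSet() {
        events.add("register");   // the real class would register the service here
    }

    @Override
    public void destroy() {
        events.add("deregister"); // the real class would deregister the service here
    }
}

final class LifecycleDemo {
    /** Simulates the container: init callback after wiring, destroy callback on shutdown. */
    static List<String> run() {
        RegistrationLifecycle bean = new RegistrationLifecycle();
        bean.afterPropertiesSet(); // application startup
        bean.destroy();            // application shutdown
        return bean.events;
    }
}
```

In a real application Spring invokes both callbacks itself; nothing has to call them explicitly.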

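III. A shared jar for registering Spring applications with the Consul cluster

1. Package structure: the jar contains the following classes

  • com.sdk.consul.consts.ConsulConsts.java
    We keep only one properties file per environment, while the same service is deployed on several servers, each with its own IP and port. To avoid editing the properties file at release time, the IP and port of the current instance are normally passed as JVM startup arguments; this class defines the names of those two arguments.
  • com.sdk.consul.exception.ConsulServiceException.java
    The exception class; nothing more to say.
  • com.sdk.consul.util.Md5Utils.java
    Computes the MD5 hash of the service IP + port to produce the service ID, so that as long as the server's IP and port stay the same, the generated service ID stays the same too.
  • com.sdk.consul.util.ServerUtils.java
    Obtains the current server's IP and the port of the running web server (e.g. Tomcat) from Java code. This is unreliable on machines with multiple network interfaces, so it is the last resort for determining the service IP and port.
  • com.sdk.consul.util.StringUtils.java
    String helper class; nothing more to say.
  • com.sdk.consul.ConsulConfig.java
    The entry point for service registration. In Spring MVC, declare this class as a bean in XML, like this:
    <bean class="com.sdk.consul.ConsulConfig"></bean>
    In Spring Boot, add @Import(ConsulConfig.class) to the startup class.
  • com.sdk.consul.ConsulProperty.java
    Holds the settings read from the properties file.
  • com.sdk.consul.RegisterService.java
    The service operations class; provides methods to register and deregister the service.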
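Why hashing IP + port gives a stable service ID can be shown with a few lines of JDK-only code. ServiceIdSketch is a hypothetical name for this illustration; it uses the same idea as Md5Utils (MD5 over "url:port") but formats the hex string with BigInteger instead of a manual loop, and it fixes the charset to UTF-8, which is an assumption of this sketch.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ServiceIdSketch {

    /** Returns the MD5 digest of the input as 32 lowercase hex characters. */
    static String md5Hex(String source) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(source.getBytes(StandardCharsets.UTF_8));
            // %032x left-pads with zeros so the result is always 32 characters long
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available on this JVM", e);
        }
    }

    /** Same URL and port always produce the same ID; a different port produces a different one. */
    static String serviceId(String serviceUrl, String servicePort) {
        return md5Hex(serviceUrl + ":" + servicePort);
    }
}
```

Because the ID depends only on the address and port, restarting the application on the same machine re-registers under the same ID instead of creating a new entry.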

2. Jar dependencies

  • The pom depends on the following jars
<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>com.ecwid.consul</groupId>
        <artifactId>consul-api</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.2.9.RELEASE</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

3. Class source code

  • com.sdk.consul.consts.ConsulConsts.java
package com.sdk.consul.consts;

public class ConsulConsts {

    /** JVM startup argument name: address exposed by the service */
    public static final String CONSUL_SERVICE_URL = "CONSUL_SERVICE_URL";

    /** JVM startup argument name: port exposed by the service */
    public static final String CONSUL_SERVICE_PORT = "CONSUL_SERVICE_PORT";
}
  • com.sdk.consul.exception.ConsulServiceException.java
package com.sdk.consul.exception;

/**
 * Exception thrown when Consul service registration fails.
 */
public class ConsulServiceException extends RuntimeException{

    public ConsulServiceException() {
        super();
    }

    public ConsulServiceException(final String message) {
        super(message);
    }

    public ConsulServiceException(final String message, final Throwable cause) {
        super(message, cause);
    }

}
  • com.sdk.consul.util.Md5Utils.java
package com.sdk.consul.util;

import com.sdk.consul.exception.ConsulServiceException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Utils {

    /**
     * Computes an MD5 hash (a one-way digest, not reversible encryption).
     * @param source the string to hash
     * @return the digest as 32 lowercase hex characters
     * @throws ConsulServiceException if source is null or blank
     * @throws NoSuchAlgorithmException if MD5 is not available
     */
    public static String encrypt(String source) throws ConsulServiceException,NoSuchAlgorithmException {
        if(StringUtils.isEmpty(source)){
            throw new ConsulServiceException("Please input a not blank value to encrypt md5.");
        }
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        // Use a fixed charset so the ID does not depend on the platform default.
        byte[] md5Bytes = md5.digest(source.getBytes(StandardCharsets.UTF_8));
        StringBuilder hexValue = new StringBuilder();
        for (int i = 0; i < md5Bytes.length; i++) {
            int val = ((int) md5Bytes[i]) & 0xff;
            if (val < 16){
                hexValue.append("0");
            }
            hexValue.append(Integer.toHexString(val));
        }
        return hexValue.toString();
    }
}
  • com.sdk.consul.util.ServerUtils.java
package com.sdk.consul.util;

import javax.management.*;
import java.lang.management.ManagementFactory;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.Set;

/**
 * Obtains the server IP and the application server port.
 */
public class ServerUtils {

    /** Connector attribute name: protocol */
    private static final String PROTOCOL = "protocol";

    /** Protocol value HTTP/1.1 */
    private static final String HTTP11 = "HTTP/1.1";

    /** Protocol value containing Http11 */
    private static final String HTTP11_OTHER = "Http11";

    /** MBean name pattern matching the web container's connectors */
    private static final String CONNECTOR = "*:type=Connector,*";

    /** Connector attribute name: scheme */
    private static final String SCHEME = "scheme";

    /** Connector key property: port */
    private static final String PORT = "port";

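    /**
     * Gets the scheme and IPv4 address under which this application is reachable.
     * @return e.g. "http://10.0.0.93", or an empty string if none is found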
     * @throws MalformedObjectNameException
     * @throws NullPointerException
     * @throws UnknownHostException
     * @throws AttributeNotFoundException
     * @throws InstanceNotFoundException
     * @throws MBeanException
     * @throws ReflectionException
     */
    public static String getIPV4EndPoints() throws MalformedObjectNameException,
            NullPointerException, UnknownHostException,
            AttributeNotFoundException, InstanceNotFoundException,
            MBeanException, ReflectionException {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            QueryExp subQueryHttp = Query.match(Query.attr(PROTOCOL), Query.value(HTTP11));
            QueryExp subQueryHttpOther = Query.anySubString(Query.attr(PROTOCOL), Query.value(HTTP11_OTHER));
            QueryExp query = Query.or(subQueryHttp, subQueryHttpOther);
            Set<ObjectName> connectors = mbs.queryNames(new ObjectName(CONNECTOR), query);
            String hostname = Inet4Address.getLocalHost().getHostName();
            InetAddress[] addresses = Inet4Address.getAllByName(hostname);
            return getIPArray(connectors, addresses, mbs, 4);
    }

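    /**
     * Gets the scheme and IPv6 address under which this application is reachable.
     * @return the scheme followed by the IPv6 address, or an empty string if none is found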
     * @throws MalformedObjectNameException
     * @throws NullPointerException
     * @throws UnknownHostException
     * @throws AttributeNotFoundException
     * @throws InstanceNotFoundException
     * @throws MBeanException
     * @throws ReflectionException
     */
    public static String getIPV6EndPoints() throws MalformedObjectNameException,
                NullPointerException, UnknownHostException,
                AttributeNotFoundException, InstanceNotFoundException,
                MBeanException, ReflectionException {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        QueryExp subQueryHttp = Query.match(Query.attr(PROTOCOL), Query.value(HTTP11));
        QueryExp subQueryHttpOther = Query.anySubString(Query.attr(PROTOCOL), Query.value(HTTP11_OTHER));
        QueryExp query = Query.or(subQueryHttp, subQueryHttpOther);
        Set<ObjectName> connectors = mbs.queryNames(new ObjectName(CONNECTOR), query);
        String hostname = Inet6Address.getLocalHost().getHostName();
        InetAddress[] addresses = Inet6Address.getAllByName(hostname);
        return getIPArray(connectors, addresses, mbs, 6);
    }

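    /**
     * Gets the port the web container is running on.
     * @return the port of the first HTTP connector, or an empty string if none is found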
     * @throws MalformedObjectNameException
     * @throws NullPointerException
     */
    public static String getPort() throws MalformedObjectNameException,
            NullPointerException{
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        QueryExp subQueryHttp = Query.match(Query.attr(PROTOCOL), Query.value(HTTP11));
        QueryExp subQueryHttpOther = Query.anySubString(Query.attr(PROTOCOL), Query.value(HTTP11_OTHER));
        QueryExp query = Query.or(subQueryHttp, subQueryHttpOther);
        Set<ObjectName> connectors = mbs.queryNames(new ObjectName(CONNECTOR), query);

        for (Iterator<ObjectName> i = connectors.iterator(); i.hasNext();) {
            ObjectName obj = i.next();
            return obj.getKeyProperty(PORT);
        }
        return "";
    }

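    /**
     * Builds "scheme://ip" from the first usable address of the requested IP version.
     * @param connectors the web container's connector MBeans
     * @param addresses the candidate addresses of this host
     * @param mbs the platform MBean server
     * @param ipType 4 for IPv4, 6 for IPv6
     * @return e.g. "http://10.0.0.93", or an empty string if no address matches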
     * @throws NullPointerException
     * @throws AttributeNotFoundException
     * @throws InstanceNotFoundException
     * @throws MBeanException
     * @throws ReflectionException
     */
    private static String getIPArray(Set<ObjectName> connectors,
                                    InetAddress[] addresses,
                                    MBeanServer mbs,
                                    int ipType)
            throws NullPointerException, AttributeNotFoundException,
            InstanceNotFoundException, MBeanException, ReflectionException {
        for (Iterator<ObjectName> i = connectors.iterator(); i.hasNext();) {
            ObjectName obj = i.next();
            String scheme = mbs.getAttribute(obj, SCHEME).toString();
            for (InetAddress addr : addresses) {
                if (addr.isAnyLocalAddress() || addr.isLoopbackAddress() ||
                        addr.isMulticastAddress() ) {
                    continue;
                }
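                // An IPv4 address is 4 bytes long but an IPv6 address is 16 bytes, so compare by type rather than by length.
                if((ipType == 4 && addr instanceof Inet4Address) || (ipType == 6 && addr instanceof Inet6Address)){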
                    String host = addr.getHostAddress();
                    String ep = scheme + "://" + host;
                    return ep;
                }
            }
        }
        return "";
    }
}
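  • com.sdk.consul.util.StringUtils.java
package com.sdk.consul.util;

/**
 * String helper class.
 */
public class StringUtils {

    /**
     * Checks whether the target string is blank (null, empty, or whitespace only).
     * @param target the string to check
     * @return true if the string is blank
     */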
    public static boolean isEmpty(String target){
        return target == null || target.trim().length() == 0;
    }

}
  • com.sdk.consul.ConsulConfig.java
package com.sdk.consul;

import com.sdk.consul.exception.ConsulServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import javax.management.*;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;

/**
 * Entry point for registering the application with the Consul cluster.
 */
public class ConsulConfig implements InitializingBean,DisposableBean {

    private static final Logger LOGGER= LoggerFactory.getLogger(ConsulConfig.class);

    /** Application service name */
    @Value("${consul.service.name}")
    private String serviceName;

    /** Health check interval, in seconds */
    @Value("${consul.service.interval}")
    private Integer interval;

    /** Service address; blank by default */
    @Value("${consul.service.url:}")
    private String serviceUrl;

    /** Service port; blank by default */
    @Value("${consul.service.port:}")
    private String servicePort;

    /** Service version */
    @Value("${consul.service.version}")
    private String version;

    /** Consul cluster URLs */
    @Value("${consul.nodes.url}")
    private String nodesUrl;

    /** Consul configuration */
    private ConsulProperty consulProperty;

    /** Registers and deregisters the service */
    private RegisterService registerService;

    /**
     * Initializes the Consul configuration.
     */
    private void initConsulProperty(){
        consulProperty = new ConsulProperty();
        consulProperty.setServiceName(this.serviceName);
        consulProperty.setServiceUrl(this.serviceUrl);
        consulProperty.setServicePort(this.servicePort);
        consulProperty.setInterval(this.interval);
        consulProperty.setVersion(this.version);
        consulProperty.setNodesUrl(this.nodesUrl);
    }

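    /**
     * Implements the method of the InitializingBean interface,
     * so that the service is registered with the Consul cluster when the application starts in the container.
     * @throws Exception
     */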
    public void afterPropertiesSet() throws ConsulServiceException,
            MalformedObjectNameException,NoSuchAlgorithmException,
            NullPointerException, AttributeNotFoundException,
            InstanceNotFoundException, MBeanException, ReflectionException,
            IOException {
        // Initialize the configuration
        initConsulProperty();
        // Start registering the service
        registerService = new RegisterService(consulProperty);
        LOGGER.info(">>>> Starting register service.");
        registerService.registerServiceToConsul();
        LOGGER.info("<<<< Register service completed.");
    }

    /**
     * Implements the method of the DisposableBean interface:
     * removes the service from the cluster when the application exits.
     */
    public void destroy(){
        registerService.destroy();
    }

}
  • com.sdk.consul.ConsulProperty.java
package com.sdk.consul;

import java.io.Serializable;

/**
 * Consul configuration class.
 */
public class ConsulProperty implements Serializable {

    /** Application service name */
    private String serviceName;

    /** Application service ID */
    private String serviceId;

    /** Health check interval, in seconds */
    private Integer interval;

    /** Service address; blank by default */
    private String serviceUrl;

    /** Service port; blank by default */
    private String servicePort;

    /** Service version */
    private String version;

    /** Consul cluster URLs */
    private String nodesUrl;

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getServiceId() {
        return serviceId;
    }

    public void setServiceId(String serviceId) {
        this.serviceId = serviceId;
    }

    public Integer getInterval() {
        return interval;
    }

    public void setInterval(Integer interval) {
        this.interval = interval;
    }

    public String getServiceUrl() {
        return serviceUrl;
    }

    public void setServiceUrl(String serviceUrl) {
        this.serviceUrl = serviceUrl;
    }

    public String getServicePort() {
        return servicePort;
    }

    public void setServicePort(String servicePort) {
        this.servicePort = servicePort;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public String getNodesUrl() {
        return nodesUrl;
    }

    public void setNodesUrl(String nodesUrl) {
        this.nodesUrl = nodesUrl;
    }

    public String toString(){
        StringBuilder sb = new StringBuilder();
        sb.append("serviceName:").append(this.serviceName).append(",");
        sb.append("serviceId:" ).append(this.serviceId).append(",");
        sb.append("interval:").append(this.interval).append(",");
        sb.append("serviceUrl:").append(this.serviceUrl).append(",");
        sb.append("servicePort:").append(this.servicePort).append(",");
        sb.append("version:").append(this.version).append(",");
        sb.append("nodesUrl:").append(this.nodesUrl);
        return sb.toString();
    }
}
  • com.sdk.consul.RegisterService.java
package com.sdk.consul;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.agent.model.NewService;
import com.sdk.consul.consts.ConsulConsts;
import com.sdk.consul.exception.ConsulServiceException;
import com.sdk.consul.util.Md5Utils;
import com.sdk.consul.util.ServerUtils;
import com.sdk.consul.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.*;
import java.io.IOException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Service registration class.
 */
public class RegisterService{

    private static final Logger LOGGER= LoggerFactory.getLogger(RegisterService.class);

    /** Consul configuration */
    private ConsulProperty consulProperty;

    /**
     * Receives the initialized Consul configuration.
     * @param consulProperty the Consul configuration
     */
    RegisterService(ConsulProperty consulProperty){
        this.consulProperty = consulProperty;
    }

    /**
     * Validates the Consul configuration and resolves the service URL, port, and ID.
     */
    private void checkConsulConfig()
            throws ConsulServiceException,MalformedObjectNameException,NoSuchAlgorithmException,
                NullPointerException, UnknownHostException, AttributeNotFoundException,
                InstanceNotFoundException, MBeanException, ReflectionException {
        if(consulProperty == null){
            throw new ConsulServiceException("Get consul config error, please check the consul config was set at properties file.");
        }
        if(StringUtils.isEmpty(consulProperty.getServiceName())){
            throw new ConsulServiceException("Please set a not blank value to ${consul.service.name} at properties file.");
        }
        if(consulProperty.getInterval() == null){
            throw new ConsulServiceException("Please set a number value to ${consul.service.interval} at properties file.");
        }
        // Resolve the service URL
        String serviceUrlTmp = consulProperty.getServiceUrl();
        LOGGER.info("Get service url from JVM startup arguments.");
        String serviceUrl = System.getProperty(ConsulConsts.CONSUL_SERVICE_URL);
        if(StringUtils.isEmpty(serviceUrl)){
            LOGGER.info("Can't find CONSUL_SERVICE_URL argument from JVM startup arguments,please check service url was set in JVM startup arguments,now get service url from properties file.");
            serviceUrl = serviceUrlTmp;
        }
        if(StringUtils.isEmpty(serviceUrl)){
            LOGGER.info("Can't get service url from properties file,please check ${consul.service.url} was set in properties file, now get service url from running web server.");
            serviceUrl = ServerUtils.getIPV4EndPoints();
        }
        if(StringUtils.isEmpty(serviceUrl)){
            throw new ConsulServiceException("Can't get service url from JVM startup arguments or properties file or running web container, please set CONSUL_SERVICE_URL argument to JVM startup arguments.");
        }
        consulProperty.setServiceUrl(serviceUrl);
        // Resolve the service port
        String servicePortTmp = consulProperty.getServicePort();
        LOGGER.info("Get service port from JVM startup arguments.");
        String servicePort = System.getProperty(ConsulConsts.CONSUL_SERVICE_PORT);
        if(StringUtils.isEmpty(servicePort)){
            LOGGER.info("Can't find CONSUL_SERVICE_PORT argument from JVM startup arguments,please check service port was set in JVM startup arguments,now get service port from properties file.");
            servicePort = servicePortTmp;
        }
        if(StringUtils.isEmpty(servicePort)){
            LOGGER.info("Can't get service port from properties file,please check ${consul.service.port} was set in properties file, now get service port from running web server.");
            servicePort =ServerUtils.getPort();
        }
        if(StringUtils.isEmpty(servicePort)){
            throw new ConsulServiceException("Can't get service port,please set CONSUL_SERVICE_PORT argument to JVM startup arguments.");
        }
        consulProperty.setServicePort(servicePort);
        consulProperty.setServiceId(Md5Utils.encrypt(serviceUrl +":"+ servicePort));
        if(StringUtils.isEmpty(consulProperty.getVersion())){
            throw new ConsulServiceException("Please set a not blank value to ${consul.service.version} at properties file.");
        }
        if(StringUtils.isEmpty(consulProperty.getNodesUrl())){
            throw new ConsulServiceException("Please set a not blank value to ${consul.nodes.url} at properties file.");
        }
    }

    /**
     * Registers the service with the Consul cluster.
     */
    void registerServiceToConsul() throws ConsulServiceException,MalformedObjectNameException,
            NoSuchAlgorithmException, NullPointerException, AttributeNotFoundException,
            InstanceNotFoundException, MBeanException, ReflectionException,IOException{
        LOGGER.info(">>>> Starting check consul config.");
        checkConsulConfig();
        LOGGER.info("<<<< Check consul config success, consul config is:{}",consulProperty.toString());
        String consulNodesUrl = consulProperty.getNodesUrl();
        List<String> tags = new ArrayList<String>();
        tags.add(consulProperty.getVersion());
        List<String> consulNodeUrls = new ArrayList<String>();
        if (consulNodesUrl.indexOf(",") > -1) {
            consulNodeUrls = Arrays.asList(consulNodesUrl.split(","));
        } else {
            consulNodeUrls.add(consulNodesUrl);
        }
        // Build the service definition
        NewService newService = new NewService();
        newService.setId(consulProperty.getServiceId());
        newService.setTags(tags);
        newService.setName(consulProperty.getServiceName());
        newService.setPort(Integer.parseInt(consulProperty.getServicePort()));
        newService.setAddress(consulProperty.getServiceUrl());
        // Build the health check: Consul requests this URL every `interval` seconds
        NewService.Check serviceCheck = new NewService.Check();
        serviceCheck.setHttp(consulProperty.getServiceUrl() + ":" + consulProperty.getServicePort());
        serviceCheck.setInterval(consulProperty.getInterval() + "s");
        newService.setCheck(serviceCheck);
        // Register the service with every node in the cluster
        for (String url : consulNodeUrls) {
            LOGGER.info(">>>> Starting register service to {}.", url);
            // Create a client for this node
            ConsulClient client = new ConsulClient(url);
            client.agentServiceRegister(newService);
            LOGGER.info("<<<< Register service to {} success.", url);
        }
    }

    /**
     * Removes the service when the application exits.
     */
    void destroy(){
        String consulNodesUrl = consulProperty.getNodesUrl();
        List<String> consulNodeUrls = new ArrayList<String>();
        if (consulNodesUrl.indexOf(",") > -1) {
            consulNodeUrls = Arrays.asList(consulNodesUrl.split(","));
        } else {
            consulNodeUrls.add(consulNodesUrl);
        }
        // Deregister the service from every node in the cluster
        for (String url : consulNodeUrls) {
            LOGGER.info(">>>> Starting deregister service from {}.", url);
            // Create a client for this node
            ConsulClient client = new ConsulClient(url);
            client.agentServiceDeregister(consulProperty.getServiceId());
            LOGGER.info("<<<< Deregister service from {} success.", url);
        }
    }

}

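4. Configuration file

  • Example of the settings an application service needs to add
    # Name of the current application service
    consul.service.name=order-center
    # Health check interval in seconds. The Consul nodes periodically request
    # ${consul.service.url}:${consul.service.port}; a 2xx response means the service is healthy,
    # any other result marks the service as unhealthy.
    consul.service.interval=2
    # Address and port exposed by the current application service.
    # They are first read from the JVM startup arguments;
    # if they are not set there, they are read from this properties file;
    # if they are not set here either, the IP and port of the web server container are detected automatically.
    # In that case make sure the server's hostname is set correctly; on Linux it can be set with the hostname command,
    # e.g.: hostname consul-node
    consul.service.url=http://10.0.0.93
    consul.service.port=8080
    # Version of the current application service
    consul.service.version=v1
    # Consul cluster addresses; separate multiple addresses with commas
    consul.nodes.url=http://10.0.0.11:8500,http://10.0.0.12:8500,http://10.0.0.13:8500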
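The lookup order described in the comments above (JVM argument first, then the properties file, then auto-detection) and the comma-separated node list can be sketched with two small helpers. SettingResolution, firstNonBlank and splitNodes are hypothetical names for this illustration; unlike the code in the jar, this sketch also trims whitespace around values, which is an extra assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class SettingResolution {

    /** Returns the first candidate that is neither null nor blank, or "" if there is none. */
    static String firstNonBlank(String... candidates) {
        for (String candidate : candidates) {
            if (candidate != null && candidate.trim().length() > 0) {
                return candidate.trim();
            }
        }
        return "";
    }

    /** Splits the comma-separated node list, trimming spaces and skipping empty entries. */
    static List<String> splitNodes(String nodesUrl) {
        List<String> nodes = new ArrayList<String>();
        for (String part : nodesUrl.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                nodes.add(trimmed);
            }
        }
        return nodes;
    }
}
```

With these helpers the service URL would be resolved as firstNonBlank(System.getProperty("CONSUL_SERVICE_URL"), valueFromProperties, detectedValue), where the last two arguments stand for the properties value and the auto-detected value. The JVM argument itself is passed at startup as -DCONSUL_SERVICE_URL=http://10.0.0.93.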

Reposted from blog.csdn.net/Sukiyou_xixi/article/details/80378391