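I. Continued
The previous article gave a brief introduction to consul and to setting up a cluster environment (see the previous article).
This article covers how to register services with the consul cluster and how to package that logic as a shared jar: when a springmvc or springboot application starts, the service is registered with the consul cluster, and when the application exits, the service is removed from the cluster.
II. Implementation approach
1. How to make use of the consul cluster
Here we rely on consul's health checks to make sure that the services discovered by the consumer side are healthy and available.
Service providers are typically our applications, such as a member service or an order service; the service consumer is typically our gateway.
For example, say the member service runs on three machines and the ip and port of each machine are registered with the consul cluster. When one of those machines goes down, the health check in the consul cluster marks that instance as critical. When the gateway then asks the consul cluster for the list of healthy services, the instance on that machine is no longer returned,
so the gateway naturally stops forwarding requests to the machine that went down.
In addition, the tags supplied at registration time can be used for version management: services can be queried by tag during discovery.
The same idea can also be used for gray releases. If the gateway is reasonably complete, then as long as every service has at least one machine serving normally, the whole system keeps running, and machines can be added to or removed from a service at any time.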
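As a rough illustration of tag-based version selection and gray release on the gateway side, the sketch below filters a list of healthy instances that has already been fetched from consul. The `Instance` and `VersionRouter` classes are hypothetical and not part of the jar described in this article, and the percentage-based split is only one assumed way of routing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one discovered instance; not part of the jar in this article.
class Instance {
    final String address;
    final int port;
    final List<String> tags;

    Instance(String address, int port, String... tags) {
        this.address = address;
        this.port = port;
        this.tags = Arrays.asList(tags);
    }
}

class VersionRouter {
    /** Keeps only the instances whose tags contain the requested version. */
    static List<Instance> byVersion(List<Instance> healthy, String version) {
        List<Instance> result = new ArrayList<Instance>();
        for (Instance instance : healthy) {
            if (instance.tags.contains(version)) {
                result.add(instance);
            }
        }
        return result;
    }

    /**
     * Gray release: roughly grayPercent of requests go to the gray version.
     * Falls back to the stable version when no gray instance is available.
     */
    static List<Instance> candidates(List<Instance> healthy, String stable,
                                     String gray, int grayPercent, int requestHash) {
        boolean useGray = Math.floorMod(requestHash, 100) < grayPercent;
        List<Instance> picked = byVersion(healthy, useGray ? gray : stable);
        return picked.isEmpty() ? byVersion(healthy, stable) : picked;
    }
}
```

With `grayPercent` set to 10, about one request in ten (by hash) would be offered the gray version, and everything else stays on the stable one.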
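Pitfall: when you register a service with one node of the cluster, the registration stays tied to that node instead of being spread to the other nodes, so once that node goes down the service can no longer be discovered. My workaround is fairly blunt: register the service with every node in the cluster, and at discovery time treat entries whose ip + port + version are identical as duplicates and keep only one of them.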
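The de-duplication step on the discovery side can be sketched as follows. This is a minimal sketch under the assumption that the discovered entries have already been collected into a list; `ServiceEntry` and `ServiceDeduplicator` are hypothetical names, not classes from the jar in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical value object for one discovered service entry.
class ServiceEntry {
    final String ip;
    final int port;
    final String version;

    ServiceEntry(String ip, int port, String version) {
        this.ip = ip;
        this.port = port;
        this.version = version;
    }

    /** Entries with the same ip, port and version are treated as duplicates. */
    String key() {
        return ip + ":" + port + "@" + version;
    }
}

class ServiceDeduplicator {
    /** Keeps the first entry per ip + port + version, preserving the original order. */
    static List<ServiceEntry> deduplicate(List<ServiceEntry> discovered) {
        Map<String, ServiceEntry> unique = new LinkedHashMap<String, ServiceEntry>();
        for (ServiceEntry entry : discovered) {
            if (!unique.containsKey(entry.key())) {
                unique.put(entry.key(), entry);
            }
        }
        return new ArrayList<ServiceEntry>(unique.values());
    }
}
```

Registering the same instance with three consul nodes would yield three identical entries at discovery time; after `deduplicate` only one remains.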
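2. Technical side
On the technical side, the jar implements spring's InitializingBean and DisposableBean interfaces, so that the service is registered when the application starts and deregistered when the application exits.
III. A shared jar for registering spring applications with the consul cluster
1. Package structure: the jar contains the following classes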
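- com.sdk.consul.consts.ConsulConsts.java
Our properties configuration file exists once per environment, while the same service is deployed on several servers, each with its own ip and port. To avoid editing the properties file at release time, the ip and port of the current service are usually passed as JVM startup arguments; this class defines the names of those two arguments.
- com.sdk.consul.exception.ConsulServiceException.java
The exception class; nothing more to say.
- com.sdk.consul.util.Md5Utils.java
Hashes the service ip + port into the serviceid, so that as long as the server ip and port stay the same, the generated serviceid stays the same as well.
- com.sdk.consul.util.ServerUtils.java
Uses java code to look up the current server ip and the port the web server (e.g. tomcat) is running on. This is unreliable on machines with multiple network cards, so it is only the last option for obtaining the service ip and port.
- com.sdk.consul.util.StringUtils.java
String helper class; nothing more to say.
- com.sdk.consul.ConsulConfig.java
Entry point for service registration. In spring mvc, define this class as a bean in xml, like this:
<bean class="com.sdk.consul.ConsulConfig"></bean>
In springboot, add @Import(ConsulConfig.class) to the startup class.
- com.sdk.consul.ConsulProperty.java
Holds the settings read from the properties configuration file.
- com.sdk.consul.RegisterService.java
Service operations class; provides methods for registering and deregistering the service.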
2. Jar dependencies
- The pom depends on the following jars
<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>com.ecwid.consul</groupId>
        <artifactId>consul-api</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.2.9.RELEASE</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
3. Source code of each class
- com.sdk.consul.consts.ConsulConsts.java
package com.sdk.consul.consts;

public class ConsulConsts {
    /** JVM startup argument name: address the service is exposed on */
    public static final String CONSUL_SERVICE_URL = "CONSUL_SERVICE_URL";
    /** JVM startup argument name: port the service is exposed on */
    public static final String CONSUL_SERVICE_PORT = "CONSUL_SERVICE_PORT";
}
- com.sdk.consul.exception.ConsulServiceException.java
package com.sdk.consul.exception;

/**
 * Exception thrown when registering a service with consul fails
 */
public class ConsulServiceException extends RuntimeException {
    public ConsulServiceException() {
        super();
    }

    public ConsulServiceException(final String message) {
        super(message);
    }

    public ConsulServiceException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
- com.sdk.consul.util.Md5Utils.java
package com.sdk.consul.util;

import com.sdk.consul.exception.ConsulServiceException;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Utils {
    /**
     * Computes the MD5 digest of a string (a one-way hash, despite the method name).
     * @param source the string to hash
     * @return the digest as 32 lowercase hex characters
     * @throws ConsulServiceException if source is blank
     * @throws NoSuchAlgorithmException if MD5 is not available in this JVM
     */
    public static String encrypt(String source) throws ConsulServiceException, NoSuchAlgorithmException {
        if (StringUtils.isEmpty(source)) {
            throw new ConsulServiceException("Please input a not blank value to encrypt md5.");
        }
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        // Use a fixed charset so the same input gives the same id on every machine
        byte[] md5Bytes = md5.digest(source.getBytes(StandardCharsets.UTF_8));
        StringBuilder hexValue = new StringBuilder();
        for (byte b : md5Bytes) {
            int val = b & 0xff;
            // Pad single hex digits so every byte contributes two characters
            if (val < 16) {
                hexValue.append("0");
            }
            hexValue.append(Integer.toHexString(val));
        }
        return hexValue.toString();
    }
}
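To see why hashing ip + port gives a stable serviceid, here is a small standalone check. It re-implements the hash in a hypothetical `ServiceIdDemo` class (so that it runs without the jar) and uses the example address from the configuration section; it is only a sketch of the idea, not the jar's own code.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class ServiceIdDemo {
    /** MD5 of the input as 32 lowercase hex characters. */
    static String md5Hex(String source) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(source.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        String first = md5Hex("http://10.0.0.93:8080");
        String afterRestart = md5Hex("http://10.0.0.93:8080");
        String otherPort = md5Hex("http://10.0.0.93:8081");
        System.out.println(first.equals(afterRestart)); // true: same ip + port, same id
        System.out.println(first.equals(otherPort));    // false: a different port gives a different id
    }
}
```

Because the id does not change across restarts, registering again after a restart reuses the same id instead of creating a new one.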
- com.sdk.consul.util.ServerUtils.java
package com.sdk.consul.util;

import javax.management.*;
import java.lang.management.ManagementFactory;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Set;

/**
 * Looks up the server ip and the port of the application server
 */
public class ServerUtils {
    /** MBean attribute: protocol */
    private static final String PROTOCOL = "protocol";
    /** Protocol value HTTP/1.1 */
    private static final String HTTP11 = "HTTP/1.1";
    /** Protocol value containing Http11 */
    private static final String HTTP11_OTHER = "Http11";
    /** Object name pattern of the application server connectors */
    private static final String CONNECTOR = "*:type=Connector,*";
    /** MBean attribute: scheme */
    private static final String SCHEME = "scheme";
    /** Object name key: port */
    private static final String PORT = "port";

    /**
     * Finds the HTTP connectors that the web container has registered with JMX.
     */
    private static Set<ObjectName> queryHttpConnectors(MBeanServer mbs)
            throws MalformedObjectNameException {
        QueryExp subQueryHttp = Query.match(Query.attr(PROTOCOL), Query.value(HTTP11));
        QueryExp subQueryHttpOther = Query.anySubString(Query.attr(PROTOCOL), Query.value(HTTP11_OTHER));
        QueryExp query = Query.or(subQueryHttp, subQueryHttpOther);
        return mbs.queryNames(new ObjectName(CONNECTOR), query);
    }

    /**
     * Returns scheme + ipv4 address of this server, e.g. http://10.0.0.93
     * @return the endpoint, or an empty string if none was found
     */
    public static String getIPV4EndPoints() throws MalformedObjectNameException,
            UnknownHostException, AttributeNotFoundException,
            InstanceNotFoundException, MBeanException, ReflectionException {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        String hostname = InetAddress.getLocalHost().getHostName();
        InetAddress[] addresses = InetAddress.getAllByName(hostname);
        return getIPArray(queryHttpConnectors(mbs), addresses, mbs, 4);
    }

    /**
     * Returns scheme + ipv6 address of this server
     * @return the endpoint, or an empty string if none was found
     */
    public static String getIPV6EndPoints() throws MalformedObjectNameException,
            UnknownHostException, AttributeNotFoundException,
            InstanceNotFoundException, MBeanException, ReflectionException {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        String hostname = InetAddress.getLocalHost().getHostName();
        InetAddress[] addresses = InetAddress.getAllByName(hostname);
        return getIPArray(queryHttpConnectors(mbs), addresses, mbs, 6);
    }

    /**
     * Returns the port the web container is running on
     * @return the port of the first HTTP connector, or an empty string if none was found
     */
    public static String getPort() throws MalformedObjectNameException {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        for (ObjectName connector : queryHttpConnectors(mbs)) {
            return connector.getKeyProperty(PORT);
        }
        return "";
    }

    /**
     * Combines the connector scheme with the first usable address of the requested type
     * @param connectors HTTP connectors found through JMX
     * @param addresses addresses resolved from the local hostname
     * @param mbs the platform MBean server
     * @param ipType 4: ipv4, 6: ipv6
     * @return scheme://host, or an empty string if nothing matched
     */
    private static String getIPArray(Set<ObjectName> connectors,
                                     InetAddress[] addresses,
                                     MBeanServer mbs,
                                     int ipType)
            throws AttributeNotFoundException, InstanceNotFoundException,
            MBeanException, ReflectionException {
        for (ObjectName connector : connectors) {
            String scheme = mbs.getAttribute(connector, SCHEME).toString();
            for (InetAddress addr : addresses) {
                if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
                        || addr.isMulticastAddress()) {
                    continue;
                }
                // An ipv6 address is 16 bytes long, so match on the address class
                // rather than comparing the byte length with ipType
                boolean wanted = (ipType == 4 && addr instanceof Inet4Address)
                        || (ipType == 6 && addr instanceof Inet6Address);
                if (wanted) {
                    return scheme + "://" + addr.getHostAddress();
                }
            }
        }
        return "";
    }
}
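The multiple network card problem mentioned for ServerUtils comes from resolving the address through the hostname. As a possible aid when deciding what to pass as CONSUL_SERVICE_URL, the sketch below lists the candidate ipv4 addresses per network interface using only the JDK. `LocalAddresses` is a hypothetical helper and is not part of the jar.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

class LocalAddresses {
    /** True for ipv4 addresses that another machine could plausibly reach. */
    static boolean isCandidate(InetAddress addr) {
        return addr instanceof Inet4Address
                && !addr.isLoopbackAddress()
                && !addr.isAnyLocalAddress()
                && !addr.isLinkLocalAddress()
                && !addr.isMulticastAddress();
    }

    /** Lists "interface -> address" for every candidate address on interfaces that are up. */
    static List<String> candidates() throws SocketException {
        List<String> result = new ArrayList<String>();
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return result; // no network interfaces at all
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            if (!nic.isUp() || nic.isLoopback()) {
                continue;
            }
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                if (isCandidate(addr)) {
                    result.add(nic.getName() + " -> " + addr.getHostAddress());
                }
            }
        }
        return result;
    }
}
```

If `candidates()` returns more than one line, automatic detection is ambiguous on that machine, which is a good reason to set the address explicitly.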
- com.sdk.consul.util.StringUtils.java
package com.sdk.consul.util;

/**
 * String helper class
 */
public class StringUtils {
    /**
     * Checks whether the target string is blank (null, empty, or whitespace only)
     * @param target the string to check
     * @return true if the string is blank
     */
    public static boolean isEmpty(String target) {
        return target == null || target.trim().length() == 0;
    }
}
- com.sdk.consul.ConsulConfig.java
package com.sdk.consul;

import com.sdk.consul.exception.ConsulServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;

import javax.management.*;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;

/**
 * Entry point for registering the application with the consul cluster
 */
public class ConsulConfig implements InitializingBean, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfig.class);

    /** Application service name */
    @Value("${consul.service.name}")
    private String serviceName;
    /** Health check interval in seconds */
    @Value("${consul.service.interval}")
    private Integer interval;
    /** Application service address, blank by default */
    @Value("${consul.service.url:}")
    private String serviceUrl;
    /** Application service port, blank by default */
    @Value("${consul.service.port:}")
    private String servicePort;
    /** Application service version */
    @Value("${consul.service.version}")
    private String version;
    /** Urls of the consul cluster nodes */
    @Value("${consul.nodes.url}")
    private String nodesUrl;
    /** consul settings */
    private ConsulProperty consulProperty;
    /** Registers and deregisters the service */
    private RegisterService registerService;

    /**
     * Copies the injected values into the consul settings object
     */
    private void initConsulProperty() {
        consulProperty = new ConsulProperty();
        consulProperty.setServiceName(this.serviceName);
        consulProperty.setServiceUrl(this.serviceUrl);
        consulProperty.setServicePort(this.servicePort);
        consulProperty.setInterval(this.interval);
        consulProperty.setVersion(this.version);
        consulProperty.setNodesUrl(this.nodesUrl);
    }

    /**
     * Method from the InitializingBean interface:
     * registers the service with the consul cluster when the application starts in the container
     */
    public void afterPropertiesSet() throws ConsulServiceException,
            MalformedObjectNameException, NoSuchAlgorithmException,
            AttributeNotFoundException, InstanceNotFoundException,
            MBeanException, ReflectionException, IOException {
        // Initialize the settings
        initConsulProperty();
        // Register the service
        registerService = new RegisterService(consulProperty);
        LOGGER.info(">>>> Starting register service.");
        registerService.registerServiceToConsul();
        LOGGER.info("<<<< Register service completed.");
    }

    /**
     * Method from the DisposableBean interface:
     * removes the service from the cluster when the application exits
     */
    public void destroy() {
        // registerService is null if startup failed before it was created
        if (registerService != null) {
            registerService.destroy();
        }
    }
}
- com.sdk.consul.ConsulProperty.java
package com.sdk.consul;

import java.io.Serializable;

/**
 * consul settings
 */
public class ConsulProperty implements Serializable {
    /** Application service name */
    private String serviceName;
    /** Application service id */
    private String serviceId;
    /** Health check interval in seconds */
    private Integer interval;
    /** Application service address, blank by default */
    private String serviceUrl;
    /** Application service port, blank by default */
    private String servicePort;
    /** Application service version */
    private String version;
    /** Urls of the consul cluster nodes */
    private String nodesUrl;

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getServiceId() {
        return serviceId;
    }

    public void setServiceId(String serviceId) {
        this.serviceId = serviceId;
    }

    public Integer getInterval() {
        return interval;
    }

    public void setInterval(Integer interval) {
        this.interval = interval;
    }

    public String getServiceUrl() {
        return serviceUrl;
    }

    public void setServiceUrl(String serviceUrl) {
        this.serviceUrl = serviceUrl;
    }

    public String getServicePort() {
        return servicePort;
    }

    public void setServicePort(String servicePort) {
        this.servicePort = servicePort;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public String getNodesUrl() {
        return nodesUrl;
    }

    public void setNodesUrl(String nodesUrl) {
        this.nodesUrl = nodesUrl;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("serviceName:").append(this.serviceName).append(",");
        sb.append("serviceId:").append(this.serviceId).append(",");
        sb.append("interval:").append(this.interval).append(",");
        sb.append("serviceUrl:").append(this.serviceUrl).append(",");
        sb.append("servicePort:").append(this.servicePort).append(",");
        sb.append("version:").append(this.version).append(",");
        sb.append("nodesUrl:").append(this.nodesUrl);
        return sb.toString();
    }
}
- com.sdk.consul.RegisterService.java
package com.sdk.consul;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.agent.model.NewService;
import com.sdk.consul.consts.ConsulConsts;
import com.sdk.consul.exception.ConsulServiceException;
import com.sdk.consul.util.Md5Utils;
import com.sdk.consul.util.ServerUtils;
import com.sdk.consul.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.*;
import java.io.IOException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

/**
 * Registers and deregisters the service
 */
public class RegisterService {
    private static final Logger LOGGER = LoggerFactory.getLogger(RegisterService.class);

    /** consul settings */
    private ConsulProperty consulProperty;

    /**
     * Receives the initialized consul settings
     * @param consulProperty the consul settings
     */
    RegisterService(ConsulProperty consulProperty) {
        this.consulProperty = consulProperty;
    }

    /**
     * Validates the consul settings and resolves the service url, port and id
     */
    private void checkConsulConfig()
            throws ConsulServiceException, MalformedObjectNameException, NoSuchAlgorithmException,
            UnknownHostException, AttributeNotFoundException,
            InstanceNotFoundException, MBeanException, ReflectionException {
        if (consulProperty == null) {
            throw new ConsulServiceException("Get consul config error, please check the consul config was set at properties file.");
        }
        if (StringUtils.isEmpty(consulProperty.getServiceName())) {
            throw new ConsulServiceException("Please set a not blank value to ${consul.service.name} at properties file.");
        }
        if (consulProperty.getInterval() == null) {
            throw new ConsulServiceException("Please set a number value to ${consul.service.interval} at properties file.");
        }
        // Resolve the service url: JVM argument, then properties file, then running web server
        String serviceUrlTmp = consulProperty.getServiceUrl();
        LOGGER.info("Get service url from JVM startup arguments.");
        String serviceUrl = System.getProperty(ConsulConsts.CONSUL_SERVICE_URL);
        if (StringUtils.isEmpty(serviceUrl)) {
            LOGGER.info("Can't find CONSUL_SERVICE_URL argument from JVM startup arguments,please check service url was set in JVM startup arguments,now get service url from properties file.");
            serviceUrl = serviceUrlTmp;
        }
        if (StringUtils.isEmpty(serviceUrl)) {
            LOGGER.info("Can't get service url from properties file,please check ${consul.service.url} was set in properties file, now get service url from running web server.");
            serviceUrl = ServerUtils.getIPV4EndPoints();
        }
        if (StringUtils.isEmpty(serviceUrl)) {
            throw new ConsulServiceException("Can't get service url from JVM startup arguments or properties file or running web container, please set CONSUL_SERVICE_URL argument to JVM startup arguments.");
        }
        consulProperty.setServiceUrl(serviceUrl);
        // Resolve the service port in the same order
        String servicePortTmp = consulProperty.getServicePort();
        LOGGER.info("Get service port from JVM startup arguments.");
        String servicePort = System.getProperty(ConsulConsts.CONSUL_SERVICE_PORT);
        if (StringUtils.isEmpty(servicePort)) {
            LOGGER.info("Can't find CONSUL_SERVICE_PORT argument from JVM startup arguments,please check service port was set in JVM startup arguments,now get service port from properties file.");
            servicePort = servicePortTmp;
        }
        if (StringUtils.isEmpty(servicePort)) {
            LOGGER.info("Can't get service port from properties file,please check ${consul.service.port} was set in properties file, now get service port from running web server.");
            servicePort = ServerUtils.getPort();
        }
        if (StringUtils.isEmpty(servicePort)) {
            throw new ConsulServiceException("Can't get service port,please set CONSUL_SERVICE_PORT argument to JVM startup arguments.");
        }
        consulProperty.setServicePort(servicePort);
        // The id depends only on url + port, so it stays the same across restarts
        consulProperty.setServiceId(Md5Utils.encrypt(serviceUrl + ":" + servicePort));
        if (StringUtils.isEmpty(consulProperty.getVersion())) {
            throw new ConsulServiceException("Please set a not blank value to ${consul.service.version} at properties file.");
        }
        if (StringUtils.isEmpty(consulProperty.getNodesUrl())) {
            throw new ConsulServiceException("Please set a not blank value to ${consul.nodes.url} at properties file.");
        }
    }

    /**
     * Splits the comma separated ${consul.nodes.url} value into single node urls
     */
    private List<String> parseNodeUrls() {
        List<String> urls = new ArrayList<String>();
        for (String url : consulProperty.getNodesUrl().split(",")) {
            if (!StringUtils.isEmpty(url)) {
                urls.add(url.trim());
            }
        }
        return urls;
    }

    /**
     * Registers the service with every node of the consul cluster
     */
    void registerServiceToConsul() throws ConsulServiceException, MalformedObjectNameException,
            NoSuchAlgorithmException, AttributeNotFoundException,
            InstanceNotFoundException, MBeanException, ReflectionException, IOException {
        LOGGER.info(">>>> Starting check consul config.");
        checkConsulConfig();
        LOGGER.info("<<<< Check consul config success, resolved consul config is:{}", consulProperty.toString());
        // The version is published as a tag
        List<String> tags = new ArrayList<String>();
        tags.add(consulProperty.getVersion());
        // Assemble the service information
        NewService newService = new NewService();
        newService.setId(consulProperty.getServiceId());
        newService.setTags(tags);
        newService.setName(consulProperty.getServiceName());
        newService.setPort(Integer.parseInt(consulProperty.getServicePort()));
        newService.setAddress(consulProperty.getServiceUrl());
        // Assemble the health check information
        NewService.Check serviceCheck = new NewService.Check();
        serviceCheck.setHttp(consulProperty.getServiceUrl() + ":" + consulProperty.getServicePort());
        serviceCheck.setInterval(consulProperty.getInterval() + "s");
        newService.setCheck(serviceCheck);
        // Register the service with every node of the cluster
        for (String url : parseNodeUrls()) {
            LOGGER.info(">>>> Starting register service to {}.", url);
            ConsulClient client = new ConsulClient(url);
            client.agentServiceRegister(newService);
            LOGGER.info("<<<< Register service to {} success.", url);
        }
    }

    /**
     * Removes the service information when the application exits
     */
    void destroy() {
        // Deregister the service from every node of the cluster
        for (String url : parseNodeUrls()) {
            LOGGER.info(">>>> Starting deregister service from {}.", url);
            ConsulClient client = new ConsulClient(url);
            client.agentServiceDeregister(consulProperty.getServiceId());
            LOGGER.info("<<<< Deregister service from {} success.", url);
        }
    }
}
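4. Configuration file
- Example of the settings an application service needs to add
#Name of the current application service
consul.service.name=order-center
#Interval in seconds of the consul health check. A node in the consul cluster periodically requests
#${consul.service.url}:${consul.service.port}; a 200 response means the service is healthy, otherwise the service is marked as failing
consul.service.interval=2
#Address and port the current application service is exposed on.
#They are first read from the JVM startup arguments;
#if they are not set there, they are read from this properties file;
#if they are not set here either, the ip and port of the web server container are detected automatically.
#In that case make sure the hostname of the server is set properly; on linux the hostname command sets it,
#e.g.: hostname consul-node
consul.service.url=http://10.0.0.93
consul.service.port=8080
#Version of the current application service
consul.service.version=v1
#Addresses of the consul cluster nodes, separated by commas
consul.nodes.url=http://10.0.0.11:8500,http://10.0.0.12:8500,http://10.0.0.13:8500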
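The lookup order described in the comments above (JVM startup argument, then properties file, then auto-detection) can be condensed into a small helper. This is a simplified sketch: `EndpointResolver` is a hypothetical class, and unlike RegisterService it takes the detected value as a plain argument instead of calling the detection only when needed.

```java
class EndpointResolver {
    static boolean isEmpty(String value) {
        return value == null || value.trim().length() == 0;
    }

    /** Returns the first non-blank candidate in priority order, or null if all are blank. */
    static String firstNonEmpty(String... candidates) {
        for (String candidate : candidates) {
            if (!isEmpty(candidate)) {
                return candidate.trim();
            }
        }
        return null;
    }

    /** JVM argument CONSUL_SERVICE_URL wins over the properties value, which wins over detection. */
    static String resolveUrl(String fromProperties, String detected) {
        return firstNonEmpty(System.getProperty("CONSUL_SERVICE_URL"), fromProperties, detected);
    }

    /** Same order for the port, using the CONSUL_SERVICE_PORT JVM argument. */
    static String resolvePort(String fromProperties, String detected) {
        return firstNonEmpty(System.getProperty("CONSUL_SERVICE_PORT"), fromProperties, detected);
    }
}
```

Since the values are read with System.getProperty, they are passed as -D options at startup, for example `java -DCONSUL_SERVICE_URL=http://10.0.0.93 -DCONSUL_SERVICE_PORT=8080 -jar app.jar`, where app.jar stands in for your own application.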