SOA
服务治理--最主要就是服务之间的治理(治理基本上要做成运行时治理)
Dubbo使用条件
必须要与spring集成
Dubbo的通信协议(消费者调用生产者)
netty和mina实现
手写一个类似于dubbo SOA需要解决的问题
1、为什么消费者能够调用到生产者的服务层
2、生产者怎么把内容注册到注册中心的,然后消费者是如何获取这个信息的
3、dubbo框架是如何跟spring进行整合的,消费者获取的代理实例是如何创建
手写SOA框架
1.新建Maven工程,引入Spring的依赖包
2.在resources包下,新建META-INF包,新增soa.xsd, spring.handlers, spring.schemas
soa.xsd
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.youierdong.com/schema/soa"
xmlns:xsd="http://www.w3.org/2001/XMLSchema" targetNamespace="http://www.youierdong.com/schema/soa"
elementFormDefault="qualified" attributeFormDefault="unqualified">
<!-- 注册标签 -->
<xsd:element name="registry">
<xsd:complexType>
<xsd:attribute name="protocol" type="xsd:string"></xsd:attribute>
<xsd:attribute name="address" type="xsd:string"></xsd:attribute>
</xsd:complexType>
</xsd:element>
<!--reference标签-->
<xsd:element name="reference">
<xsd:complexType>
<xsd:attribute name="id" type="xsd:string"></xsd:attribute>
<xsd:attribute name="interface" type="xsd:string"></xsd:attribute>
<xsd:attribute name="loadbalance" type="xsd:string"></xsd:attribute>
<xsd:attribute name="protocol" type="xsd:string"></xsd:attribute>
<xsd:attribute name="cluster" type="xsd:string"></xsd:attribute>
<xsd:attribute name="retries" type="xsd:string"></xsd:attribute>
</xsd:complexType>
</xsd:element>
<!-- 协议标签-->
<xsd:element name="protocol">
<xsd:complexType>
<xsd:attribute name="name" type="xsd:string"></xsd:attribute>
<xsd:attribute name="port" type="xsd:string"></xsd:attribute>
<xsd:attribute name="host" type="xsd:string"></xsd:attribute>
</xsd:complexType>
</xsd:element>
<!--service标签-->
<xsd:element name="service">
<xsd:complexType>
<xsd:attribute name="interface" type="xsd:string"></xsd:attribute>
<xsd:attribute name="ref" type="xsd:string"></xsd:attribute>
<xsd:attribute name="protocol" type="xsd:string"></xsd:attribute>
</xsd:complexType>
</xsd:element>
</xsd:schema>
spring.handlers
http\://www.youierdong.com/schema/soa=com.youi.erdong.spring.parse.SOANamespaceHandlers
注意:www.youierdong.com/schema/soa与xsd的配置对应
spring.schemas
http\://www.youierdong.com/schema/soa.xsd=META-INF/soa.xsd
3新建SOANamespaceHandlers (spring标签解析类)
import org.springframework.beans.factory.xml.NamespaceHandlerSupport;
/**
* spring标签解析类 继承NamespaceHandlerSupport
*
* @author Administrator
*
*/
public class SOANamespaceHandlers extends NamespaceHandlerSupport {
/**
* 注册标签
*/
public void init() {
this.registerBeanDefinitionParser("registry",
new RegistryBeanDefinitionParse(Registry.class));
this.registerBeanDefinitionParser("reference",
new ReferenceBeanDefinitionParse(Reference.class));
this.registerBeanDefinitionParser("protocol",
new ProtocolBeanDefinitionParse(Protocol.class));
this.registerBeanDefinitionParser("service",
new ServiceBeanDefinitionParse(Service.class));
}
}
其他bean解析类
import org.springframework.beans.factory.config.BeanDefinition;
/**
* bean初始化转换 Protocol标签解析类
*
* @author Administrator
*/
public class ProtocolBeanDefinitionParse implements BeanDefinitionParser {
// Protocol
private Class<?> beanClass;
public ProtocolBeanDefinitionParse(Class<?> beanClass) {
this.beanClass = beanClass;
}
// parse转换方法 解析soa.xsd的Protocol标签 获取BeanDefinition
public BeanDefinition parse(Element element, ParserContext parserContext) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
String name = element.getAttribute("name");
String port = element.getAttribute("port");
String host = element.getAttribute("host");
if (name == null || name.length() == 0) {
throw new RuntimeException("Protocol name 不能为空");
}
if (port == null || port.length() == 0) {
throw new RuntimeException("Protocol port 不能为空");
}
if (host == null || host.length() == 0) {
throw new RuntimeException("Protocol host 不能为空");
}
beanDefinition.getPropertyValues().add("name", name);
beanDefinition.getPropertyValues().add("port", port);
beanDefinition.getPropertyValues().add("host", host);
// 注册
parserContext.getRegistry().registerBeanDefinition(
"Protocol" + host + port, beanDefinition);
return beanDefinition;
}
}
/**
* bean初始化转换 Reference标签解析类
*
* @author Administrator
*/
public class ReferenceBeanDefinitionParse implements BeanDefinitionParser {
// Reference
private Class<?> beanClass;
public ReferenceBeanDefinitionParse(Class<?> beanClass) {
this.beanClass = beanClass;
}
// parse转换方法 解析soa.xsd的Reference标签 获取BeanDefinition
public BeanDefinition parse(Element element, ParserContext parserContext) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
String id = element.getAttribute("id");
String intf = element.getAttribute("interface");
String loadbalance = element.getAttribute("loadbalance");
String protocol = element.getAttribute("protocol");
if (id == null || id.length() == 0) {
throw new RuntimeException("Reference id 不能为空");
}
if (intf == null || intf.length() == 0) {
throw new RuntimeException("Reference intf 不能为空");
}
if (loadbalance == null || loadbalance.length() == 0) {
throw new RuntimeException("Reference loadbalance 不能为空");
}
if (protocol == null || protocol.length() == 0) {
throw new RuntimeException("Reference protocol 不能为空");
}
beanDefinition.getPropertyValues().add("id", id);
beanDefinition.getPropertyValues().add("intf", intf);
beanDefinition.getPropertyValues().add("loadbalance", loadbalance);
beanDefinition.getPropertyValues().add("protocol", protocol);
// 注册
parserContext.getRegistry().registerBeanDefinition("Reference" + id,
beanDefinition);
return beanDefinition;
}
}
/**
* bean初始化转换 Registry标签解析类
*
* @author Administrator
*/
public class RegistryBeanDefinitionParse implements BeanDefinitionParser {
// Registry
private Class<?> beanClass;
public RegistryBeanDefinitionParse(Class<?> beanClass) {
this.beanClass = beanClass;
}
// parse转换方法 解析soa.xsd的registry标签 获取BeanDefinition
public BeanDefinition parse(Element element, ParserContext parserContext) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
// spring会把beanClass实例化 BeanDefinitionNames??
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
String protocol = element.getAttribute("protocol");
String address = element.getAttribute("address");
if (protocol == null || address.length() == 0) {
throw new RuntimeException("Registry protocol 不能为空");
}
if (address == null || address.length() == 0) {
throw new RuntimeException("Registry address 不能为空");
}
beanDefinition.getPropertyValues().add("protocol", protocol);
beanDefinition.getPropertyValues().add("address", address);
// 注册
parserContext.getRegistry().registerBeanDefinition(
"Registry" + address, beanDefinition);
return beanDefinition;
}
}
/**
* bean初始化转换 Service标签解析类
*
* @author Administrator
*/
public class ServiceBeanDefinitionParse implements BeanDefinitionParser {
// Service
private Class<?> beanClass;
public ServiceBeanDefinitionParse(Class<?> beanClass) {
this.beanClass = beanClass;
}
// parse转换方法 解析soa.xsd的Service标签 获取BeanDefinition
public BeanDefinition parse(Element element, ParserContext parserContext) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
String intf = element.getAttribute("interface");
String ref = element.getAttribute("ref");
String protocol = element.getAttribute("protocol");
if (intf == null || intf.length() == 0) {
throw new RuntimeException("Service intf 不能为空");
}
if (ref == null || ref.length() == 0) {
throw new RuntimeException("Service ref 不能为空");
}
/**
* if (protocol == null || protocol.length() == 0) { throw new
* RuntimeException("Service protocol 不能为空"); }
*/
beanDefinition.getPropertyValues().add("intf", intf);
beanDefinition.getPropertyValues().add("ref", ref);
beanDefinition.getPropertyValues().add("protocol", protocol);
// 注册
parserContext.getRegistry().registerBeanDefinition(
"Service" + ref + intf, beanDefinition);
return beanDefinition;
}
}
其他bean实体类
public class Protocol {
private String name;
private String port;
private String host;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
}
public class Reference {
private String id;
private String intf;
private String loadbalance;
private String protocol;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getIntf() {
return intf;
}
public void setIntf(String intf) {
this.intf = intf;
}
public String getLoadbalance() {
return loadbalance;
}
public void setLoadbalance(String loadbalance) {
this.loadbalance = loadbalance;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
}
public class Registry {
private String protocol;
private String address;
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
}
public class Service {
private String intf;
private String ref;
private String protocol;
public String getIntf() {
return intf;
}
public void setIntf(String intf) {
this.intf = intf;
}
public String getRef() {
return ref;
}
public void setRef(String ref) {
this.ref = ref;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
}
其他service类
public interface UserService {
public String eat(String param);
}
public class UserServiceImpl implements UserService {
public String eat(String param) {
// TODO Auto-generated method stub
return null;
}
}
测试的xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:erdong="http://www.youierdong.com/schema/soa"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.youierdong.com/schema/soa
http://www.youierdong.com/schema/soa.xsd"
default-lazy-init="true">
<bean id="userServiceImpl" class="com.youi.erdong.test.service.UserServiceImpl"></bean>
<erdong:service interface="com.youi.erdong.test.service.UserService" ref="userServiceImpl"></erdong:service>
<erdong:registry protocol="redis" address="127.0.0.1:6379"></erdong:registry>
<erdong:reference id="testServiceImpl3" interface="com.youi.erdong.test.service.UserService" loadbalance="random" protocol="http"></erdong:reference>
<erdong:protocol name="http" port="27017" host="127.0.0.1"></erdong:protocol>
</beans>
运行测试类
public class Mytest {
public static void main(String[] args) {
ApplicationContext app = new ClassPathXmlApplicationContext(
"mytest.xml");
}
}
4 打包后,新建消费者工程引用。
消费者获取的代理实例是如何创建?
序列化对象
import java.io.Serializable;
public class Protocol implements Serializable, InitializingBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
// 序列化id
private static final long serialVersionUID = -323182438989154754L;
private String name;
private String port;
private String host;
private String contextpath;
public void afterPropertiesSet() throws Exception {
// TODO Auto-generated method stub
if (name.equalsIgnoreCase("rmi")) {
RmiUtil rmiUtil = new RmiUtil();
rmiUtil.startRmiServer(host, port, "erdongrmi");
}
}
// spring启动后的事件
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!ContextRefreshedEvent.class.getName().equals(
event.getClass().getName())) {
return;
}
if (!"netty".equalsIgnoreCase(name)) {
return;
}
new Thread(new Runnable() {
public void run() {
try {
NettyUtil.startServer(port);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getContextpath() {
return contextpath;
}
public void setContextpath(String contextpath) {
this.contextpath = contextpath;
}
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
// TODO Auto-generated method stub
}
}
public class Registry implements Serializable {
// 序列化id
private static final long serialVersionUID = 1672531801363254807L;
private String protocol;
private String address;
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
}
/**
* InitializingBean SpringIOC注入后调用
*
*/
public class Service implements Serializable, InitializingBean,
ApplicationContextAware {
// 序列化id
private static final long serialVersionUID = -2814888066547175285L;
private String intf;
private String ref;
private String protocol;
public static ApplicationContext application;
// InitializingBean 实现方法
public void afterPropertiesSet() throws Exception {
// 注册生产者的服务
BaseRegistryDelegate.registry(ref, application);
}
// ApplicationContextAware 实现方法 用来获取Spring上下文
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
// TODO Auto-generated method stub
this.application = applicationContext;
}
public String getIntf() {
return intf;
}
public void setIntf(String intf) {
this.intf = intf;
}
public String getRef() {
return ref;
}
public void setRef(String ref) {
this.ref = ref;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public static ApplicationContext getApplication() {
return application;
}
public void setApplication(ApplicationContext application) {
this.application = application;
}
}
代理实例
/**
* 代理实例接口FactoryBean 获取Spring上下文 ApplicationContextAware
*/
public class Reference implements Serializable, FactoryBean,
ApplicationContextAware, InitializingBean {
// 序列化id
private static final long serialVersionUID = 6496428948999332452L;
private String id;
private String intf;
private String loadbalance;
private String protocol;
private String cluster;
private String retries;
private Invoke invoke;// 调用者
private ApplicationContext application;
private static Map<String, Invoke> invokes = new HashMap<String, Invoke>();
private List<String> registryInfo = new ArrayList<String>();
private static Map<String, LoadBalance> loadBalance = new HashMap<String, LoadBalance>();
private static Map<String, Cluster> clusters = new HashMap<String, Cluster>();
/**
* 注册远程调用协议
*/
static {
invokes.put("http", new HttpInvoke());
invokes.put("rmi", new RmiInvoke());
invokes.put("netty", new NettyInvoke());
loadBalance.put("random", new RandomLoadBalance());
loadBalance.put("roundrobin", new RoundRobinLoadBalance());
clusters.put("failover", new FailoverClusterInvoke());
clusters.put("failfast", new FailfastClusterInvoke());
clusters.put("failsafe", new FailsafeClusterInvoke());
}
public Reference() {
System.out.println("66666666666666666666");
}
/** ApplicationContextAware的实现方法 */
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
this.application = applicationContext;
}
/**
* 拿到一个实例,这个方法是Spring初始化时getBean方法里调用的
* ApplicationContext.getBean("id")时就会调用getObject(),其返回值会交给Spring进行管理
* 在getObject()方法里返回的是intf接口的代理对象
*/
public Object getObject() throws Exception {
System.out.println("返回intf的代理对象");
if (protocol != null && protocol.length() > 0) {
invoke = invokes.get(protocol);
} else {
// Protocol的实例在spring容器中
Protocol protocol = application.getBean(Protocol.class);
if (protocol != null) {
invoke = invokes.get(protocol.getName());
} else {
invoke = invokes.get("http");
}
}
// 返回代理对象
return Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class<?>[] { Class.forName(intf) },
new InvokeInvocationHandler(invoke, this));
}
public Class getObjectType() {
if (intf != null && intf.length() > 0) {
try {
return Class.forName(intf);
} catch (Exception e) {
// TODO: handle exception
}
}
return null;
}
/** 返回的代理实例是否是单例 */
public boolean isSingleton() {
// TODO Auto-generated method stub
return false;
}
// InitializingBean实现的方法
public void afterPropertiesSet() throws Exception {
// 消费者获得生产者所有注册信息
registryInfo = BaseRegistryDelegate.getRegistry(id, application);
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getIntf() {
return intf;
}
public void setIntf(String intf) {
this.intf = intf;
}
public String getLoadbalance() {
return loadbalance;
}
public void setLoadbalance(String loadbalance) {
this.loadbalance = loadbalance;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public List<String> getRegistryInfo() {
return registryInfo;
}
public void setRegistryInfo(List<String> registryInfo) {
this.registryInfo = registryInfo;
}
public static Map<String, LoadBalance> getLoadBalance() {
return loadBalance;
}
public static void setLoadBalance(Map<String, LoadBalance> loadBalance) {
Reference.loadBalance = loadBalance;
}
public String getCluster() {
return cluster;
}
public void setCluster(String cluster) {
this.cluster = cluster;
}
public String getRetries() {
return retries;
}
public void setRetries(String retries) {
this.retries = retries;
}
public static Map<String, Cluster> getClusters() {
return clusters;
}
public static void setClusters(Map<String, Cluster> clusters) {
Reference.clusters = clusters;
}
}
调用策略
/**
* 采用策略模式进行rpc调用 返回json Stringj进行通信
*
* @author Administrator
*
*/
public interface Invoke {
public String invoke(Invocation invocation);
}
public class HttpInvoke implements Invoke {
public String invoke(Invocation invocation) {
Reference reference = invocation.getReference();
List<String> registryInfo = reference.getRegistryInfo();
// 负载均衡算法
String loadbalance = reference.getLoadbalance();
LoadBalance loadBalanceBean = reference.getLoadBalance().get(
loadbalance);
NodeInfo nodeInfo = loadBalanceBean.doSelect(registryInfo);
// 调用远程的生产者是传输json字符串
// 根据serviceId调用生产者spring容器中的service实例
// 根据methodName和methodtype获取利用反射调用method方法
JSONObject sendParam = new JSONObject();
sendParam.put("methodName", invocation.getMethod().getName());
sendParam.put("methodParams", invocation.getObjs());
sendParam.put("serviceId", reference.getId());
sendParam.put("paramTypes", invocation.getMethod().getParameterTypes());
String url = "http://" + nodeInfo.getHost() + ":" + nodeInfo.getPort()
+ nodeInfo.getContextpath();
// 调用生产者的服务
String result = HttpRequest.sendPost(url, sendParam.toJSONString());
return result;
}
}
public class NettyInvoke implements Invoke {
public String invoke(Invocation invocation) {
Reference reference = invocation.getReference();
List<String> registryInfo = reference.getRegistryInfo();
// 负载均衡算法
String loadbalance = reference.getLoadbalance();
LoadBalance loadBalanceBean = reference.getLoadBalance().get(
loadbalance);
NodeInfo nodeInfo = loadBalanceBean.doSelect(registryInfo);
// 调用远程的生产者是传输json字符串
// 根据serviceId调用生产者spring容器中的service实例
// 根据methodName和methodtype获取利用反射调用method方法
JSONObject sendParam = new JSONObject();
sendParam.put("methodName", invocation.getMethod().getName());
sendParam.put("methodParams", invocation.getObjs());
sendParam.put("serviceId", reference.getId());
sendParam.put("paramTypes", invocation.getMethod().getParameterTypes());
try {
return NettyUtil.sendMsg(nodeInfo.getHost(), nodeInfo.getPort(),
sendParam.toJSONString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
}
/**
* RMI通信协议
*
* @author Administrator
*
*/
public class RmiInvoke implements Invoke {
public String invoke(Invocation invocation) {
Reference reference = invocation.getReference();
List<String> registryInfo = reference.getRegistryInfo();
// 负载均衡算法
String loadbalance = reference.getLoadbalance();
LoadBalance loadBalanceBean = reference.getLoadBalance().get(
loadbalance);
NodeInfo nodeInfo = loadBalanceBean.doSelect(registryInfo);
// 调用远程的生产者是传输json字符串
// 根据serviceId调用生产者spring容器中的service实例
// 根据methodName和methodtype获取利用反射调用method方法
JSONObject sendParam = new JSONObject();
sendParam.put("methodName", invocation.getMethod().getName());
sendParam.put("methodParams", invocation.getObjs());
sendParam.put("serviceId", reference.getId());
sendParam.put("paramTypes", invocation.getMethod().getParameterTypes());
RmiUtil rmi = new RmiUtil();
SOARmi soaRmi = rmi.startRmiClient(nodeInfo, "erdongrmi");
try {
return soaRmi.invoke(sendParam.toString());
} catch (Exception e) {
// TODO: handle exception
}
return null;
}
}
/**
* 封装InvokeInvocationHandler的invoke方法里的三个参数Object , Method , Object[]
*
* @author Administrator
*
*/
public class Invocation {
private Method method;
private Object[] objs;
private Invoke invoke;
private Reference reference;
public Method getMethod() {
return method;
}
public void setMethod(Method method) {
this.method = method;
}
public Object[] getObjs() {
return objs;
}
public void setObjs(Object[] objs) {
this.objs = objs;
}
public Reference getReference() {
return reference;
}
public void setReference(Reference reference) {
this.reference = reference;
}
public Invoke getInvoke() {
return invoke;
}
public void setInvoke(Invoke invoke) {
this.invoke = invoke;
}
}
动作代理
/**
* jdk动作代理 InvokeInvocationHandler 是一个advice(通知、增强),它进行了rpc(http、rmi、netty)的远程调用
*
* @author Administrator
*
*/
public class InvokeInvocationHandler implements InvocationHandler {
private Invoke invoke;
private Reference reference;
public InvokeInvocationHandler(Invoke invoke, Reference reference) {
this.invoke = invoke;
this.reference = reference;
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
// 这里最终要调用多个远程的生产者 生产者在启动的时候要在注册中心(redis)注册信息
System.out.println("已经获取到了代理实例 InvokeInvocationHandler invoke");
Invocation invocation = new Invocation();
invocation.setMethod(method);
invocation.setObjs(args);
invocation.setReference(reference);
invocation.setInvoke(invoke);
// String result = invoke.invoke(invocation);
Cluster cluster = reference.getClusters().get(reference.getCluster());
String result = cluster.invoke(invocation);
return result;
}
}
生产者信息注册 RedisApi是Rdis工具类
public interface BaseRegistry {
public boolean registry(String param, ApplicationContext application);
public List<String> getRegistry(String id, ApplicationContext application);
}
/**
* redis注册中心处理类
*
*/
public class RedisRegistry implements BaseRegistry {
public boolean registry(String ref, ApplicationContext application) {
try {
Protocol protocol = application.getBean(Protocol.class);
Map<String, Service> services = application
.getBeansOfType(Service.class);
Registry registry = application.getBean(Registry.class);
RedisApi.createJedisPool(registry.getAddress());
for (Map.Entry<String, Service> entry : services.entrySet()) {
if (entry.getValue().getRef().equals(ref)) {
JSONObject jo = new JSONObject();
jo.put("protocol", JSONObject.toJSONString(protocol));
jo.put("services",
JSONObject.toJSONString(entry.getValue()));
JSONObject ipport = new JSONObject();
ipport.put(protocol.getHost() + ":" + protocol.getPort(),
jo);
lpush(ipport, ref);
}
}
return true;
} catch (Exception e) {
// TODO: handle exception
}
return false;
}
/**
* [ { "127.0.0.1:27017":{
* "protocol":"{"host":"127.0.0.1","name":"http","port":"27017"}",
* "services"
* :"{"intf":"com.youi.erdong.test.service.UserService4","protocol":"
* http","ref":"userServiceImpl4"}" },{ "127.0.0.1:27017":{
* "protocol":"{"host":"127.0.0.1","name":"http","port":"27017"}",
* "services"
* :"{"intf":"com.youi.erdong.test.service.UserService1","protocol":"
* http","ref":"userServiceImpl1"}" } } ]
*/
// 数据加入到redis
private void lpush(JSONObject ipport, String key) {
if (RedisApi.exists(key)) {
Set<String> keys = ipport.keySet();
String ipportStr = "";
for (String ks : keys) {
ipportStr = ks;
}
// 去重
boolean isold = false;
List<String> registryInfo = RedisApi.lrange(key);
List<String> newRegistry = new ArrayList<String>();
for (String node : registryInfo) {
JSONObject jo = JSONObject.parseObject(node);
if (jo.containsKey(ipportStr)) {
newRegistry.add(ipport.toJSONString());
isold = true;
} else {
newRegistry.add(node);
}
}
if (isold) {
if (newRegistry.size() > 0) {
RedisApi.del(key);
String[] newReStr = new String[newRegistry.size()];
for (int i = 0; i < newReStr.length; i++) {
newReStr[i] = newRegistry.get(i);
}
RedisApi.lpush(key, newReStr);
}
} else {
RedisApi.lpush(key, ipport.toJSONString());
}
} else {
RedisApi.lpush(key, ipport.toJSONString());
}
}
public List<String> getRegistry(String id, ApplicationContext application) {
try {
Registry registry = application.getBean(Registry.class);
RedisApi.createJedisPool(registry.getAddress());
if (RedisApi.exists(id)) {
// 获取list
return RedisApi.lrange(id);
}
} catch (Exception e) {
// TODO: handle exception
}
return null;
}
}
/**
* 委托者类 注册内容委托类
*/
public class BaseRegistryDelegate {
public static void registry(String ref, ApplicationContext application) {
// 获取注册信息
Registry registry = application.getBean(Registry.class);
String protocol = registry.getProtocol();
BaseRegistry baseRegistry = Registry.registryMap.get(protocol);
baseRegistry.registry(ref, application);
}
public static List<String> getRegistry(String id,
ApplicationContext application) {
Registry registry = application.getBean(Registry.class);
String protocol = registry.getProtocol();
BaseRegistry registryBean = Registry.registryMap.get(protocol);
return registryBean.getRegistry(id, application);
}
}
负载均衡算法
public interface LoadBalance {
NodeInfo doSelect(List<String> registryInfo);
}
public class NodeInfo {
private String host;
private String port;
private String contextpath;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
public String getContextpath() {
return contextpath;
}
public void setContextpath(String contextpath) {
this.contextpath = contextpath;
}
}
/**
* 负载均衡随机算法
*/
public class RandomLoadBalance implements LoadBalance {
public NodeInfo doSelect(List<String> registryInfo) {
Random random = new Random();
int index = random.nextInt(registryInfo.size());
String registry = registryInfo.get(index);
JSONObject registryJO = (JSONObject) JSONObject.parse(registry);
Collection values = registryJO.values();
JSONObject node = new JSONObject();
for (Object value : values) {
node = JSONObject.parseObject(value.toString());
}
JSONObject protocol = node.getJSONObject("protocol");
NodeInfo nodeInfo = new NodeInfo();
nodeInfo.setHost(protocol.get("host") != null ? protocol
.getString("host") : "");
nodeInfo.setPort(protocol.get("port") != null ? protocol
.getString("port") : "");
nodeInfo.setContextpath(protocol.get("contextpath") != null ? protocol
.getString("contextpath") : "");
return nodeInfo;
}
}
/***
* 轮询负载均衡算法
*/
public class RoundRobinLoadBalance implements LoadBalance {
private static Integer index = 0;
public NodeInfo doSelect(List<String> registryInfo) {
synchronized (index) {
if (index >= registryInfo.size()) {
index = 0;
}
String registry = registryInfo.get(index);
index++;
JSONObject registryJO = (JSONObject) JSONObject.parse(registry);
Collection values = registryJO.values();
JSONObject node = new JSONObject();
for (Object value : values) {
node = JSONObject.parseObject(value.toString());
}
JSONObject protocol = node.getJSONObject("protocol");
NodeInfo nodeInfo = new NodeInfo();
nodeInfo.setHost(protocol.get("host") != null ? protocol
.getString("host") : "");
nodeInfo.setPort(protocol.get("port") != null ? protocol
.getString("port") : "");
nodeInfo.setContextpath(protocol.get("contextpath") != null ? protocol
.getString("contextpath") : "");
return nodeInfo;
}
}
}
非web工程的http接收方法
/**
* 这个是soa框架中给生产者接收请求用的servlet,这个必须是采用http协议才能掉得到
*
* @author Administrator
*
*/
public class DispatcherServlet extends HttpServlet {
private static final long serialVersionUID = 7394893382457783784L;
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
try {
JSONObject requestparam = httpProcess(req, resp);
String serviceId = requestparam.getString("serviceId");
String methodName = requestparam.getString("methodName");
JSONArray paramTypes = requestparam.getJSONArray("paramTypes");
JSONArray methodParamJa = requestparam.getJSONArray("methodParams");
//
Object[] objs = null;
if (methodParamJa != null) {
objs = new Object[methodParamJa.size()];
int i = 0;
for (Object o : methodParamJa) {
objs[i++] = o;
}
}
// 获取spring上下文
ApplicationContext application = Service.getApplication();
Object serviceBean = application.getBean(serviceId);
// 需要考虑重载
Method method = getMethod(serviceBean, methodName, paramTypes);
if (method != null) {
// 反射调用
Object result = method.invoke(serviceBean, objs);
resp.getWriter().write(result.toString());
} else {
resp.getWriter().write("no such method!!!!");
}
} catch (Exception e) {
// TODO: handle exception
}
}
private Method getMethod(Object bean, String methodName, JSONArray paramType) {
Method[] methods = bean.getClass().getMethods();
List<Method> retmMethod = new ArrayList<Method>();
for (Method method : methods) {
// 找到相同方法名的方法
if (methodName.trim().equals(method.getName())) {
retmMethod.add(method);
}
}
if (retmMethod.size() == 0) {
return retmMethod.get(0);
}
boolean isSameSize = false;
boolean isSameType = false;
for (Method method : retmMethod) {
Class<?>[] typs = method.getParameterTypes();
if (typs.length == paramType.size()) {
isSameSize = true;
}
if (isSameSize) {
continue;
}
for (int i = 0; i < typs.length; i++) {
if (typs[i].toString().contains(paramType.getString(i))) {
isSameType = true;
} else {
isSameType = false;
break;
}
}
if (isSameType) {
return method;
}
}
return null;
}
// 获取请求参数
public static JSONObject httpProcess(HttpServletRequest req,
HttpServletResponse resp) throws IOException {
StringBuffer sb = new StringBuffer();
InputStream is = req.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is,
"utf-8"));
String s = "";
while ((s = br.readLine()) != null) {
sb.append(s);
}
if (sb.toString().length() <= 0) {
return null;
}
return JSONObject.parseObject(sb.toString());
}
}
/**
* 继承Remote
*
* @author Administrator
*
*/
public interface SOARmi extends Remote {
public String invoke(String param) throws RemoteException;
}
/**
* 生产者端
*
* @author Administrator
*
*/
public class SOARmiImpl extends UnicastRemoteObject implements SOARmi {
private static final long serialVersionUID = 6735305564709334218L;
protected SOARmiImpl() throws RemoteException {
super();
// TODO Auto-generated constructor stub
}
public String invoke(String param) throws RemoteException {
JSONObject requestparam = JSONObject.parseObject(param);
// 要从远程的生产者的spring容器中拿到对应的serviceid实例
String serviceId = requestparam.getString("serviceId");
String methodName = requestparam.getString("methodName");
JSONArray paramTypes = requestparam.getJSONArray("paramTypes");
// 这个对应的方法参数
JSONArray methodParamJa = requestparam.getJSONArray("methodParams");
// 这个就是反射的参数
Object[] objs = null;
if (methodParamJa != null) {
objs = new Object[methodParamJa.size()];
int i = 0;
for (Object o : methodParamJa) {
objs[i++] = o;
}
}
// 拿到spring的上下文
ApplicationContext application = Service.getApplication();
// 服务层的实例
Object serviceBean = application.getBean(serviceId);
// 这个方法的获取,要考虑到这个方法的重载
Method method = getMethod(serviceBean, methodName, paramTypes);
if (method != null) {
Object result;
try {
result = method.invoke(serviceBean, objs);
return result.toString();
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
return "---------------------------------nosuchmethod-----------------------------";
}
return null;
}
private Method getMethod(Object bean, String methodName,
JSONArray paramTypes) {
Method[] methods = bean.getClass().getMethods();
List<Method> retMethod = new ArrayList<Method>();
for (Method method : methods) {
// 把名字和methodName入参相同的方法加入到list中来
if (methodName.trim().equals(method.getName())) {
retMethod.add(method);
}
}
// 如果大小是1就说明相同的方法只有一个
if (retMethod.size() == 1) {
return retMethod.get(0);
}
boolean isSameSize = false;
boolean isSameType = false;
jack: for (Method method : retMethod) {
Class<?>[] types = method.getParameterTypes();
if (types.length == paramTypes.size()) {
isSameSize = true;
}
if (!isSameSize) {
continue;
}
for (int i = 0; i < types.length; i++) {
if (types[i].toString().contains(paramTypes.getString(i))) {
isSameType = true;
} else {
isSameType = false;
}
if (!isSameType) {
continue jack;
}
}
if (isSameType) {
return method;
}
}
return null;
}
}
/**
* Rmi工具 RMI是底层是socket 不能跨平台 java底层的协议
*
* @author Administrator
*
*/
public class RmiUtil {
/**
* @Description 启动rmi服务
* @param @param host
* @param @param port
* @param @param id 参数
* @return void 返回类型
* @throws
*/
public void startRmiServer(String host, String port, String id) {
try {
SOARmi soaRmi = new SOARmiImpl();
LocateRegistry.createRegistry(Integer.valueOf(port));
// rmi://127.0.0.1:1135/fudisfuodsuf id保证bind的唯一
Naming.bind("rmi://" + host + ":" + port + "/" + id, soaRmi);
System.out.println("rmi server start !!!");
} catch (RemoteException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MalformedURLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (AlreadyBoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public SOARmi startRmiClient(NodeInfo nodeinfo, String id) {
String host = nodeinfo.getHost();
String port = nodeinfo.getPort();
try {
return (SOARmi) Naming.lookup("rmi://" + host + ":" + port + "/"
+ id);
} catch (MalformedURLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemoteException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (NotBoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
}
Netty
/**
* Netty 底层NIO无阻塞协议
*
* @author Administrator
*
*/
public class NettyUtil {
public static void startServer(String port) throws Exception {
// BOOS线程
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 工作者线程
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new NettyServerInHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128);
// 完成同步功能
ChannelFuture f = b.bind(Integer.parseInt(port)).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static String sendMsg(String host, String port, final String sendmsg)
throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
final StringBuffer resultmsg = new StringBuffer();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new NettyClientInHandler(resultmsg, sendmsg));
}
});
// 这个是连接服务端,一直在等待着服务端的返回消息,返回的信息封装到future,可以监控线程的返回
ChannelFuture f = b.connect(host, Integer.parseInt(port)).channel()
.closeFuture().await();
return resultmsg.toString();
} finally {
workerGroup.shutdownGracefully();
}
}
}
public class NettyServerInHandler extends ChannelInboundHandlerAdapter {
/*
* @see netty的客户端有消息过来的时候就会调用这个方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
String resultStr = new String(result1);
System.out.println(resultStr);
result.release();
String response = invokeService(resultStr);
// 放入缓存
ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
encoded.writeBytes(response.getBytes());
ctx.writeAndFlush(encoded);
ctx.close();
}
private String invokeService(String param) {
JSONObject requestparam = JSONObject.parseObject(param);
// 要从远程的生产者的spring容器中拿到对应的serviceid实例
String serviceId = requestparam.getString("serviceId");
String methodName = requestparam.getString("methodName");
JSONArray paramTypes = requestparam.getJSONArray("paramTypes");
// 这个对应的方法参数
JSONArray methodParamJa = requestparam.getJSONArray("methodParams");
// 这个就是反射的参数
Object[] objs = null;
if (methodParamJa != null) {
objs = new Object[methodParamJa.size()];
int i = 0;
for (Object o : methodParamJa) {
objs[i++] = o;
}
}
// 拿到spring的上下文
ApplicationContext application = Service.getApplication();
// 服务层的实例
Object serviceBean = application.getBean(serviceId);
// 这个方法的获取,要考虑到这个方法的重载
Method method = getMethod(serviceBean, methodName, paramTypes);
if (method != null) {
Object result;
try {
result = method.invoke(serviceBean, objs);
return result.toString();
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
return "---------------------------------nosuchmethod-----------------------------";
}
return null;
}
private Method getMethod(Object bean, String methodName,
JSONArray paramTypes) {
Method[] methods = bean.getClass().getMethods();
List<Method> retMethod = new ArrayList<Method>();
for (Method method : methods) {
// 把名字和methodName入参相同的方法加入到list中来
if (methodName.trim().equals(method.getName())) {
retMethod.add(method);
}
}
// 如果大小是1就说明相同的方法只有一个
if (retMethod.size() == 1) {
return retMethod.get(0);
}
boolean isSameSize = false;
boolean isSameType = false;
jack: for (Method method : retMethod) {
Class<?>[] types = method.getParameterTypes();
if (types.length == paramTypes.size()) {
isSameSize = true;
}
if (!isSameSize) {
continue;
}
for (int i = 0; i < types.length; i++) {
if (types[i].toString().contains(paramTypes.getString(i))) {
isSameType = true;
} else {
isSameType = false;
}
if (!isSameType) {
continue jack;
}
}
if (isSameType) {
return method;
}
}
return null;
}
@Override
public boolean isSharable() {
// TODO Auto-generated method stub
return super.isSharable();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.handlerRemoved(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
super.exceptionCaught(ctx, cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
// TODO Auto-generated method stub
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
super.channelWritabilityChanged(ctx);
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
// TODO Auto-generated method stub
super.bind(ctx, localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise)
throws Exception {
// TODO Auto-generated method stub
super.connect(ctx, remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
// TODO Auto-generated method stub
super.disconnect(ctx, promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
// TODO Auto-generated method stub
super.close(ctx, promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
// TODO Auto-generated method stub
super.deregister(ctx, promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.read(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
// TODO Auto-generated method stub
super.write(ctx, msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.flush(ctx);
}
@Override
protected Object clone() throws CloneNotSupportedException {
// TODO Auto-generated method stub
return super.clone();
}
@Override
public boolean equals(Object obj) {
// TODO Auto-generated method stub
return super.equals(obj);
}
@Override
protected void finalize() throws Throwable {
// TODO Auto-generated method stub
super.finalize();
}
@Override
public int hashCode() {
// TODO Auto-generated method stub
return super.hashCode();
}
@Override
public String toString() {
// TODO Auto-generated method stub
return super.toString();
}
}
public class NettyClientInHandler extends ChannelInboundHandlerAdapter {
public StringBuffer message;// 接受服务端的消息
public String sendMsg;// 发送给服务端的消息
public NettyClientInHandler(StringBuffer message, String sendMsg) {
this.message = message;
this.sendMsg = sendMsg;
}
/*
* @see 当我们连接成功以后会触发这个方法 在这个方法里面完成消息的发送
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------------channelActive-------------");
ByteBuf encoded = ctx.alloc().buffer(4 * sendMsg.length());
encoded.writeBytes(sendMsg.getBytes());
ctx.write(encoded);
ctx.flush();
}
/*
* @see 一旦服务端有消息过来,这个方法会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("------------------channelRead--------------------");
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
System.out.println("server response msg:" + new String(result1));
message.append(new String(result1));
result.release();
}
@Override
public boolean isSharable() {
// TODO Auto-generated method stub
return super.isSharable();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.handlerRemoved(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
super.exceptionCaught(ctx, cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelUnregistered(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
// TODO Auto-generated method stub
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
super.channelWritabilityChanged(ctx);
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
// TODO Auto-generated method stub
super.bind(ctx, localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise)
throws Exception {
// TODO Auto-generated method stub
super.connect(ctx, remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
// TODO Auto-generated method stub
super.disconnect(ctx, promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
// TODO Auto-generated method stub
super.close(ctx, promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
// TODO Auto-generated method stub
super.deregister(ctx, promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.read(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
// TODO Auto-generated method stub
super.write(ctx, msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.flush(ctx);
}
@Override
protected Object clone() throws CloneNotSupportedException {
// TODO Auto-generated method stub
return super.clone();
}
@Override
public boolean equals(Object obj) {
// TODO Auto-generated method stub
return super.equals(obj);
}
@Override
protected void finalize() throws Throwable {
// TODO Auto-generated method stub
super.finalize();
}
@Override
public int hashCode() {
// TODO Auto-generated method stub
return super.hashCode();
}
@Override
public String toString() {
// TODO Auto-generated method stub
return super.toString();
}
}
集群容错
public interface Cluster {
public String invoke(Invocation invocation) throws Exception;
}
/**
* @Description 这个如果调用节点异常,直接失败
* @ClassName FailfastClusterInvoke
* @Date 2017年11月18日 下午9:55:23
* @Author dn-jack
*/
public class FailfastClusterInvoke implements Cluster {
public String invoke(Invocation invocation) throws Exception {
Invoke invoke = invocation.getInvoke();
try {
return invoke.invoke(invocation);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
}
/**
* @Description 这个如果调用失败就自动切换到其他集群节点
* @ClassName FailoverClusterInvoke
* @Date 2017年11月18日 下午9:37:46
* @Author dn-jack
*/
public class FailoverClusterInvoke implements Cluster {
public String invoke(Invocation invocation) throws Exception {
String retries = invocation.getReference().getRetries();
Integer retriint = Integer.parseInt(retries);
for (int i = 0; i < retriint; i++) {
try {
Invoke invoke = invocation.getInvoke();
String result = invoke.invoke(invocation);
return result;
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
throw new RuntimeException("retries" + retries + "全部失败!!!!");
}
}
/**
* @Description 调用节点失败,直接忽略
* @ClassName FailsafeClusterInvoke
* @Date 2017年11月18日 下午9:55:49
* @Author dn-jack
*/
public class FailsafeClusterInvoke implements Cluster {
public String invoke(Invocation invocation) throws Exception {
Invoke invoke = invocation.getInvoke();
try {
return invoke.invoke(invocation);
} catch (Exception e) {
e.printStackTrace();
return "忽略";
}
}
}
消息发布与订阅
/**
* redis的发布与订阅,跟我们的activemq中的topic消息消费机制差不多 是一个广播形式的消费消息
*/
public class RedisServerRegistry extends JedisPubSub {
/*
* @see 当往频道其实就是队列,当往里面发布消息的时候,这个方法就会触发
*/
@Override
public void onMessage(String channel, String message) {
}
@Override
public void subscribe(String... channels) {
// TODO Auto-generated method stub
super.subscribe(channels);
}
}