前言
好了,已经到了有难度的地方了。虽然这个整合网上一查一大堆。不过,说实在只有当自己去排坑的时候才会发现,大部分人的整合估计都是抄抄看看应该可以了的这样的想法。下面这篇sofarpc+zookeeper的整合排坑文章务必要试试。然后你会发现,哦,里面缺了这些东西的:
坑!!!sofarpc+zookeeper整合的坑
要点一、原本项目是存放在“文档”下面的,不过因为“文档”是中文名字,log4j无法识别中文路径,所以迁移到“documents”下面,要知道原因请看参考文章。
下面进入正题。
在WebExt中编写配置类以及对应插件
WebExt里面首先要检查有没有引入sofarpc的依赖:
下面分别是各个文件内容:
package net.w2p.WebExt.config;
public class ZkConf {
//zookeeper服务器所在访问url,注意,形式是:127.0.0.1:2181,这种的,不是zookeeper://127.0.0.1:2181
public String address="127.0.0.1:2181";
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
}
package net.w2p.WebExt.config;
/***
*
* sofaRpc 的服务提供者程序的配置。
*
* ***/
public class SofaProviderConf {
/***程序占用端口。默认端口 bolt:12200, rest:8341, h2c:12300, dubbo:20880***/
public Integer port = 22101;
/***是否守护端口,true的话随主线程退出而退出,false的话则要主动退出***/
public Boolean daemon = true;
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public Boolean getDaemon() {
return daemon;
}
public void setDaemon(Boolean daemon) {
this.daemon = daemon;
}
}
package net.w2p.WebExt.config;
/****
*
* sofa rpc 消费者客户端
*
* ***/
public class SofaCostumerConf {
}
package net.w2p.WebExt.Plugins.SofaRpc;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.RegistryConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import net.w2p.WebExt.config.SofaProviderConf;
public class ZookeeperBoltServer {
SofaProviderConf conf=null;
String zkAddress=null;
private RegistryConfig registryConfig=null;
private ServerConfig serverConfig=null;
/***
* @param zkAddress zookeeper的地址,注意,格式是:127.0.0.1:2181这样的,如果有多个,那么就是:127.0.0.1:2181,127.0.0.1:2182
*
*
* ***/
public ZookeeperBoltServer(String zkAddress, SofaProviderConf conf){
this.conf=conf;
this.zkAddress=zkAddress;
this.registryConfig = new RegistryConfig()
.setProtocol("zookeeper")
.setAddress(zkAddress);
this.serverConfig = new ServerConfig()
.setPort(conf.port)
.setDaemon(conf.daemon);
}
/***
*
* 该方法将本地方法导出为远程方法。
* 以HelloService为接口,实现类为HelloServiceImpl为例子:
* @param clazz 为HelloService
// * @param interfaceId 为HelloService.class.getName()
* @param instance 为 new HelloServiceImpl()
*
* ****/
public<T> Boolean export(Class<T> clazz,T instance){
// ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
// .setInterfaceId(HelloService.class.getName())
// .setRef(new HelloServiceImpl("result from 22101"))
// .setServer(serverConfig)
// .setRegistry(registryConfig);
ProviderConfig<T> providerConfig = new ProviderConfig<T>()
.setInterfaceId(clazz.getName())
.setRef(instance)
.setServer(serverConfig)
.setRegistry(registryConfig);
providerConfig.export();
return true;
}
}
package net.w2p.WebExt.Plugins.SofaRpc;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.RegistryConfig;
import net.w2p.WebExt.config.SofaCostumerConf;
import java.util.concurrent.ConcurrentHashMap;
public class ZookeeperBoltClient {
private ConcurrentHashMap<String,ConsumerConfig> map_config=new ConcurrentHashMap<>();
private String zkAddress=null;
private SofaCostumerConf conf=null;
private RegistryConfig registryConfig=null;
/***
* @param zkAddress zookeeper的地址,注意,格式是:127.0.0.1:2181这样的,如果有多个,那么就是:127.0.0.1:2181,127.0.0.1:2182
*
*
* ***/
public ZookeeperBoltClient(String zkAddress, SofaCostumerConf conf){
this.zkAddress=zkAddress;
this.conf=conf;
registryConfig = new RegistryConfig()
.setProtocol("zookeeper")
.setAddress(this.zkAddress);
// ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
// .setInterfaceId(HelloService.class.getName())
// .setRegistry(registryConfig)
// .setTimeout(3000);
// HelloService helloService = consumerConfig.refer();
//
// ConsumerConfig<EchoService> consumerConfig2 = new ConsumerConfig<EchoService>()
// .setInterfaceId(EchoService.class.getName())
// .setRegistry(registryConfig)
// .setTimeout(3000);
// EchoService echoService = consumerConfig2.refer();
}
/***
*
* 导入需要用到的rpc远程接口。
*
* ***/
public synchronized <T> Boolean importInterface(Class<T> clazz){
if(map_config.containsKey(clazz.getName())){
return false;
}
ConsumerConfig<T> consumerConfig = new ConsumerConfig<T>()
.setInterfaceId(clazz.getName())
.setRegistry(registryConfig)
.setTimeout(3000);
// HelloService helloService = consumerConfig.refer();
map_config.put(clazz.getName(),consumerConfig);
return true;
}
public synchronized <T> T refer(Class<T> clazz){
if(map_config.containsKey(clazz.getName())){
ConsumerConfig<T> consumerConfig=(ConsumerConfig<T>)map_config.get(clazz.getName());
return consumerConfig.refer();
}
else{
return null;
}
}
}
注意,上面的代码实际上是照搬官网,整合而成的,似乎未来趋势会持续优化。
初始化配置中心数据【zookeeper】
-- zookeeper配置与fastdfs以及redis,postgresql不一样,因为不同项目会有不同的文件服务器,redis缓存服务器以及数据库
-- 然而,对于一个集群来说,zookeeper作为中心是唯一的。
create or replace function "initZKConfig"(
in envName varchar
)
returns varchar
as $BODY$
declare _defaultValues varchar;
declare _envName varchar;
declare _appname varchar;
declare _prefix varchar;
declare strArrays varchar[];
declare arrItemLv1 varchar;
declare tempArrSubItem varchar;
declare valArrs varchar[];
declare item_attr varchar;
declare item_title varchar;
declare item_val varchar;
begin
if envName <> 'test' and envName<> 'ppe' and envName<> 'product' then
raise notice '环境变量异常,只能为test、ppe以及product其中一个。';
return '环境变量异常,只能为test、ppe以及product其中一个。';
end if;
_appname:='zk';
_prefix:=concat(_appname,'.','');
_defaultValues:=
'address->zookeeper服务器所在访问url,注意,形式是:127.0.0.1:2181,这种的,不是zookeeper://127.0.0.1:2181->127.0.0.1:2181$$' ||
''
;
strArrays:=string_to_array(_defaultValues,'$$');
_envName:=envName;
-- fastdfs.connect_timeout_in_seconds = 5
-- fastdfs.network_timeout_in_seconds = 30
-- fastdfs.charset = UTF-8
-- fastdfs.http_anti_steal_token = false
-- fastdfs.http_secret_key = FastDFS1234567890
-- fastdfs.http_tracker_http_port = 80
-- #fastdfs.tracker_servers = tw-server:22122,10.0.11.202:22122,10.0.11.203:22122
-- fastdfs.tracker_servers = localhost:22122
-- fastdfs.visit_url = http://localhost/
-- env varchar(100) not null,
-- key varchar(200) not null,
-- appname varchar(100) not null,
-- title varchar(100) not null,
-- value varchar(2000) default NULL::character varying,
insert into xxl_conf_project (appname, title) values (_appname,'zookeeper全局配置') on conflict ("appname") do nothing;
<<loop4BigArray>>
foreach arrItemLv1 in array strArrays
loop
if char_length(arrItemLv1) < 1 then
raise notice '空字符串无须处理';
continue ;
end if;
valArrs:=string_to_array(arrItemLv1,'->');
item_attr:=valArrs[1];
item_title:=valArrs[2];
item_val:=valArrs[3];
raise notice '属性名称:%,描述:%,当前值:%',item_attr,item_title,item_val;
raise notice '开始添加记录';
insert into xxl_conf_node("env","key","appname","title","value")
values (_envName,concat(_prefix,item_attr),_appname,item_title,item_val)
on conflict ("env","key") do nothing ;
end loop loop4BigArray;
return envName||'环境下的'||_appName||'配置成功';
end;
$BODY$ language plpgsql volatile ;
-- 记住执行下面方法分别添加三个环境下的默认数据。
-- select "initZKConfig"('test');
-- select "initZKConfig"('ppe');
-- select "initZKConfig"('product');
请执行:
select "initZKConfig"('test');
select "initZKConfig"('ppe');
select "initZKConfig"('product');
这样,几个环境下面都有zk的初始数据了。
服务提供者
提供服务,对外提供接口的叫服务提供者。下面我们以原有的项目-FileServerWebApp为服务提供者。
添加api模块
新添加一个Api模块,如下:
FileServerApi项目里面其实就只有一个接口,叫做 IHelloService,这个是用来给客户端用的。
模块的build.gradle如下:
plugins {
id 'java'
}
group 'net.w2p'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
dependencies {
compile project(":Shared")
}
服务提供者引用api模块
在FileServerWebApp的build.gradle里面,现在要引用api模块:
实现IHelloService接口
在FileServerWebApp对应目录中实现接口,例如:
代码如下:
package net.w2p.FileServerWebApp.Rpc;
public class HelloServiceImpl implements IHelloService {
@Override
public Integer plus(Integer a, Integer b) {
return a+b;
}
}
初始化配置中心数据【provider数据】
-- 该方法用于初始化sofa rpc的服务提供者的配置
create or replace function "initRpcProviderConfig"(
in envName varchar,
in para_appname varchar
)
returns varchar
as $BODY$
declare _defaultValues varchar;
declare _envName varchar;
declare _appname varchar;
declare _prefix varchar;
declare strArrays varchar[];
declare arrItemLv1 varchar;
declare tempArrSubItem varchar;
declare valArrs varchar[];
declare item_attr varchar;
declare item_title varchar;
declare item_val varchar;
begin
if envName <> 'test' and envName<> 'ppe' and envName<> 'product' then
raise notice '环境变量异常,只能为test、ppe以及product其中一个。';
return '环境变量异常,只能为test、ppe以及product其中一个。';
end if;
_appname:=para_appname;
_prefix:=concat(_appname,'.rpc_provider.','');
_defaultValues:=
'port->程序占用端口。不同的服务提供者要放到同一台机器请使用不同端口避免冲突->22101$$'||
''
;
strArrays:=string_to_array(_defaultValues,'$$');
_envName:=envName;
insert into xxl_conf_project ("appname", title) values (_appname,'') on conflict ("appname") do nothing;
<<loop4BigArray>>
foreach arrItemLv1 in array strArrays
loop
if char_length(arrItemLv1) < 1 then
raise notice '空字符串无须处理';
continue ;
end if;
valArrs:=string_to_array(arrItemLv1,'->');
item_attr:=valArrs[1];
item_title:=valArrs[2];
item_val:=valArrs[3];
raise notice '属性名称:%,描述:%,当前值:%',item_attr,item_title,item_val;
raise notice '开始添加记录';
insert into xxl_conf_node("env","key","appname","title","value")
values (_envName,concat(_prefix,item_attr),_appname,item_title,item_val)
on conflict ("env","key") do nothing ;
end loop loop4BigArray;
return envName||'环境下的'||_appName||'配置成功';
end;
$BODY$ language plpgsql volatile ;
-- 记住执行下面方法分别添加三个环境下的默认数据。
-- select "initRpcProviderConfig"('test','file-server');
-- select "initRpcProviderConfig"('ppe','file-server');
-- select "initRpcProviderConfig"('product','file-server');
请执行:
select "initRpcProviderConfig"('test','file-server');
select "initRpcProviderConfig"('ppe','file-server');
select "initRpcProviderConfig"('product','file-server');
使用java代码整合到spring中进行设置
设置zkConf
BeanConfiguration下面添加:
package net.w2p.local.plugins.BeanConfiguration;
import com.xxl.conf.core.XxlConfClient;
import net.w2p.WebExt.config.ZkConf;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZkConfiguration {
@Bean(name="zkConf")
public ZkConf zkConf(){
final String VarPrefix ="zk.";
ZkConf conf=new ZkConf();
conf.address = XxlConfClient.get(VarPrefix+"address");
return conf;
}
}
设置服务提供者
package net.w2p.local.plugins.BeanConfiguration;
import com.xxl.conf.core.XxlConfClient;
import net.w2p.FileServerWebApp.Rpc.HelloServiceImpl;
import net.w2p.FileServerWebApp.Rpc.IHelloService;
import net.w2p.WebExt.Plugins.SofaRpc.ZookeeperBoltServer;
import net.w2p.WebExt.config.SofaProviderConf;
import net.w2p.WebExt.config.ZkConf;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/***
* sofa rpc的服务提供者配置
*
* ***/
@Configuration
public class RpcProviderConfiguration {
@Bean(name="providerConf")
public SofaProviderConf providerConf(){
final String VarPrefix ="file-server.rpc_provider.";
SofaProviderConf conf=new SofaProviderConf();
conf.port= XxlConfClient.getInt(VarPrefix+"port");
return conf;
}
@Bean(name="rpcProvider")
@Autowired
public ZookeeperBoltServer rpcProvider(@Qualifier("zkConf")ZkConf zkConf,@Qualifier("providerConf")SofaProviderConf providerConf){
ZookeeperBoltServer provider=new ZookeeperBoltServer(zkConf.address,providerConf);
//--注意,配置要用export来导出远程接口。
//provider.export()
//--发布第一个demo性质的服务接口
provider.export(IHelloService.class,new HelloServiceImpl());
return provider;
}
}
好了,基本整合成功。现在进行测试看是否成功。
测试服务提供者是否正常运行
启动网站,然后,在zk里面执行命令查看节点中是否有我们注册的IHelloService,如下:
zookeeper的命令行用法以及为什么可以验证是不是成功注册发布,请直接参考:
坑!!!sofarpc+zookeeper整合的坑
查看结果如下:
检查!/logs/rpc/common-default.log确认运行情况:
运行情况无异常无报错。
服务提供者整合完毕。
服务消费者
添加新模块
新添加一个web模块,叫MasterWebApp,里面的整合步骤与FileServerWebApp类似,请根据:
一个基本开发框架的整合演化之路–6、项目配置说明+配置中心整合
一个基本开发框架的整合演化之路–7、整合redis
一个基本开发框架的整合演化之路–8、整合数据库-postgresql
做一次整合。
引用api模块
注意,要引用api模块
用java代码配置zk以及消费者
代码内容为:
package net.w2p.local.plugins.BeanConfiguration;
import com.xxl.conf.core.XxlConfClient;
import net.w2p.WebExt.config.ZkConf;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZkConfiguration {
@Bean(name="zkConf")
public ZkConf zkConf(){
final String VarPrefix ="zk.";
ZkConf conf=new ZkConf();
conf.address = XxlConfClient.get(VarPrefix+"address");
return conf;
}
}
package net.w2p.local.plugins.BeanConfiguration;
import net.w2p.FileServerWebApp.Rpc.IHelloService;
import net.w2p.WebExt.Plugins.SofaRpc.ZookeeperBoltClient;
import net.w2p.WebExt.config.SofaCostumerConf;
import net.w2p.WebExt.config.SofaProviderConf;
import net.w2p.WebExt.config.ZkConf;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/****这是sofarpc的客户端的配置***/
@Configuration
public class RpcConsumerConfiguration {
@Bean(name="rpcConsumer")
@Autowired
public ZookeeperBoltClient rpcConsumer(@Qualifier("zkConf") ZkConf zkConf){
ZookeeperBoltClient consumer=new ZookeeperBoltClient(zkConf.address,new SofaCostumerConf());
//--注意,配置要用import来导入接口
//provider.export()
consumer.importInterface(IHelloService.class);
return consumer;
}
}
进行测试
测试代码如下:
@Autowired
ZookeeperBoltClient consumer;
@Test
public void testRpcConsumer(){
IHelloService service=consumer.refer(IHelloService.class);
Integer a=999;
Integer b=1000;
Integer total=service.plus(a,b);
System.out.println("最后结果是:"+total);
}
执行结果:
查看~/logs/rpc/common-default.log看看有无报错:
无报错信息,整合成功。
结语
这次的整合费劲。。一开始压根连报错信息都不知道在哪里看的。估计大公司所有报错信息都是写在日志上----不过,日志存放目录并没有明确告诉其他人,而网上一堆资料也没有提及到,万一整合不成功运行不起来应该在哪里查看报错信息,怎么样排坑,估计都是一复制就直接运行成功的。