RocketMQ是阿里巴巴开源的一个分布式消息和流数据平台,已经捐赠给Apache基金会。从背景来看,RocketMQ不仅久经阿里双十一的考验,而且能从Apache基金会顺利毕业,都能证明其性能、稳定性和代码质量的优异。而且,笔者参加过一次RocketMQ社区组织的开发者会议,社区氛围很好,而且能够和开发者直接交流,也不存在语言障碍。一个好的社区氛围能让我们在遇到问题以后,更轻松地解决问题。此外,RocketMQ涉及分布式系统的通信和高性能存储,通信和存储是很多系统都会涉及到的内容,因此非常值得学习。
rocketmq-remoting是RocketMQ中实现通信的组件,基于Netty封装,具有很好的通用性,可以在很多分布式系统中使用。而且,从pom文件可以看出remoting模块只依赖了netty和fastjson(日志模块暂不关注),所以不用担心这个模块的代码会与其他模块代码发生交互,盘根错节在一起,难以理清头绪。
今天的目的是能使用remoting模块实现测试环境,方便对代码进行debug。
首先来了解NettyRemotingServer和NettyRemotingClient的使用。
从NamesrvController中可以看到NettyRemotingServer的启动流程,这里对其进行了简化,只保留了必要的部分。
@Slf4j
public class TestController {
private RemotingServer remotingServer;
private BrokerHousekeepingService brokerHousekeepingService;
private NettyServerConfig nettyServerConfig;
private ExecutorService remotingExecutor;
public TestController(NettyServerConfig nettyServerConfig) {
this.nettyServerConfig = nettyServerConfig;
this.brokerHousekeepingService = new BrokerHousekeepingService();
remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads());
}
public void initialize(){
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.registerProcessor();
}
private void registerProcessor(){
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(), this.remotingExecutor);
log.info("registerProcessor");
}
public void start(){
remotingServer.start();
log.info("TestController start");
}
public void shutdown(){
remotingServer.shutdown();
remotingExecutor.shutdown();
}
}
@Slf4j
public class DefaultRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception {
if (ctx != null){
log.debug("receive request, {}, {}, {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()){
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
default:
break;
}
return null;
}
@Override
public boolean rejectRequest() {
return false;
}
public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
log.info(">>> putKVConfig");
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
//解析请求
final PutKVConfigRequestHeader requestHeader =
(PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
log.info("requestHeader:{}", requestHeader);
//返回处理结果
response.setCode(ResponseCode.SUCCESS);
response.setRemark("This is remark");
String message = "This is a message from server";
response.setBody(message.getBytes());
return response;
}
public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
log.info(">>> getKVConfig");
final RemotingCommand reponse = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class);
final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) reponse.readCustomHeader();
final GetKVConfigRequestHeader requestHeader =
(GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);
log.info("requestHeader:{}", requestHeader);
responseHeader.setValue("响应头中的value");
reponse.setCode(ResponseCode.SUCCESS);
reponse.setRemark(null);
return reponse;
}
}
public class TestStartup {
public static void main(String[] args) {
NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setServerWorkerThreads(4);
TestController testController = new TestController(nettyServerConfig);
testController.initialize();
testController.start();
}
}
服务端启动完成,下面发送一条消息测试:
@Slf4j
public class TestRequest {
@Test
public void test() {
//启动RemotingClient
String namesrvAddr = "127.0.0.1:8888";
long timeoutMillis = 3000;
NettyClientConfig nettyClientConfig = new NettyClientConfig();
RemotingClient remotingClient = new NettyRemotingClient(nettyClientConfig);
remotingClient.start();
//实例化request
PutKVConfigRequestHeader header = new PutKVConfigRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG,
header);
request.addExtField("namespace", "namespace");
request.addExtField("key", "key");
request.addExtField("value", "value");
//发送请求
try {
RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
log.info("reponse: {}", response);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingConnectException e) {
e.printStackTrace();
} catch (RemotingSendRequestException e) {
e.printStackTrace();
} catch (RemotingTimeoutException e) {
e.printStackTrace();
}
}
}
到这里,就完成了简单的测试环境搭建,下一次将详细分析服务端的消息处理流程。