Apache Kafka是一个发布-订阅(pub-sub)消息流平台,可用于构建企业级消息队列。
Apache Kafka的Spring支持
Spring框架为Apache Kafka提供了实时发布和订阅数据的支持。
Spring WebSocket
Spring框架还提供了名为Spring WebSocket的模块,可用于在客户端和服务器之间实时双向收发消息。为此,Spring WebSocket使用STOMP协议。
Maven依赖:
<dependencies>
    <!-- Spring Boot starters: versions are intentionally omitted so that they are
         all resolved from the same Spring Boot dependency management. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <!-- Apache Kafka client libraries and the Spring integration for them. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- WebJars: front-end assets served under /webjars/**. webjars-locator-core
         lets the page reference them without a version segment in the URL. -->
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>webjars-locator-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>sockjs-client</artifactId>
        <version>1.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>stomp-websocket</artifactId>
        <version>2.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>bootstrap</artifactId>
        <version>3.3.7</version>
    </dependency>
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>jquery</artifactId>
        <version>3.1.0</version>
    </dependency>

    <!-- No explicit <version>: the original pinned 2.0.4.RELEASE here while every
         other Spring Boot starter used the managed version, which can mix
         Spring Boot versions on the classpath. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
</dependencies>
Kafka Configuration in Spring:
EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.bootstrapserver}")
public String bootstrapServer;
@Bean
public Map<String,Object> consumerConfigs(){
Map<String,Object> props=new HashMap<String,Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "temp-groupid.group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory=new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
BOOTSTRAP_SERVERS_CONFIG
指定运行Kafka的服务器(broker)地址。
KEY_DESERIALIZER_CLASS_CONFIG
和 VALUE_DESERIALIZER_CLASS_CONFIG
指定用于反序列化Kafka消息的键(key)和值(value)的类。
GROUP_ID_CONFIG
指定Kafka消费者组(consumer group)的ID。
AUTO_OFFSET_RESET_CONFIG
指定偏移量重置策略(此处设置为 latest)。
Spring WebSocket Configuration
/**
 * WebSocket/STOMP messaging configuration.
 *
 * <p>Enables a simple broker for destinations prefixed with {@code /topic} and
 * registers the {@code /live-temperature} STOMP endpoint with SockJS enabled.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Turns on the simple broker for the {@code /topic} destination prefix.
     *
     * @param brokerRegistry registry used to configure message broker options
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry brokerRegistry) {
        brokerRegistry.enableSimpleBroker("/topic");
    }

    /**
     * Registers the STOMP endpoint that clients connect to.
     *
     * @param endpointRegistry registry the {@code /live-temperature} endpoint is added to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry endpointRegistry) {
        endpointRegistry
                .addEndpoint("/live-temperature")
                .withSockJS();
    }
}
Consumer and Message Publish for WebSocket
/**
 * Consumes messages from the Kafka topic named by the {@code kafka.topic} property
 * and forwards numeric payloads to the {@code /topic/temperature} messaging destination.
 */
@Service
public class KafkaConsumerService {

    /** Messaging template used to publish to the {@code /topic/temperature} destination. */
    @Autowired
    SimpMessagingTemplate template;

    /**
     * Handles one Kafka record. The payload is forwarded unchanged when it parses
     * as a number; anything else is dropped silently.
     *
     * @param message the record payload
     */
    @KafkaListener(topics = "${kafka.topic}")
    public void consume(@Payload String message) {
        if (isNumeric(message)) {
            template.convertAndSend("/topic/temperature", message);
        }
    }

    /**
     * Tells whether {@code str} can be parsed by {@link Double#parseDouble(String)}.
     *
     * @param str the text to check; may be {@code null}
     * @return {@code true} if the text parses as a double, {@code false} if it is
     *         {@code null} or not parseable
     */
    public boolean isNumeric(String str) {
        // Double.parseDouble(null) throws NullPointerException, not
        // NumberFormatException, so null has to be rejected explicitly.
        if (str == null) {
            return false;
        }
        try {
            Double.parseDouble(str);
            return true;
        } catch (NumberFormatException ignored) {
            // Not a number: expected for non-temperature payloads.
            return false;
        }
    }
}
前台页面:
<!DOCTYPE html>
<html>
<head>
<meta charset="ISO-8859-1">
<title>Home</title>
<link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
<script src="/webjars/jquery/jquery.min.js"></script>
<script src="/webjars/sockjs-client/sockjs.min.js"></script>
<script src="/webjars/stomp-websocket/stomp.min.js"></script>
<script
src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.22.2/moment.min.js"></script>
<script
src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.7.2/Chart.min.js"></script>
<script type="text/javascript">
// STOMP client handle; assigned once the document is ready.
var stompClient;

/*
 * Chart configuration: a single 'Temperature' line series plotted against a
 * time-based X axis. Labels and data start empty and are appended to at runtime.
 */
var config = {
    type: 'line',
    data: {
        labels: [],
        datasets: [{
            label: 'Temperature',
            // Was misspelled 'backgroudColor', which is not a dataset option.
            backgroundColor: 'rgb(255, 99, 132)',
            borderColor: 'rgb(255, 99, 132)',
            data: [],
            fill: false
        }]
    },
    options: {
        responsive: true,
        title: {
            display: true,
            text: 'Temperature'
        },
        tooltips: {
            mode: 'index',
            intersect: false
        },
        hover: {
            mode: 'nearest',
            intersect: true
        },
        scales: {
            xAxes: [{
                display: true,
                type: 'time',
                time: {
                    // NOTE(review): only the 'quarter' unit is customised here;
                    // confirm that is the unit the axis actually uses for live data.
                    displayFormats: {
                        quarter: 'h:mm:ss a'
                    }
                },
                scaleLabel: {
                    display: true,
                    labelString: 'Time'
                }
            }],
            yAxes: [{
                display: true,
                scaleLabel: {
                    display: true,
                    labelString: 'Value'
                }
            }]
        }
    }
};
/*
 * On document ready: draw the chart on the #lineChart canvas, connect to the
 * '/live-temperature' SockJS endpoint over STOMP and subscribe to
 * '/topic/temperature'. Each message updates the #temperature text and appends
 * one point to the chart.
 */
$(document).ready(function() {
    var canvasContext = document.getElementById('lineChart').getContext('2d');
    window.myLine = new Chart(canvasContext, config);

    /* Appends one reading to the chart and refreshes the displayed value. */
    function onTemperature(message) {
        var reading = message.body;
        $('#temperature').text(reading);

        /* X axis: time of arrival in the browser. */
        config.data.labels.push(new Date());

        /* Y axis: the same reading goes to every dataset. */
        var series = config.data.datasets;
        for (var i = 0; i < series.length; i++) {
            series[i].data.push(reading);
        }
        window.myLine.update();
    }

    /* Subscribes to the temperature destination once STOMP is connected. */
    function onConnected(frame) {
        stompClient.subscribe('/topic/temperature', onTemperature);
    }

    /* Client-side WebSocket setup. */
    var sockJsConnection = new SockJS('/live-temperature');
    stompClient = Stomp.over(sockJsConnection);
    stompClient.connect({}, onConnected);
});
</script>
</head>
<body>
<div class="alert alert-danger" role="alert" style="width:300px;margin-left:40%;margin-top:10px;">
<p class="text-center">Current Temperature : <b id="temperature">0</b></p>
</div>
<div class="model">
<div class="modal-dialog" style="width:80%;height:auto">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">Temperature</h5>
</div>
<div class="model-body">
<div class="container" style="width:80%">
<canvas id="lineChart"></canvas>
</div>
</div>
</div>
</div>
</div>
</body>
</html>
application.properties
扫描二维码关注公众号,回复:
5276890 查看本文章
# Port for the application's web server (Spring Boot `server.port`).
server.port=5656
#Kafka Topic and Server Port
# kafka.topic is referenced by the @KafkaListener in KafkaConsumerService;
# kafka.bootstrapserver is injected into KafkaConsumerConfig.bootstrapServer.
kafka.topic=livetemperature
kafka.bootstrapserver=localhost:9092
原文地址:https://dzone.com/articles/live-dashboard-using-apache-kafka-and-spring-webso