/**
 * Start-up smoke test for a SASL_SSL / PLAIN secured Kafka cluster.
 *
 * <p>Once the bean has been constructed, {@link #test()} builds a producer from the
 * injected {@code spring.kafka.producer.*} settings, sends a single record to topic
 * {@code "777"}, prints the broker's acknowledgement and closes the producer again.
 *
 * <p>NOTE(review): broker addresses, credentials and the truststore location are
 * hard-coded below; they should be externalised before this is used outside a local
 * experiment.
 */
@Component
public class KafkaTest {

    /** Placeholder printed instead of the real password. */
    private static final String MASKED_PASSWORD = "******";

    /** Value passed to the producer's {@code retries} setting. */
    @Value("${spring.kafka.producer.retries}")
    private int retries;

    /** Value passed to the producer's {@code batch.size} setting. */
    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;

    /** Value passed to the producer's {@code linger.ms} setting; defaults to 1 when the property is absent. */
    @Value("${spring.kafka.producer.linger:1}")
    private int linger;

    /** Value passed to the producer's {@code buffer.memory} setting. */
    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;

    /**
     * Sends one test record and prints the result.
     *
     * <p>Failures while waiting for the acknowledgement are printed and otherwise
     * ignored (best effort). Failures while creating the producer or handing the
     * record to it still propagate to the caller, as before.
     */
    @PostConstruct
    public void test() {
        // NOTE(review): hard-coded credentials — move to configuration / a secret store.
        String username = "kafka";
        String password = "443322";

        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9095,ip2:9095,ip3:9095");
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // An empty algorithm switches off TLS host-name verification.
        // NOTE(review): acceptable only for test clusters whose certificates lack matching SANs.
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        // Print the JAAS line for diagnostics, but never with the real password in it.
        System.out.println(buildJaasConfig(username, MASKED_PASSWORD));
        props.put(SaslConfigs.SASL_JAAS_CONFIG, buildJaasConfig(username, password));

        // NOTE(review): absolute, machine-specific path — the truststore has to exist at this location.
        props.put("ssl.truststore.location", "D:\\java_adv\\websecurity-master\\src\\main\\resources\\client.truststore.jks");
        props.put("ssl.truststore.password", "aaaa");

        boolean interrupted = false;
        // try-with-resources guarantees the producer's network thread and buffers are released.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Future<?> pending = producer.send(new ProducerRecord<String, String>("777", "123456789"));
            try {
                // Block until the broker acknowledges (or rejects) the record.
                System.out.println(pending.get());
            } catch (InterruptedException e) {
                // Remember the interrupt; it is restored after close() so that the
                // shutdown of the producer is not itself cut short by the flag.
                interrupted = true;
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("test...............");
    }

    /** Builds the {@code sasl.jaas.config} value for the PLAIN login module. */
    private static String buildJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username
                + "\" password=\"" + password + "\";";
    }
}
// Also add client.truststore.jks to the project's resources directory (src/main/resources).