一、响应式编程与传统BIO
传统的 BIO 中,线程将数据写入 Connection 之后会进入 Block(阻塞)状态,直到响应返回,才继续执行响应返回后的后续逻辑。NIO 则是线程将数据写入 Connection 之后,把响应返回后需要执行的逻辑及其参数缓存起来,然后直接返回,不再阻塞等待。当响应返回时,NIO 的 Selector 上对应的 Read 事件变为 Ready 状态,负责扫描 Selector 事件的线程会通知线程池数据已就绪,随后线程池中的某个线程取出之前缓存的逻辑和参数,继续完成处理。
背压(Backpressure)指的是如何应对生产者产生过多数据的情况:当数据积压过多时,可以根据背压策略对数据进行缓存或者丢弃。
二、背压代码分享
package com.soft863.highway.common.influxdb;
import com.soft863.highway.common.core.utils.FLuxUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDBException;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.springframework.http.MediaType;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class InfluxDBTemplate implements InfluxDBOperations {
private final String database;
private final WebClient webClient;
//批量提交大小
@Setter
private int bufferSize = 5000;
//缓冲间隔,此时间间隔内没有收到消息,则直接存储,不进行缓冲
@Setter
private int bufferRate = 1000;
//缓冲超时时间,超过此时间没有达到bufferSize也进行存储
@Setter
private Duration bufferTimeout = Duration.ofSeconds(3);
@Setter
private int maxRetry = 3;
@Getter
@Setter
private int backpressureBufferSize = 10;
@Getter
@Setter
private long maxBufferBytes = DataSize.ofMegabytes(15).toBytes();
FluxSink<Point> sink;
public InfluxDBTemplate(String database, WebClient webClient) {
this.database = database;
this.webClient = webClient;
}
private void init() {
AtomicLong pointCounter = new AtomicLong();
FLuxUtils
.bufferRate(
Flux.<Point>create(sink -> this.sink = sink).map(Point::lineProtocol),
bufferRate,
bufferSize,
bufferTimeout,
(point, list) -> pointCounter.addAndGet(point.length() * 2L) > maxBufferBytes)
.doOnNext(ignore -> pointCounter.set(0))
.onBackpressureBuffer(backpressureBufferSize,
list -> log.warn("无法处理更多InfluxDB写入请求"), BufferOverflowStrategy.DROP_OLDEST)
.publishOn(Schedulers.boundedElastic(), backpressureBufferSize)
.flatMap(batch -> Mono
.create(sink -> {
int size = batch.size();
String data = String.join("\n", batch);
long time = System.currentTimeMillis();
this
.write(data)
.retryWhen(Retry
.backoff(maxRetry, Duration.ofSeconds(1))
.filter(err -> !(err instanceof InfluxDBException)))
.doOnSuccess(nil -> log.trace("保存InfluxDB[{}]数据成功,数量:{},耗时:{}ms",
database,
size,
System.currentTimeMillis() - time))
.doOnError((err) -> log.error("保存InfluxDB数据失败:", err))
.doFinally(s -> sink.success())
.subscribe();
}
))
.onErrorContinue((err, val) -> log.error("保存InfluxDB数据失败:", err))
.subscribe();
}
public void shutdown() {
sink.complete();
}
public WebClient getClient() {
return webClient;
}
@Override
public Mono<Void> write(Point point) {
sink.next(point);
return Mono.empty();
}
public Mono<Void> write(String body) {
return webClient
.post()
.uri(builder -> builder
.path("/write")
.queryParam("db", database)
.build())
.contentType(MediaType.TEXT_PLAIN)
.bodyValue(body)
.exchange()
.flatMap(response -> {
if (response.statusCode().isError()) {
return handleError(response);
}
return response.releaseBody();
});
}
public <T> Mono<T> handleError(ClientResponse response) {
Throwable err = InfluxDBException
.buildExceptionForErrorState(response
.headers()
.asHttpHeaders()
.getFirst("X-Influxdb-Error"));
return response.releaseBody().then(Mono.error(err));
}
@Override
public Mono<Void> write(BatchPoints points) {
long t = System.currentTimeMillis();
int size = points.getPoints().size();
return write(points.lineProtocol())
.doOnSuccess(nil -> log.trace("保存InfluxDB[{}]数据成功,数量:{},耗时:{}ms",
database,
size,
System.currentTimeMillis() - t));
}
}