I. What is Async I/O?
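Stream processing systems often need to interact with external systems, for example querying an external database to enrich a record with additional user information. Flink's Async I/O API lets users access external storage from within a data stream through an asynchronous request client. The API handles the integration with the data stream and the tedious parts: result ordering, event time, and consistency (fault tolerance). Users only need to focus on the business logic.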
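To make the ordering point concrete, here is a minimal JDK-only sketch. It is not Flink code: in Flink the corresponding choice is between `AsyncDataStream.orderedWait` and `AsyncDataStream.unorderedWait`. The class `OrderingSketch`, the inputs, and the simulated arrival order are all assumptions made for illustration. Three requests complete out of order, and the results are emitted either in input order or in completion order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustration only: a toy model of ordered vs. unordered result emission.
class OrderingSketch {

    static List<String> simulate(boolean ordered) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        List<CompletableFuture<String>> requests = new ArrayList<>();
        List<String> emitted = new ArrayList<>();

        for (int i = 0; i < inputs.size(); i++) {
            CompletableFuture<String> request = new CompletableFuture<>();
            if (!ordered) {
                // Unordered: emit each result as soon as its request completes
                request.thenAccept(emitted::add);
            }
            requests.add(request);
        }

        // Simulate responses arriving in the order c, a, b
        int[] arrivalOrder = {2, 0, 1};
        for (int i : arrivalOrder) {
            requests.get(i).complete(inputs.get(i) + "-enriched");
        }

        if (ordered) {
            // Ordered: emit in input order, holding back results that arrived early
            for (CompletableFuture<String> request : requests) {
                emitted.add(request.join());
            }
        }
        return emitted;
    }
}
```

With this setup, `simulate(true)` returns the results in input order (a, b, c), while `simulate(false)` returns them in arrival order (c, a, b). Unordered emission generally has lower latency because no result waits for an earlier, slower request.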
II. Implementation: querying an external system in real time for the user's location (province and city, area code, etc.)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
public class AsyncFunction extends RichAsyncFunction<String, ActivityBean> {

    // transient: excluded from serialization and not persisted as state; created in open()
    private transient CloseableHttpAsyncClient httpClient = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the asynchronous HTTP client
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(3000)  // socket timeout in ms
                .setConnectTimeout(3000) // connection timeout in ms
                .build();
        httpClient = HttpAsyncClients.custom()
                .setMaxConnTotal(20) // maximum number of connections the client may open
                .setDefaultRequestConfig(requestConfig)
                .build();
        httpClient.start();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<ActivityBean> resultFuture) throws Exception {
        String[] fields = input.split(",");
        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String longitude = fields[4];
        String latitude = fields[5];
        // Placeholder URL; a real request would presumably be built from longitude and latitude
        String url = "mytest";
        HttpGet httpGet = new HttpGet(url);
        Future<HttpResponse> future = httpClient.execute(httpGet, null);
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    HttpResponse httpResponse = future.get();
                    String province = null;
                    if (httpResponse.getStatusLine().getStatusCode() == 200) {
                        // Read the response body as a JSON string
                        String result = EntityUtils.toString(httpResponse.getEntity());
                        // Parse it into a JSON object
                        JSONObject jsonObject = JSON.parseObject(result);
                        // Extract the location information
                        JSONObject regeocode = jsonObject.getJSONObject("regeocode");
                        if (regeocode != null && !regeocode.isEmpty()) {
                            JSONObject address = regeocode.getJSONObject("addressComponent");
                            // Get the province/city/district (only the province is used here)
                            province = address.getString("province");
                        }
                    }
                    return province;
                } catch (Exception e) {
                    // Any failure (timeout, malformed response) yields a null province
                    return null;
                }
            }
        }).thenAccept((String province) -> {
            resultFuture.complete(Collections.singleton(ActivityBean.of(uid, aid, null, time, eventType, province)));
        });
    }

    @Override
    public void close() throws Exception {
        // Guard against close() being called when open() never completed
        if (httpClient != null) {
            httpClient.close();
        }
    }
}
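The function above parks a thread of the common pool in `future.get()` for every in-flight request. One possible alternative is to complete a `CompletableFuture` from the client's completion callback, so that no thread waits. The sketch below uses only JDK types. The `Callback` interface is a hypothetical stand-in for a client callback (Apache HttpAsyncClient offers a similar `FutureCallback` with `completed`, `failed`, and `cancelled`), and `lookupProvince` is an illustrative name, not part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Illustration only: adapting a callback-style async call to a CompletableFuture.
class CallbackBridgeSketch {

    /** Hypothetical stand-in for an async client's completion callback. */
    interface Callback<T> {
        void completed(T result);
        void failed(Exception error);
    }

    /** Starts the call and returns a future that the callback completes; no thread waits. */
    static <T> CompletableFuture<T> toFuture(Consumer<Callback<T>> call) {
        CompletableFuture<T> future = new CompletableFuture<>();
        call.accept(new Callback<T>() {
            @Override
            public void completed(T result) {
                future.complete(result);
            }

            @Override
            public void failed(Exception error) {
                future.completeExceptionally(error);
            }
        });
        return future;
    }

    /** Mirrors the function above: a failed lookup becomes a null province. */
    static CompletableFuture<String> lookupProvince(Consumer<Callback<String>> call) {
        return toFuture(call).exceptionally(error -> null);
    }
}
```

In `asyncInvoke`, the returned future could then be chained with `thenAccept` to call `resultFuture.complete(...)`, exactly as the original code does after `supplyAsync`.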
III. Best practices
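The Async I/O operator provides full exactly-once fault-tolerance guarantees. It stores the records of in-flight asynchronous requests in checkpoints and restores/re-triggers those requests when recovering from a failure.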
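1. When a Future takes an Executor for its callbacks, prefer a DirectExecutor, which avoids the overhead of an extra thread hand-off. It can be obtained in either of the following ways: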
org.apache.flink.runtime.concurrent.Executors.directExecutor()
com.google.common.util.concurrent.MoreExecutors.directExecutor()
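2. AsyncFunction#asyncInvoke is not called by Flink from multiple threads: there is a single function instance, and it is called sequentially for each record in its partition of the stream. Do not perform blocking operations directly inside it, because a blocking call stalls every record behind it.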
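As a rough illustration of the first tip, a direct executor can be written in plain Java as `Runnable::run`: it runs each task on whichever thread submits it. The sketch below is an assumption-labeled demo, not Flink code. The class `DirectExecutorSketch` and the thread name `io-thread` are made up for the example. It shows that a callback registered with such an executor runs on the thread that completes the future, with no hand-off to another pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Illustration only: where a callback runs when it is given a direct executor.
class DirectExecutorSketch {

    // A direct executor runs the task immediately on the calling thread
    static final Executor DIRECT = Runnable::run;

    /** Returns the name of the thread that executed the callback. */
    static String callbackThreadName() {
        CompletableFuture<String> response = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();

        // Register the callback before the response arrives
        response.thenAcceptAsync(r -> seen.set(Thread.currentThread().getName()), DIRECT);

        // Simulate the client's I/O thread delivering the response
        Thread io = new Thread(() -> response.complete("ok"), "io-thread");
        io.start();
        try {
            io.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }
}
```

Here `callbackThreadName()` returns `io-thread`. Because the callback borrows the completing thread, it should do very little work, which is the same reason blocking calls do not belong in `asyncInvoke`.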