定义:这是一种异步编程模式,先开始一个任务的执行,并得到一个用于获取该任务执行结果的凭据对象,而不必等待该任务执行完毕就可以继续执行其他操作。等到需要该任务的执行结果时,再调用凭据对象的相关方法来获取
Promisor: 负责对外暴露可以返回Promise对象的异步方法,并启动异步任务的执行
compute: 启动异步任务的执行,并返回用于获取异步任务执行结果的凭据对象
Promise: 包装异步任务处理结果的凭据对象。负责检测异步任务是否处理完毕、返回和存储异步任务处理结果
getResult: 获取与其所属Promise实例关联的异步任务的执行结果
setResult: 设置与其所属Promise实例关联的异步任务的执行结果
isDone: 检测与其所属Promise实例关联的异步任务是否执行完毕
Result: 负责表示异步任务处理结果
TaskExecutor: 负责真正执行异步任务所代表的计算,并将其计算结果设置到相应的Promise实例
run: 执行异步任务所代表的计算
下列代码中,需要执行FTP客户端实例化,这是个比较耗时的操作,通过并发地生成待上传文件和初始化客户端可以节约时间。此时可以将客户端实例化做异步执行处理,通过FutureTask这个类的特性实现。FutureTask会获得callable的执行结果,也就是调用com.bruce.promise.FTPUploaderPromisor#newFTPUploaderPromise()时所创建的Callable的call方法的返回值;call方法在辅助线程中执行异步任务,即调用FTPUploader实现类(默认为com.bruce.promise.FTPClientUtil)的init()方法,完成FTP客户端实例的初始化
package com.bruce.promise;
import java.io.File;
/**
 * Contract for a component that pushes local files to an FTP server.
 *
 * <p>The expected call order, as used by the callers in this package, is
 * {@link #init} once, {@link #upload} any number of times, then {@link #disconnect}.
 *
 * @Author: Bruce
 * @Date: 2019/5/31 17:20
 * @Version 1.0
 */
public interface FTPUploader {

    /**
     * Prepares this uploader for the given server and remote directory.
     *
     * @param ftpServer   address of the FTP server
     * @param ftpUserName account used to log in
     * @param password    password for {@code ftpUserName}
     * @param serverDir   directory on the server to work in
     * @throws Exception if the uploader cannot be prepared
     */
    void init(String ftpServer, String ftpUserName, String password, String serverDir)
            throws Exception;

    /**
     * Uploads a single local file.
     *
     * @param file the file to send
     * @throws Exception if the upload fails
     */
    void upload(File file) throws Exception;

    /** Releases whatever connection this uploader holds; declares no checked exception. */
    void disconnect();
}
package com.bruce.promise;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPClientConfig;
import org.apache.commons.net.ftp.FTPReply;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
/**
 * {@link FTPUploader} implementation backed by Apache Commons Net's {@link FTPClient}.
 *
 * <p>Each instance owns one FTP connection plus a record of the remote directories it has
 * already tried to create. The class performs no synchronization of its own.
 *
 * @Author: Bruce
 * @Date: 2019/5/31 18:05
 * @Version 1.0
 */
public class FTPClientUtil implements FTPUploader {
    final FTPClient ftp = new FTPClient();
    /** Remote directory names for which {@code makeDirectory} has already been issued. */
    final Map<String, Boolean> dirCreateMap = new HashMap<>();

    /**
     * Connects to the server, logs in, changes into {@code serverDir} and selects the
     * transfer file type. Server replies are echoed to standard output.
     *
     * <p>If any step fails, the connection is closed before the exception propagates, so a
     * failed {@code init} does not leave a socket open.
     *
     * @param ftpServer   host name or address of the FTP server
     * @param ftpUserName login name
     * @param password    login password
     * @param serverDir   remote working directory to change into
     * @throws RuntimeException if the server refuses the connection, the login, or the
     *                          directory change
     * @throws Exception        on I/O failure while talking to the server
     */
    @Override
    public void init(String ftpServer, String ftpUserName, String password, String serverDir) throws Exception {
        ftp.configure(new FTPClientConfig());

        boolean ready = false;
        try {
            ftp.connect(ftpServer);
            System.out.println(ftp.getReplyString());
            if (!FTPReply.isPositiveCompletion(ftp.getReplyCode())) {
                throw new RuntimeException("FTP server refused connection.");
            }

            if (!ftp.login(ftpUserName, password)) {
                throw new RuntimeException("Failed to login " + ftp.getReplyString());
            }
            System.out.println(ftp.getReplyString());

            int reply = ftp.cwd(serverDir);
            if (!FTPReply.isPositiveCompletion(reply)) {
                throw new RuntimeException(
                        "Failed to change working directory. reply: " + reply
                );
            }
            System.out.println(ftp.getReplyString());

            // NOTE(review): ASCII mode is kept from the original code; it is meant for text
            // files. Confirm that nothing binary is uploaded through this client.
            ftp.setFileType(FTP.ASCII_FILE_TYPE);
            ready = true;
        } finally {
            if (!ready) {
                // Covers refused connection, rejected login, failed cwd and I/O errors alike.
                disconnect();
            }
        }
    }

    /**
     * Uploads {@code file} to {@code <parent directory name>/<file name>} relative to the
     * current remote working directory, then stores an empty companion file with the
     * suffix {@code .c} next to it.
     *
     * @param file local file to upload; must have a parent directory component
     * @throws IllegalArgumentException if {@code file} has no parent directory
     * @throws RuntimeException         if the data file could not be stored
     * @throws Exception                on other I/O failures
     */
    @Override
    public void upload(File file) throws Exception {
        File parent = file.getParentFile();
        if (parent == null) {
            // Checked before opening the stream so nothing is leaked on this path.
            throw new IllegalArgumentException("File has no parent directory: " + file);
        }
        String dirName = parent.getName();
        String fileName = dirName + '/' + file.getName();

        try (InputStream dataIn = new BufferedInputStream(new FileInputStream(file), 1024 * 8)) {
            if (!dirCreateMap.containsKey(dirName)) {
                // The result is ignored on purpose: the directory may already exist remotely.
                ftp.makeDirectory(dirName);
                dirCreateMap.put(dirName, Boolean.TRUE);
            }

            boolean isOk;
            try {
                isOk = ftp.storeFile(fileName, dataIn);
            } catch (IOException e) {
                throw new RuntimeException("Failed to upload " + file, e);
            }
            if (!isOk) {
                throw new RuntimeException("Failed to upload " + file + ",reply:"
                        + ftp.getReplyString());
            }

            // Empty companion file written after the data file; its result is not checked,
            // matching the original behavior.
            ftp.storeFile(fileName + ".c", new ByteArrayInputStream(new byte[0]));
        }
    }

    /**
     * Closes the connection if one is open. Never throws: a failure while closing is
     * reported on standard error instead of being silently dropped.
     */
    @Override
    public void disconnect() {
        if (!ftp.isConnected()) {
            return;
        }
        try {
            ftp.disconnect();
        } catch (IOException e) {
            System.err.println("Failed to disconnect from FTP server: " + e);
        }
    }
}
package com.bruce.promise;
import java.util.concurrent.*;
/**
 * Promisor for {@link FTPUploader} instances: starts the uploader's initialization
 * asynchronously and hands back a {@link Future} through which the ready-to-use uploader
 * can be obtained later.
 *
 * @Author: Bruce
 * @Date: 2019/5/31 17:41
 * @Version 1.0
 */
public class FTPUploaderPromisor {

    /**
     * Starts initializing an uploader on a newly created thread.
     *
     * @param ftpServer   passed through to {@link FTPUploader#init}
     * @param ftpUserName passed through to {@link FTPUploader#init}
     * @param password    passed through to {@link FTPUploader#init}
     * @param serverDir   passed through to {@link FTPUploader#init}
     * @return a future that yields the initialized uploader
     */
    public static Future<FTPUploader> newFTPUploaderPromise(String ftpServer, String ftpUserName, String password, String serverDir) {
        // One dedicated thread per promise, exactly as the original anonymous Executor did.
        Executor helperExecutor = command -> new Thread(command).start();
        return newFTPUploaderPromise(ftpServer, ftpUserName, password, serverDir, helperExecutor);
    }

    /**
     * Starts initializing an uploader on the supplied executor.
     *
     * <p>The implementation class is taken from the system property
     * {@code ftp.client.impl}; when the property is unset or empty,
     * {@code com.bruce.promise.FTPClientUtil} is used. The class is instantiated through
     * its no-argument constructor. Any failure surfaces from {@link Future#get()} wrapped
     * in an {@link ExecutionException}.
     *
     * @param ftpServer      passed through to {@link FTPUploader#init}
     * @param ftpUserName    passed through to {@link FTPUploader#init}
     * @param password       passed through to {@link FTPUploader#init}
     * @param serverDir      passed through to {@link FTPUploader#init}
     * @param helperExecutor executor that runs the initialization task
     * @return a future that yields the initialized uploader
     */
    public static Future<FTPUploader> newFTPUploaderPromise(final String ftpServer, final String ftpUserName,
                                                            final String password, final String serverDir, Executor helperExecutor) {
        Callable<FTPUploader> callable = () -> {
            String implClazz = System.getProperty("ftp.client.impl");
            if (null == implClazz || implClazz.isEmpty()) {
                implClazz = "com.bruce.promise.FTPClientUtil";
            }

            // Class.newInstance() is deprecated; go through the no-arg constructor instead.
            FTPUploader ftpUploader = Class.forName(implClazz)
                    .asSubclass(FTPUploader.class)
                    .getDeclaredConstructor()
                    .newInstance();
            try {
                ftpUploader.init(ftpServer, ftpUserName, password, serverDir);
            } catch (Exception e) {
                // Nobody will ever receive this instance, so release its connection here.
                try {
                    ftpUploader.disconnect();
                } catch (RuntimeException cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
                throw e;
            }
            return ftpUploader;
        };

        final FutureTask<FTPUploader> task = new FutureTask<>(callable);
        helperExecutor.execute(task);
        return task;
    }
}
package com.bruce.promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
/**
 * Stand-in {@link FTPUploader} that talks to no server: {@link #init} and
 * {@link #disconnect} do nothing, and {@link #upload} only logs the file and sleeps.
 *
 * @Author: Bruce
 * @Date: 2019/5/31 17:23
 * @Version 1.0
 */
public class FakeFTPUploader implements FTPUploader {
    private static final Logger LOG = LoggerFactory.getLogger(FakeFTPUploader.class);

    /** No-op; all arguments are ignored. */
    @Override
    public void init(String ftpServer, String ftpUserName, String password, String serverDir) throws Exception {
    }

    /**
     * Logs the file and pauses for one second in place of a real transfer.
     *
     * @param file the file that would be uploaded
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    @Override
    public void upload(File file) throws Exception {
        // SLF4J substitutes "{}" placeholders; the original "%s" was logged literally.
        LOG.info("uploading {}", file);
        Thread.sleep(1000);
    }

    /** No-op; there is no connection to close. */
    @Override
    public void disconnect() {
    }
}
’
package com.bruce.promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Task that generates files and uploads them over FTP. The FTP client is initialized
 * asynchronously through {@link FTPUploaderPromisor} while the files are being generated,
 * and is only waited for once it is actually needed.
 *
 * @Author: Bruce
 * @Date: 2019/5/31 18:54
 * @Version 1.0
 */
public class DataSyncTask implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DataSyncTask.class);
    private final Map<String, String> taskParameters;

    /**
     * @param taskParameters connection settings, read under the keys {@code server},
     *                       {@code userName}, {@code password} and {@code serverDir}
     */
    public DataSyncTask(Map<String, String> taskParameters) {
        this.taskParameters = taskParameters;
    }

    /**
     * Kicks off client initialization, generates the files, waits for the client, uploads
     * every generated file and finally disconnects the client.
     *
     * <p>If the thread is interrupted while generating files or while waiting for the
     * client, the interrupt status is restored and the task ends without uploading.
     *
     * @throws RuntimeException wrapping the {@link ExecutionException} when client
     *                          initialization failed
     */
    @Override
    public void run() {
        String ftpServer = taskParameters.get("server");
        String ftpUserName = taskParameters.get("userName");
        String password = taskParameters.get("password");
        String serverDir = taskParameters.get("serverDir");

        Future<FTPUploader> ftpClientUtilPromise = FTPUploaderPromisor.newFTPUploaderPromise(ftpServer,
                ftpUserName, password, serverDir);

        FTPUploader ftpClientUtil;
        try {
            generateFilesFromDB();
            ftpClientUtil = ftpClientUtilPromise.get();
        } catch (InterruptedException e) {
            // Previously this fell through and called upload() on a null client for every file.
            ftpClientUtilPromise.cancel(true); // best effort; the helper may not react to it
            Thread.currentThread().interrupt();
            LOG.warn("data sync interrupted, skipping upload", e);
            return;
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }

        try {
            uploadFiles(ftpClientUtil);
        } finally {
            // The client was never released before; close it whether or not uploads succeeded.
            ftpClientUtil.disconnect();
        }
    }

    /** Placeholder for file generation: logs a message and sleeps for one second. */
    private void generateFilesFromDB() throws InterruptedException {
        LOG.info("generating files from database...");
        Thread.sleep(1000);
    }

    /**
     * Uploads every generated file. A failure on one file is logged and does not stop the
     * remaining uploads.
     */
    private void uploadFiles(FTPUploader ftpClientUtil) {
        for (File file : retrieveGeneratedFiles()) {
            try {
                ftpClientUtil.upload(file);
            } catch (Exception e) {
                LOG.error("failed to upload {}", file, e);
            }
        }
    }

    /**
     * Collects the regular files ending in {@code .class} that sit directly inside the
     * directory {@code filePath}.
     *
     * @return the matching files; empty when the directory is missing or cannot be listed
     */
    protected Set<File> retrieveGeneratedFiles() {
        Set<File> files = new HashSet<File>();
        File currDir = new File("filePath");
        File[] generated = currDir.listFiles(
                (dir, name) -> name.endsWith(".class") && new File(dir, name).isFile());
        if (generated == null) {
            // listFiles returns null when the path is not a directory or an I/O error occurs.
            LOG.warn("cannot list generated files in {}", currDir.getAbsolutePath());
            return files;
        }
        for (File f : generated) {
            files.add(f);
        }
        return files;
    }
}
package com.bruce.promise;
import java.util.HashMap;
import java.util.Map;
/**
 * Command-line entry point that runs one {@link DataSyncTask} on its own thread.
 *
 * @Author: Bruce
 * @Date: 2019/5/31 19:49
 * @Version 1.0
 */
public class CaseRunner {

    /**
     * Builds the task parameters and starts the task.
     *
     * @param args optional; when a non-empty first argument is given it is used as the
     *             value of the system property {@code ftp.client.impl}, which
     *             {@code FTPUploaderPromisor} reads to pick the uploader class
     *             (for example {@code com.bruce.promise.FakeFTPUploader})
     */
    public static void main(String[] args) {
        Map<String, String> params = new HashMap<String, String>();
        params.put("server", "serverIP");
        params.put("userName", "username");
        params.put("password", "password");
        params.put("serverDir", "serverDir");

        // The original System.setProperty("", "") always threw IllegalArgumentException
        // because the key was empty, so the task never started.
        if (args != null && args.length > 0 && !args[0].isEmpty()) {
            System.setProperty("ftp.client.impl", args[0]);
        }

        DataSyncTask dst = new DataSyncTask(params);
        Thread t = new Thread(dst);
        t.start();
    }
}
承诺模式的好处就是减少不必要的等待,屏蔽异步编程和同步编程的差异。比较熟悉的实现承诺模式的有java.util.concurrent.FutureTask,在RocketMQ中,org.apache.rocketmq.broker.latency.FutureTaskExt继承了FutureTask,并加以拓展。
参考资料