基于HTTP的REST服务
一个REST HTTP服务由一系列REST API组成,每个API会包括以下两个要素:
- API描述,比如:GET /usrs/,描述这个API使用GET方法查询指定用户的信息。
- API处理器。包含了这个API具体的实现逻辑。
一个HTTP API的处理流程大概是,HTTP服务器收到HTTP请求,解析出HTTP中的URL,获取到URL中的API PATH及HTTP方法。然后将这个API路由到指定的API处理器,进而获取到响应值,然后HTTP服务器将响应封装为HTTP响应发回给客户端。
如何开发一个akka http REST服务?
这里对官方给的一个例子进行分析:https://developer.lightbend.com/guides/akka-http-quickstart-java/
这个例子里面,定义了几个api:
POST /users
GET /users
GET /users/<id>
DELETE /users/<id>
在akka http中,使用Route来封装api及对应的处理器。这里是定义对应Route的代码:
public Route userRoutes() {
//定义所有api前缀为: /users
return pathPrefix("users", () ->
//concat会将多个Route聚合在一起,表示这些Route都会参与/users这个path段的匹配
concat(
//pathEnd表示全路径匹配,这意味着不能匹配“/users/”这样的路径
pathEnd(() -> concat(
//匹配GET /users, 处理器是getUsers()
get(() -> onSuccess(getUsers(), users -> complete(StatusCodes.OK, users, Jackson.marshaller()))),
//匹配POST /users, 这里将客户端的请求body解析为User对象, 然后交给createUser去使用, performed是createUser的返回值
post(() -> entity(Jackson.unmarshaller(User.class), user -> onSuccess(createUser(user), performed -> {
log.info("Create result: {}", performed.description);
return complete(StatusCodes.CREATED, performed, Jackson.marshaller());
}))))),
// 如果请求是/users/Bill, PathMatchers.segment()会以“/”为分隔符解析出第二个path段Bill,
// 交给concat聚合的路由进一步匹配
path(PathMatchers.segment(), (String name) -> concat(
//匹配GET /users/<name>, 处理器是getUsers(name)
get(() -> onSuccess(getUser(name), performed -> {
log.info("Get User: {}", performed.maybeUser.toString());
if (performed.maybeUser.isPresent()) {
return complete(StatusCodes.OK, performed.maybeUser.get(), Jackson.marshaller());
} else {
return complete(StatusCodes.BAD_REQUEST, "user not exist", Jackson.marshaller());
}
})),
//匹配DELETE /users/<name>, 处理器是deleteUser(name)
delete(() -> onSuccess(deleteUser(name), performed -> {
log.info("Delete result: {}", performed.description);
return complete(StatusCodes.OK, performed, Jackson.marshaller());
}
))))
)
);
}
这里将user相关的几个api通过concat聚合在一起,进行统一维护。
我们再来看看这些api对应的处理器代码:
private CompletionStage<GetUserResponse> getUser(String name) {
//send ask type message GetUser to userRegistryActor
return AskPattern.ask(userRegistryActor, new Function<ActorRef<GetUserResponse>, Command>() {
@Override
public Command apply(ActorRef<GetUserResponse> ref) throws Exception {
return new GetUser(name, ref);
}
}, askTimeout, scheduler);
}
private CompletionStage<ActionPerformed> deleteUser(String name) {
return AskPattern.ask(userRegistryActor, ref -> new DeleteUser(name, ref), askTimeout, scheduler);
}
private CompletionStage<Users> getUsers() {
return AskPattern.ask(userRegistryActor, GetUsers::new, askTimeout, scheduler);
}
private CompletionStage<ActionPerformed> createUser(User user) {
return AskPattern.ask(userRegistryActor, ref -> new CreateUser(user, ref), askTimeout, scheduler);
}
getUser我没有使用lamda方式,暴露更多细节,方便理解。几个方法都是向userRegistryActor这个actor发送ask消息,比如getUser(name)中是发送GetUser消息,ask方式会有返回GetUserResponse类型的消息, 我们需要告诉消息接受者应该把消息回复给哪个actor,这个接受actor会是一个临时生成的actor,类似这样: Actor[akka://HelloAkkaHttpServer/temp/UserRegistry$a#0]
AskPattern.ask()每次调用都会创建一个临时actor。
userRegistryActor这个actor是UserRegistry的实例,代码如下:
public class UserRegistry extends AbstractBehavior<Command> {
//#user-case-classes
private Users users;
private UserRegistry(ActorContext<Command> context) {
super(context);
this.users = new Users();
}
public static Behavior<Command> create() {
return Behaviors.setup(UserRegistry::new);
}
@Override
public Receive<Command> createReceive() {
//receive message and route to handler
Behavior<Command> registry = this;
return newReceiveBuilder()
.onMessage(CreateUser.class, new Function<CreateUser, Behavior<Command>>() {
@Override
public Behavior<Command> apply(CreateUser param) throws Exception {
param.onCreateUser(users);
return registry;
}
})
.onMessage(GetUser.class, new Function<GetUser, Behavior<Command>>() {
@Override
public Behavior<Command> apply(GetUser param) throws Exception {
param.onGetUser(users);
return registry;
}
})
//....省略一些代码
.build();
}
}
我更改了原例子中代码,将CreateUser、GetUser的内部类拿了出来,也没有使用lamda。 AbstractBehavior是来自com.typesafe.akka/akka-actor-typed_2.13/2.6.11这个包,
早先akka创建actor都是继承AbstractActor, 来自com.typesafe.akka/akka-actor_2.13/2.6.11, 可以从包名看到前者多了个typed,是的AbstractBehavior的定义多了范型,
标识这个Actor支持的消息类型。在大型akka项目中,能有这种限定是有利的,能让开发者在编译器就发现一些消息类型不匹配的问题,是一种协议大约约定的体现。
以前的AbstractActor是一种无类型actor, 这样一个actor它能处理那些类型的消息,其他actor得看它的代码或者文档说明才知道。如果这个actor增加了一种消息类型,或者消息类型发生了变化,需要开发者去进行类型检查,这在大型项目中会非常低效率。因此有了有类型的actor,akka的开发者也推荐新项目使用AbstractBehavior。
继续回到UserRegistry,它接收的消息类型是Command, 在这个例子里,Command是一个标记接口:
public interface Command {
}
CreateUser等这些具体消息类型实现了Command接口,这里只列出CreateUser的代码:
public final class CreateUser implements Command {
public final User user;
public final ActorRef<ActionPerformed> replyTo;
public CreateUser(User user, ActorRef<ActionPerformed> replyTo) {
this.user = user;
this.replyTo = replyTo;
}
public void onCreateUser(Users users) {
users.getUsers().add(this.user);
this.replyTo.tell(new ActionPerformed(String.format("User %s created.", this.user.name)));
}
}
所有userRegistryActor收到的消息都会在UserRegistry#createReceive()中处理,onMessage(CreateUser.class,…)会处理CreateUser类型的消息。在CreateUser消息类中,onCreateUser()方法
完成了业务处理逻辑。它将收到的这个User的信息保存起来,然后向消息发送actor发一个响应:replyTo.tell(),返回的消息封装在ActionPerformed中,它也是一个实现Command接口的类。
总结起来请求流程:
请求: client -> POST /users/xxx -> Route分发请求 -> createUser创建临时actor -> 发送ask异步消息CreateUser -> UserRegistry -> 处理消息CreateUser -> CreateUser#onCreateUser
响应: CreateUser#onCreateUser -> 发送响应tell异步消息 -> createUser创建的临时actor -> Route获取到响应后封装为HTTP response -> client
其他api处理器流程也类似这样。
actor系统创建流程
QuickstartApp.java中包含有这个例子的启动代码。
Behavior<NotUsed> rootBehavior = Behaviors.setup(context -> {
ActorRef<Command> userRegistryActor = context.spawn(UserRegistry.create(), "UserRegistry");
// akka://HelloAkkaHttpServer
System.out.println(context.getSystem());
// Actor[akka://HelloAkkaHttpServer/user/UserRegistry#1979156691]
System.out.println(userRegistryActor);
// [Actor[akka://HelloAkkaHttpServer/user/UserRegistry#1979156691]]
System.out.println(Arrays.toString(context.getChildren().toArray()));
UserRoutes userRoutes = new UserRoutes(context.getSystem(), userRegistryActor);
// 启动http服务器,将Route注册到这个http服务器上。
startHttpServer(userRoutes.userRoutes(), context.getSystem());
return Behaviors.empty();
});
//创建一个actor系统
ActorSystem system = ActorSystem.create(rootBehavior, "HelloAkkaHttpServer");
// akka://HelloAkkaHttpServer
System.out.println(system);
这里就创建了前面提到的userRegistryActor这个actor, 我加了几句测试代码,注视是他们打印出的值。
来看看启动http服务器,注册路由的代码:
static void startHttpServer(Route route, ActorSystem<?> system) {
//绑定到指定actor系统,服务监听localhost:8080,绑定指定路由
CompletionStage<ServerBinding> futureBinding = Http.get(system).newServerAt("localhost", 8080).bind(route);
futureBinding.whenComplete((binding, exception) -> {
if (binding != null) {
InetSocketAddress address = binding.localAddress();
system.log().info("Server online at http://{}:{}/",
address.getHostString(),
address.getPort());
} else {
system.log().error("Failed to bind HTTP endpoint, terminating system", exception);
system.terminate();
}
});
}
完事。
这里例子包含了api的定义、处理器的编写、http服务器在指定actor系统中的启动及路由注册,覆盖了一个http服务的各个方面。
https://developer.lightbend.com/guides/akka-http-quickstart-java/ 这里提供用curl进行测试的例子。
创建用户数据:
curl -H "Content-type: application/json" -X POST -d '{"name": "MrX", "age": 31, "countryOfResidence": "Canada"}' http://localhost:8080/users
curl -H "Content-type: application/json" -X POST -d '{"name": "Anonymous", "age": 55, "countryOfResidence": "Iceland"}' http://localhost:8080/users
curl -H "Content-type: application/json" -X POST -d '{"name": "Bill", "age": 67, "countryOfResidence": "USA"}' http://localhost:8080/users
查询:
curl http://localhost:8080/users
返回值:{"users":[{"name":"Anonymous","age":55,"countryOfResidence":"Iceland"},{"name":"MrX","age":31,"countryOfResidence":"Canada"},{"name":"Bill","age":67,"countryOfResidence":"USA"}]}
curl http://localhost:8080/users/Bill
返回值:{"name":"Bill","age":67,"countryOfResidence":"USA"}
删除:
curl -X DELETE http://localhost:8080/users/Bill
返回值: User Bill deleted.
修改后代码:https://github.com/sniperLx/akka-http-quickstart-java/