akka http使用demo

基于HTTP的REST服务

一个REST HTTP服务由一系列REST API组成,每个API会包括以下两个要素:

  1. API描述,比如:GET /usrs/,描述这个API使用GET方法查询指定用户的信息。
  2. API处理器。包含了这个API具体的实现逻辑。

一个HTTP API的处理流程大概是,HTTP服务器收到HTTP请求,解析出HTTP中的URL,获取到URL中的API PATH及HTTP方法。然后将这个API路由到指定的API处理器,进而获取到响应值,然后HTTP服务器将响应封装为HTTP响应发回给客户端。

如何开发一个akka http REST服务?

这里对官方给的一个例子进行分析:https://developer.lightbend.com/guides/akka-http-quickstart-java/

这个例子里面,定义了几个api:

POST /users
GET /users
GET /users/<id>
DELETE /users/<id>

在akka http中,使用Route来封装api及对应的处理器。这里是定义对应Route的代码:

public Route userRoutes() {    
  //定义所有api前缀为: /users                                                                                     
  return pathPrefix("users", () -> 
    //concat会将多个Route聚合在一起,表示这些Route都会参与/users这个path段的匹配                                                                                 
    concat(       
      //pathEnd表示全路径匹配,这意味着不能匹配“/users/”这样的路径                                                                                           
      pathEnd(() -> concat(
        //匹配GET /users, 处理器是getUsers()
        get(() -> onSuccess(getUsers(), users -> complete(StatusCodes.OK, users, Jackson.marshaller()))),   

        //匹配POST /users, 这里将客户端的请求body解析为User对象, 然后交给createUser去使用, performed是createUser的返回值 
        post(() -> entity(Jackson.unmarshaller(User.class), user -> onSuccess(createUser(user), performed -> {
          log.info("Create result: {}", performed.description);                                               
          return complete(StatusCodes.CREATED, performed, Jackson.marshaller());                              
        }))))),

      // 如果请求是/users/Bill, PathMatchers.segment()会以“/”为分隔符解析出第二个path段Bill,
      // 交给concat聚合的路由进一步匹配                                                                                                        
      path(PathMatchers.segment(), (String name) -> concat(    
        //匹配GET /users/<name>, 处理器是getUsers(name)                                                
        get(() -> onSuccess(getUser(name), performed -> {                                                     
          log.info("Get User: {}", performed.maybeUser.toString());                                       
          if (performed.maybeUser.isPresent()) {                                                          
            return complete(StatusCodes.OK, performed.maybeUser.get(), Jackson.marshaller());             
          } else {                                                                                        
            return complete(StatusCodes.BAD_REQUEST, "user not exist", Jackson.marshaller());             
          }                                                                                               
        })),
        //匹配DELETE /users/<name>, 处理器是deleteUser(name)                                                                                                 
        delete(() -> onSuccess(deleteUser(name), performed -> {                                               
                    log.info("Delete result: {}", performed.description);                                           
                    return complete(StatusCodes.OK, performed, Jackson.marshaller());                               
                  }                                                                                                 
        ))))                                                                                                  
      )                                                                                                             
  );                                                                                                                
}

这里将user相关的几个api通过concat聚合在一起,进行统一维护。

我们再来看看这些api对应的处理器代码:

private CompletionStage<GetUserResponse> getUser(String name) {
  //send ask type message GetUser to userRegistryActor
  return AskPattern.ask(userRegistryActor, new Function<ActorRef<GetUserResponse>, Command>() {
    @Override
    public Command apply(ActorRef<GetUserResponse> ref) throws Exception {
      return new GetUser(name, ref);
    }
  }, askTimeout, scheduler);
}

private CompletionStage<ActionPerformed> deleteUser(String name) {
  return AskPattern.ask(userRegistryActor, ref -> new DeleteUser(name, ref), askTimeout, scheduler);
}

private CompletionStage<Users> getUsers() {
  return AskPattern.ask(userRegistryActor, GetUsers::new, askTimeout, scheduler);
}

private CompletionStage<ActionPerformed> createUser(User user) {
  return AskPattern.ask(userRegistryActor, ref -> new CreateUser(user, ref), askTimeout, scheduler);
}

getUser我没有使用lamda方式,暴露更多细节,方便理解。几个方法都是向userRegistryActor这个actor发送ask消息,比如getUser(name)中是发送GetUser消息,ask方式会有返回GetUserResponse类型的消息, 我们需要告诉消息接受者应该把消息回复给哪个actor,这个接受actor会是一个临时生成的actor,类似这样: Actor[akka://HelloAkkaHttpServer/temp/UserRegistry$a#0]

AskPattern.ask()每次调用都会创建一个临时actor。

userRegistryActor这个actor是UserRegistry的实例,代码如下:

public class UserRegistry extends AbstractBehavior<Command> {
  //#user-case-classes

  private Users users;

  private UserRegistry(ActorContext<Command> context) {
    super(context);
    this.users = new Users();
  }

  public static Behavior<Command> create() {
    return Behaviors.setup(UserRegistry::new);
  }

  @Override
  public Receive<Command> createReceive() {
    //receive message and route to handler
    Behavior<Command> registry = this;
    return newReceiveBuilder()
      .onMessage(CreateUser.class, new Function<CreateUser, Behavior<Command>>() {
        @Override
        public Behavior<Command> apply(CreateUser param) throws Exception {
          param.onCreateUser(users);
          return registry;
        }
      })
      .onMessage(GetUser.class, new Function<GetUser, Behavior<Command>>() {
        @Override
        public Behavior<Command> apply(GetUser param) throws Exception {
          param.onGetUser(users);
          return registry;
        }
      })
      //....省略一些代码
      .build();
  }
}

我更改了原例子中代码,将CreateUser、GetUser的内部类拿了出来,也没有使用lamda。 AbstractBehavior是来自com.typesafe.akka/akka-actor-typed_2.13/2.6.11这个包,
早先akka创建actor都是继承AbstractActor, 来自com.typesafe.akka/akka-actor_2.13/2.6.11, 可以从包名看到前者多了个typed,是的AbstractBehavior的定义多了范型,
标识这个Actor支持的消息类型。在大型akka项目中,能有这种限定是有利的,能让开发者在编译器就发现一些消息类型不匹配的问题,是一种协议大约约定的体现。

以前的AbstractActor是一种无类型actor, 这样一个actor它能处理那些类型的消息,其他actor得看它的代码或者文档说明才知道。如果这个actor增加了一种消息类型,或者消息类型发生了变化,需要开发者去进行类型检查,这在大型项目中会非常低效率。因此有了有类型的actor,akka的开发者也推荐新项目使用AbstractBehavior。

继续回到UserRegistry,它接收的消息类型是Command, 在这个例子里,Command是一个标记接口:

public interface Command {
}

CreateUser等这些具体消息类型实现了Command接口,这里只列出CreateUser的代码:

public final class CreateUser implements Command {
    public final User user;
    public final ActorRef<ActionPerformed> replyTo;

    public CreateUser(User user, ActorRef<ActionPerformed> replyTo) {
        this.user = user;
        this.replyTo = replyTo;
    }

    public void onCreateUser(Users users) {
        users.getUsers().add(this.user);
        this.replyTo.tell(new ActionPerformed(String.format("User %s created.", this.user.name)));
    }
}

所有userRegistryActor收到的消息都会在UserRegistry#createReceive()中处理,onMessage(CreateUser.class,…)会处理CreateUser类型的消息。在CreateUser消息类中,onCreateUser()方法
完成了业务处理逻辑。它将收到的这个User的信息保存起来,然后向消息发送actor发一个响应:replyTo.tell(),返回的消息封装在ActionPerformed中,它也是一个实现Command接口的类。

总结起来请求流程:
请求: client -> POST /users/xxx -> Route分发请求 -> createUser创建临时actor -> 发送ask异步消息CreateUser -> UserRegistry -> 处理消息CreateUser -> CreateUser#onCreateUser
响应: CreateUser#onCreateUser -> 发送响应tell异步消息 -> createUser创建的临时actor -> Route获取到响应后封装为HTTP response -> client

其他api处理器流程也类似这样。

actor系统创建流程

QuickstartApp.java中包含有这个例子的启动代码。

Behavior<NotUsed> rootBehavior = Behaviors.setup(context -> {
  ActorRef<Command> userRegistryActor = context.spawn(UserRegistry.create(), "UserRegistry");
  // akka://HelloAkkaHttpServer
  System.out.println(context.getSystem()); 
  // Actor[akka://HelloAkkaHttpServer/user/UserRegistry#1979156691]
  System.out.println(userRegistryActor);  
  // [Actor[akka://HelloAkkaHttpServer/user/UserRegistry#1979156691]] 
  System.out.println(Arrays.toString(context.getChildren().toArray())); 
  
  UserRoutes userRoutes = new UserRoutes(context.getSystem(), userRegistryActor); 

  // 启动http服务器,将Route注册到这个http服务器上。
  startHttpServer(userRoutes.userRoutes(), context.getSystem());
  return Behaviors.empty();
});

//创建一个actor系统
ActorSystem system = ActorSystem.create(rootBehavior, "HelloAkkaHttpServer");
// akka://HelloAkkaHttpServer
System.out.println(system); 

这里就创建了前面提到的userRegistryActor这个actor, 我加了几句测试代码,注视是他们打印出的值。

来看看启动http服务器,注册路由的代码:

static void startHttpServer(Route route, ActorSystem<?> system) {    
  //绑定到指定actor系统,服务监听localhost:8080,绑定指定路由                                                       
  CompletionStage<ServerBinding> futureBinding = Http.get(system).newServerAt("localhost", 8080).bind(route);               
                                                                                                                            
  futureBinding.whenComplete((binding, exception) -> {                                                                      
    if (binding != null) {                                                                                                  
      InetSocketAddress address = binding.localAddress();                                                                   
      system.log().info("Server online at http://{}:{}/",                                                                   
        address.getHostString(),                                                                                            
        address.getPort());                                                                                                 
    } else {                                                                                                                
      system.log().error("Failed to bind HTTP endpoint, terminating system", exception);                                    
      system.terminate();                                                                                                   
    }                                                                                                                       
  });                                                                                                                       
}                                                                                                                           

完事。

这里例子包含了api的定义、处理器的编写、http服务器在指定actor系统中的启动及路由注册,覆盖了一个http服务的各个方面。

https://developer.lightbend.com/guides/akka-http-quickstart-java/ 这里提供用curl进行测试的例子。

创建用户数据:
curl -H "Content-type: application/json" -X POST -d '{"name": "MrX", "age": 31, "countryOfResidence": "Canada"}' http://localhost:8080/users
curl -H "Content-type: application/json" -X POST -d '{"name": "Anonymous", "age": 55, "countryOfResidence": "Iceland"}' http://localhost:8080/users
curl -H "Content-type: application/json" -X POST -d '{"name": "Bill", "age": 67, "countryOfResidence": "USA"}' http://localhost:8080/users

查询:
curl http://localhost:8080/users
返回值:{"users":[{"name":"Anonymous","age":55,"countryOfResidence":"Iceland"},{"name":"MrX","age":31,"countryOfResidence":"Canada"},{"name":"Bill","age":67,"countryOfResidence":"USA"}]}

curl http://localhost:8080/users/Bill
返回值:{"name":"Bill","age":67,"countryOfResidence":"USA"}

删除:
curl -X DELETE http://localhost:8080/users/Bill
返回值: User Bill deleted.

修改后代码:https://github.com/sniperLx/akka-http-quickstart-java/

猜你喜欢

转载自blog.csdn.net/lx1848/article/details/113101336