Vert.x 事务处理

Vert.x中处理数据库事务,需要手动关闭自动提交事务,不多说,直接上代码。

Demo.java

package com.test;
 
import com.test.utils.Runner;
import io.vertx.core.*;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
 
import java.util.ArrayList;
import java.util.List;
 
 
/**
 * Created by sweet on 2017/6/24.
 */
public class Demo extends AbstractVerticle {
 
 
  private static final Logger logger = LogManager.getLogger(Demo.class);
 
  private JDBCClient jdbcClient;
 
  @Override
  public void start() throws Exception {
 
    jdbcClient = JDBCClient.createShared(vertx, new JsonObject()
            .put("url", "jdbc:mysql://localhost:3306/vertx?characterEncoding=UTF-8&useSSL=false")
            .put("driver_class", "com.mysql.jdbc.Driver")
            .put("user", "root").put("password", "xxxx")
            .put("max_pool_size", 30));
 
    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());
 
    router.get("/hello").handler(r -> {
      r.response().end("hello");
    });
 
    router.get("/db").handler(this::queryDB);
    router.get("/tx").handler(this::tx);
    router.get("/tx2").handler(this::tx2);
 
    vertx.createHttpServer().requestHandler(router::accept).listen(8081, r -> {
      if (r.succeeded()) {
        logger.info("服务启动成功,端口 8081");
      } else {
        logger.error("服务启动失败,error :" + r.cause().getMessage());
      }
    });
 
  }
 
  // 下面的代码类似
  // try{
  //  // 开启事务
  //  ...
  // // 提交事务
  // } catch (Exception e) {
  // // 回滚事务
  // }
  private void tx(RoutingContext routingContext) {
    jdbcClient.getConnection(conn -> {
 
      final SQLConnection connection = conn.result();
      String s1 = "INSERT INTO movie (id, name, duration, director) VALUES ('1432414', 'xxx124', 20, '124xxx')";
      String s2 = "INSERT INTO movie (id, name, duration, director) VALUES ('1432415', 'xxx125', 21, '125xxx')";
      String s3 = "INSERT INTO movie (id, name, duration, director) VALUES ('1432416', 'xxx126', 22, '126xxx')";
 
      Future.<Void>future(f ->
              connection.setAutoCommit(false, f)
      ).compose(f1 ->
              Future.<UpdateResult>future(f ->
                      connection.update(s1, f)
              )
      ).compose(f2 ->
              Future.<UpdateResult>future(f3 ->
                      connection.update(s2, f3)
              )
      ).compose(f4 ->
              Future.<UpdateResult>future(f5 ->
                      connection.update(s3, f5)
              )
      ).setHandler(res -> {
        if (res.failed()) {
          logger.error("事务失败");
          connection.rollback(roll -> {
            if (roll.succeeded()) {
              logger.info("rollback  ok");
              routingContext.response().setStatusCode(500).end("rollback ok");
              connection.close();
            } else {
              logger.error("rollback error, " + roll.cause().getMessage());
              routingContext.response().setStatusCode(500).end("rollback error");
              connection.close();
            }
          });
        } else {
          logger.info("事务成功");
          connection.commit(commit -> {
            if (commit.succeeded()) {
              logger.info("commit  ok");
              routingContext.response().end("ok");
              connection.close();
            } else {
              logger.error("commit error, " + commit.cause().getMessage());
              routingContext.response().end("commit error");
              connection.close();
            }
          });
        }
 
      });
    });
  }
  
  private void tx2(RoutingContext routingContext) {
    final String s1 = "INSERT INTO movie (id, name, duration, director) VALUES ('1432414', 'xxx124', 20, '124xxx')";
    final String s2 = "INSERT INTO movie (id, name, duration, director) VALUES ('1432415', 'xxx125', 21, '125xxx')";
    final String s3 = "INSERT INTO movie (id, name_, duration, director) VALUES ('1432416', 'xxx126', 22, '126xxx')";
 
    jdbcClient.getConnection(conn -> {
      if (conn.succeeded()) {
        final SQLConnection connection = conn.result();
 
        List<Future> futures = new ArrayList<>();
 
        Future<Void> future = Future.future();        // 手动开启事务
        connection.setAutoCommit(false, future);
        futures.add(future);
 
        Future<UpdateResult> future1 = Future.future(); // 执行 Update1
        connection.update(s1,future1);
        futures.add(future1);
 
        Future<UpdateResult> future2 = Future.future(); // 执行 Update2
        connection.update(s2,future2);
        futures.add(future2);
 
        Future<UpdateResult> future3 = Future.future(); // 执行 Update3
        connection.update(s3,future3);
        futures.add(future3);
 
        CompositeFuture.all(futures).setHandler(re -> {
          if (re.succeeded()){ // 如果都执行成功,提交事务
            connection.commit(comm -> {
              if (comm.succeeded()){
                routingContext.response().end("commit ok");
              }else {
                routingContext.response().setStatusCode(500).end();
                logger.error(comm.cause().getMessage());
              }
              connection.close();
            });
          }else { // 如果其中一条执行失败,回滚事务
            connection.rollback(roll -> {
              if (roll.succeeded()){
                routingContext.response().setStatusCode(500).end("rollback ok");
                logger.info("rollback ok");
              }else {
                routingContext.response().setStatusCode(500).end();
                logger.error(roll.cause().getMessage());
              }
              connection.close();
            });
          }
        });
      }else {
        routingContext.response().setStatusCode(500).end();
        logger.error(conn.cause().getMessage());
      }
    });
 
  }
 
  private void queryDB(RoutingContext routingContext) {
 
    jdbcClient.getConnection(conn -> {
      if (conn.succeeded()) {
        final SQLConnection connection = conn.result();
 
        Future<ResultSet> future = Future.future();
        future.setHandler(res -> {
          if (res.succeeded()) {
            List<JsonObject> list = res.result().getRows();
            if (list != null && list.size() > 0) {
              routingContext.response().end(Json.encodePrettily(list));
              connection.close();
            }
          } else {
            logger.error(res.cause().getMessage());
            routingContext.response().setStatusCode(500).end();
            connection.close();
          }
        });
 
        final String sql = "SELECT * FROM movie";
        connection.queryWithParams(sql, null, future);
 
      } else {
        logger.error(conn.cause().getMessage());
        routingContext.response().setStatusCode(500).end();
      }
    });
 
 
  }
 
  public static void main(String[] args) {
    Runner.runExample(Demo.class);
  }
}
 
复制代码

里面的main方法可以直接运行

Runner.java 是官方提供的 方便在IDE里运行。

package com.test.utils;
 
/**
 * Created by sweet on 2017/5/8.
 */
 
 
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
 
import java.io.File;
import java.io.IOException;
import java.util.function.Consumer;
 
/*
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public class Runner {
 
    private static final String WEB_EXAMPLES_DIR = "web-examples";
    private static final String WEB_EXAMPLES_JAVA_DIR = WEB_EXAMPLES_DIR + "/src/main/java/";
    private static final String WEB_EXAMPLES_JS_DIR = WEB_EXAMPLES_DIR + "/src/main/js/";
    private static final String WEB_EXAMPLES_GROOVY_DIR = WEB_EXAMPLES_DIR + "/src/main/groovy/";
    private static final String WEB_EXAMPLES_RUBY_DIR = WEB_EXAMPLES_DIR + "/src/main/ruby/";
 
    public static void runClusteredExample(Class clazz) {
        runExample(WEB_EXAMPLES_JAVA_DIR, clazz, new VertxOptions().setClustered(true), null);
    }
 
    public static void runExample(Class clazz) {
        runExample(WEB_EXAMPLES_JAVA_DIR, clazz, new VertxOptions().setClustered(false), null);
    }
 
    public static void runExample(Class clazz, DeploymentOptions options) {
        runExample(WEB_EXAMPLES_JAVA_DIR, clazz, new VertxOptions().setClustered(false), options);
    }
 
    // JavaScript examples
 
    public static void runJSExample(String scriptName) {
        runScriptExample(WEB_EXAMPLES_JS_DIR, scriptName, new VertxOptions().setClustered(false));
    }
 
    public static void runJSExampleClustered(String scriptName) {
        runScriptExample(WEB_EXAMPLES_JS_DIR, scriptName, new VertxOptions().setClustered(true));
    }
 
    static class JSAuthRunner {
        public static void main(String[] args) {
            Runner.runJSExample("io/vertx/example/web/auth/server.js");
        }
    }
 
    static class JSAuthJDBC {
        public static void main(String[] args) {
            Runner.runJSExample("io/vertx/example/web/authjdbc/server.js");
        }
    }
 
    static class JSHelloWorldRunner {
        public static void main(String[] args) {
            Runner.runJSExample("io/vertx/example/web/helloworld/server.js");
        }
    }
 
    static class JSRealtimeRunner {
        public static void main(String[] args) { Runner.runJSExample("io/vertx/example/web/realtime/server.js"); }
    }
 
    static class JSChatRunner {
        public static void main(String[] args) {
            Runner.runJSExample("io/vertx/example/web/chat/server.js");
        }
    }
 
    static class JSSessionsRunner {
        public static void main(String[] args) {
            Runner.runJSExample("io/vertx/example/web/sessions/server.js");
        }
    }
 
    static class JSTemplatingRunner {
        public static void main(String[] args) {
            Runner.runJSExample("io/vertx/example/web/templating/mvel/server.js");
        }
    }
 
    // Groovy examples
 
    public static void runGroovyExample(String scriptName) {
        runScriptExample(WEB_EXAMPLES_GROOVY_DIR, scriptName, new VertxOptions().setClustered(false));
    }
 
    public static void runGroovyExampleClustered(String scriptName) {
        runScriptExample(WEB_EXAMPLES_GROOVY_DIR, scriptName, new VertxOptions().setClustered(true));
    }
 
    static class GroovyAuthRunner {
        public static void main(String[] args) {
            Runner.runGroovyExample("io/vertx/example/web/auth/server.groovy");
        }
    }
 
    static class GroovyAuthJDBC {
        public static void main(String[] args) {
            Runner.runGroovyExample("io/vertx/example/web/authjdbc/server.groovy");
        }
    }
 
    static class GroovyHelloWorldRunner {
        public static void main(String[] args) {
            Runner.runGroovyExample("io/vertx/example/web/helloworld/server.groovy");
        }
    }
 
    static class GroovyChatRunner {
        public static void main(String[] args) { Runner.runGroovyExample("io/vertx/example/web/chat/server.groovy"); }
    }
 
    static class GroovyRealtimeRunner {
        public static void main(String[] args) {
            Runner.runGroovyExample("io/vertx/example/web/realtime/server.groovy");
        }
    }
 
    static class GroovySessionsRunner {
        public static void main(String[] args) {
            Runner.runGroovyExample("io/vertx/example/web/sessions/server.groovy");
        }
    }
 
    static class GroovyTemplatingRunner {
        public static void main(String[] args) {
            Runner.runGroovyExample("io/vertx/example/web/templating/mvel/server.groovy");
        }
    }
 
    static class GroovyRestRunner {
        public static void main(String[] args) {
            Runner.runGroovyExample("io/vertx/example/web/rest/simple_rest.groovy");
        }
    }
 
    // Ruby examples
 
    public static void runRubyExample(String scriptName) {
        runScriptExample(WEB_EXAMPLES_RUBY_DIR, scriptName, new VertxOptions().setClustered(false));
    }
 
    public static void runRubyExampleClustered(String scriptName) {
        runScriptExample(WEB_EXAMPLES_RUBY_DIR, scriptName, new VertxOptions().setClustered(true));
    }
 
    static class RubyAuthRunner {
        public static void main(String[] args) {
            Runner.runRubyExample("io/vertx/example/web/auth/server.rb");
        }
    }
 
    static class RubyAuthJDBC {
        public static void main(String[] args) {
            Runner.runRubyExample("io/vertx/example/web/authjdbc/server.rb");
        }
    }
    static class RubyHelloWorldRunner {
        public static void main(String[] args) {
            Runner.runRubyExample("io/vertx/example/web/helloworld/server.rb");
        }
    }
 
    static class RubyChatRunner {
        public static void main(String[] args) { Runner.runRubyExample("io/vertx/example/web/chat/server.rb");
        }
    }
 
    static class RubyRealtimeRunner {
        public static void main(String[] args) {
            Runner.runRubyExample("io/vertx/example/web/realtime/server.rb");
        }
    }
 
    static class RubySessionsRunner {
        public static void main(String[] args) {
            Runner.runRubyExample("io/vertx/example/web/sessions/server.rb");
        }
    }
 
    static class RubyTemplatingRunner {
        public static void main(String[] args) {
            Runner.runRubyExample("io/vertx/example/web/templating/mvel/server.rb");
        }
    }
 
    public static void runExample(String exampleDir, Class clazz, VertxOptions options, DeploymentOptions
            deploymentOptions) {
        runExample(exampleDir + clazz.getPackage().getName().replace(".", "/"), clazz.getName(), options, deploymentOptions);
    }
 
 
    public static void runScriptExample(String prefix, String scriptName, VertxOptions options) {
        File file = new File(scriptName);
        String dirPart = file.getParent();
        String scriptDir = prefix + dirPart;
        runExample(scriptDir, scriptDir + "/" + file.getName(), options, null);
    }
 
    public static void runExample(String exampleDir, String verticleID, VertxOptions options, DeploymentOptions deploymentOptions) {
        if (options == null) {
            // Default parameter
            options = new VertxOptions();
        }
        // Smart cwd detection
 
        // Based on the current directory (.) and the desired directory (exampleDir), we try to compute the vertx.cwd
        // directory:
        try {
            // We need to use the canonical file. Without the file name is .
            File current = new File(".").getCanonicalFile();
            if (exampleDir.startsWith(current.getName()) && !exampleDir.equals(current.getName())) {
                exampleDir = exampleDir.substring(current.getName().length() + 1);
            }
        } catch (IOException e) {
            // Ignore it.
        }
 
        System.setProperty("vertx.cwd", exampleDir);
        Consumer<Vertx> runner = vertx -> {
            try {
                if (deploymentOptions != null) {
                    vertx.deployVerticle(verticleID, deploymentOptions);
                } else {
                    vertx.deployVerticle(verticleID);
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        };
        if (options.isClustered()) {
            Vertx.clusteredVertx(options, res -> {
                if (res.succeeded()) {
                    Vertx vertx = res.result();
                    runner.accept(vertx);
                } else {
                    res.cause().printStackTrace();
                }
            });
        } else {
            Vertx vertx = Vertx.vertx(options);
            runner.accept(vertx);
        }
    }
}
 
复制代码

相关资源:vert.x vertx应用开发实例教程.pdf

猜你喜欢

转载自blog.csdn.net/mawei7510/article/details/118992404