jade: a MyBatis plugin for database and table sharding

From the @Intercepts and @Signature annotations we can see that JadeSQLInterceptor intercepts two methods: Executor.query(MappedStatement, Object, RowBounds, ResultHandler) and Executor.update(MappedStatement, Object).

@Intercepts({
        @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class,
                RowBounds.class, ResultHandler.class}),
        @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})
public class JadeSQLInterceptor implements Interceptor {
    // ......
}
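
To make the idea concrete, here is a minimal sketch of how a signature declaration can drive interception through a JDK dynamic proxy. Everything in it (DemoSignature, DemoIntercepts, DemoExecutor, PlainExecutor, DemoInterceptor, SignatureProxyDemo) is a hypothetical stand-in written for illustration. It is not the MyBatis implementation and only mirrors the general mechanism.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for @Signature / @Intercepts (not the MyBatis annotations).
@Retention(RetentionPolicy.RUNTIME)
@interface DemoSignature {
    String method();
    Class<?>[] args();
}

@Retention(RetentionPolicy.RUNTIME)
@interface DemoIntercepts {
    DemoSignature[] value();
}

// Hypothetical stand-in for the intercepted interface.
interface DemoExecutor {
    String query(String sql);
    int update(String sql);
    void close();
}

class PlainExecutor implements DemoExecutor {
    public String query(String sql) { return "rows for " + sql; }
    public int update(String sql) { return 1; }
    public void close() { }
}

@DemoIntercepts({
        @DemoSignature(method = "query", args = {String.class}),
        @DemoSignature(method = "update", args = {String.class})})
class DemoInterceptor {
    // Names of the calls that were intercepted, in call order.
    final List<String> seen = new ArrayList<>();
}

class SignatureProxyDemo {
    // Builds a JDK proxy around target; only methods listed in @DemoIntercepts are recorded.
    static DemoExecutor wrap(DemoExecutor target, DemoInterceptor interceptor) {
        Set<Method> declared = new HashSet<>();
        DemoIntercepts intercepts = interceptor.getClass().getAnnotation(DemoIntercepts.class);
        for (DemoSignature signature : intercepts.value()) {
            try {
                declared.add(DemoExecutor.class.getMethod(signature.method(), signature.args()));
            } catch (NoSuchMethodException e) {
                throw new IllegalStateException("No such method: " + signature.method(), e);
            }
        }
        InvocationHandler handler = (proxy, method, args) -> {
            if (declared.contains(method)) {
                interceptor.seen.add(method.getName());
            }
            // Always delegate to the real object afterwards.
            return method.invoke(target, args);
        };
        return (DemoExecutor) Proxy.newProxyInstance(
                DemoExecutor.class.getClassLoader(), new Class<?>[] {DemoExecutor.class}, handler);
    }

    public static void main(String[] args) {
        DemoInterceptor interceptor = new DemoInterceptor();
        DemoExecutor executor = wrap(new PlainExecutor(), interceptor);
        executor.query("select 1");
        executor.close();
        executor.update("delete from t");
        System.out.println(interceptor.seen); // prints [query, update]
    }
}
```

The close() call passes straight through because it is not listed in the annotation, which is the same reason only query and update reach the interceptor in the real plugin.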

With the annotations covered, let's look at the plugin() method, which is implemented as follows:

    public Object plugin(Object target) {
        return Plugin.wrap(target, this);
    }
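Plugin.wrap() parses the @Intercepts and @Signature annotations on JadeSQLInterceptor to determine which methods to intercept, then uses a JDK dynamic proxy to create a proxy object for the target (here, the Executor). The proxy intercepts Executor.query(MappedStatement, Object, RowBounds, ResultHandler) and Executor.update(MappedStatement, Object); the interception logic itself is implemented in JadeSQLInterceptor.intercept(). The steps are:

    1 Read the method information from the invocation, check whether the mapper method carries a ShardBy annotation, and obtain the shard parameter.

    2 Rewrite the SQL according to the table-sharding strategy for that parameter, then store a MappedStatement built from the rewritten SQL back into the invocation's arguments (args[INDEX_MAPPED_STATEMENT]).

    3 Call the target object's method to carry on with execution.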

    public Object intercept(Invocation invocation) throws Throwable {
        // Get the argument list of the intercepted method
        Object[] args = invocation.getArgs();
        MappedStatement statement = (MappedStatement) args[INDEX_MAPPED_STATEMENT];
        // Get the DAO class
        Class dao = mapperClazz(statement);
        // Get the DAO method being executed
        Method method = shardMapperMethod(statement, dao);
        // Check whether the DAO method has a ShardBy annotation; if so, table sharding is needed. Returns the ShardBy-annotated parameter, which is used later
        Object shardByObject = getExecuteParamByAnnotationClass(args, method, ShardBy.class);
        Modifier modifier = new Modifier(new Definition(dao), method);
        BoundSql boundSql = statement.getBoundSql(args[INDEX_PARAMETER]);
        // If sharding is needed, do the sharding analysis here
        if (shardByObject != null) {

            Configuration configuration = statement.getConfiguration();
            // Get the dataSource; this is Spring's proxy dataSource
            DataSource dataSource = configuration.getEnvironment() != null ? configuration.getEnvironment().getDataSource() : null;
            // Get the parameters marked with the Param annotation on the method
            Map<String, Object> params = getJadeParameters(args, method);
            RouterInterpreter interpreter = BeanFactory.getBean("jade.routerInterpreter", RouterInterpreter.class);
            // Store in the thread-local; it is used later when the SQL is parsed
            SQLThreadLocal.set(getSQLType(statement), boundSql.getSql(), modifier, params);
            // Route the data source and rewrite the SQL statement
            SQLInterpreterResult result = interpreter.interpret(dataSource, boundSql.getSql(), modifier, params, convertSQLArrayParams(params));
            // Put a MappedStatement built from the rewritten SQL back into the invocation's arguments
            args[INDEX_MAPPED_STATEMENT] = buildMappedStatement(statement, boundSql, result.getSQL());
            SQLThreadLocal.set(getSQLType(statement), result.getSQL(), modifier, params);
        } else {
            SQLThreadLocal.set(getSQLType(statement), boundSql.getSql(), modifier, new HashMap<String, Object>());
        }
        try {
            // Invoke the intercepted method
            return invocation.proceed();
        } finally {
            SQLThreadLocal.remove();
        }
    }
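
The first step, finding the argument marked for sharding, can be sketched with plain reflection. This is only an illustration of the idea behind getExecuteParamByAnnotationClass, whose real implementation is not shown here. DemoShardBy, DemoMessageDao and ShardParamLookup are hypothetical names, not part of jade.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for jade's ShardBy annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface DemoShardBy { }

// Hypothetical mapper interface.
interface DemoMessageDao {
    String findByUser(@DemoShardBy long userId, int limit);
    int countAll();
}

class ShardParamLookup {
    // Returns the argument whose parameter carries @DemoShardBy, or null if none does.
    static Object findShardArgument(Method method, Object[] args) {
        Annotation[][] perParameter = method.getParameterAnnotations();
        for (int i = 0; i < perParameter.length; i++) {
            for (Annotation annotation : perParameter[i]) {
                if (annotation instanceof DemoShardBy) {
                    return args[i];
                }
            }
        }
        return null;
    }

    // Small helper: first public method with the given name.
    static Method methodNamed(Class<?> type, String name) {
        for (Method method : type.getMethods()) {
            if (method.getName().equals(name)) {
                return method;
            }
        }
        throw new IllegalArgumentException("No method named " + name);
    }

    public static void main(String[] args) {
        Method sharded = methodNamed(DemoMessageDao.class, "findByUser");
        System.out.println(findShardArgument(sharded, new Object[] {42L, 10})); // prints 42
        Method plain = methodNamed(DemoMessageDao.class, "countAll");
        System.out.println(findShardArgument(plain, new Object[0]));            // prints null
    }
}
```

A null result corresponds to the else branch above: no sharding, and the SQL is executed unchanged.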

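The SQL rewriting logic lives in interpreter.interpret(DataSource dataSource, String sql, Modifier modifier, Map<String, Object> parametersAsMap, Object[] parametersAsArray). It does the following:

    1 Get the data source configured in the configuration file

    2 Parse the SQL to obtain the routing information

    3 Rewrite the table name in the SQL according to the routing result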

public SQLInterpreterResult interpret(DataSource dataSource, String sql, Modifier modifier, Map<String, Object> parametersAsMap, Object[] parametersAsArray) {
        if (dataSource instanceof DelegatingDataSource) {
            // The data source configured in the configuration file
            dataSource = ((DelegatingDataSource)dataSource).getTargetDataSource();
        }

        if (!(dataSource instanceof XnDataSource)) {
            return null;
        } else {
            Assert.notNull(parametersAsArray, "need parametersAsArray prepared before invoking this interpreter!");
            String bizName = ((XnDataSource)dataSource).getBizName();
            if (logger.isDebugEnabled()) {
                logger.debug("Invoking analyzing: " + sql);
            }

            SQLParseInfo parseInfo = SQLParseInfo.getParseInfo(sql);
            // Get all tables referenced in the SQL
            Table[] tables = parseInfo.getTables();
            RouterInterpreter.RoutingInfo routingInfo = null;
            if (tables != null) {
                int beginIndex = 0;
                if (parseInfo.isInsert() && tables.length > 1) {
                    beginIndex = 1;
                }

                for(int i = beginIndex; i < tables.length; ++i) {
                    RoutingDescriptor descriptor = this.routingConfigurator.getDescriptor(bizName, tables[i].getName());
                    if (descriptor != null) {
                        routingInfo = new RouterInterpreter.RoutingInfo(tables[i], descriptor);
                        break;
                    }
                }
            }

            if (routingInfo == null) {
                return null;
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Find routing info: " + routingInfo.byTable + ", " + routingInfo.getDbRouterColumn());
                }

                String forwardTableName = null;
                String forwardDbPattern = null;
                Object columnValue;
                Column column;
                if (routingInfo.getTableRouter() != null) {
                    column = routingInfo.getTableRouterColumn();
                    columnValue = null;
                    if (column != null) {
                        // Get the ShardBy parameter value from the thread-local variable
                        columnValue = findShardParamValue(parseInfo, column, parametersAsMap, parametersAsArray);
                        if (columnValue == null) {
                            throw new BadSqlGrammarException("sharding", parseInfo.getSQL(), (SQLException)null);
                        }
                    }
                    // Route the shard parameter according to the routing strategy to get the routed table name
                    forwardTableName = routingInfo.getTableRouter().doRoute(columnValue);
                } else if (logger.isDebugEnabled()) {
                    logger.debug("table router is null for sql \"" + sql + "\"");
                }

                if (routingInfo.getDbRouter() != null) {
                    column = routingInfo.getDbRouterColumn();
                    columnValue = null;
                    if (column != null) {
                        columnValue = findShardParamValue(parseInfo, column, parametersAsMap, parametersAsArray);
                        if (columnValue == null) {
                            throw new BadSqlGrammarException("sharding", parseInfo.getSQL(), (SQLException)null);
                        }
                    }

                    forwardDbPattern = routingInfo.getDbRouter().doRoute(columnValue);
                    if (forwardDbPattern != null) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("db pattern is '" + forwardDbPattern + "'");
                        }

                        parametersAsMap.put(XnDataSource.DB_PATTERN, forwardDbPattern);
                    } else {
                        if (logger.isDebugEnabled()) {
                            logger.debug("db pattern is empty");
                        }

                        parametersAsMap.put(XnDataSource.DB_PATTERN, "");
                    }
                } else if (logger.isDebugEnabled()) {
                    logger.debug("db router is null for sql \"" + sql + "\"");
                }
                
                String byTableName = routingInfo.byTable.getName();
                String sqlRewrited;
                if (forwardTableName != null && !forwardTableName.equals(byTableName)) {
                    // Replace the original table name in the SQL with the routed table name
                    sqlRewrited = SqlRewriter.rewriteSqlTable(sql, byTableName, forwardTableName);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Rewriting SQL: \n  From: " + sql + "\n  To:   " + sqlRewrited);
                    }
                } else {
                    sqlRewrited = sql;
                }
                // Return the result of the SQL rewrite
                return new RouterSQLInterpreterResult(forwardDbPattern, sqlRewrited, parametersAsArray);
            }
        }
    }
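
The two core operations in this method, routing a column value to a physical table name and swapping the table name in the SQL, can be sketched as follows. The modulo rule and the regex-based replacement are assumptions chosen for illustration; the actual doRoute strategy and SqlRewriter.rewriteSqlTable implementation are not shown in this article. TableRouteRewriteDemo and its methods are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TableRouteRewriteDemo {
    // Assumed modulo strategy: logical table + "_" + (value mod shardCount).
    static String routeTable(String logicalTable, long shardValue, int shardCount) {
        return logicalTable + "_" + Math.floorMod(shardValue, (long) shardCount);
    }

    // Replaces whole-word occurrences of the logical table name with the routed name.
    // Naive on purpose: a column or alias with the same name would be rewritten too.
    static String rewriteTable(String sql, String from, String to) {
        Pattern wholeWord = Pattern.compile("\\b" + Pattern.quote(from) + "\\b");
        return wholeWord.matcher(sql).replaceAll(Matcher.quoteReplacement(to));
    }

    public static void main(String[] args) {
        String routed = routeTable("message", 170L, 100);
        System.out.println(routed); // prints message_70
        // prints select id, body from message_70 where user_id = ?
        System.out.println(rewriteTable("select id, body from message where user_id = ?", "message", routed));
    }
}
```

Because an underscore counts as a word character, the word boundary keeps a table such as message_box from being touched when only message is being rewritten.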

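At this point the table name in the SQL has been rewritten and the db_pattern produced by routing has been stored, through the params map, in the SQLThreadLocal thread-local variable. It is used later by dataSource.getConnection(). Next, let's see how the dataSource routes to a physical database based on db_pattern.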

1 First, configure the data source

 <bean id="adminJadeDataSource" class="cn.techwolf.boss.admin.base.mybatis.datasource.JadeDataSourceFactoryBean">
     <constructor-arg index="0" ref="jade.dataSourceFactory" />
     <constructor-arg index="1" value="boss_admin" />
 </bean>

2 Configure the SessionFactory and bind this data source to it

    <bean id="bossAdminSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="adminJadeDataSource" />
        <property name="configLocation" value="classpath:mybatis/mybatis-config.xml" />
        <property name="failFast" value="true"/>
        <property name="mapperLocations" value="classpath:mybatis/admin/*DAO.xml" />
    </bean>

    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="sqlSessionFactoryBeanName" value="bossAdminSessionFactory" />
        <property name="basePackage" ref="adminPackageValue"/>
        <!--<property name="nameGenerator" ref="defaultDAOBeanNameGenerator" />-->
    </bean>

    <!-- Declare the DAO packages of each datasource in one place -->
    <bean id="adminPackageValue" class="java.lang.String">
        <constructor-arg>
            <value>
                cn.admin.*.dao.admin
            </value>
        </constructor-arg>
    </bean>

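All SQL in the mapper files under the cn.admin.*.dao.admin package runs through this logical data source. When the SQL executes, getConnection() on the logical data source picks a physical database according to db_pattern. Here is how the dataSource does that routing.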

    String pattern = (String)local.getParameters().get(DB_PATTERN);
    if (pattern == null) {
       if (this.logger.isDebugEnabled()) {
           this.logger.debug("not found DB_PATTERN, using default patter '' for SQL '" + local.getSql() + "'");
       }

       pattern = "";
   }

   Connection conn;
   if (!write && !TransactionSynchronizationManager.isSynchronizationActive()) {
       conn = this.connectionManager.getReadConnection(this.getBizName(), pattern);
   } else {
       conn = this.connectionManager.getWriteConnection(this.getBizName(), pattern);
   }
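
The hand-off between the interceptor and the data source can be sketched with an ordinary ThreadLocal. This is a simplified illustration, not jade's SQLThreadLocal. PatternContextDemo, the "dbPattern" key and the string labels returned in place of real connections are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class PatternContextDemo {
    static final String DB_PATTERN = "dbPattern"; // hypothetical key name
    private static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    // Interceptor side: publish the parameters, run the statement, always clean up.
    static <T> T withParameters(Map<String, Object> params, Supplier<T> statement) {
        CONTEXT.set(params);
        try {
            return statement.get();
        } finally {
            CONTEXT.remove();
        }
    }

    // Data source side: read the pattern and pick read or write, as in the snippet above.
    // Returns a label instead of a real Connection to keep the sketch self-contained.
    static String chooseConnection(String bizName, boolean write, boolean inTransaction) {
        Map<String, Object> params = CONTEXT.get();
        Object value = params == null ? null : params.get(DB_PATTERN);
        String pattern = value == null ? "" : (String) value;
        String mode = (!write && !inTransaction) ? "read" : "write";
        return mode + ":" + bizName + ":" + pattern;
    }

    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put(DB_PATTERN, "message_box_75");
        // prints read:message_box:message_box_75
        System.out.println(withParameters(params, () -> chooseConnection("message_box", false, false)));
        // Outside the interceptor scope the pattern falls back to "": prints write:message_box:
        System.out.println(chooseConnection("message_box", true, false));
    }
}
```

The finally block matters: without the remove() call, a pooled thread could carry a stale pattern into an unrelated statement.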

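JadeSQLInterceptor put DB_PATTERN into SQLThreadLocal; for example, message with the pattern suffix becomes message_70. The connection is then obtained from the bizName in the configuration file (that is, the logical database name) together with db_pattern. The process is as follows:

1 DbAgent uses db (that is, the bizName, such as message) to obtain the DataSourcePool. The mapping between logical and physical databases is configured in ZooKeeper, as in the following configuration: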

<instance name="message_box" timestamp="2017-11-01 16:00:00" type="router">
        <route expression="message_box_[0-9]|message_box_[1-9][0-9]|message_box_[1-4][0-9][0-9]" instance="message_box_0"/>
        <route expression="message_box_[5-9][0-9][0-9]" instance="message_box_1"/>
    </instance>

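type="router" means that the logical database message_box uses sharded-database routing: message_box_0 through message_box_499 go to the message_box_0 instance, and message_box_500 through message_box_999 go to the message_box_1 instance. The configuration is managed by ZooKeeper and watched for changes.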

    public StormDataSourcePool getDsPool(String db) {
        StormDataSourcePool ds = (StormDataSourcePool)this.dsPool.get(db);
        if (ds == null) {
            synchronized(this) {
                if (ds == null) {
                    this.registerClient(db);
                    this.watchDbConfig(db);
                    log.debug("not found ds for db " + db + " , retirve");
                    this.compareAndReload(this.retriveDbConfig(db));
                }
            }
        }

        return (StormDataSourcePool)this.dsPool.get(db);
    }
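
Note that in the code as shown, the local variable ds is not re-read inside the synchronized block, so the inner null check always passes; this may simply be a decompilation artifact. For comparison, here is a minimal sketch of load-once caching built on ConcurrentHashMap.computeIfAbsent. LazyPoolCache and its loader function are hypothetical and not part of jade.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class LazyPoolCache<P> {
    private final Map<String, P> pools = new ConcurrentHashMap<>();
    private final Function<String, P> loader;

    LazyPoolCache(Function<String, P> loader) {
        this.loader = loader;
    }

    // Runs the loader at most once per db name, even with concurrent callers.
    P get(String db) {
        return pools.computeIfAbsent(db, loader);
    }
}

class LazyPoolCacheDemo {
    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        // The loader stands in for "register, watch, and load the config for this db".
        LazyPoolCache<String> cache = new LazyPoolCache<>(db -> {
            loads.incrementAndGet();
            return "pool-" + db;
        });
        System.out.println(cache.get("message_box")); // prints pool-message_box
        System.out.println(cache.get("message_box")); // prints pool-message_box
        System.out.println(loads.get());              // prints 1
    }
}
```

One caveat with this approach: the loader must not try to update the same map for another key while it runs, so a reload that touches several entries would need a different structure.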

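Because the type in the configuration is router, the StormDataSourcePool here is actually an instance of its subclass RouterDsPool. Route resolution happens in that RouterDsPool instance's getReadableDs(String pattern) or getWriteableDs(String pattern), both of which call findDataSource(String pattern):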

    protected StormDataSourcePool findDataSource(String pattern) {
        Iterator i$ = this.config.getRoutes().iterator();

        RouteConfig route;
        do {
            if (!i$.hasNext()) {
                throw new NoRouteMatchExecption(pattern, this.config.getName());
            }

            route = (RouteConfig)i$.next();
            log.debug("Comparing " + pattern + " aginst " + route.getExpression());
        } while(!pattern.matches(route.getExpression()));

        return this.agent.getDsPool(route.getInstance());
    }

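This matches the pattern against each expression in the configuration to obtain the instance; for example, message_box_75 resolves to message_box_0. The configuration for message_box_0 is then fetched from ZooKeeper: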

<instance name="message_box_0" timestamp="2017-11-01 15:00:00" type="singler">
    	<server charset="utf8" database="message_box" host="192.168.1.1" password="123456" port="3306" priority="" type="mysql" user="test" wrflag="wr"/>
    </instance>

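This configuration holds the database details: host, port, user, password, the read/write flag, and so on. The type singler means the instance is not sharded any further, so at this point the connection information is known and the connection is returned.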
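
The route matching done by findDataSource can be reproduced with String.matches, which requires the whole pattern to match the expression. The sketch below uses route expressions modeled on the router configuration above, with the two-digit alternative written as message_box_[1-9][0-9] (an assumption, so that suffixes 10 to 99 match). RouteMatchDemo and findInstance are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RouteMatchDemo {
    // Expression -> instance, kept in configuration order.
    static final Map<String, String> ROUTES = new LinkedHashMap<>();
    static {
        ROUTES.put("message_box_[0-9]|message_box_[1-9][0-9]|message_box_[1-4][0-9][0-9]", "message_box_0");
        ROUTES.put("message_box_[5-9][0-9][0-9]", "message_box_1");
    }

    // Returns the instance of the first route whose expression fully matches the pattern.
    static String findInstance(String pattern) {
        for (Map.Entry<String, String> route : ROUTES.entrySet()) {
            if (pattern.matches(route.getKey())) {
                return route.getValue();
            }
        }
        throw new IllegalArgumentException("No route matches " + pattern);
    }

    public static void main(String[] args) {
        System.out.println(findInstance("message_box_75"));  // prints message_box_0
        System.out.println(findInstance("message_box_499")); // prints message_box_0
        System.out.println(findInstance("message_box_500")); // prints message_box_1
    }
}
```

A pattern outside both ranges, such as message_box_1000, matches no route and raises an exception, which mirrors the NoRouteMatchExecption thrown in the real code.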



Reposted from blog.csdn.net/zongyeqing/article/details/80154994