• How ShardingSphere Rewrites SQL


    This article looks at how ShardingSphere performs SQL rewriting.

    prepareStatement

    org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java

    public final class ShardingSphereConnection extends AbstractConnectionAdapter {
    
        @Override
        public PreparedStatement prepareStatement(final String sql) throws SQLException {
            return new ShardingSpherePreparedStatement(this, sql);
        }
    
        //......
    }    
    

    ShardingSphereConnection's prepareStatement method creates a ShardingSpherePreparedStatement.

    ShardingSpherePreparedStatement

    org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

    public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter {
        
        @Getter
        private final ShardingSphereConnection connection;
    
        public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
            this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false, null);
        }
    
        private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
                                                final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,
                                                final String[] columns) throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new EmptySQLException().toSQLException();
            }
            this.connection = connection;
            metaDataContexts = connection.getContextManager().getMetaDataContexts();
            SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
            hintValueContext = sqlParserRule.isSqlCommentParseEnabled() ? new HintValueContext() : SQLHintUtils.extractHint(sql).orElseGet(HintValueContext::new);
            this.sql = sqlParserRule.isSqlCommentParseEnabled() ? sql : SQLHintUtils.removeHint(sql);
            statements = new ArrayList<>();
            parameterSets = new ArrayList<>();
            SQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(
                    DatabaseTypeEngine.getTrunkDatabaseTypeName(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType()));
            sqlStatement = sqlParserEngine.parse(this.sql, true);
            sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData(), sqlStatement, connection.getDatabaseName());
            parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
            statementOption = returnGeneratedKeys ? new StatementOption(true, columns) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
            executor = new DriverExecutor(connection);
            JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext());
            batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getDatabaseName());
            kernelProcessor = new KernelProcessor();
            statementsCacheable = isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData());
            trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
            selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();
            statementManager = new StatementManager();
        }
    
        //......
    }    
    

    ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter. Its constructor mainly uses SQLParserEngine to parse the SQL into a SQLStatement, then creates the DriverExecutor, BatchPreparedStatementExecutor, KernelProcessor, and StatementManager. Note that even with useServerPrepStmts=true, nothing here triggers a prepare on the MySQL server.

    executeUpdate

        public int executeUpdate() throws SQLException {
            try {
                if (statementsCacheable && !statements.isEmpty()) {
                    resetParameters();
                    return statements.iterator().next().executeUpdate();
                }
                clearPrevious();
                QueryContext queryContext = createQueryContext();
                trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
                if (null != trafficInstanceId) {
                    JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
                    return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());
                }
                executionContext = createExecutionContext(queryContext);
                if (hasRawExecutionRule()) {
                    Collection<ExecuteResult> executeResults = executor.getRawExecutor().execute(createRawExecutionGroupContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback());
                    return accumulate(executeResults);
                }
                return isNeedImplicitCommitTransaction(connection, executionContext) ? executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate();
                // CHECKSTYLE:OFF
            } catch (final RuntimeException ex) {
                // CHECKSTYLE:ON
                handleExceptionInTransaction(connection, metaDataContexts);
                throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
            } finally {
                clearBatch();
            }
        }
    
        private void clearPrevious() {
            statements.clear();
            parameterSets.clear();
            generatedValues.clear();
        }
    
        private ExecutionContext createExecutionContext(final QueryContext queryContext) {
            ShardingSphereRuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();
            ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName());
            SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext());
            ExecutionContext result = kernelProcessor.generateExecutionContext(
                    queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
            findGeneratedKey(result).ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));
            return result;
        }
    

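    Unless cached statements can be reused (statementsCacheable is true and statements is not empty), executeUpdate first calls clearPrevious to clear statements, parameterSets, and generatedValues. It then calls createExecutionContext, one step of which is kernelProcessor.generateExecutionContext.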

    KernelProcessor

    generateExecutionContext

    shardingsphere-infra-context-5.4.0-sources.jar!/org/apache/shardingsphere/infra/connection/kernel/KernelProcessor.java

        public ExecutionContext generateExecutionContext(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,
                                                         final ConfigurationProperties props, final ConnectionContext connectionContext) {
            RouteContext routeContext = route(queryContext, database, globalRuleMetaData, props, connectionContext);
            SQLRewriteResult rewriteResult = rewrite(queryContext, database, globalRuleMetaData, props, routeContext, connectionContext);
            ExecutionContext result = createExecutionContext(queryContext, database, routeContext, rewriteResult);
            logSQL(queryContext, props, result);
            return result;
        }
    

    KernelProcessor's generateExecutionContext method first builds the RouteContext via route, then runs rewrite, and finally calls createExecutionContext.

    rewrite

        private SQLRewriteResult rewrite(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,
                                         final ConfigurationProperties props, final RouteContext routeContext, final ConnectionContext connectionContext) {
            SQLRewriteEntry sqlRewriteEntry = new SQLRewriteEntry(database, globalRuleMetaData, props);
            return sqlRewriteEntry.rewrite(queryContext.getSql(), queryContext.getParameters(), queryContext.getSqlStatementContext(), routeContext, connectionContext, queryContext.getHintValueContext());
        }
    

    The rewrite itself is delegated to SQLRewriteEntry's rewrite method.

    SQLRewriteEntry

    shardingsphere-infra-rewrite-5.4.0-sources.jar!/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java

        /**
         * Rewrite.
         * 
         * @param sql SQL
         * @param params SQL parameters
         * @param sqlStatementContext SQL statement context
         * @param routeContext route context
         * @param connectionContext connection context
         * @param hintValueContext hint value context
         * 
         * @return route unit and SQL rewrite result map
         */
        public SQLRewriteResult rewrite(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext,
                                        final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) {
            SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(sql, params, sqlStatementContext, routeContext, connectionContext, hintValueContext);
            SQLTranslatorRule rule = globalRuleMetaData.getSingleRule(SQLTranslatorRule.class);
            DatabaseType protocolType = database.getProtocolType();
            Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
            return routeContext.getRouteUnits().isEmpty()
                    ? new GenericSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext)
                    : new RouteSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext, routeContext);
        }
    
        private SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext,
                                                          final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) {
            SQLRewriteContext result = new SQLRewriteContext(database.getName(), database.getSchemas(), sqlStatementContext, sql, params, connectionContext, hintValueContext);
            decorate(decorators, result, routeContext, hintValueContext);
            result.generateSQLTokens();
            return result;
        }
    
        private void decorate(final Map<ShardingSphereRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext,
                              final RouteContext routeContext, final HintValueContext hintValueContext) {
            if (hintValueContext.isSkipSQLRewrite()) {
                return;
            }
            for (Entry<ShardingSphereRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {
                entry.getValue().decorate(entry.getKey(), props, sqlRewriteContext, routeContext);
            }
        }
    

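    SQLRewriteEntry's rewrite method first builds a SQLRewriteContext via createSQLRewriteContext. That method uses decorate to iterate over decorators and call each SQLRewriteContextDecorator's decorate method (skipped entirely when hintValueContext.isSkipSQLRewrite() is true), and then calls generateSQLTokens. The actual rewrite is then performed by GenericSQLRewriteEngine when there are no route units, or by RouteSQLRewriteEngine otherwise.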
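The decorate step can be pictured with a minimal sketch. All names below (`RewriteContextSketch`, `DecoratorSketch`, `DecorateSketch`) are hypothetical stand-ins rather than ShardingSphere classes, and the sketch assumes each decorator does nothing more than register token generator names on a shared context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for SQLRewriteContext: it only collects generator names.
class RewriteContextSketch {
    final List<String> tokenGenerators = new ArrayList<>();
}

// Hypothetical stand-in for SQLRewriteContextDecorator.
interface DecoratorSketch {
    void decorate(RewriteContextSketch context);
}

class DecorateSketch {

    // Same shape as the decorate method shown above: return early when the hint
    // asks to skip rewriting, otherwise let every decorator touch the context.
    static RewriteContextSketch createContext(List<DecoratorSketch> decorators, boolean skipSqlRewrite) {
        RewriteContextSketch result = new RewriteContextSketch();
        if (skipSqlRewrite) {
            return result;
        }
        for (DecoratorSketch each : decorators) {
            each.decorate(result);
        }
        return result;
    }

    // Two made-up decorators, one per feature, applied in list order.
    static List<DecoratorSketch> sampleDecorators() {
        DecoratorSketch sharding = context -> context.tokenGenerators.add("sharding-table-token");
        DecoratorSketch encrypt = context -> context.tokenGenerators.add("encrypt-column-token");
        return Arrays.asList(sharding, encrypt);
    }
}
```

Calling `DecorateSketch.createContext(DecorateSketch.sampleDecorators(), false)` leaves both generator names on the context in list order, while passing `true` leaves it empty.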

    SQLRewriteContextDecorator

    org/apache/shardingsphere/infra/rewrite/context/SQLRewriteContextDecorator.java

    @SingletonSPI
    public interface SQLRewriteContextDecorator<T extends ShardingSphereRule> extends OrderedSPI<T> {
        
        /**
         * Decorate SQL rewrite context.
         *
         * @param rule rule
         * @param props ShardingSphere properties
         * @param sqlRewriteContext SQL rewrite context to be decorated
         * @param routeContext route context
         */
        void decorate(T rule, ConfigurationProperties props, SQLRewriteContext sqlRewriteContext, RouteContext routeContext);
    }
    

    SQLRewriteContextDecorator defines the decorate method; its implementations include ShardingSQLRewriteContextDecorator and EncryptSQLRewriteContextDecorator.

    EncryptSQLRewriteContextDecorator

    org/apache/shardingsphere/encrypt/rewrite/context/EncryptSQLRewriteContextDecorator.java

    /**
     * SQL rewrite context decorator for encrypt.
     */
    public final class EncryptSQLRewriteContextDecorator implements SQLRewriteContextDecorator<EncryptRule> {
        
        @Override
        public void decorate(final EncryptRule encryptRule, final ConfigurationProperties props, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
            SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext();
            if (!containsEncryptTable(encryptRule, sqlStatementContext)) {
                return;
            }
            Collection<EncryptCondition> encryptConditions = createEncryptConditions(encryptRule, sqlRewriteContext);
            if (!sqlRewriteContext.getParameters().isEmpty()) {
                Collection<ParameterRewriter> parameterRewriters = new EncryptParameterRewriterBuilder(encryptRule,
                        sqlRewriteContext.getDatabaseName(), sqlRewriteContext.getSchemas(), sqlStatementContext, encryptConditions).getParameterRewriters();
                rewriteParameters(sqlRewriteContext, parameterRewriters);
            }
            Collection<SQLTokenGenerator> sqlTokenGenerators = new EncryptTokenGenerateBuilder(encryptRule,
                    sqlStatementContext, encryptConditions, sqlRewriteContext.getDatabaseName()).getSQLTokenGenerators();
            sqlRewriteContext.addSQLTokenGenerators(sqlTokenGenerators);
        }
        
        private Collection<EncryptCondition> createEncryptConditions(final EncryptRule encryptRule, final SQLRewriteContext sqlRewriteContext) {
            SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext();
            if (!(sqlStatementContext instanceof WhereAvailable)) {
                return Collections.emptyList();
            }
            Collection<WhereSegment> whereSegments = ((WhereAvailable) sqlStatementContext).getWhereSegments();
            Collection<ColumnSegment> columnSegments = ((WhereAvailable) sqlStatementContext).getColumnSegments();
            return new EncryptConditionEngine(encryptRule, sqlRewriteContext.getSchemas())
                    .createEncryptConditions(whereSegments, columnSegments, sqlStatementContext, sqlRewriteContext.getDatabaseName());
        }
        
        private boolean containsEncryptTable(final EncryptRule encryptRule, final SQLStatementContext sqlStatementContext) {
            for (String each : sqlStatementContext.getTablesContext().getTableNames()) {
                if (encryptRule.findEncryptTable(each).isPresent()) {
                    return true;
                }
            }
            return false;
        }
        
        private void rewriteParameters(final SQLRewriteContext sqlRewriteContext, final Collection<ParameterRewriter> parameterRewriters) {
            for (ParameterRewriter each : parameterRewriters) {
                each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
            }
        }
        
        @Override
        public int getOrder() {
            return EncryptOrder.ORDER;
        }
        
        @Override
        public Class<EncryptRule> getTypeClass() {
            return EncryptRule.class;
        }
    }
    

    rewriteParameters rewrites the parameters through each ParameterRewriter, which mainly modifies the ParameterBuilder; changes to the SQL text itself are made through the sqlTokenGenerators.
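As a rough illustration of parameter rewriting, the sketch below keeps the user's parameters untouched and records replacements by index. `ParameterBuilderSketch` and `EncryptParameterSketch` are hypothetical names, and `toyEncrypt` is a deliberately trivial stand-in for a real encryption algorithm; none of this is ShardingSphere's actual API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a parameter builder: the original parameters are
// never modified; replacements are recorded by zero-based index.
class ParameterBuilderSketch {
    private final List<Object> originalParameters;
    private final Map<Integer, Object> replacedParameters = new HashMap<>();

    ParameterBuilderSketch(List<Object> originalParameters) {
        this.originalParameters = originalParameters;
    }

    void replace(int index, Object value) {
        replacedParameters.put(index, value);
    }

    // Builds a fresh list with the recorded replacements applied.
    List<Object> getParameters() {
        List<Object> result = new ArrayList<>(originalParameters);
        for (Map.Entry<Integer, Object> entry : replacedParameters.entrySet()) {
            result.set(entry.getKey(), entry.getValue());
        }
        return result;
    }
}

class EncryptParameterSketch {

    // Toy "cipher" for illustration only: it just reverses the text.
    static String toyEncrypt(Object plain) {
        return new StringBuilder(String.valueOf(plain)).reverse().toString();
    }

    // Replaces the parameters at the given positions with their toy ciphertext.
    static List<Object> rewrite(List<Object> params, List<Integer> encryptedIndexes) {
        ParameterBuilderSketch builder = new ParameterBuilderSketch(params);
        for (int index : encryptedIndexes) {
            builder.replace(index, toyEncrypt(params.get(index)));
        }
        return builder.getParameters();
    }
}
```

Keeping the original list intact matters because the same logical parameters may need to be rewritten differently for different targets.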

    SQLToken

    @RequiredArgsConstructor
    @Getter
    public abstract class SQLToken implements Comparable<SQLToken> {
        
        private final int startIndex;
        
        @Override
        public final int compareTo(final SQLToken sqlToken) {
            return startIndex - sqlToken.startIndex;
        }
    }
    

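    SQLToken has implementations such as InsertValuesToken, SubstitutableColumnNameToken, and InsertColumnsToken. Tokens are ordered by startIndex, their starting position in the SQL text.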
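To see why ordering tokens by startIndex is useful, here is a minimal sketch of stitching a rewritten SQL string together from position-based tokens. `ReplaceTokenSketch` and `TokenRewriteSketch` are hypothetical; the sketch assumes every token simply replaces an inclusive, non-overlapping character range, which is a simplification of what real tokens do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical token: replaces sql[startIndex..stopIndex] (inclusive) with text.
class ReplaceTokenSketch implements Comparable<ReplaceTokenSketch> {
    final int startIndex;
    final int stopIndex;
    final String text;

    ReplaceTokenSketch(int startIndex, int stopIndex, String text) {
        this.startIndex = startIndex;
        this.stopIndex = stopIndex;
        this.text = text;
    }

    @Override
    public int compareTo(ReplaceTokenSketch other) {
        return startIndex - other.startIndex;
    }
}

class TokenRewriteSketch {

    // Sorts the tokens by position, then alternates between copying untouched
    // SQL and emitting each token's replacement text.
    static String rewrite(String sql, List<ReplaceTokenSketch> tokens) {
        List<ReplaceTokenSketch> sorted = new ArrayList<>(tokens);
        Collections.sort(sorted);
        StringBuilder result = new StringBuilder();
        int cursor = 0;
        for (ReplaceTokenSketch each : sorted) {
            result.append(sql, cursor, each.startIndex);
            result.append(each.text);
            cursor = each.stopIndex + 1;
        }
        result.append(sql, cursor, sql.length());
        return result.toString();
    }

    // Convenience helper: builds a token for the first occurrence of target.
    static ReplaceTokenSketch tokenFor(String sql, String target, String replacement) {
        int start = sql.indexOf(target);
        return new ReplaceTokenSketch(start, start + target.length() - 1, replacement);
    }
}
```

Because the tokens are sorted before stitching, generators from different features can contribute them in any order.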

    RouteSQLRewriteEngine

        /**
         * Rewrite SQL and parameters.
         *
         * @param sqlRewriteContext SQL rewrite context
         * @param routeContext route context
         * @return SQL rewrite result
         */
        public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
            Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1F);
            for (Entry<String, Collection<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) {
                Collection<RouteUnit> routeUnits = entry.getValue();
                if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) {
                    sqlRewriteUnits.put(routeUnits.iterator().next(), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
                } else {
                    addSQLRewriteUnits(sqlRewriteUnits, sqlRewriteContext, routeContext, routeUnits);
                }
            }
            return new RouteSQLRewriteResult(translate(sqlRewriteContext.getSqlStatementContext().getSqlStatement(), sqlRewriteUnits));
        }
    
        private void addSQLRewriteUnits(final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits, final SQLRewriteContext sqlRewriteContext,
                                        final RouteContext routeContext, final Collection<RouteUnit> routeUnits) {
            for (RouteUnit each : routeUnits) {
                sqlRewriteUnits.put(each, new SQLRewriteUnit(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeContext, each)));
            }
        }
    
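        private Map<RouteUnit, SQLRewriteUnit> translate(final SQLStatement sqlStatement, final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits) {
            Map<RouteUnit, SQLRewriteUnit> result = new LinkedHashMap<>(sqlRewriteUnits.size(), 1F);
            for (Entry<RouteUnit, SQLRewriteUnit> entry : sqlRewriteUnits.entrySet()) {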
                DatabaseType storageType = storageTypes.get(entry.getKey().getDataSourceMapper().getActualName());
                String sql = translatorRule.translate(entry.getValue().getSql(), sqlStatement, protocolType, storageType);
                SQLRewriteUnit sqlRewriteUnit = new SQLRewriteUnit(sql, entry.getValue().getParameters());
                result.put(entry.getKey(), sqlRewriteUnit);
            }
            return result;
        }
    

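    addSQLRewriteUnits adds one SQLRewriteUnit per RouteUnit to sqlRewriteUnits, and translate then builds the final SQLRewriteUnit for each entry. A SQLRewriteUnit holds the rewritten SQL together with the correspondingly rewritten parameters.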
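The fan-out from one logical SQL to one rewrite unit per route unit might look roughly like the sketch below. The classes are hypothetical, the result is keyed by a plain string instead of a RouteUnit, and the table name is swapped with `String.replace` purely for brevity; the real engine builds the SQL from tokens instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical route unit: which data source and which actual table to hit.
class RouteUnitSketch {
    final String dataSource;
    final String logicTable;
    final String actualTable;

    RouteUnitSketch(String dataSource, String logicTable, String actualTable) {
        this.dataSource = dataSource;
        this.logicTable = logicTable;
        this.actualTable = actualTable;
    }
}

// Hypothetical rewrite unit: rewritten SQL plus the parameters that go with it.
class RewriteUnitSketch {
    final String sql;
    final List<Object> parameters;

    RewriteUnitSketch(String sql, List<Object> parameters) {
        this.sql = sql;
        this.parameters = parameters;
    }
}

class RouteRewriteSketch {

    // Produces one rewrite unit per route unit, preserving route order.
    static Map<String, RewriteUnitSketch> rewrite(String sql, List<Object> params, List<RouteUnitSketch> routeUnits) {
        Map<String, RewriteUnitSketch> result = new LinkedHashMap<>();
        for (RouteUnitSketch each : routeUnits) {
            // Simplification: naive text replacement of the logic table name.
            String actualSql = sql.replace(each.logicTable, each.actualTable);
            result.put(each.dataSource + "." + each.actualTable, new RewriteUnitSketch(actualSql, new ArrayList<>(params)));
        }
        return result;
    }
}
```

Each unit carries its own copy of the parameters, which mirrors the idea that parameters are resolved per route unit.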

    useDriverToExecuteUpdate

    org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

        private int useDriverToExecuteUpdate() throws SQLException {
            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
            cacheStatements(executionGroupContext.getInputGroups());
            return executor.getRegularExecutor().executeUpdate(executionGroupContext,
                    executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
        }
    
        private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
            DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
            return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getDatabaseName()));
        } 
    
        private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {
            for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
                each.getInputs().forEach(eachInput -> {
                    statements.add((PreparedStatement) eachInput.getStorageResource());
                    parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters());
                });
            }
            replay();
        }
    
        private void replay() throws SQLException {
            replaySetParameter();
            for (Statement each : statements) {
                getMethodInvocationRecorder().replay(each);
            }
        }
    
        private void replaySetParameter() throws SQLException {
            for (int i = 0; i < statements.size(); i++) {
                replaySetParameter(statements.get(i), parameterSets.get(i));
            }
        }
    
        protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> params) throws SQLException {
            setParameterMethodInvocations.clear();
            addParameters(params);
            for (PreparedStatementInvocationReplayer each : setParameterMethodInvocations) {
                each.replayOn(preparedStatement);
            }
        }
    
        private void addParameters(final List<Object> params) {
            int i = 0;
            for (Object each : params) {
                int index = ++i;
                setParameterMethodInvocations.add(preparedStatement -> preparedStatement.setObject(index, each));
            }
        }
    

    useDriverToExecuteUpdate calls createExecutionGroupContext (which runs prepare). cacheStatements then stores the real PreparedStatement from eachInput.getStorageResource() in ShardingSpherePreparedStatement's statements field, and stores eachInput.getExecutionUnit().getSqlUnit().getParameters() in parameterSets. Finally, replay uses PreparedStatementInvocationReplayer to replay the rewritten parameters onto the real PreparedStatement.
    The method then delegates to executor.getRegularExecutor().executeUpdate, whose last argument is the callback created by createExecuteUpdateCallback.
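The record-then-replay idea can be sketched in isolation. `ParameterTargetSketch` and `ReplayerSketch` below are hypothetical interfaces standing in for java.sql.PreparedStatement and PreparedStatementInvocationReplayer, so that the example runs without a database; only the shape of addParameters follows the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal target; in the real code this is java.sql.PreparedStatement.
interface ParameterTargetSketch {
    void setObject(int parameterIndex, Object value);
}

// Hypothetical recorded call that can be applied to any target later.
interface ReplayerSketch {
    void replayOn(ParameterTargetSketch target);
}

class ReplaySketch {
    private final List<ReplayerSketch> invocations = new ArrayList<>();

    // Same shape as addParameters above: JDBC parameter indexes start at 1.
    void addParameters(List<Object> params) {
        int i = 0;
        for (Object each : params) {
            int index = ++i;
            invocations.add(target -> target.setObject(index, each));
        }
    }

    // Applies every recorded call, in order, to the given target.
    void replayOn(ParameterTargetSketch target) {
        for (ReplayerSketch each : invocations) {
            each.replayOn(target);
        }
    }

    // Replays onto an in-memory target and returns the calls it received.
    static List<String> record(List<Object> params) {
        List<String> calls = new ArrayList<>();
        ReplaySketch sketch = new ReplaySketch();
        sketch.addParameters(params);
        sketch.replayOn((index, value) -> calls.add(index + "=" + value));
        return calls;
    }
}
```

Storing the calls as lambdas means the same recorded parameters can be applied to any number of real statements created later.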

    DriverExecutionPrepareEngine.prepare

    org/apache/shardingsphere/infra/executor/sql/prepare/AbstractExecutionPrepareEngine.java

        public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Collection<ExecutionUnit> executionUnits,
                                                      final ExecutionGroupReportContext reportContext) throws SQLException {
            return prepare(routeContext, Collections.emptyMap(), executionUnits, reportContext);
        }
    
        public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Map<String, Integer> connectionOffsets, final Collection<ExecutionUnit> executionUnits,
                                                      final ExecutionGroupReportContext reportContext) throws SQLException {
            Collection<ExecutionGroup<T>> result = new LinkedList<>();
            for (Entry<String, List<SQLUnit>> entry : aggregateSQLUnitGroups(executionUnits).entrySet()) {
                String dataSourceName = entry.getKey();
                List<SQLUnit> sqlUnits = entry.getValue();
                List<List<SQLUnit>> sqlUnitGroups = group(sqlUnits);
                ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
                result.addAll(group(dataSourceName, connectionOffsets.getOrDefault(dataSourceName, 0), sqlUnitGroups, connectionMode));
            }
            return decorate(routeContext, result, reportContext);
        }
    
        protected List<ExecutionGroup<T>> group(final String dataSourceName, final int connectionOffset, final List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws SQLException {
            List<ExecutionGroup<T>> result = new LinkedList<>();
            List<C> connections = databaseConnectionManager.getConnections(dataSourceName, connectionOffset, sqlUnitGroups.size(), connectionMode);
            int count = 0;
            for (List<SQLUnit> each : sqlUnitGroups) {
                result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode));
            }
            return result;
        }
    
        private ExecutionGroup<T> createExecutionGroup(final String dataSourceName, final List<SQLUnit> sqlUnits, final C connection, final ConnectionMode connectionMode) throws SQLException {
            List<T> result = new LinkedList<>();
            for (SQLUnit each : sqlUnits) {
                result.add((T) sqlExecutionUnitBuilder.build(new ExecutionUnit(dataSourceName, each), statementManager, connection, connectionMode, option, databaseTypes.get(dataSourceName)));
            }
            return new ExecutionGroup<>(result);
        }
    

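    For each data source, prepare picks CONNECTION_STRICTLY when the number of SQL units exceeds maxConnectionsSizePerQuery, and MEMORY_STRICTLY otherwise. group then iterates over sqlUnitGroups and calls createExecutionGroup for each group, which in turn calls sqlExecutionUnitBuilder.build. The connections returned by databaseConnectionManager.getConnections here come from the real driver (com.mysql.jdbc.Driver).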
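The connection-mode decision in prepare is simple enough to sketch directly. The partitioning helper below is an assumption: the excerpt above does not show how group(sqlUnits) splits the units, so the sketch just divides them into at most maxConnectionsSizePerQuery roughly equal groups, and it assumes that limit is positive. `ConnectionModeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class ConnectionModeSketch {

    // Mirrors the ternary in prepare: more SQL units than allowed connections
    // means connections are the scarce resource.
    static String chooseMode(int maxConnectionsSizePerQuery, int sqlUnitCount) {
        return maxConnectionsSizePerQuery < sqlUnitCount ? "CONNECTION_STRICTLY" : "MEMORY_STRICTLY";
    }

    // Assumed partitioning: split the units into at most
    // maxConnectionsSizePerQuery groups, one group per connection.
    static <T> List<List<T>> group(List<T> sqlUnits, int maxConnectionsSizePerQuery) {
        int size = sqlUnits.size();
        // Ceiling division, with a minimum partition size of 1.
        int partitionSize = Math.max((size + maxConnectionsSizePerQuery - 1) / maxConnectionsSizePerQuery, 1);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < size; i += partitionSize) {
            result.add(new ArrayList<>(sqlUnits.subList(i, Math.min(i + partitionSize, size))));
        }
        return result;
    }
}
```

With five SQL units and a limit of two connections, the sketch yields two groups (three units and two units) and CONNECTION_STRICTLY; with two units and the same limit, each unit gets its own group and the mode is MEMORY_STRICTLY.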

    PreparedStatementExecutionUnitBuilder

    org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java

        public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager,
                                       final Connection connection, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException {
            PreparedStatement preparedStatement = createPreparedStatement(
                    executionUnit, statementManager, connection, connectionMode, option, databaseType);
            return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement);
        }
    
        private PreparedStatement createPreparedStatement(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, final Connection connection,
                                                          final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException {
            return (PreparedStatement) statementManager.createStorageResource(executionUnit, connection, connectionMode, option, databaseType);
        }
    

    It is only in PreparedStatementExecutionUnitBuilder's build method that the PreparedStatement is actually created.

    StatementManager

    org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java

        public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final ConnectionMode connectionMode, final StatementOption option,
                                               final DatabaseType databaseType) throws SQLException {
            Statement result = cachedStatements.get(new CacheKey(executionUnit, connectionMode));
            if (null == result || result.isClosed() || result.getConnection().isClosed()) {
                String sql = executionUnit.getSqlUnit().getSql();
                if (option.isReturnGeneratedKeys()) {
                    result = null == option.getColumns() || 0 == option.getColumns().length
                            ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                            : connection.prepareStatement(sql, option.getColumns());
                } else {
                    result = connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
                }
                cachedStatements.put(new CacheKey(executionUnit, connectionMode), result);
            }
            return result;
        }
    

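    createStorageResource creates the real PreparedStatement via connection.prepareStatement, unless a usable cached statement already exists for the same execution unit and connection mode. The SQL passed in at this point is the rewritten SQL.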
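The cache-or-create pattern in createStorageResource can be reduced to a small sketch. `CachedResourceSketch` and `StatementCacheSketch` are hypothetical, and the cache key is simplified to a string built from the connection mode and the SQL, whereas the real code keys on the execution unit and connection mode.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a JDBC statement that can be closed.
class CachedResourceSketch {
    final String sql;
    boolean closed;

    CachedResourceSketch(String sql) {
        this.sql = sql;
    }
}

class StatementCacheSketch {
    private final Map<String, CachedResourceSketch> cache = new HashMap<>();

    // Counts how many resources were actually created, for illustration.
    int created;

    // Reuses a cached resource when it is still open; otherwise creates a new
    // one for the rewritten SQL and caches it under the same key.
    CachedResourceSketch getOrCreate(String rewrittenSql, String connectionMode) {
        String key = connectionMode + "|" + rewrittenSql;
        CachedResourceSketch result = cache.get(key);
        if (null == result || result.closed) {
            result = new CachedResourceSketch(rewrittenSql);
            created++;
            cache.put(key, result);
        }
        return result;
    }
}
```

The closed check is what keeps a stale entry from being handed out after the underlying statement has been released.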

    createExecuteUpdateCallback

    org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

        private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
            boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
            return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
                    metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown) {
                
                @Override
                protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
                    return ((PreparedStatement) statement).executeUpdate();
                }
                
                @Override
                protected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
                    return Optional.empty();
                }
            };
        }
    

    The JDBCExecutorCallback created by createExecuteUpdateCallback implements executeSQL as ((PreparedStatement) statement).executeUpdate(), so execution is delegated to the real PreparedStatement.

    Summary

    • ShardingSphereConnection's prepareStatement creates a ShardingSpherePreparedStatement. SQL rewriting happens when its executeUpdate is called, followed by prepare; execution then goes through a JDBCExecutorCallback whose executeSQL method calls ((PreparedStatement) statement).executeUpdate(), delegating to the real PreparedStatement.
    • rewriteParameters rewrites the parameters through each ParameterRewriter, which mainly modifies the ParameterBuilder; changes to the SQL text itself are made through the sqlTokenGenerators.
    • The PreparedStatement is only really created in PreparedStatementExecutionUnitBuilder's build method: it calls StatementManager.createStorageResource, which creates the real PreparedStatement via connection.prepareStatement, and the SQL passed in at that point is the rewritten SQL.
    • useDriverToExecuteUpdate calls createExecutionGroupContext (which runs prepare). cacheStatements stores the real PreparedStatement from eachInput.getStorageResource() in ShardingSpherePreparedStatement's statements field and stores eachInput.getExecutionUnit().getSqlUnit().getParameters() in parameterSets; replay then uses PreparedStatementInvocationReplayer to replay the rewritten parameters onto the real PreparedStatement.

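    ShardingSpherePreparedStatement implements the java.sql.PreparedStatement interface. Its sql field holds the SQL supplied by the user, that is, the SQL before rewriting. When it is actually executed, SQL rewriting is triggered (covering both the SQL text and the parameters); the real PreparedStatement is then created via connection.prepareStatement with the rewritten SQL, a replay step applies the rewritten parameters to it, and ((PreparedStatement) statement).executeUpdate() finally triggers execution.
    This gives us the basic approach to SQL rewriting: implement the java.sql.PreparedStatement interface to present a stand-in PreparedStatement, buffer its creation and parameter-setting calls in memory, and then, at execute time, rewrite the SQL, create the real PreparedStatement, replay the parameters, and call its execute method.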

  • Original article: https://blog.csdn.net/hello_ejb3/article/details/132684229