Batch提交
Batch提交是指通过同一个Connection,将若干条SQL一次性批量提交给MySQL处理。但有一个问题:如果其中某条SQL执行失败,其他SQL并不会受影响(不会随之回滚),因此Batch本身无法保证原子性,只能在外层再封装一层Transaction来实现。同时,并不是每种Server都支持Batch处理,需要看具体的产品和版本号。
代码
我们通过观察具体的客户端实现代码来了解批量提交是如何实现的。spring-jdbc-5.1.3.RELEASE中的实现如下:
/**
 * Runs one named-parameter SQL statement as a batch, once per entry of
 * {@code batchArgs}.
 *
 * <p>The SQL is parsed once, a PreparedStatementCreatorFactory is derived from
 * the parsed SQL and the first batch entry, and the work is then delegated to
 * the {@code batchUpdate(String, BatchPreparedStatementSetter)} method of the
 * object returned by {@code getJdbcOperations()}.
 *
 * @param sql the SQL statement handed to {@code getParsedSql}
 * @param batchArgs one parameter source per batch row
 * @return the array returned by the delegate; an empty array when
 * {@code batchArgs} is empty
 */
@Override
public int[] batchUpdate(String sql, SqlParameterSource[] batchArgs) {
    // An empty batch short-circuits before any SQL parsing happens.
    if (batchArgs.length == 0) {
        return new int[0];
    }

    ParsedSql parsed = getParsedSql(sql);
    PreparedStatementCreatorFactory factory = getPreparedStatementCreatorFactory(parsed, batchArgs[0]);

    BatchPreparedStatementSetter rowSetter = new BatchPreparedStatementSetter() {
        @Override
        public int getBatchSize() {
            return batchArgs.length;
        }

        @Override
        public void setValues(PreparedStatement ps, int index) throws SQLException {
            // Build the value array for row `index`, then bind it to the statement.
            Object[] rowValues = NamedParameterUtils.buildValueArray(parsed, batchArgs[index], null);
            factory.newPreparedStatementSetter(rowValues).setValues(ps);
        }
    };

    // Hand over to the other batchUpdate overload.
    return getJdbcOperations().batchUpdate(factory.getSql(), rowSetter);
}
第二个batchUpdate方法(接收BatchPreparedStatementSetter的重载)如下:
/**
 * Executes {@code sql} as a batch, taking the parameters of each row from
 * {@code pss}.
 *
 * <p>When {@code JdbcUtils.supportsBatchUpdates} reports batch support for the
 * statement's connection, every row is added with {@code addBatch()} and the
 * whole batch is sent with a single {@code executeBatch()}. Otherwise each row
 * is run through {@code executeUpdate()} and the individual counts are
 * collected. If {@code pss} is an InterruptibleBatchPreparedStatementSetter,
 * the loop stops at the first row for which {@code isBatchExhausted} is true.
 *
 * @param sql the SQL statement to execute
 * @param pss supplies the batch size and binds the values of each row
 * @return the per-row counts produced by the batch
 * @throws DataAccessException as declared by this method
 */
@Override
public int[] batchUpdate(String sql, final BatchPreparedStatementSetter pss) throws DataAccessException {
    if (logger.isDebugEnabled()) {
        logger.debug("Executing SQL batch update [" + sql + "]");
    }

    // Call execute(...), passing the SQL and a statement callback.
    int[] updateCounts = execute(sql, (PreparedStatementCallback<int[]>) stmt -> {
        try {
            final int total = pss.getBatchSize();
            InterruptibleBatchPreparedStatementSetter interruptible = null;
            if (pss instanceof InterruptibleBatchPreparedStatementSetter) {
                interruptible = (InterruptibleBatchPreparedStatementSetter) pss;
            }

            if (!JdbcUtils.supportsBatchUpdates(stmt.getConnection())) {
                // No batch support: run every row as its own executeUpdate().
                List<Integer> counts = new ArrayList<>();
                for (int row = 0; row < total; row++) {
                    pss.setValues(stmt, row);
                    if (interruptible != null && interruptible.isBatchExhausted(row)) {
                        break;
                    }
                    counts.add(stmt.executeUpdate());
                }
                int[] collected = new int[counts.size()];
                int slot = 0;
                for (Integer count : counts) {
                    collected[slot++] = count;
                }
                return collected;
            }

            // Batch support available: bind the parameters of each row in turn.
            for (int row = 0; row < total; row++) {
                pss.setValues(stmt, row);
                boolean exhausted = interruptible != null && interruptible.isBatchExhausted(row);
                if (exhausted) {
                    break;
                }
                stmt.addBatch();
            }
            // Execution of the batch is delegated to the JDBC driver's
            // implementation class (e.g. mysql-connector-java).
            return stmt.executeBatch();
        }
        finally {
            if (pss instanceof ParameterDisposer) {
                ((ParameterDisposer) pss).cleanupParameters();
            }
        }
    });

    Assert.state(updateCounts != null, "No result array");
    return updateCounts;
}
batchUpdate中调用的是execute(sql, callback);下面给出的是接收PreparedStatementCreator的execute重载(传入sql的版本应当是委托给它来执行的):
/**
 * Runs {@code action} against a PreparedStatement created by {@code psc} on a
 * Connection obtained through DataSourceUtils, and returns the callback's result.
 *
 * @param psc creates the PreparedStatement; must not be null
 * @param action callback invoked with the created statement; must not be null
 * @return the value returned by {@code action.doInPreparedStatement}
 * @throws DataAccessException as declared; when a SQLException is caught, the
 * result of {@code translateException(...)} is thrown instead
 */
public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
throws DataAccessException {
Assert.notNull(psc, "PreparedStatementCreator must not be null");
Assert.notNull(action, "Callback object must not be null");
if (logger.isDebugEnabled()) {
String sql = getSql(psc);
logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
}
// The connection is obtained via DataSourceUtils and handed back with
// DataSourceUtils.releaseConnection in the catch/finally blocks below.
Connection con = DataSourceUtils.getConnection(obtainDataSource());
PreparedStatement ps = null;
try {
ps = psc.createPreparedStatement(con);
applyStatementSettings(ps);
T result = action.doInPreparedStatement(ps);
handleWarnings(ps);
return result;
}
catch (SQLException ex) {
// Release Connection early, to avoid potential connection pool deadlock
// in the case when the exception translator hasn't been initialized yet.
// NOTE(review): cleanupParameters() is also called in the finally block, so on
// this path it is invoked twice for a ParameterDisposer.
if (psc instanceof ParameterDisposer) {
((ParameterDisposer) psc).cleanupParameters();
}
// The SQL text is fetched here and passed to translateException below.
String sql = getSql(psc);
// ps and con are set to null right after being closed/released, so the
// closeStatement/releaseConnection calls in finally receive null on this path.
JdbcUtils.closeStatement(ps);
ps = null;
DataSourceUtils.releaseConnection(con, getDataSource());
con = null;
throw translateException("PreparedStatementCallback", sql, ex);
}
finally {
// Runs on every path: dispose parameters, close the statement, release the connection.
if (psc instanceof ParameterDisposer) {
((ParameterDisposer) psc).cleanupParameters();
}
JdbcUtils.closeStatement(ps);
DataSourceUtils.releaseConnection(con, getDataSource());
}
}
上面是spring-jdbc中batchUpdate逻辑的实现,但具体的批量执行是交给JDBC驱动及其背后的Server来完成的,所以不同驱动、不同Server的实现机制不同,可能对性能有较大影响。以MySQL为例,mysql-connector-java-5.1.43中的实现如下:
/**
 * Reports the {@code canRewriteAsMultiValueInsert} flag stored on this
 * statement's {@code parseInfo}.
 *
 * @return the current value of {@code parseInfo.canRewriteAsMultiValueInsert}
 * @throws SQLException declared by the signature; nothing in this body throws it
 */
public boolean canRewriteAsMultiValueInsertAtSqlLevel() throws SQLException {
    final boolean rewritable = this.parseInfo.canRewriteAsMultiValueInsert;
    return rewritable;
}
/**
 * Executes the statements collected in {@code batchedArgs} and picks one of three
 * execution paths: {@code executeBatchedInserts}, {@code executePreparedBatchAsMultiStatement}
 * or {@code executeBatchSerially}.
 *
 * @return the array produced by the chosen execution path; an empty array when
 * nothing has been batched
 * @throws SQLException if the connection is read-only, or if raised by one of the
 * calls made here
 */
@Override
protected long[] executeBatchInternal() throws SQLException {
// The whole method body runs while holding the connection mutex.
synchronized (checkClosed().getConnectionMutex()) {
// Batch execution is rejected on a read-only connection.
if (this.connection.isReadOnly()) {
throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"),
SQLError.SQL_STATE_ILLEGAL_ARGUMENT);
}
// Nothing batched: return an empty result without touching any statement state.
if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
return new long[0];
}
// we timeout the entire batch, not individual statements
// The configured timeout is saved in batchTimeout and the field is zeroed; the
// saved value is handed to whichever execute* method is chosen below.
// NOTE(review): timeoutInMillis is not restored in this method — confirm where that happens.
int batchTimeout = this.timeoutInMillis;
this.timeoutInMillis = 0;
resetCancelledState();
try {
statementBegins();
clearWarnings();
// Note the condition here: according to the article author, batchHasPlainStatements
// is initialised to false by default (its initialiser is not shown in this excerpt),
// whereas rewriteBatchedStatements has to be specified in the JDBC URL; otherwise the
// SQL-merging logic below is skipped. Example URL:
// jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&rewriteBatchedStatements=true
if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) {
// First choice: the multi-value INSERT rewrite, when parseInfo allows it.
if (canRewriteAsMultiValueInsertAtSqlLevel()) {
return executeBatchedInserts(batchTimeout);
}
// Second choice: the multi-statement path, only when the version check passes and
// more than 3 entries are batched.
// NOTE(review): !batchHasPlainStatements and batchedArgs != null were already
// checked above, so they appear redundant here.
if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null
&& this.batchedArgs.size() > 3 /* cost of option setting rt-wise */) {
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}
// Fallback when neither rewrite path was taken.
return executeBatchSerially(batchTimeout);
} finally {
// Always clear the executing flag and drop the batched entries, even on failure.
this.statementExecuting.set(false);
clearBatch();
}
}
}
/**
 * Rewrites the already prepared statement into a multi-value insert
 * statement of 'statementsPerBatch' values and executes the entire batch
 * using this new statement.
 *
 * @param batchTimeout
 *            timeout for the whole batch; executeBatchInternal() passes the value it
 *            saved from timeoutInMillis (presumably milliseconds - confirm)
 *
 * @return update counts in the same fashion as executeBatch()
 *
 * @throws SQLException
 *             declared by the signature; the conditions are not visible in this excerpt
 */
protected long[] executeBatchedInserts(int batchTimeout) throws SQLException {
// The method body is elided in this excerpt.
......
}
如何整合
根据DML的类型是insert、update还是delete,会走不同方法:
- 如果是insert语句,在满足条件的情况下,会整合成“insert into xxx_table values (xx),(yy),(zz)…”这样的语句。
- 如果是update/delete语句,在满足条件的情况下,会整合成“update t set … where id = 1; update t set … where id = 2; update t set … where id = 3 …”这样的语句。
总结
如果想要达到MySQL真正batchUpdate效果,需要有以下几个条件:
- 需要在jdbcUrl后添加参数rewriteBatchedStatements=true
- this.batchHasPlainStatements为false
- 如果是update/delete语句,还需要MySQL版本>=4.1.0,并且batch的数量>3
References
- https://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html
- https://blog.csdn.net/qq271859852/article/details/79562262
本文首次发布于 LiuShuo’s Blog, 转载请保留原文链接.