本文共 6331 字,大约阅读时间需要 21 分钟。
传统的单条INSERT语句逐行插入方式,在处理1万条数据时往往需要数秒,这不仅会导致事务锁竞争加剧,更可能引发连接超时等系统性风险。那么如何优化这种批量插入的场景呢?让我们一起探索吧!
每次插入都独立发往数据库执行,那么每个SQL语句的网络通信都可能造成延迟。
每次插入通常会涉及一次事务的开启和提交(如果没有显式地控制事务)。
如果大量插入数据时,逐个插入的方式会增加锁的争用,尤其是当数据表上有多个事务并发访问时,插入的性能会进一步下降。
在进行批量插入时,将多条INSERT语句放在一个事务中(大事务)比独立事务通常性能更好,原因如下:
事务提交开销
独立事务需要执行COMMIT操作,这会触发MySQL的日志持久化(如redo log的fsync操作),导致磁盘I/O开销。同时,多次发送开启事务和提交事务的操作,带来了额外的网络开销。大事务只需一次COMMIT,减少了日志刷盘的次数,从而显著降低I/O等待时间。锁竞争与锁释放
独立事务每次提交会释放行锁,下一次插入时需重新获取锁,增加了锁竞争的开销。大事务在整个过程中持有锁(如行锁),减少了锁的重复获取和释放,提高并发效率。日志写入优化
对于InnoDB引擎,事务日志(redo log)以顺序追加方式写入。大事务的日志批量写入比多次小事务的分散写入更高效,减少了日志缓冲区切换和磁盘寻址开销。将多条INSERT语句合并成一条INSERT语句(例如INSERT INTO table (col1, col2) VALUES (val1, val2), (val3, val4), ...
)也能提高性能,原因包括:
实现合并INSERT有两种常见方式:
<foreach>
标签。rewriteBatchedStatements=true
,如jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true
。JDBC会帮我们完成SQL的合并。InnoDB为保证自增ID的全局唯一性,在分配自增值时会持有自增锁(AUTO-INC Lock)。在高并发情况下,多个插入操作会频繁争抢数据库的自增ID,这可能导致锁的竞争和性能瓶颈。
使用预生成ID会有更好的性能表现,比如预先生成雪花ID。避免在数据库层面加锁解锁影响性能。
rewriteBatchedStatements=true
): spring: datasource: url: jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true&useServerPrepStmts=false username: root password: root driver-class-name: com.mysql.cj.jdbc.Driver
user
: CREATE TABLE `user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(100) DEFAULT NULL, `age` int(11) DEFAULT NULL, `email` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@Data@TableName("user")public class User { private Long id; private String name; private Integer age; private String email;}
private ListprepareTestData(int count) { List users = new ArrayList<>(count); for (int i = 0; i < count; i++) { User user = new User(); user.setId(null); // 自增ID user.setName("test" + i); user.setAge(20 + i % 50); user.setEmail("test" + i + "@test.com"); users.add(user); } return users;}
TRUNCATE TABLE user;ALTER TABLE user AUTO_INCREMENT = 1;
@Testpublic void testSingleTransactionInsert() { Listusers = prepareTestData(10000); StopWatch stopWatch = new StopWatch(); stopWatch.start(); for (User user : users) { userMapper.insert(user); } stopWatch.stop(); System.out.println("独立事务循环插入耗时: " + stopWatch.getTotalTimeMillis() + "ms");}
测试结果:独立事务循环插入耗时: 12485ms
@Test@Transactional // 开始事务public void testBigTransactionInsert() { Listusers = prepareTestData(10000); StopWatch stopWatch = new StopWatch(); stopWatch.start(); for (User user : users) { userMapper.insert(user); } stopWatch.stop(); System.out.println("大事务循环插入耗时: " + stopWatch.getTotalTimeMillis() + "ms");}
测试结果:大事务循环插入耗时: 9565ms
@Mapperpublic interface UserMapper extends BaseMapper{ // 方便测试,直接将SQL写到注解 @Insert( " " ) void batchInsert(@Param("users") List users);}@Testpublic void testMybatisForeachInsert() { List users = prepareTestData(10000); StopWatch stopWatch = new StopWatch(); stopWatch.start(); userMapper.batchInsert(users); stopWatch.stop(); System.out.println("MyBatis foreach批量插入耗时: " + stopWatch.getTotalTimeMillis() + "ms");}
测试结果:MyBatis foreach批量插入耗时: 891ms
@Testpublic void testJdbcBatchInsert() { Listusers = prepareTestData(10000); StopWatch stopWatch = new StopWatch(); stopWatch.start(); jdbcTemplate.batchUpdate( "INSERT INTO user (name, age, email) VALUES (?, ?, ?)", new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { User user = users.get(i); ps.setString(1, user.getName()); ps.setInt(2, user.getAge()); ps.setString(3, user.getEmail()); } @Override public int getBatchSize() { return users.size(); } } ); stopWatch.stop(); System.out.println("JDBC batch插入耗时: " + stopWatch.getTotalTimeMillis() + "ms");}
测试结果:JDBC batch插入耗时: 587ms
@Testpublic void testMybatisBatchInsert() { Listusers = prepareTestData(10000); StopWatch stopWatch = new StopWatch(); stopWatch.start(); try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) { UserMapper mapper = sqlSession.getMapper(UserMapper.class); for (User user : users) { mapper.insert(user); } sqlSession.commit(); } stopWatch.stop(); System.out.println("MyBatis SqlSession批量插入耗时: " + stopWatch.getTotalTimeMillis() + "ms");}
测试结果:MyBatis SqlSession批量插入耗时: 825ms
@Testpublic void testMybatisPlusBatchInsert() { Listusers = prepareTestData(10000); StopWatch stopWatch = new StopWatch(); stopWatch.start(); userService.saveBatch(users, 1000); stopWatch.stop(); System.out.println("MyBatis-Plus批量插入耗时: " + stopWatch.getTotalTimeMillis() + "ms");}
测试结果:MyBatis-Plus批量插入耗时: 860ms
基于测试结果,性能排名如下:
问题的核心在于:如何“攒一波”数据来实现批量插入? 推荐批量插入与MQ配合使用。将需要插入的数据的消息发送给MQ,生产者需要保证MQ的消息发送和本地事务的原子性。对于消费者,一次性拉取多个消息进行批量插入。当消息消费失败时,可以让MQ重新投递消息并重新消费。
不过,MQ的引入和积攒数据,带来的数据插入的延迟是不可避免的,同时只能保证最终一致性而不是强一致。
转载地址:http://cfffk.baihongyu.com/