package com.chen.controller;
import com.chen.utils.BaseDao;
import com.chen.utils.DataImport;
import com.chen.utils.StreamUtil;
import org.junit.Test;
import java.io.File;
import java.sql.Connection;
import java.util.List;
public class BigData1000wTest {
@Test
public void BigData1000wTest() throws Exception {
// 通过JDBCUtil工具类获取数据库连接对象
Connection conn = BaseDao.getConn();
// StreamUtil是已经封装好的使用流读取文件的工具类
List<String> list = StreamUtil.readingLineFormTextFile(new File("D://milliondatatest//test(500W).csv"));
String sql = "insert into mysqltest values(?,?,?,?)"; // 定义要导入数据的sql,无需主键将第一个?设置为null
long start = System.currentTimeMillis(); // 获取方法开始执行前的时间(单位:毫秒)
// 调用刚刚封装好的工具类
DataImport.dispose(conn, list, 0, true, 1000000, sql);
long end = System.currentTimeMillis(); // 获取方法执行结束后的时间
// 相减即可得到插入所有数据的耗时 秒=毫秒/1000;
System.out.println("成功导入" + list.size() + "条数据!!时长:" + (end - start) / 1000 + "秒");
}
}
package com.chen.utils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class BaseDao { // 静态工具类,用于创建数据库连接对象和释放资源,方便调用
// 导入驱动jar包或添加Maven依赖(这里使用的是Maven,Maven依赖代码附在文末)
static {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
// 获取数据库连接对象
public static Connection getConn() {
Connection conn = null;
try {
// rewriteBatchedStatements=true,一次插入多条数据,只插入一次
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/million-test?rewriteBatchedStatements=true", "root", "123456");
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return conn;
}
// 释放资源
public static void closeAll(AutoCloseable... autoCloseables) {
for (AutoCloseable autoCloseable : autoCloseables) {
if (autoCloseable != null) {
try {
autoCloseable.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
package com.chen.utils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
public class DataImport {
// 参数一:数据库连接对象、参数二:流文件读取出的集合、参数三:从第几条数据开始读取,目的是排除表头、参数四:是否包含主键、参数五:每次批量执行添加数据的数量、参数六:sql语句
public static void dispose(Connection conn, List<String> list, Integer startRows, boolean includePrimaryKey, Integer size, String sql) {
try {
conn.setAutoCommit(false); // 设置事物手动提交
PreparedStatement ps = conn.prepareStatement(sql);
String[] split = null;
if (includePrimaryKey) { // 包含主键,只需判断一次
for (int i = startRows; i < list.size(); i++) {
// 按逗号切割字符串,-1代表忽略数组长度,避免数组长度越界异常
split = list.get(i).split(",", -1);
/*下方代码产生警告提示的原因:同一项目中,有重复的代码块(idea很好的提示。但是这里无法将判断放在循环内,不然会多出百万次判断使程序缓慢)*/
for (int j = 0; j < split.length; j++) { // 遍历刚刚获取的数组
// 对集合中的每条数据进行处理,将字符串中多出的引号去掉,避免录入数据库时因字段类型不匹配而导致的格式转换异常
ps.setObject(j + 1, split[j].replace("\"", "")); // 循环赋值
}
ps.addBatch(); // 将所有数据转为一条sql
if (i % size == 0 && i != 0) { // 如果i能整除size,即执行循环体
ps.executeBatch(); // 批量执行sql
conn.commit(); // 事物手动提交
conn.setAutoCommit(false); // 重新设置事物为手动提交
ps = conn.prepareStatement(sql); // 再次为ps对象赋值
}
}
} else { // 不包含主键
for (int i = startRows; i < list.size(); i++) {
String s = list.get(i);
// 将集合中的对象从第一个逗号切割,substring包头不包尾,因此此处需加1
split = s.substring(s.indexOf(",") + 1).split(",", -1);
for (int j = 0; j < split.length; j++) {
ps.setObject(j + 1, split[j].replace("\"", ""));
}
ps.addBatch();
if (i % size == 0 && i != 0) {
ps.executeBatch();
conn.commit();
conn.setAutoCommit(false);
ps = conn.prepareStatement(sql);
}
}
}
ps.executeBatch(); // 循环外提交是因为可能会出现循环内条件不成立而未提交过的情况
conn.commit(); // 提交事物,避免脏数据(事物太长也有弊端)
ps.close(); // 关闭资源
conn.close();
} catch (Exception throwables) {
throwables.printStackTrace();
}
}
}
package com.chen.utils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
public class StreamUtil {
// 批量关闭流
public static void closings(AutoCloseable... closeables) throws Exception {
// 非空判断
if (closeables != null) {
// 循环关闭
for (AutoCloseable auto : closeables) {
// 非空判断
if (auto != null) {
auto.close();
}
}
}
}
// 按行读取文本文件(返回List集合)
// 默认UTF-8格式
public static List<String> readingLineFormTextFile(File textFile) throws Exception {
return readingLineFormTextFile(textFile, "UTF-8");
}
// 自定义编码格式
public static List<String> readingLineFormTextFile(File textFile, String encode) throws Exception {
// 存放结果
List<String> list = new ArrayList<String>();
// 字符输入流
FileReader fr = new FileReader(textFile);
// 缓冲字符输入流
BufferedReader br = new BufferedReader(fr);
// 用于存入复制的数据
String str = null;
// 复制,如果读取的不为空,证明读取到了数据
while ((str = br.readLine()) != null) {
list.add(str);
}
// 关流
closings(br);
closings(fr);
return list;
}
}
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>