java多线程批量添加大数据

最近做了一个兑奖业务,业务大体是 用户购买可口可乐,在瓶盖上有个兑奖码,用户通过发送兑奖码,如果中奖了就给用户一个账号充值一定金额(类似团购的优惠劵,一定期限可以使用)。

其中涉及到的上传兑奖码部分,并导入数据库。(数据量目前是6000万)

批量上传多个文件,要求每个文件最大500万条数据。

考虑到导入的数据量太大,用户等待的时间太长,做成多线程。

今天在家简单地做了一下 5000 万条数据的批量导入(时间比较久,估计要 2 小时,希望提出修改建议)

代码如下

package com.cp.io;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

/**
 * Generates a test CSV file of sequential redemption codes.
 *
 * <p>Fix over the original: the writer was only closed on the happy path
 * (a write failure leaked the file handle). The write loop now lives in a
 * reusable, testable helper that uses try-with-resources.
 */
public class CsvFileCreateUtil {

 public static void main(String[] args) {

  long begin = System.currentTimeMillis();
  System.out.println("开始时间:::" + begin);

  try {
   // Same file and row count as the original hard-coded loop.
   writeCodes("D:\\500W-1.csv", 5000000);
  } catch (IOException e) {
   e.printStackTrace();
  }

  long end = System.currentTimeMillis();
  System.out.println("结束时间:::" + end);
  System.out.println("总用时:::" + (end - begin));
 }

 /**
  * Writes {@code count} sequential codes starting at 123456 to {@code filepath},
  * one per line.
  *
  * <p>NOTE: the line format is kept byte-for-byte from the original
  * ("&lt;number&gt;&lt;space&gt;\n"), so downstream readers see the same data —
  * the trailing space ends up in the inserted name; confirm whether that is intended.
  *
  * @param filepath destination file (overwritten if it exists)
  * @param count    number of codes to write
  * @throws IOException if the file cannot be created or written
  */
 public static void writeCodes(String filepath, int count) throws IOException {
  // try-with-resources guarantees the writer is flushed and closed even if
  // a write fails part-way through (the original leaked it on exception).
  try (BufferedWriter writer = new BufferedWriter(new FileWriter(filepath))) {
   for (int i = 0; i < count; i++) {
    int num = 123456 + i;
    writer.write("" + num + " \n");
   }
  }
 }
}

package com.cp.io;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * 读取文件
 *
 * @author pan
 *
 */
public class ReadCsvUser implements Runnable {

 private String filename;

 public ReadCsvUser(String filename) {
  this.filename = filename;
 }

 public void run() {

  System.out.println("当前使用的线程是:" + Thread.currentThread().getName()
    + ",正在读文件:" + filename + ",执行开始时间:"
    + System.currentTimeMillis());

  // 读文件
  BufferedReader reader = null;
  List<String> batchList = new ArrayList<String>(20000);

  try {
   String filepath = "D:\\" + filename;
   FileReader fr = new FileReader(new File(filepath));
   reader = new BufferedReader(fr);

   String line = null;
   while ((line = reader.readLine()) != null) {
    batchList.add(line);

    if (batchList.size() == 20000) {

     // 调用数据库批量添加
     this.batchAddUser(batchList);
     batchList.clear();
    }
   }

   if (batchList.size() < 20000) {
    this.batchAddUser(batchList);
    batchList.clear();
   }

   reader.close();

  } catch (FileNotFoundException e) {
   e.printStackTrace();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }

 // 批量添加数据库
 public void batchAddUser(List<String> userList) {

  Connection conn = null;
  PreparedStatement pstmt = null;
  try {
   conn = ConnectionUtil.getConnection();
   conn.setAutoCommit(false);
   pstmt = conn
     .prepareStatement("insert into user(name,pwds)values(?,?)");

   for (String str : userList) {
    pstmt.setString(1, str);
    pstmt.setString(2, str);
    pstmt.addBatch();
   }

   pstmt.executeBatch();
   conn.commit();
   
  } catch (SQLException e) {
   try {
    conn.rollback();
   } catch (SQLException e1) {
    e1.printStackTrace();
   }
   e.printStackTrace();
  } finally {

   try {
    if (pstmt != null) {
     pstmt.close();
     pstmt = null;
    }
    if (conn != null) {
     conn.close();
    }
   } catch (SQLException e) {
    e.printStackTrace();
   }
  }
 }
}

package com.cp.io;

public class Test {
 public static void main(String[] args) {

  String csvstr = "500W-1.csv,500W-2.csv,500W-3.csv,500W-4.csv,500W-5.csv,500W-6.csv,500W-7.csv,500W-8.csv,500W-9.csv,500W-10.csv";
  String[] csvArray = csvstr.split(",");

  int i = 1;
  for (String str : csvArray) {
   ReadCsvUser rcu = new ReadCsvUser(str);
   new Thread(rcu, "线程" + i).start();
   i++;
  }
 }
}

猜你喜欢

转载自ladengjava.iteye.com/blog/2217495