1、数据预处理
1、网站爬取数据,写为csv
2、添加表头
sed -i ‘1i\时报错:
sed: -i may not be used with stdin
mac中应该写为:【mac自带的sed命令,是基于bsd的,所以与Linux-like下常用的gnu不一样】
sed -i "" '1i\
insert value here
' filename.csv
3、处理为json
2、数据导入es集群
- 可参考这篇文章ElasticSearch(十二)——无文档ID的Json文件批量导入(Java/Python)
但是,之前
client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(addressMaster), transport));
中的InetSocketTransportAddress在6.3中已经不能使用,改为TransportAddress即可
public class JsonToES {
private static final Logger LOGGER = LoggerFactory.getLogger(OpenClose.class);
public void jsonToES(File[] files) {
Map<String, Object> hashMap = EsPropertiesUtils.getConf();
TransportClient client = OpenClose.getInstance(hashMap);
int idNum = 0;
for (File f : files) {
// 一、绝对路径读取文件
System.out.println(f.getAbsolutePath());
File file = new File(f.getAbsolutePath());
// 二、开始一行一行的写
try (BufferedReader br = new BufferedReader(new FileReader(file))) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
// Map<String, Object> valuesMap = new HashMap<>(64, 0.6f);
String line;
while ((line = br.readLine()) != null) {
bulkRequest.add(client.prepareIndex("taobao", "rates", Integer.toString(idNum)).setSource(line, XContentType.JSON));
// valuesMap.clear();
if (idNum % 20000 == 0) {
System.out.println("++++++++++++++++++++++++++++++++++++");
// 三、批量导入,与网速有关
bulkRequest.execute().actionGet();
}
idNum++;
System.out.println(idNum);
}
// 四、导入每个文件最后不足2000条的数据(但是必须得有数据,否则报错: no requests added)
// 查阅源码可知,可通过拿到父类requests属性,判断其size来解决[子类调用父类方法获得属性]
if (bulkRequest.request().requests().size() > 0) {
bulkRequest.get();
}
// 五、操作下一个文件
} catch (IOException e) {
LOGGER.error(":ERROR:堆栈信息====={}", e.getMessage());
}
}
OpenClose.closeClient(client);
}
public static void main(String[] args) {
JsonToES jsonToES = new JsonToES();
File file = new File("/Users/wanghai/shendeng_back/rates/");
// 目录下只有我需要读取的文件,故不再进行进一步处理
File[] files = file.listFiles();
long begin = System.currentTimeMillis();
jsonToES.jsonToES(files);
System.out.println("总耗时: " + (System.currentTimeMillis() - begin) / 1000 + "s");
}
}
3、优化
之前进行过一次优化实战,但是当时没有以多线程的形式进行优化。这里进行多线程提高写入速度的实战。
3-1、多线程读写文件小练习——写
先热身一下,以多线程的形式读写同一个文件,查看效率差异。
1、没用过Lock,打算不使用synchronized,尝试下Lock
3-1-1、synchronized与Lock的区别与使用
- 锁类型
- 可重入锁:在执行对象中所有同步方法不用再次获得锁
- 可中断锁:在等待获取锁过程中可中断
- 公平锁: 按等待获取锁的线程的等待时间进行获取,等待时间长的具有优先获取锁权利
- 读写锁:对资源读取和写入的时候拆分为2部分处理,读的时候可以多线程一起读,写的时候必须同步地写
synchronized的缺点有:
1)不能响应中断;
2)锁的释放由虚拟机来完成,不用人工干预,不过此即使缺点也是优点,优点是不用担心会造成死锁,缺点是由可能获取到锁的线程阻塞之后其他线程会一直等待,性能不高。
- 区别
synchronized与Lock的部分区别 |
类别 | synchronized | Lock |
---|---|---|
出身 | Java的关键字,在JVM层面实现了对临界资源的同步互斥访问 | 一个类,基于jdk层面实现 |
锁释放 | 1、已经获取锁的线程执行完同步代码,释放锁 2、线程执行发生异常,jvm会让线程释放锁 | 有加锁就必须有释放锁,finally中unLock()释放锁使Lock对更具灵活性 |
锁状态 | 无法判断 | 可以判断,tryLock(),获取锁的时候锁被占用就返回false |
锁类型 | 可重入 不可中断 非公平 | 可重入 可判断 可公平(默认非公平) |
more
使用场景:
TODO
4线程生产数据2线程数据写入实践
⚠️:该demo的多线程写磁盘的逻辑不好,有待修正,哈哈,但是3-2的测试还是有意义的;
⚠️:这个demo并没有生产意义,因为数据都是写入同一个文件,所以最终还是串行写入,磁头在各个磁盘来回寻道,也许不但不会提高效率,反而降低;
⚠️:生产中可以考虑不同线程读取不同的文件(但是磁头切换寻道可能耗时更多),然后处理数据的是别的线程,也就是获取数据和处理数据使用不同的线程,异步处理。
1、生产数据线程
public class WriteThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < 2000000; i++) {
WriterQueue.getInstance().produce("线程:" + Thread.currentThread().getName() + "的第: " + i + " 条数据 " + UUID.randomUUID().toString() + " 123456789qwertyuiopasdfghjklzxcvbnm中文中文中文中文中文中文中文中文中文中文中文");
}
}
private void sleep(int millis) throws InterruptedException {
Thread.sleep(millis);
}
}
2、数据队列
Condtion接口:
- Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法
- 例如下方代码所示,多线程写同一个缓冲区:当向队列中写入数据之后,唤醒”读线程”;当从队列读出数据之后,唤醒”写线程”;并且当缓冲区满的时候,”写线程”需要等待;当缓冲区为空时,”读线程”需要等待。
package threaddemo;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* <p>pakage: threaddemo</p>
*
* descirption:
*
* @author wanghai
* @version V1.0
* @since <pre>2018/8/8 下午11:16</pre>
*/
class WriterQueue {
private static final int MAX_QUEUE_SIZE = 5000;
private LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
private static volatile WriterQueue instance;
private WriterQueue() {
}
public static WriterQueue getInstance() {
if (instance == null) {
synchronized (WriterQueue.class) {
if (instance == null) {
instance = new WriterQueue();
}
}
}
return instance;
}
void produce(String value) {
lock.lock();
try {
while (linkedBlockingQueue.size() == MAX_QUEUE_SIZE) {
System.out.println("队列中数据已满,暂时无法写入数据!");
notFull.await();
}
linkedBlockingQueue.add(value);
// If any threads are waiting on this condition then one is selected for waking up.
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
LinkedList<String> consumeAll() {
LinkedList<String> values = new LinkedList<>();
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
System.out.println("消费线程中断");
e.printStackTrace();
}
try {
while (linkedBlockingQueue.size() == 0) {
System.out.println("队列中无数据!");
notEmpty.await();
}
values.addAll(linkedBlockingQueue);
// 清空队列
linkedBlockingQueue.clear();
notFull.signal(); // 唤醒"写"线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return values;
}
}
TODO:消费者线程等待时间超时,中断线程
⚠️:该demo中获取执行时长是错误的,正确做法见3-2-3
3、数据写磁盘
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.LinkedList;
/**
* <p>pakage: threaddemo</p>
* <p>
* descirption:负责将数据写入到文件中
*
* @author wanghai
* @version V1.0
* @since <pre>2018/8/8 下午11:27</pre>
*/
public class Consumer implements Runnable {
private String fileName;
public Consumer(String fileName) {
this.fileName = fileName;
}
@Override
public void run() {
String path = "/Users/wanghai/";
File outputFile = new File(path + fileName);
if (!outputFile.exists()) {
try {
outputFile.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outputFile, true)));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
int i = 0;
// TODO:改变退出条件,使用线程中断
while (i < 800) {
LinkedList<String> list;
list = WriterQueue.getInstance().consumeAll();
if (list == null || list.size() == 0) {
System.out.println("no data...");
continue;
}
i++;
System.out.println("写数据到 " + path + fileName + "第" + i + "次");
for (String content : list) {
try {
bw.write(Thread.currentThread().getName() + "====" + content);
bw.newLine();
bw.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
list = null;
}
try {
bw.flush();
bw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void sleep(int millis) throws InterruptedException {
Thread.sleep(millis);
}
}
4、测试类
join而不是start,我们需要获得程序运行时间
join()的部分源码 |
public final void join() throws InterruptedException {
join(0);
}
// -----------------------------------------------------------------
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
// 我们可以看到这里使用了while循环做判断的,然后调用wait方法的,所以说join方法的执行是完全通过wait方法实现的
// 等待时间为0的时候,如果当前线程还存活的话,就无限等待,直到线程死亡
if (millis == 0) {
while (isAlive()) {
wait(0);
}
}
TODO:验证这样写入数据是否有重写漏写
public class WriteTest {
private static long test() throws InterruptedException {
WriteThread write = new WriteThread();
for (int i = 0; i < 4; i++) {
// 四个线程开始生产数据,5000暂停,一个线程写200000条。
new Thread(write).start();
}
// 一直尝试写数据到磁盘
Thread consumer1 = new Thread(new Consumer("uuid.txt"));
Thread consumer2 = new Thread(new Consumer("uuid.txt"));
consumer1.start();
consumer2.start();
if (!consumer1.isAlive() && !consumer2.isAlive()) {
return System.currentTimeMillis();
} else
return 0;
}
public static void main(String[] args) throws InterruptedException {
long begin = System.currentTimeMillis();
long stop = test();
if (stop != 0) {
System.out.println("耗时" + (stop - begin) / 1000 + "s");
}
}
}
3-2、多线程读写文件小练习——读
3-2-1、单线程读
1、18.74G的文件,一个线程,读取出来之后写入另一个文件,耗时35秒(buffer 10M);耗时29秒(buffer 100M)
public class One {
public static void main(String[] args) throws IOException {
long begin = System.currentTimeMillis();
// 读取原始文件⚠️(1)
BufferedReader br = new BufferedReader(new FileReader("/Users/wanghai/uuid.txt"));
FileWriter fileWriter = new FileWriter(new File("/Users/wanghai/uuid_one.txt"));
String line;
while ((line = br.readLine()) != null) {
fileWriter.write(line);
}
fileWriter.close();
br.close();
// 10s
System.out.println("单线程耗时: " + (System.currentTimeMillis() - begin)/1000 + "s");
}
}
⚠️(1)
:解决OOM问题,OOM是因为BufferedReader是将所有文件行都保存在内存中 - 如果文件足够大,这将很快导致OutOfMemoryError。
- 可以使用Apache Commons IO相关方法
- 改为Stream
- 使用Scanner
java采用的编码中,char占用2 b,所以,4.5G的文件,读进内存,会接近9G。
public class One {
public static void main(String[] args) throws IOException {
long begin = System.currentTimeMillis();
Scanner sc = null;
// 读取原始文件
InputStream inputStream = new FileInputStream((new File("/Users/wanghai/uuid.txt")));
OutputStream outputStream = new FileOutputStream(new File("/Users/wanghai/uuid_one.txt"));
String line;
byte[] buffer = new byte[10240];
sc = new Scanner(inputStream, "UTF-8");
// while (sc.hasNextLine()) {
// String line = sc.nextLine();}
while (inputStream.read(buffer) != -1) {
outputStream.write(buffer);
}
outputStream.close();
inputStream.close();
// 10s
System.out.println("单线程耗时: " + (System.currentTimeMillis() - begin)/1000 + "s");
}
}
3-2-2、拆分文件
public class SpiltFile {
private static final int READE_BUFFER = 10240;
public static void main(String[] args) {
File srcFile = new File("/Users/wanghai/io/uuid_one.txt");
//创建一个文件对象
try {
splitFile(srcFile, 4);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void splitFile(File srcFile, int fileNumber) throws IOException {
// 指定个数切割,所以每个文件的开头和结尾极有可能被拆开
long fileLength = srcFile.length(); // 18743808000————17.4565G,与操作系统显示的略有差距
byte[] buffer = new byte[READE_BUFFER];
FileInputStream fis = new FileInputStream(srcFile);
if (fileNumber <= 0) {
fileNumber = 4;
}
for (int i = 1; i <= fileNumber; i++) {
int writeTimes = (int) Math.ceil(fileLength / fileNumber / READE_BUFFER);
String fileName = "uuid_more_" + i + ".txt";
File fi = new File(srcFile.getParent(), fileName);
FileOutputStream fos = new FileOutputStream(fi);
// 多读一次,确保读完文件
for (; writeTimes >= 0; writeTimes--) {
if (fis.read(buffer) != -1) {
fos.write(buffer);
} else {
// 确保读完文件
System.out.println("down!");
break;
}
}
fos.close();
System.out.printf("切割后的子文件: %s , 大小: %d\n", fi.getAbsoluteFile(), fi.length());
}
fis.close();
}
}
3-2-3、多线程读
- 使用CountDownLatch或者CyclicBarrier来控制父子线程顺序(下文解释为什么不使用join)
- 果然,与猜测的一样,减少时长效果并不明显(猜测往多个文件写会更慢)
- 子线程执行结束顺序1-3-4-2可以表明,CountDownLatch中,线程不是串行执行的。
package threaddemo;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* <p>pakage: threaddemo</p>
* <p>
* descirption:
*
* @author wanghai
* @version V1.0
* @since <pre>2018/8/12 下午9:54</pre>
*/
public class More implements Runnable {
private String fileName;
private OutputStream outputStream;
private static final String FILE_PARENT = "/Users/wanghai/io/";
// 设定需要计数的子线程数目
private static CountDownLatch countDownLatch = new CountDownLatch(4);
public More(int fileNum, OutputStream outputStream) {
this.fileName = FILE_PARENT + "uuid_more_" + fileNum + ".txt";
this.outputStream = outputStream;
}
@Override
public void run() {
try {
System.out.println(fileName + " run!");
write(fileName, outputStream);
countDownLatch.countDown();
System.out.println(fileName + " down!");
} catch (IOException e) {
e.printStackTrace();
}
}
public void write(String fileName, OutputStream outputStream) throws IOException {
InputStream inputStream = new FileInputStream((new File(fileName)));
byte[] buffer = new byte[10240];
while (inputStream.read(buffer) != -1) {
outputStream.write(buffer);
}
inputStream.close();
}
public static void main(String[] args) {
System.out.println("主线程开始执行....");
long begin = System.currentTimeMillis();
OutputStream outputStream = null;
try {
outputStream = new FileOutputStream(new File(FILE_PARENT + "uuid_more.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
for (int i = 1; i < 5; i++) {
More more = new More(i, outputStream);
Thread write = new Thread(more);
write.start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 当我们在线程上调用join()方法时,父线程(这里是Main)进入等待状态。它将保持等待状态,直到调用的子线程终止,然后执行下一个线程的join。join使得线程变串行执行
System.out.println("4线程countDownLatch耗时: " + (System.currentTimeMillis() - begin) / 1000 + "s");
}
}
3-2-4、控制父子线程顺序,CountDownLatch与join的区别
- 当我们在线程上调用join()方法时,父线程(这里是Main)进入等待状态。它将保持等待状态,直到调用的子线程终止,然后执行下一个线程的join。
join使得线程变串行执行,再加上线程创建、切换、销毁的时间,效率反而变低 - 之前的理解有误,不乏一些“博客专家“理解也是错误的,CountDownLatch理解一:与join的区别
- 所以,最主要的区别是,CountDownLatch更灵活,可以更“自由“的解锁
- 这篇stackoverflow的问答,很好的打了一些人的脸
join测试,在本测试背景下,与CountDownLatch相比,并没有什么区别
修改main方法如下:
public static void main(String[] args) throws InterruptedException, IOException {
long begin = System.currentTimeMillis();
OutputStream outputStream = null;
try {
outputStream = new FileOutputStream(new File(FILE_PARENT + "uuid_more.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
List<Thread> writes = new ArrayList<>();
for (int i = 1; i < 5; i++) {
More more = new More(i, outputStream);
Thread write = new Thread(more);
write.start();
writes.add(write);
}
writes.get(0).join();
System.out.println("1");
writes.get(1).join();
System.out.println("2");
writes.get(2).join();
System.out.println("3");
writes.get(3).join();
outputStream.close();
System.out.println("4线程join耗时: " + (System.currentTimeMillis() - begin) / 1000 + "s");
}
4、多线程同步写ES
4-1、切分文件
在这篇文章中使用代码切割文件,这里尝试使用split
命令切分文件。
split -l 250000 all_rate.csv
25万行为分界线分割文件
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import util.EsPropertiesUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
/**
* descirption:
*
* @author wanghai
* @version V1.0
* @since <pre>2018/8/13 下午9:34</pre>
*/
public class JsonToEsThread implements Runnable {
private String fileName;
private static CountDownLatch countDownLatch = new CountDownLatch(4);
public JsonToEsThread(String fileName) {
this.fileName = fileName;
}
@Override
public void run() {
System.out.println(fileName + " run!");
jsonToES(this.fileName);
countDownLatch.countDown();
System.out.println(fileName + " down!");
}
public void jsonToES(String fileName) {
Map<String, Object> hashMap = EsPropertiesUtils.getConf();
TransportClient client = OpenClose.getInstance(hashMap);
File file = new File(fileName);
int idNum = 0;
try (BufferedReader br = new BufferedReader(new FileReader(file))) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
String line;
while ((line = br.readLine()) != null) {
bulkRequest.add(client.prepareIndex("taobaoThread", "rates", Integer.toString(idNum)).setSource(line, XContentType.JSON));
if (idNum % 50000 == 0) {
System.out.println("++++++++++++++++++++++++++++++++++++");
bulkRequest.execute().actionGet();
}
idNum++;
System.out.println(Thread.currentThread().getName() + " " + idNum);
}
if (bulkRequest.request().requests().size() > 0) {
bulkRequest.get();
}
// 五、操作下一个文件
} catch (IOException e) {
e.printStackTrace();
}
OpenClose.closeClient(client);
}
public static void main(String[] args) {
System.out.println("主线程开始执行....");
long begin = System.currentTimeMillis();
for (int i = 1; i < 5; i++) {
JsonToEsThread jsonToEsThread = new JsonToEsThread("/Users/wanghai/io/rate_" + i + ".csv");
Thread write = new Thread(jsonToEsThread);
write.start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 当我们在线程上调用join()方法时,父线程(这里是Main)进入等待状态。它将保持等待状态,直到调用的子线程终止,然后执行下一个线程的join。join使得线程变串行执行
System.out.println("4线程写ES耗时: " + (System.currentTimeMillis() - begin) / 1000 + "s");
}
}
4-2、备注
个人觉得,这样操作是不对的,首先,还是那个老问题,磁头只有一个,如果说网速很慢,磁盘读取也很慢,那么这样做,可能会有效果。但是我这里的情况,外地用学校VPN,网速特别的慢,应该是没太大性能优化。
还是觉得异步处理的办法在实际生产中可能会有用,一部分线程读取数据,一部分线程写数据到ES。
5、多线程异步写ES
参考
1、CountDownLatch理解一:与join的区别
2、这篇stackoverflow的问答,很好的打了一些人的脸