版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/PeersLee/article/details/79816299
架构
Server,Spider
CrawlId
package Peerslee.HotMonitor.Server.Spider;
/*
* 1. 定时抓取id
* 2. 上传zk
* https://www.panda.tv/live_lists?pageno=1&pagenum=120
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import Peerslee.HotMonitor.Bean.Info;
import Peerslee.HotMonitor.Bean.Info.Room;
import Peerslee.HotMonitor.Server.Centralize.Centralizer;
import Peerslee.HotMonitor.Utils.MysqlHandler;
public class CrawlId {
private static final int BUCKET_COUNT = 3;
private static final String TABLE_NAME = "PandaTv";
private static HttpClient client = HttpClients.createDefault();
private static MysqlHandler db = MysqlHandler.getMHandler(new String[] {
"jdbc:mysql://localhost:3306/HotMonitor", "root", "123"});
private static Centralizer centralizer = new Centralizer(
"localhost:2181,localhost:2182,localhost:2183");
int hash(String id) { return Integer.parseInt(id) % 3; }
List<Info> crawl() {
List<Info> iList = new ArrayList<Info>();
for (int i = 1; ; i++) {
String url = String.format("https://www.panda.tv/live_lists?pageno=%d&pagenum=120",i);
System.out.println("CrawId: [" + url + "]");
HttpGet hGet = new HttpGet(url);
try {
HttpResponse response = client.execute(hGet);
HttpEntity entity = response.getEntity();
String json = EntityUtils.toString(entity);
ObjectMapper mapper = new ObjectMapper();
Info info = mapper.readValue(json, Info.class);
if (info.getData().getItems().size() == 0) break;
iList.add(info);
} catch (Exception e) {
System.out.println("CrawlId: [Crawl '" + url + "' error.]");
}
}
return iList;
}
void divideBucket(List<Room> rItems) {
/*
* JVM将泛型存储的东西都视为Object, 底层的数组类型,它只能是Object[]
*/
Map<Integer, StringBuilder> buckets = new HashMap<Integer, StringBuilder>();
for (Info.Room room: rItems) {
String roomId = room.getId();
System.out.println(room);
// db
if(db.isNullRecord(TABLE_NAME, roomId)) { // 验证
String Columns = "roomId, roomName, classifyCname, classifyEname, userName, nickName";
String Values = (new StringBuilder("'")).
append(room.getId()).append("', '").
append(room.getName()).append("', '").
append(room.getClassification().getCname()).append("', '").
append(room.getClassification().getEname()).append("', '").
append(room.getUserinfo().getUserName()).append("', '").
append(room.getUserinfo().getNickName()).append("'").
toString();
db.insertRecord(TABLE_NAME, Columns, Values);
}
}
// 分桶
String []roomIds = db.selectAll(TABLE_NAME, "roomId", 0);
for (String roomId: roomIds) {
int bId = hash(roomId);
if (!buckets.containsKey(bId)) buckets.put(bId, new StringBuilder()) ;
buckets.get(bId).append(roomId).append(",");
}
// 注册watcher
centralizer.registerNode(new String[]{
"/ZNodeId/ZNodeId_0",
"/ZNodeId/ZNodeId_1",
"/ZNodeId/ZNodeId_2"});
// 上传zk
Map<String, byte[]> ZNode = new HashMap<>();
for (int i = 0; i < BUCKET_COUNT; i++) {
if (buckets.get(i) != null) {
int len = buckets.get(i).length();
ZNode.put("/ZNodeId/ZNodeId_" + i,
buckets.get(i).deleteCharAt(len - 1).toString().getBytes());
}
}
centralizer.setBytesToNode(ZNode);
}
public static void main(String[] args) {
CrawlId ci = new CrawlId();
while (true) {
List<Room> rList= new ArrayList<Room>();
for (Info i: ci.crawl()) rList.addAll(i.getData().getItems());
ci.divideBucket(rList);
try {
Thread.sleep(1000*60*30);
} catch (InterruptedException e) { } // 30 分钟一次
}
}
}
CrawlInfo0,1,2
package Peerslee.HotMonitor.Server.Spider;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import Peerslee.HotMonitor.Bean.Follower;
import Peerslee.HotMonitor.Calculate.Calculater;
import Peerslee.HotMonitor.Server.Centralize.Centralizer;
/*
* https://www.panda.tv/room_followinfo?roomid=266019
*/
public class CrawlInfo0 {
private static String ZK_NODE; // 根据socket 设置,"/ZNodeId/ZNodeId_"
private static final int PORT = 5200; // 5201,5202
private static HttpClient client = HttpClients.createDefault();
private static Centralizer centralizer = new Centralizer(
"localhost:2181,localhost:2182,localhost:2183");
private static Calculater calculater = new Calculater();
public void getFollowerByIds() {
String []ids = centralizer.getStringFromNode(ZK_NODE).split(",");
Map<String, String> recentFollower = new HashMap<String, String>();
System.out.println("CrawlInfo: [Crawling follower, waiting...]");
Stream.of(ids).forEach(id -> {
String fCount = getFollower(id);
if (fCount != null) {
recentFollower.put(id, fCount);
}
});
calculater.calculate(recentFollower);
}
public String getFollower(String roomId) {
String url = String.format("https://www.panda.tv/room_followinfo?roomid=%s", roomId);
HttpGet hGet = new HttpGet(url);
try {
HttpResponse response = client.execute(hGet);
HttpEntity entity = response.getEntity();
String json = EntityUtils.toString(entity);
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, Follower.class).getData().getFans();
} catch (Exception e) {
System.out.println("CrawlInfo: [Crawl '" + url + "' error.]");
return null;
}
}
public static void main(String[] args) {
CrawlInfo0 cInfo = new CrawlInfo0();
Socket socket = null;
try {
ServerSocket sSocket = new ServerSocket(PORT);
while (true) {
socket = sSocket.accept();
System.out.println("等待连接...");
InputStreamReader isr = new InputStreamReader(
socket.getInputStream(), "UTF-8");
BufferedReader reader = new BufferedReader(isr);
ZK_NODE = reader.readLine(); // 读一行
System.out.println("CrawlInfo: [Get follower by " + ZK_NODE + " ids.]");
cInfo.getFollowerByIds();
}
} catch (Exception e) {
System.out.println("CrawlInfo: [CrawlInfo error.]");
} finally {
try {
socket.close();
} catch (IOException e) { }
}
}
}
Server,Centralizer
package Peerslee.HotMonitor.Server.Centralize;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
//要记得初始化 ZKNode
public class Centralizer implements Watcher{
private static final int SESSION_TIME_OUT = 5000; // 5 second
private static CountDownLatch cdLatch = new CountDownLatch(1);
private ZooKeeper zk;
public Centralizer(String hostPort) {
try {
this.zk = new ZooKeeper(hostPort, SESSION_TIME_OUT, this);
} catch (IOException e) {
e.printStackTrace();
}
}
/*
* 创建node
*/
void initZKNode(Map<String, byte[]> ZNode) {
for (Entry<String, byte[]> node: ZNode.entrySet()) {
try {
String path = zk.create(node.getKey(),
node.getValue(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("Success create znode:" + path);
} catch (Exception e) {}
}
}
/*
* 注册watcher
*/
public void registerNode(String []node) {
for (String n: node) {
try {
zk.exists(n, true);
System.out.println("ZK: [Exist " + n + " successful.]");
} catch (Exception e) {
System.out.println("ZK: [Exist " + n + " error.]");
}
}
}
/*
* 获取string
*/
public String getStringFromNode(String node) {
try {
return new String(zk.getData(node, true, new Stat()));
} catch (Exception e) {
System.out.println("ZK: [Get bytes error.]");
return null;
}
}
/*
* 更新
*/
public void setBytesToNode(Map<String, byte[]> ZNode) {
for (Entry<String, byte[]> node: ZNode.entrySet()) {
try {
Thread.sleep(5000); // 伪分布式下,sleep 一下
Stat stat = zk.setData(node.getKey(), node.getValue(), -1);
System.out.println("ZK: [Set Node:" + node.getKey() + ".]");
System.out.println(stat.getCzxid() + "/" + stat.getMzxid() +
"/" + stat.getVersion());
} catch (Exception e) {
System.out.println("ZK: [Set bytes error.]");
}
}
}
@Override
public void process(WatchedEvent event) {
System.out.println("---- zk process ----");
cdLatch.countDown(); // 连接成功,之后不会再 -1
if (KeeperState.SyncConnected == event.getState() ) {
switch (event.getType().getIntValue()) {
case -1: //EventType.None
System.out.println("ZK: [process none.]");
break;
case 1: //EventType.NodeCreated
System.out.println("ZK: [process node created.]");
break;
case 3: //EventType.NodeDataChanged
System.out.println("ZK: [process node data changed.]");
// System.out.println(event.getPath());
/*
* Socket
*/
Socket socket = null;
try {
String hostName;
int port = -1;
System.out.println(event.getPath());
switch (event.getPath()) {
case "/ZNodeId/ZNodeId_0":
hostName = "127.0.0.1";
port = 5200;
break;
case "/ZNodeId/ZNodeId_1":
hostName = "127.0.0.1";
port = 5201;
break;
case "/ZNodeId/ZNodeId_2":
hostName = "127.0.0.1";
port = 5202;
break;
default:
hostName = null;
break;
}
System.out.println(hostName);
if (hostName == null) return;
socket = new Socket(hostName, port);
socket.setSoTimeout(15000);
PrintWriter writer = new PrintWriter(socket.getOutputStream());
writer.print(event.getPath());
writer.flush();
} catch (Exception e) {
System.out.println("ZK: [Socket error.]");
} finally {
try {
socket.close();
} catch (IOException e) { }
}
break;
case 4: //EventType.NodeChildrenChanged
System.out.println("ZK: [process children list changed.]");
break;
default:
System.out.println("ZK: [process nothing.]");
break;
}
}
}
public static void main(String[] args) {
final String HOST_PORT = "localhost:2181,localhost:2182,localhost:2183";
Centralizer centralizer = new Centralizer(HOST_PORT);
System.out.println(centralizer.zk.getState());
try {
cdLatch.await();
} catch (InterruptedException e) {}
finally {
cdLatch.countDown();
}
Map<String, byte[]> ZNode = new HashMap<String, byte[]>();
// ZNode.put("/ZNodeId", "".getBytes());
ZNode.put("/ZNodeId/ZNodeId_0", "".getBytes());
// ZNode.put("/ZNodeId/ZNodeId_1", "".getBytes());
// ZNode.put("/ZNodeId/ZNodeId_2", "".getBytes());
// ZNode.put("/ZNodeData", "".getBytes());
// centralizer.initZKNode(ZNode);
centralizer.setBytesToNode(ZNode);
System.out.println(centralizer.getStringFromNode("/ZNodeId/ZNodeId_0"));
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {}
}
}
Calculate
package Peerslee.HotMonitor.Calculate;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import Peerslee.HotMonitor.Utils.MysqlHandler;
/*
* 负责在每台机器上运算
*/
public class Calculater {
private static final String TABLE_NAME = "PandaTv";
private static final int COLUMN_NUM = 12;
private static String BASE_PATH = "E:\\HOT_MONITOR\\";
private static MysqlHandler db = MysqlHandler.getMHandler(new String[] {
"jdbc:mysql://localhost:3306/HotMonitor", "root", "123"});
private static Date today = null ;
private static SimpleDateFormat sdFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm");
/*
* 根据id,选择性计算
*/
public void calculate(Map<String, String> recentFollower) {
today = new Date(); // 更新时间
String roomId = recentFollower.keySet().toString().
replace("[", "(").replace("]", ")");
String []columns = {"roomId", "flowerCount",
"flowerMax", "flowerMatime",
"flowerMin", "flowerMitime",
"flowerDist"};
for (Map<String, String> previous: db.
selectRecord(TABLE_NAME, roomId, columns)) {
String rId = previous.get("roomId");
update(recentFollower.get(rId), previous, columns);
db.updateRecord(TABLE_NAME, rId, previous);
System.out.println(previous);
}
System.out.println("Calculater: [Fault-Tolerant --->.]");
saveToLocal(getRecentRecord());
}
// 更新
void update(String rFollower, Map<String, String> previous, String[] columns) {
/*
* 如果 record 中元素为空,则赋值,recentFollower中元素
*/
for (String column: columns) {
switch (column) {
case "flowerCount":
String fCount = previous.get(column);
if (fCount == null || fCount == rFollower) {
previous.put(column, rFollower);
previous.put("flowerDist", "0");
} else {
previous.put("flowerDist",
String.valueOf(Long.parseLong(rFollower) - Long.parseLong(fCount)));
}
break;
case "flowerMax":
String fMax = previous.get(column);
// null直接 put,否则比较
if (fMax == null || Long.parseLong(fMax) < Long.parseLong(rFollower)) {
previous.put(column, rFollower);
previous.put("flowerMatime", sdFormat.format(today));
}
break;
case "flowerMin":
String fMin = previous.get(column);
if (fMin == null || Long.parseLong(fMin) > Long.parseLong(rFollower)) {
previous.put(column, rFollower);
previous.put("flowerMitime", sdFormat.format(today));
}
break;
default:
break;
}
}
}
//冗余
String []getRecentRecord() {
return db.selectAll(TABLE_NAME, "*", COLUMN_NUM);
}
void saveToLocal(String []strings) {
// String tsString = (new SimpleDateFormat("yyyy/MM/dd [HH:mm:ss]")).
// format((new Date()).getTime());
long ts = (new Date()).getTime();
File dir = new File(BASE_PATH);
if (!dir.exists()) {
dir.mkdirs();
}
String filePath = String.format("%s%d.dat", BASE_PATH, ts);
try {
//文件输入流
FileOutputStream fos = new FileOutputStream(new File(filePath));
//输入writer
OutputStreamWriter opw = new OutputStreamWriter(fos, "UTF-8");
//缓冲writer
BufferedWriter bw = new BufferedWriter(opw);
for (String str: strings) {
bw.write(str + "\n");
}
fos.close();
} catch (Exception e) {
System.out.println("Calculater: [Save to local error.]");
}
System.out.println("Calculater: [Save to " + filePath + " successful.]");
}
}
Bean
package Peerslee.HotMonitor.Bean;
import java.util.List;
/*
* Json
*/
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties({"errno", "errmsg"})
public class Info {
private int errno;
private String errmsg;
private Data data;
@JsonIgnoreProperties({"total", "highLightNum", "sliderdata"})
public static class Data {
private List<Room> items;
private int total;
private int highLightNum;
private List<Object> sliderdata;
public List<Room> getItems() {
return items;
}
public void setItems(List<Room> items) {
this.items = items;
}
@Override
public String toString() {
return "Data [items=" + items + "]";
}
}
@JsonIgnoreProperties({
"person_num", "pictures", "tag_switch",
"tag", "tag_color", "room_type",
"rtype_value", "status", "roomkey",
"room_key", "ishighlight", "top_description",
"is_top", "label", "host_level_info",
"ticket_rank_info", "top_icon", "medalNum",
"rollinfo", "pkinfo"
})
public static class Room {
private String id; // room_id
private String name; // room_name
private String person_num;
private Classify classification; // classify
private Object pictures;
private String tag_switch;
private String tag;
private String tag_color;
private String room_type;
private String rtype_value;
private String status;
private User userinfo; // user
private String roomkey;
private String room_key;
private String ishighlight;
private String top_description;
private int is_top;
private List label;
private Object host_level_info;
private Object ticket_rank_info;
private String top_icon;
private int medalNum;
private List rollinfo;
private String pkinfo;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Classify getClassification() {
return classification;
}
public void setClassification(Classify classification) {
this.classification = classification;
}
public User getUserinfo() {
return userinfo;
}
public void setUserinfo(User userinfo) {
this.userinfo = userinfo;
}
@Override
public String toString() {
return "Room [id=" + id + ", name=" + name + ", classification=" + classification + ", userinfo=" + userinfo
+ "]";
}
}
public static class Classify {
private String cname;
private String ename;
public String getCname() {
return cname;
}
public void setCname(String cname) {
this.cname = cname;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
@Override
public String toString() {
return "Classify [cname=" + cname + ", ename=" + ename + "]";
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
public static class User {
private String rid;
private String userName;
private String nickName;
@JsonIgnore
private String avatar;
public String getRid() {
return rid;
}
public void setRid(String rid) {
this.rid = rid;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getNickName() {
return nickName;
}
public void setNickName(String nickName) {
this.nickName = nickName;
}
@Override
public String toString() {
return "User [userName=" + userName + ", nickName=" + nickName + "]";
}
}
public Data getData() {
return data;
}
public void setData(Data data) {
this.data = data;
}
@Override
public String toString() {
return "Info [data=" + data + "]";
}
}
package Peerslee.HotMonitor.Bean;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties({"errno", "errmsg"})
public class Follower {
private int errno;
private String errmsg;
private Data data;
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Data {
private String fans;
@JsonIgnore
private String is_followed;
public String getFans() {
return fans;
}
public void setFans(String fans) {
this.fans = fans;
}
}
public Data getData() {
return data;
}
public void setData(Data data) {
this.data = data;
}
}
Client
package Peerslee.HotMonitor.Client;
import java.awt.BorderLayout;
import java.awt.Font;
import java.awt.event.ItemEvent;
import java.awt.event.ItemListener;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.swing.JComboBox;
import javax.swing.JComponent;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTabbedPane;
import javax.swing.JTable;
import javax.swing.SwingConstants;
import javax.swing.WindowConstants;
import Peerslee.HotMonitor.Utils.MysqlHandler;
import jdk.nashorn.internal.scripts.JS;
public class Index {
private static MysqlHandler db = MysqlHandler.getMHandler(new String[] {
"jdbc:mysql://localhost:3306/HotMonitor", "root", "123"});
void index() {
JFrame jf = new JFrame();
// jf.setExtendedState(JFrame.MAXIMIZED_BOTH);
jf.setSize(600, 800);
jf.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
jf.setLocationRelativeTo(null);
jf.setLayout(null);
JTabbedPane jtp = new JTabbedPane();
jtp.add("类别", createTab1("1"));
jtp.add("主播", createTab2("2"));
jtp.setSelectedIndex(0);
jf.setContentPane(jtp);
jf.setVisible(true);
}
JComponent createTab2(String text) {
String[] columnNames = {"roomId", "nickName", "roomName",
"flowercount", "flowerDist", "flowerMax", "flowerMatime", "flowerMin", "flowerMitime"};
String[] header = {"直播间ID", "主播名称", "直播间名称",
"订阅人数", "订阅变化量", "订阅最高值", "高峰时间点", "订阅最低值", "低谷时间点"};
JPanel jp = new JPanel(new BorderLayout());
List<Map<String, String>> rList = null;
rList = db.selectType(0, null);
Map<String, String> jbMap = new LinkedHashMap<String, String>();
for (Map<String, String> r: rList) {
if (r.get("classifycname").length() != 0)
jbMap.put(r.get("classifycname"), r.get("classifyename"));;
}
JComboBox<String> jb = new JComboBox<String>(jbMap.keySet().toArray(new String[0]));
JScrollPane jsp = new JScrollPane();
jp.add(jsp, BorderLayout.CENTER);
JLabel jl = new JLabel();
jl.setFont(new Font(null, Font.PLAIN, 15));
jl.setHorizontalAlignment(SwingConstants.RIGHT);
jp.add(jl, BorderLayout.SOUTH);
jb.addItemListener(new ItemListener() {
@Override
public void itemStateChanged(ItemEvent e) {
if (e.getStateChange() == ItemEvent.SELECTED) {
String kString = e.getItem().toString();
System.out.println("选中:" + kString);
String [][]rowData = getRowData(3, columnNames, jbMap, kString);
JTable jt = new JTable(rowData, header);
jsp.setViewportView(jt);
jsp.repaint();
jl.removeAll();
jl.setText("["+ kString+ "] 有 ["+ rowData.length+ "] 条数据.");
jl.repaint();
}
}
});
jb.setSelectedIndex(2); // 默认选 熊猫星秀
jp.add(jb, BorderLayout.NORTH);
return jp;
}
JComponent createTab1(String text) {
JPanel jp = new JPanel(new BorderLayout());
JComboBox<String> jb = new JComboBox<String>(new String[] {"订阅量", "直播数"});
JScrollPane jsp = new JScrollPane();
jp.add(jsp, BorderLayout.CENTER);
JLabel jl = new JLabel();
jl.setFont(new Font(null, Font.PLAIN, 15));
jl.setHorizontalAlignment(SwingConstants.RIGHT);
jp.add(jl, BorderLayout.SOUTH);
jb.addItemListener(new ItemListener() {
@Override
public void itemStateChanged(ItemEvent e) {
if (e.getStateChange() == ItemEvent.SELECTED) {
String kString = e.getItem().toString();
System.out.println("选中:" + kString);
int type = kString == "订阅量"? 1: 2;
String[] columnNames = null;
String[] header = null;
if (type == 1) {
columnNames = new String[] {"fSum", "classifycname"};;
header = new String[] {"订阅总人数", "直播类别"};
} else {
columnNames = new String[] {"RSum", "classifycname"};
header = new String[] {"直播间总数", "直播类别"};
}
String [][]rowData = getRowData(type, columnNames, null, null);
JTable jt = new JTable(rowData, header);
jsp.setViewportView(jt);
jsp.repaint();
jl.removeAll();
jl.setText("共 " + rowData.length + " 条数据.");
jl.repaint();
}
}
});
jb.setSelectedIndex(1); // 默认选
jp.add(jb, BorderLayout.NORTH);
return jp;
}
String[][] getRowData(int type, String[] columnNames, Map<String, String> jbMap, String kString) {
List<Map<String, String>> iList = db.selectType(type,
jbMap == null? null: jbMap.get(kString));
String[][] rowData = new String[iList.size()][columnNames.length];
for (int i = 0; i < iList.size(); i++) {
for (int j = 0; j < columnNames.length; j++) {
rowData[i][j] = iList.get(i).get(columnNames[j]);
}
}
return rowData;
}
}
Utils
package Peerslee.HotMonitor.Utils;
/*
* mysql 增删改查
*/
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import com.mysql.jdbc.Connection;
// 要记得初始化 table
public class MysqlHandler {
public static Connection db;
public static Statement opt;
/*
* 单例模式:双重检验锁(lazy)
* db -> 唯一
*/
private volatile static MysqlHandler mHandler; // 禁止指令重排序,保证创建实例为原子动作
public MysqlHandler(String []params) { //url, username, password
try {
Class.forName("com.mysql.jdbc.Driver");
db = (Connection) DriverManager.getConnection(
params[0], params[1], params[2]);
System.out.println("MySQL: [Connected database successfully.]");
opt = db.createStatement();
} catch (Exception e) {
System.out.println("MySQL: [Connected database error.]");
}
}
public static MysqlHandler getMHandler(String []params) {
if (mHandler == null) {
synchronized (MysqlHandler.class) { // class 锁
if (mHandler == null) {
mHandler = new MysqlHandler(params);
}
}
}
return mHandler;
}
public void createTable(String tName, Map<String, String> params, String pKey) {
StringBuilder sql = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
sql.append(tName).append(" (");
for (Entry<String, String> p: params.entrySet()) {
sql.append(p.getKey()).append(" ").append(p.getValue()).append(",");
}
sql.deleteCharAt(sql.length() - 1);
sql.append(pKey.isEmpty()? ")": ",PRIMARY KEY ("+ pKey +"))");
System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
try {
opt.executeUpdate(sql.toString());
} catch (Exception e) {
System.out.println("MySQL: [Create table error.]");
return;
}
System.out.println("MySQL: [Create table successfully.]");
}
public boolean isNullRecord(String tName, String roomId) {
StringBuilder sql = new StringBuilder("SELECT COUNT(*) FROM ");
sql.append(tName).append(" WHERE roomId = ").append(roomId);
System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
try {
ResultSet res = opt.executeQuery(sql.toString());
res.next(); // if the value is SQL NULL, the value returned is 0
return res.getInt(1) == 0? true: false;
} catch (Exception e) {
System.out.println("MySQL: ["+ e.getMessage() +"]");
return false;
}
}
public List<Map<String, String>> selectType(int type, String cEname) {
List<Map<String, String>> resList = new ArrayList<Map<String,String>>();
String sql = null;
String []columns = null;
switch (type) {
case 0: // 类别,根据订阅人数
columns = new String[] {"classifycname", "classifyename"};
sql = "SELECT classifycname, classifyename "
+ "FROM pandatv GROUP BY classifyename ORDER BY SUM(flowerCount) DESC";
break;
case 1: // 类别,根据订阅人数
columns = new String[] {"fSum", "classifycname"};
sql = "SELECT SUM(flowerCount) AS fSum, classifycname "
+ "FROM pandatv GROUP BY classifycname ORDER BY fSum DESC";
break;
case 2: // 类别,根据主播人数
columns = new String[] {"RSum", "classifycname"};
sql = "SELECT COUNT(classifycname) AS rSum, classifycname "
+ "FROM pandatv GROUP BY classifycname ORDER BY rSum DESC";
break;
case 3: // 主播,根据类别
columns = new String[] {"roomId", "nickName", "roomName",
"flowercount", "flowerDist", "flowerMax", "flowerMatime", "flowerMin", "flowerMitime"};
sql = "SELECT roomId, nickName, roomName, flowercount, flowerDist, flowerMax, flowerMatime, flowerMin, flowerMitime "
+ "FROM pandatv "
+ "WHERE classifyename = '"+ cEname+ "' AND flowerCount > 0 "
+ "GROUP BY CAST(flowercount AS DECIMAL) DESC;";
break;
default:
break;
}
System.out.println(sql);
if (sql != null) {
try {
ResultSet rSet = opt.executeQuery(sql.toString());
while (rSet.next()) {
Map<String, String> res = new HashMap<>();
for (String c: columns) { res.put(c, rSet.getString(c)); }
resList.add(res);
}
} catch (Exception e) {
System.out.println("MySQL: [Selecting record error.]");
return null;
}
System.out.println("MySQL: [Select record successfully.]");
} else {
System.out.println("MySQL: [Selecting record null.]");
}
return resList;
}
public String[] selectAll(String tName, String columns, int cNum) {
String sql = "SELECT " + columns + " FROM " + tName; // *, 则需要传入 column 数量
System.out.println("MySQL: [Sql -> " + sql + "]");
try {
ResultSet rSet = opt.executeQuery(sql.toString());
if (rSet.last()) {
String []res = new String[rSet.getRow()];
rSet.beforeFirst();
int i = 0;
StringBuilder sBuilder = new StringBuilder();
while(rSet.next()) {
if (cNum == 0) res[i++] = rSet.getString(columns);
else {
sBuilder.delete(0, sBuilder.length()); // 清空
for (int j = 1; j <= cNum; j++) {
sBuilder.append(rSet.getString(j)).append("::");
}
res[i++] = sBuilder.toString();
}
}
System.out.println("MySQL: [Select all successful.]");
return res;
} else {
System.out.println("MySQL: [There are no rows in the result set.]");
return null;
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("MySQL: [Select all error.]");
return null;
}
}
public List<Map<String, String>> selectRecord(String tName, String roomId, String []Columns) {
List<Map<String, String>> resList = new ArrayList<Map<String,String>>();
StringBuilder sql = new StringBuilder("SELECT ");
for (String c: Columns) sql.append(c).append(", ");
sql.deleteCharAt(sql.length() - 2).append("FROM ").append(tName);
if (!roomId.isEmpty()) sql.append(" WHERE roomId IN ").append(roomId);
System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
try {
ResultSet rSet = opt.executeQuery(sql.toString());
while (rSet.next()) {
Map<String, String> res = new HashMap<>();
for (String c: Columns) res.put(c, rSet.getString(c));
resList.add(res);
}
} catch (Exception e) {
System.out.println("MySQL: [Selecting record error.]");
return null;
}
System.out.println("MySQL: [Select record successfully.]");
return resList;
}
public void insertRecord(String tName, String Columns, String Values) {
StringBuilder sql = new StringBuilder("INSERT INTO ");
sql.append(tName).append(" (").append(Columns).append(") VALUES (").
append(Values).append(")");
System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
try {
opt.executeUpdate(sql.toString());
} catch (Exception e) {
System.out.println("MySQL: [Inserting records error.]");
return;
}
System.out.println("MySQL: [Insert records successfully.]");
}
public void updateRecord(String tName, String roomId, Map<String, String> records) {
if (records.isEmpty()) return;
StringBuilder sql = new StringBuilder("UPDATE ");
sql.append(tName).append(" SET ");
for (Entry<String, String> entry: records.entrySet()) {
sql.append(entry.getKey()).append(" = '").append(entry.getValue()).append("', ");
}
sql.deleteCharAt(sql.length() - 2).append("WHERE roomId = ").append(roomId);
System.out.println("MySQL: [Sql -> " + sql.toString() + "]");
try {
opt.executeUpdate(sql.toString());
} catch (Exception e) {
System.out.println("MySQL: [Updating records error.]");
return;
}
System.out.println("MySQL: [Update records successfully.]");
}
public static void main(String[] args) {
String url = "jdbc:mysql://localhost:3306/HotMonitor";
String username = "root";
String password = "123";
String tName = "PandaTv";
Map<String, String> params = new HashMap<String, String>();
params.put("roomId", "VARCHAR(16) not NULL");
params.put("roomName", "VARCHAR(255)");
params.put("classifyCname", "VARCHAR(16)");
params.put("classifyEname", "VARCHAR(16)");
params.put("userName", "VARCHAR(32)");
params.put("nickName", "VARCHAR(32)");
params.put("flowerCount", "VARCHAR(8)");
params.put("flowerMax", "VARCHAR(8)");
params.put("flowerMatime", "VARCHAR(16)");
params.put("flowerMin", "VARCHAR(8)");
params.put("flowerMitime", "VARCHAR(16)");
params.put("flowerDist", "VARCHAR(8)");
String pKey ="roomId";
MysqlHandler db = MysqlHandler.getMHandler(new String[]{url, username, password});
// db.selectAll("PandaTv", "roomId");
db.createTable(tName, params, pKey);
// System.out.println(db.isNullRecord(tName, "123321"));
// String Columns = "roomId, roomName, classifyCname, classifyEname";
// String Values = "'123321', 'hello', 'eng', 'eng'";
// db.insertRecord(tName, Columns, Values);
// String roomId = "'123321'";
// String []Columns = {"classifyCname", "classifyEname"};
// String []Values = {"'chn'", "'chn'"};
// db.updateRecord(tName, roomId, Columns, Values);
// System.out.println("Result: " + db.selectRecords(tName, roomId, Columns)); // roomId=""
}
}
pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Peerslee</groupId>
<artifactId>HotMonitor</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>HotMonitor</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<!-- Json注解支持 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.8.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\Tools\\zk\\s001\\data
dataLogDir=D:\\Tools\\zk\\s001\\logs
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\Tools\\zk\\s002\\data
dataLogDir=D:\\Tools\\zk\\s002\\logs
clientPort=2182
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\Tools\\zk\\s003\\data
dataLogDir=D:\\Tools\\zk\\s003\\logs
clientPort=2183
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
安装包
链接:https://pan.baidu.com/s/1ZnTFX8e8L4t5WerNQN0d4g 密码:lx8r
演示流程
1. 启动 zk 集群
2. 启动CrawlInfo 0,1,2
3. 启动CrawlId
4. Client