系统:仅支持 Windows 系统,Linux 版本暂不提供
源码文件:
WinTimer.h
ThreadLock.h
ThreadLock.cpp
MysqlConnectionPool.h
MysqlConnectionPool.cpp
AgentConnection.h
AgentConnection.cpp
WinTimer.h
#ifndef WINTIMER_H_
#define WINTIMER_H_
#include <Windows.h>
// Stopwatch built on the Win32 performance counter.
// Usage: start(), run the code being measured, stop(), then read elapse() or ticks().
class MyTimer {
public:
    // Zeroes both samples and caches the performance-counter frequency.
    MyTimer() : freq_{ 0 }, start_{ 0 }, stop_{ 0 } {
        QueryPerformanceFrequency(&freq_);
    }

    // Records the counter value at the beginning of the measured interval.
    void start() { QueryPerformanceCounter(&start_); }

    // Records the counter value at the end of the measured interval.
    void stop() { QueryPerformanceCounter(&stop_); }

    // Tick difference between stop() and start() divided by the counter frequency
    // (seconds, since the Win32 frequency is expressed in counts per second).
    double elapse() {
        const long long delta = stop_.QuadPart - start_.QuadPart;
        return delta / static_cast<double>(freq_.QuadPart);
    }

    // Raw tick difference between the stop() and start() samples.
    long long ticks() { return stop_.QuadPart - start_.QuadPart; }

private:
    LARGE_INTEGER freq_;   // counter frequency sampled once in the constructor
    LARGE_INTEGER start_;  // sample taken by start()
    LARGE_INTEGER stop_;   // sample taken by stop()
};
#endif //WINTIMER_H_
ThreadLock.h
#ifndef MYTHREADLOCK_H_
#define MYTHREADLOCK_H_
#include <Windows.h>
// Thin wrapper around a Win32 CRITICAL_SECTION.
// The section is initialised in the constructor and deleted in the destructor;
// lock()/unlock() enter and leave it.
class MyCriticalLock {
public:
    MyCriticalLock();
    ~MyCriticalLock();

    // A CRITICAL_SECTION must not be copied or moved once initialised, and a
    // copy would also call DeleteCriticalSection on a bitwise clone, so the
    // wrapper is explicitly non-copyable.
    MyCriticalLock(const MyCriticalLock&) = delete;
    MyCriticalLock& operator=(const MyCriticalLock&) = delete;

    // Enters the critical section (blocks until it is available).
    void lock();
    // Leaves the critical section.
    void unlock();

private:
    CRITICAL_SECTION critical_section_;
};
#endif //MYTHREADLOCK_H_
ThreadLock.cpp
#include "ThreadLock.h"
// Initialises the owned Win32 critical section so lock()/unlock() can be used.
MyCriticalLock::MyCriticalLock() {
::InitializeCriticalSection(&critical_section_);
}
// Releases the resources of the owned critical section.
MyCriticalLock::~MyCriticalLock() {
::DeleteCriticalSection(&critical_section_);
}
// Enters the critical section; must be paired with a later unlock().
void MyCriticalLock::lock() {
::EnterCriticalSection(&critical_section_);
}
// Leaves the critical section previously entered with lock().
void MyCriticalLock::unlock() {
::LeaveCriticalSection(&critical_section_);
}
MysqlConnectionPool.h
#ifndef MYSQLCONNECTIONPOOL_H_
#define MYSQLCONNECTIONPOOL_H_
#include <queue>
#include "ThreadLock.h"
#include "AgentConnection.h"
/**
 * Fixed-size pool of MySQL connections.
 *
 * All connections are opened in the constructor and wrapped in AgentConnection
 * handles. A handle is borrowed with getAgentConnection() and handed back by
 * calling AgentConnection::close(), which releases it into the free queue.
 */
class MysqlConnectionPool {
public:
    /**
     * Opens the connections that make up the pool.
     *
     * @param url      database connection address
     * @param userName user name
     * @param password password
     * @param nums     number of connections to open
     */
    MysqlConnectionPool(const sql::SQLString& url,
        const sql::SQLString& userName, const sql::SQLString& password, int nums = 1);

    // Every AgentConnection stores the address of the pool that created it, so
    // a copied pool would share handles that release themselves into the
    // original. Copying is therefore disabled.
    MysqlConnectionPool(const MysqlConnectionPool&) = delete;
    MysqlConnectionPool& operator=(const MysqlConnectionPool&) = delete;

    // Takes one idle connection under the queue lock; returns nullptr when
    // no connection is currently idle.
    AgentConnection* getAgentConnection();

    // Sets the schema on every connection owned by the pool.
    void setSchema(const sql::SQLString& catalog);

    // Total number of connections created by the constructor.
    int size() const { return size_; }

private:
    // AgentConnection::close() is the only caller allowed to release a
    // connection back into the pool.
    friend void AgentConnection::close();
    void freeConnection(AgentConnection* conn);

private:
    sql::Driver *driver_;
    // Every connection created by the pool, idle or borrowed.
    std::list<AgentConnection*> connections_list_;
    // Connections that are currently idle.
    std::queue<AgentConnection*> connection_free_queue_;
    // Guards connection_free_queue_.
    MyCriticalLock lock_;
    // Total number of connections in the pool.
    int size_;
};
#endif //MYSQLCONNECTIONPOOL_H_
MysqlConnectionPool.cpp
#include "MysqlConnectionPool.h"
// Obtains the driver instance, then opens `nums` connections with the given
// credentials. Each connection is wrapped in an AgentConnection that points
// back at this pool, recorded in the master list and placed in the free queue.
MysqlConnectionPool::MysqlConnectionPool(const sql::SQLString& url,
    const sql::SQLString& userName, const sql::SQLString& password, int nums)
    : driver_(get_driver_instance()),
      size_(0) {
    for (int remaining = nums; remaining > 0; --remaining) {
        sql::Connection* rawConnection = driver_->connect(url, userName, password);
        AgentConnection* wrapped = new AgentConnection(rawConnection, this);
        connections_list_.push_back(wrapped);
        connection_free_queue_.push(wrapped);
        ++size_;
    }
}
// Pops the connection at the head of the free queue while holding the queue
// lock. Returns nullptr when the queue is empty; the caller decides whether
// to retry.
AgentConnection* MysqlConnectionPool::getAgentConnection() {
    AgentConnection* picked = nullptr;
    lock_.lock();
    if (!connection_free_queue_.empty()) {
        picked = connection_free_queue_.front();
        connection_free_queue_.pop();
    }
    lock_.unlock();
    return picked;
}
// Puts `conn` back into the free queue so another caller can borrow it.
// A null pointer is ignored rather than queued.
void MysqlConnectionPool::freeConnection(AgentConnection* conn) {
    if (nullptr == conn) {
        return;
    }
    // Scoped guard: the critical section is left even if push() throws
    // (e.g. std::bad_alloc), which the manual lock()/unlock() pair did not
    // guarantee.
    struct ScopedLock {
        explicit ScopedLock(MyCriticalLock& target) : target_(target) { target_.lock(); }
        ~ScopedLock() { target_.unlock(); }
        MyCriticalLock& target_;
    } guard(lock_);
    connection_free_queue_.push(conn);
}
// Applies `catalog` as the schema of every connection in the pool, idle or
// borrowed. The queue lock is not taken here.
void MysqlConnectionPool::setSchema(const sql::SQLString &catalog) {
    for (AgentConnection* agent : connections_list_) {
        agent->setSchema(catalog);
    }
}
AgentConnection.h
#ifndef AGENTCONNECTION_H_
#define AGENTCONNECTION_H_
#include "mysql/jdbc.h"
class MysqlConnectionPool;
/**
 * Pooled handle around a sql::Connection.
 *
 * The member functions mirror the sql::Connection interface. close() does not
 * close the underlying connection; it hands this handle back to the pool that
 * created it.
 */
class AgentConnection {
public:
    /**
     * @param conn            the underlying connection the calls are made on
     * @param connection_pool the pool this handle is released to by close()
     */
    AgentConnection(sql::Connection* const conn, MysqlConnectionPool* const connection_pool) :
        conn_(conn),
        connection_pool_(connection_pool) {
    }

    // The pool tracks handles by address, so a copy would release a pointer
    // the pool never created (and that may dangle). Copying is disabled.
    AgentConnection(const AgentConnection&) = delete;
    AgentConnection& operator=(const AgentConnection&) = delete;

    void clearWarnings();
    sql::Statement* createStatement();
    // Releases this handle back to the owning pool.
    void close();
    void commit();
    bool getAutoCommit();
    sql::SQLString getCatalog();
    sql::Driver* getDriver();
    sql::SQLString getSchema();
    sql::SQLString getClientInfo();
    void getClientOption(const sql::SQLString& optionName, void* optionValue);
    sql::SQLString getClientOption(const sql::SQLString& optionName);
    sql::DatabaseMetaData* getMetaData();
    sql::enum_transaction_isolation getTransactionIsolation();
    const sql::SQLWarning* getWarnings();
    bool isClosed();
    bool isReadOnly();
    bool isValid();
    bool reconnect();
    sql::SQLString nativeSQL(const sql::SQLString& sql);
    sql::PreparedStatement* prepareStatement(const sql::SQLString& sql);
    sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, int autoGeneratedKeys);
    sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, int* columnIndexes);
    sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, int resultSetType, int resultSetConcurrency);
    sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability);
    sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, sql::SQLString columnNames[]);
    void releaseSavepoint(sql::Savepoint* savepoint);
    void rollback();
    void rollback(sql::Savepoint* savepoint);
    void setAutoCommit(bool autoCommit);
    void setCatalog(const sql::SQLString& catalog);
    void setSchema(const sql::SQLString& catalog);
    sql::Connection* setClientOption(const sql::SQLString& optionName, const void* optionValue);
    sql::Connection* setClientOption(const sql::SQLString& optionName, const sql::SQLString& optionValue);
    void setHoldability(int holdability);
    void setReadOnly(bool readOnly);
    sql::Savepoint* setSavepoint();
    sql::Savepoint* setSavepoint(const sql::SQLString& name);
    void setTransactionIsolation(sql::enum_transaction_isolation level);

private:
    // Underlying connection instance.
    sql::Connection* conn_;
    // Pool this handle belongs to.
    MysqlConnectionPool* connection_pool_;
};
#endif // AGENTCONNECTION_H_
AgentConnection.cpp
#include "AgentConnection.h"
#include "MysqlConnectionPool.h"
// Forwards to the wrapped connection's clearWarnings().
void AgentConnection::clearWarnings() {
conn_->clearWarnings();
}
// Forwards to the wrapped connection's createStatement() and returns its result unchanged.
sql::Statement* AgentConnection::createStatement() {
return conn_->createStatement();
}
// Releases this handle back to the owning pool via freeConnection().
// The wrapped sql::Connection itself is not touched here.
void AgentConnection::close() {
// TODO (original author's note): needs modification; the intended change is not specified.
connection_pool_->freeConnection(this);
}
// Forwards to the wrapped connection's commit().
void AgentConnection::commit() {
conn_->commit();
}
// Returns the wrapped connection's auto-commit flag.
bool AgentConnection::getAutoCommit() {
return conn_->getAutoCommit();
}
// Returns the wrapped connection's getCatalog() result.
sql::SQLString AgentConnection::getCatalog() {
return conn_->getCatalog();
}
// Returns the driver reported by the wrapped connection.
sql::Driver* AgentConnection::getDriver() {
return conn_->getDriver();
}
// Returns the wrapped connection's getSchema() result.
sql::SQLString AgentConnection::getSchema() {
return conn_->getSchema();
}
// Returns the wrapped connection's getClientInfo() result.
sql::SQLString AgentConnection::getClientInfo() {
return conn_->getClientInfo();
}
// Forwards to the wrapped connection's getClientOption(); the value is written through optionValue.
void AgentConnection::getClientOption(const sql::SQLString& optionName, void* optionValue) {
conn_->getClientOption(optionName, optionValue);
}
// Returns the named client option as reported by the wrapped connection.
sql::SQLString AgentConnection::getClientOption(const sql::SQLString& optionName) {
return conn_->getClientOption(optionName);
}
// Returns the wrapped connection's getMetaData() result unchanged.
sql::DatabaseMetaData* AgentConnection::getMetaData() {
return conn_->getMetaData();
}
// Returns the wrapped connection's transaction isolation level.
sql::enum_transaction_isolation AgentConnection::getTransactionIsolation() {
return conn_->getTransactionIsolation();
}
// Returns the wrapped connection's getWarnings() result unchanged.
const sql::SQLWarning* AgentConnection::getWarnings() {
return conn_->getWarnings();
}
// Reports whether the wrapped connection is closed (independent of pool state).
bool AgentConnection::isClosed() {
return conn_->isClosed();
}
// Returns the wrapped connection's read-only flag.
bool AgentConnection::isReadOnly() {
return conn_->isReadOnly();
}
// Returns the wrapped connection's isValid() result.
bool AgentConnection::isValid() {
return conn_->isValid();
}
// Forwards to the wrapped connection's reconnect() and returns its result.
bool AgentConnection::reconnect() {
return conn_->reconnect();
}
// Forwards to the wrapped connection's nativeSQL().
sql::SQLString AgentConnection::nativeSQL(const sql::SQLString& sql) {
return conn_->nativeSQL(sql);
}
// prepareStatement overloads: each one passes its arguments straight to the
// matching overload on the wrapped connection and returns that result unchanged.

// Overload taking only the SQL text.
sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql) {
return conn_->prepareStatement(sql);
}
// Overload taking the SQL text and an auto-generated-keys flag.
sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, int autoGeneratedKeys) {
return conn_->prepareStatement(sql, autoGeneratedKeys);
}
// Overload taking the SQL text and an array of column indexes.
sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, int* columnIndexes) {
return conn_->prepareStatement(sql, columnIndexes);
}
// Overload taking the SQL text, result-set type and concurrency.
sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, int resultSetType, int resultSetConcurrency) {
return conn_->prepareStatement(sql, resultSetType, resultSetConcurrency);
}
// Overload taking the SQL text, result-set type, concurrency and holdability.
sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
return conn_->prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}
// Overload taking the SQL text and an array of column names.
sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, sql::SQLString columnNames[]) {
return conn_->prepareStatement(sql, columnNames);
}
// Forwards to the wrapped connection's releaseSavepoint().
void AgentConnection::releaseSavepoint(sql::Savepoint* savepoint) {
conn_->releaseSavepoint(savepoint);
}
// Forwards to the wrapped connection's rollback().
void AgentConnection::rollback() {
conn_->rollback();
}
// Forwards to the wrapped connection's rollback(savepoint).
void AgentConnection::rollback(sql::Savepoint* savepoint) {
conn_->rollback(savepoint);
}
// Sets the auto-commit flag on the wrapped connection.
void AgentConnection::setAutoCommit(bool autoCommit) {
conn_->setAutoCommit(autoCommit);
}
// Forwards to the wrapped connection's setCatalog().
void AgentConnection::setCatalog(const sql::SQLString& catalog) {
conn_->setCatalog(catalog);
}
// Forwards to the wrapped connection's setSchema(); the pool calls this on every connection.
void AgentConnection::setSchema(const sql::SQLString& catalog) {
conn_->setSchema(catalog);
}
// Sets a client option on the wrapped connection (untyped-pointer value).
// Previously this was a stub that ignored both arguments and returned nullptr,
// so options were silently dropped.
// Returns whatever the wrapped connection's setClientOption() returns.
sql::Connection* AgentConnection::setClientOption(const sql::SQLString& optionName, const void* optionValue) {
    return conn_->setClientOption(optionName, optionValue);
}
// Sets a client option on the wrapped connection (string value).
// Returns whatever the wrapped connection's setClientOption() returns.
sql::Connection* AgentConnection::setClientOption(const sql::SQLString& optionName, const sql::SQLString& optionValue) {
    return conn_->setClientOption(optionName, optionValue);
}
// Forwards to the wrapped connection's setHoldability().
void AgentConnection::setHoldability(int holdability) {
conn_->setHoldability(holdability);
}
// Sets the read-only flag on the wrapped connection.
void AgentConnection::setReadOnly(bool readOnly) {
conn_->setReadOnly(readOnly);
}
// Forwards to the wrapped connection's setSavepoint() and returns its result.
sql::Savepoint* AgentConnection::setSavepoint() {
return conn_->setSavepoint();
}
// Forwards to the wrapped connection's setSavepoint(name) and returns its result.
sql::Savepoint* AgentConnection::setSavepoint(const sql::SQLString& name) {
return conn_->setSavepoint(name);
}
// Sets the transaction isolation level on the wrapped connection.
void AgentConnection::setTransactionIsolation(sql::enum_transaction_isolation level) {
conn_->setTransactionIsolation(level);
}
简单测试实例:
#include <stdint.h>
#include <time.h>
#include <exception>
#include <iostream>
#include <memory>
#include <regex>
#include <string>
#include <thread>
#include <vector>
#include "WinTimer.h"
#include "mysql/jdbc.h"
#include "MysqlConnectionPool.h"
#pragma comment( linker, "/subsystem:\"windows\" /entry:\"mainCRTStartup\"" )
using namespace std;
using namespace sql;
typedef unsigned long ulong;
// Selector values for the `type` argument of retrieve_data_and_print():
// NUMOFFSET reads the column by numeric index, COLNAME reads it by name.
#define NUMOFFSET 100
#define COLNAME 200
// Total number of requests; incremented by the worker threads with InterlockedAdd.
volatile long requests_count_ = 0;
// Builds a pseudo-random dotted IPv4 string from four rand() draws.
// The first octet is in 1..238, the remaining three are in 0..254.
// Seeding is left to the caller (the worker thread calls srand()).
std::string ip_generator() {
    std::string dotted = std::to_string(rand() % 238 + 1);
    for (int octet = 0; octet < 3; ++octet) {
        dotted += ".";
        dotted += std::to_string(rand() % 255);
    }
    return dotted;
}
// Prints one column of every row of `rs` to stdout, space separated and
// terminated by a newline.
//   type    NUMOFFSET: read the column by index `colidx`
//           COLNAME:   read the column by name `colname`
//           any other value prints nothing for the row
// An empty result set prints a fixed notice instead.
static void retrieve_data_and_print(ResultSet* rs, int type, int colidx, string colname) {
    if (rs->rowsCount() < 1) {
        cout << "the outcome is zero" << endl;
        return;
    }
    // Walk every row of the result set.
    for (bool hasRow = rs->next(); hasRow; hasRow = rs->next()) {
        switch (type) {
        case NUMOFFSET: {
            const SQLString cell = rs->getString(colidx);
            cout << cell.c_str() << " ";
            break;
        }
        case COLNAME:
            cout << rs->getString(colname.c_str()).c_str() << " ";
            break;
        default:
            break;
        }
    }
    cout << endl;
}
// Prints a table describing the columns of `rs` (label, type name, display
// size), followed by the table and schema the first column belongs to.
// Throws runtime_error when the result set has no rows.
static void retrieve_rsmetadata_and_print(ResultSet* rs) {
    const int kFieldWidth = 20;

    if (rs->rowsCount() == 0) {
        throw runtime_error("ResultSetMetaData FAILURE - no records in the result set");
    }

    cout << "ResultSet Metadata" << endl;
    cout << "------------------" << endl;

    // The metadata object is used through a plain pointer and not deleted here.
    ResultSetMetaData* res_meta = rs->getMetaData();
    int numcols = res_meta->getColumnCount();
    cout << "\nNumber of columns in the result set = " << numcols << endl << endl;

    // Header row.
    cout.width(kFieldWidth);
    cout << "Column Name/Label";
    cout.width(kFieldWidth);
    cout << "Column Type";
    cout.width(kFieldWidth);
    cout << "Column Size" << endl;

    // One row per column; the metadata getters use 1-based column numbers.
    for (int column = 1; column <= numcols; ++column) {
        cout.width(kFieldWidth);
        cout << res_meta->getColumnLabel(column);
        cout.width(kFieldWidth);
        cout << res_meta->getColumnTypeName(column);
        cout.width(kFieldWidth);
        cout << res_meta->getColumnDisplaySize(column) << endl << endl;
    }

    cout << "\nColumn \"" << res_meta->getColumnLabel(1);
    cout << "\" belongs to the Table: \"" << res_meta->getTableName(1);
    cout << "\" which belongs to the Schema: \"" << res_meta->getSchemaName(1) << "\"" << endl << endl;
}
// Splits `str` on every occurrence of `pattern`.
//
// Returns the pieces in order, including empty pieces produced by adjacent or
// trailing delimiters ("a..b" -> "a", "", "b"; "a." -> "a", ""). Returns an
// empty vector when either `str` or `pattern` is empty.
//
// The input is scanned as-is rather than with the delimiter appended, so a
// tail that overlaps the delimiter can no longer form a spurious match (e.g.
// "ab." split by ".." stays one piece). Positions are kept in size_type to
// avoid the narrowing and signed/unsigned comparisons of the int-based version.
std::vector<std::string> split(std::string str, const std::string &pattern)
{
    std::vector<std::string> result;
    if (str.empty() || pattern.empty())
        return result;

    std::string::size_type begin = 0;
    for (;;)
    {
        const std::string::size_type hit = str.find(pattern, begin);
        if (hit == std::string::npos)
            break;
        result.push_back(str.substr(begin, hit - begin));
        begin = hit + pattern.size();
    }
    // Whatever follows the last delimiter (possibly empty) is the final piece.
    result.push_back(str.substr(begin));
    return result;
}
// Converts a dotted-decimal IPv4 string ("a.b.c.d") to its 32-bit integer
// value, a being the most significant byte. (Despite the name, the direction
// is text -> integer.)
//
// Returns 0 for an empty string and for any malformed input: not exactly four
// parts, an empty part, a non-digit character, or a part greater than 255.
// Note that "0.0.0.0" also maps to 0.
//
// Octets are accumulated in an unsigned value, so a first octet >= 128 no
// longer overflows a signed int while being shifted, and the input is
// validated before any part is used.
ulong long2ip(const std::string &ip)
{
    ulong packed = 0;    // octets completed so far, most significant first
    ulong current = 0;   // value of the octet being read
    int digits = 0;      // digits seen in the current octet
    int octets = 0;      // number of completed octets

    // One extra iteration treats the end of the string as a final separator.
    for (std::string::size_type i = 0; i <= ip.size(); ++i)
    {
        const char ch = (i == ip.size()) ? '.' : ip[i];
        if (ch == '.')
        {
            if (digits == 0 || octets == 4)
                return 0;
            packed = (packed << 8) | current;
            ++octets;
            current = 0;
            digits = 0;
        }
        else if (ch >= '0' && ch <= '9')
        {
            current = current * 10 + static_cast<ulong>(ch - '0');
            if (++digits > 3 || current > 255)
                return 0;
        }
        else
        {
            return 0;
        }
    }
    return (octets == 4) ? packed : 0;
}
void worker_thread(void* lparam) {
srand((int)GetTickCount());
MysqlConnectionPool* pool = (MysqlConnectionPool*)lparam;
AgentConnection* conn = nullptr;
Statement* stmt = nullptr;
while (1) {
conn = pool->getAgentConnection();
if (nullptr == conn) {
Sleep(100);
continue;
}
string newip = ip_generator();
ulong iplong = long2ip(newip);
std::string ipLocationSql;
ipLocationSql += "SELECT * from ip_data where ip1 <= ";
ipLocationSql += to_string(iplong);
ipLocationSql += " AND ";
ipLocationSql += to_string(iplong);
ipLocationSql += " <= ip2";
MyTimer timer;
ResultSet* res = nullptr;
stmt = conn->createStatement();
timer.start();
res = stmt->executeQuery(ipLocationSql.c_str());
timer.stop();
double intervals = timer.elapse();
res->next();
string ss = res->getString("province").c_str();
// 增加请求计数
InterlockedAdd(&requests_count_, 1);
string debugInfo;
debugInfo += "MYSQL TEST ===== province : ";
debugInfo += ss;
debugInfo += " ======= request counts : ";
debugInfo += to_string(requests_count_);
OutputDebugString(debugInfo.c_str());
OutputDebugString("\n");
conn->close();
conn = nullptr;
if(nullptr != stmt)
delete stmt;
stmt = nullptr;
if (nullptr != res)
delete res;
res = nullptr;
}
}
int main(int argc, char* argv[])
{
AgentConnection* conn = nullptr;
Statement* stmt = nullptr;
MysqlConnectionPool* pool = new MysqlConnectionPool("192.168.213.131:3306", "root", "083415", 10);
pool->setSchema("tcpcc");
// 创建多个执行线程
const int thread_numbers = 16;
std::thread th[thread_numbers];
for (int i = 0; i < thread_numbers; i++) {
Sleep(1000);
th[i] = thread(worker_thread, pool);
}
for (int j = 0; j < thread_numbers; j++)
th[j].join();
return 0;
}