基于 MySQL 的 C++ 调用接口(Connector/C++ API)实现的简单连接池 —— 项目笔记

系统:仅支持 Windows,Linux 版本暂不提供

源码文件:

WinTimer.h
ThreadLock.h
ThreadLock.cpp
MysqlConnectionPool.h
MysqlConnectionPool.cpp
AgentConnection.h
AgentConnection.cpp

WinTimer.h

#ifndef WINTIMER_H_
#define WINTIMER_H_

#include <Windows.h>

/**
 * Stopwatch built on the Win32 performance counter.
 *
 * Usage: call start(), run the code being measured, call stop(), then read
 * the interval with elapse() (counter ticks divided by the counter
 * frequency) or ticks() (raw counter ticks).
 */
class MyTimer {
public:

        // Zero all three counters, then cache the counter frequency once.
        MyTimer() : freq_{ 0 }, start_{ 0 }, stop_{ 0 } {
                QueryPerformanceFrequency(&freq_);
        }

        // Capture the counter value that opens the measured interval.
        inline void start(){
                QueryPerformanceCounter(&start_);
        }

        // Capture the counter value that closes the measured interval.
        inline void stop(){
                QueryPerformanceCounter(&stop_);
        }

        // Interval between the last start() and stop(): tick difference
        // divided by the cached frequency.
        inline double elapse(){
                const long long delta = ticks();
                return delta / static_cast<double>(freq_.QuadPart);
        }

        // Interval between the last start() and stop() in raw counter ticks.
        inline long long ticks(){
                const long long begin = start_.QuadPart;
                const long long end = stop_.QuadPart;
                return end - begin;
        }
private:
        LARGE_INTEGER freq_;   // counter frequency, read in the constructor
        LARGE_INTEGER start_;  // counter value captured by start()
        LARGE_INTEGER stop_;   // counter value captured by stop()
};


#endif //WINTIMER_H_

ThreadLock.h

#ifndef MYTHREADLOCK_H_
#define MYTHREADLOCK_H_

#include <Windows.h>

/**
 * Minimal mutex wrapper around a Win32 CRITICAL_SECTION.
 *
 * The constructor initializes the critical section and the destructor
 * deletes it; lock()/unlock() enter and leave it. The caller is responsible
 * for pairing every lock() with an unlock().
 */
class MyCriticalLock {
public:
	MyCriticalLock();
	~MyCriticalLock();

	// A CRITICAL_SECTION must not be copied or moved once initialized, and a
	// copied wrapper would also call DeleteCriticalSection twice, so copying
	// is disabled.
	MyCriticalLock(const MyCriticalLock&) = delete;
	MyCriticalLock& operator=(const MyCriticalLock&) = delete;

	// Enters the critical section.
	void lock();
	// Leaves the critical section.
	void unlock();
private:
	CRITICAL_SECTION critical_section_;
};


#endif //MYTHREADLOCK_H_

ThreadLock.cpp

#include "ThreadLock.h"

// Initializes the wrapped critical section so that lock()/unlock() can be used.
MyCriticalLock::MyCriticalLock() {
	::InitializeCriticalSection(&critical_section_);
}

// Deletes the wrapped critical section.
MyCriticalLock::~MyCriticalLock() {
	::DeleteCriticalSection(&critical_section_);
}

// Enters the critical section.
void MyCriticalLock::lock() {
	::EnterCriticalSection(&critical_section_);
}

// Leaves the critical section; must be paired with a preceding lock().
void MyCriticalLock::unlock() {
	::LeaveCriticalSection(&critical_section_);
}

MysqlConnectionPool.h

#ifndef MYSQLCONNECTIONPOOL_H_
#define MYSQLCONNECTIONPOOL_H_

#include <queue>
#include "ThreadLock.h"
#include "AgentConnection.h"

/**
 * Fixed-size pool of MySQL connections.
 *
 * All connections are opened in the constructor. Callers borrow one with
 * getAgentConnection() and hand it back by calling AgentConnection::close(),
 * which pushes the wrapper back onto the free queue.
 *
 * NOTE(review): the class declares no destructor, so the connections it
 * opens are not released when the pool is destroyed.
 */
class MysqlConnectionPool {
public:
	/**
	* Opens `nums` connections and places all of them in the free queue.
	*
	* @url : database connection address
	* @userName : user name
	* @password : password
	* @nums : number of connections to open (default 1)
	*/
	MysqlConnectionPool(const sql::SQLString& url,
		const sql::SQLString& userName, const sql::SQLString& password, int nums = 1);

	// The pool holds raw connection pointers and a critical section; a copy
	// would alias both, so copying is disabled.
	MysqlConnectionPool(const MysqlConnectionPool&) = delete;
	MysqlConnectionPool& operator=(const MysqlConnectionPool&) = delete;

	// Takes one connection from the free queue under the lock.
	// Returns nullptr when no connection is currently free.
	AgentConnection* getAgentConnection();
	// Sets the schema on every connection owned by the pool.
	void setSchema(const sql::SQLString& catalog);
	// Total number of connections created by the pool.
	int size() const { return size_; }
private:
	// Returns a connection to the free queue; only AgentConnection::close()
	// is allowed to call it.
	friend void AgentConnection::close();
	void freeConnection(AgentConnection* conn);
private:
	sql::Driver *driver_;
	// Every connection created by the pool, free or borrowed.
	std::list<AgentConnection*> connections_list_;
	// Connections currently available to borrow.
	std::queue<AgentConnection*> connection_free_queue_;
	// Guards connection_free_queue_.
	MyCriticalLock lock_;
	// Total number of connections in the pool.
	int size_;
};


#endif //MYSQLCONNECTIONPOOL_H_

MysqlConnectionPool.cpp

#include "MysqlConnectionPool.h"

/**
 * Opens `nums` connections to `url` with the given credentials and registers
 * each one both in the list of all connections and in the free queue.
 *
 * A non-positive `nums` creates an empty pool. If opening a connection (or
 * any allocation) throws part-way through, everything opened so far is
 * released before the exception is rethrown, so a failed construction does
 * not leak server connections.
 */
MysqlConnectionPool::MysqlConnectionPool(const sql::SQLString& url,
	const sql::SQLString& userName, const sql::SQLString& password, int nums)
	: driver_(get_driver_instance()), size_(0) {

	// Raw connections opened so far. Kept separately because AgentConnection
	// does not expose the connection it wraps, and the rollback path below
	// has to delete them.
	std::list<sql::Connection*> opened;
	// Objects that exist but are not yet recorded in a container.
	sql::Connection* pendingConn = nullptr;
	AgentConnection* pendingAgent = nullptr;

	try {
		for (int i = 0; i < nums; i++) {

			pendingConn = driver_->connect(url, userName, password);
			opened.push_back(pendingConn);
			pendingConn = nullptr;

			pendingAgent = new AgentConnection(opened.back(), this);
			connections_list_.push_back(pendingAgent);
			AgentConnection* agentConn = pendingAgent;
			pendingAgent = nullptr;

			connection_free_queue_.push(agentConn);
			size_++;
		}
	}
	catch (...) {
		// Roll back: release every wrapper and connection created so far.
		delete pendingAgent;
		delete pendingConn;
		for (AgentConnection* agentConn : connections_list_)
			delete agentConn;
		for (sql::Connection* conn : opened)
			delete conn;
		throw;
	}
}

/**
 * Borrows one connection from the free queue.
 *
 * The queue is inspected and popped while holding lock_. Returns nullptr
 * when the queue is empty; the call never waits for a connection to be freed.
 */
AgentConnection* MysqlConnectionPool::getAgentConnection() {

	lock_.lock();

	AgentConnection* borrowed = nullptr;
	if (!connection_free_queue_.empty()) {
		borrowed = connection_free_queue_.front();
		connection_free_queue_.pop();
	}

	lock_.unlock();
	return borrowed;
}

// Puts `conn` back at the end of the free queue while holding lock_.
// No check is made that `conn` belongs to this pool or is not already queued.
void MysqlConnectionPool::freeConnection(AgentConnection* conn) {

	lock_.lock();
	connection_free_queue_.push(conn);
	lock_.unlock();
}

/**
 * Applies `catalog` as the schema of every connection the pool created,
 * whether it is currently free or borrowed.
 *
 * The list is walked without taking lock_.
 */
void MysqlConnectionPool::setSchema(const sql::SQLString &catalog) {

	for (AgentConnection* pooled : connections_list_) {
		pooled->setSchema(catalog);
	}
}

AgentConnection.h

#ifndef AGENTCONNECTION_H_
#define AGENTCONNECTION_H_

#include "mysql/jdbc.h"

class MysqlConnectionPool;

/**
 * Pool-aware wrapper around a sql::Connection.
 *
 * The member functions mirror the sql::Connection interface. The one with
 * pool-specific behaviour is close(), which hands this wrapper back to the
 * owning MysqlConnectionPool.
 */
class AgentConnection {
public:
        // Stores the wrapped connection and the pool it belongs to; neither
        // pointer is checked for null.
        AgentConnection(sql::Connection* const conn, MysqlConnectionPool* const connection_pool) :
                conn_(conn),
                connection_pool_(connection_pool) {

        }

        void clearWarnings();

        sql::Statement* createStatement();

        // Returns this wrapper to the owning pool's free queue.
        void close();

        void commit();

        bool getAutoCommit();

        sql::SQLString getCatalog();

        sql::Driver* getDriver();

        sql::SQLString getSchema();

        sql::SQLString getClientInfo();

        void getClientOption(const sql::SQLString& optionName, void* optionValue);

        sql::SQLString getClientOption(const sql::SQLString& optionName);

        sql::DatabaseMetaData* getMetaData();

        sql::enum_transaction_isolation getTransactionIsolation();

        const sql::SQLWarning* getWarnings();

        bool isClosed();

        bool isReadOnly();

        bool isValid();

        bool reconnect();

        sql::SQLString nativeSQL(const sql::SQLString& sql);

        // prepareStatement overloads, one per overload of the wrapped connection.
        sql::PreparedStatement* prepareStatement(const sql::SQLString& sql);

        sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, int autoGeneratedKeys);

        sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, int* columnIndexes);

        sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, int resultSetType, int resultSetConcurrency);

        sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability);

        sql::PreparedStatement* prepareStatement(const sql::SQLString& sql, sql::SQLString columnNames[]);

        void releaseSavepoint(sql::Savepoint* savepoint);

        void rollback();

        void rollback(sql::Savepoint* savepoint);

        void setAutoCommit(bool autoCommit);

        void setCatalog(const sql::SQLString& catalog);

        // The parameter is named `catalog` but is passed on as the schema name.
        void setSchema(const sql::SQLString& catalog);

        sql::Connection* setClientOption(const sql::SQLString& optionName, const void* optionValue);

        sql::Connection* setClientOption(const sql::SQLString& optionName, const sql::SQLString& optionValue);

        void setHoldability(int holdability);

        void setReadOnly(bool readOnly);

        sql::Savepoint* setSavepoint();

        sql::Savepoint* setSavepoint(const sql::SQLString& name);

        void setTransactionIsolation(sql::enum_transaction_isolation level);
private:
	// The wrapped connection.
	sql::Connection* conn_;
	// The pool this wrapper is returned to by close().
	MysqlConnectionPool* connection_pool_;
};

#endif // AGENTCONNECTION_H_

AgentConnection.cpp

#include "AgentConnection.h"
#include "MysqlConnectionPool.h"

// Forwards to clearWarnings() on the wrapped connection.
void AgentConnection::clearWarnings() {
	conn_->clearWarnings();
}

// Forwards to createStatement() on the wrapped connection and returns its
// result unchanged.
sql::Statement* AgentConnection::createStatement() {
	return conn_->createStatement();
}

// Does not close the wrapped connection: it hands this wrapper back to the
// owning pool's free queue so that it can be borrowed again.
void AgentConnection::close() {
	// Marked by the author as needing rework.
	connection_pool_->freeConnection(this);
}

// Every function in this run forwards its arguments unchanged to the member
// of the same name on the wrapped connection and returns that call's result.

// Forwards to commit().
void AgentConnection::commit() {
	conn_->commit();
}

// Forwards to getAutoCommit().
bool AgentConnection::getAutoCommit() {
	return conn_->getAutoCommit();
}

// Forwards to getCatalog().
sql::SQLString AgentConnection::getCatalog() {
	return conn_->getCatalog();
}

// Forwards to getDriver().
sql::Driver* AgentConnection::getDriver() {
	return conn_->getDriver();
}

// Forwards to getSchema().
sql::SQLString AgentConnection::getSchema() {
	return conn_->getSchema();
}

// Forwards to getClientInfo().
sql::SQLString AgentConnection::getClientInfo() {
	return conn_->getClientInfo();
}

// Forwards to getClientOption(); the wrapped connection writes the result
// through `optionValue`.
void AgentConnection::getClientOption(const sql::SQLString& optionName, void* optionValue) {
	conn_->getClientOption(optionName, optionValue);
}

// Forwards to the string-returning getClientOption() overload.
sql::SQLString AgentConnection::getClientOption(const sql::SQLString& optionName) {
	return conn_->getClientOption(optionName);
}

// Forwards to getMetaData().
sql::DatabaseMetaData* AgentConnection::getMetaData() {
	return conn_->getMetaData();
}

// Forwards to getTransactionIsolation().
sql::enum_transaction_isolation AgentConnection::getTransactionIsolation() {
	return conn_->getTransactionIsolation();
}

// Forwards to getWarnings().
const sql::SQLWarning* AgentConnection::getWarnings() {
	return conn_->getWarnings();
}

// Forwards to isClosed(); reports the state of the wrapped connection, not
// whether this wrapper has been returned to the pool.
bool AgentConnection::isClosed() {
	return conn_->isClosed();
}

// Forwards to isReadOnly().
bool AgentConnection::isReadOnly() {
	return conn_->isReadOnly();
}

// Forwards to isValid().
bool AgentConnection::isValid() {
	return conn_->isValid();
}

// Forwards to reconnect().
bool AgentConnection::reconnect() {
	return conn_->reconnect();
}

// Forwards to nativeSQL().
sql::SQLString AgentConnection::nativeSQL(const sql::SQLString& sql) {
	return conn_->nativeSQL(sql);
}

// prepareStatement overloads: each forwards to the overload with the same
// parameter list on the wrapped connection.
sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql) {
	return conn_->prepareStatement(sql);
}

sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, int autoGeneratedKeys) {
	return conn_->prepareStatement(sql, autoGeneratedKeys);
}

sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, int* columnIndexes) {
	return conn_->prepareStatement(sql, columnIndexes);
}

sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, int resultSetType, int resultSetConcurrency) {
	return conn_->prepareStatement(sql, resultSetType, resultSetConcurrency);
}

sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) {
	return conn_->prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}

sql::PreparedStatement* AgentConnection::prepareStatement(const sql::SQLString& sql, sql::SQLString columnNames[]) {
	return conn_->prepareStatement(sql, columnNames);
}

// Forwards to releaseSavepoint().
void AgentConnection::releaseSavepoint(sql::Savepoint* savepoint) {
	conn_->releaseSavepoint(savepoint);
}

// Forwards to rollback().
void AgentConnection::rollback() {
	conn_->rollback();
}

// Forwards to rollback(savepoint).
void AgentConnection::rollback(sql::Savepoint* savepoint) {
	conn_->rollback(savepoint);
}

// Forwards to setAutoCommit().
void AgentConnection::setAutoCommit(bool autoCommit) {
	conn_->setAutoCommit(autoCommit);
}

// Forwards to setCatalog().
void AgentConnection::setCatalog(const sql::SQLString& catalog) {
	conn_->setCatalog(catalog);
}

// Forwards to setSchema(); the parameter is named `catalog` but is passed on
// as the schema name.
void AgentConnection::setSchema(const sql::SQLString& catalog) {
	conn_->setSchema(catalog);
}

// Applies a client option (pointer-valued overload) to the wrapped connection.
//
// Previously this was a stub that ignored the request. The option is now
// forwarded. The return value stays nullptr, as before, so that the raw
// pooled connection is never handed out to callers.
sql::Connection* AgentConnection::setClientOption(const sql::SQLString& optionName, const void* optionValue) {
	conn_->setClientOption(optionName, optionValue);
	return nullptr;
}

// Applies a client option (string-valued overload) to the wrapped connection.
//
// Previously this was a stub that ignored the request. The option is now
// forwarded. The return value stays nullptr, as before, so that the raw
// pooled connection is never handed out to callers.
sql::Connection* AgentConnection::setClientOption(const sql::SQLString& optionName, const sql::SQLString& optionValue) {
	conn_->setClientOption(optionName, optionValue);
	return nullptr;
}

// Every function in this run forwards its arguments unchanged to the member
// of the same name on the wrapped connection and returns that call's result.

// Forwards to setHoldability().
void AgentConnection::setHoldability(int holdability) {
	conn_->setHoldability(holdability);
}

// Forwards to setReadOnly().
void AgentConnection::setReadOnly(bool readOnly) {
	conn_->setReadOnly(readOnly);
}

// Forwards to setSavepoint().
sql::Savepoint* AgentConnection::setSavepoint() {
	return conn_->setSavepoint();
}

// Forwards to setSavepoint(name).
sql::Savepoint* AgentConnection::setSavepoint(const sql::SQLString& name) {
	return conn_->setSavepoint(name);
}

// Forwards to setTransactionIsolation().
void AgentConnection::setTransactionIsolation(sql::enum_transaction_isolation level) {
	conn_->setTransactionIsolation(level);
}

简单测试实例:

#include <iostream>
#include <string>
#include <stdint.h>
#include <time.h>
#include <thread>
#include <regex>


#include "WinTimer.h"
#include "mysql/jdbc.h"
#include "MysqlConnectionPool.h"

#pragma comment( linker, "/subsystem:\"windows\" /entry:\"mainCRTStartup\"" )


using namespace std;
using namespace sql;

// Shorthand for the integer type that holds an IPv4 address as a number.
typedef unsigned long ulong;

// Values of the `type` argument of retrieve_data_and_print():
// NUMOFFSET reads the column by index, COLNAME reads it by name.
#define NUMOFFSET 100
#define COLNAME 200

// Total number of requests issued by all worker threads; incremented with
// InterlockedAdd in worker_thread().
volatile long requests_count_ = 0;

// Builds a pseudo-random dotted-quad string from four rand() calls, made in
// order from the first octet to the last. The first octet is in 1..238 and
// each of the other three is in 0..254. Seeding is left to the caller.
string ip_generator() {

	std::string address = std::to_string(rand() % 238 + 1);

	for (int remaining = 3; remaining > 0; --remaining) {
		const int octet = rand() % 255;
		address += ".";
		address += std::to_string(octet);
	}

	return address;
}

// Prints one column of every row in `rs` on a single line, each value
// followed by a space, then ends the line.
//   type == NUMOFFSET : the column is read by index `colidx`
//   type == COLNAME   : the column is read by name `colname`
//   any other type    : rows are consumed but nothing is printed for them
// An empty result set prints "the outcome is zero" instead.
static void retrieve_data_and_print(ResultSet* rs, int type, int colidx, string colname) {

	// Nothing to iterate over: report it and stop.
	if (rs->rowsCount() < 1) {
		std::cout << "the outcome is zero" << std::endl;
		return;
	}

	// Walk every row; the selector is evaluated once per row.
	for (bool hasRow = rs->next(); hasRow; hasRow = rs->next()) {
		switch (type) {
		case NUMOFFSET:
			std::cout << rs->getString(colidx).c_str() << " ";
			break;
		case COLNAME:
			std::cout << rs->getString(colname.c_str()).c_str() << " ";
			break;
		default:
			break;
		}
	}

	std::cout << std::endl;

} // retrieve_data_and_print()


// Prints a small report about the columns of `rs`: the column count, one line
// per column (label, type name, display size, each right-aligned in a
// 20-character field), and finally the table and schema of the first column.
// Throws runtime_error when the result set has no rows.
static void retrieve_rsmetadata_and_print(ResultSet* rs) {

	if (rs->rowsCount() == 0) {
		throw runtime_error("ResultSetMetaData FAILURE - no records in the result set");
	}

	const int kFieldWidth = 20;

	std::cout << "ResultSet Metadata" << std::endl;
	std::cout << "------------------" << std::endl;

	// Raw pointer as in the original code: the metadata object is not deleted here.
	ResultSetMetaData* meta = rs->getMetaData();

	int columnCount = meta->getColumnCount();
	std::cout << "\nNumber of columns in the result set = " << columnCount << std::endl << std::endl;

	// Table header.
	std::cout.width(kFieldWidth);
	std::cout << "Column Name/Label";
	std::cout.width(kFieldWidth);
	std::cout << "Column Type";
	std::cout.width(kFieldWidth);
	std::cout << "Column Size" << std::endl;

	// Column indexes in the metadata API are 1-based.
	for (int col = 1; col <= columnCount; ++col) {
		std::cout.width(kFieldWidth);
		std::cout << meta->getColumnLabel(col);
		std::cout.width(kFieldWidth);
		std::cout << meta->getColumnTypeName(col);
		std::cout.width(kFieldWidth);
		std::cout << meta->getColumnDisplaySize(col) << std::endl << std::endl;
	}

	std::cout << "\nColumn \"" << meta->getColumnLabel(1);
	std::cout << "\" belongs to the Table: \"" << meta->getTableName(1);
	std::cout << "\" which belongs to the Schema: \"" << meta->getSchemaName(1) << "\"" << std::endl << std::endl;

} // retrieve_rsmetadata_and_print()

//字符串分割函数
// Splits `str` on every occurrence of `pattern`.
//
// Returns the tokens in order; empty tokens are kept, so "a..b" split on "."
// yields {"a", "", "b"} and a trailing delimiter yields a trailing empty
// token. Returns an empty vector when either `str` or `pattern` is empty.
//
// The search works directly on `str` with size_type positions. The earlier
// version appended `pattern` as a sentinel and stored positions in `int`,
// which produced wrong tokens when the end of `str` overlapped the start of
// a multi-character pattern (e.g. "a-" split on "--").
std::vector<std::string> split(std::string str, const std::string &pattern)
{
	std::vector<std::string> result;

	if (str.empty() || pattern.empty())
		return result;

	std::string::size_type tokenStart = 0;
	for (;;)
	{
		const std::string::size_type hit = str.find(pattern, tokenStart);
		if (hit == std::string::npos)
		{
			// No more delimiters: the rest (possibly empty) is the last token.
			result.push_back(str.substr(tokenStart));
			break;
		}
		result.push_back(str.substr(tokenStart, hit - tokenStart));
		tokenStart = hit + pattern.size();
	}
	return result;
}

// Converts a dotted-quad IPv4 string ("a.b.c.d") to its 32-bit numeric value,
// with `a` in the most significant byte. The return type is the same type as
// the file's `ulong` alias.
//
// Returns 0 for an empty string and for any malformed input: not exactly four
// parts, an empty part, a part longer than three characters, a non-digit
// character, or a part greater than 255.
//
// The string is parsed directly with unsigned arithmetic. The earlier version
// indexed four parts without checking that they existed and shifted a signed
// int by 24 bits, which overflows for a first octet of 128 or more.
unsigned long long2ip(const std::string &ip)
{
	unsigned long value = 0;
	std::string::size_type partStart = 0;

	for (int part = 0; part < 4; ++part)
	{
		// The first three parts end at a dot; the last one ends at the end of
		// the string, so any extra dot shows up as a non-digit below.
		const std::string::size_type partEnd =
			(part == 3) ? ip.size() : ip.find('.', partStart);
		if (partEnd == std::string::npos)
			return 0;

		const std::string::size_type length = partEnd - partStart;
		if (length == 0 || length > 3)
			return 0;

		unsigned long octet = 0;
		for (std::string::size_type pos = partStart; pos < partEnd; ++pos)
		{
			const char ch = ip[pos];
			if (ch < '0' || ch > '9')
				return 0;
			octet = octet * 10 + static_cast<unsigned long>(ch - '0');
		}
		if (octet > 255)
			return 0;

		value = (value << 8) | octet;
		partStart = partEnd + 1;
	}

	return value;
}

void worker_thread(void* lparam) {

	srand((int)GetTickCount());

	MysqlConnectionPool* pool = (MysqlConnectionPool*)lparam;

	AgentConnection* conn = nullptr;
	Statement* stmt = nullptr;

	while (1) {

		conn = pool->getAgentConnection();

		if (nullptr == conn) {

			Sleep(100);
			continue;
		}

		string newip = ip_generator();
		ulong iplong = long2ip(newip);

		std::string ipLocationSql;
		ipLocationSql += "SELECT * from ip_data where ip1 <= ";
		ipLocationSql += to_string(iplong);
		ipLocationSql += " AND ";
		ipLocationSql += to_string(iplong);
		ipLocationSql += " <= ip2";

		MyTimer timer;
		ResultSet* res = nullptr;
		stmt = conn->createStatement();

		timer.start();
		res = stmt->executeQuery(ipLocationSql.c_str());
		timer.stop();
		double intervals = timer.elapse();

		res->next();
		string ss = res->getString("province").c_str();
		// 增加请求计数
		InterlockedAdd(&requests_count_, 1);

		string debugInfo;
		debugInfo += "MYSQL TEST ===== province : ";
		debugInfo += ss;
		debugInfo += " ======= request counts : ";
		debugInfo += to_string(requests_count_);

		OutputDebugString(debugInfo.c_str());
		OutputDebugString("\n");

		conn->close();
		conn = nullptr;

		if(nullptr != stmt)
			delete stmt;
		stmt = nullptr;

		if (nullptr != res)
			delete res;
		res = nullptr;	
	}
}

int main(int argc, char* argv[])
{
	
	AgentConnection* conn = nullptr;
	Statement* stmt = nullptr;

	MysqlConnectionPool* pool = new MysqlConnectionPool("192.168.213.131:3306", "root", "083415", 10);
	pool->setSchema("tcpcc");

	// 创建多个执行线程
	const int thread_numbers = 16;
	std::thread th[thread_numbers];

	for (int i = 0; i < thread_numbers; i++) {

		Sleep(1000);
		th[i] = thread(worker_thread, pool);
	}

	for (int j = 0; j < thread_numbers; j++)
		th[j].join();

	return 0;
}

猜你喜欢

转载自blog.csdn.net/paradox_1_0/article/details/105932958