各种网络测试代码:ping、tcp、udp、serial(COM口)

旧代码备忘,以后可能还用得上。
功能:程序加载一组xml配置文件,根据xml文件的描述调用不同的模块测试各种网络是否正常。例如测试COM口连接的装置对收发数据的反馈是否正常,如果不正常,可以判断装置存在问题。

xml配置文件实例:

<testcase port_type="RS232">
  <IOs port_id="1" baud_rate="9600" data_bit="8" stop_bit="0" parity="0" timeout="30000">
    <IO io_type="X" out_data="01 02 03 04 05 06 07 08 09 10"
                    in_data ="01 02 03 04 05 06">  </IO>
  </IOs>
</testcase>

port_type为RS232时调用COM通讯模块,以IOs 的属性值设置端口参数,IOs 下含有多个IO节点为测试数据,out_data为发送数据,in_data 是应该收到的反馈。io_type为X是二进制数据测试、io_type为S时是字符串数据测试。

等待数据反馈的函数:

void waitForReadyRead(quint32 tt){
	const quint32 MAX_TIME_OUT = 1000 * 60;	//最多等一分钟
	const quint32 MIN_TIME_OUT = 100;		//最少等100毫秒
	const quint32 STEP_TIME_OUT = 100;		//每隔一个间隔时间检查一次是否收到信息
	qint32 timeout_temp = tt;
	if(tt == UINT_MAX || timeout_temp > MAX_TIME_OUT){
		timeout_temp = MAX_TIME_OUT;
	}else if(timeout_temp < MIN_TIME_OUT){
		timeout_temp = MIN_TIME_OUT;
	}else{
		timeout_temp = timeout_temp / STEP_TIME_OUT * STEP_TIME_OUT;
	}
	while(timeout_temp > 0 && false == testResult.HaveReceive()){
		QTime t;
		t.start();
		while(t.elapsed() < STEP_TIME_OUT)
			QCoreApplication::processEvents();
		timeout_temp -= STEP_TIME_OUT;
	}
}

COM通信用的开源的qextserialport
GitHub - qextserialport/qextserialport: Automatically exported from code.google.com/p/qextserialport
https://github.com/qextserialport/qextserialport/tree/master
调用示例:


QBool CPortRS232::RunTest(){
	QBool re = QBool(false);
	QString port_name = QObject::tr("COM%1").arg(port_id);//linux下com口名字需要修改
	m_port = new QextSerialPort(port_name, QextSerialPort::EventDriven);
    m_port->setBaudRate(BaudRateType(baud_rate));
    m_port->setFlowControl(FLOW_OFF);												//握手方式
    m_port->setParity(ParityType(parity));											//校验方式
    m_port->setDataBits(DataBitsType(data_bit));
    m_port->setStopBits(StopBitsType(stop_bit));
    m_port->setTimeout(timeout);														//超时
	//
    if(m_port->open(QIODevice::ReadWrite) == true){
        connect(m_port, SIGNAL(dsrChanged(bool)), this, SLOT(onDsrChanged(bool)));
		if(true == connect(m_port, SIGNAL(readyRead()), this, SLOT(onReadyRead()))){		//打开监听
			//if(!(((QextSerialPort*)m_port)->lineStatus() & LS_DSR)){
			//	testResult.Add(EProtocolSerial, EInforError, QObject::tr("串口设备未打开"));
			//	re = QBool(false);
			//}else{
				re = writeIoVector();														//写数据并检查反馈结果
			//}
		}else{
			testResult.Add(EProtocolSerial, EInforError, QObject::tr("无法绑定槽函数"));
			re = QBool(false);
		}
		m_port->close();
    }else{
		testResult.Add(EProtocolSerial, EInforError, QObject::tr("无法打开COM口,可能已被占用或者无此端口:%1").arg(port_name));
    }
	//
    return re;
}

QBool CPortRS232::writeIoVector(){
	qint32 out_size = m_io.size();
	for(qint32 i = 0; i < out_size; ++i){
		testResult.Add(GetEProtocol(), EInforOut, m_io[i]->GetOutStr());
		io_index = i;
		m_port->write(m_io[i]->GetOutData());							//开始发送			
		waitForReadyRead(timeout);										//m_port->waitForReadyRead(timeout);
		if(QBool(true) != testResult.CheckResult(m_io[i])){				//检查结果
			return QBool(false);
		}
		testResult.SetNullIO();											//重置最近的IO信息
	}
	return QBool(true);
}


void CPortRS232::onReadyRead(){
	QByteArray bytes;
	qint64 bOldSize = 0;
	qint64 bMoreSize = m_port->bytesAvailable();
	bytes.resize(bOldSize + bMoreSize);
	while(m_port->read(bytes.data() + bOldSize, bMoreSize) > 0){
		QTime t;
		t.start();
		while(t.elapsed() < 50)
			;
		//
		bOldSize = bytes.size();
		bMoreSize = m_port->bytesAvailable();
		bytes.resize(bOldSize + bMoreSize);
	}
	if(m_io[io_index]->GetType() == EIoS){
		testResult.Add(GetEProtocol(), EInforIn, QString(bytes));
	}else{
		testResult.Add(GetEProtocol(), EInforIn, QString(bytes.toHex().toUpper()));
	}
}

void CPortRS232::onDsrChanged(bool status){
	if (status){
		testResult.Add(EProtocolSerial, EInforEvent, QObject::tr("设备切换到了打开状态"));
	}else{
		testResult.Add(EProtocolSerial, EInforEvent, QObject::tr("设备切换到了关闭状态"));
	}
}

ping测试代码

QBool CPortLan::RunTestPing(){
	QBool bCheckIp = checkSrcIp(src_ip);
	if(QBool(false) == bCheckIp){
		testResult.Add(GetEProtocol(), EInforError, QObject::tr("本机IP地址设置不一致:%1.ping将以不指定源ip地址的方式执行").arg(src_ip.toString()));
	}
	if(QBool(true) == bCheckIp){
		strPingCmd = QString("ping.exe %1 -S %2").arg(des_ip.toString()).arg(src_ip.toString());
        //ping的源代码可以支持指定网卡,但可能无法与qt结合
	}else{
		strPingCmd = QString("ping.exe %1").arg(des_ip.toString());
	}
	testResult.Add(GetEProtocol(), EInforOut, strPingCmd);
	QProcess *ping = new QProcess(this);
	connect(ping, SIGNAL(readyReadStandardOutput()), SLOT(readPingResult()));
	ping->start(strPingCmd);	//默认4次ping
	waitForReadyRead(timeout);	//默认每次ping等待3000ms,累计默认12000ms
	ping->close();
	DelObj(ping);
	return bPingResult;
}

QBool CPortLan::checkSrcIp(const QHostAddress &srcIpPar){
	QList<QHostAddress> list = QNetworkInterface::allAddresses();  
	foreach(QHostAddress address, list){  
		if(address.protocol() == QAbstractSocket::IPv4Protocol
			&& address == srcIpPar)  
			return QBool(true); 
	}
	return QBool(false);
}


void CPortLan::readPingResult(){
	bPingResult = QBool(false);
	QProcess *ping = qobject_cast<QProcess *>(sender());
	if(!ping)
		return;
	QByteArray resByte = ping->readAllStandardOutput();
	QString res = QString::fromLocal8Bit(resByte);//中文乱码问题
	if(!res.contains('%')){//默认发4条ping记录,找最后一条统计信息
		testResult.Add(GetEProtocol(), EInforEvent, tr("%1").arg(res));
		return;
	}
	const int percent = res.mid(res.indexOf('('), res.indexOf(')')).section('%', 0, 0).remove('(').toInt();
	testResult.Add(GetEProtocol(), EInforIn, tr("%1").arg(res));
	if(percent == 100) {//统计结果为100%失败
		testResult.Add(GetEProtocol(), EInforError, QString(strPingCmd + tr(" 失败")));
	}else{
		testResult.Add(GetEProtocol(), EInforEvent, QString(strPingCmd + tr(" 成功(失败概率为百分之%1)").arg(percent)));
		bPingResult = QBool(true);
	}
}

tcp、udp测试


QBool CPortLan::RunTestTcp(){
	m_tcp = new QTcpSocket;
	m_tcp->connectToHost(des_ip, des_port);
	if(!m_tcp->waitForConnected(timeout)){
		testResult.Add(GetEProtocol(), EInforError, QObject::tr("建立TCP连接失败:%1").arg(m_tcp->errorString()));
		return QBool(false);
	}	
	if(m_tcp->open(QIODevice::ReadWrite) == true){
		if(true == connect(m_tcp, SIGNAL(readyRead()), this, SLOT(onReadyReadTcp()))){		//打开监听
			return writeIoVectorTcp();														//写数据并检查反馈结果
		}else{
			testResult.Add(EProtocolSerial, EInforError, QObject::tr("无法绑定槽函数"));
		}
	}else{
		testResult.Add(EProtocolSerial, EInforError, QObject::tr("无法打开网口"));
	}
	return QBool(false);
}
QBool CPortLan::RunTestUdp(){
	if(QBool(false) == checkSrcIp(src_ip)){
		testResult.Add(GetEProtocol(), EInforError, QObject::tr("本机IP地址设置不一致:%1").arg(src_ip.toString()));
		return QBool(false);
	}
	m_udp = new QUdpSocket;
	if(false == m_udp->bind(src_ip, src_port, QUdpSocket::ShareAddress)){			//绑定本机某网卡的特定IP,没效果,应该仅对server端有效
		testResult.Add(GetEProtocol(), EInforError, QObject::tr("端口绑定失败 %1:%2").arg(src_ip.toString()).arg(src_port));
		return QBool(false);
	}
	if(true == connect(m_udp, SIGNAL(readyRead()), this, SLOT(onReadyReadUdp()))){		//打开监听
		return writeIoVectorUdp();														//写数据并检查反馈结果
	}else{
		testResult.Add(EProtocolSerial, EInforError, QObject::tr("无法绑定槽函数"));
	}
	return QBool(false);
}

QBool CPortLan::writeIoVectorTcp(){
	qint32 out_size = m_io.size();
	for(qint32 i = 0; i < out_size; ++i){
		io_index = i;
		testResult.Add(GetEProtocol(), EInforOut, m_io[i]->GetOutStr());
		m_tcp->write(m_io[i]->GetOutData());							//开始发送
		waitForReadyRead(timeout);										//m_port->waitForReadyRead(timeout);
		if(QBool(true) != testResult.CheckResult(m_io[i])){				//检查结果
			return QBool(false);
		}
		testResult.SetNullIO();											//重置最近的IO信息
	}
	return QBool(true);
}

void CPortLan::onReadyReadTcp(){
	QByteArray bytes(m_tcp->bytesAvailable(), 0);
	m_tcp->read(bytes.data(), bytes.size());
	EIoType iot = m_io[io_index]->GetType();
	if(iot == EIoS){
		testResult.Add(GetEProtocol(), EInforIn, QString(bytes));
	}else if(iot == EIoX){
		testResult.Add(GetEProtocol(), EInforIn, QString(bytes.toHex().toUpper()));
	}else{
	}
}

QBool CPortLan::writeIoVectorUdp(){
	qint32 out_size = m_io.size();
	QUdpSocket *pUdpSender = new QUdpSocket;//udp收发两条线
	for(qint32 i = 0; i < out_size; ++i){
		io_index = i;
		testResult.Add(GetEProtocol(), EInforOut, m_io[i]->GetOutStr());
		QByteArray datagram = m_io[i]->GetOutData();
		pUdpSender->writeDatagram(datagram.data(),datagram.size(), des_ip, des_port);  
		waitForReadyRead(timeout);										//m_port->waitForReadyRead(timeout);
		if(QBool(true) != testResult.CheckResult(m_io[i])){				//检查结果
			return QBool(false);
		}
		testResult.SetNullIO();											//重置最近的IO信息
	}
	DelObj(pUdpSender);
	return QBool(true);
}

void CPortLan::onReadyReadUdp(){
	QByteArray bytes;
	while(m_udp->hasPendingDatagrams()){  //拥有等待的数据报   
        QByteArray datagram;
        datagram.resize(m_udp->pendingDatagramSize());  
        m_udp->readDatagram(datagram.data(),datagram.size());  
		bytes += datagram;
    } 
	//
	EIoType iot = m_io[io_index]->GetType();
	if(iot == EIoS){
		testResult.Add(GetEProtocol(), EInforIn, QString(bytes));
	}else if(iot == EIoX){
		testResult.Add(GetEProtocol(), EInforIn, QString(bytes.toHex().toUpper()));
	}else{
	}
}

猜你喜欢

转载自blog.csdn.net/weixin_43172531/article/details/103763229