需求:
将消息先保存到redis,然后将redis中的数据定时保存到mysql中
分析:
redis保存为list,然后使用系统的定时任务调用脚本程序,通过脚本调用php文件进行处理。
windows脚本(live.bat):
"D:\Program Files\phpStudy\php\php-5.6.27-nts\php.exe" -f "D:\My\Program\live.gtarcade.com\redis_to_mysql.php"
php代码(redis_to_mysql.php):
<?php
/**
* 连接redis
*/
class Conn_Redis
{
var $redis;
private static $_instance;
private function __construct($host, $password, $port){
$this->redis = new Redis();
/*$this->redis->connect('127.0.0.1', 6379);
$auth = $this->redis->auth('123456');*/
$this->redis->connect($host, $port);
$auth = $this->redis->auth($password);
return $this->redis;
}
static public function getInstance($host, $password, $port){
if (FALSE == (self::$_instance instanceof self)) {
self::$_instance = new self($host, $password, $port);
}
return self::$_instance;
}
private function __clone(){}
public function lPop($key)
{
return $this->redis->lPop("call_log");
}
public function rPush($key, $value)
{
return $this->redis->rPush($key, $value);
}
public function close()
{
return $this->redis->close();
}
}
/**
* 连接mysql
*/
class Conn_Mysql
{
var $conn;
var $redis_study;
private static $_instance = null;
private function __construct($host, $username, $password, $db_name){
// $conn = mysql_connect($host, $username, $password);
$this->conn = @new mysqli($host, $username, $password, $db_name);
if ($this->conn->connect_errno) {
die("could not connect to the database:\n" . $this->conn->connect_error);//诊断连接错误
}
$this->conn->query("set names 'utf8';");//编码转化
/*$select_db = $conn->select_db($db_name);
if (!$select_db) {
die("could not connect to the db:\n" . $conn->error);
}*/
return $this->conn;
}
static public function getInstance($host, $username, $password, $db_name){
// if (is_null(self::$_instance) || isset(self::$_instance)) {
if (FALSE == (self::$_instance instanceof self)) {
self::$_instance = new self($host, $username, $password, $db_name);
}
return self::$_instance;
}
private function __clone(){}
/**
* 查询
*/
public function query($sql, $link = '') {
$this->result = $this->conn->query($sql) or $this->conn->err($sql);
return $this->result;
}
public function close()
{
$this->conn->close();
}
}
class RedisToMysql{
var $redis;
var $prefix;
var $redis_study;
var $db_prefix;
public function __construct($redis_host, $redis_password, $mysql_host, $mysql_username, $mysql_password, $mysql_db_name, $db_prefix, $redis_port=6379, $mysql_port=3306)
{
$this->redis_host = $redis_host;
$this->redis_password = $redis_password;
$this->redis_port = $redis_port;
$this->mysql_host = $mysql_host;
$this->mysql_username = $mysql_username;
$this->mysql_password = $mysql_password;
$this->mysql_port = $mysql_port;
$this->mysql_db_name = $mysql_db_name;
$this->db_prefix = $db_prefix;
$this->redis = Conn_Redis::getInstance($redis_host, $redis_password, $redis_port);
$this->mysql = Conn_Mysql::getInstance($mysql_host, $mysql_username, $mysql_password, $mysql_db_name);
}
public function insert_redis()
{
/*$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$auth = $redis->auth('123456');
echo "<BR>Connection to server sucessfully";*/
$redis = $this->redis;
// 加上时间戳存入队列
$now_time_fir = date("Y-m-d H:i:s");
$now_time_sec = date("Y-m-d H:i:s", time()+300);
$title_fir = '今天是个好天气';
$title_sec = '今天是个好日子';
// $redis->rPush("call_log", [$title_fir . "%" . $now_time_fir, $title_sec . "%" . $now_time_sec]);
$redis->rPush("call_log", [$title_fir . "%" . $now_time_fir, $title_sec . "%" . $now_time_sec]);
// $redis->close();
}
public function redis_to_mysql($table)
{
$redis = $this->redis;
// 获取现有消息队列的长度
$count = 0;
// 获取消息队列的内容,拼接sql
$insert_sql = "insert into " . $this->db_prefix . $table . " (`title`, `create_time`) values ";
// 回滚数组
$roll_back_arr = array();
while ($log_info = $redis->lPop("call_log")){
$roll_back_arr = $log_info;
if ($log_info == 'nil' || !isset($log_info)) {
$insert_sql .= ";";
break;
}
// 切割出时间和info
$log_info_arr = explode("%",$log_info);
$insert_sql .= " ('".$log_info_arr[0]."','".$log_info_arr[1]."'),";
$count++;
}
// 判定存在数据,批量入库
if ($count != 0) {
$insert_sql = rtrim($insert_sql,",").";";
// $res = mysql_query($insert_sql);
// $res = $GLOBALS['db']->query($insert_sql);
$res = $this->mysql->query($insert_sql);
// 输出入库log和入库结果;
/*echo '<BR><BR>'.date("Y-m-d H:i:s")."insert ".$count." log info result:";
echo json_encode($res);*/
// 数据库插入失败回滚
if(!$res){
foreach($roll_back_arr as $k){
$redis->rPush("call_log", $k);
}
}
}
}
public function close()
{
$this->redis->close();
$this->mysql->close();
}
}
$db_prefix = 'fanwe_';
$redis_host = '127.0.0.1';
$redis_password = '123456';
$redis_port = 6379;
$mysql_host = 'localhost';
$mysql_username = 'root';
$mysql_password = 'root';
$mysql_port = '3306';
$mysql_db_name = 'live';
$table = 'study';
$redis_to_mysqls = new RedisToMysql($redis_host, $redis_password, $mysql_host, $mysql_username, $mysql_password, $mysql_db_name, $db_prefix);
$redis_to_mysqls->redis_to_mysql($table);
$redis_to_mysqls->close();
Author:leedaning
本文地址:http://blog.csdn.net/leedaning/article/details/78288386