<?php
// pheanstalk_worker.php
//
// Long-running queue consumer: reserves jobs from the "testtube" tube, logs
// each payload together with memory usage, and deletes the job.
// Hopefully you're using Composer autoloading.

// The main loop below never terminates, so lift the execution time limit.
set_time_limit(0);
ini_set('memory_limit', '1024M');

// Define BASE_PATH
define('BASE_PATH', __DIR__);

// Autoload
require BASE_PATH.'/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('127.0.0.1');

_log('starting to run');

// Number of jobs handled so far; only used for logging.
$cnt = 0;

// worker (performs jobs)
while (true) {
    if (!$pheanstalk->getConnection()->isServiceListening()) {
        _log('connection failed');
        // Back off and retry instead of calling reserve() on a dead connection.
        sleep(1);
        continue;
    }

    $job = $pheanstalk->watch('testtube')->ignore('default')->reserve();
    if (!$job) {
        // NOTE(review): Pheanstalk 3.x reserve() returns false on
        // DEADLINE_SOON / TIMED_OUT; confirm against the installed version.
        continue;
    }

    // Handled jobs are deliberately not kept in memory: this process runs
    // indefinitely, so any per-job accumulation would grow without bound.
    _log('job:'.$job->getData());

    $pheanstalk->delete($job);
    $cnt++;

    $memory = memory_get_usage();
    _log('memory:' . $memory.',cnt:' . $cnt);

    usleep(10);
}

/**
 * Append one line to the worker log file.
 *
 * The path is relative, so it is resolved against the current working
 * directory of the process.
 *
 * @param string $txt Message to write; a trailing newline is added.
 */
function _log($txt)
{
    file_put_contents('log/worker.txt', $txt . "\n", FILE_APPEND);
}
<?php
// Demo producer: prints the queue server's stats, then pushes 10000
// JSON-encoded jobs onto the "testtube" tube.
// Hopefully you're using Composer autoloading.

// Define BASE_PATH
define('BASE_PATH', __DIR__);

// Autoload
require BASE_PATH.'/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('127.0.0.1');

// ----------------------------------------
// check server availability

// Checked before any work is done, and the result is acted upon: there is
// no point in asking for stats or queueing jobs when the server is down.
if (!$pheanstalk->getConnection()->isServiceListening()) {
    echo "queue server is not reachable\n";
    exit(1);
}

echo "<pre>";
print_r($pheanstalk->stats());
echo "</pre>";

// ----------------------------------------
// producer (queues jobs)

// Every job goes to the same tube, so select it once up front.
$pheanstalk->useTube('testtube');

for ($i = 0; $i < 10000; $i++) {
    $job = new stdClass();
    $job->envelope_id = rand();
    $job->date = date('Y-m-d H:i:s');
    $job_data = json_encode($job);

    $pheanstalk->put($job_data);
    echo "pushed: " . $job_data . "\n";
}