安装Pthreads 基本上需要重新编译PHP,加上 --enable-maintainer-zts 参数,但是这方面的文档很少;bug会很多,还有很多意想不到的问题,生产环境上只能呵呵了,所以这个东西玩玩就算了,真正多线程还是用Python、C等等
一、安装
这里使用的是 php-7.0.2
# Build PHP 7.0.2 as a thread-safe (ZTS) binary; pthreads only loads on a ZTS build,
# which is what --enable-maintainer-zts turns on.
# --with-mysql is intentionally not passed: ext/mysql was removed in PHP 7.0, so
# configure no longer recognizes that option. MySQL access goes through mysqli / PDO.
./configure --prefix=/usr/local/php7 \
    --with-config-file-path=/etc \
    --with-config-file-scan-dir=/etc/php.d \
    --enable-debug \
    --enable-maintainer-zts \
    --enable-pcntl \
    --enable-fpm \
    --enable-opcache \
    --enable-embed=shared \
    --enable-json=shared \
    --enable-phpdbg \
    --with-curl=shared \
    --with-mysqli=/usr/local/mysql/bin/mysql_config \
    --with-pdo-mysql
make && make install
安装pthreads
# Use the pecl that was installed with the ZTS build above (--prefix=/usr/local/php7),
# so the extension is compiled against that PHP rather than whichever pecl is first in PATH.
/usr/local/php7/bin/pecl install pthreads
二、Thread
<"Hello World {$this->getThreadId()}\n"; } }; $thread->start() && $thread->join(); #2 class workerThread extends Thread { public function __construct($i){ $this->i=$i; } public function run(){ while(true){ echo $this->i."\n"; sleep(1); } } } for($i=0;$i<50;$i++){ $workers[$i]=new workerThread($i); $workers[$i]->start(); } "color: #0000ff">三、 Worker 与 Stackable
Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.
<"My Worker Thread"); $sql1 = new SQLQuery('select * from test order by id desc limit 1,5'); $worker->stack($sql1); $sql2 = new SQLQuery('select * from test order by id desc limit 5,5'); $worker->stack($sql2); $worker->start(); $worker->shutdown(); "color: #0000ff">四、 互斥锁
什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。一个简单的计数器程序,说明有无互斥锁情况下的不同
<"/tmp/counter.txt", "w"); fwrite($handle, $counter ); fclose($handle); class CounterThread extends Thread { public function __construct($mutex = null){ $this->mutex = $mutex; $this->handle = fopen("/tmp/counter.txt", "w+"); } public function __destruct(){ fclose($this->handle); } public function run() { if($this->mutex) $locked=Mutex::lock($this->mutex); $counter = intval(fgets($this->handle)); $counter++; rewind($this->handle); fputs($this->handle, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); if($this->mutex) Mutex::unlock($this->mutex); } } //没有互斥锁 for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread(); $threads[$i]->start(); } //加入互斥锁 $mutex = Mutex::create(true); for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread($mutex); $threads[$i]->start(); } Mutex::unlock($mutex); for ($i=0;$i<50;$i++){ $threads[$i]->join(); } Mutex::destroy($mutex); "htmlcode"><"Thread #%lu says: %s\n", $this->getThreadId(),$counter); } } for ($i=0;$i<100;$i++){ $threads[] = new CounterThread($shmid); } for ($i=0;$i<100;$i++){ $threads[$i]->start(); } for ($i=0;$i<100;$i++){ $threads[$i]->join(); } shm_remove( $shmid ); shm_detach( $shmid ); "color: #0000ff">五、 线程同步
有些场景我们不希望 $thread->start() 之后就立刻开始运行程序,而是希望线程等待我们的命令。线程可以先调用 wait() 进入等待,只有收到 $thread->notify() 发出的信号后才继续运行。
<"Thread #%lu says: %s\n", $this->getThreadId(),$counter); } } for ($i=0;$i<100;$i++){ $threads[] = new CounterThread($shmid); } for ($i=0;$i<100;$i++){ $threads[$i]->start(); } for ($i=0;$i<100;$i++){ $threads[$i]->synchronized(function($thread){ $thread->notify(); }, $threads[$i]); } for ($i=0;$i<100;$i++){ $threads[$i]->join(); } shm_remove( $shmid ); shm_detach( $shmid ); "color: #0000ff">六、线程池
一个Pool类
<"%s, %s\r\n",$this->row['id'], $this->row['bankno']); file_put_contents("bankno_error.log", $error, FILE_APPEND); } if( strlen($bankno) > 7 ){ $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql; } printf("%s\n",$this->sql); } } class Pool { public $pool = array(); public function __construct($count) { $this->count = $count; } public function push($row){ if(count($this->pool) < $this->count){ $this->pool[] = new Update($row); return true; }else{ return false; } } public function start(){ foreach ( $this->pool as $id => $worker){ $this->pool[$id]->start(); } } public function join(){ foreach ( $this->pool as $id => $worker){ $this->pool[$id]->join(); } } public function clean(){ foreach ( $this->pool as $id => $worker){ if(! $worker->isRunning()){ unset($this->pool[$id]); } } } } try { $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select id,bankno from members order by id desc"; $row = $dbh->query($sql); $pool = new Pool(5); while($member = $row->fetch(PDO::FETCH_ASSOC)) { while(true){ if($pool->push($member)){ //压入任务到池中 break; }else{ //如果池已经满,就开始启动线程 $pool->start(); $pool->join(); $pool->clean(); } } } $pool->start(); $pool->join(); $dbh = null; } catch (Exception $e) { echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n"; } "htmlcode"><"%s, %s\r\n",$this->row['id'], $this->row['bankno']); file_put_contents("bankno_error.log", $error, FILE_APPEND); } if( strlen($bankno) > 7 ){ $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql; } printf("%s\n",$this->sql); } } try { $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . 
";dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select id,bankno from members order by id desc limit 50"; $row = $dbh->query($sql); $pool = array(); while($member = $row->fetch(PDO::FETCH_ASSOC)) { $id = $member['id']; while (true){ if(count($pool) < 5){ $pool[$id] = new Update($member); $pool[$id]->start(); break; }else{ foreach ( $pool as $name => $worker){ if(! $worker->isRunning()){ unset($pool[$name]); } } } } } $dbh = null; } catch (Exception $e) { echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n"; } "htmlcode"><"%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId()); $this->complete = true; } protected $complete; } class SafeLog extends Stackable { protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf( "{$message}\n", $args); } } } $pool = new Pool(8, \WebWorker::class, [new SafeLog()]); $pool->submit($w=new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->submit(new WebWork()); $pool->shutdown(); $pool->collect(function($work){ return $work->isComplete(); }); var_dump($pool);七、多线程文件安全读写
LOCK_SH 取得共享锁定(读取的程序)
LOCK_EX 取得独占锁定(写入的程序)
LOCK_UN 释放锁定(无论共享或独占)
LOCK_NB 如果不希望 flock() 在锁定时堵塞,则与 LOCK_SH 或 LOCK_EX 按位或使用(例如 LOCK_EX | LOCK_NB)
<"/tmp/lock.txt", "r+"); if (flock($fp, LOCK_EX)) { // 进行排它型锁定 ftruncate($fp, 0); // truncate file fwrite($fp, "Write something here\n"); fflush($fp); // flush output before releasing the lock flock($fp, LOCK_UN); // 释放锁定 } else { echo "Couldn't get the lock!"; } fclose($fp); $fp = fopen('/tmp/lock.txt', 'r+'); if(!flock($fp, LOCK_EX | LOCK_NB)) { echo 'Unable to obtain lock'; exit(-1); } fclose($fp); "color: #0000ff">八、多线程与数据连接
pthreads 与 pdo 同时使用时,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。
Worker 与 PDO
<"select id,name from members order by id desc limit "; $row = $dbh->query($sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } /* * The run method should just prepare the environment for the work that is coming ... */ public function run(){ self::$dbh = new PDO('mysql:host=...;dbname=example','www',''); } public function getConnection(){ return self::$dbh; } } $worker = new ExampleWorker("My Worker Thread"); $work=new Work(); $worker->stack($work); $worker->start(); $worker->shutdown(); "htmlcode"># cat pool.php <"mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true, PDO::ATTR_PERSISTENT => true ) ); $sql = "select OPEN_TIME, `COMMENT` from MT_TRADES where LOGIN='".$this->number['name']."' and CMD='' and `COMMENT` = '".$this->number['order'].":DEPOSIT'"; #echo $sql; $row = $dbh->query($sql); $mt_trades = $row->fetch(PDO::FETCH_ASSOC); if($mt_trades){ $row = null; $sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';"; $dbh->query($sql); #printf("%s\n",$sql); } $dbh = null; printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']); } } class Logging extends Stackable { protected static $dbh; public function __construct() { $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'example_real'; // 数据库名 self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); } protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf("{$message}\n", $args); } } protected function 
getConnection(){ return self::$dbh; } } $pool = new Pool(, \ExampleWorker::class, [new Logging()]); $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'db_example'; $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select `order`,name from accounts where deposit_time is null order by id desc"; $row = $dbh->query($sql); while($account = $row->fetch(PDO::FETCH_ASSOC)) { $pool->submit(new Work($account)); } $pool->shutdown(); "htmlcode"><"mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true, PDO::ATTR_PERSISTENT => true ) ); } protected function getInstance(){ return self::$dbh; } } /* the collectable class implements machinery for Pool::collect */ class Work extends Stackable { public function __construct($data) { $this->data = $data; #print_r($data); } public function run() { #$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() ); try { $dbh = $this->worker->getInstance(); #print_r($dbh); $id = $this->data['id']; $mobile = safenet_decrypt($this->data['mobile']); #printf("%d, %s \n", $id, $mobile); if(strlen($mobile) > ){ $mobile = substr($mobile, -); } if($mobile == 'null'){ # $sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'"; # printf("%s\n",$sql); # $dbh->query($sql); $mobile = ''; $sql = "UPDATE members_digest SET mobile = :mobile where id = :id"; }else{ $sql = "UPDATE members_digest SET mobile = md(:mobile) where id = :id"; } $sth = $dbh->prepare($sql); $sth->bindValue(':mobile', $mobile); $sth->bindValue(':id', $id); $sth->execute(); #echo $sth->debugDumpParams(); } catch(PDOException $e) { $error = sprintf("%s,%s\n", $mobile, $id ); file_put_contents("mobile_error.log", $error, FILE_APPEND); } #$dbh = 
null; printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id); #printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number); } } $pool = new Pool(, \ExampleWorker::class, []); #foreach (range(, ) as $number) { # $pool->submit(new Work($number)); #} $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'example'; $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); #print_r($dbh); #$sql = "select id, mobile from members where id < :id"; #$sth = $dbh->prepare($sql); #$sth->bindValue(':id',); #$sth->execute(); #$result = $sth->fetchAll(); #print_r($result); # #$sql = "UPDATE members_digest SET mobile = :mobile where id = :id"; #$sth = $dbh->prepare($sql); #$sth->bindValue(':mobile', 'aa'); #$sth->bindValue(':id',''); #echo $sth->execute(); #echo $sth->queryString; #echo $sth->debugDumpParams(); $sql = "select id, mobile from members order by id asc"; // limit "; $row = $dbh->query($sql); while($members = $row->fetch(PDO::FETCH_ASSOC)) { #$order = $account['order']; #printf("%s\n",$order); //print_r($members); $pool->submit(new Work($members)); #unset($account['order']); } $pool->shutdown(); "color: #ff0000">多线程中操作数据库总结
总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目
数据库持久连接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多连接超时。
<?php $dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array( PDO::ATTR_PERSISTENT => true )); ?>关于php pthreads多线程的安装与使用的相关知识,就先给大家介绍到这里,后续还会持续更新。
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com桃源资源网 Design By www.nqtax.com暂无“php pthreads多线程的安装与使用”评论...P70系列延期,华为新旗舰将在下月发布
3月20日消息,近期博主@数码闲聊站 透露,原定三月份发布的华为新旗舰P70系列延期发布,预计4月份上市。
而博主@定焦数码 爆料,华为的P70系列在定位上已经超过了Mate60,成为了重要的旗舰系列之一。它肩负着重返影像领域顶尖的使命。那么这次P70会带来哪些令人惊艳的创新呢?
根据目前爆料的消息来看,华为P70系列将推出三个版本,其中P70和P70 Pro采用了三角形的摄像头模组设计,而P70 Art则采用了与上一代P60 Art相似的不规则形状设计。这样的外观是否好看见仁见智,但辨识度绝对拉满。