tars-php 改造 invoke 支持多个tcp请求

zz/2024/5/23 0:56:11

现在“微服务”越来越少流行, 公司内部很多接口原来是CGI, 或者直接读DB,都改成用微服务接口,
不用CGI接口,其实挻好的,不好暴露接口到公网,网络延时也少,不需要做鉴权,直接调用微服务接口就可以。
但不能读DB,也调用接口,就稍微有些不便。
打个比方
有一张表,里面有3万条的数据,我要取ID和名字,两个字段,
“ select id,name from table ”就可以 了,
但接口每次返回1000条, 我就要连续查询30次,假如每个查询200ms, 全部查询出来都要6s,
这个结果肯定不能接受, 但tars-php 官方每次查询只支持一个, 所以我们就只能动手去改造了。

支持多个TCP客户端连接

在Tars\client\Communicator 添加 tcpMutipleConnect方法

 /*** 返回多个TCP连接* @param int $connection_quantity 连接数量* @param string $sIp               目标IP* @param int $iPort                机架端口* @param int $timeout_ms_max       连接超时时间* @return array                返回连接* @throws Exception*/public function tcpMutipleConnect(int $connection_quantity, $sIp = '', $iPort = 0,$timeout_ms_max=100){$connection_array=[];//循环执行连接for($i=0;$i<$connection_quantity;$i++){$socket = stream_socket_client("tcp://{$sIp}:{$iPort}",$errno,$errstr,1,STREAM_CLIENT_ASYNC_CONNECT);stream_set_blocking($socket,0);$connection_array[] = array('socket' => $socket);}//记录开始时间和剩下未成功的连接$time_start = microtime(true);$connection_array_no_ok = $connection_array;//逐一判断是否可用,如果出现不可用则返回falsewhile(1){//如果是第一次或者仍然需要处理,则休息一段时间usleep(5 * 1000);//逐一判断是否可用,如果出现不可用则返回falseforeach($connection_array_no_ok as $key1 => $value1){//尝试发送,如果成功则从数组中去掉if(@stream_socket_sendto($value1['socket'],'') >= 0){unset($connection_array_no_ok[$key1]);continue;}}//如果已经处理完则退出if(empty($connection_array_no_ok)){break;}//如果已经超时也退出if((microtime(true) - $time_start) * 1000 >= $timeout_ms_max){break;}}//如果仍然有不OK的连接则返回falseif(empty($connection_array_no_ok) === false){throw new Exception("TCP 多连接不成功");}//逐一设置阻塞foreach($connection_array as $key1 => $value1){stream_set_blocking($value1['socket'],1);}//返回数据return $connection_array;}

异步的多个TCP请求处理

在Tars\client\Communicator 添加 invokeTcpMutiple 方法


/*** 异步的多个TCP请求处理* @param $requestPacket_list  请求列表* @param int $timeout      超时时间 (ms)* @param string $sIp           请求IP* @param int $iPort            请求端口* @param int $connection_quantity  最多并发量 (同时请求的连接数)* @return array                    返回结果* @throws Exception* @throws \Exception*/public function invokeTcpMutiple($requestPacket_list, $timeout=30000, $sIp = '', $iPort = 0,$connection_quantity=200){// 转换成网络需要的timeout$timeout = $timeout / 1000;$count = count($this->_routeInfo) - 1;if ($count === -1) {throw new \Exception('Rout fail', Code::ROUTE_FAIL);}$index = rand(0, $count);$ip = empty($sIp) ? $this->_routeInfo[$index]['sIp'] : $sIp;$port = empty($iPort) ? $this->_routeInfo[$index]['iPort'] : $iPort;$requestPacket_list_count = count($requestPacket_list) ;//检查KEY数量,如果超多则返回falseif($requestPacket_list_count > $connection_quantity){throw new \Exception("已超出最大并发请求:".$connection_quantity);}$connect_info_chunk_array = $this->tcpMutipleConnect($requestPacket_list_count,$ip,$port);//逐个拼装发送数据foreach($requestPacket_list as $key1 => $requestPacket){$socket = $connect_info_chunk_array[$key1]['socket'];$requestBuf = $requestPacket->encode();//执行发送fwrite($socket,$requestBuf, strlen($requestBuf));}//执行接收$read = array();$write  = array();$except = array();$socket_remaining = array();$return_result=[];foreach($connect_info_chunk_array as $key1 => $value1){$socket_remaining[$key1] = array('socket' => $value1['socket'],'result' => '','length' => 0,'is_finish' => false,);$read[] = $value1['socket'];}$time_start = microtime(true);while(($num_changed_streams = stream_select($read,$write,$except,0,200 * 1000)) !== false){//如果已经超时,则退出if(microtime(true) - $time_start >= $timeout){break;}//如果没有变化的则继续等if($num_changed_streams == 0){$read = array();foreach($socket_remaining as $key1 => $value1){if($value1['is_finish'] === false){$read[] = $value1['socket'];}}$write = array();$except = array();continue;}//接收数据foreach($read as $key1 => $value1){//找到这个socket对应的key$socket_belong_key = null;foreach($socket_remaining as $key2 => $value2){if($value1 == $value2['socket']){$socket_belong_key = $key2;break;}}if($socket_belong_key === null){throw new \Exception("出现了未知的Socket");}//如果这个socket已经设置为结束,则跳过if($socket_remaining[$socket_belong_key]['is_finish']){continue;}//获取数据$result = fread($value1, 8192);if(empty($result)){throw new \Exception("数据接收不成功");}//如果是第一次获取则解析头部if($socket_remaining[$socket_belong_key]['length'] == 0){//在这里从第一个包中获取总包长$list = unpack('Nlen', substr($result, 0, 4));$total_length = $list['len'];//记录长度和数据$socket_remaining[$socket_belong_key]['length'] = $total_length;$socket_remaining[$socket_belong_key]['result'] = $result;}//如果不是第一次收包,则累加并判断长度else{//记录数据$socket_remaining[$socket_belong_key]['result'] .= $result;}//如果数据超长则返回错误if(strlen($socket_remaining[$socket_belong_key]['result']) > $socket_remaining[$socket_belong_key]['length']){throw new \Exception("服务器返回数据超过预计长度的数据");}//如果长度已经达到则标记为完成if(strlen($socket_remaining[$socket_belong_key]['result']) == $socket_remaining[$socket_belong_key]['length']){$socket_remaining[$socket_belong_key]['is_finish'] = true;}}//本次所有数据都已经处理完成,准备下一次的句柄列表$read = array();foreach($socket_remaining as $key1 => $value1){if($value1['is_finish'] === false){$read[] = $value1['socket'];}}$write = array();$except = array();//如果已经没有要处理的句柄,则退出if(empty($read)){break;}}//过滤掉未完成的数据foreach($socket_remaining as $key1 => $value1){if($value1['is_finish'] === false){unset($socket_remaining[$key1]);continue;}}//如果全部都没有完成则直接返回结果if(empty($socket_remaining)){return $return_result;}//开始逐一处理foreach($socket_remaining as $key1 => $value1) {//记录$responseBuf = $value1['result'];$responsePacket = new ResponsePacket();$responsePacket->_responseBuf = $responseBuf;$responsePacket->iVersion = $this->_iVersion;$sBuffer = $responsePacket->decode();//请求统计上报
//            $endTime = $this->militime();if(!is_null($this->_locator)){
//                $this->_statF->addStat($requestPacket->_servantName,$requestPacket->_funcName, $sIp,$iPort, ($endTime - $startTime), 0, 0);}$return_result[$key1]=$sBuffer;}return $return_result;}

改造GET方法

(微服务的接口,tars-php根据tars配置文件自动生成)

改造, 让get 支持多个请求,打比方说
select id,name from table 0,1000;
select id,name from table 1000,1000;
select id,name from table 2000,1000;
select id,name from table 3000,1000;

		public function GetMutilple( $TegReqList,GetTegRsp &$rsp) {try {$requestPacketList =[];foreach ($TegReqList  as $key=> $req ){$requestPacket = new RequestPacket();$requestPacket->_iVersion = $this->_iVersion;$requestPacket->_funcName = 'Get';$requestPacket->_servantName = $this->_servantName;$encodeBufs = [];$__buffer = TUPAPIWrapper::putStruct("req",1,$req,$this->_iVersion);$encodeBufs['req'] = $__buffer;$requestPacket->_encodeBufs = $encodeBufs;$requestPacketList[$key] = $requestPacket;}$sBuffer_list = $this->_communicator->invokeTcpMutiple($requestPacketList,$this->_iTimeout);$result_data=[];foreach ($TegReqList as $key=>$value ) {if(empty( $sBuffer_list[$key])){$result_data[$key]=['data'=>[],'info'=>[],];}$sBuffer = $sBuffer_list[$key];$rsp= new GetTegRsp;TUPAPIWrapper::getStruct("rsp",2,$rsp,$sBuffer,$this->_iVersion);$request =  TUPAPIWrapper::getInt32("",0,$sBuffer,$this->_iVersion);$result_data[$key]=['data'=>$rsp,'info'=>$request,];}return $result_data;}catch (\Exception $e) {throw $e;}}

校验结果, 一次请求200个,耗时422ms, 速度大增!

real	0m0.422s
user	0m0.040s
sys	0m0.052s

http://www.ngui.cc/zz/2700298.html

相关文章

golang []map[string][string] , 先按大小,再按键名排序

type IpSortItem struct {Value int json:"value"Name string json:"name" } // 先按数量排序&#xff0c;再按名称排序 func sortIPView(rspData []map[string]string) []*IpSortItem {var totalList []intdataList : make(map[int][]string)for _, …

Java eclipse 设置代码折叠

1 windows->perferences->General->Editors->Structured Text Editors 选中Enable folding 效果 2 windows->perferences->Java->Editors->Folding 效果

javac 命令找不到

环境配置出错 仔细的查看每一个配置 复制黏贴的还是出现错误 在自己的电脑中生成的时候错误 仔细查看对比就可以了 使用命令也可以 set java_homeC:\Program Files\Java\jdk1.6.0_26 安装JDK的根目录 set classpath%JAVA_HOME%\lib\tools.jar;%JAVA_HOME%\lib\dt.jar; …

thinkPHP安装与配置

使用Git安装thinkPHP: 1 git clone https://gitee.com/liu21st/thinkphp5.git tp5 &#xff1b; clone thinkPHP的基础框架 &#xff0c;tp5为文件名字。 2 切换到tp5目录下在命令行中使用 3 git clone https://gitee.com/liu21st/framework.git thinkphp 4 git checko…

多线程同步I/O和单线程异步I/O

多线程同步I/O和单线程异步I/O 2014-08-24 同步I/O与异步I/O 线程在执行中如果遇到磁盘读写或网络通信&#xff08;统称IO操作&#xff09;&#xff0c;通常要耗费较长的时间&#xff0c;这时OS会剥夺此线程的CPU控制权&#xff0c;使其暂停执行&#xff0c;同时将资源让给其…

nodejs 调试技巧

1&#xff0c; 命令行调试 run 执行脚本,在第一行暂停 restart 重新执行脚本 cont, c 继续执行,直到遇到下一个断点 next, n 单步执行 step, s 单步执行并进入函数 out, o 从函数中步出 setBreakpoint(), sb() 在当前行设置断点 setBreakpoint(‘f()’), sb(...) 在函数f的第一…

Zookeeper实战之单机集群模式

Zookeeper的单机模式的安装及应用&#xff0c;但是Zookeeper是为了解决分布式应用场景的&#xff0c;所以通常都会运行在集群模式下。由于手头机器不足&#xff0c;所以打算在一台机器上部署三个Zookeeper服务来组成一个Zookeeper集群。这里解压Zookeeper的安装包到/opt目录下&…

ubuntu系统查找命令

ubuntu系统查找命令 一.以文件名查找&#xff1a; 1. find 命令 find / -name "filename" 目的&#xff1a;在根目录“/”开始搜被称为filename的文件&#xff0c;“filename”文件名可以包含通配符&#xff08;*&#xff0c;&#xff1f;&#xff09;&#xff0…

kudu 文件描述符 更改

kudu 文件描述符超过阀值 kudu 文件描述符缺省打开文件数为32768 在/etc/security/limits.d/下找到了cloudera的limit配置文件&#xff0c;里面限制为32768 /etc/security/limits.d/cloudera-scm.conf 修改&#xff1a; 32768会覆盖系统配置&#xff0c;cm启动的进程最大打…

同一局域网内的数据库的连接

1、连接他人的数据库 1&#xff09;保证两台机器存在同一局域网内&#xff0c;即192.168.后的两个字节是相同的。 例如&#xff1a;教师机的IP4地址是192.168.88.100&#xff0c;你应该将自己的IP4地址设置成192.168.88.53. 2&#xff09;保证教师机的子网掩码与本机相同。 …