Java教程

如何解决注册并发问题并提高QPS

本文主要是介绍如何解决注册并发问题并提高QPS,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言:前面在本地的windows通过apache的ab工具测试了600并发下“查询指定手机是否存在再提交数据”的注册功能会出现重复提交的情况,并且在注册完成时还需要对邀请人进行奖励,记录邀请记录,对该新用户自动发布动态信息,发短信或发邮件等其他业务功能。所以这里当并发时,注册功能就变得低效且容易出现问题。

方法:先对重复提交的问题通过redis解决,再把注册储存用户基本信息以后的操作放到队列中进行异步执行,可以很好的优化注册功能,提高QPS。


一、环境要求

  • PHP版本 >= 5.6.0

  • PHP框架:Thinkphp5.1.*

  • 消息队列:Think-queue2.0

  • PHP扩展:Redis


二、下载框架和消息队列中间件

1. 下载tp5.1。

composer create-project topthink/think=5.1.* tp5  --prefer-dist

2. 安装think-queue。

composer require topthink/think-queue

 3. php安装redis扩展和打开redis服务端和客户端。


三、解决注册重复提交

1. 配置文件中cache设置为redis驱动,并新建控制器因为cache相关命名空间。

use think\Exception;use think\facade\Cache;use think\facade\Env;use think\Queue;

2. 使用无序集合存手机号,通过判断当前手机号是否是在指定键里为成员(如果注册存入数据库失败,通过sRem删除该成员),然后再通过查询数据库判断是否存在。

private $cache;private  $handler;// 实例化redispublic function __construct() {    $this->cache = Cache::init();    $this->handler = $this->cache->handler();
}// 判断手机号是否在集合中$is_existe = $this->handler->sIsMember("register:mobile",$mobile);if(!$is_existe) {   $this->handler->sAdd("register:mobile",$mobile);
}else {   //Log::write('---压力测试'.date("Y-m-d h:i:s").'---手机号已存在');
   var_dump('手机号已存在');    // 用户已存在
   die;
}// 查询手机号码是否已注册$user = db('user')->field('mobile')->where('mobile', $mobile)->find();if ($user) {    //Log::write('---压力测试'.date("Y-m-d h:i:s").'---手机号注册了');
    var_dump('手机号已注册');    // 用户已存在
    die;
}

四、消息队列分解注册功能

1. 配置消息队列,后面以redis驱动为例。

<?phpreturn [    'connector'  => 'Redis',            // Redis 驱动
    'expire'     => 60,                // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',        // 默认的队列名称
    'host'       => '127.0.0.1',        // redis 主机ip
    'port'       => 6379,            // redis 端口
    'password'   => '',                // redis 密码
    'select'     => 0,                // 使用哪一个 db,默认为 db0
    'timeout'    => 0,                // redis连接的超时时间
    'persistent' => false,            // 是否是长连接

    //    'connector' => 'Database',   // 数据库驱动
    //    'expire'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    //    'default'   => 'default',    // 默认的队列名称
    //    'table'     => 'jobs',       // 存储消息的表名,不带前缀
    //    'dsn'       => [],

    //    'connector'   => 'Topthink',    // ThinkPHP内部的队列通知服务平台 ,本文不作介绍
    //    'token'       => '',
    //    'project_id'  => '',
    //    'protocol'    => 'https',
    //    'host'        => 'qns.topthink.com',
    //    'port'        => 443,
    //    'api_version' => 1,
    //    'max_retries' => 3,
    //    'default'     => 'default',//        'connector'   => 'Sync',        // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行];

2. 完成添加新用户后将指定数据加入消息队列。

<?phpnamespace app\index\controller;use think\Db;use think\Validate;use think\Exception;use think\facade\Cache;use think\facade\Env;use think\Queue;use think\Log;class Index{	
	private $cache;    private  $handler;	public function __construct() {		$this->cache = Cache::init();		$this->handler = $this->cache->handler();
	}    public function index()    {    	$data = input('post.');        unset($data['balance']);        unset($data['credit']);        
        // $blacklist = [
        //     "18124198164","13401363108","17688552009","15089352898","13602940094","13346643336","13181351655","18301123028","13598020751","13014568187",
        //     "13428733909","17337991130","13275342497"
        // ];

        $rule = [            'mobile' => 'require|number|length:11',            'password' => 'require|length:6,32',
        ];        $msg = [            'mobile.require' => '手机号必须',            'mobile.length' => '手机号为11位数字',            'mobile.number' => '手机号为11位数字',            'password.require' => '密码必须',            'password.length' => '密码为6-12位之间',
        ];        //验证数据是否合法
        $mobile = isset($data['mobile']) ? $data['mobile'] : '';        $validate = new Validate($rule, $msg);        $result = $validate->check($data);        if (!$result) {            var_dump($validate->getError());            die;
        }        
        // if(in_array($mobile,$blacklist)) {
        //     var_dump('该手机号已注册了');    // 黑名单
        //     die;
        // }
        
        // 判断手机号是否在集合中
        $is_existe = $this->handler->sIsMember("register:mobile",$mobile);        if(!$is_existe) {            $this->handler->sAdd("register:mobile",$mobile);
        }else {            //Log::write('---压力测试'.date("Y-m-d h:i:s").'---手机号已存在');
            var_dump('手机号已存在');    // 用户已存在
            die;
        }        // 查询手机号码是否已注册
        $user = db('user')->field('mobile')->where('mobile', $mobile)->find();        if ($user) {            //Log::write('---压力测试'.date("Y-m-d h:i:s").'---手机号注册了');
            var_dump('手机号已注册');    // 用户已存在
            die;
        }       

        // 用户不存在注册
        // $data['id']          = getNewUserid();
        $data['no'] = date("Ymdhis").rand(100, 999);        $data['avatar'] = 'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_h.png';        $data['password'] = md5($data['password']);        $randomNickname = date("Ymdhis").rand(100, 999);         
        $data['nickname'] = 'rm_' . $randomNickname;        $data['create_time'] = time();        $data['type'] = 1;        
        /***是否存在邀请人的跑步钱进号***/
        if(isset($data['pbqj_no']) && !empty($data['pbqj_no'])) {            $inviter = db('user')->field('id')->where(["no"=>$data['pbqj_no']])->find();            if($inviter) {                $data['inviter_id'] = $inviter['id'];
            }
        }        /***是否存在邀请人的跑步钱进号***/
        unset($data['pbqj_no']);        $userid = db('user')->insertGetId($data);        if ($userid) {        /******************加入消息队列异步处理后续操作*******************/

            // 1.当前任务将由哪个类来负责处理。 
            // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
            $jobHandlerClassName  = 'app\index\job\JobUser'; 
            // 2.当前任务归属的队列名称,如果为新队列,会自动创建
            $jobQueueName         = "userJobQueue"; 
            // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
            // ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对)
            //$jobData              = ['ts' => time(), 'bizId' => uniqid() , 'a' => 1];
            $jobData              = ['userid'=>$userid,'time'=>time(),'mobile'=>$mobile,'inviterid'=>(isset($data['inviter_id']) ? $data['inviter_id'] : 0)];            // 4.将该任务推送到消息队列,等待对应的消费者去执行
            $isPushed = Queue::push($jobHandlerClassName , $jobData , $jobQueueName);    
            // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
            if($isPushed !== false) { 
            	var_dump('加入队列成功');            	die;                //Log::write('-----------加入消息队列成功-----------');
                //echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
            }else{            	var_dump('加入消息队列');            	die;                //Log::write('-----------加入消息队列失败-----------');
                //echo 'Oops, something went wrong.';
            }        /******************加入消息队列异步处理后续操作*******************/

            $res['id'] = $userid;            $res['no'] = $data['no'];            // // token处理类
            // $accessToken = new AccessToken();
            // $accessToken = $accessToken->getToken($userid);

            // if (empty($accessToken)) {
            //     //Log::write('---压力测试'.date("Y-m-d h:i:s").'---秘钥生成失败');
            //     var_dump('秘钥生成失败');
            // } else {
            //     $res['user_token'] = $accessToken;
            // }

            // if (method_exists(\chat\User::class, 'getToken')) {
            //     $chat_token = \chat\User::getToken($res['id'], $data['nickname'], $data['avatar']);
            //     if (!$chat_token) {
            //         //Log::write('---压力测试'.date("Y-m-d h:i:s").'---聊天秘钥生成失败');
            //         var_dump('聊天秘钥生成失败');
            //     } else {
            //         $res['chat_token'] = $chat_token;
            //     }
            // } else {
            //     $res['chat_token'] = '';
            // }
            
            //Log::write('---压力测试'.date("Y-m-d h:i:s").'---注册成功');
            var_dump($res);            die;
        } else {            //Log::write('---压力测试'.date("Y-m-d h:i:s").'---数据库错误');
            $this->handler->sRem("register:mobile",$mobile);            var_dump('数据库错误');            die;
        }
    }    public function hello($name = 'ThinkPHP5')    {        return 'hello,' . $name;
    }
}

 2. 创建消费者(job),对执行队列中的任务。

(1). 在同一模块下新建job文件夹和一个执行类(JobUser), 需要对应生产者中jobHandlerClassName。

(2). 前面执行完队列加入成功后,可以本地使用redis客户端通过lrange queues:userJobQueue 0 -1 查看队列成员 (queues:userJobQueue中,userJobQueue是自己在加入队列前自己起的队列名称,与queues: 拼接就是redis的list的键名,所以可以直接查看 )。

https://img3.sycdn.imooc.com/645b34130001c65a16790220.jpg

(3).队列中的data就是自己传递的数据,后面需要在消费者中通过该数据进行注册功能后的业务操作: 送奖励,存储邀请记录,发动态,发短信,发邮件等等。

<?phpnamespace app\index\job;use think\queue\Job;use think\Db;use think\Exception;use think\facade\Cache;use think\facade\Env;class JobUser {    private  $cache;    private  $handler;    public  function  __construct()    {        $this->cache = Cache::init();        $this->handler = $this->cache->handler();
    }    /**
     * fire方法是消息队列默认调用的方法
     * @param Job            $job      当前的任务对象
     * @param array|mixed    $data     发布任务时自定义的数据
     */
    public function fire(Job $job,$data) {        $job->delete();        //print("hahah\n");
        // print("<info>The user already exists "."</info>\n");
        //     exit();
        if(empty($data) || empty($data['userid']) || empty($data['mobile'])) {            $job->delete();            print("canshu buzu\n");            return;
        }        // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);        if(!$isJobStillNeedToBeDone) {            print("hahah\n");            $job->delete();            return;
        }        $isJobDone = $this->doHelloJob($data);        if ($isJobDone) {            //如果任务执行成功, 记得删除任务
            $job->delete();            print("<info>Hello Job has been done and deleted"."</info>\n");
        }else{            if ($job->attempts() > 3) {                //通过这个方法可以检查这个任务已经重试了几次了
                print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");                //$job->delete();
                // 也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data) {        // 判断手机缓存集合中是否存在
        // $is_existe = $this->handler->sIsMember("register:mobile",$data['mobile']);
        // if($is_existe) {
        //     return false;  
        // } 

        // // 查询当前用户是否在数据库中存在
        // $userinfo = Db::name('user')->field('id')->where('id',$data['userid'])->find();
        // if($userinfo) {
        //     return false;  
        // } 

        return true;
    }    /**
     * 根据消息中的数据进行实际的业务处理
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
    */
    private function doHelloJob($data) {        try{            if(isset($data['inviterid']) && !empty($data['inviterid'])) {                // 添加邀请记录
                $res_record = Db::name('user_inviter')
                    ->insert([                        'inviterid'   => $data['inviterid'],                        'userid'      => $data['userid'],                        'code'        => $data['inviterid'] . 'T' . $data['userid'],                        'create_time' => $data['time'],
                ]);                // 给邀请人赠送300步币
                Db::name('user_credit')
                    ->insert([                        'userid'      => $data['inviterid'],                        'type'        => 1,                        'credit'      => 300,                        'source'      => $res_record,                        'create_time' => $data['time']
                ]);                // 更新邀请人步币(用户表)
                Db::name('user')->where('id', $data['inviterid'])->setInc('credit', 300);              
            }
            
            {   // 注册成功发表动态
                $dynamic_data['userid'] = $data['userid'];                $dynamic_data['dynamic'] = base64_encode('号外!号外!我加入跑步钱进了,大家一起走路领红包吧!');                $dynamic_data['images'][] = 'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_d.png';                $dynamic_data['images'] = serialize($dynamic_data['images']);                $dynamic_data['create_time'] = $data['time'];                $result = Db::name('dynamic')->insert($dynamic_data);
            }

        }catch(\Exception $e) {            Log::write('---执行消息队列出错---'.$e->getMessage());            return false;
        }        return true;        // 根据消息中的数据进行实际的业务处理...
        //var_dump($data);//        print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");//        print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");//        print("<info>Hello Job is Done!"."</info> \n");

        //return true;
    }    /**
     * 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员
     * @param $jobData  string|array|...      //发布任务时传递的 jobData 数据
    */
    public function failed($jobData) {        //send_mail_to_somebody() ;
        print("Warning: Job failed after max retries. job data is :".var_export($jobData,true)."\n");
    }

}

(3). 设置任务执行失败后的处理,比如记录日志或发邮件给开发者。

a. 在tags.php中配置失败后执行了类。

<?php// 应用行为扩展定义文件return [    // 应用初始化
    'app_init'     => [],    // 应用开始
    'app_begin'    => [],    // 模块初始化
    'module_init'  => [],    // 操作开始执行
    'action_begin' => [],    // 视图内容过滤
    'view_filter'  => [],    // 日志写入
    'log_write'    => [],    // 应用结束
    'app_end'      => [],    'queue_failed' => [        // 数组形式,[ 'ClassName' , 'methodName']
        ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']        // 字符串(静态方法),'StaicClassName::methodName'
        // 'MyQueueFailedLogger::logAllFailedQueues'

        // 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法
        // 'application\\behavior\\MyQueueFailedLogger'

        // 闭包形式
        /*
        function( &$jobObject , $extra){
            // var_dump($jobObject);
            return true;
        }
        */
    ],
    
];

b. 在application目录下创建任务错误执行后的处理脚本,根据业务需求自定。

<?phpnamespace app\behavior;use think\Db;class MyQueueFailedLogger{    const should_run_hook_callback = true;    /**
     * @param $jobObject   \think\queue\Job   //任务对象,保存了该任务的执行情况和业务数据
     * @return bool     true                  //是否需要删除任务并触发其failed() 方法
    */
    public function logAllFailedQueues(&$jobObject) {        $failedJobLog = [            'jobHandlerClassName'   => $jobObject->getName(), // 'application\index\job\Hello'
            'queueName' => $jobObject->getQueue(),			   // 'helloJobQueue'
            'jobData'   => $jobObject->getRawBody()['data'],  // '{'a': 1 }'
            'attempts'  => $jobObject->attempts(),            // 3
        ];        var_export(json_encode($failedJobLog,true));        $data = [            "content" => json_encode($failedJobLog,true),            "create_time" => time(),
        ];        Db::name('ztest')->insertGetId($data);        // $jobObject->release();     //重发任务
        //$jobObject->delete();         //删除任务
        //$jobObject->failed();	  //通知消费者类任务执行失败

        return self::should_run_hook_callback;
    }
}


五、通过命令运行消息队列,以下以windows举栗

1. cmd进入当前项目, 然后输入 "php think queue:listen --queue userJobQueue"   (userJobQueue是自己的队列名)。

2. 也可以在项目的根目录创建bat文件,文件写入"php think queue:listen --queue userJobQueue",保存只需双击就可以执行。

 

六、测试结果

使用了消息队列后,同样610的并发,使用时间就缩短了

https://img1.sycdn.imooc.com/645b3414000193ad07260508.jpg

https://img4.sycdn.imooc.com/645b34140001a27709750734.jpg


这篇关于如何解决注册并发问题并提高QPS的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!