设置变量:
from collections import namedtuple ''' 利用namedtuple 设置变量 ''' default=[None,None] ConfigItem=namedtuple("ConfigItem",["cmd_name","cmd"]) #获取电脑上链接的设备id deviceId=ConfigItem(cmd_name="device_Id",cmd="adb devices | sed -n '2p' | awk '{print $1}'") #获取app的内存 meminfo=ConfigItem(cmd_name="meminfo",cmd="adb shell dumpsys meminfo packageName | sed -n '2p' |awk -F \" \" '{print $2}'") #获取app的CPU cpuinfo=ConfigItem(cmd_name="cpuinfo",cmd="adb shell dumpsys cpuinfo | grep packageName |awk -F \" \" '{print $1}'| awk '{SUM+=$1}'END'{print SUM}'") #获取app(设备)的FPS数据,这个计算的是加载时长,获得FPS需要计算: 1000/(fpsinfo/1000/100) fpsinfo=ConfigItem(cmd_name="fpsinfo",cmd="adb shell dumpsys SurfaceFlinger --latency SurfaceView[packageName]#0 | sed -n '1p' |awk -F \" \" '{print $1}'") #获取页面加载时长,计算页面渲染时间 #用户界面发送请求的时间 + 网络传输时间 + 服务端处理时间 (包括数据层的处理时间)+ 网络传输时间 + 用户端展示返回结果的时间
使用Thread的Timer启动定时任务,抓取性能数据:
import threading import os import sys import subprocess import time import datetime from threading import Event ''' 启动定时器,每隔一定的时间刷新adb数据 gflag:控制子线程结束时间,不然会子线程不会结束 result:返回结果列表 ''' class cmdTimer(): instance=None def __new__(cls, *args, **kwargs): if cls.instance is None: cls.instance = super().__new__(cls) return cls.instance ''' interval:定时任务触发间隔 duration:执行时长 ''' def __init__(self,interval,duration): self.interval=interval self.duration=duration self.gflag=True self.result=[] def run(self,cmd): P=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,encoding="GBK") tmp=P.stdout.readline().strip() #通过adb获取的值计算内存的值mem if "meminfo" in cmd: tmp="{:.2f}".format(int(tmp)/1024/1024) #通过adb获取的值计算FPS的值 if "SurfaceFlinger" in cmd: tmp="{:.2f}".format(1000/(int(tmp)/1000/1000)) if self.gflag: timer=threading.Timer(self.interval,self.run,[cmd]) timer.start() self.result.append(tmp) def runCmd(self,cmd): timer=threading.Timer(self.interval,self.run,[cmd]) timer.start() time.sleep(self.duration) self.gflag=False timer.cancel() #print("The Timer running result is {}".format(str(self.result))) return self.result #测试 if __name__=='__main__': command="adb devices | sed -n '2p' | awk '{print $1}'" print(cmdTimer(1,10).runCmd(command))
利用Process,同时抓取CPU/Mem等性能数据。
import random import setting import multiprocessing import CmdTimer ''' 在测试案例运行中获取安卓包下的性能数据并收集 关键点一: 多进程是并行执行, 相当于分别独立得处理各个请求。 关键点二: 多线程,虽然不能并行运行, 但是可以通过避开阻塞切换线程 来实现并发的效果,并且不浪费cpu ''' #不同Process 使用相同的队列 q = multiprocessing.Queue() jobs = [] def getResult(cmd,q): t=CmdTimer.cmdTimer(1,20).runCmd(cmd) #print(t) q.put(t) def data_collect(): package="XXXXX" #deviceId_cmd=setting.deviceId.cmd cpu_cmd=setting.cpuinfo.cmd.replace("packageName",package) fps_cmd=setting.fpsinfo.cmd.replace("packageName",package) mem_cmd=setting.meminfo.cmd.replace("packageName",package) ''' 这里args必须是tuple target书写的时候不能添加() ''' p1 = multiprocessing.Process(target=getResult,args=(cpu_cmd,q)) p2 = multiprocessing.Process(target=getResult,args=(fps_cmd,q)) p3 = multiprocessing.Process(target=getResult,args=(mem_cmd,q)) # jobs.append(p1) # jobs.append(p2) # jobs.append(p3) # # for p in jobs: # p.start() # # for p in jobs: # p.join() # # results = [q.get() for j in jobs] # return results p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() cpuResult=q.get() fpsResult=q.get() memResult=q.get() return cpuResult,fpsResult,memResult if __name__=='__main__': print(data_collect())