前面写了一篇专门介绍使用python去做webservice接口自动化测试的文章,然后有小伙伴看完之后反馈说能不能出一篇python做websocket接口自动化的文章,所以今天这篇文章就专门来和大家聊聊这个问题,如何使用python来实现websocket类型的接口自动化。其实不管是http的接口,还是webservice、websocket的接口,做自动化的思路都是一样的,并没有什么很大的差别,无非就是用到的请求库不一样而已。
在python中,发送http请求,比较有名的是requests这个库;webservice的接口可以使用suds-py3来请求。那么websocket怎么请求呢?其实在python中也有很多第三方库可以用来发送websocket请求。比如websocket-client, 那么接下来我们先来看看websocket-client这个库的使用。
由于 websocket-client 是第三方库,使用之前需要先安装,安装直接使用 pip 命令安装即可。
pip install websocket-client728 x 901029 x 127
安装好 websocket-client 这个模块之后,就可以正式开始使用了,关于使用 websocket-client 模块创建 socket 客户端有两种方式。一个是 websocket.create_connection 方法,还有一个是 websocket.WebSocketApp
import json from websocket import create_connection # 1、建立连接 ws = create_connection("ws://127.0.0.1:5000/info") # 2、获取连接状态 print("获取连接状态:", ws.getstatus()) # 3、发送请求参数 ws.send('发送数据 hello musen') # 4、获取返回结果 result = ws.recv() print("接收结果:", result) # 5、关闭连接 ws.close()
关于 create_connection 的使用就是这么几部操作,使用起来也很方便,一般在测试 WebSocket 接口消息发送功能是否正常,发送完消息就断开连接,用这个方法还是比较方便。不过很多情况下 WebSocket 接口只是用来做持续性监听工作,并不涉及到传递请求参数。那么使用 WebSocket 中 WebSocketApp 这个类来测试监听功能会更合适。
如果要对某个接口进行长期的进行监听,监听其功能是否异常,那么就可以使用 webSocketApp 这个类来创建一个长连接进行监听。接下来我们一起来看看 WebSocketApp 这个类源码的初始化方法。
从上述代码中可以看到 WebSocketApp 进行初始化的时候定义了很多的参数。接下来给大家介绍几个常用的参数
其他的参数 大家可以去看官方文档,这边就不给大家一一说明了。
创建了一个连接对象之后,需要不断监听返回的数据,则调用 run_forever 方法,要保持长连接即可,接下来我们来看一个使用案例。
需求:监测 ws://127.0.0.1:5000/info 这个接口接口是否正常。
在下面的代码中我们定义了三个方法
on_message 方法:专门用来监听和处理返回的数据(这边只是打印了一下,没有做特别的处理)
on_error 方法:专门用来监听是否出现错误(这边只是打印了一下错误,没有做特别的处理)
on_close 方法:用来监听连接是否关闭,当连接关闭时,则会调用该方法
import websocket # 定义一个用来接收监听数据的方法 def on_message(ws, message): print("监听到服务器返回的消息,内容如下:") print(message) # 定义一个用来处理错误的方法 def on_error(ws, error): print("-----连接出现异常,异常信息如下-----") print(error) # 定义一个用来处理关闭连接的方法 def on_close(ws): print("-------连接已关闭------") if __name__ == "__main__": ws = websocket.WebSocketApp("ws://127.0.0.1:5000/info", on_message=on_message, on_error=on_error, on_close=on_close, ) ws.run_forever()
关于 websocket-client 的基本使用就给大家介绍到这里,大家有兴趣可以自己去扩展研究。
接下来给大家分享一下如何通过 unittest 去编写 WebSocket 的接口用例。下面是我自己写的一个 WebSocket 的接口
import json import unittest from websocket import create_connection import websocket class TestVote(unittest.TestCase): url = 'ws://127.0.0.1:5000/vote' @classmethod def setUpClass(cls): # 前置方法 websocket.enableTrace(True) # 打开跟踪,查看日志 cls.ws = create_connection(cls.url) # 创建连接 cls.ws.settimeout(10) # 设置超时时间 def test_connect_status(self): """测试连接状态""" # 断言连接状态 self.assertEqual(101, self.ws.getstatus()) def test_send_info(self): # 第一步:准备参数 params = {'id': 1} expected = {'status': "success"} # 第二步:发送请求 self.ws.send(json.dumps(params)) # 获取结果 result = self.ws.recv() res = json.loads(result) # 第三步:断言: print("接收结果:", res) self.assertEqual(expected['status'], res['status']) @classmethod def tearDownClass(cls): cls.ws.close() if __name__ == '__main__': unittest.main()
说明:上面针对于 ws://127.0.0.1:5000/vote 这个接口写了两条用例,一条测试连接状态是否正常,一条测试数据发送的功能是否异常,关于 websockt 接口自动化测试就给大家介绍到这里了。