使用python编写网络工具
介绍基本的网络编程
Socket又称"套接字",应用程序通常通过"套接字"向网络发出请求或者应答网络请求,使主机间或者一台计算机上的进程间可以通讯。使用Python中的socket库就可以进行网络相关的编程。
函数 | 描述 |
---|---|
服务器端套接字 | |
s.bind() | 绑定地址(host,port)到套接字, 在 AF_INET下,以元组(host,port)的形式表示地址。 |
s.listen() | 开始 TCP 监听。backlog 指定在拒绝连接之前,操作系统可以挂起的最大连接数量。该值至少为 1,大部分应用程序设为 5 就可以了。 |
s.accept() | 被动接受TCP客户端连接,(阻塞式)等待连接的到来 |
客户端套接字 | |
s.connect() | 主动初始化TCP服务器连接,。一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。 |
s.connect_ex() | connect()函数的扩展版本,出错时返回出错码,而不是抛出异常 |
公共用途的套接字函数 | |
s.recv() | 接收 TCP 数据,数据以字符串形式返回,bufsize 指定要接收的最大数据量。flag 提供有关消息的其他信息,通常可以忽略。 |
s.send() | 发送 TCP 数据,将 string 中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于 string 的字节大小。 |
s.sendall() | 完整发送 TCP 数据。将 string 中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回 None,失败则抛出异常。 |
s.recvfrom() | 接收 UDP 数据,与 recv() 类似,但返回值是(data,address)。其中 data 是包含接收数据的字符串,address 是发送数据的套接字地址。 |
s.sendto() | 发送 UDP 数据,将数据发送到套接字,address 是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。 |
s.close() | 关闭套接字 |
import socket host = "www.baidu.com" port = 80 # 创建socket对象 client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 连接服务器 client.connect((host,port)) # 发送数据 HTTP请求 client.send(b"GET / HTTP/1.1\r\nHost: baidu.com\r\n\r\n") # 接收响应数据 response = client.recv(4096) # 打印数据 print(response.decode()) client.close()
使用socket函数创建一个socket对象。AF_INET表示使用标准的IPv4地址或主机名,SOCK_STREAM是流式套接字,表示使用TCP。上述程序向www.baidu.com对应的主机发送了一个HTTP GET请求,并且得到了响应
HTTP/1.1 200 OK Accept-Ranges: bytes Cache-Control: no-cache Connection: keep-alive Content-Length: 9508 Content-Type: text/html ...
import socket host = "127.0.0.1" port = 9091 # 创建socket对象 client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 发送数据 client.sendto(b"AAAAAAAAAA",(host,port)) # 接收数据 data,addr = client.recvfrom(4096) # 打印响应信息 print(data.decode()) client.close()
使用UDP时,socket中的函数中的第二个参数变成了SOCK_DGRAM即数据报套接字,并且发送数据时使用sendto函数,接收数据时使用recvfrom函数。
import socket import threading from urllib import request IP = '0.0.0.0' PORT = 9999 def handler(client_socket): with client_socket as sock: request = sock.recv(1024) print(f'[*] Received: {request.decode("utf-8")}') sock.send(b"ACK") def main(): server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind((IP,PORT)) server.listen(5) print(f"[*] Listening on {IP}:{PORT}") while True: client,address = server.accept() print(f"[*] Accepted connection from {address[0]:{address[1]}}") client_handler = threading.Thread(target=handler,args=(client,)) client_handler.start() if __name__ == "__main__": main()
指定服务器应该监听哪个IP地址和端口,接着让服务器开始监听,并且将最大连接数设置为5。下一步让服务器进入主循环,并在该循环中等待外来连接。当一个客户端成功建立连接时,将接收到的客户端socket对象保存到client变量中,将远程连接的详细信息保存到address变量中。然后,创建一个新的线程,让它指向handler函数,并传入client变量。创建好后,启动这个线程来处理接收到的连接,与此同时服务端的主循环也已经准备好处理下一个外来连接,而handler函数会调用recv接收数据,并给客户端发送一段简单的回复。
文档: ipaddress模块介绍
在下面案例中会用到的几个重要方法
import ipaddress addr = ipaddress.ip_address('192.0.2.1') # 返回一个object print(addr.version) # 打印IP版本 net4 = ipaddress.ip_network('192.0.2.0/24') # 返回接口 print(net4.num_addresses) # 答打印地址数 for host in net4.hosts(): # 遍历 print(host)
输出
4 256 192.0.2.1 192.0.2.2 192.0.2.3 192.0.2.4 192.0.2.5 192.0.2.6 192.0.2.7 192.0.2.8 192.0.2.9 192.0.2.10 192.0.2.11
如上代码迭代显示该网段上的"可用"的独立地址
大部分操作系统都会执行一个操作: 向一台主机发送一格UDP数据包时,如果主机上的UDP端口没有开启,一般会返回一个ICMP包来提示目标端口不可访问,可以以此来判断主机是否存活。
使用Python的socket库,代码如下
import socket import os host = '10.81.226.234' def main(): # 判断系统是否为windows if os.name == 'nt': # windwos允许嗅探任何协议 socket_protocol = socket.IPPROTO_IP else: # Linux强制指定一个协议进行嗅探 socket_protocol = socket.IPPROTO_ICMP # 创建socket对象 sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol) sniffer.bind((host,0)) # 设置包含IP头 sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1) # 对windows机器 额外启用混杂模式 if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON) # 读取一个数据包 print(sniffer.recvfrom(65565)) # 关闭混杂模式 if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF) if __name__ == '__main__': main()
host变量为本机IP地址,然后创建一个socket对象,使用bind函数绑定socket对象,使用setsockopt函数修改socket设置,使其在抓包时包含IP头部。之后会判断平台是否为windows,如果是则启动网卡的混杂模式。
运行输出
$ sudo python3 sock_sniffer.py (b'E\xc0\x00X\x88j\x00\x00@\x01\x17\x04\nQ\xe2\xea\nQ\xe2\xea\x03\x01?+\x00\x00\x00\x00E\x00\x00<\n)@\x00@\x06\xc8\x0f\nQ\xe2\xeao\xb2\x0b\x96\xaf\x1e\x00P\x1f\xd8Y\xfd\x00\x00\x00\x00\xa0\x02\xfa\xf0h\xb2\x00\x00\x02\x04\x05\xb4\x04\x02\x08\n\x9e\x9d\xda}\x00\x00\x00\x00\x01\x03\x03\x07', ('10.81.226.234', 0))
毫无疑问,这些输出相当凌乱,可以将其进行解码
进行更深层次解析分析,提取诸如协议类型、源IP地址和目的IP地址等有用的信息
如下是IPv4报文头
因为是二进制数据,要对IP头各个数据段进行分割
使用ctypes库或者struct库
ctypes提供了各种兼容C语言的数据结构,可调用符合C语言标准的共享库中的函数
文档介绍: ctypes — A foreign function library for Python
示例代码
from ctypes import * import socket import struct from numpy import uint32 class IP(Structure): _fields_ = [ ("ihl", c_ubyte, 4), ("version", c_ubyte, 4), ("tos", c_ushort, 8), ("len", c_ushort, 16), ("id", c_ushort, 16), ("offset", c_ushort, 16), ("ttl", c_ubyte, 8), ("protocol_num", c_ubyte, 8), ("sum", c_ushort, 16), ("src", c_uint32, 32), ("dst", c-uint32, 32) ] def __new__(cls, socket_buffer=None): return cls.from_buffer_copy(socket_buffer) def __init__(self,socket_buffert=None): self.src_address = socket.inet_ntoa(struct.pack("<L",self.src)) self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
这个类创建了一个名为_fields_的结构,用于定义IP头各个部分。该结构使用了ctypes里定义的C语言数据类型,例如代表unsigned char 类型的c_ubyte,代表unsigned short的c_ushort等。上述代码中定义的字段和上图中的IP头部中的字段一一对应。各个字段的定义有3个字段组成: 字段名称、数据类型以及字段位数。设置字段位数使得能够以位为单位指定数据长度,这意味着能够自由指定想要的长度。
在上述代码中,IP类继承自ctypes库的Structure类,要求创建对象前必须定义_fields_结构。为了向fields结构里填充数据,Structure类利用了_new_函数。此函数的第一个参数是指向当前类的引用,new函数用该引用创建当前类的第一个对象。之后这个对象被传给_init_函数进行初始化。
该库提供了一些格式字符,用来定义二进制数据的结构
import ipaddress import struct class IP: def __init__(self,buff=None): header = struct.unpack('<BBHHHBBH4s4s',buff) self.ver = header[0] >> 4 self.ihl = header[0] & 0xF self.tos = header[1] self.len = header[2] self.id = header[3] self.offset = header[4] self.ttl = header[5] self.protocol = header[6] self.sum = header[7] self.src = header[8] self.dst = header[9] self.src_address = ipaddress.ip_address(self.src) self.dst_address = ipaddress.ip_address(self.dst) # 映射 self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}
<BBHHHBBH4s4s
中的<
表示数据字节序,这里表示小端序。
BBHHHBBH4s4s
表示IP头的各部分。struct库中提供了若干格式字符。B是1字节,H是2字节,ns是n个字节的数组。
因为不能按位指定想要的长度了,但这里先获取version,只要取第一个字节的高4位,所以使用>>4
进行偏移,将高4位向低位偏移4位,原来的高位用0补齐,原来的低4为被覆盖,这样就得到了version。ihl是第一个字节的低4位,将第一个字节与二进制数00001111
相与即可得到,对其他字段的获取只要顺次按下标获取即可。
如下是完整代码实现
import ipaddress import os from shutil import ExecError import socket import struct import sys class IP: def __init__(self, buff=None): header = struct.unpack('<BBHHHBBH4s4s', buff) self.ver = header[0] >> 4 self.ihl = header[0] & 0xF self.tos = header[1] self.len = header[2] self.id = header[3] self.offset = header[4] self.ttl = header[5] self.protocol_num = header[6] self.sum = header[7] self.src = header[8] self.dst = header[9] self.src_address = ipaddress.ip_address(self.src) self.dst_address = ipaddress.ip_address(self.dst) # 映射 self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"} try: self.protocol = self.protocol_map[self.protocol_num] except: print(f'No protocol {self.protocol_num}') self.protocol = str(self.protocol_num) def sniff(host): if os.name == 'nt': socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) sniffer.bind((host, 0)) sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) try: while True: raw_buffer = sniffer.recvfrom(65535)[0] ip_header = IP(raw_buffer[0:20]) print( f"Protocol: {ip_header.protocol} {ip_header.src_address} -> {ip_header.dst_address}") except KeyboardInterrupt: if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) sys.exit() if __name__ == "__main__": if len(sys.argv) == 2: host = sys.argv[1] else: host = "127.0.0.1" sniff(host)
首先实现IP类,它定义了一个Python结构,可以把数据包的前20字节映射到读/写的IP头对象,将IP数据包中的字段拆解出来,可供用户识别。将之前实现的抓包程序合并进来,就实现了完整的流程。
测试运行:
因为是Linux平台,所以只能看到ICMP协议数据,运行程序后
使用ping命令 ping baidu.com
如下是程序输出
$ sudo python3 ip_decoder.py 10.81.226.234 Protocol: ICMP 10.81.226.234 -> 10.81.226.234 Protocol: ICMP 220.181.38.148 -> 10.81.226.234 Protocol: ICMP 220.181.38.148 -> 10.81.226.234 Protocol: ICMP 220.181.38.148 -> 10.81.226.234 Protocol: ICMP 220.181.38.148 -> 10.81.226.234 Protocol: ICMP 220.181.38.148 -> 10.81.226.234 Protocol: ICMP 10.81.226.234 -> 10.81.226.234 Protocol: ICMP 220.181.38.148 -> 10.81.226.234
解码了IP层数据后还要解码ICMP响应。不同的ICMP消息之间千差万别,但一定存在3个字段:类型、代码和校验和。类型和代码表示了要接收的ICMP消息是什么类型的,也就指明了如何正确地解码其中的数据。
字段 | 长度 | 含义 |
---|---|---|
Type | 1字节 | 消息类型: - 0:回显应答报文 - 8:请求回显报文 |
Code | 1字节 | 消息代码,此处值为0。 |
Checksum | 2字节 | 检验和。 |
Identifier | 2字节 | 标识符,发送端标示此发送的报文 |
Sequence Number | 2字节 | 序列号,发送端发送的报文的顺序号。每发送一次顺序号就加1。 |
Data | 可变 | 选项数据,是一个可变长的字段,其中包含要返回给发送者的数据。回显应答通常返回与所收到的数据完全相同的数据。 |
如上所示,数据包开头的8个二进制位代表类型,其后的8个二进制位代表ICMP消息代码。
完整代码实现
import ipaddress import os import socket import struct import sys class IP: def __init__(self, buff=None): header = struct.unpack('<BBHHHBBH4s4s', buff) self.ver = header[0] >> 4 self.ihl = header[0] & 0xF self.tos = header[1] self.len = header[2] self.id = header[3] self.offset = header[4] self.ttl = header[5] self.protocol_num = header[6] self.sum = header[7] self.src = header[8] self.dst = header[9] self.src_address = ipaddress.ip_address(self.src) self.dst_address = ipaddress.ip_address(self.dst) # 映射 self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"} try: self.protocol = self.protocol_map[self.protocol_num] except: print(f'No protocol {self.protocol_num}') self.protocol = str(self.protocol_num) class ICMP: def __init__(self,buff): header = struct.unpack('<BBHHH',buff) self.type = header[0] self.code = header[1] self.sum = header[2] self.id = header[3] self.seq = header[4] def sniff(host): if os.name == 'nt': socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) sniffer.bind((host, 0)) sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) try: while True: raw_buffer = sniffer.recvfrom(65535)[0] ip_header = IP(raw_buffer[0:20]) if ip_header.protocol == "ICMP": print(f"Protocol: {ip_header.protocol} {ip_header.src_address} -> {ip_header.dst_address}") print(f"Version: {ip_header.ver}") print(f"Header Length: {ip_header.ihl} TTL: {ip_header.ttl}") offset = ip_header.ihl * 4 buf = raw_buffer[offset:offset+8] icmp_header = ICMP(buf) print(f"ICMP -> Type: {icmp_header.type} Code: {icmp_header.code}") except KeyboardInterrupt: if os.name == 'nt': sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) sys.exit() if __name__ == "__main__": if len(sys.argv) == 2: host = sys.argv[1] else: host = "127.0.0.1" sniff(host)
如上代码中增加了ICMP消息结构及其解析。在sniff中也增加了一系列流程:
在接受数据包的主循环中判断接收到的数据包是否为ICMP数据包,然后计算出ICMP数据在原始数据包中的偏移,最后将数据按照ICMP结构进行解析,输出其中的类型(type)和代码(code)字段。IP头的长度是基于IP头中的ihl字段计算的,该字段记录了IP头中有多少个32位(4字节)长的数据块,将这个字段乘以4就能计算出IP头的大小,以及数据包中下一网络层开始的位置。
运行测试
$ sudo python3 icmp_decoder.py 10.81.226.234 Protocol: ICMP 10.81.226.234 -> 10.81.226.234 Version: 4 Header Length: 5 TTL: 64 ICMP -> Type: 3 Code: 1 Protocol: ICMP 220.181.38.148 -> 10.81.226.234 Version: 4 Header Length: 5 TTL: 52 ICMP -> Type: 0 Code: 0 Protocol: ICMP 220.181.38.148 -> 10.81.226.234 Version: 4 Header Length: 5 TTL: 52 ICMP -> Type: 0 Code: 0 Protocol: ICMP 220.181.38.148 -> 10.81.226.234 Version: 4
完整代码如下
import ipaddress import os import socket import struct import sys import threading import time SUBNET = '10.81.226.0/24' MESSAGE = 'PYTHONRULES' # 签名 # IP消息结构 class IP: def __init__(self, buff=None): header = struct.unpack('<BBHHHBBH4s4s', buff) self.ver = header[0] >> 4 self.ihl = header[0] & 0xF self.tos = header[1] self.len = header[2] self.id = header[3] self.offset = header[4] self.ttl = header[5] self.protocol_num = header[6] self.sum = header[7] self.src = header[8] self.dst = header[9] self.src_address = ipaddress.ip_address(self.src) self.dst_address = ipaddress.ip_address(self.dst) # 映射 self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"} try: self.protocol = self.protocol_map[self.protocol_num] except: print(f'No protocol {self.protocol_num}') self.protocol = str(self.protocol_num) # ICMP消息结构 class ICMP: def __init__(self, buff): header = struct.unpack('<BBHHH', buff) self.type = header[0] self.code = header[1] self.sum = header[2] self.id = header[3] self.seq = header[4] # 发送UDP数据包 def udp_sender(): with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender: for ip in ipaddress.ip_network(SUBNET).hosts(): # 遍历子网IP地址 sender.sendto(bytes(MESSAGE, 'utf8'), (str(ip), 65212)) # 扫描 class Scanner: def __init__(self, host): self.host = host if os.name == 'nt': socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP self.socket = socket.socket( socket.AF_INET, socket.SOCK_RAW, socket_protocol) self.socket.bind((host, 0)) self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) if os.name == 'nt': self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) # 获取数据包 def sniff(self): hosts_up = set([f"{str(self.host)}"]) try: while True: raw_buffer = self.socket.recvfrom(65535)[0] ip_header = IP(raw_buffer[0:20]) if ip_header.protocol == "ICMP": offset = ip_header.ihl * 4 buff = raw_buffer[offset:offset+8] icmp_header = ICMP(buff) if icmp_header.code == 3 and icmp_header.type == 3: if ipaddress.ip_address(ip_header.src_address) in ipaddress.IPv4Network(SUBNET): if raw_buffer[len(raw_buffer) - len(MESSAGE):] == bytes(MESSAGE, 'utf8'): target = str(ip_header.src_address) if target != self.host and target not in hosts_up: hosts_up.add(str(ip_header.src_address)) print(f"Host UP: {target}") # 处理键盘中断 except KeyboardInterrupt: if os.name == 'nt': self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) print('\nUser interrupted') if hosts_up: print(f'\n\nSummary: Hosts up on {SUBNET}') for host in sorted(hosts_up): print(f'{host}') print('') sys.exit() if __name__ == "__main__": if len(sys.argv) == 2: host = sys.argv[1] else: host = '127.0.0.1' scan = Scanner(host) time.sleep(3) thread = threading.Thread(target=udp_sender) thread.start() scan.sniff()
程序的开头定义了一个字符串作为签名,udp_sender负责遍历子网下的IP地址,发送UDP数据包。接着定义了一个名叫Scanner的类,向其中传入扫描器所在主机的IP地址来初始化它。sniff函数负责嗅探网络上的数据,解析并提取报文格式,并将在线的主机记录下来。接收到ICMP消息时,要检查这个响应是不是来自正在扫描的网段,然后检查ICMP消息里有没有自定义的签名("PYTHONRULES!")。如果检查通过则打印发送这条消息的主机IP地址,如果使用Ctrl+C中断该程序,就会把目前扫描出来的结果打印。
运行测试
$ sudo python3 scanner.py 10.81.226.234 Host UP: 10.81.226.3 Host UP: 10.81.226.1 Host UP: 10.81.226.251 Host UP: 10.81.226.14 Host UP: 10.81.226.51 Host UP: 10.81.226.37 Host UP: 10.81.226.34 Host UP: 10.81.226.39 Host UP: 10.81.226.23 ...
使用Scapy进行流量嗅探
安装python3版本:$ sudo pip3 install scapy
编写一个能分解并输出数据包内容的基础嗅探器。Scapy提供了一个接口函数sniff:
sniff(filter="",iface="any",prn=function,count=N)
filter
参数允许指定一个Berkeley数据包过滤器,用于过滤嗅探到的数据包。将此参数留空则代表要嗅探所有数据包。
iface
参数用于指定嗅探器要嗅探的网卡,如果不设置的话,默认会嗅探所有网卡。
prn
参数用于指定一个回调函数,每当遇到符合过滤条件的数据包时,嗅探器会将该数据包传给这个回调函数,这是该函数接收的唯一参数。
count
参数可以用来指定要嗅探的数据包数量
from scapy.all import sniff # 回调函数 接收数据包 def packet_callback(packet): print(packet.show()) def main(): sniff(prn=packet_callback,count=1) if __name__ == "__main__": main()
如上代码所示,该程序使用sniff获取一个数据包并显示
使用管理员权限运行
$ sudo python3 sniff.py ###[ Ethernet ]### dst = 14:14:4b:81:96:23 src = 34:cf:f6:89:e3:83 type = IPv4 ###[ IP ]### version = 4 ihl = 5 tos = 0x0 len = 60 id = 44584 flags = DF frag = 0 ttl = 64 proto = tcp chksum = 0x934c src = 10.81.226.234 dst = 8.8.4.4 \options \ ###[ TCP ]### sport = 54080 dport = https seq = 2355326062 ack = 0 dataofs = 10 reserved = 0 flags = S window = 64240 chksum = 0xf975 urgptr = 0 options = [('MSS', 1460), ('SAckOK', b''), ('Timestamp', (889222417, 0)), ('NOP', None), ('WScale', 7)] None
使用show方法将数据包显示在终端。可以看到打印出的信息按照数据链路层、网络层、传输层的层次显示。
下面解释上述部分信息的含义:
Ethernet
是以太网,该层下的信息对应了以太网帧结构的一部分: 目的地址、源地址、数据字段。以太网帧中的数据字段存放了IP数据报,到IP层被解析
IP
层下对应的信息是IP数据包中的关键字段:
version
是版本号,4即IPv4,ihl
是报文头长度,以字节为单位
tos
是服务类型,该字段使不同类型的数据报能区分开来,len
是数据报长度
id flags frag
是标识、标志、片偏移,与IP分片有关
ttl
是寿命,即Time-To-Live
,这确保了数据报不会永远在网络中循环,当TTL=0,则该数据报会被丢弃
proto
是协议,该字段值指示了IP数据报的数据部分应交给哪个特定的运输层协议
checksum
是首部校验和,帮助路由器检测数据报中的比特错误
src
和dst
分别是源IP和目的IP
上述打印结果的最后一层是TCP
字段报文结构,sport
是源端口号,dport
是目的端口。
seq
和ack
是序号和确认号,这些字段被用于实现TCP的可靠数据传输服务(三次握手)
概念 | 描述 | 示例 |
---|---|---|
描述词 | 想匹配的数据 | host, net, port |
数据流方向 | 数据行进的方向 | src, dst, src or dst |
通信协议 | 发送数据所用协议 | ip, ip6, tcp, udp |
examples:
只捕获来源于网络中某一IP的主机流量: src host 192.168.10.1
只捕获80端口: port 80
只捕获ICMP流量: ICMP
示例代码
from scapy.all import * def capture(packet): print("*"*30) print(f"source:{packet[IP].src}:{packet.sport} target:{packet[IP].dst}:{packet.sport}") print(packet.show()) print("*"*30) if __name__ == "__main__": packets = sniff(filter="tcp and port 80",prn=capture,count=10) # 保存输出文件 wrpcap("res.pcap",packets)
该程序只获取TCP端口为80的数据包,使用wrpcap函数进行保存,pcap文件可以使用wireshark打开。
send(IP(dst="www.baidu.com",ttl=1)/ICMP())
使用send函数发送ICMP数据包,可以设置字段值,指定协议名称,从第三层发送数据包。
sendp(Ether()/IP(dst="127.0.0.1",ttl=1)/ICMP())
使用sendp函数,从第二层发送数据包,上面两个函数都没有接收功能。
sr函数可以发送数据包,并且接收数据,会返回2个列表,第一个是有应答的answer,第二个是无应答的answer
from scapy.all import sr,sr1,IP,ICMP a,b=sr(IP(dst="www.baidu.com",ttl=1)/ICMP()) print(a.show()) print(b.show())
输出
$ sudo python3 send.py Begin emission: Finished sending 1 packets. .* Received 2 packets, got 1 answers, remaining 0 packets 0000 IP / ICMP 10.81.226.234 > 180.101.49.11 echo-request 0 ==> IP / ICMP 10.81.255.254 > 10.81.226.234 time-exceeded ttl-zero-during-transit / IPerror / ICMPerror / Padding None None
sr1函数只返回1个列表,有应答的answer。
from scapy.all import sr,sr1,IP,ICMP p =sr1(IP(dst="www.baidu.com",ttl=1)/ICMP()) print(p.show())
输出
$ sudo python3 send.py Begin emission: Finished sending 1 packets. .* Received 2 packets, got 1 answers, remaining 0 packets ###[ IP ]### version = 4 ihl = 5 tos = 0xc0 len = 70 id = 27585 flags = frag = 0 ttl = 64 proto = icmp chksum = 0x16ab src = 10.81.255.254 dst = 10.81.226.234 \options \ ###[ ICMP ]### type = time-exceeded code = ttl-zero-during-transit chksum = 0xf4ff reserved = 0 length = 0 unused = 0 ###[ IP in ICMP ]### version = 4 ihl = 5 tos = 0x0 len = 28 id = 1 flags = frag = 0 ttl = 1 proto = icmp chksum = 0xe734 src = 10.81.226.234 dst = 180.101.49.11 \options \ ###[ ICMP in ICMP ]### type = echo-request code = 0 chksum = 0xf7ff id = 0x0 seq = 0x0 unused = '' ###[ Padding ]### load = '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' None
发送一个ICMP请求消息给目标主机,若源主机收到目标主机的应答响应消息,则表示目标可达。
from socket import timeout from tabnanny import verbose from scapy.all import IP,ICMP,sr1 from random import randint from optparse import OptionParser import ipaddress import logging # 关闭报错 logging.getLogger("scapy.runtime").setLevel(logging.ERROR) hosts = [] # 存放结果 def scan(ip): # 生成随机字段 ip_id = randint(1,65535) # IP报文标识符 icmp_id = randint(1,65535) # ICMP报文标识符 icmp_seq = randint(1,65535) # ICMP序列号 packet = IP(dst=ip,ttl=64,id=ip_id)/ICMP(id=icmp_id,seq=icmp_seq)/b'rootkit' result = sr1(packet,timeout=1,verbose=False) # 发送并接收数据包 if result: for rcv in result: scan_ip = rcv[IP].src print(scan_ip,'Host is up') hosts.append(scan_ip) else: print(ip,'Host is down') def main(): # 命令行参数选项 parser = OptionParser("Usage: %prog -i <target host>") # 输出帮助信息 parser.add_option('-i',type='string',dest='IP',help='specify target host') # 获取IP地址参数 options,args = parser.parse_args() # 解析参数 print("Scan report for " + options.IP + "\n") # 判断单台主机还是网段 if '/' not in options.IP: scan(options.IP) else: net4 = ipaddress.ip_network(options.IP) for ip in net4.hosts(): scan(str(ip)) print("scan result:") for host in hosts: print(host) if __name__ == "__main__": main()
该程序首先解析命令行参数,然后判断参数是单个IP还是网段,如果是网段则遍历其子网IP进行处理。