这是本人第一次写博客,本人作为网络安全初学者,希望通过写博客的形式,记录、巩固、强化自己学习到的东西。同时希望在写博客的过程中能够发现自己的不足,逐渐提高自己。
本人在学习过程中主要参考书籍为《Python安全攻防——渗透测试指南》
渗透测试的过程主要分为:
最近的任务就是使用python完成第二项:信息收集
刚看了一眼,上一篇博客发布的时间是上周末,距今正好一周,这一周断更的主要原因就是,我不该作死的给电脑装双系统,/(ㄒoㄒ)/~~,本来想给电脑再装个kali,结果安装过程一路通畅,到了启动的时候,grud就给我出幺蛾子 了,两个系统都找不到,新装的kali进不去,以前的win10也进不去。后来,借了个pe系统盘,把新装的kali删除了,但是还是进不去win10,没办法只能重装系统了,第一回重装系统,只格式化了C盘,结果还是不行,没办法,把其他几个盘都格式化了,再次重装,导致我的电脑变得干干净净。然后就开始安装软件,那些乱起八糟的东西装起来还真费劲。前前后后折腾一个星期,绝望!幸好近期写的代码,都在博客里有,才不至于啥也没了。
今天的实验内容是:使用Socket模块编写一个多线程的端口扫描工具。
实验思路大概为:建立端口扫描类PortScaner,继承threading.Thread用以实现多线程,编写StartScan()函数,处理接收到 的参数,并建立线程列表,开启线程。使用optparse模块实现接收命令行参数的功能,接收到参数后传入StartScan()
import sys # socket是计算机之间进行网络通信的一套程序接口 import socket # 用于生成命令行参数形式 import optparse # 多线程模块 import threading # 一个同步的队列类 # 模块实现了多生产者、多消费者队列。这特别适用于消息必须安全地在多线程间交换的线程编程 import queue # 端口扫描类 class PortScaner(threading.Thread): # 需要传入端口队列、目标IP、探测超时时间 def __init__(self, portqueue, ip, timeout=3): threading.Thread.__init__(self) self._portqueue = portqueue self._ip = ip self._timeout = timeout def run(self): while True: # 判断端口队列是否为空,若为空则说明探测完毕,跳出循环 if self._portqueue.empty(): break # 取出端口,超时时间为0.5秒 port = self._portqueue.get(timeout=0.5) try: # 创建套接字对象 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置超时时间 s.settimeout(self._timeout) # 连接到address处的远程套接字。 # 类似于 connect(address),但是对于 C 级别的 connect() 调用返回的错误, # 本函数将返回错误指示器,而不是抛出异常(对于其他问题,如“找不到主机”,仍然可以抛出异常)。 # 如果操作成功,则错误指示器为 0,否则为 errno 变量的值。 result_code = s.connect_ex((self._ip, port)) # 用于 print() 和 expression 语句的输出,以及用于 input() 的提示符 # 若端口开放,则会返回0 if result_code == 0: sys.stdout.write("[%d] Open" % port) else: sys.stdout.write("[%d] Close" % port) except Exception as e: print(e) finally: # 关闭套接字 s.close() # 参数为目标IP,端口号,线程数 def StartScan(targetip, port, threadNum): # 定义端口列表 portList = [] # 判断输入的是单一端口还是端口范围 if '-' in port: # 将范围端口号切割 for i in range(int(port.split('-')[0]), int(port.split('-')[1]) + 1): portList.append(int(i)) else: portList.append(int(port)) # 定义线程列表 threads = [] # 端口队列 portQueue = queue.Queue() # 将端口加入端口队列 for port in portList: portQueue.put(port) # 创建线程,加入线程列表 for t in range(threadNum): threads.append(PortScaner(portQueue, targetip, timeout=3)) # 启动线程 for thread in threads: thread.start() # 等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 for thread in threads: thread.join() def main(): # 创建命令行解析器,参数Usage为:当程序运行不正确或具有帮助选项时,要打印的使用情况摘要。 parser = optparse.OptionParser("Example: python %prog -i 127.0.0.1 -p 80 \n") # add_option()函数,加入选项 type类型、dest存储的变量、help帮助信息 # 获取目标IP,目标端口号,线程数 parser.add_option('-i', type='string', dest='targetIP', default='127.0.0.1', help='target IP') parser.add_option('-p', type='string', dest='targetPort', default='80', help='scann port') parser.add_option('-t', type='int', dest='threadNum', default='10', help='scann thread number') # options,它是一个对象(optpars.Values),保存有命令行参数值。只要知道命令行参数名,就可以访问其对应的值。 # args,它是一个由 positional arguments 组成的列表。 options, args = parser.parse_args() # 开始探测 StartScan(options.targetIP, options.targetPort, options.threadNum) if __name__ == '__main__': try: main() except KeyboardInterrupt: print("被用户中断,终止所有线程")
开始测试
首先查看一下本机开启的端口
运行程序,开始扫描,-i后面是目标ip,-p是目标端口范围(可以是单一端口),-t是线程数,扫描结果如下:
出现的问题:这次写的工具出现的问题还是运行速度太慢,扫描100-1000之间的端口后,用了将近3分钟,目前并未想到解决办法。