Python编写的事件驱动的网络引擎
twisted.web
: HTTP 客户端和服务器, HTML 模板, 和一个 WSGI 服务器twisted.conch
: SSHv2 和 Telnet 客户端和服务器,以及一个终端模拟器twisted.words
: IRC, XMPP客户端和服务器, 以及其他 IM(Instant Message) 协议twisted.mail
: IMAPv4, POP3, SMTP 客户端和服务器twisted.positioning
: 和NMEA兼容的 GPS 接受者通信的工具twisted.names
: DNS 客户端 和构建自己DNS服务器的工具twisted.trial
: 单元测试框架Twisted 支持所有主流操作系统的时间循环 -- select (所有平台), poll (大多数 POSIX 平台), epoll (Linux), kqueue (FreeBSD, macOS), IOCP (Windows), 和多种GUI事件循环 (GTK+2/3, Qt, wxWidgets). 第三方reactors 可以插入 Twisted,并提供多种其他事件循环。
pip install twisted
安装支持tls的版本
pip instal twisted[tls]
Twisted使实现定制网络应用变得简单。下面是一个TCP服务器,响应回复发给它的所有文本:
from twisted.internet import protocol, reactor, endpoints class Echo(protocol.Protocol): def dataReceived(self, data): data = "Echo-Server:".encode()+data self.transport.write(data) class EchoFactory(protocol.Factory): def buildProtocol(self, addr): return Echo() endpoints.serverFromString(reactor, "tcp:1234").listen(EchoFactory()) reactor.run()
使用netcat
进行测试:
$netcat -v 127.0.0.1 1234 Connection to 127.0.0.1 1234 port [tcp/*] succeeded! 你好 Echo-Server:你好 中华人民共和国 Echo-Server:中华人民共和国
Twisted 包含一个事件驱动web服务器. 下面是一个Web应用样例; 注意资源对象如何持久化到内存,而不是每次请求都重新创建
from twisted.web import server, resource from twisted.internet import reactor, endpoints class Counter(resource.Resource): isLeaf = True numberRequests = 0 def render_GET(self, request): self.numberRequests += 1 request.setHeader(b"content-type", b"text/plain") content = u"我是请求 #{}\n".format(self.numberRequests) return content.encode("utf-8") endpoints.serverFromString(reactor, "tcp:8080").listen(server.Site(Counter())) reactor.run()
使用curl
进行测试
$ curl 127.0.0.1:8080 我是请求 #1 $ curl 127.0.0.1:8080 我是请求 #2 $ curl 127.0.0.1:8080 我是请求 #3
下面是一个简单的publish/subscribe 服务器, 客户端可以看到其他客户端发送的所有信息
from twisted.internet import reactor, protocol, endpoints from twisted.protocols import basic class PubProtocol(basic.LineReceiver): def __init__(self, factory): self.factory = factory def connectionMade(self): self.factory.clients.add(self) def connectionLost(self, reason): self.factory.clients.remove(self) def lineReceived(self, line): for c in self.factory.clients: source = u"<{}> ".format(self.transport.getHost()).encode("utf-8") c.sendLine(source + line) class PubFactory(protocol.Factory): def __init__(self): self.clients = set() def buildProtocol(self, addr): return PubProtocol(self) endpoints.serverFromString(reactor, "tcp:1025").listen(PubFactory()) reactor.run()
开启两个终端,使用telnet
进行测试:
客户端1:
telnet 127.0.0.1 1025 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. 今天天气不错,我是客户端#1 <IPv4Address(type='TCP', host='127.0.0.1', port=1025)> 今天天气不错,我是客户端#1 <IPv4Address(type='TCP', host='127.0.0.1', port=1025)> 是啊是啊!是客户端#2
客户端#2
telnet 127.0.0.1 1025 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. <IPv4Address(type='TCP', host='127.0.0.1', port=1025)> 今天天气不错,我是客户端#1 是啊是啊!我是客户端#2 <IPv4Address(type='TCP', host='127.0.0.1', port=1025)> 是啊是啊!是客户端#2
Twisted 包含一个高明的(sophisticated) IMAP4 客户端库.
import sys from twisted.internet import protocol, defer, endpoints, task from twisted.mail import imap4 from twisted.python import failure async def main( reactor, username="TaceyWong", password="secret", strport="tls:example.com:993" ): endpoint = endpoints.clientFromString(reactor, strport) factory = protocol.Factory.forProtocol(imap4.IMAP4Client) try: client = await endpoint.connect(factory) await client.login(username.encode("utf-8"), password.encode("utf-8")) await client.select("INBOX") info = await client.fetchEnvelope(imap4.MessageSet(1)) print("First message subject:", info[1]["ENVELOPE"][1]) except: print("IMAP4 client interaction failed") print(failure.Failure().getTraceback()) task.react(lambda *a, **k: defer.ensureDeferred(main(*a, **k)), sys.argv[1:])
具体可以用自己支持IMAP的邮箱做测试
Twisted 包含 一个SSH 客户端 和 服务器, "conch" (比如: Twisted Shell).
import sys, os from twisted.internet import protocol, defer, endpoints, task from twisted.conch.endpoints import SSHCommandClientEndpoint async def main(reactor, username="tacey", sshhost="192.168.1.123", portno="22"): envAgent = endpoints.UNIXClientEndpoint(reactor, os.environ["SSH_AUTH_SOCK"]) endpoint = SSHCommandClientEndpoint.newConnection( reactor, "echo '你好,世界'", username, sshhost, int(portno), agentEndpoint=envAgent, ) class ShowOutput(protocol.Protocol): received = b"" def dataReceived(self, data): self.received += data def connectionLost(self, reason): finished.callback(self.received) finished = defer.Deferred() factory = protocol.Factory.forProtocol(ShowOutput) await endpoint.connect(factory) print("SSH 响应:", await finished) task.react(lambda *a, **k: defer.ensureDeferred(main(*a, **k)), sys.argv[1:])
(可能需要安装bcrypt——pip install bcrypt
)