组成
NSQ 是一个实时分布式消息传递平台。
保证与任何分布式系统一样,实现目标是进行智能权衡。 通过对这些权衡的实际情况保持透明,我们希望对 NSQ 在生产中部署时的行为有所期待。
NSQ 是 simplequeue(simplehttp 的一部分)的继承者,因此设计为(没有特定的顺序):
单个 nsqd 实例旨在一次处理多个数据流。 流称为“主题”,主题具有 1 个或多个“通道”。 每个频道都会收到主题所有消息的副本。 实际上,通道映射到消耗主题的下游服务。
主题和渠道没有先验配置。 通过发布到命名主题或通过订阅命名主题上的通道来首次使用时创建主题。 首次使用时,通过订阅指定的频道创建频道。
主题和通道都相互独立地缓冲数据,防止缓慢的消费者导致其他渠道的积压(同样适用于主题级别)。
通道可以并且通常也会连接多个客户端。 假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。 例如:
总而言之,消息是从主题 - >通道(每个通道接收该主题的所有消息的副本)多播的,但是从通道 - >消费者均匀分布(每个消费者接收该通道的一部分消息)。
NSQ 还包括一个辅助应用程序 nsqlookupd,它提供了一个目录服务,消费者可以在其中查找提供他们有兴趣订阅的主题的 nsqd 实例的地址。 在配置方面,这将消费者与生产者分离(他们都只需要知道在哪里联系 nsqlookupd 的常见实例,从不相互联系),从而降低了复杂性和维护。
在较低级别,每个 nsqd 具有与 nsqlookupd 的长期 TCP 连接,在该连接上它定期推送其状态。 此数据用于通知 nsqlookupd 将为消费者提供哪些 nsqd 地址。 对于消费者,将公开 HTTP /lookup
端点以进行轮询。
要引入主题的新的不同使用者,只需启动配置了 nsqlookupd 实例的地址的 NSQ 客户端。 添加新的使用者或新发布者不需要进行任何配置更改,从而大大降低了开销和复杂性。
注意:在将来的版本中,启发式 nsqlookupd 用于返回地址可以基于深度,连接的客户端数量或其他“智能”策略。 目前的实施就是全部。 最终,目标是确保所有生产者都被阅读,使深度保持接近零。
值得注意的是,nsqd 和 nsqlookupd 守护进程旨在独立运行,不需要兄弟之间的通信或协调。
我们还认为,有一种方法可以集中查看,内省和管理集群。 我们建立了 nsqadmin 来做到这一点。 它提供了一个 Web UI 来浏览主题/渠道/消费者的层次结构,并检查每个层的深度和其他关键统计数据。 此外,它还支持一些管理命令,例如删除和清空通道(当通道中的消息可以安全地丢弃以便将深度恢复为 0 时,这是一个有用的工具)。
这是我们的最高优先事项之一。 我们的生产系统处理大量流量,所有流量都建立在我们现有的消息传递工具之上,因此我们需要一种方法来缓慢而有条不紊地升级基础架构的特定部分,几乎没有影响。
首先,在消息生产者方面,我们构建了 nsqd 来匹配 simplequeue。 具体来说,nsqd 将 HTTP /put
端点(就像 simplequeue 一样)暴露给 POST 二进制数据(有一点需要注意,端点需要另外一个指定“主题”的查询参数)。 想要切换到开始发布到 nsqd 的服务只需要进行少量的代码更改。
其次,我们在 Python 和 Go 中构建了库,这些库与我们现有库中习惯使用的功能和习惯相匹配。 通过将代码更改限制为引导,这简化了消息使用者方面的转换。 所有业务逻辑都保持不变。
最后,我们构建了实用程序来将新旧组件粘合在一起。 这些都可以在存储库的 examples 目录中找到:
NSQ 旨在以分布式方式使用。 nsqd 客户端(通过 TCP)连接到提供指定主题的所有实例。 没有中间人,没有消息经纪人,也没有 SPOF:
此拓扑消除了链接单个聚合订阅源的需要。 而是直接从所有生产者消费。 从技术上讲,哪个客户端连接到哪个 NSQ 并不重要,只要有足够的客户端连接到所有生产者以满足消息量,就可以保证所有客户端最终都会被处理。
对于 nsqlookupd,通过运行多个实例来实现高可用性。 它们不直接相互通信,数据被认为最终是一致的。 消费者轮询所有已配置的 nsqlookupd 实例并将响应联合起来。 陈旧,不可访问或其他故障节点不会使系统停止运行。
NSQ 保证消息将至少传送一次,尽管可能存在重复消息。 消费者应该期待这一点并重复数据删除或执行幂等操作。
此保证作为协议的一部分强制执行,其工作方式如下(假设客户端已成功连接并订阅了某个主题):
这可确保导致消息丢失的唯一边缘情况是 nsqd 进程的不正常关闭。 在这种情况下,内存中的任何消息(或任何未刷新到磁盘的缓冲写入)都将丢失。
如果防止消息丢失是最重要的,那么即使是这种边缘情况也可以减轻。 一种解决方案是站起来接收相同部分消息副本的冗余 nsqd 对(在不同的主机上)。 因为您已经将您的消费者编写为幂等的,所以对这些消息进行双倍时间没有下游影响,并且允许系统在不丢失消息的情况下忍受任何单个节点故障。
需要注意的是,NSQ 提供了构建模块,以支持各种生产用例和可配置的耐用性。
nsqd 提供了一个配置选项--mem-queue-size,它将确定给定队列在内存中保留的消息数。 如果队列的深度超过此阈值,则消息将透明地写入磁盘。 这将给定 nsqd 进程的内存占用限制为 mem-queue-size * #_of_channels_and_topics
此外,精明的观察者可能已经确定这是通过将此值设置为低(例如 1 或甚至 0)来获得更高的交付保证的便捷方式。 磁盘支持的队列旨在经受不干净的重启(尽管消息可能会被传递两次)。
此外,与消息传递保证相关,干净关闭(通过向 nsqd 进程发送 TERM 信号)可以安全地保留当前在内存中,传输中,延迟和各种内部缓冲区中的消息。
注意,名称以字符串``#ephemeral`结尾的主题/频道不会缓存到磁盘,而是在传递 mem-queue-size 后丢弃消息。 这使得不需要消息保证的消费者能够订阅频道。 在最后一个客户端断开连接后,这些短暂的通道也将消失。 对于短暂的主题,这意味着已经创建,使用和删除了至少一个频道(通常是短暂的频道)。
NSQ 旨在通过“memcached-like”命令协议与简单的大小前缀响应进行通信。所有消息数据都保存在核心中,包括尝试次数,时间戳等元数据。这消除了从服务器到客户端来回复制数据,这是重新排队消息时先前工具链的固有属性。这也简化了客户端,因为他们不再需要负责维护消息状态。
此外,通过降低配置复杂性,设置和开发时间大大减少(特别是在主题有> 1 个消费者的情况下)。
对于数据协议,我们做出了一项关键设计决策,通过将数据推送到客户端而不是等待它来提高性能和吞吐量。这个概念,我们称之为 RDY 状态,实质上是客户端流控制的一种形式。
当客户端连接到 nsqd 并订阅某个通道时,它将处于 RDY 状态 0.这意味着不会向客户端发送任何消息。当客户端准备好接收消息时,它会发送一个命令,将其 RDY 状态更新为准备处理的某些#,例如 100.没有任何其他命令,100 条消息将被推送到客户端,因为它们可用(每次递减)该客户端的服务器端 RDY 计数)。
客户端库旨在发送命令以在达到可配置的max-in-flight
设置的~25%时更新 RDY 计数(并正确考虑到多个 nsqd 实例的连接,进行适当划分)
这是一个重要的性能旋钮,因为一些下游系统能够更容易地批量处理消息并从更高的max-in-flight
中获益。
值得注意的是,因为它具有缓冲和推送功能,能够满足流(通道)的独立副本的需要,所以我们已经生成了一个类似于 simplequeue 和 pubsub 组合的守护进程。 这在简化我们系统的拓扑方面非常强大,我们传统上维护上面讨论的旧工具链。
我们早期做出了战略决策,在 Go 中构建 NSQ 核心。 我们最近在博客上写了关于我们对 Go 的使用,并提到了这个项目 - 浏览该帖子以了解我们对语言的看法可能会有所帮助。
关于 NSQ,Go 通道(不要与 NSQ 通道混淆)和语言内置的并发功能非常适合 nsqd 的内部工作。 我们利用缓冲通道来管理内存消息队列,并无缝地将溢出写入磁盘。
标准库可以轻松编写网络层和客户端代码。 内置的内存和 cpu 分析钩子突出了优化的机会,并且需要很少的集成工作。 我们还发现,单独测试组件,使用接口模拟类型以及迭代构建功能非常容易。
NSQ 由 3 个守护进程组成:
NSQ 中的数据流被建模为流和消费者的树。 主题是不同的数据流。 频道是订阅特定主题的消费者的逻辑分组。
单个 nsqd 可以有很多主题,每个主题可以有很多通道。 通道接收主题的所有消息的副本,在通道上的每个消息在其订户之间分发时启用多播样式传送,从而实现负载平衡。
这些原语构成了表达各种简单和复杂拓扑的强大框架。
主题和通道,NSQ 的核心原语,最能说明系统设计如何无缝转换为 Go 的功能。
Go 的通道(以下称为“go-chan”用于消除歧义)是表达队列的自然方式,因此 NSQ 主题/通道的核心只是消息指针的缓冲转发。 缓冲区的大小等于--mem-queue-size 配置参数。
从线上读取数据后,向主题发布消息的行为包括:
为了从主题到其通道获取消息,主题不能依赖于典型的 go-chan 接收语义,因为在 go-chan 上接收的多个 goroutine 将分发消息,而期望的最终结果是将每个消息复制到每个通道(goroutine)。
相反,每个主题都维护着 3 个主要的 goroutine。 第一个称为路由器,负责从传入的 go-chan 读取新发布的消息并将它们存储在队列(内存或磁盘)中。
第二个叫做 messagePump,负责将消息复制并推送到通道,如上所述。
第三个负责 DiskQueue IO,稍后将讨论。
通道稍微复杂一点,但是共享暴露单个输入和单个输出 go-chan 的基本目标(以抽象出内部消息可能在内存或磁盘上的事实):
此外,每个通道维护 2 个按时间排序的优先级队列,负责延迟和正在进行的消息超时(以及 2 个随附的 goroutine 用于监控它们)。
通过管理每通道数据结构来改进并行化,而不是依赖于 Go 运行时的全局计时器调度程序。
注意:在内部,Go 运行时使用单个优先级队列和 goroutine 来管理计时器。 这支持(但不限于)整个时间包。 它通常不需要用户时间排序的优先级队列,但重要的是要记住它是一个单一锁定的数据结构,可能会影响 GOMAXPROCS> 1 的性能。 请参阅 runtime / time.go。
NSQ 的设计目标之一是限制内存中保存的消息数量。 它通过 DiskQueue(它拥有主题或通道的第三个主要 goroutine)透明地将消息溢出写入磁盘来实现此目的。
由于内存队列只是一个 go-chan,如果可能的话,首先将消息路由到内存是很简单的,然后回退到磁盘:
for msg := range c.incomingMsgChan { select { case c.memoryMsgChan <- msg: default: err := WriteMessageToBackend(&msgBuf, msg, c.backend) if err != nil { // ... handle errors ... } } } 复制代码
利用 Go 的 select 语句,只需几行代码即可表示此功能:上述默认情况仅在 memoryMsgChan 已满时执行。
NSQ 还具有短暂主题/渠道的概念。 它们丢弃消息溢出(而不是写入磁盘)并在它们不再有客户订阅时消失。 这是 Go 接口的完美用例。 主题和通道具有声明为后端接口而不是具体类型的结构成员。 正常主题和通道使用 DiskQueue,而 DummyBackendQueue 中使用短暂的存根,实现无操作后端。
在任何垃圾收集环境中,您都会受到吞吐量(执行有用工作),延迟(响应性)和驻留集大小(占用空间)之间的紧张关系。
从 Go 1.2 开始,GC 就是标记 - 扫描(并行),非代数,非压缩,停止世界,而且大多数都是精确的。 它大部分都是精确的,因为其余的工作没有及时完成(它是针对 Go 1.3)。
Go GC 肯定会继续改进,但普遍的事实是:你创造的垃圾越少,你收集的时间就越少。
首先,了解 GC 在实际工作负载下的表现非常重要。 为此,nsqd 以 statsd 格式(以及其他内部指标)发布 GC 统计数据。 nsqadmin 显示这些指标的图表,让您深入了解 GC 在频率和持续时间方面的影响.
为了真正减少垃圾,您需要知道它的生成位置。 Go 工具链再一次提供了答案:
考虑到这一点,以下优化证明对 nsqd 很有用
NSQ TCP 协议是一个很好的例子,其中使用这些 GC 优化概念产生了很大的效果。
该协议采用长度前缀帧结构,使编码和解码变得简单和高效:
[x][x][x][x][x][x][x][x][x][x][x][x]... | (int32) || (int32) || (binary) | 4-byte || 4-byte || N-byte ------------------------------------... size frame ID data 复制代码
由于帧的组件的确切类型和大小是提前知道的,我们可以避免encoding/binary
包的方便性 Read()
和Write()
包装器(及其无关的接口查找和转换),而是调用适当的二进制文件binary.BigEndian
方法直接。
为了减少套接字 IO 系统调用,客户端 net.Conn 包含 bufio.Reader 和 bufio.Writer。 Reader 公开了 ReadSlice(),它重用了它的内部缓冲区。 这在读取插座时几乎消除了分配,大大降低了 GC 的压力。 这是可能的,因为与大多数命令相关联的数据不会转义(在不是这样的边缘情况下,数据被显式复制)。
在更低级别,MessageID 被声明为[16]字节,以便能够将其用作映射键(切片不能用作映射键)。 但是,由于从套接字读取的数据存储为[]字节,而不是通过分配字符串键产生垃圾,并且为了避免从切片到 MessageID 的后备数组的副本,使用不安全的包直接转换切片到一个 MessageID:
id := *(*nsq.MessageID)(unsafe.Pointer(&msgID)) 复制代码
注意:这是一个黑客。 如果编译器对此进行了优化并且问题 3512 可以解决此问题,则没有必要。 它还值得阅读问题 5376,该问题讨论了“const like”字节类型的可能性,它可以在接受字符串的情况下互换使用,而无需分配和复制。
类似地,Go 标准库仅在字符串上提供数字转换方法。 为了避免字符串分配,nsqd 使用直接在[]字节上操作的自定义基本 10 转换方法。
这些似乎是微优化,但 TCP 协议包含一些最热门的代码路径。 总的来说,它们以每秒数万条消息的速度,对分配数量和开销产生了重大影响:
benchmark old ns/op new ns/op delta BenchmarkProtocolV2Data 3575 1963 -45.09% benchmark old ns/op new ns/op delta BenchmarkProtocolV2Sub256 57964 14568 -74.87% BenchmarkProtocolV2Sub512 58212 16193 -72.18% BenchmarkProtocolV2Sub1k 58549 19490 -66.71% BenchmarkProtocolV2Sub2k 63430 27840 -56.11% benchmark old allocs new allocs delta BenchmarkProtocolV2Sub256 56 39 -30.36% BenchmarkProtocolV2Sub512 56 39 -30.36% BenchmarkProtocolV2Sub1k 56 39 -30.36% BenchmarkProtocolV2Sub2k 58 42 -27.59% 复制代码
NSQ 的 HTTP API 建立在 Go 的 net / http 包之上。 因为它只是 HTTP,所以几乎任何现代编程环境都可以使用它,而无需特殊的客户端库。
它的简洁性掩盖了它的强大功能,因为 Go 的 HTTP 工具箱最有趣的一个方面是它支持的各种调试功能。 net / http / pprof 包直接与本机 HTTP 服务器集成,公开端点以检索 CPU,堆,goroutine 和 OS 线程配置文件。 这些可以直接从 go 工具中定位:
$ go tool pprof http://127.0.0.1:4151/debug/pprof/profile 复制代码
这对于调试和分析正在运行的进程非常有价值!
此外,/stats 端点以 JSON 或漂亮打印的文本返回大量度量标准,使管理员可以轻松实时地从命令行进行内省:
$ watch -n 0.5 'curl -s http://127.0.0.1:4151/stats | grep -v connected' 复制代码
这产生连续输出,如:
[page_views ] depth: 0 be-depth: 0 msgs: 105525994 e2e%: 6.6s, 6.2s, 6.2s [page_view_counter ] depth: 0 be-depth: 0 inflt: 432 def: 0 re-q: 34684 timeout: 34038 msgs: 105525994 e2e%: 5.1s, 5.1s, 4.6s [realtime_score ] depth: 1828 be-depth: 0 inflt: 1368 def: 0 re-q: 25188 timeout: 11336 msgs: 105525994 e2e%: 9.0s, 9.0s, 7.8s [variants_writer ] depth: 0 be-depth: 0 inflt: 592 def: 0 re-q: 37068 timeout: 37068 msgs: 105525994 e2e%: 8.2s, 8.2s, 8.2s [poll_requests ] depth: 0 be-depth: 0 msgs: 11485060 e2e%: 167.5ms, 167.5ms, 138.1ms [social_data_collector ] depth: 0 be-depth: 0 inflt: 2 def: 3 re-q: 7568 timeout: 402 msgs: 11485060 e2e%: 186.6ms, 186.6ms, 138.1ms [social_data ] depth: 0 be-depth: 0 msgs: 60145188 e2e%: 199.0s, 199.0s, 199.0s [events_writer ] depth: 0 be-depth: 0 inflt: 226 def: 0 re-q: 32584 timeout: 30542 msgs: 60145188 e2e%: 6.7s, 6.7s, 6.7s [social_delta_counter ] depth: 17328 be-depth: 7327 inflt: 179 def: 1 re-q: 155843 timeout: 11514 msgs: 60145188 e2e%: 234.1s, 234.1s, 231.8s [time_on_site_ticks] depth: 0 be-depth: 0 msgs: 35717814 e2e%: 0.0ns, 0.0ns, 0.0ns [tail821042#ephemeral ] depth: 0 be-depth: 0 inflt: 0 def: 0 re-q: 0 timeout: 0 msgs: 33909699 e2e%: 0.0ns, 0.0ns, 0.0ns 复制代码
最后,每个新的 Go 版本通常都会带来可衡量的性能提升。 对最新版本的 Go 进行重新编译提供免费提升时总是很好!
来自其他生态系统,Go 的管理依赖关系的哲学(或缺乏)需要一点时间来习惯。
NSQ 从一个单一的巨型仓库发展而来,具有相对进口,内部包之间几乎没有分离,完全接受关于结构和依赖管理的推荐最佳实践。
有两种主要的思想流派:
注意:这实际上仅适用于二进制包,因为对于可导入包来说,对于使用哪个版本的依赖项做出中间决策是没有意义的。
NSQ 使用 gpm 为上面的(2)提供支持。
它的工作原理是将您的依赖项记录在 Godeps 文件中,我们稍后将其用于构建 GOPATH 环境。
Go 为编写测试和基准测试提供了坚实的内置支持,因为 Go 使得并发操作的建模变得如此简单,所以在测试环境中建立一个完整的 nsqd 实例是微不足道的。
但是,初始实现的一个方面成为测试的问题:全局状态。最明显的罪犯是使用一个全局变量,该变量在运行时保存对 nsqd 实例的引用,即 var nsqd * NSQd。
某些测试会通过使用短格式变量赋值(即 nsqd:= NewNSQd(...))无意中在本地范围内屏蔽此全局变量。这意味着全局引用并未指向当前正在运行的实例,从而破坏了测试。
要解决此问题,将传递一个 Context 结构,其中包含配置元数据和对父 nsqd 的引用。所有对全局状态的引用都被本地上下文替换,允许子(主题,通道,协议处理程序等)安全地访问此数据并使其更可靠地进行测试。
面对不断变化的网络条件或意外事件而不健壮的系统是在分布式生产环境中不能很好地运行的系统。
NSQ 的设计和实现方式允许系统容忍故障并以一致,可预测和不令人惊讶的方式运行。
最重要的理念是快速失败,将错误视为致命错误,并提供调试确实发生的任何问题的方法。
但是,为了做出反应,你需要能够发现异常情况......
NSQ TCP 协议是面向推送的。在连接,握手和订阅之后,消费者处于 RDY 状态 0.当消费者准备好接收消息时,它将该 RDY 状态更新为它愿意接受的消息的数量。 NSQ 客户端库在后台不断地管理它,从而产生流控制的消息流。
nsqd 会定期通过连接发送心跳。客户端可以配置心跳之间的间隔,但 nsqd 在发送下一个之前需要响应。
应用程序级心跳和 RDY 状态的组合避免了行头阻塞,否则会导致心跳无效(即,如果消费者在处理消息流时落后于 OS 的接收缓冲区将填满,阻止心跳)。
为了保证进度,所有网络 IO 都与相对于配置的心跳间隔的截止时间绑定。这意味着你可以从字面上拔掉 nsqd 和消费者之间的网络连接,它将检测并正确处理错误。
检测到致命错误时,强制关闭客户端连接。正在进行的消息超时并重新排队以便传递给另一个消费者。最后,记录错误并增加各种内部指标。
启动 goroutines 非常容易。 不幸的是,编排清理工作并不容易。 避免死锁也具有挑战性。 大多数情况下,这归结为一个排序问题,在上游 goroutines 发送它之前,在 go-chan 上接收的 goroutine 退出。
为什么要关心呢? 这很简单,孤立的 goroutine 是一个内存泄漏。 长时间运行的守护进程中的内存泄漏是不好的,特别是当期望您的进程在其他所有失败时都将保持稳定时。
更复杂的是,典型的 nsqd 进程在消息传递中涉及许多 goroutine。 在内部,消息“所有权”经常发生变化。 为了能够干净地关闭,考虑所有进程内消息非常重要。
虽然没有任何魔法子弹,但以下技术使其更容易管理......
同步包提供了 sync.WaitGroup,可用于执行有多少 goroutine 的实时计算(并提供等待退出的方法)。
为了减少典型的样板,nsqd 使用这个包装器:
type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() } // can be used as follows: wg := WaitGroupWrapper{} wg.Wrap(func() { n.idPump() }) ... wg.Wait() 复制代码
在多个子 goroutine 中触发事件的最简单方法是提供一个在准备就绪时关闭的 go-chan。 在该 go-chan 上的所有未决接收将被激活,而不是必须向每个 goroutine 发送单独的信号。
func work() { exitChan := make(chan int) go task1(exitChan) go task2(exitChan) time.Sleep(5 * time.Second) close(exitChan) } func task1(exitChan chan int) { <-exitChan log.Printf("task1 exiting") } func task2(exitChan chan int) { <-exitChan log.Printf("task2 exiting") } 复制代码
实现一个可靠的,无死锁的退出路径非常难以解决所有正在进行的消息。 一些提示:
理想情况下,负责发送 go-chan 的 goroutine 也应该负责关闭它。
如果消息不能丢失,请确保清空相关的 go-chans(特别是无缓冲的!)以保证发送者可以取得进展。
或者,如果消息不再相关,则应将单个 go-chan 上的发送转换为选择,并添加退出信号(如上所述)以保证进度。
一般顺序应该是:
最后,可以使用的最重要的工具是记录的 goroutines 的入口和出口! 它使得在死锁或泄漏的情况下识别罪魁祸首变得更加容易。
nsqd 日志行包括将 goroutine 与其兄弟(和父级)相关联的信息,例如客户端的远程地址或主题/通道名称。
日志是冗长的,但并不详细到日志压倒性的程度。 有一条细线,但 nsqd 倾向于在发生故障时在日志中提供更多信息,而不是试图以牺牲实用性为代价来减少干扰。