NATS 是一种高性能、轻量级的开源消息系统,采用发布/订阅模式,通过基于文本的简单协议实现进程间通信。其核心特性包括极低的延迟、高吞吐量、以及对主题、订阅和通配符的灵活支持。NATS 广泛应用于微服务架构、物联网(IoT)和实时消息推送等场景,并通过 JetStream 组件提供消息持久化和流处理能力。与 Kafka、RabbitMQ 等其他消息协议相比,NATS 在简单性和性能方面具有优势,同时支持集群部署以实现高可用性和可扩展性。
NATS 协议详解
1. NATS 协议规范与设计
1.1 协议概述与核心特性
NATS 协议是一种设计简洁、高性能的发布/订阅消息系统协议。其核心目标是提供一种轻量级、快速且易于实现的进程间通信机制。NATS 协议基于文本,通常运行在 TCP/IP 之上,这使得客户端和服务器之间的交互易于理解和调试,同时也方便了各种编程语言的客户端库的实现 。NATS 服务器内部采用零分配字节解析器(zero allocation byte parser),旨在实现高效的消息处理,减少内存分配开销,从而提升性能 。该协议遵循发布/订阅模式,客户端通过标准的 TCP/IP 套接字与服务器建立连接,并利用一组精简的协议操作进行交互,每个操作以换行符(CRLF,即 \r\n
或 0x0D0A
)作为结束标志 。
NATS 的核心特性包括:
- 高性能与低延迟:NATS 旨在提供亚毫秒级的消息传递延迟和每秒数百万消息的处理能力,非常适合对实时性要求高的应用 。
- 轻量级:NATS 服务器本身是一个小巧的二进制文件(通常小于20MB),不依赖外部组件,部署和管理非常便捷 。
- 基于主题的发布/订阅:消息通过主题进行路由,发布者将消息发送到特定主题,订阅者则根据兴趣订阅主题来接收消息。
- 灵活的通配符订阅:支持
*
(匹配单个令牌)和>
(匹配一个或多个后续令牌)两种通配符,增强了订阅的灵活性 。 - 请求/响应模式:内置支持请求/响应模式,简化了同步通信的实现 。
- 队列组(Queue Groups):允许多个订阅者组成队列,消息在队列组内进行负载均衡,确保每条消息只被一个订阅者处理 。
- 集群支持:NATS 服务器可以组成集群,提供高可用性和横向扩展能力 。
- 安全性:支持 TLS/SSL 加密连接以及基于令牌、用户名/密码、NKEYS 和 JWT 的身份验证与授权机制 。
- 多语言客户端支持:官方和社区提供了多种编程语言的客户端库,如 Go, Java, JavaScript, Python 等 。
- 可选的持久化 (JetStream):NATS JetStream 组件提供了消息持久化、至少一次交付、流处理等高级功能,弥补了核心 NATS 「最多一次」交付的不足 。
NATS 的设计哲学强调简单性、性能和可扩展性,使其成为构建现代分布式系统和云原生应用的理想选择。
1.2 消息格式与通信流程
NATS 协议采用基于文本的格式,并使用 CRLF(\r\n
)作为消息的结束符。这种设计使得协议易于理解和调试,同时也保持了较低的性能开销。协议消息由控制行和可选的负载(消息内容)组成。核心的协议消息类型包括 INFO、CONNECT、PUB、SUB、UNSUB 和 MSG,它们共同构成了 NATS 客户端与服务器之间交互的基础。INFO 消息由服务器发送给客户端,用于传递服务器的配置信息和协议版本等元数据。CONNECT 消息则由客户端发送给服务器,用于建立连接并进行身份验证(如果需要)。PUB 消息用于发布消息到指定的主题,SUB 消息用于订阅一个或多个主题,UNSUB 消息用于取消订阅,而 MSG 消息则是服务器向客户端投递已发布消息的载体。
NATS 的通信流程始于客户端与服务器建立 TCP 连接。连接建立后,服务器会主动向客户端发送一条 INFO 消息,其中包含了服务器的唯一标识、协议版本、是否支持 TLS 加密、最大负载大小等关键信息。客户端在收到 INFO 消息后,会根据自身配置和服务器信息,回复一条 CONNECT 消息。CONNECT 消息中可能包含客户端提供的用户名、密码、客户端名称、协议版本等信息,用于身份验证和连接参数协商。一旦 CONNECT 消息被服务器成功处理,客户端与服务器之间的通信链路便正式建立,可以开始进行消息的发布和订阅操作。
发布消息时,客户端会向服务器发送 PUB 消息。PUB 消息的格式通常为 PUB <subject> [reply-to] <payload_size>\r\n<payload>\r\n
。其中 <subject>
是消息发布的目标主题,它是一个由点号分隔的字符串,用于消息的路由。可选的 [reply-to]
字段用于指定回复主题,这在请求-响应模式中非常有用。<payload_size>
指明了消息负载的字节长度,紧随其后的 CRLF 之后便是实际的二进制消息负载。服务器收到 PUB 消息后,会根据主题将消息路由给所有订阅了该主题或其匹配通配符的客户端。
订阅消息时,客户端向服务器发送 SUB 消息。SUB 消息的格式为 SUB <subject> [queue-group] <sid>\r\n
。<subject>
是客户端希望订阅的主题,同样支持通配符。可选的 [queue-group]
字段用于指定队列组名称,当多个订阅者使用相同的队列组名称订阅同一主题时,NATS 服务器会确保每条消息只被队列组中的一个订阅者接收,从而实现负载均衡。<sid>
是一个由客户端生成的唯一订阅标识符,用于后续的取消订阅操作。服务器收到 SUB 消息后,会记录下客户端的订阅信息。当有新的 PUB 消息发布到匹配的主题时,服务器会向所有符合条件的订阅者(包括普通订阅者和队列订阅者)发送 MSG 消息。MSG 消息的格式为 MSG <subject> [sid] [reply-to] <payload_size>\r\n<payload>\r\n
,其中 [sid]
对应接收消息的客户端的订阅 ID,其他字段与 PUB 消息类似。
取消订阅操作通过 UNSUB 消息完成。UNSUB 消息的格式为 UNSUB <sid> [max_msgs]\r\n
。<sid>
是要取消的订阅的唯一标识符。可选的 [max_msgs]
参数允许客户端指定在取消订阅前最多接收多少条消息,这为平滑取消订阅提供了便利。服务器处理 UNSUB 消息后,将不再向该客户端发送该订阅相关的消息。整个通信流程设计简洁高效,确保了消息的快速路由和分发。
1.3 核心概念:主题、订阅与通配符
NATS 的核心通信模型围绕主题(Subject)、订阅(Subscription)和通配符(Wildcards)构建,这些概念共同实现了灵活高效的消息路由和分发机制 。
主题(Subject) 是 NATS 消息传递的基石。它是一个用于标识消息内容的字符串,发布者(Publisher)将消息发布到特定的主题,而订阅者(Subscriber)则通过订阅一个或多个主题来接收感兴趣的消息。主题名称在 NATS 中是区分大小写的,并且必须是非空的字符串 。它们可以包含任何 UTF-8 字符,但不允许嵌入空格或制表符。主题名称通常由一个或多个令牌(Token)组成,这些令牌之间用点号(.
)分隔,例如 foo.bar.baz
。点号作为层级分隔符,使得主题可以形成一种命名空间结构,便于组织和筛选消息。例如,orders.new
、orders.processed
、user.created
、sensor.temperature.living_room
都是有效的主题名称。主题名称中不允许出现连续的点号(如 foo..bar
)或点号前后有空格(如 foo. bar
),这些都被视为无效主题 。回复主题(Reply-to Subject)也遵循相同的命名规则,它允许消息的接收方知道将回复发送到哪个主题,从而实现请求/回复模式。
订阅(Subscription) 是客户端向 NATS 服务器表达其对特定主题消息兴趣的方式。客户端通过发送 SUB
协议消息来创建一个订阅,指定要订阅的主题以及一个唯一的订阅 ID(Subscription ID, SID) 。服务器会记录每个客户端的订阅信息。当有消息发布到某个主题时,服务器会查找所有订阅了该主题或其匹配通配符的客户端,并将消息(通过 MSG
或 HMSG
协议消息)传递给它们。一个客户端可以创建多个订阅,每个订阅对应一个不同的主题或通配符模式。订阅 ID 在客户端的连接上下文中必须是唯一的,用于在后续的 UNSUB
(取消订阅)操作或服务器传递消息时标识该订阅。NATS 还支持队列组(Queue Groups),通过在 SUB
消息中指定一个可选的队列组名称,可以将多个订阅者组织成一个组。当消息发布到该组订阅的主题时,NATS 服务器会确保该消息只被组内的一个(且仅一个)订阅者接收,从而实现消息的负载均衡和并行处理,这对于构建可扩展的微服务架构非常有用。
通配符(Wildcards) 为 NATS 的订阅机制提供了极大的灵活性,允许订阅者匹配多个主题。NATS 支持两种类型的通配符:*
(星号)和 >
(大于号)。
*
(星号):匹配主题中任意一个单独的令牌。例如,订阅foo.*.bar
会匹配foo.one.bar
、foo.two.bar
,但不会匹配foo.one.two.bar
或foo.bar
。星号通配符只能替代一个完整的令牌。>
(大于号,也称为全通配符):匹配主题中一个或多个尾随的令牌,并且必须是主题的最后一个令牌。例如,订阅foo.>
会匹配foo.bar
、foo.bar.baz
、foo.one.two.three
,但不会匹配foo
本身。订阅>
会匹配所有主题(受授权限制)。
通配符必须作为独立的令牌使用,例如 foo.*.baz
或 foo.>
是有效的,而 foo*.bar
或 f*o.b*r
是无效的 。通配符使得订阅者可以以一种更宽泛的方式表达其兴趣,而不需要确切知道所有可能的具体主题。例如,一个监控服务可以订阅 sensor.>
来接收所有传感器的数据,而一个特定的传感器数据处理服务可能只订阅 sensor.temperature.>
。这种基于主题和通配符的路由机制是 NATS 高效、解耦通信的关键。
总结来说,NATS 的主题、订阅和通配符机制共同构成了其强大的消息路由能力。发布者将消息发送到特定主题,订阅者通过订阅(可能包含通配符)来表达对消息的兴趣,NATS 服务器则负责将消息准确地路由到所有匹配的订阅者。这种设计模式简单直观,同时提供了高度的灵活性和可扩展性,使其能够适应各种复杂的分布式系统通信需求。
2. NATS 的应用场景与优势
2.1 微服务架构中的应用
NATS 在微服务架构中扮演着至关重要的角色,它通过提供轻量级、高性能的通信机制,有效地解耦了服务之间的依赖关系,使得各个服务能够独立开发、部署和扩展 。NATS.js 为 JavaScript 生态系统提供了强大的微服务支持,其统一的 Service API 确保了在 Node.js、Deno 乃至浏览器环境中开发微服务时接口的一致性 。这种跨平台的一致性极大地简化了开发流程,并降低了在不同环境中维护代码的复杂性。基于 NATS 的发布/订阅模式,微服务之间可以进行高效的消息传递,而请求/响应模式则支持类似 RPC 的交互,满足了微服务间同步调用的需求 。NATS 的设计理念强调简单性和高性能,使其成为构建云原生和分布式系统的理想选择 。通过 NATS,微服务可以自动注册自身并提供服务发现功能,同时内置的负载均衡机制能够自动将请求分配到不同的服务实例,从而提高了系统的整体可用性和可伸缩性 。
在微服务架构中,NATS 的应用场景非常广泛。例如,它可以用于服务之间的解耦通信,允许服务独立部署和扩展 。一个服务可以发布请求,而另一个服务则可以订阅并处理该请求,这种模式使得服务间的交互更加灵活和高效 。NATS 的轻量级特性使其非常适合在资源受限的环境下运行,同时其高性能保证了即使在大量微服务交互的情况下,也能保持较低的延迟和高吞吐量 。此外,NATS 支持多种消息模式,包括发布/订阅、请求/响应以及队列组模式,这些模式为构建复杂的微服务交互提供了灵活的基础 。例如,队列组模式允许多个订阅者共享消息负载,确保每个消息只被组中的一个消费者处理,这对于实现任务分发和负载均衡非常有用 。NATS.js 的微服务支持还包括标准的错误处理机制和超时控制,这对于构建健壮的分布式应用至关重要 。对于移动客户端开发,可以通过 NATS.ws (WebSocket 传输层) 连接到 NATS 服务器,然后使用 Service API 构建微服务架构,特别适合需要实时通信的移动应用场景 。
2.2 IoT(物联网)领域的应用
NATS 协议凭借其轻量级、高性能以及对多种传输协议的支持,在物联网(IoT)领域展现出强大的应用潜力。物联网环境通常涉及大量的设备连接、海量的数据交换以及对实时性的高要求,NATS 的特性恰好能够满足这些需求。NATS 可以部署在各种环境中,包括边缘设备,这使得它能够有效地连接和管理物联网设备,将传感器数据实时传输到云端进行处理和分析 。NATS 服务器经过高度优化,其二进制文件小巧(不到 20 MB),使其可以轻松在各种机器上运行,无论是在资源受限的 Raspberry Pi 还是大规模的服务器上,也无论是在云端、本地、边缘、裸机、虚拟机还是在容器中,均可轻松运行 。这种灵活性使得 NATS 能够适应物联网场景中设备和环境的多样性。
在物联网应用中,NATS 可以用于连接各种设备和后端服务,支持设备之间的实时数据交换 。例如,通过 NATS JetStream,边缘设备能够轻松将数据流(如温度和湿度传感器数据)传输到流数据库(如 RisingWave)进行实时处理和分析 。NATS JetStream 提供了消息流的持久化能力,确保了在不可靠网络环境下数据的可靠传递 。EMQX 等消息中间件也支持 NATS 协议,并提供了与 MQTT 协议的双向互通能力,进一步拓展了 NATS 在物联网领域的应用范围 。NATS 支持 TCP/TLS 以及 WebSocket/WebSocket over SSL 等多种传输协议,为物联网设备提供了灵活的连接选项 。其扁平化的 Subject 设计和通配符支持,使得消息的路由和过滤更加高效 。虽然核心 NATS 本身不提供消息持久化,但通过 JetStream 扩展,可以满足物联网应用中对消息可靠性和持久化的需求 。阿里云等云服务商也提供了 NATS 社区版的快速部署服务,并特别强调了其在边缘计算和 IoT 场景下的应用,支持在边缘部署本地 NATS 集群并与中心集群交互,使得边缘设备可以通过连接边缘本地集群快速进行业务数据的生产 。
2.3 实时消息推送
NATS 协议以其卓越的性能和低延迟特性,成为构建实时消息推送系统的理想选择。在需要快速、可靠地将消息传递给大量客户端的场景中,如实时交易系统、实时通知服务、游戏后端玩家交互和状态同步,以及移动应用的推送通知等,NATS 都能表现出色 。NATS 的设计目标之一就是实现高吞吐量和低延迟,其服务器在分发消息流程中避免了使用可能成为性能瓶颈的通道(chan),从而能够达到每秒百万级消息的传输速率 。例如,有实测结果显示,NATS 在处理100万条消息投递时,耗时仅需3-4秒,即每秒可以投递30-40万条消息;而在处理1千条消息投递时,耗时仅为1毫秒,这样的性能完全可以满足对延迟要求极为苛刻的场景,如多人游戏 。
NATS 支持多种消息路由模式,包括发布/订阅(Publish Subscribe)、请求/回复(Request Reply)和队列(Queueing),这些模式为不同类型的实时消息推送需求提供了灵活的解决方案 。发布/订阅模式允许一个消息被多个订阅者接收,类似于广播,非常适合新闻推送、实时行情更新等场景 。请求/回复模式则支持一对一或一对多的同步调用,可以用于需要即时响应的交互场景 。队列组模式则允许多个订阅者共享消息负载,确保每个消息只被组中的一个消费者处理,这对于需要负载均衡的实时任务分发非常有用 。NATS 不仅支持标准的 TCP 连接,还支持 WebSocket 和 HTTP/2 协议,这使得它可以方便地集成到 Web 应用和移动应用中,实现跨平台的实时消息推送 。例如,在移动客户端开发中,可以通过 NATS.ws (WebSocket 传输层) 连接到 NATS 服务器,实现实时通信功能 。京东云的消息推送系统就采用了类似的技术,实现了单机百万级别的并发 TCP 连接,展示了 NATS 在处理大规模实时推送方面的潜力 。
2.4 与其他消息协议的对比 (Kafka, RabbitMQ, MQTT, WebSocket, HTTP)
NATS 作为一种高性能、轻量级的消息系统,在多种场景下与其他流行的消息协议(如 Kafka, RabbitMQ, MQTT, WebSocket, HTTP)相比,展现出其独特的优势和适用性。理解这些差异对于选择最适合特定需求的通信协议至关重要。
NATS vs. Apache Kafka vs. RabbitMQ :
NATS 的核心设计原则是简单性和性能优先。它是一个轻量级的服务器,协议简单,旨在提供极低的延迟和高吞吐量。Core NATS 提供「最多一次」(at-most-once)的消息交付语义,而 JetStream 扩展则提供了「至少一次」(at-least-once)和「精确一次」(exactly-once)的语义以及持久化能力 。Kafka 则是一个分布式的流处理平台,其核心是一个持久化的、分区的、可复制的提交日志。Kafka 设计用于处理海量数据流,并提供强大的持久性保证和有序性保证(在分区内)。RabbitMQ 是一个功能丰富的消息代理,实现了 AMQP(Advanced Message Queuing Protocol)标准,并支持多种消息模式,如工作队列、发布/订阅、路由等,它强调消息的可靠传递和复杂路由。
特性 | NATS (Core + JetStream) | Apache Kafka | RabbitMQ |
---|---|---|---|
架构 | 轻量级服务器 | 分布式日志 | 消息代理 |
协议 | 自定义文本协议 | 自定义二进制协议 | AMQP, MQTT, STOMP 等 |
持久性 | 可选 (JetStream) | 始终持久化 | 可配置 (持久化队列) |
消息顺序 | 每个主题 | 每个分区 | 每个队列 |
核心优势 | 极简、高性能、低延迟 | 高吞吐、强持久性、流处理 | 功能丰富、可靠传递、复杂路由 |
集群 | 全网格网络,自动发现 | 依赖 ZooKeeper 或 KRaft 协调 | Erlang 集群,队列镜像 |
运维复杂度 | 低 | 高 | 中 |
典型用例 | 微服务通信、实时消息、请求/回复 | 事件溯源、流处理、大数据管道 | 企业集成、任务队列、复杂路由 |
吞吐量 (示例) | 8M msg/sec (fire-and-forget) | 2.1M msg/sec (fire-and-forget) | 80K msg/sec (fire-and-forget) |
延迟 (P99) | 0.5-2ms (内存中) | 15-25ms (持久化日志) | 5-15ms (内存/磁盘混合) |
资源占用 | 内存 10-50MB | 内存 8-16GB | 内存 100-500MB |
Table 1: NATS, Apache Kafka, and RabbitMQ 特性对比
NATS vs. MQTT :
MQTT 是专为物联网(IoT)和机器对机器(M2M. 通信设计的轻量级协议,特别适用于网络带宽有限或设备资源受限的场景。MQTT 是一个 OASIS 标准,也是一个 ISO 标准 (ISO/IEC 20922)。NATS 虽然也适用于 IoT,但其设计更偏向于云原生应用和微服务架构。Core NATS 是「发后即忘」的,而 MQTT 提供了 QoS 0、1、2 等级别来保证消息的可靠传递。MQTT 的保留消息(Retained Messages)功能允许新订阅者立即获取最后一条消息,这在 NATS 中需要 JetStream 来模拟。NATS 的主题是扁平的,而 MQTT 的主题天然支持层级结构,更适合表示设备或资产的层级关系。在 OT(运营技术)领域,MQTT 拥有更广泛的生态系统支持。✅
NATS vs. WebSockets :
WebSockets 是一种在单个 TCP 连接上提供全双工通信通道的协议,通常用于浏览器与服务器之间的实时交互。NATS 可以运行在 WebSockets 之上,为浏览器客户端提供基于 NATS 主题的消息传递能力 。NATS 在 WebSockets 的基础上增加了强大的消息路由、主题管理、集群和持久化(通过 JetStream)等功能。WebSockets 本身主要解决浏览器与服务器之间的双向通信问题,而 NATS 则是一个更通用的消息系统,可以用于后端服务之间、设备之间以及后端与前端之间的通信。NATS 的集群和自愈能力使其在构建大规模实时应用时更具优势。
NATS vs. HTTP:
HTTP 是一种无状态的请求/响应协议,广泛用于 Web 通信。虽然 HTTP/2 和 HTTP/3 引入了一些改进,如多路复用和服务器推送,但 HTTP 本质上不适合作为通用的消息传递协议。对于实时消息推送,基于 HTTP 的轮询或长轮询效率低下且延迟高。NATS 的发布/订阅模型和推送机制更适合实时场景。NATS 的请求/回复模式也比基于 HTTP 的同步调用更灵活,因为它可以异步处理回复,并且不局限于严格的点对点通信 。
总结来说,NATS 在需要极致性能、低延迟和简单性的场景下表现出色,特别适合微服务通信和实时消息推送。Kafka 更适合大数据流处理和需要强持久性保证的场景。RabbitMQ 则提供了丰富的消息模式和可靠传递,适用于企业级应用集成。MQTT 是 IoT 领域的标准协议,针对受限设备和不可靠网络进行了优化。WebSockets 为浏览器提供了双向通信能力,而 NATS 可以在此基础上构建更强大的实时消息系统。
3. NATS 的技术实现与工具
3.1 服务器架构与核心组件
NATS 服务器的设计目标是轻量级、高性能和高可用性。其核心架构围绕着事件驱动的模型构建,能够高效地处理大量的并发连接和消息。NATS 服务器本身是用 Go 语言编写的,利用了 Go 语言在并发处理和网络编程方面的优势。服务器内部实现了一个零分配字节解析器,用于快速解析传入的协议消息,这对于降低延迟和提高吞吐量至关重要 。NATS 服务器的核心组件包括客户端连接管理器、消息路由器、订阅管理器以及可选的 JetStream 模块。客户端连接管理器负责处理客户端的 TCP/IP 连接,包括连接的建立、认证、心跳维持以及连接的关闭。消息路由器是 NATS 的核心,它负责接收发布到主题的消息,并根据当前的订阅信息将消息准确地分发给所有感兴趣的订阅者。订阅管理器则维护着所有活动订阅的注册表,包括主题、队列组以及相关的客户端连接信息。
在集群模式下,NATS 服务器之间通过 gossip 协议来传播集群成员信息和订阅兴趣。当一个服务器接收到来自客户端的订阅时,它会通过 RS+
消息将订阅信息广播给集群中的其他服务器 。同样,当客户端取消订阅时,会通过 RS-
消息通知其他服务器 。这种机制确保了集群中的所有服务器都了解整个系统的订阅状态,从而能够正确地路由消息。NATS 集群采用全连接拓扑,即每个服务器都与其他所有服务器建立路由连接。当一个服务器收到来自客户端的消息时,如果该消息的主题有其他服务器上的订阅者,它会通过 RMSG
消息将消息转发给那些服务器 。NATS 服务器的转发限制为一跳,即服务器只转发从客户端直接接收到的消息,或者将从其他服务器接收到的消息分发给本地连接的客户端,而不会进行多跳转发 。这种设计简化了路由逻辑,并避免了消息循环。
JetStream 是 NATS 2.0 引入的持久化引擎,它为 NATS 提供了消息的持久化存储、至少一次交付语义、流处理能力以及基于消费者的消息确认机制。JetStream 的引入使得 NATS 不仅仅是一个「最多一次」交付的内存消息系统,还能够满足更广泛的应用场景,如事件溯源、数据复制和流处理。JetStream 通过在服务器上创建流(Streams)来存储消息,流可以配置不同的存储后端(文件或内存)和保留策略。消费者(Consumers)可以从流中拉取或推送消息,并支持多种确认模式(如显式确认、自动确认等)。JetStream 的 API 通过一组预定义的主题(如 $JS.API.*
)暴露给客户端,客户端可以通过向这些主题发送请求来管理流、消费者以及发布和消费消息 。JetStream 集群利用 Raft 共识算法来保证数据的一致性和高可用性。
3.2 Go 客户端使用方法与示例
NATS 提供了官方的 Go 语言客户端库 nats.go
,该库允许 Go 应用程序与 NATS 消息系统进行交互。开发者可以通过此库实现消息的发布与订阅、请求与响应等核心通信模式。nats.go
库以其简洁的 API 和高性能著称,广泛应用于构建分布式系统和微服务架构。该库支持 NATS 的核心功能,包括基于主题的消息传递、队列组、以及可选的持久化机制 JetStream。通过 nats.go
,Go 开发者可以轻松地将 NATS 集成到他们的应用程序中,实现服务间的解耦和异步通信。该库的 GitHub 仓库地址为 https://github.com/nats-io/nats.go
,其中包含了库的源代码、示例以及详细的文档。此外,Go 开发者可以通过 go get github.com/nats-io/nats.go
命令来安装该库 。
3.2.1 连接到 NATS 服务器
在 Go 应用程序中使用 NATS 客户端库的首要步骤是建立与 NATS 服务器的连接。这通常通过调用 nats.Connect()
函数来完成。该函数接受一个或多个服务器地址作为参数,并返回一个 *nats.Conn
对象以及一个可能的错误。如果连接成功建立,返回的错误将为 nil
。NATS 客户端库提供了一个默认的服务器地址 nats.DefaultURL
,其值为 "nats://localhost:4222"
。这意味着如果本地运行了 NATS 服务器且监听在默认端口,可以直接使用 nats.Connect(nats.DefaultURL)
进行连接 , 。在实际应用中,服务器地址通常通过环境变量或配置文件来指定,以增加灵活性。例如,可以从环境变量 NATS_URL
中获取服务器地址,如果该环境变量未设置,则回退到默认地址 , 。连接建立后,建议使用 defer nc.Close()
或 defer nc.Drain()
来确保在程序退出或不再需要连接时能够优雅地关闭连接,释放资源并通知服务器 , 。nc.Drain()
方法会优雅地关闭连接,确保所有挂起的消息都被处理完毕后再断开连接,这对于需要保证消息可靠性的场景尤为重要。
3.2.2 发布消息
一旦成功连接到 NATS 服务器,应用程序就可以开始发布消息。发布消息是通过 *nats.Conn
对象的 Publish()
方法实现的。该方法接受两个主要参数:主题 (subject) 和消息数据 (data)。主题是一个字符串,用于标识消息的类型或目的地,而消息数据则是一个字节切片 ([]byte
),允许发送任意格式的数据。例如,nc.Publish("foo", []byte("Message"))
会将包含 “Message” 的字节数据发布到名为 “foo” 的主题上 。如果发布操作失败,Publish()
方法会返回一个错误。在实际应用中,通常会将需要发送的数据序列化为字节流,例如使用 JSON、Protocol Buffers 等格式。例如,一个通知系统可能会将通知内容序列化为 JSON 字符串,然后通过 nc.Publish("notifications", []byte(jsonData))
发送到 “notifications” 主题 。发布操作是异步的,意味着 Publish()
方法在将消息交给客户端库后立即返回,而不会等待消息被服务器接收或由订阅者处理。这种异步特性有助于提高发布者的吞吐量。
3.2.3 订阅消息
订阅消息是接收 NATS 消息的核心机制。Go 客户端库提供了 Subscribe()
方法来订阅特定的主题。该方法接受两个主要参数:主题 (subject) 和一个消息处理函数 (handler)。消息处理函数是一个回调函数,其签名为 func(msg *nats.Msg)
。当有消息发布到订阅的主题时,NATS 服务器会将消息传递给客户端库,客户端库则会调用相应的消息处理函数来处理这条消息。例如,nc.Subscribe("foo", func(msg *nats.Msg) { log.Println("Subscriber 1:", string(msg.Data)) })
会订阅 “foo” 主题,并在收到消息时打印消息内容 。*nats.Msg
对象包含了消息的详细信息,如主题 (msg.Subject
)、数据 (msg.Data
)、以及一个可选的回复主题 (msg.Reply
),用于实现请求-响应模式。订阅可以是持久的,只要订阅者保持连接并且没有显式地取消订阅,它就会持续接收消息。如果需要同步接收消息,可以使用 SubscribeSync()
方法,它返回一个 *nats.Subscription
对象,然后可以通过调用该对象的 NextMsg()
方法来阻塞等待下一条消息 。此外,还可以使用 QueueSubscribe()
方法创建队列订阅,多个订阅者使用相同的队列组名订阅同一个主题时,消息会以负载均衡的方式分发给其中一个订阅者,这对于实现工作队列模式非常有用 。
3.2.4 请求-响应模式
NATS 内置了对请求-响应模式的支持,这是一种常见的分布式系统通信模式,其中一个服务(请求者)向另一个服务(响应者)发送请求并等待响应。Go 客户端库通过 Request()
方法和消息的 Respond()
方法简化了这一模式的实现。响应者通过订阅一个特定的主题(例如 “greet.*”)来监听请求。当收到请求消息时,响应者可以从消息的 msg.Subject
中提取信息,处理请求,然后调用 msg.Respond([]byte("response data"))
方法将响应发送回请求者 , 。请求者则使用 nc.Request(subject, requestData, timeout)
方法发送请求。该方法会发布请求消息到一个内部生成的回复主题,并阻塞等待响应,直到超时或收到响应为止。例如,rep, err := nc.Request("greet.joe", nil, time.Second)
会向 “greet.joe” 主题发送一个请求,并等待最多1秒钟以获取响应 。这种模式使得服务间的同步调用变得简单,同时 NATS 的底层机制确保了响应能够正确地路由回请求者。NATS 的服务框架 (micro
包) 进一步简化了构建和发现基于请求-响应模式的服务,例如通过 micro.AddService()
和 srv.AddGroup().AddEndpoint()
来定义服务及其处理端点 。
3.2.5 使用示例与代码片段
以下是一些使用 NATS Go 客户端的核心示例,涵盖了发布-订阅和请求-响应模式:
发布-订阅示例 :
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// 连接到 NATS 服务器
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalln(err)
}
defer nc.Close()
// 创建三个订阅者订阅 "foo" 主题
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Subscriber 1:", string(msg.Data))
})
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Subscriber 2:", string(msg.Data))
})
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Subscriber 3:", string(msg.Data))
})
// 发布一条消息到 "foo" 主题
if err := nc.Publish("foo", []byte("Message")); err != nil {
log.Fatalln(err)
}
// 等待消息被处理
time.Sleep(2 * time.Second)
}
此示例展示了如何连接到 NATS 服务器,创建多个订阅者来监听同一个主题 “foo”,然后发布一条消息到该主题。所有订阅者都会收到这条消息并打印出来。
请求-响应示例 :
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
)
func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
nc, _ := nats.Connect(url)
defer nc.Drain()
// 订阅 "greet.*" 主题,处理请求并响应
sub, _ := nc.Subscribe("greet.*", func(msg *nats.Msg) {
name := msg.Subject[6:] // 从主题中提取名字
msg.Respond([]byte("hello, " + name))
})
// 发送请求到 "greet.joe"
rep, _ := nc.Request("greet.joe", nil, time.Second)
fmt.Println(string(rep.Data)) // 输出: hello, joe
// 发送请求到 "greet.sue"
rep, _ = nc.Request("greet.sue", nil, time.Second)
fmt.Println(string(rep.Data)) // 输出: hello, sue
// 取消订阅
sub.Unsubscribe()
// 再次发送请求,此时应超时或失败
_, err := nc.Request("greet.joe", nil, time.Second)
fmt.Println(err) // 输出: nats: timeout
}
此示例演示了请求-响应模式。一个服务订阅 “greet.*” 主题,并根据请求的主题后缀(如 “joe”)返回个性化的问候。客户端使用 nc.Request()
发送请求并获取响应。
使用 Protocol Buffers 作为消息负载的请求-响应示例 :
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
"google.golang.org/protobuf/proto"
// 假设已通过 protoc 生成 greet.pb.go,包含 GreetRequest 和 GreetReply 结构体
// "your_project_path/greet"
)
func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
nc, _ := nats.Connect(url)
defer nc.Drain()
// 订阅 "greet" 主题,处理 Protobuf 编码的请求并响应
nc.Subscribe("greet", func(msg *nats.Msg) {
var req GreetRequest
proto.Unmarshal(msg.Data, &req) // 解码请求
rep := GreetReply{
Text: fmt.Sprintf("hello %q!", req.Name),
}
data, _ := proto.Marshal(&rep) // 编码响应
msg.Respond(data)
})
// 准备 Protobuf 请求
req := GreetRequest{
Name: "joe",
}
data, _ := proto.Marshal(&req)
// 发送 Protobuf 编码的请求
msg, _ := nc.Request("greet", data, time.Second)
var rep GreetReply
proto.Unmarshal(msg.Data, &rep) // 解码响应
fmt.Printf("reply: %s\n", rep.Text) // 输出: reply: hello "joe"!
}
此示例展示了如何在请求-响应模式中使用 Protocol Buffers 作为消息的编码格式,以提高效率和类型安全。
NATS 服务框架示例 :
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/micro"
"golang.org/x/exp/slices"
)
func handleMin(req micro.Request) {
var numbers []int
json.Unmarshal(req.Data(), &numbers)
if len(numbers) == 0 {
req.Error("400", "No numbers provided", nil)
return
}
min := slices.Min(numbers)
req.RespondJSON(&struct{ Min int }{Min: min})
}
func handleMax(req micro.Request) {
var numbers []int
json.Unmarshal(req.Data(), &numbers)
if len(numbers) == 0 {
req.Error("400", "No numbers provided", nil)
return
}
max := slices.Max(numbers)
req.RespondJSON(&struct{ Max int }{Max: max})
}
func main() {
url, exists := os.LookupEnv("NATS_URL")
if !exists {
url = nats.DefaultURL
} else {
url = strings.TrimSpace(url)
}
if strings.TrimSpace(url) == "" {
url = nats.DefaultURL
}
nc, err := nats.Connect(url)
if err != nil {
log.Fatal(err)
return
}
// 使用 micro 包创建服务
srv, err := micro.AddService(nc, micro.Config{
Name: "minmax",
Version: "0.0.1",
Description: "Returns the min/max number in a request",
})
if err != nil {
log.Fatal(err)
return
}
fmt.Printf("Created service: %s (%s)\n", srv.Info().Name, srv.Info().ID)
// 为服务添加端点
root := srv.AddGroup("minmax")
root.AddEndpoint("min", micro.HandlerFunc(handleMin))
root.AddEndpoint("max", micro.HandlerFunc(handleMax))
// 客户端请求示例
requestSlice := []int{-1, 2, 100, -2000}
requestData, _ := json.Marshal(requestSlice)
msg, _ := nc.Request("minmax.min", requestData, 2*time.Second)
var result struct{ Min int }
json.Unmarshal(msg.Data, &result)
fmt.Printf("Requested min value, got %d\n", result.Min)
}
此示例展示了如何使用 nats.go/micro
包构建一个简单的微服务。服务名为 “minmax”,提供了 “min” 和 “max” 两个端点,分别用于计算请求中数字列表的最小值和最大值。客户端可以通过向 “minmax.min” 或 “minmax.max” 主题发送请求来调用这些服务。
这些示例代码清晰地展示了 NATS Go 客户端库在实现不同消息模式时的基本用法,包括连接管理、消息发布、消息订阅以及请求-响应交互。通过这些基础操作,开发者可以构建出复杂的分布式应用。
3.3 NATS JetStream:持久化与流处理
NATS JetStream 是 NATS 2.0 引入的核心组件,它为 NATS 生态系统带来了强大的消息持久化、流处理和「至少一次」交付语义。在 JetStream 出现之前,核心 NATS 是一个纯内存的消息系统,提供「至多一次」的交付,消息不持久化,服务器重启后消息会丢失 。NATS Streaming(STAN)是早期的持久化解决方案,但它存在一些架构上的限制,如难以水平扩展等 。JetStream 旨在克服这些限制,提供一个可水平扩展、多租户、高性能的持久化层,同时保持 NATS 的简单性和易用性。
JetStream 的核心概念包括流(Streams)、消费者(Consumers)和消息确认(Acknowledgements)。流定义了消息的存储方式、保留策略(如基于时间、消息数量、总大小等)以及消息的来源主题(Subjects)。一个流可以关联多个主题,消息发布到这些主题时会被流捕获并持久化存储。JetStream 支持两种存储类型:文件存储(File Storage)和内存存储(Memory Storage)。文件存储将消息持久化到磁盘,确保服务器重启后消息不丢失,适用于需要高可靠性的生产环境。内存存储则将消息保存在内存中,性能更高,但服务器重启后消息会丢失,适用于临时持久化或对性能要求极高的场景 。
消费者定义了如何从流中读取消息。每个消费者都有自己的状态,包括已处理消息的偏移量(ACK 位置)。JetStream 支持两种主要的消费模式:推送模式(Push-based)和拉取模式(Pull-based)。推送模式下,JetStream 服务器主动将消息推送给消费者,类似于核心 NATS 的订阅机制。拉取模式下,消费者主动从服务器批量拉取消息,这种模式非常适合工作队列场景,可以方便地水平扩展消费者数量而不会导致消息重复处理。消费者还可以配置各种策略,如消息确认超时(AckWait)、最大重试次数(MaxDeliver)、死信队列(Dead Letter Queue)等,以满足不同的应用需求。
使用 NATS Go 客户端操作 JetStream 也非常便捷。首先需要创建一个 nats.JetStreamContext
对象,通常通过 nc.JetStream()
方法从已建立的 NATS 连接获取。然后可以使用 js.AddStream()
和 js.AddConsumer()
来创建流和消费者。发布消息到 JetStream 流可以使用 js.Publish()
或 js.PublishAsync()
方法。消费消息则根据消费模式不同而有所区别:推送模式下可以使用 js.Subscribe()
或 js.SubscribeSync()
;拉取模式下可以使用 js.PullSubscribe()
并结合 sub.Fetch()
或 sub.NextMsg()
等方法。例如,创建一个流并发布消息的 Go 代码示例如下 :
nc, _ := nats.Connect(nats.DefaultURL)
js, _ := nc.JetStream()
// 创建流
_, err := js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"ORDERS.*"},
})
if err != nil {
log.Fatal(err)
}
// 发布消息
ack, err := js.Publish("ORDERS.new", []byte(`{"order_id": 123}`))
if err != nil {
log.Fatal(err)
}
fmt.Printf("Message published with sequence number: %d\n", ack.Sequence)
消费消息的示例(推送模式):
// 获取流和消费者句柄
stream, _ := js.Stream(ctx, "foo")
cons, _ := stream.Consumer(ctx, "cons")
// 消费消息
cc, _ := cons.Consume(func(msg jetstream.Msg) {
fmt.Println("Received jetstream message: ", string(msg.Data()))
msg.Ack() // 显式确认消息
})
defer cc.Stop()
JetStream 还支持消息重放(Replay),允许消费者从流的任意点开始重新消费消息,这对于数据恢复、审计和回溯分析非常有用。通过合理的流设计和消费者配置,JetStream 能够满足从简单的消息队列到复杂的事件流处理等多种应用场景的需求。
3.4 NATS 集群搭建与运维
NATS 服务器支持集群模式,通过将多个 NATS 服务器节点组成集群,可以实现高可用性、负载均衡和横向扩展。NATS 集群采用去中心化的架构,每个节点都与其他节点建立连接,形成一个全网状的拓扑结构。这种设计避免了单点故障,并且能够自动处理节点加入和离开集群的情况。当客户端连接到集群中的任何一个节点时,它实际上就连接到了整个集群。如果某个节点发生故障,客户端可以自动重连到集群中的其他健康节点,从而实现故障转移。
搭建 NATS 集群通常涉及配置每个节点的监听地址、集群监听地址以及路由地址。路由地址用于节点间建立连接和交换路由信息。例如,一个简单的三节点集群配置可能如下所示:
# 节点 1 配置
listen: 0.0.0.0:4222
cluster {
listen: 0.0.0.0:6222
routes: [
nats-route://node2_ip:6222
nats-route://node3_ip:6222
]
}
# 节点 2 配置
listen: 0.0.0.0:4222
cluster {
listen: 0.0.0.0:6222
routes: [
nats-route://node1_ip:6222
nats-route://node3_ip:6222
]
}
# 节点 3 配置
listen: 0.0.0.0:4222
cluster {
listen: 0.0.0.0:6222
routes: [
nats-route://node1_ip:6222
nats-route://node2_ip:6222
]
}
在实际部署中,通常建议使用奇数个节点(如 3 或 5 个)组成集群,这有助于在发生网络分区时保持集群的可用性和一致性。NATS 集群的运维相对简单,节点的加入和移除是动态的,集群会自动调整路由信息。监控集群状态、节点健康状况以及消息流量是日常运维的重要部分。NATS 提供了 nats-server
的命令行工具和 HTTP 监控端点,可以方便地查看集群信息、连接数、订阅数、消息速率等指标。
当 NATS JetStream 与集群结合使用时,其持久化的流数据也会在集群中进行复制,以提供数据冗余和高可用性。JetStream 使用 Raft 共识算法来管理流和消费者的元数据以及数据的复制。每个 JetStream 流都可以配置一个复制因子(Replication Factor),指定数据在集群中的副本数量。例如,如果复制因子为 3,则流的数据会在三个不同的节点上存储。当某个存储节点发生故障
3.5 NATS 性能测试工具
NATS 生态系统提供了一系列工具来帮助用户评估、监控和调优其 NATS 服务的性能。这些工具对于确保 NATS 系统在高负载下稳定运行、识别瓶颈以及进行容量规划至关重要。
nats-bench
/ nats bench
:
nats-bench
(旧版,通常作为 nats.go
的一部分) 或 nats bench
(新版,集成在 natscli
中) 是 NATS 官方提供的性能基准测试工具,用于测量 NATS 服务器的消息吞吐量和延迟等关键指标 。它允许用户模拟不同数量的发布者 (publishers) 和订阅者 (subscribers),发送特定数量和大小 (message size) 的消息到指定的主题 (subject) 。通过调整这些参数,用户可以评估 NATS 服务器在不同工作负载下的表现。
- 安装与使用:
- 旧版
nats-bench
可以通过编译nats.go
项目中的examples/nats-bench/
目录下的代码获得 (go build main.go
) 。 - 新版
nats bench
命令通常作为natscli
(NATS 命令行工具) 的一部分提供,可以通过go install github.com/nats-io/natscli/nats@latest
安装 。 - 基本用法示例:
- 测试发布者性能:
nats-bench -np 1 -n 100000 -ms 16 foo
(1个发布者,发送10万条16字节的消息到主题 “foo”) 。 - 测试发布者和订阅者性能:
nats-bench -np 1 -ns 1 -n 100000 -ms 16 foo
(1个发布者,1个订阅者) 。 - 使用
natscli
的nats bench pub
和nats bench sub
命令可以进行更细致的测试,例如指定客户端数量、消息数量、是否显示进度等 。例如:nats bench pub test --msgs 10000000 --clients 2 --no-progress
。 - 还可以测试 JetStream 的发布性能:
nats bench js pub js.bench --clients 2 --msgs 1000000 --no-progress --create
。 - 测试请求/响应模式的性能:
nats bench service serve
和nats bench service request
。
- 测试发布者性能:
- 旧版
- 测试指标解读:
- 消息吞吐量: 每秒处理的消息数量 (msgs/sec) 和带宽 (MB/sec) 。例如,一个发布者发送一千万条16字节消息,吞吐量可达 ~7.67 million msgs/sec (~117.06 MB/sec) 。
- 延迟: 消息从发送到接收的时间。
nats latency
命令可以专门用于测量延迟 。 - CPU 和内存使用: 在测试过程中,可以结合
nats-top
等工具监控服务器的资源消耗情况 。 - 当指定多个发布者或订阅者时,
nats-bench
会报告每个客户端的吞吐量以及最小值、平均值、最大值和标准差,有助于分析负载均衡和个体性能差异 .
nats-top
:
nats-top
是一个类似于 Linux top
命令的实时监控工具,用于查看 NATS 服务器的性能指标,如连接数、订阅数、待处理消息数、消息流入流出速率、字节流入流出速率、客户端语言和版本等 。它通过连接到 NATS 服务器的监控端口 (通常是 8222) 来获取数据。
- 安装与使用:
- 安装:
go get github.com/nats-io/nats-top
。 - 启动 NATS 服务器并启用监控端口:
nats-server -m 8222
或gnatsd -m 8222
。 - 启动
nats-top
: 直接运行nats-top
命令 。 - 交互命令:
o <field>
: 按指定字段排序 (如o subs
按订阅数排序) 。s
: 切换只显示订阅者信息 。q
: 安全退出 。?
: 查看帮助信息 。
- 启动参数: 可以在启动时指定查询规则,例如
nats-top -n 1 -sort subs
只显示连接数最多的一个服务并按订阅数排序 .
- 安装:
nats latency
:
nats latency
是 natscli
提供的一个工具,专门用于测量 NATS 系统的延迟。它可以测试发布服务器往返时间 (Pub Server RTT) 和订阅服务器往返时间 (Sub Server RTT),并提供详细的延迟百分位数报告 (HDR Histogram) 。这对于评估 NATS 在实时系统中的响应速度非常有用。
- 使用示例:
nats latency --server-b localhost:4222 --rate 500000
。- 该命令会以指定的速率 (如 500,000 msg/s) 发送消息,并报告最小、中位数、最大延迟以及不同百分位(如 P50, P90, P99, P99.9, P99.99, P99.999, P99.9999, P99.99999, P100)的延迟值 .
其他性能测试相关工具和实践:
- iperf3: 虽然不是 NATS 专用工具,但
iperf3
可以用来测试网络基础架构的传输性能,这对于理解 NATS 性能测试结果的环境背景非常重要 。例如,测试本地宿主机到虚拟机的网络传输速率为 13.6 Gbps 。 - 自定义监控: 开发者可以通过 NATS 客户端库(如 Go 客户端
nats.go
)提供的统计信息接口 (nc.Stats()
) 来收集和打印连接级别的统计数据,如已发送/接收消息数、字节数、重连次数等,并集成到 Prometheus 等监控系统中 。 - 性能优化最佳实践: 包括使用异步发布以最大化吞吐量,并控制进行中的异步发布请求数量 (例如
js = nc.JetStream(nats.PublishAsyncMaxPending(100))
) 以避免压垮服务器 。