NATS 通讯协议及
nats-server
深度调研报告
探索轻量级、高性能消息系统的核心原理、设计哲学与应用实践
关键洞察
- 亚毫秒级延迟
- 每秒数百万消息吞吐
- 极简架构设计
- 云原生友好
执行摘要
NATS 是一个开源的、轻量级、高性能的消息系统,其核心设计哲学围绕简单性、性能和弹性。它支持多种消息传递模式,包括发布-订阅、请求-回复和队列组,并通过 JetStream 模块提供消息持久化能力。NATS 以其低延迟、高吞吐量和易于部署的特性,在微服务通信、物联网(IoT)和实时消息推送等场景中得到广泛应用。
极致性能
Core NATS 可实现亚毫秒级延迟,在"发后即忘"模式下吞吐量可达每秒数百万条消息
极简设计
紧凑的 API 设计,轻量级二进制文件,无外部依赖,配置简单直观
弹性扩展
支持集群和超级集群模式,可实现透明扩展和高可用性部署
1. NATS 核心架构原理
NATS 通讯协议及其核心实现 nats-server 的设计,旨在提供一个高性能、轻量级且易于使用的消息传递系统。其核心架构原理围绕着几个关键概念构建,包括灵活的消息传递模式、可扩展的架构拓扑以及通过 JetStream 实现的持久化能力294 536。
1.1 消息传递模式
NATS 支持多种消息传递模式,以满足不同应用场景的需求。最基础的模式是发布-订阅(Pub/Sub),允许消息生产者将消息发送到特定的主题(Subject),而消息消费者则可以订阅一个或多个主题来接收消息294 536。
发布-订阅模式
天然支持一对多的广播通信,主题支持通配符匹配
请求-回复模式
同步通信方式,自动包含回复主题实现请求响应
1.2 架构拓扑
NATS 服务器的架构设计注重简洁性和可扩展性。单个 NATS 服务器实例是一个轻量级的进程,资源占用小,启动速度快。为了实现高可用性和横向扩展,多个 NATS 服务器实例可以组成集群(Cluster) 498 536。
关键架构特性
- 全连接网状网络(full mesh network)
- 支持超级集群(Supercluster)跨地域部署
- 叶节点(Leaf Nodes)支持边缘计算场景
1.3 JetStream 持久化
原生的 NATS(Core NATS)是一个"最多一次"(at-most-once)的消息传递系统。为了满足需要消息持久化、至少一次交付的场景,NATS 引入了 JetStream 292 536。
# JetStream 核心功能
- 消息持久化存储(磁盘或内存)
- 可配置的保留策略(时间、数量、大小)
- 多种消息确认机制(Ack)
- 键值存储(Key-Value Store)
- 对象存储(Object Store)
- RAFT 共识算法保证一致性
3. NATS 与其他消息系统对比分析
在选择消息系统时,理解不同系统之间的差异及其各自的优势和劣势至关重要。NATS、Apache Kafka、RabbitMQ 和 Redis Pub/Sub 都是流行的消息传递解决方案,但它们的设计目标、核心特性以及适用场景各不相同。
3.1 NATS vs. Apache Kafka
特性 | NATS (Core) | NATS (with JetStream) | Apache Kafka |
---|---|---|---|
主要设计目标 | 轻量级、高性能 Pub/Sub,服务间通信501 | Core NATS + 持久化、流处理501 | 分布式日志,大规模数据流处理,事件溯源501 |
持久化 | 无 (内存传递)501 | 有 (流存储,内存或文件)501 | 有 (磁盘日志)501 |
默认交付语义 | 至多一次 (At-most-once)475 | 至少一次 (At-least-once),可配置475 | 至少一次 (At-least-once)475 |
延迟 | 极低 (微秒级)464 | 低,但高于 Core NATS (因持久化) | 较高 (毫秒级,因磁盘 I/O 和批处理)475 |
运维复杂度 | 低475 | 中等 | 高475 |
3.2 NATS vs. RabbitMQ
特性 | NATS (Core) | NATS (with JetStream) | RabbitMQ |
---|---|---|---|
主要设计目标 | 轻量级、高性能 Pub/Sub,服务间通信501 | Core NATS + 持久化、流处理501 | 企业级消息代理,可靠消息传递,复杂路由504 |
协议 | 自定义文本/二进制协议472 | 自定义文本/二进制协议472 | AMQP, MQTT, STOMP 等472 |
路由灵活性 | 基于主题 (支持通配符)294 | 基于主题 (支持通配符) | 高度灵活 (交换器、绑定)333 |
性能 | 极低延迟,高吞吐量316 | 高 | 良好,但通常低于 NATS (Core NATS)100 |
3.3 NATS vs. Redis Pub/Sub
4. NATS Demo Code 示例
为了更具体地展示 NATS 在不同场景下的应用,本节将提供使用 Go、TypeScript 和 Python 编写的简单 Demo 代码。这些示例将涵盖微服务通信、实时消息推送和物联网应用等典型用例。
4.1 Go 语言示例:微服务通信
Go 语言因其并发性能和简洁性,在微服务开发中非常流行。NATS 提供了功能强大的 Go 客户端库 `nats.go`,支持 Core NATS 和 JetStream 的所有功能。
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// 连接到 NATS 服务器
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// 发布-订阅示例
// 订阅主题 "updates"
sub, err := nc.Subscribe("updates", func(msg *nats.Msg) {
fmt.Printf("Received a message on subject %s: %s\n", msg.Subject, string(msg.Data))
})
if err != nil {
log.Fatal(err)
}
defer sub.Unsubscribe()
// 发布消息到主题 "updates"
err = nc.Publish("updates", []byte("Hello, NATS!"))
if err != nil {
log.Fatal(err)
}
// 请求-回复示例
// 订阅服务请求主题 "service.request"
_, err = nc.Subscribe("service.request", func(msg *nats.Msg) {
fmt.Printf("Received a request: %s\n", string(msg.Data))
// 处理请求并发送回复
nc.Publish(msg.Reply, []byte("Response to: "+string(msg.Data)))
})
// 发送请求并等待回复
response, err := nc.Request("service.request", []byte("Request data"), 2*time.Second)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received a reply: %s\n", string(response.Data))
}
4.2 TypeScript 语言示例:实时消息推送
TypeScript 结合 Node.js 常用于构建实时 Web 应用。NATS 提供了 Node.js 客户端库 `nats.js`,支持在 TypeScript 和 JavaScript 中使用 NATS。
import { connect, StringCodec } from 'nats';
// 连接到 NATS 服务器
const nc = await connect({ servers: 'nats://localhost:4222' });
const sc = StringCodec();
// 模拟一个实时数据源,例如股票行情或体育赛事比分
setInterval(() => {
const message = `Current update: ${new Date().toLocaleTimeString()}`;
// 发布实时更新到主题 "live.updates"
nc.publish("live.updates", sc.encode(message));
console.log(`Published: ${message}`);
}, 1000); // 每秒推送一次
// 客户端订阅示例
const sub = nc.subscribe("live.updates", {
callback: (err, msg) => {
if (err) {
console.error(err);
return;
}
console.log(`[Subscriber] Received: ${sc.decode(msg.data)}`);
},
});
4.3 Python 语言示例:物联网 (IoT) 应用
Python 因其易用性和丰富的库,在物联网(IoT)领域,特别是在设备端脚本和服务器端数据处理方面有广泛应用。NATS 提供了 Python 客户端库 `nats-py`。
import asyncio
from nats.aio.client import Client as NATS
async def iot_device(device_id):
nc = NATS()
await nc.connect(servers=["nats://localhost:4222"])
# 模拟设备定期发送传感器数据
while True:
sensor_data = f"Device {device_id}: Sensor reading {asyncio.get_event_loop().time()}"
print(f"Sending: {sensor_data}")
await nc.publish(f"iot.data.{device_id}", sensor_data.encode())
await asyncio.sleep(2) # 每2秒发送一次数据
async def cloud_service():
nc = NATS()
await nc.connect(servers=["nats://localhost:4222"])
async def message_handler(msg):
subject = msg.subject
data = msg.data.decode()
print(f"Received on [{subject}]: {data}")
# 在这里处理接收到的 IoT 数据
# 订阅所有 IoT 设备的数据主题 "iot.data.>"
await nc.subscribe("iot.data.>", cb=message_handler)
print("Cloud service subscribed to iot.data.>")
async def main():
# 启动模拟的 IoT 设备
device_task = asyncio.create_task(iot_device("sensor01"))
# 启动云端服务
cloud_task = asyncio.create_task(cloud_service())
await asyncio.gather(device_task, cloud_task)
if __name__ == '__main__':
asyncio.run(main())