NATS协议与实现深度调研

NATS协议与实现深度调研

NATS协议与实现深度调研

轻量级、高性能的分布式消息传递系统

NATS是一个简单、安全且高性能的通信系统和数据层,专为数字系统、服务和设备设计。它采用发布-订阅模型,支持请求-响应模式,并具备队列组功能,为现代分布式系统提供灵活、可靠的通信解决方案。

architecture NATS协议基本原理

NATS协议是一个简单的、基于文本的发布/订阅风格协议,具有以下特点:

  • 客户端连接到gnatsd(NATS服务器),通信基于普通的TCP/IP套接字
  • 协议定义了很小的操作集,换行表示终止
  • 与传统的二进制消息格式不同,NATS使用基于文本的协议,使得客户端实现更简单
  • 支持多种编程语言实现,便于集成到不同技术栈

协议消息格式

主题名(Subject Name)是大小写敏感的,必须是非空字符串,不能包含空格,可以使用”.”符号。例如:FOO、BAR、foo.bar都是有效的主题名。

通配符支持

  • *:匹配主题的任意级别的任意字符(一个*只能匹配一个级别)
  • 例如:foo.* 匹配 foo.a 但不匹配 foo.a.b
  • >:匹配后面的任意字符
  • 例如:foo.> 匹配 foo.bar 和 foo.bar.baz.1,但不匹配 foo

协议操作集

操作名 发送端 描述
INFO 服务器 初始化TCP/IP连接后发送给客户端
CONNECT 客户端 发送给服务器指定连接信息
PUB 客户端 发布消息到主题或Reply主题
SUB 客户端 订阅主题(或主题通配符)
UNSUB 客户端 取消订阅主题(或自动取消订阅)
MSG 服务器 交付一条消息负载给订阅者
PING/PONG 两端 保持连接有效的活跃消息

design_services NATS架构和设计思想

NATS是一个轻量级、高性能的消息传递系统,用于构建分布式系统和微服务架构,其核心设计思想包括:

hub 发布-订阅模型

基于主题/主题寻址的发布-订阅模型,实现消息的解耦传递

location_on 位置独立性

发布者和订阅者无需知道对方的位置,由NATS服务器负责消息路由

group_work 多对多通信

默认的多对多(M. N)通信模式,支持多个发布者和多个订阅者

extension 可扩展性

通过JetStream增强,添加持久化功能,支持至少一次和精确一次的语义

核心特性

  • 高性能:设计为低延迟、高吞吐量的消息系统
  • 轻量级:核心服务器仅2MB多,资源占用少
  • 简单易用:API简洁,易于集成和使用
  • 灵活性:支持多种消息模式,包括发布-订阅、请求-响应和队列组
  • 安全性:支持TLS加密、认证和授权机制
  • 集群支持:支持服务器集群,提供高可用性和可扩展性

code Go实现的nats-server

nats-server是NATS的官方Go语言实现,具有以下特点:

  • 使用Golang语言开发,可执行文件名为gnatsd(Go NATS Daemon)
  • 开源软件,基于MIT许可证发布
  • 非常轻量级,发布包只有2MB多,启动时可以无需任何参数,直接运行
  • 支持多种配置选项,包括服务器选项、日志选项、授权认证选项、TLS安全选项和集群选项

主要配置选项

服务器选项
  • -a, –addr HOST:绑定主机IP地址(默认是0.0.0.0)
  • -p, –port PORT:客户端连接端口(默认是4222)
  • -m, –http_port PORT:HTTP监控端口
  • -c, –config FILE:指定配置文件
安全选项
  • –user user:连接需要的用户名
  • –pass password:连接需要的密码
  • –tls:启用TLS
  • –tlscert FILE:服务器证书文件
Go客户端示例 go
package main import ( “fmt” “github.com/nats-io/nats.go” ) func main() { // 连接到NATS服务器 nc, err := nats.Connect(“nats://localhost:4222”) if err != nil { log.Fatalf(“连接到NATS服务器失败: %v”, err) } defer nc.Close() // 发布消息 subject := “my_subject” message := []byte(“Hello, NATS!”) err = nc.Publish(subject, message) if err != nil { log.Printf(“发布消息失败: %v”, err) } // 订阅主题 nc.Subscribe(subject, func(m *nats.Msg) { fmt.Printf(“收到消息: %s\n”, string(m.Data)) }) // 等待消息 select {} }

php PHP客户端实现

有多个PHP NATS客户端实现,如repejota/phpnats和basis-company/nats.php,它们提供以下功能:

  • 基本的NATS功能,如连接到NATS服务器、订阅主题、发布消息等
  • 可以通过Composer安装,使用简单
  • 适用于构建异步应用程序,特别是在微服务架构中实现服务间通信
  • 支持通配符订阅和队列组

应用场景

apps 微服务架构

PHP应用利用NATS实现服务间的高效沟通,解耦服务依赖

event 事件驱动

构建实时系统,如日志聚合、数据同步等,通过发布事件触发后续处理

PHP客户端示例 php
subscribe(‘hello’, function ($message) { echo “Received message: {$message}\n”; }); // 发布一条消息到’hello’主题 $client->publish(‘hello’, ‘Hello, NATS!’); // 等待一段时间以确保消息被处理 sleep(1); } catch (\Exception $e) { echo “Error: ” . $e->getMessage(); } finally { // 关闭连接 $client->close(); }

summarize 总结

NATS作为一个轻量级、高性能的分布式消息传递系统,具有以下优势:

  • 简单易用:基于文本的协议使得客户端实现简单,API设计直观
  • 高性能:低延迟、高吞吐量,适合实时通信场景
  • 灵活性:支持多种消息模式,适应不同的应用场景
  • 跨语言支持:提供多种编程语言的客户端实现,便于集成
  • 云原生:适合微服务架构和云原生应用开发

无论是Go实现的nats-server还是PHP客户端实现,都体现了NATS系统的设计理念:简单、高效、灵活。这使得NATS成为构建现代分布式系统的理想选择,特别是在需要高性能、低延迟通信的场景中。

NATS协议与实现深度调研 – NATS协议基本原理

NATS协议基本原理

简单、高效、基于文本的发布/订阅协议

description 协议概述

NATS协议是一个简单的、基于文本的发布/订阅风格协议,专为高性能消息传递设计。它具有以下核心特点:

text_format
基于文本
使用人类可读的文本格式,而非二进制协议,便于调试和实现
sync_alt
发布/订阅模型
支持一对多的消息分发模式,实现松耦合通信
lan
TCP/IP通信
基于标准TCP/IP套接字,确保跨平台兼容性
speed
高性能
最小化协议开销,优化低延迟和高吞吐量
lightbulb 设计优势

与传统的、使用了二进制消息格式的消息通信系统不同,NATS使用了基于文本的协议,使得客户端实现很简单,可以方便地选择多种编程语言或脚本语言来实现。这种设计选择大大降低了开发和维护成本,提高了系统的可扩展性。

swap_horiz 通信方式

NATS客户端与服务器之间的通信遵循以下模式:

  • 客户端连接到gnatsd(NATS服务器),通过普通的TCP/IP套接字进行通信
  • 协议定义了很小的操作集,消息以换行符(CR+LF)表示终止
  • 协议消息的域使用空格符或制表符(\t)进行分隔,多个连续空格会被视为一个空格
  • 支持同步和异步通信模式,满足不同场景需求
INFO {“server_id”:”NABCDEFG”,”version”:”2.2.0″,”go”:”go1.17″,”host”:”0.0.0.0″,”port”:4222,”auth_required”:false} CONNECT {“verbose”:false,”pedantic”:false,”tls_required”:false,”name”:”client-name”} SUB foo 1 PUB bar 11\r\nHello World MSG foo 1 bar 11\r\nHello World PING PONG

category 主题与通配符

NATS使用主题(Subject)进行消息路由,主题具有以下特性:

  • 主题名是大小写敏感的,必须是非空字符串
  • 主题名不能包含空格,但可以使用点号(.)作为分隔符,形成层级结构
  • 有效主题名示例:FOO、BAR、foo.bar、foo.BAR、FOO.BAR.BAZ

NATS支持两种通配符,用于灵活的消息订阅:

  • *(星号):匹配主题的任意级别的任意字符,但一个*只能匹配一个级别
    • 例如:foo.* 匹配 foo.a 但不匹配 foo.a.b
    • foo.*.* 可以匹配 foo.a.b 但不匹配 foo.a.b.c
  • >(大于号):匹配后面的任意字符,通常用于订阅主题的子树
    • 例如:foo.> 匹配 foo.bar 和 foo.bar.baz.1,但不匹配 foo
warning 通配符使用限制

通配符必须被点号分隔。例如:foo.bar 和 foo.> 都是有效的,而 foo..bar、f*o.b*r 和 foo> 都是无效的。此外,pub操作只能针对具体的某个topic,不能使用通配符!

list_alt 协议操作集

NATS协议定义了一组简单的操作,注意操作名是大小写不敏感的,因此 SUB foo 1\r\n 和 sub foo 1\r\n 是等价的。

info INFO
发送方:服务器
描述:服务器初始化TCP/IP连接后发送给客户端,包含服务器信息和配置
link CONNECT
发送方:客户端
描述:发送给服务器指定连接信息,包括认证选项和连接参数
send PUB
发送方:客户端
描述:发布消息到主题或Reply主题,格式:PUB <#bytes>\r\n[payload]\r\n
notifications SUB
发送方:客户端
描述:订阅主题(或主题通配符),格式:SUB \r\n
notifications_off UNSUB
发送方:客户端
描述:取消订阅主题(或自动取消订阅),格式:UNSUB \r\n
mail MSG
发送方:服务器
描述:交付一条消息负载给订阅者,格式:MSG <#bytes>\r\n[payload]\r\n
network_ping PING/PONG
发送方:两端
描述:保持连接有效的活跃消息,用于连接健康检查
check_circle +OK/-ERR
发送方:服务器
描述:+OK确认详细模式下协议消息的合法,-ERR指示协议错误,会导致客户端断开连接
NATS协议与实现深度调研 – NATS架构和设计思想

NATS架构和设计思想

轻量级、高性能的分布式消息传递系统

architecture 核心架构

NATS是一个轻量级、高性能的消息传递系统,专为构建分布式系统和微服务架构设计。其核心架构基于发布-订阅模型,通过主题寻址实现消息路由,具有以下特点:

hub

发布-订阅模型

NATS采用经典的发布-订阅模型,发布者将消息发送到特定主题,订阅者通过订阅主题接收消息。这种模式实现了消息生产者和消费者的解耦,使系统更加灵活和可扩展。

alt_route

主题寻址

NATS使用层级化的主题结构进行消息路由,主题由点号(.)分隔的多个部分组成,例如”service.order.created”。这种结构化的主题命名便于组织和管理消息流。

account_tree NATS发布-订阅模型
发布者1
发送消息到主题A
arrow_forward
NATS服务器
消息路由与分发
arrow_forward
订阅者1
订阅主题A
arrow_forward
订阅者2
订阅主题A

extension 关键设计特性

NATS的设计包含多个关键特性,这些特性使其成为构建现代分布式系统的理想选择:

location_on
位置独立性
发布者和订阅者无需知道对方的位置,由NATS服务器负责消息路由,实现真正的服务解耦
group_work
多对多通信
默认的多对多(M. N)通信模式,支持多个发布者和多个订阅者,提高系统灵活性和可扩展性
filter_alt
通配符支持
支持*和>通配符,实现灵活的主题匹配,简化订阅管理
speed
高性能
极低的消息处理延迟和高吞吐量,适合实时通信场景

通配符使用

通配符 描述 示例
* 匹配主题的任意级别的任意字符(一个*只能匹配一个级别) foo.* 匹配 foo.a 但不匹配 foo.a.b
> 匹配后面的任意字符,通常用于订阅主题的子树 foo.> 匹配 foo.bar 和 foo.bar.baz.1

stream JetStream增强功能

虽然核心NATS提供尽力而为、最多一次的消息传递,但通过JetStream可以增强NATS系统,添加持久化功能,引入更强大的消息语义:

save

持久化存储

JetStream为NATS添加了持久化存储能力,消息可以被持久化到磁盘,确保即使在服务器重启后也不会丢失。这使得NATS可以用于需要可靠消息传递的场景。

verified

消息语义

JetStream引入了至少一次和精确一次的语义,确保消息能够被正确传递和处理。通过消费者确认机制和消息重试策略,解决了核心NATS中可能出现的消息丢失问题。

history
消息流
JetStream引入了流(Stream)概念,消息被组织到流中,支持消息重放和历史查询
person
消费者
通过消费者(Consumer)机制,可以控制消息的传递速率和顺序,支持拉取和推送模式
JetStream基本使用示例 go
// 创建JetStream上下文 js, err := nc.JetStream() if err != nil { log.Fatal(err) } // 添加流 _, err = js.AddStream(&nats.StreamConfig{ Name: “ORDERS”, Subjects: []string{“orders.>”}, }) if err != nil { log.Fatal(err) } // 发布消息到流 _, err = js.Publish(“orders.new”, []byte(“order data”)) if err != nil { log.Fatal(err) } // 创建消费者 _, err = js.AddConsumer(“ORDERS”, &nats.ConsumerConfig{ Durable: “PROCESSOR”, AckPolicy: nats.AckExplicit, }) if err != nil { log.Fatal(err) } // 订阅消息 sub, err := js.Subscribe(“orders.new”, “PROCESSOR”, func(msg *nats.Msg) { fmt.Printf(“Received order: %s\n”, string(msg.Data)) msg.Ack() // 确认消息处理完成 })

integration_instructions 通信模式

NATS支持多种通信模式,满足不同场景的需求:

campaign
发布-订阅
一对多的消息分发模式,发布者发送消息到主题,所有订阅该主题的订阅者都会收到消息
sync_alt
请求-响应
支持请求-响应模式,客户端可以发送请求并等待响应,实现同步通信
queue
队列组
多个订阅者可以组成队列组,每条消息只被组中的一个订阅者处理,实现负载均衡
groups
服务发现
内置服务发现机制,服务可以注册自己并发现其他服务,简化微服务架构
NATS协议与实现深度调研 – Go实现的nats-server

Go实现的nats-server

轻量级、高性能的NATS服务器实现

code 实现特点

nats-server是NATS的官方Go语言实现,具有以下特点:

language

Go语言开发

使用Golang语言开发,可执行文件名为gnatsd(Go NATS Daemon)。Go语言的并发模型和高效垃圾回收机制使nats-server能够处理大量并发连接,同时保持低内存占用。

speed

轻量级设计

非常轻量级,发布包只有2MB多,启动时可以无需任何参数,直接运行即可。这种轻量级设计使得NATS非常适合边缘计算和资源受限环境。

基本启动命令 bash
# 下载并解压NATS服务器 $ tar -xzf gnatsd-v2.2.0-linux-amd64.tar.gz # 进入目录 $ cd gnatsd-v2.2.0-linux-amd64 # 直接运行(无需任何参数) $ ./gnatsd [12345] 2023/08/15 10:30:45.123456 [INF] Starting nats-server [12345] 2023/08/15 10:30:45.123789 [INF] Version 2.2.0 [12345] 2023/08/15 10:30:45.124012 [INF] Listening for client connections on 0.0.0.0:4222 [12345] 2023/08/15 10:30:45.124234 [INF] Server is ready

settings 配置选项

nats-server支持多种配置选项,可以通过命令行参数或配置文件进行设置。主要配置选项包括:

选项类型 参数 描述
服务器选项 -a, –addr HOST 绑定主机IP地址(默认是0.0.0.0)
-p, –port PORT 客户端连接NATS服务器使用的端口(默认是4222)
-m, –http_port PORT 使用HTTP端口作为监听端口
-c, –config FILE 指定配置文件
日志选项 -l, –log FILE 指定日志输出的文件
-T, –logtime 是否开启日志的时间戳(默认为true)
-D, –debug 开启调试输出
授权认证选项 –user user 连接需要的用户名
–pass password 连接需要的密码
TLS安全选项 –tls 启用TLS,不验证客户端(默认为false)
–tlscert FILE 服务器证书文件
–tlskey FILE 服务器证书私钥
–tlsverify 启用TLS,每一个客户端都要认证
集群选项 –routes [rurl-1, rurl-2] 路线征求并连接
配置文件示例 conf
# NATS服务器配置文件示例 # 服务器基本配置 server_name: “nats-server” host: “0.0.0.0” port: 4222 http_port: 8222 # 日志配置 log_file: “/var/log/nats-server.log” logtime: true debug: false trace: false # 认证配置 authorization: { users: [ {user: “admin”, password: “admin123”} {user: “client”, password: “client123”} ] } # TLS配置 tls: { cert_file: “/etc/certs/server-cert.pem” key_file: “/etc/certs/server-key.pem” ca_file: “/etc/certs/ca-cert.pem” verify: true } # 集群配置 cluster: { name: “nats-cluster” host: “0.0.0.0” port: 6222 routes: [ “nats://node1:6222” “nats://node2:6222” “nats://node3:6222” ] } # JetStream配置 jetstream: { store_dir: “/data/jetstream” max_memory_store: 1GB max_file_store: 10GB }

deployed_code 部署方式

nats-server支持多种部署方式,包括直接运行、系统服务、Docker容器和Kubernetes部署等。以下是基本部署步骤:

1
下载与解压
从NATS官方网站下载对应平台的二进制文件,并解压到目标目录。
2
配置服务器
根据需求创建配置文件,设置服务器选项、认证、安全等参数。
3
启动服务
使用命令行参数或配置文件启动nats-server,验证服务正常运行。
4
集群部署(可选)
如需高可用性,可配置多个nats-server节点组成集群,实现故障转移和负载均衡。

多平台支持

computer
Linux
支持x86_64、ARM等多种架构,提供systemd服务脚本
laptop_mac
macOS
提供Intel和Apple Silicon版本,支持Homebrew安装
desktop_windows
Windows
提供Windows服务安装选项,支持命令行和GUI管理
Docker部署示例 docker
# 使用Docker运行NATS服务器 $ docker run -d –name nats-server \ -p 4222:4222 -p 8222:8222 -p 6222:6222 \ -v /path/to/config:/etc/nats \ nats:2.2.0 -c /etc/nats/nats-server.conf # 使用Docker Compose部署NATS集群 version: ‘3.7’ services: nats1: image: nats:2.2.0 ports: – “4222:4222” – “8222:8222” – “6222:6222” command: | –cluster_name nats-cluster –http_port 8222 –port 4222 –cluster_port 6222 –routes nats://nats2:6222,nats://nats3:6222 nats2: image: nats:2.2.0 ports: – “4223:4222” – “8223:8222” command: | –cluster_name nats-cluster –http_port 8223 –port 4222 –cluster_port 6222 –routes nats://nats1:6222,nats://nats3:6222 nats3: image: nats:2.2.0 ports: – “4224:4222” – “8224:8222” command: | –cluster_name nats-cluster –http_port 8224 –port 4222 –cluster_port 6222 –routes nats://nats1:6222,nats://nats2:6222

architecture 架构设计

nats-server的架构设计充分考虑了性能、可靠性和可扩展性,其核心设计特点包括:

memory

内存高效

使用高效的内存管理策略,最小化内存占用。消息传递主要在内存中完成,只有在启用JetStream时才会持久化到磁盘,确保高性能。

sync

并发模型

充分利用Go语言的goroutine和channel机制,实现高效的并发处理。每个客户端连接由独立的goroutine处理,避免锁竞争,提高吞吐量。

Go客户端连接示例 go
package main import ( “fmt” “log” “time” “github.com/nats-io/nats.go” ) func main() { // 连接到NATS服务器 nc, err := nats.Connect(“nats://localhost:4222”) if err != nil { log.Fatal(err) } defer nc.Close() // 设置连接事件处理器 nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { log.Printf(“Error: %v”, err) }) // 发布消息 subject := “example.subject” message := []byte(“Hello, NATS!”) if err := nc.Publish(subject, message); err != nil { log.Fatal(err) } // 订阅主题 sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { fmt.Printf(“Received message: %s\n”, string(msg.Data)) }) if err != nil { log.Fatal(err) } defer sub.Unsubscribe() // 等待消息接收 time.Sleep(100 * time.Millisecond) }
NATS协议与实现深度调研 – PHP客户端实现

PHP客户端实现

为PHP应用提供高性能消息传递能力

integration_instructions 实现特点

目前有多个PHP NATS客户端实现,它们为PHP应用提供了与NATS服务器交互的能力。主要的PHP NATS客户端实现包括:

code

repejota/phpnats

这是一个流行的PHP NATS客户端库,由repejota开发并维护。它提供了简单易用的API,支持NATS的核心功能,包括发布-订阅、请求-响应和队列组模式。该库通过Composer进行安装,文档完善,社区活跃。

business

basis-company/nats.php

由Basis Company开发的PHP NATS客户端,专注于企业级应用场景。它提供了更高级的功能,如连接池、自动重连和更完善的错误处理机制。该库设计用于高负载环境,适合企业级应用集成。

php
PHP原生实现
纯PHP实现,无需额外扩展,兼容PHP 7.0+版本,易于部署和维护
bolt
高性能
优化的网络I/O处理,支持异步操作,满足高并发场景需求
extension
Composer集成
通过Composer包管理器安装,遵循PSR标准,易于集成到现代PHP项目中
security
安全可靠
支持TLS加密连接,提供完善的错误处理和连接状态管理

build 基本功能

PHP NATS客户端提供了与NATS服务器交互的核心功能,使PHP应用能够充分利用NATS的消息传递能力:

link
连接管理
支持连接到NATS服务器,提供连接池、自动重连和连接状态监控功能
send
消息发布
支持向指定主题发布消息,可设置回复主题,实现请求-响应模式
notifications
主题订阅
支持订阅特定主题,可使用通配符进行灵活的主题匹配
group_work
队列组
支持队列组订阅,实现消息在多个消费者之间的负载均衡
基本连接和发布/订阅示例 php
subscribe(‘hello’, function ($message) { echo “Received message: {$message}\n”; }); // 发布一条消息到’hello’主题 $client->publish(‘hello’, ‘Hello, NATS!’); // 等待一段时间以确保消息被处理 sleep(1); } catch (\Exception $e) { echo “Error: ” . $e->getMessage(); } finally { // 关闭连接 $client->close(); }
请求-响应模式示例 php
subscribe(‘help.request’, function ($message) { // 解析请求 $request = json_decode($message, true); // 处理请求并生成响应 $response = [ ‘status’ => ‘success’, ‘answer’ => ‘This is the answer to your question: ‘ . $request[‘question’] ]; // 发送响应到回复主题 $client->publish($message->getReplyTo(), json_encode($response)); }); // 发送请求并等待响应 $inbox = $client->newInbox(); $client->subscribe($inbox, function ($message) { $response = json_decode($message->getBody(), true); echo “Received response: ” . $response[‘answer’] . “\n”; }); // 发送请求 $request = [ ‘question’ => ‘What is NATS?’ ]; $client->publish(‘help.request’, json_encode($request), $inbox); // 等待响应 sleep(1); } catch (\Exception $e) { echo “Error: ” . $e->getMessage(); } finally { $client->close(); }

play_circle 使用方法

使用PHP NATS客户端非常简单,主要通过Composer进行安装,然后通过简单的API调用即可实现与NATS服务器的交互:

1
安装客户端库
使用Composer安装PHP NATS客户端库,例如:composer require repejota/phpnats
2
引入自动加载
在PHP脚本中引入Composer的自动加载文件:require 'vendor/autoload.php';
3
创建客户端实例
创建NATS客户端实例并连接到服务器:$client = new Client('nats://localhost:4222');
4
实现消息交互
使用客户端API进行消息发布、订阅或请求-响应操作
队列组示例 php
queueSubscribe(‘task.queue’, ‘workers’, function ($message) { $task = json_decode($message, true); echo “Processing task: ” . $task[‘id’] . “\n”; // 模拟任务处理 sleep(1); // 任务完成 echo “Task completed: ” . $task[‘id’] . “\n”; }); // 发布任务到队列 for ($i = 1; $i <= 5; $i++) { $task = [ 'id' => $i, ‘data’ => ‘Task data ‘ . $i ]; $client->publish(‘task.queue’, json_encode($task)); echo “Published task: ” . $i . “\n”; } // 等待任务处理完成 sleep(10); } catch (\Exception $e) { echo “Error: ” . $e->getMessage(); } finally { $client->close(); }

cases 应用场景

PHP NATS客户端在多种应用场景中都能发挥重要作用,特别是在需要高性能、低延迟消息传递的系统中:

apps
微服务架构
在微服务架构中,PHP应用可以利用NATS实现服务间的高效通信,实现服务解耦,提高系统的可扩展性和弹性
event
事件驱动系统
构建实时系统,如日志聚合、数据同步、用户行为追踪等,通过发布事件的方式触发后续处理,实现松耦合的事件驱动架构
api
API网关与代理
结合API网关,用作服务间通信的中间件,提高响应速度和解耦能力,实现请求路由、负载均衡和故障转移
notifications_active
实时通知系统
构建实时通知系统,如聊天应用、实时数据更新、推送通知等,利用NATS的高性能和低延迟特性,提供流畅的用户体验
微服务架构示例 php
client = new Client(‘nats://localhost:4222’); // 监听订单创建请求 $this->client->subscribe(‘orders.create’, function ($message) { $orderData = json_decode($message, true); // 创建订单 $orderId = $this->createOrder($orderData); // 发布订单创建事件 $event = [ ‘event’ => ‘order.created’, ‘orderId’ => $orderId, ‘data’ => $orderData ]; $this->client->publish(‘events.orders’, json_encode($event)); // 返回响应 $response = [ ‘status’ => ‘success’, ‘orderId’ => $orderId ]; $this->client->publish($message->getReplyTo(), json_encode($response)); }); } private function createOrder($data) { // 实际订单创建逻辑 return uniqid(‘order_’); } public function run() { // 保持服务运行 while (true) { $this->client->wait(1); } } } // 库存服务 class InventoryService { private $client; public function __construct() { $this->client = new Client(‘nats://localhost:4222’); // 监听订单创建事件 $this->client->subscribe(‘events.orders’, function ($message) { $event = json_decode($message, true); if ($event[‘event’] === ‘order.created’) { // 扣减库存 $this->deductInventory($event[‘data’][‘items’]); } }); } private function deductInventory($items) { // 实际库存扣减逻辑 echo “Deducting inventory for order items\n”; } public function run() { // 保持服务运行 while (true) { $this->client->wait(1); } } } // 启动服务 $orderService = new OrderService(); $inventoryService = new InventoryService(); // 在实际应用中,这些服务会运行在不同的进程中 // 这里仅作演示 $orderService->run();
NATS协议与实现深度调研 – 代码示例

代码示例

Go和PHP中使用NATS的实践案例

code Go语言示例

以下是使用Go语言实现NATS客户端的示例代码,展示了基本的连接、发布和订阅操作:

integration_instructions 基本连接与发布/订阅
go
package main import ( “fmt” “log” “time” “github.com/nats-io/nats.go” ) func main() { // 连接到NATS服务器 nc, err := nats.Connect(“nats://localhost:4222”) if err != nil { log.Fatalf(“连接到NATS服务器失败: %v”, err) } defer nc.Close() // 设置连接事件处理器 nc.SetDisconnectedHandler(func(nc *nats.Conn) { log.Println(“与NATS服务器的连接已断开”) }) nc.SetReconnectedHandler(func(nc *nats.Conn) { log.Println(“已重新连接到NATS服务器”) }) // 发布消息 subject := “example.subject” message := []byte(“Hello, NATS!”) if err := nc.Publish(subject, message); err != nil { log.Printf(“发布消息失败: %v”, err) } else { log.Printf(“已发布消息到主题: %s”, subject) } // 订阅主题 sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { fmt.Printf(“收到消息: 主题=%s, 数据=%s\n”, msg.Subject, string(msg.Data)) }) if err != nil { log.Fatalf(“订阅主题失败: %v”, err) } defer sub.Unsubscribe() // 等待消息接收 time.Sleep(100 * time.Millisecond) }

代码说明

  • 使用nats.Connect()函数连接到NATS服务器
  • 设置断开连接和重新连接的事件处理器,提高连接可靠性
  • 使用Publish()方法向指定主题发布消息
  • 使用Subscribe()方法订阅主题,并设置消息处理回调函数
  • 程序结束时,使用Unsubscribe()取消订阅,并关闭连接
sync_alt 请求-响应模式
go
package main import ( “encoding/json” “fmt” “log” “time” “github.com/nats-io/nats.go” ) // 定义请求和响应结构体 type Request struct { Query string `json:”query”` } type Response struct { Result string `json:”result”` Error string `json:”error,omitempty”` } func main() { // 连接到NATS服务器 nc, err := nats.Connect(“nats://localhost:4222”) if err != nil { log.Fatal(err) } defer nc.Close() // 设置响应处理程序 nc.Subscribe(“help.request”, func(msg *nats.Msg) { // 解析请求 var req Request if err := json.Unmarshal(msg.Data, &req); err != nil { // 发送错误响应 resp := Response{Error: “无效的请求格式”} data, _ := json.Marshal(resp) nc.Publish(msg.Reply, data) return } // 处理请求 var result string switch req.Query { case “time”: result = time.Now().Format(time.RFC3339) case “version”: result = “NATS示例 v1.0” default: result = “未知查询: ” + req.Query } // 发送响应 resp := Response{Result: result} data, _ := json.Marshal(resp) nc.Publish(msg.Reply, data) }) // 发送请求并等待响应 req := Request{Query: “time”} reqData, _ := json.Marshal(req) // 使用Request()方法发送请求并等待响应 msg, err := nc.Request(“help.request”, reqData, 2*time.Second) if err != nil { log.Fatalf(“请求失败: %v”, err) } // 解析响应 var resp Response if err := json.Unmarshal(msg.Data, &resp); err != nil { log.Fatalf(“响应解析失败: %v”, err) } if resp.Error != “” { fmt.Printf(“错误: %s\n”, resp.Error) } else { fmt.Printf(“结果: %s\n”, resp.Result) } }

代码说明

  • 定义RequestResponse结构体,用于序列化和反序列化JSON数据
  • 服务端订阅”help.request”主题,处理请求并向回复主题发送响应
  • 客户端使用Request()方法发送请求,并设置超时时间
  • 使用msg.Reply获取回复主题,确保响应发送到正确的位置
  • 通过JSON序列化实现结构化数据的传输
group_work 队列组示例
go
package main import ( “fmt” “log” “os” “strconv” “sync” “time” “github.com/nats-io/nats.go” ) func worker(id int, nc *nats.Conn, wg *sync.WaitGroup) { defer wg.Done() // 加入队列组,每个工作者共享负载 sub, err := nc.QueueSubscribe(“task.queue”, “workers”, func(msg *nats.Msg) { fmt.Printf(“工作者 %d 处理任务: %s\n”, id, string(msg.Data)) // 模拟任务处理 time.Sleep(500 * time.Millisecond) // 确认消息处理完成 msg.Ack() }) if err != nil { log.Printf(“工作者 %d 订阅失败: %v”, id, err) return } defer sub.Unsubscribe() // 设置手动确认模式 sub.SetPendingLimits(16, 16*1024*1024) fmt.Printf(“工作者 %d 已启动\n”, id) // 保持工作者运行 select {} } func main() { // 从命令行参数获取工作者数量 numWorkers := 3 if len(os.Args) > 1 { if n, err := strconv.Atoi(os.Args[1]); err == nil && n > 0 { numWorkers = n } } // 连接到NATS服务器 nc, err := nats.Connect(“nats://localhost:4222”) if err != nil { log.Fatal(err) } defer nc.Close() // 启动多个工作者 var wg sync.WaitGroup wg.Add(numWorkers) for i := 1; i <= numWorkers; i++ { go worker(i, nc, &wg) } // 等待工作者启动 time.Sleep(1 * time.Second) // 发布任务到队列 for i := 1; i <= 10; i++ { task := fmt.Sprintf("任务 #%d", i) if err := nc.Publish("task.queue", []byte(task)); err != nil { log.Printf("发布任务失败: %v", err) } else { fmt.Printf("已发布任务: %s\n", task) } time.Sleep(200 * time.Millisecond) } // 等待程序退出 wg.Wait() }

代码说明

  • 使用QueueSubscribe()方法加入队列组”workers”
  • 队列组中的每个工作者共享负载,每条消息只被一个工作者处理
  • 使用msg.Ack()手动确认消息处理完成
  • 使用SetPendingLimits()设置待处理消息的限制
  • 通过goroutine并发启动多个工作者,提高任务处理能力

php PHP语言示例

以下是使用PHP语言实现NATS客户端的示例代码,展示了基本的连接、发布和订阅操作:

integration_instructions 基本连接与发布/订阅
php
on(‘connect’, function() { echo “已连接到NATS服务器\n”; }); $client->on(‘disconnect’, function() { echo “与NATS服务器的连接已断开\n”; }); $client->on(‘reconnect’, function() { echo “已重新连接到NATS服务器\n”; }); // 连接到服务器 $client->connect(); // 订阅主题 $client->subscribe(‘example.subject’, function ($message) { echo “收到消息: ” . $message . “\n”; }); // 发布一条消息到主题 $client->publish(‘example.subject’, ‘Hello, NATS!’); echo “已发布消息到主题: example.subject\n”; // 等待消息接收 sleep(1); } catch (\Exception $e) { echo “错误: ” . $e->getMessage() . “\n”; } finally { // 关闭连接 $client->close(); } ?>

代码说明

  • 使用new Client()创建NATS客户端实例
  • 使用on()方法设置连接事件处理器
  • 调用connect()方法连接到NATS服务器
  • 使用subscribe()方法订阅主题,并设置消息处理回调函数
  • 使用publish()方法向指定主题发布消息
  • 程序结束时,使用close()方法关闭连接
sync_alt 请求-响应模式
php
connect(); // 设置响应处理程序 $client->subscribe(‘help.request’, function ($message) { // 解析请求 $request = json_decode($message->getBody(), true); if (!$request || !isset($request[‘query’])) { // 发送错误响应 $response = [ ‘error’ => ‘无效的请求格式’ ]; $client->publish($message->getReplyTo(), json_encode($response)); return; } // 处理请求 $result = ”; switch ($request[‘query’]) { case ‘time’: $result = date(‘Y-m-d H. i:s’); break; case ‘version’: $result = ‘NATS PHP示例 v1.0’; break; default: $result = ‘未知查询: ‘ . $request[‘query’]; } // 发送响应 $response = [ ‘result’ => $result ]; $client->publish($message->getReplyTo(), json_encode($response)); }); // 创建收件箱用于接收响应 $inbox = $client->newInbox(); // 订阅收件箱以接收响应 $response = null; $client->subscribe($inbox, function ($message) use (&$response) { $response = json_decode($message->getBody(), true); }); // 发送请求 $request = [ ‘query’ => ‘time’ ]; $client->publish(‘help.request’, json_encode($request), $inbox); // 等待响应 $timeout = 5; // 5秒超时 $startTime = time(); while ($response === null && (time() – $startTime) < $timeout) { $client->wait(0.1); // 等待0.1秒 } if ($response) { if (isset($response[‘error’])) { echo “错误: ” . $response[‘error’] . “\n”; } else { echo “结果: ” . $response[‘result’] . “\n”; } } else { echo “请求超时\n”; } } catch (\Exception $e) { echo “错误: ” . $e->getMessage() . “\n”; } finally { $client->close(); } ?>

代码说明

  • 服务端订阅”help.request”主题,处理请求并向回复主题发送响应
  • 使用newInbox()创建唯一的收件箱,用于接收响应
  • 使用publish()方法的第三个参数指定回复主题
  • 使用wait()方法等待消息到达,避免阻塞整个应用
  • 实现超时机制,防止无限等待响应
  • 通过JSON序列化实现结构化数据的传输
group_work 队列组示例
php
id = $id; $this->client = $client; } public function start() { $this->running = true; // 加入队列组,每个工作者共享负载 $this->client->queueSubscribe(‘task.queue’, ‘workers’, function ($message) { echo “工作者 ” . $this->id . ” 处理任务: ” . $message . “\n”; // 模拟任务处理 sleep(1); echo “工作者 ” . $this->id . ” 完成任务\n”; }); echo “工作者 ” . $this->id . ” 已启动\n”; // 保持工作者运行 while ($this->running) { $this->client->wait(1); } } public function stop() { $this->running = false; } } // 创建连接 $client = new Client(‘nats://localhost:4222’); try { $client->connect(); // 启动多个工作者 $workers = []; $numWorkers = 3; for ($i = 1; $i <= $numWorkers; $i++) { $worker = new Worker($i, $client); $workers[] = $worker; // 在实际应用中,工作者应该在单独的进程中运行 // 这里为了简单起见,我们只启动一个工作者作为示例 if ($i == 1) { $worker->start(); } } // 等待工作者启动 sleep(1); // 发布任务到队列 for ($i = 1; $i <= 10; $i++) { $task = "任务 #" . $i; $client->publish(‘task.queue’, $task); echo “已发布任务: ” . $task . “\n”; // 短暂延迟,避免消息处理过快 usleep(200000); // 200毫秒 } // 等待所有任务完成 sleep(15); // 停止所有工作者 foreach ($workers as $worker) { $worker->stop(); } } catch (\Exception $e) { echo “错误: ” . $e->getMessage() . “\n”; } finally { $client->close(); } ?>

代码说明

  • 创建Worker类封装工作者逻辑
  • 使用queueSubscribe()方法加入队列组”workers”
  • 队列组中的每个工作者共享负载,每条消息只被一个工作者处理
  • 使用wait()方法保持工作者运行,同时处理消息
  • 在实际应用中,每个工作者应该在单独的进程中运行,以实现真正的并行处理

compare Go与PHP实现对比

speed
性能特点

Go实现:原生并发支持,性能极高,适合高吞吐量场景,内存占用低,启动速度快。

PHP实现:基于事件循环,性能良好,适合中低吞吐量场景,内存占用相对较高,但易于集成到现有PHP应用中。

code
API设计

Go实现:强类型语言,编译时错误检查,API设计严谨,支持链式调用,提供丰富的配置选项。

PHP实现:动态类型语言,灵活易用,API设计简洁直观,支持回调函数和Promise风格异步操作。

integration_instructions
集成难度

Go实现:需要Go开发环境,编译为二进制文件,适合独立部署,与现有系统集成需要额外工作。

PHP实现:通过Composer轻松安装,与现有PHP框架无缝集成,适合快速开发和部署。

security
错误处理

Go实现:内置错误处理机制,支持多返回值,错误类型明确,便于调试和问题排查。

PHP实现:基于异常的错误处理,支持自定义错误处理器,提供详细的错误信息和堆栈跟踪。

NATS协议与实现深度调研 – 总结

总结

NATS协议与实现的核心优势与应用价值

stars 核心优势

NATS作为一个轻量级、高性能的分布式消息传递系统,具有以下核心优势,使其成为构建现代分布式系统的理想选择:

text_format

简单易用

基于文本的协议使得客户端实现简单,API设计直观。NATS协议定义了很小的操作集,换行表示终止,使得开发者能够快速上手并集成到现有系统中。无论是Go实现的nats-server还是PHP客户端,都提供了简洁明了的API,降低了学习和使用门槛。

speed

高性能

NATS设计为低延迟、高吞吐量的消息系统,能够处理大量并发连接和消息传递。Go实现的nats-server充分利用了Go语言的并发特性,实现了高效的内存管理和网络I/O处理。这种高性能特性使NATS特别适合实时通信场景,如金融交易、游戏和物联网应用。

compress

轻量级

NATS Server非常轻量级,发布包只有2MB多,启动时可以无需任何参数,直接运行即可。这种轻量级设计使得NATS非常适合边缘计算和资源受限环境。同时,低资源占用也意味着可以在同一台服务器上运行多个NATS实例,提高系统可用性。

language

跨语言支持

NATS提供多种编程语言的客户端实现,包括Go、Java、Python、JavaScript、C#、Ruby和PHP等,便于集成到不同技术栈。这种跨语言支持使得NATS能够在异构环境中无缝工作,连接使用不同编程语言开发的服务和应用程序。

compare 与其他消息系统的比较

为了更好地理解NATS的价值,我们可以将其与其他流行的消息系统进行比较:

特性 NATS RabbitMQ Kafka Redis Pub/Sub
协议复杂度 简单,基于文本 复杂,基于AMQP 复杂,自定义二进制协议 简单,基于Redis协议
消息持久化 可选(JetStream) 支持 支持 不支持
消息语义 最多一次/至少一次/精确一次 最多一次/至少一次 至少一次 最多一次
资源占用 极低 中等
集群支持 原生支持 支持 原生支持 有限支持
适用场景 微服务、实时通信、边缘计算 企业应用集成、工作流 日志聚合、事件溯源 简单消息通知、缓存
lightbulb 关键区别

NATS的核心优势在于其简单性性能的完美结合。与RabbitMQ相比,NATS更轻量、更快;与Kafka相比,NATS更适合实时通信和低延迟场景;与Redis Pub/Sub相比,NATS提供了更丰富的功能和更好的扩展性。NATS的设计哲学是”做减法”,专注于核心消息传递功能,避免不必要的复杂性,从而实现极致的性能和简洁性。

cases 应用价值

NATS的特性和优势使其在多种应用场景中都能发挥重要作用,特别是在需要高性能、低延迟消息传递的系统中:

apps
微服务架构
NATS的位置独立性和多对多通信模式使其成为微服务架构的理想选择。服务之间无需知道对方位置,通过NATS实现松耦合通信,提高系统弹性和可扩展性。
router
服务网格
NATS可以作为服务网格的控制平面,实现服务发现、配置管理和策略执行。其高性能和低延迟特性确保了服务网格的高效运行。
devices_other
物联网和边缘计算
NATS的轻量级设计和低资源占用使其非常适合物联网和边缘计算场景。它可以在资源受限的设备上运行,实现设备与云端的高效通信。
sync
实时数据同步
NATS的高性能和低延迟特性使其成为实时数据同步的理想选择。它可以用于多数据中心同步、数据库复制和实时分析等场景。
cloud
云原生应用
作为CNCF的孵化项目,NATS与云原生生态系统紧密集成。它支持容器化部署、Kubernetes编排和服务网格,是构建云原生应用的重要组件。
show_chart
金融交易系统
NATS的低延迟和高可靠性使其成为金融交易系统的理想选择。它可以用于市场数据分发、订单路由和风险计算等对延迟敏感的场景。

trending_up 未来发展

作为一个活跃的开源项目,NATS正在不断发展和演进,未来可能的发展方向包括:

security

增强安全性

随着安全需求的增加,NATS可能会进一步增强其安全特性,包括更细粒度的访问控制、端到端加密和更完善的审计日志功能。这将使NATS能够满足更严格的安全合规要求。

analytics

可观测性增强

更好的监控、追踪和日志记录功能将帮助运维人员更好地理解和调试NATS系统。这包括与OpenTelemetry等标准的集成,以及更丰富的指标和事件。

auto_awesome

智能化功能

引入更多智能化功能,如自动负载均衡、自适应路由和智能消息过滤等,将使NATS能够更好地应对复杂和动态的环境。

integration_instructions

生态系统扩展

随着NATS的普及,其生态系统将继续扩展,包括更多语言的客户端实现、更丰富的工具和更广泛的集成。这将进一步降低使用门槛,扩大应用范围。

summarize 总结

NATS作为一个轻量级、高性能的分布式消息传递系统,通过其简单的设计、卓越的性能和丰富的功能,为现代应用架构提供了强大的通信支持。无论是Go实现的nats-server还是PHP客户端实现,都体现了NATS系统的设计理念:简单、高效、灵活。

在微服务、云原生、物联网和边缘计算等新兴技术领域,NATS正在发挥越来越重要的作用。随着其不断发展和完善,NATS有望成为分布式系统通信的标准组件,为构建下一代应用提供坚实的基础。

发表评论

人生梦想 - 关注前沿的计算机技术 acejoy.com 🐾 步子哥の博客 🐾 背多分论坛 🐾 知差(chai)网 🐾 DeepracticeX 社区 🐾 老薛主机 🐾 智柴论坛 🐾