Dapr 发布订阅

发布/订阅模式允许微服务使用消息相互通信。生产者或发布者在不知道哪个应用程序将接收它们的情况下向主题发送消息。这涉及将它们写入输入通道。同样,消费者或订阅者订阅该主题并接收其消息,而不知道是什么服务产生了这些消息。这涉及从输出通道接收消息。中间消息代理负责将每条消息从输入通道复制到所有对该消息感兴趣的订阅者的输出通道。当您需要将微服务彼此分离时,这种模式特别有用。

Dapr 中的发布/订阅 API 提供至少一次(at-least-once)的保证,并与各种消息代理和队列系统集成。 您的服务所使用的特定实现是可插入的,并被配置为运行时的 Dapr Pub/Sub 组件。 这种方法消除了您服务的依赖性,从而使您的服务可以更便携,更灵活地适应更改。

pubsub-overview-pattern.png

Dapr 发布/订阅构建块提供了一个与平台无关的 API 来发送和接收消息。您的服务将消息发布到命名主题,并订阅主题以使用这些消息。

下图显示了一个“shipping”服务和一个“email”服务的例子,它们都订阅了“cart”服务发布的主题。每个服务都会加载指向同一发布/订阅消息总线组件的发布/订阅组件配置文件,例如 Redis Streams、NATS Streaming、Azure Service Bus 或 GCP Pub/Sub。

pubsub-overview-components.png

下图具有相同的服务,但是这次显示的是 Dapr 发布 API,它发送“订单”主题和订阅服务上的订单端点,这些主题消息由 Dapr 发布到。

pubsub-overview-publish-API.png

特性

Cloud Events消息格式

为了启用消息路由并为每条消息提供额外的上下文,Dapr 使用 CloudEvents 1.0 规范作为其消息格式。应用程序使用 Dapr 发送到主题的任何消息都会自动“包装”在 Cloud Events 信封中,使用 datacontenttype 属性的 Content-Type 标头值。

Dapr 实现了以下 Cloud Events 字段:

  • id
  • source
  • specversion
  • type
  • datacontenttype (Optional)

以下示例显示了 CloudEvent v1.0 中序列化为 JSON 的 XML 内容:

{
    "specversion" : "1.0",
    "type" : "xml.message",
    "source" : "https://example.com/message",
    "subject" : "Test XML Message",
    "id" : "id-1234-5678-9101",
    "time" : "2020-09-23T06:23:21Z",
    "datacontenttype" : "text/xml",
    "data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"
}

消息订阅

Dapr 应用程序可以订阅已发布的主题。 Dapr 允许您的应用程序订阅主题的两种方法:

  • 声明式,其中订阅在外部文件中定义
  • 程序化,在用户代码中定义订阅

消息传递

原则上,当订阅者在处理消息后以非错误响应进行响应时,Dapr 认为消息已成功传递。为了进行更精细的控制,Dapr 的发布/订阅 API 还提供了在响应负载中定义的显式状态,订阅者可以使用这些状态向 Dapr 指示特定的处理指令(例如 RETRY 或 DROP)。如果两个不同的应用程序(不同的 app-ID)订阅了同一个主题,Dapr 将每条消息只传递给每个应用程序的一个实例。

pubsub-overview-pattern-competing-consumers.png

主题范围

默认情况下,所有支持 Dapr 发布/订阅组件(例如 Kafka、Redis Stream、RabbitMQ)的主题都可用于配置了该组件的每个应用程序。为了限制哪个应用程序可以发布或订阅主题,Dapr 提供了主题范围。这使您能够允许应用程序发布哪些主题以及允许应用程序订阅哪些主题。

消息生存时间(TTL)

Dapr 可以在每条消息的基础上设置超时消息,这意味着如果没有从 pub/sub 组件读取消息,则该消息将被丢弃。这是为了防止堆积未读的消息。在队列中比配置的 TTL 时间长的消息称为死消息。

注:也可以在组件创建时为给定队列设置消息 TTL。

与不使用 Dapr 和 CloudEvents 的应用程序通信

对于一个应用程序使用 Dapr 而另一个应用程序不使用的场景,可以为发布者或订阅者禁用 CloudEvent 包装。这允许在不能一次全部采用 Dapr 的应用程序中部分采用 Dapr 发布订阅。

使用.Net调用Dapr的发布订阅

以下示例创建应用程序来发布和订阅名为 deathStarStatus 的主题

pubsub-publish-subscribe-example.png

先决条件

运行 dapr init 时,Redis Streams 默认安装在本地机器上。如果是 dapr init --slim 需要自己动手操作一些东西了,这里就不演示了。

通过打开您的组件文件进行验证 %UserProfile%\.dapr\components\pubsub.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

注:这里redisHost的value可以根据实际情况设置,比如某云的redis实例等。又有人问过是否可以切换默认db,当然可以,name设置redisDB,value设置为你要使用的db即可

如果你要更详细的yaml配置参数,比如并发设置、最大重试次数等等都可以看这里 https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-redis-pubsub/

订阅主题

订阅主题有两种方式:声明式、程序化

声明式订阅

您可以使用以下自定义资源定义 (CRD) 订阅主题。创建一个名为 subscription.yaml 的文件并粘贴以下内容:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: deathStarStatus
  route: /dsstatus
  pubsubname: pubsub
scopes:
- app1
- app2

上面的示例显示了对 pubsub 组件 pubsub 主题 deathStarStatus 的事件订阅。

将CRD文件放到组件目录即可,这里不继续展开说了。

程序化订阅

Dapr 实例在启动时调用您的应用程序并期待主题订阅的 JSON 响应:

  • pubsubname:Dapr 应该使用哪个 pub/sub 组件
  • topic:订阅哪个主题
  • route:当消息涉及该主题时,Dapr 调用哪个endpoint

注:你可能会觉得,这是不是很麻烦?是的,所以我们用dapr dotnet-sdk来帮助我们自动完成这些事情

.Net订阅

以上是让dapr sidecar知道这个消息的订阅最终给谁。但我们的程序里要怎么写呢?

如果你选择的是声明式订阅,你做一个route即可,而如果是程序化订阅则不需要多写一个yaml文件,且通过特性即可支持,接下来看看.Net SDK怎么做的吧。

创建Assignment.Server(Sub)

创建ASP.NET Core 空项目,同时根据之前的文章内容添加Dapr.AspNetCoreNuGet包和修改程序端口为5000

修改program.cs代码

using Microsoft.AspNetCore.Mvc;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
    endpoints.MapSubscribeHandler();
});

app.MapPost("/dsstatus", ([FromBody] string word) => Console.WriteLine($"Hello {word}!")).WithTopic("pubsub", "deathStarStatus");

app.Run();

注:为了告诉 Dapr 消息已成功处理,请返回 200 OK 响应。如果 Dapr 收到除 200 之外的任何其他返回状态代码,或者如果您的应用程序崩溃,Dapr 将尝试按照 At-Least-Once 语义重新传递消息。

运行Assignment.Server

使用Dapr CLI来启动,先使用命令行工具跳转到目录 dapr-study-room\Assignment06\Assignment.Server,然后执行下面命令

dapr run --app-id testpubsub --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run

创建Assignment.Client(Publish)

创建控制台项目,并修改program.cs

var client = new Dapr.Client.DaprClientBuilder().Build();
await client.PublishEventAsync<string>("pubsub", "deathStarStatus", "World");

运行Assignment.Client即可看到Assignment.Server中会打印Hello World!

将消息路由到不同的事件处理程序

基于内容的路由是一种使用 DSL 而不是命令式应用程序代码的消息传递模式。PubSub 路由是此模式的一种实现,它允许开发人员使用表达式根据 CloudEvents 的内容将其路由到应用程序中的不同 URI/路径和事件处理程序。

注:这是个预览功能,如果你感兴趣可自行尝试,值得一提的是Common Expression Language (CEL)很有趣,这里就只贴一段代码看看吧。

        [Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)]
        [HttpPost("widgets")]
        public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient)
        {
            // Logic
            return stock;
        }

        [Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)]
        [HttpPost("gadgets")]
        public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient)
        {
            // Logic
            return stock;
        }

        [Topic("pubsub", "inventory")]
        [HttpPost("products")]
        public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient)
        {
            // Logic
            return stock;
        }

发布/订阅主题访问权限

命名空间或组件范围可用于限制对特定应用程序的组件访问。添加到组件的这些应用程序范围仅限制具有特定 ID 的应用程序能够使用该组件。

除了这个通用组件范围之外,发布/订阅组件还可以限制以下内容:

  • 可以使用哪些主题(已发布或已订阅)
  • 允许哪些应用程序发布到特定主题
  • 允许哪些应用程序订阅特定主题

主题访问权限

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: publishingScopes
    value: "app1=topic1;app2=topic2,topic3;app3="
  - name: subscriptionScopes
    value: "app2=;app3=topic1"

下表显示了允许哪些应用程序发布到主题中:

  topic1 topic2 topic3
app1 X    
app2   X X
app3      

下表显示了允许哪些应用程序订阅主题:

  topic1 topic2 topic3
app1 X X X
app2      
app3 X    

注:如果应用程序未列出(例如 subscriptionScopes 中的 app1),则允许订阅所有主题。由于未使用 allowedTopics 且 app1 没有任何订阅范围,因此它还可以使用上面未列出的其他主题。

限制允许的主题

如果 Dapr 应用程序向其发送消息,则会创建一个主题。在某些情况下,应该控制这个主题的创建。例如:

  • 在Dapr应用程序中,在生成主题名称时出现的错误可能导致创建无限数量的主题
  • 精简主题名称和总数,防止主题无限增长
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: allowedTopics
    value: "topic1,topic2,topic3"

结合 allowedTopics 和 scopes

有时您希望组合这两个作用域,因此只允许一组固定的主题,并将作用域指定给特定的应用程序。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: allowedTopics
    value: "A,B"
  - name: publishingScopes
    value: "app1=A"
  - name: subscriptionScopes
    value: "app1=;app2=A"

注:第三个应用程序没有列出,因为如果一个应用程序没有在范围内指定,它是允许使用所有主题的。

下表显示了允许发布到主题的应用程序:

  A B C
app1 X    
app2 X X  
app3 X X  

下表显示了哪个应用程序可以订阅主题:

  A B C
app1      
app2 X    
app3 X X  

消息TTL

同状态管理,使用 metadata 的 ttlInSeconds

本章源码

https://github.com/doddgu/dapr-study-room

下一章:Dapr Actors

Actor模式将Actor描述为最低级别的“计算单元”。换句话说,您在一个独立的单元(称为actor)中编写代码,该单元接收消息并一次处理一个消息,没有任何并发或线程。再换句话说,根据Acto ...