How to: Publish a message and subscribe to a topic

Now that you’ve learned what the Dapr pub/sub building block provides, learn how it can work in your service. The code example below loosely describes an application that processes orders with two services, each with a Dapr sidecar:

  • A checkout service using Dapr to subscribe to the topic in the message queue.
  • An order processing service using Dapr to publish a message to RabbitMQ.

Dapr automatically wraps the user payload in a CloudEvents v1.0 compliant envelope, using the Content-Type header value for the datacontenttype attribute.
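To make the envelope idea concrete, here is a minimal sketch of what wrapping a payload could look like. It is not Dapr's implementation: the function name wrap_in_cloudevent and the default source and type values are assumptions for illustration, and a real Dapr envelope carries additional fields (such as the topic and pubsub name).

```python
import uuid
from datetime import datetime, timezone


def wrap_in_cloudevent(payload, content_type="application/json",
                       source="orderprocessing",
                       event_type="com.dapr.event.sent"):
    """Return a CloudEvents v1.0 style envelope around `payload`.

    Hypothetical helper: `source` and `event_type` defaults are assumed
    values, not something this guide specifies.
    """
    return {
        "specversion": "1.0",             # required CloudEvents attribute
        "id": str(uuid.uuid4()),          # required: unique per event
        "source": source,                 # required: producer of the event
        "type": event_type,               # required: kind of event
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": content_type,  # mirrors the Content-Type header
        "data": payload,                  # the user payload, unchanged
    }
```

Calling wrap_in_cloudevent({"orderId": "100"}) returns a dictionary whose data field holds the original payload and whose datacontenttype field is application/json.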

The following example demonstrates how your applications publish and subscribe to a topic called orders.

Note

If you haven’t already, try out the pub/sub quickstart for a quick walk-through on how to use pub/sub.

The first step is to set up the pub/sub component:

When you run dapr init, Dapr creates a default Redis pubsub.yaml and runs a Redis container on your local machine. The file is located:

  • On Windows, under %UserProfile%\.dapr\components\pubsub.yaml
  • On Linux/MacOS, under ~/.dapr/components/pubsub.yaml

With the pubsub.yaml component, you can easily swap out underlying components without application code changes. In this example, RabbitMQ is used.

You can override this file with another by creating a components directory (in this example, myComponents) containing the file and using the flag --resources-path with the dapr run CLI command.

    dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
    dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
    dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
    dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
    dapr run --app-id myapp --resources-path ./myComponents -- npm start

The RabbitMQ component used in this example:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: order-pub-sub
    spec:
      type: pubsub.rabbitmq
      version: v1
      metadata:
      - name: connectionString
        value: "amqp://localhost:5672"
      - name: protocol
        value: amqp
      - name: hostname
        value: localhost
      - name: username
        value: username
      - name: password
        value: password
      - name: durable
        value: "false"
      - name: deletedWhenUnused
        value: "false"
      - name: autoAck
        value: "false"
      - name: reconnectWait
        value: "0"
      - name: concurrency
        value: parallel
    scopes:
      - orderprocessing
      - checkout

Dapr provides two methods by which you can subscribe to topics:

  • Declaratively, where subscriptions are defined in an external file.
  • Programmatically, where subscriptions are defined in user code.

Learn more in the declarative and programmatic subscriptions doc. This example demonstrates a declarative subscription.
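For contrast with the declarative approach used in this guide, the programmatic approach can be sketched as follows. As far as I understand it, the app answers a GET /dapr/subscribe request from the sidecar with a JSON list of subscriptions; the helper names dapr_subscribe_response and route_for are hypothetical, and the values are the ones used in this example.

```python
import json

# Subscription values taken from the example in this guide.
SUBSCRIPTIONS = [
    {"pubsubname": "order-pub-sub", "topic": "orders", "route": "/checkout"},
]


def dapr_subscribe_response():
    """JSON body an app would return from GET /dapr/subscribe."""
    return json.dumps(SUBSCRIPTIONS)


def route_for(pubsub_name, topic):
    """Look up the route registered for a pubsub/topic pair, or None."""
    for sub in SUBSCRIPTIONS:
        if sub["pubsubname"] == pubsub_name and sub["topic"] == topic:
            return sub["route"]
    return None
```

The trade-off is that the subscription now lives in application code, so changing the topic or route means redeploying the app rather than editing a YAML file.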

Create a file named subscription.yaml and paste the following:

    apiVersion: dapr.io/v2alpha1
    kind: Subscription
    metadata:
      name: order-pub-sub
    spec:
      topic: orders
      routes:
        default: /checkout
      pubsubname: order-pub-sub
    scopes:
    - orderprocessing
    - checkout

The example above shows an event subscription to topic orders, for the pubsub component order-pub-sub.

  • The routes field, through its default entry, tells Dapr to send all topic messages to the /checkout endpoint in the app.
  • The scopes field enables this subscription for apps with IDs orderprocessing and checkout.

Place subscription.yaml in the same directory as your pubsub.yaml component. When Dapr starts up, it loads subscriptions along with the components.
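To show what the /checkout endpoint has to do without any SDK, here is a rough sketch using only the Python standard library. It assumes the sidecar delivers each message as an HTTP POST whose body is the CloudEvent JSON, and that the app can reply with a JSON status of SUCCESS or DROP. The names handle_checkout, CheckoutHandler, and serve are hypothetical.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def handle_checkout(raw_body):
    """Parse a delivered CloudEvent and decide the reply.

    Returns a tuple of (http_status, response_dict).
    """
    try:
        event = json.loads(raw_body)
    except (ValueError, TypeError):
        # Malformed body: ask for it to be dropped instead of redelivered.
        return 200, {"status": "DROP"}
    if not isinstance(event, dict):
        return 200, {"status": "DROP"}
    print("Subscriber received:", event.get("data"))
    return 200, {"status": "SUCCESS"}


class CheckoutHandler(BaseHTTPRequestHandler):
    """Routes POST /checkout to handle_checkout; everything else is 404."""

    def do_POST(self):
        if self.path != "/checkout":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        status, reply = handle_checkout(self.rfile.read(length))
        body = json.dumps(reply).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port=6002):
    """Block and serve requests; 6002 matches the app port used in this guide."""
    HTTPServer(("127.0.0.1", port), CheckoutHandler).serve_forever()
```

Calling serve() starts the listener. The SDK examples in this guide do the equivalent work for you, including registering the route.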

Below are code examples that leverage Dapr SDKs to subscribe to the topic you defined in subscription.yaml.

    //dependencies
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System;
    using Microsoft.AspNetCore.Mvc;
    using Dapr;
    using Dapr.Client;

    //code
    namespace CheckoutService.controller
    {
        [ApiController]
        public class CheckoutServiceController : Controller
        {
            //Subscribe to a topic
            [Topic("order-pub-sub", "orders")]
            [HttpPost("checkout")]
            public void getCheckout([FromBody] int orderId)
            {
                Console.WriteLine("Subscriber received : " + orderId);
            }
        }
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-ssl dotnet run

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 mvn spring-boot:run

    #dependencies
    from cloudevents.sdk.event import v1
    from dapr.ext.grpc import App
    import logging
    import json

    #code
    app = App()
    logging.basicConfig(level = logging.INFO)

    #Subscribe to a topic
    @app.subscribe(pubsub_name='order-pub-sub', topic='orders')
    def mytopic(event: v1.Event) -> None:
        data = json.loads(event.Data())
        logging.info('Subscriber received: ' + str(data))

    app.run(6002)

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --app-protocol grpc -- python3 CheckoutService.py

    package main

    //dependencies
    import (
        "context"
        "log"
        "net/http"

        "github.com/dapr/go-sdk/service/common"
        daprd "github.com/dapr/go-sdk/service/http"
    )

    //code
    var sub = &common.Subscription{
        PubsubName: "order-pub-sub",
        Topic:      "orders",
        Route:      "/checkout",
    }

    func main() {
        s := daprd.NewService(":6002")
        //Subscribe to a topic
        if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
            log.Fatalf("error adding topic subscription: %v", err)
        }
        if err := s.Start(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("error listening: %v", err)
        }
    }

    func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
        log.Printf("Subscriber received: %s", e.Data)
        return false, nil
    }
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 go run CheckoutService.go

    //dependencies
    import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr';

    //code
    const daprHost = "127.0.0.1";
    const serverHost = "127.0.0.1";
    const serverPort = "6002";

    start().catch((e) => {
      console.error(e);
      process.exit(1);
    });

    async function start() {
      const server = new DaprServer(
        serverHost,
        serverPort,
        daprHost,
        process.env.DAPR_HTTP_PORT,
        CommunicationProtocolEnum.HTTP
      );
      //Subscribe to a topic
      await server.pubsub.subscribe("order-pub-sub", "orders", async (orderId) => {
        console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
      });
      await server.start();
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 npm start

Start an instance of Dapr with an app-id called orderprocessing:

    dapr run --app-id orderprocessing --dapr-http-port 3601

Then publish a message to the orders topic:

    dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{"orderId": "100"}'

    Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'
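The same HTTP call can be made from a script. The sketch below uses only the Python standard library and targets the publish URL shown above; the function names build_publish_request and publish are hypothetical, and the default port, pubsub name, and topic are the ones from this example.

```python
import json
import urllib.request


def build_publish_request(payload, dapr_http_port=3601,
                          pubsub_name="order-pub-sub", topic="orders"):
    """Build (but do not send) a POST to the sidecar's publish endpoint."""
    url = (f"http://localhost:{dapr_http_port}"
           f"/v1.0/publish/{pubsub_name}/{topic}")
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def publish(payload, **kwargs):
    """Send the request; requires a running sidecar on the chosen port."""
    request = build_publish_request(payload, **kwargs)
    with urllib.request.urlopen(request) as response:
        # A 2xx status means the sidecar accepted the message.
        return response.status
```

With the orderprocessing sidecar running, publish({"orderId": "100"}) sends the same message as the commands above. Splitting request construction from sending keeps the URL logic checkable without a sidecar.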

Below are code examples that leverage Dapr SDKs to publish a message to a topic.

    //dependencies
    using System;
    using System.Collections.Generic;
    using System.Net.Http;
    using System.Net.Http.Headers;
    using System.Threading.Tasks;
    using Dapr.Client;
    using Microsoft.AspNetCore.Mvc;
    using System.Threading;

    //code
    namespace EventService
    {
        class Program
        {
            static async Task Main(string[] args)
            {
                string PUBSUB_NAME = "order-pub-sub";
                string TOPIC_NAME = "orders";
                while(true) {
                    System.Threading.Thread.Sleep(5000);
                    Random random = new Random();
                    int orderId = random.Next(1,1000);
                    CancellationTokenSource source = new CancellationTokenSource();
                    CancellationToken cancellationToken = source.Token;
                    using var client = new DaprClientBuilder().Build();
                    //Using Dapr SDK to publish a topic
                    await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
                    Console.WriteLine("Published data: " + orderId);
                }
            }
        }
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-ssl dotnet run

    //dependencies
    import io.dapr.client.DaprClient;
    import io.dapr.client.DaprClientBuilder;
    import io.dapr.client.domain.Metadata;
    import static java.util.Collections.singletonMap;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;

    //code
    @SpringBootApplication
    public class OrderProcessingServiceApplication {

        private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

        public static void main(String[] args) throws InterruptedException{
            String MESSAGE_TTL_IN_SECONDS = "1000";
            String TOPIC_NAME = "orders";
            String PUBSUB_NAME = "order-pub-sub";

            while(true) {
                TimeUnit.MILLISECONDS.sleep(5000);
                Random random = new Random();
                int orderId = random.nextInt(1000-1) + 1;
                DaprClient client = new DaprClientBuilder().build();
                //Using Dapr SDK to publish a topic
                client.publishEvent(
                        PUBSUB_NAME,
                        TOPIC_NAME,
                        orderId,
                        singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
                log.info("Published data:" + orderId);
            }
        }
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run

    #dependencies
    import random
    from time import sleep
    import logging
    import json
    from dapr.clients import DaprClient

    #code
    logging.basicConfig(level = logging.INFO)
    while True:
        sleep(random.randrange(50, 5000) / 1000)
        orderId = random.randint(1, 1000)
        PUBSUB_NAME = 'order-pub-sub'
        TOPIC_NAME = 'orders'
        with DaprClient() as client:
            #Using Dapr SDK to publish a topic
            result = client.publish_event(
                pubsub_name=PUBSUB_NAME,
                topic_name=TOPIC_NAME,
                data=json.dumps(orderId),
                data_content_type='application/json',
            )
        logging.info('Published data: ' + str(orderId))

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc python3 OrderProcessingService.py

    package main

    //dependencies
    import (
        "context"
        "log"
        "math/rand"
        "strconv"
        "time"

        dapr "github.com/dapr/go-sdk/client"
    )

    //code
    var (
        PUBSUB_NAME = "order-pub-sub"
        TOPIC_NAME  = "orders"
    )

    func main() {
        // Create the client once and close it when main returns.
        client, err := dapr.NewClient()
        if err != nil {
            panic(err)
        }
        defer client.Close()
        ctx := context.Background()

        for i := 0; i < 10; i++ {
            // time.Sleep takes a Duration; a bare 5000 would mean 5000 nanoseconds.
            time.Sleep(5 * time.Second)
            orderId := rand.Intn(1000-1) + 1
            //Using Dapr SDK to publish a topic
            if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId))); err != nil {
                panic(err)
            }
            log.Println("Published data: " + strconv.Itoa(orderId))
        }
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start