Declarative and programmatic subscription methods
Dapr applications can subscribe to published topics via two methods that support the same features: declarative and programmatic.
The examples below demonstrate pub/sub messaging between a `checkout` app and an `orderprocessing` app via the `orders` topic. The examples demonstrate the same Dapr pub/sub component used first declaratively, then programmatically.
You can subscribe declaratively to a topic using an external component file. This example uses a YAML component file named `subscription.yaml`:
- Uses the pub/sub component called `pubsub` to subscribe to the topic called `orders`.
- Sets the `route` field to send all topic messages to the `/checkout` endpoint in the app.
- Sets the `scopes` field to scope this subscription for access only by apps with IDs `orderprocessing` and `checkout`.
When running Dapr, set the YAML component file path to point Dapr to the component.
dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
dapr run --app-id myapp --resources-path ./myComponents -- npm start
dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
In Kubernetes, apply the component to the cluster:
kubectl apply -f subscription.yaml
In your application code, subscribe to the topic specified in the Dapr pub/sub component.
// Subscribe to a topic: handles POST requests sent to the "checkout" route
// and writes the order id read from the request body to the console.
[HttpPost("checkout")]
public void getCheckout([FromBody] int orderId)
{
    Console.WriteLine($"Subscriber received : {orderId}");
}
import io.dapr.client.domain.CloudEvent;
//Subscribe to a topic
@PostMapping(path = "/checkout")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
log.info("Subscriber received: " + cloudEvent.getData());
}
});
}
from cloudevents.sdk.event import v1
# Subscribe to a topic
@app.route('/checkout', methods=['POST'])
def checkout(event: v1.Event) -> None:
    """Decode the JSON data of the received event and log it."""
    payload = json.loads(event.Data())
    logging.info(f'Subscriber received: {payload!s}')
const express = require('express')
const bodyParser = require('body-parser')

const app = express()

// Parse JSON request bodies whose content type matches application/*+json.
const jsonParser = bodyParser.json({ type: 'application/*+json' });
app.use(jsonParser);

// listen to the declarative route: print the message body and acknowledge with 200
function handleCheckout(req, res) {
  console.log(req.body);
  res.sendStatus(200);
}

app.post('/checkout', handleCheckout);
// Subscribe to a topic: sub names the pub/sub component, the topic, and the
// route that receives the topic's messages.
var sub = &common.Subscription{
	Route:      "/checkout",
	Topic:      "orders",
	PubsubName: "pubsub",
}

// eventHandler logs the data of the received topic event and reports that no
// retry is needed and no error occurred.
func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
	log.Printf("Subscriber received: %s", e.Data)
	retry, err = false, nil
	return retry, err
}
Programmatic subscriptions
The programmatic approach returns the `routes` JSON structure within the code, unlike the declarative approach’s `route` YAML structure. In the example below, you define the values found in the declarative YAML subscription above within the application code.
or
// The [Topic] attribute ties the "orders" topic of the "pubsub" component to
// this "/checkout" route; the handler prints the order and echoes it back.
app.MapPost("/checkout", [Topic("pubsub", "orders")] (Order order) =>
{
    Console.WriteLine($"Subscriber received : {order}");
    return Results.Ok(order);
});
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Topic(name = "checkout", pubsubName = "pubsub")
@PostMapping(path = "/orders")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber received: " + cloudEvent.getData());
System.out.println("Subscriber received: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    """Return the subscription list for the programmatic approach as JSON."""
    # Events whose type is "order" are routed to /orders.
    order_rule = {
        'match': 'event.type == "order"',
        'path': '/orders',
    }
    # /orders is also the default route.
    routes = {
        'rules': [order_rule],
        'default': '/orders',
    }
    subscription = {
        'pubsubname': 'pubsub',
        'topic': 'checkout',
        'routes': routes,
    }
    return jsonify([subscription])
@app.route('/orders', methods=['POST'])
def ds_subscriber():
    """Print the JSON body of a message posted to /orders and acknowledge it.

    Returns a (body, status, headers) tuple: a JSON success marker, HTTP 200,
    and the JSON content type.
    """
    print(request.json, flush=True)
    # The header name is 'Content-Type'; 'ContentType' would be sent as an
    # unrelated custom header and leave the real content type at its default.
    return json.dumps({'success': True}), 200, {'Content-Type': 'application/json'}
app.run()
const express = require('express');
const bodyParser = require('body-parser');

const app = express();
// Parse JSON request bodies whose content type matches application/*+json.
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000;

// Programmatic subscription endpoint: returns the subscription list as JSON.
app.get('/dapr/subscribe', (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "checkout",
      routes: {
        rules: [
          {
            match: 'event.type == "order"',
            path: '/orders'
          },
        ],
        // Default to the only route this app actually handles.
        default: '/orders'
      }
    }
  ]);
});

// Receives the messages routed to /orders: print the body and acknowledge with 200.
app.post('/orders', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`));
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)
const appPort = 3000
type subscription struct {
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata,omitempty"`
Routes routes `json:"routes"`
}
type routes struct {
Rules []rule `json:"rules,omitempty"`
Default string `json:"default,omitempty"`
}
type rule struct {
Match string `json:"match"`
Path string `json:"path"`
}
// configureSubscribeHandler handles /dapr/subscribe. It writes the
// subscription list as a JSON response with status 200: the "checkout" topic
// on the "pubsub" component, with events of type "order" routed to /orders,
// which is also the default route.
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "checkout",
			Routes: routes{
				Rules: []rule{
					{
						Match: `event.type == "order"`,
						Path:  "/orders",
					},
				},
				Default: "/orders",
			},
		},
	}

	// Headers must be set before WriteHeader sends them.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	if err := json.NewEncoder(w).Encode(t); err != nil {
		// The status line has already been written, so the failure can only be logged.
		log.Printf("failed to encode subscriptions: %v", err)
	}
}
// main registers the /dapr/subscribe handler on a router and serves it on
// appPort. ListenAndServe only returns on failure, so its error is fatal.
func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}
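Next Steps
- Try out the pub/sub Quickstart
- Follow: How-To: Configure pub/sub components with multiple namespaces
- Learn more about declarative and programmatic subscription methods.
- Learn about topic scoping
- Learn about message TTL
- Learn more about pub/sub with and without CloudEvent
- List of pub/sub components
- Read the pub/sub API reference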