This tutorial shows you how to write a simple server and client in C++ using gRPC’s asynchronous/non-blocking APIs. It assumes you are already familiar with writing simple synchronous gRPC code, as described in gRPC Basics: C++. The example used in this tutorial follows on from the basic Greeter example we used in the overview. You’ll find it alongside its installation instructions.
gRPC uses the CompletionQueue API for asynchronous operations. The basic workflow is as follows:
- bind a CompletionQueue to an RPC call
- do something like a read or write, identified by a unique void* tag
- call CompletionQueue::Next to wait for operations to complete. If a tag appears, it indicates that the corresponding operation is complete.
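The three steps above can be sketched without gRPC at all. The following is a minimal, single-threaded illustration; ToyCompletionQueue, ToyOperation and RunWorkflow are hypothetical names invented for this sketch and are not part of the gRPC API. Unlike the real queue, the toy one never blocks and its operations complete immediately.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <utility>

// Hypothetical stand-in for a completion queue (NOT the gRPC class).
class ToyCompletionQueue {
 public:
  // An operation calls this when it finishes, handing back its tag.
  void Complete(void* tag, bool ok) { events_.emplace_back(tag, ok); }

  // Pops the oldest completion. Unlike the real queue, this toy never
  // blocks: it returns false when nothing is pending.
  bool Next(void** tag, bool* ok) {
    if (events_.empty()) return false;
    *tag = events_.front().first;
    *ok = events_.front().second;
    events_.pop_front();
    return true;
  }

 private:
  std::deque<std::pair<void*, bool>> events_;
};

// Hypothetical operation that is bound to a queue at construction time.
class ToyOperation {
 public:
  explicit ToyOperation(ToyCompletionQueue* cq) : cq_(cq) {}

  // "Do something like a read or write" and attach a caller-chosen tag.
  // The toy completes immediately; a real operation completes later.
  void Start(void* tag) { cq_->Complete(tag, true); }

 private:
  ToyCompletionQueue* cq_;
};

// Runs the three workflow steps and returns how many tags came back.
int RunWorkflow() {
  ToyCompletionQueue cq;
  ToyOperation read_op(&cq);  // step 1: bind the queue
  ToyOperation write_op(&cq);

  void* read_tag = reinterpret_cast<void*>(std::uintptr_t{1});
  void* write_tag = reinterpret_cast<void*>(std::uintptr_t{2});
  read_op.Start(read_tag);  // step 2: start work with unique tags
  write_op.Start(write_tag);

  int completed = 0;
  void* got_tag = nullptr;
  bool ok = false;
  while (cq.Next(&got_tag, &ok)) {  // step 3: collect completed tags
    assert(ok);
    assert(got_tag == read_tag || got_tag == write_tag);
    ++completed;
  }
  return completed;
}
```

The point of the sketch is that a tag is an opaque pointer-sized value: the queue only hands it back, and the caller decides what it means.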
To use an asynchronous client to call a remote method, you first create a channel and stub, just as you do in a synchronous client. Once you have your stub, you do the following to make an asynchronous call:
- Initiate the RPC and create a handle for it. Bind the RPC to a CompletionQueue.
- Ask for the reply and final status, with a unique tag
    HelloReply reply;  // filled in by the runtime when the call completes
    Status status;
    rpc->Finish(&reply, &status, (void*)1);
- Wait for the completion queue to return the next tag. The reply and status are ready once the tag passed into the corresponding Finish() call is returned.
    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
      // check reply and status
    }
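With a single outstanding call, a constant such as (void*)1 is enough. With several calls in flight, one common approach is to use the address of a per-call object as the tag, so the returned tag leads straight back to the right reply and status. The sketch below illustrates that idea with the standard library only; PendingCall, FakeFinish and CallAll are hypothetical names, and a std::deque stands in for the completion queue.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical per-call state; its address doubles as the tag.
struct PendingCall {
  std::string reply;
  bool status_ok = false;
};

// Stand-in for the runtime finishing a call: fills in the reply and
// status, then posts the call's tag to the (toy) completion queue.
void FakeFinish(const std::string& name, PendingCall* call,
                std::deque<void*>* cq) {
  call->reply = "Hello " + name;
  call->status_ok = true;
  cq->push_back(call);
}

// Issues one call per name, then drains the queue and collects replies.
std::vector<std::string> CallAll(const std::vector<std::string>& names) {
  std::deque<void*> cq;
  for (const std::string& name : names) {
    PendingCall* call = new PendingCall;  // freed when its tag comes back
    FakeFinish(name, call, &cq);
  }

  std::vector<std::string> replies;
  while (!cq.empty()) {
    // The tag is the PendingCall's address, so cast it back to reach the
    // reply and status that belong to this particular call.
    PendingCall* call = static_cast<PendingCall*>(cq.front());
    cq.pop_front();
    if (call->status_ok) replies.push_back(call->reply);
    delete call;
  }
  return replies;
}
```

For example, CallAll({"world", "gRPC"}) collects one reply per name in this toy setup. In a real client the tags may come back in any order, which is exactly why each tag needs to identify its own call.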
The server implementation requests an RPC call with a tag and then waits for the completion queue to return the tag. The basic flow for handling an RPC asynchronously is:
- Build a server exporting the async service
- Request one RPC, providing a unique tag
    ServerContext context;
    HelloRequest request;
    // The responder is constructed from the context of the call it serves.
    ServerAsyncResponseWriter<HelloReply> responder(&context);
    service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
- Wait for the completion queue to return the tag. The context, request and responder are ready once the tag is retrieved.
    HelloReply reply;
    Status status;
    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
      responder.Finish(reply, status, (void*)2);
    }
- Wait for the completion queue to return the tag. The RPC is finished when the tag is back.
This basic flow, however, doesn’t take into account the server handling multiple requests concurrently. To deal with this, our complete async server example uses a CallData object to maintain the state of each RPC, and uses the address of this object as the unique tag for the call.
    class CallData {
     public:
      // Take in the "service" instance (in this case representing an asynchronous
      // server) and the completion queue "cq" used for asynchronous communication
      // with the gRPC runtime.
      CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
          : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
        // Invoke the serving logic right away.
        Proceed();
      }

      void Proceed() {
        if (status_ == CREATE) {
          // As part of the initial CREATE state, we *request* that the system
          // start processing SayHello requests. In this request, "this" acts as
          // the tag uniquely identifying the request (so that different CallData
          // instances can serve different requests concurrently), in this case
          // the memory address of this CallData instance.
          service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                    this);
          // Make this instance progress to the PROCESS state.
          status_ = PROCESS;
        } else if (status_ == PROCESS) {
          // Spawn a new CallData instance to serve new clients while we process
          // the one for this CallData. The instance will deallocate itself as
          // part of its FINISH state.
          new CallData(service_, cq_);

          // The actual processing.
          std::string prefix("Hello ");
          reply_.set_message(prefix + request_.name());

          // And we are done! Let the gRPC runtime know we've finished, using the
          // memory address of this instance as the uniquely identifying tag for
          // the event.
          responder_.Finish(reply_, Status::OK, this);
          status_ = FINISH;
        } else {
          GPR_ASSERT(status_ == FINISH);
          // Once in the FINISH state, deallocate ourselves (CallData).
          delete this;
        }
      }

     private:
      Greeter::AsyncService* service_;  // the async service being served
      ServerCompletionQueue* cq_;       // queue that delivers our tag
      ServerContext ctx_;               // per-call context
      HelloRequest request_;            // what we get from the client
      HelloReply reply_;                // what we send back to the client
      ServerAsyncResponseWriter<HelloReply> responder_;

      // A tiny state machine tracking where this call is in its lifetime.
      enum CallStatus { CREATE, PROCESS, FINISH };
      CallStatus status_;
    };
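To see the CREATE, PROCESS and FINISH transitions in isolation, here is a gRPC-free sketch of the same state machine. ToyServer, ToyCall, Accept and ServeAll are hypothetical names invented for this illustration: a std::deque plays the role of the completion queue, and Accept stands in for the runtime delivering a request to whichever call is currently waiting.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

class ToyCall;

// Hypothetical stand-in for the service plus its completion queue.
struct ToyServer {
  std::deque<void*> completed;    // tags of finished operations
  ToyCall* listener = nullptr;    // call waiting for the next request
  std::vector<std::string> sent;  // replies "sent" so far
  int live_calls = 0;             // ToyCall instances currently allocated
};

class ToyCall {
 public:
  explicit ToyCall(ToyServer* server) : server_(server), status_(CREATE) {
    ++server_->live_calls;
    Proceed();
  }
  ~ToyCall() { --server_->live_calls; }

  // Stand-in for the runtime delivering a request to the waiting call.
  void Accept(const std::string& name) {
    request_ = name;
    server_->listener = nullptr;
    server_->completed.push_back(this);  // our address is the tag
  }

  void Proceed() {
    if (status_ == CREATE) {
      server_->listener = this;  // register for the next request
      status_ = PROCESS;
    } else if (status_ == PROCESS) {
      new ToyCall(server_);  // keep one call waiting for new clients
      server_->sent.push_back("Hello " + request_);
      status_ = FINISH;
      server_->completed.push_back(this);  // "Finish" posts our tag
    } else {
      assert(status_ == FINISH);
      delete this;  // the call owns itself and cleans up here
    }
  }

 private:
  enum CallStatus { CREATE, PROCESS, FINISH };
  ToyServer* server_;
  std::string request_;
  CallStatus status_;
};

// Serves each name in turn; reports how many calls are still allocated.
std::vector<std::string> ServeAll(const std::vector<std::string>& names,
                                  int* live_after) {
  ToyServer server;
  new ToyCall(&server);  // the first listener
  for (const std::string& name : names) {
    server.listener->Accept(name);
    while (!server.completed.empty()) {
      void* tag = server.completed.front();
      server.completed.pop_front();
      static_cast<ToyCall*>(tag)->Proceed();
    }
  }
  *live_after = server.live_calls;  // only the idle listener remains
  delete server.listener;
  return server.sent;
}
```

Each tag passes through the queue twice: once when the request arrives and once when the reply has been handed off. After every request is served, exactly one instance remains allocated, the one still waiting for a client, which mirrors how the pattern always keeps a fresh call registered.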
For simplicity the server only uses one completion queue for all events, and runs a main loop in HandleRpcs to query the queue:
    void HandleRpcs() {
      // Spawn a new CallData instance to serve new clients.
      new CallData(&service_, cq_.get());
      void* tag;  // uniquely identifies a request.
      bool ok;
      while (true) {
        // Block waiting to read the next event from the completion queue. The
        // event is uniquely identified by its tag, which in this case is the
        // memory address of a CallData instance.
        cq_->Next(&tag, &ok);
        GPR_ASSERT(ok);
        static_cast<CallData*>(tag)->Proceed();
      }
    }
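The loop above asserts that every event succeeded and never exits. A variation some servers may want is a loop that tolerates failed events and stops once the queue has been shut down and drained, which is when CompletionQueue::Next returns false. The sketch below shows only that control flow against a toy queue; ToyCompletionQueue, LoopStats and DrainUntilShutdown are hypothetical names, and because the toy is single-threaded it must be shut down before it is drained.

```cpp
#include <cassert>
#include <deque>
#include <utility>

// Hypothetical queue with a shutdown flag (NOT the gRPC class).
class ToyCompletionQueue {
 public:
  void Complete(void* tag, bool ok) { events_.emplace_back(tag, ok); }
  void Shutdown() { shutdown_ = true; }

  // Returns false only once the queue is shut down and fully drained.
  // A real queue would block while waiting; this toy cannot, so callers
  // must call Shutdown() before draining it.
  bool Next(void** tag, bool* ok) {
    if (events_.empty()) {
      assert(shutdown_ && "toy queue cannot block");
      return false;
    }
    *tag = events_.front().first;
    *ok = events_.front().second;
    events_.pop_front();
    return true;
  }

 private:
  std::deque<std::pair<void*, bool>> events_;
  bool shutdown_ = false;
};

struct LoopStats {
  int handled;
  int failed;
};

// Posts three events (one unsuccessful), shuts the queue down, then runs
// an event loop that exits when Next() reports the queue is drained.
LoopStats DrainUntilShutdown() {
  ToyCompletionQueue cq;
  int a = 0, b = 0, c = 0;  // their addresses serve as tags
  cq.Complete(&a, true);
  cq.Complete(&b, false);  // an operation that did not succeed
  cq.Complete(&c, true);
  cq.Shutdown();

  LoopStats stats = {0, 0};
  void* tag = nullptr;
  bool ok = false;
  while (cq.Next(&tag, &ok)) {
    if (!ok) {
      ++stats.failed;  // skip the event instead of asserting
      continue;
    }
    ++*static_cast<int*>(tag);  // "process" the object behind the tag
    ++stats.handled;
  }
  return stats;
}
```

Whether a failed event should be skipped, retried or should release the per-call state depends on the application; the sketch only counts them.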
Remember we got our completion queue instance cq_ in ServerImpl::Run() by running cq_ = builder.AddCompletionQueue(). Looking at ServerBuilder::AddCompletionQueue's documentation we see that the caller is required to shut down the server before shutting down the returned completion queue.
Refer to ServerBuilder::AddCompletionQueue's full docstring for more details. What this means in our example is that the ServerImpl destructor looks like: