Deep Dive: gRPC Communication
gRPC has become a cornerstone technology for building high-performance, scalable microservices. Unlike REST APIs, which typically exchange JSON over HTTP/1.1, gRPC pairs Protocol Buffers with HTTP/2 to deliver compact payloads, low latency, and a strong developer experience. This guide covers what you need to know to use gRPC for server-to-server communication.
What is gRPC?
gRPC (gRPC Remote Procedure Call) is an open-source RPC framework originally developed at Google and now hosted by the CNCF. It abstracts away much of the complexity of distributed communication and lets you define services and messages in a language-agnostic way using Protocol Buffers.
Key Characteristics
- Binary Serialization: Uses Protocol Buffers instead of JSON, typically shrinking payloads by 3-10x depending on the data
- HTTP/2 Transport: Enables multiplexing, flow control, and header compression
- Code Generation: Automatically generates client and server code from .proto definitions
- Streaming Support: Native support for bidirectional streaming over a single connection
- Type Safety: Strong typing across language boundaries
- Cross-Language: Officially supported in more than ten programming languages
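To see why binary serialization is compact, compare a JSON payload with a hand-rolled length-prefixed binary layout. This is a toy stand-in for illustration only, not the real protobuf wire format:

```python
import json
import struct

# A record roughly matching the User message defined later in this guide.
user = {"id": 42, "name": "Alice", "email": "alice@example.com", "is_active": True}

# JSON repeats every field name in every payload.
json_bytes = json.dumps(user).encode("utf-8")

# A binary encoding identifies fields by position or tag, so names are
# never sent: int32 id, length-prefixed name and email, one-byte bool.
name = user["name"].encode("utf-8")
email = user["email"].encode("utf-8")
binary_bytes = struct.pack(
    f"<iB{len(name)}sB{len(email)}s?",
    user["id"], len(name), name, len(email), email, user["is_active"],
)

assert len(binary_bytes) < len(json_bytes)
```

The exact ratio depends on how long the field names are relative to the values, which is why savings vary between payloads.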
Communication Flow
Below is a sequence diagram showing how a unary gRPC call flows between two services:
sequenceDiagram
participant Client as Service A<br/>(gRPC Client)
participant Transport as HTTP/2<br/>Network
participant Server as Service B<br/>(gRPC Server)
Client->>Client: 1. Create request message<br/>(UserRequest proto)
Client->>Client: 2. Serialize to protobuf<br/>(binary format)
Client->>Transport: 3. Send HTTP/2 frame<br/>(method: GetUser)
Transport->>Server: 4. Receive frame
Server->>Server: 5. Deserialize message
Server->>Server: 6. Invoke handler logic
Server->>Server: 7. Serialize response<br/>(UserResponse proto)
Server->>Transport: 8. Send HTTP/2 frame
Transport->>Client: 9. Receive frame
Client->>Client: 10. Deserialize response
Client->>Client: 11. Return typed object
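The eleven steps above can be modeled in a few lines. In this sketch JSON stands in for protobuf serialization and a plain function call stands in for the HTTP/2 transport; the shape of the flow is what matters:

```python
import json
from dataclasses import dataclass, asdict

# Toy stand-ins for the generated message types.
@dataclass
class UserRequest:
    user_id: int

@dataclass
class UserResponse:
    user_id: int
    name: str

def server_handle(frame: bytes) -> bytes:
    # Steps 4-6: receive the frame, deserialize, invoke the handler logic.
    request = UserRequest(**json.loads(frame))
    response = UserResponse(user_id=request.user_id, name="Alice")
    # Steps 7-8: serialize the response and send it back as a frame.
    return json.dumps(asdict(response)).encode("utf-8")

def client_call(request: UserRequest) -> UserResponse:
    # Steps 1-3: build the request message and serialize it for the wire.
    frame = json.dumps(asdict(request)).encode("utf-8")
    response_frame = server_handle(frame)
    # Steps 9-11: deserialize and hand back a typed object.
    return UserResponse(**json.loads(response_frame))

result = client_call(UserRequest(user_id=1))
```

In real gRPC the generated stub performs the client half and the server framework performs the server half, so application code only ever sees the typed objects at either end.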
Core Principles
- Efficiency: Binary format and HTTP/2 multiplexing minimize latency and bandwidth
- Simplicity: Declarative service definitions eliminate boilerplate
- Reliability: Strong typing and explicit error handling reduce runtime surprises
- Scalability: Streaming and multiplexing handle thousands of concurrent connections
- Interoperability: Language-agnostic design enables polyglot architectures
Protocol Buffers: The Foundation
Protocol Buffers (protobuf) are the serialization format underlying gRPC. They provide a compact, language-neutral mechanism for serializing structured data.
Defining Messages
// user.proto
syntax = "proto3";
package users;
import "google/protobuf/timestamp.proto";
// Simple scalar types
message User {
int32 id = 1;
string name = 2;
string email = 3;
bool is_active = 4;
double account_balance = 5;
// Timestamps use google.protobuf types
google.protobuf.Timestamp created_at = 6;
// Repeated fields (arrays)
repeated string tags = 7;
}
// Nested messages
message Address {
string street = 1;
string city = 2;
string postal_code = 3;
string country = 4;
}
message UserProfile {
User user = 1;
Address address = 2;
repeated PhoneNumber phone_numbers = 3;
}
message PhoneNumber {
enum Type {
TYPE_UNSPECIFIED = 0;
MOBILE = 1;
HOME = 2;
WORK = 3;
}
Type type = 1;
string number = 2;
}
// Enums for type-safe options
enum Status {
STATUS_UNSPECIFIED = 0;
ACTIVE = 1;
INACTIVE = 2;
SUSPENDED = 3;
}
// Oneof for mutually exclusive fields
message Contact {
string name = 1;
oneof contact_info {
string email = 2;
string phone = 3;
string website = 4;
}
}
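Two details in the definitions above are easy to miss. Proto3 treats 0 as the implicit default for enum fields, which is why each enum reserves an UNSPECIFIED zero value; and setting one member of a oneof clears the others. A rough Python analogy for the enum default:

```python
from enum import IntEnum

class Status(IntEnum):
    STATUS_UNSPECIFIED = 0  # proto3's implicit default for unset enum fields
    ACTIVE = 1
    INACTIVE = 2
    SUSPENDED = 3

# A message whose status field was never set decodes to the zero value,
# letting readers distinguish "not set" from a deliberately chosen status.
default_status = Status(0)
assert default_status is Status.STATUS_UNSPECIFIED
```

Without the reserved zero value, a reader could not tell an unset field apart from the first "real" enum member.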
Scalar Types Reference
| Type | Wire size | Use Case |
|---|---|---|
| int32 / int64 | Varint, 1-10 bytes | Integers (prefer sint32/sint64 when values are often negative) |
| float / double | 4 / 8 bytes | Decimals |
| bool | 1 byte | Booleans |
| string | Length-delimited | UTF-8 strings |
| bytes | Length-delimited | Binary data |
| google.protobuf.Timestamp | Message, varies | Timestamps |
| google.protobuf.Duration | Message, varies | Time durations |
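The int32/int64 row deserves a closer look: these types use base-128 varint encoding, so the wire size depends on the value, and the sint variants add a ZigZag mapping so that small negative numbers stay small. A minimal sketch, following protobuf's documented encoding rules:

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint (protobuf style)."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def zigzag32(value: int) -> int:
    """ZigZag-map a signed 32-bit int so small negatives encode small."""
    return (value << 1) ^ (value >> 31)

# Small values cost one byte; 300 needs two (0xAC 0x02, per the protobuf docs).
assert encode_varint(1) == b"\x01"
assert encode_varint(300) == b"\xac\x02"
# A plain varint of -1 would take ten bytes; ZigZag maps it to 1 (one byte).
assert zigzag32(-1) == 1 and zigzag32(1) == 2
```

This is why the guidance above says to prefer sint32/sint64 for fields that are frequently negative: the default int types pay the worst-case cost for every negative value.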
Service Definitions and Code Generation
Defining gRPC Services
// user_service.proto
syntax = "proto3";
package users;
import "google/protobuf/empty.proto";
import "user.proto";
message GetUserRequest {
int32 user_id = 1;
}
message GetUserResponse {
User user = 1;
}
message CreateUserRequest {
string name = 1;
string email = 2;
}
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
}
message ListUsersResponse {
repeated User users = 1;
int32 total_count = 2;
}
message DeleteUserRequest {
int32 user_id = 1;
}
// Messages for the bidirectional sync RPC below
message SyncUserRequest {
int32 user_id = 1;
string name = 2;
string email = 3;
}
message SyncUserResponse {
int32 user_id = 1;
string status = 2;
string message = 3;
}
service UserService {
// Unary RPC: single request, single response
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc CreateUser(CreateUserRequest) returns (GetUserResponse);
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
// Server streaming: single request, stream of responses
rpc ListUsers(ListUsersRequest) returns (stream GetUserResponse);
// Client streaming: stream of requests, single response
rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
// Bidirectional streaming: stream of requests and responses
rpc SyncUsers(stream SyncUserRequest) returns (stream SyncUserResponse);
}
Generating Code
# Install protoc compiler
# macOS: brew install protobuf
# Linux: apt-get install protobuf-compiler
# Windows: choco install protoc
# Install the ts-proto codegen plugin and the gRPC runtime
npm install --save-dev ts-proto
npm install @grpc/grpc-js
# Generate TypeScript code
protoc \
--plugin=protoc-gen-ts_proto=./node_modules/.bin/protoc-gen-ts_proto \
--ts_proto_out=./src/generated \
--ts_proto_opt=outputServices=grpc-js \
user_service.proto user.proto
Implementing a gRPC Server
Node.js/TypeScript Server
// src/server.ts
import * as grpc from '@grpc/grpc-js';
import { UserServiceService, UserServiceServer } from './generated/user_service';
import { User, GetUserResponse, ListUsersResponse } from './generated/user';
import { GetUserRequest, CreateUserRequest, ListUsersRequest } from './generated/user_service';
// Mock database
const usersDb = new Map<number, any>([
[1, { id: 1, name: 'Alice', email: 'alice@example.com', isActive: true }],
[2, { id: 2, name: 'Bob', email: 'bob@example.com', isActive: true }],
]);
let nextUserId = 3;
// Implement service handlers
const userServiceImpl: UserServiceServer = {
// Unary RPC
async getUser(
call: grpc.ServerUnaryCall<GetUserRequest, GetUserResponse>,
callback: grpc.sendUnaryData<GetUserResponse>
) {
const { userId } = call.request;
const user = usersDb.get(userId);
if (!user) {
return callback({
code: grpc.status.NOT_FOUND,
message: `User ${userId} not found`,
});
}
callback(null, { user });
},
// Unary RPC with validation
async createUser(
call: grpc.ServerUnaryCall<CreateUserRequest, GetUserResponse>,
callback: grpc.sendUnaryData<GetUserResponse>
) {
const { name, email } = call.request;
if (!name || !email) {
return callback({
code: grpc.status.INVALID_ARGUMENT,
message: 'Name and email are required',
});
}
const user = {
id: nextUserId++,
name,
email,
isActive: true,
createdAt: new Date(),
};
usersDb.set(user.id, user);
callback(null, { user });
},
// Server streaming
async listUsers(call: grpc.ServerWritableStream<ListUsersRequest, GetUserResponse>) {
const { page = 1, pageSize = 10 } = call.request;
const users = Array.from(usersDb.values());
const start = (page - 1) * pageSize;
const paginatedUsers = users.slice(start, start + pageSize);
for (const user of paginatedUsers) {
call.write({ user });
}
call.end();
},
// Client streaming
async batchCreateUsers(
call: grpc.ServerReadableStream<CreateUserRequest, ListUsersResponse>,
callback: grpc.sendUnaryData<ListUsersResponse>
) {
const users: any[] = [];
call.on('data', (request: CreateUserRequest) => {
const user = {
id: nextUserId++,
name: request.name,
email: request.email,
isActive: true,
};
users.push(user);
usersDb.set(user.id, user);
});
call.on('end', () => {
callback(null, { users, totalCount: users.length });
});
call.on('error', (err) => {
console.error('Stream error:', err);
callback({
code: grpc.status.INTERNAL,
message: 'Stream error',
});
});
},
// Bidirectional streaming
async syncUsers(call: grpc.ServerDuplexStream<any, any>) {
call.on('data', (request: any) => {
const user = {
id: request.userId || nextUserId++,
name: request.name,
email: request.email,
isActive: true,
};
usersDb.set(user.id, user);
call.write({
userId: user.id,
status: 'SYNCED',
message: `User ${user.name} synced successfully`,
});
});
call.on('end', () => {
call.end();
});
call.on('error', (err) => {
console.error('Sync error:', err);
});
},
async deleteUser(
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) {
const { userId } = call.request;
if (usersDb.has(userId)) {
usersDb.delete(userId);
return callback(null, {});
}
callback({
code: grpc.status.NOT_FOUND,
message: `User ${userId} not found`,
});
},
};
// Create and start server
function startServer() {
const server = new grpc.Server();
server.addService(UserServiceService, userServiceImpl);
const PORT = process.env.PORT || 50051;
server.bindAsync(`0.0.0.0:${PORT}`, grpc.ServerCredentials.createInsecure(), (err) => {
if (err) {
console.error('Failed to bind server:', err);
process.exit(1);
}
// Note: server.start() is deprecated in recent @grpc/grpc-js; bindAsync starts the server
console.log(`gRPC server running on port ${PORT}`);
});
}
startServer();
Java Server
// src/main/java/com/example/grpc/UserServiceServer.java
package com.example.grpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import users.UserServiceGrpc;
import users.UserServiceProto.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class UserServiceServer {
private static final Map<Integer, User> usersDb = new ConcurrentHashMap<>();
private static final AtomicInteger nextUserId = new AtomicInteger(3);
static {
usersDb.put(1, User.newBuilder().setId(1).setName("Alice").setEmail("alice@example.com").setIsActive(true).build());
usersDb.put(2, User.newBuilder().setId(2).setName("Bob").setEmail("bob@example.com").setIsActive(true).build());
}
static class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {
@Override
public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
int userId = request.getUserId();
User user = usersDb.get(userId);
if (user == null) {
responseObserver.onError(Status.NOT_FOUND
.withDescription("User " + userId + " not found")
.asRuntimeException());
return;
}
responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
responseObserver.onCompleted();
}
@Override
public void createUser(CreateUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
if (request.getName().isEmpty() || request.getEmail().isEmpty()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Name and email are required")
.asRuntimeException());
return;
}
int id = nextUserId.getAndIncrement();
User user = User.newBuilder()
.setId(id)
.setName(request.getName())
.setEmail(request.getEmail())
.setIsActive(true)
.build();
usersDb.put(id, user);
responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
responseObserver.onCompleted();
}
@Override
public void listUsers(ListUsersRequest request, StreamObserver<GetUserResponse> responseObserver) {
int page = request.getPage() > 0 ? request.getPage() : 1;
int pageSize = request.getPageSize() > 0 ? request.getPageSize() : 10;
List<User> users = new ArrayList<>(usersDb.values());
int start = (page - 1) * pageSize;
int end = Math.min(start + pageSize, users.size());
for (int i = start; i < end; i++) {
responseObserver.onNext(GetUserResponse.newBuilder().setUser(users.get(i)).build());
}
responseObserver.onCompleted();
}
@Override
public StreamObserver<CreateUserRequest> batchCreateUsers(StreamObserver<ListUsersResponse> responseObserver) {
List<User> createdUsers = new ArrayList<>();
return new StreamObserver<>() {
@Override
public void onNext(CreateUserRequest request) {
int id = nextUserId.getAndIncrement();
User user = User.newBuilder()
.setId(id)
.setName(request.getName())
.setEmail(request.getEmail())
.setIsActive(true)
.build();
createdUsers.add(user);
usersDb.put(id, user);
}
@Override
public void onError(Throwable t) {
responseObserver.onError(Status.INTERNAL.withDescription("Stream error").asRuntimeException());
}
@Override
public void onCompleted() {
responseObserver.onNext(ListUsersResponse.newBuilder()
.addAllUsers(createdUsers)
.setTotalCount(createdUsers.size())
.build());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<SyncUserRequest> syncUsers(StreamObserver<SyncUserResponse> responseObserver) {
return new StreamObserver<>() {
@Override
public void onNext(SyncUserRequest request) {
int id = request.getUserId() > 0 ? request.getUserId() : nextUserId.getAndIncrement();
User user = User.newBuilder()
.setId(id).setName(request.getName()).setEmail(request.getEmail()).setIsActive(true)
.build();
usersDb.put(id, user);
responseObserver.onNext(SyncUserResponse.newBuilder()
.setUserId(id).setStatus("SYNCED")
.setMessage("User " + user.getName() + " synced successfully")
.build());
}
@Override public void onError(Throwable t) { System.err.println("Sync error: " + t.getMessage()); }
@Override public void onCompleted() { responseObserver.onCompleted(); }
};
}
}
public static void main(String[] args) throws IOException, InterruptedException {
int port = Integer.parseInt(System.getenv().getOrDefault("PORT", "50051"));
Server server = ServerBuilder.forPort(port)
.addService(new UserServiceImpl())
.build()
.start();
System.out.println("gRPC server running on port " + port);
server.awaitTermination();
}
}
Python Server
# src/server.py
import grpc
import users_pb2
import users_pb2_grpc
from concurrent import futures
import os
# Mock database
users_db: dict[int, dict] = {
1: {"id": 1, "name": "Alice", "email": "alice@example.com", "is_active": True},
2: {"id": 2, "name": "Bob", "email": "bob@example.com", "is_active": True},
}
next_user_id = 3
class UserServiceServicer(users_pb2_grpc.UserServiceServicer):
def GetUser(self, request, context):
user_data = users_db.get(request.user_id)
if not user_data:
context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")
user = users_pb2.User(**user_data)
return users_pb2.GetUserResponse(user=user)
def CreateUser(self, request, context):
global next_user_id
if not request.name or not request.email:
context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Name and email are required")
user_data = {"id": next_user_id, "name": request.name, "email": request.email, "is_active": True}
users_db[next_user_id] = user_data
next_user_id += 1
return users_pb2.GetUserResponse(user=users_pb2.User(**user_data))
def ListUsers(self, request, context):
page = request.page or 1
page_size = request.page_size or 10
users = list(users_db.values())
start = (page - 1) * page_size
for u in users[start:start + page_size]:
yield users_pb2.GetUserResponse(user=users_pb2.User(**u))
def BatchCreateUsers(self, request_iterator, context):
global next_user_id
created = []
for req in request_iterator:
user_data = {"id": next_user_id, "name": req.name, "email": req.email, "is_active": True}
users_db[next_user_id] = user_data
next_user_id += 1
created.append(users_pb2.User(**user_data))
return users_pb2.ListUsersResponse(users=created, total_count=len(created))
def SyncUsers(self, request_iterator, context):
global next_user_id
for req in request_iterator:
uid = req.user_id or next_user_id
user_data = {"id": uid, "name": req.name, "email": req.email, "is_active": True}
users_db[uid] = user_data
if uid == next_user_id:
next_user_id += 1
yield users_pb2.SyncUserResponse(
user_id=uid, status="SYNCED",
message=f"User {req.name} synced successfully"
)
    def DeleteUser(self, request, context):
        from google.protobuf import empty_pb2  # Empty is a well-known type, not generated into users_pb2
        if request.user_id in users_db:
            del users_db[request.user_id]
            return empty_pb2.Empty()
        context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")
def serve():
port = os.getenv("PORT", "50051")
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
users_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
server.add_insecure_port(f"0.0.0.0:{port}")
server.start()
print(f"gRPC server running on port {port}")
server.wait_for_termination()
if __name__ == "__main__":
    serve()
C# Server
// src/Services/UserService.cs
using Google.Protobuf.WellKnownTypes;  // Empty
using Grpc.Core;
using Users;
namespace GrpcServer.Services;
public class UserServiceImpl : UserService.UserServiceBase
{
private static readonly Dictionary<int, User> UsersDb = new()
{
[1] = new User { Id = 1, Name = "Alice", Email = "alice@example.com", IsActive = true },
[2] = new User { Id = 2, Name = "Bob", Email = "bob@example.com", IsActive = true },
};
private static int _nextUserId = 3;
public override Task<GetUserResponse> GetUser(GetUserRequest request, ServerCallContext context)
{
if (!UsersDb.TryGetValue(request.UserId, out var user))
throw new RpcException(new Status(StatusCode.NotFound, $"User {request.UserId} not found"));
return Task.FromResult(new GetUserResponse { User = user });
}
public override Task<GetUserResponse> CreateUser(CreateUserRequest request, ServerCallContext context)
{
if (string.IsNullOrEmpty(request.Name) || string.IsNullOrEmpty(request.Email))
throw new RpcException(new Status(StatusCode.InvalidArgument, "Name and email are required"));
var user = new User { Id = _nextUserId++, Name = request.Name, Email = request.Email, IsActive = true };
UsersDb[user.Id] = user;
return Task.FromResult(new GetUserResponse { User = user });
}
public override async Task ListUsers(ListUsersRequest request, IServerStreamWriter<GetUserResponse> responseStream, ServerCallContext context)
{
int page = request.Page > 0 ? request.Page : 1;
int pageSize = request.PageSize > 0 ? request.PageSize : 10;
var users = UsersDb.Values.Skip((page - 1) * pageSize).Take(pageSize);
foreach (var user in users)
await responseStream.WriteAsync(new GetUserResponse { User = user });
}
public override async Task<ListUsersResponse> BatchCreateUsers(
IAsyncStreamReader<CreateUserRequest> requestStream, ServerCallContext context)
{
var created = new List<User>();
await foreach (var req in requestStream.ReadAllAsync())
{
var user = new User { Id = _nextUserId++, Name = req.Name, Email = req.Email, IsActive = true };
UsersDb[user.Id] = user;
created.Add(user);
}
var response = new ListUsersResponse { TotalCount = created.Count };
response.Users.AddRange(created);
return response;
}
public override async Task SyncUsers(
IAsyncStreamReader<SyncUserRequest> requestStream,
IServerStreamWriter<SyncUserResponse> responseStream,
ServerCallContext context)
{
await foreach (var req in requestStream.ReadAllAsync())
{
int uid = req.UserId > 0 ? req.UserId : _nextUserId++;
UsersDb[uid] = new User { Id = uid, Name = req.Name, Email = req.Email, IsActive = true };
await responseStream.WriteAsync(new SyncUserResponse
{
UserId = uid, Status = "SYNCED",
Message = $"User {req.Name} synced successfully"
});
}
}
public override Task<Empty> DeleteUser(DeleteUserRequest request, ServerCallContext context)
{
if (!UsersDb.Remove(request.UserId))
throw new RpcException(new Status(StatusCode.NotFound, $"User {request.UserId} not found"));
return Task.FromResult(new Empty());
}
}
// Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
var app = builder.Build();
app.MapGrpcService<UserServiceImpl>();
int port = int.Parse(Environment.GetEnvironmentVariable("PORT") ?? "50051");
// Note: plaintext gRPC requires Kestrel to serve HTTP/2 on this endpoint (e.g. Protocols = HttpProtocols.Http2)
app.Run($"http://0.0.0.0:{port}");
Implementing a gRPC Client
Basic Client Implementation
// src/client.ts
import * as grpc from '@grpc/grpc-js';
// Alias the generated client so the wrapper class below can reuse the name
import { UserServiceClient as GeneratedUserServiceClient } from './generated/user_service';
class UserServiceClient {
private client: GeneratedUserServiceClient;
constructor(address: string = 'localhost:50051') {
this.client = new GeneratedUserServiceClient(
address,
grpc.credentials.createInsecure()
);
}
// Unary call
async getUser(userId: number): Promise<any> {
return new Promise((resolve, reject) => {
this.client.getUser({ userId }, (err, response) => {
if (err) reject(err);
else resolve(response?.user);
});
});
}
// Server streaming
async listUsers(page: number = 1, pageSize: number = 10): Promise<any[]> {
return new Promise((resolve, reject) => {
const users: any[] = [];
const call = this.client.listUsers({ page, pageSize });
call.on('data', (response) => {
users.push(response.user);
});
call.on('end', () => {
resolve(users);
});
call.on('error', reject);
});
}
// Client streaming
async batchCreateUsers(requests: any[]): Promise<any> {
return new Promise((resolve, reject) => {
const call = this.client.batchCreateUsers((err, response) => {
if (err) reject(err);
else resolve(response);
});
for (const request of requests) {
call.write(request);
}
call.end();
});
}
// Bidirectional streaming
async syncUsers(userUpdates: any[]): Promise<void> {
return new Promise((resolve, reject) => {
const call = this.client.syncUsers();
call.on('data', (response) => {
console.log('Sync response:', response);
});
call.on('end', resolve);
call.on('error', reject);
for (const update of userUpdates) {
call.write(update);
}
call.end();
});
}
close() {
this.client.close();
}
}
// Usage
async function main() {
const client = new UserServiceClient();
try {
const user = await client.getUser(1);
console.log('User:', user);
const users = await client.listUsers(1, 5);
console.log('Users:', users);
} catch (error) {
console.error('Error:', error);
} finally {
client.close();
}
}
main();
Java Client
// src/main/java/com/example/grpc/UserServiceClientExample.java
package com.example.grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import users.UserServiceGrpc;
import users.UserServiceProto.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UserServiceClientExample {
private final ManagedChannel channel;
private final UserServiceGrpc.UserServiceBlockingStub blockingStub;
private final UserServiceGrpc.UserServiceStub asyncStub;
public UserServiceClientExample(String address) {
channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
blockingStub = UserServiceGrpc.newBlockingStub(channel);
asyncStub = UserServiceGrpc.newStub(channel);
}
// Unary call
public User getUser(int userId) {
GetUserResponse response = blockingStub.getUser(
GetUserRequest.newBuilder().setUserId(userId).build()
);
return response.getUser();
}
// Server streaming
public List<User> listUsers(int page, int pageSize) {
List<User> users = new ArrayList<>();
blockingStub.listUsers(ListUsersRequest.newBuilder().setPage(page).setPageSize(pageSize).build())
.forEachRemaining(r -> users.add(r.getUser()));
return users;
}
// Client streaming
public ListUsersResponse batchCreateUsers(List<CreateUserRequest> requests) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
List<ListUsersResponse> result = new ArrayList<>();
StreamObserver<CreateUserRequest> requestObserver = asyncStub.batchCreateUsers(
new StreamObserver<>() {
@Override public void onNext(ListUsersResponse r) { result.add(r); }
@Override public void onError(Throwable t) { latch.countDown(); }
@Override public void onCompleted() { latch.countDown(); }
}
);
for (CreateUserRequest req : requests) requestObserver.onNext(req);
requestObserver.onCompleted();
latch.await(30, TimeUnit.SECONDS);
return result.isEmpty() ? null : result.get(0);
}
// Bidirectional streaming
public void syncUsers(List<SyncUserRequest> updates) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<SyncUserRequest> requestObserver = asyncStub.syncUsers(
new StreamObserver<>() {
@Override public void onNext(SyncUserResponse r) { System.out.println("Sync response: " + r); }
@Override public void onError(Throwable t) { latch.countDown(); }
@Override public void onCompleted() { latch.countDown(); }
}
);
for (SyncUserRequest update : updates) requestObserver.onNext(update);
requestObserver.onCompleted();
latch.await(30, TimeUnit.SECONDS);
}
public void close() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public static void main(String[] args) throws InterruptedException {
UserServiceClientExample client = new UserServiceClientExample("localhost:50051");
try {
User user = client.getUser(1);
System.out.println("User: " + user);
List<User> users = client.listUsers(1, 5);
System.out.println("Users: " + users);
} finally {
client.close();
}
}
}
Python Client
# src/client.py
import grpc
import users_pb2
import users_pb2_grpc
from typing import Iterator
class UserServiceClient:
def __init__(self, address: str = "localhost:50051"):
self.channel = grpc.insecure_channel(address)
self.stub = users_pb2_grpc.UserServiceStub(self.channel)
def get_user(self, user_id: int):
response = self.stub.GetUser(users_pb2.GetUserRequest(user_id=user_id))
return response.user
def list_users(self, page: int = 1, page_size: int = 10) -> list:
users = []
for response in self.stub.ListUsers(
users_pb2.ListUsersRequest(page=page, page_size=page_size)
):
users.append(response.user)
return users
def batch_create_users(self, requests: list[dict]):
def request_iterator() -> Iterator[users_pb2.CreateUserRequest]:
for r in requests:
yield users_pb2.CreateUserRequest(name=r["name"], email=r["email"])
return self.stub.BatchCreateUsers(request_iterator())
def sync_users(self, updates: list[dict]) -> None:
def request_iterator() -> Iterator[users_pb2.SyncUserRequest]:
for u in updates:
yield users_pb2.SyncUserRequest(
user_id=u.get("user_id", 0), name=u["name"], email=u["email"]
)
for response in self.stub.SyncUsers(request_iterator()):
print("Sync response:", response)
def close(self):
self.channel.close()
def main():
client = UserServiceClient()
try:
user = client.get_user(1)
print("User:", user)
users = client.list_users(1, 5)
print("Users:", users)
except grpc.RpcError as e:
print("Error:", e)
finally:
client.close()
if __name__ == "__main__":
    main()
C# Client
// src/Clients/UserServiceClient.cs
using Grpc.Net.Client;
using Users;
namespace GrpcClient;
public class UserServiceClientExample : IDisposable
{
private readonly GrpcChannel _channel;
private readonly UserService.UserServiceClient _client;
public UserServiceClientExample(string address = "http://localhost:50051")
{
_channel = GrpcChannel.ForAddress(address);
_client = new UserService.UserServiceClient(_channel);
}
// Unary call
public async Task<User> GetUserAsync(int userId)
{
var response = await _client.GetUserAsync(new GetUserRequest { UserId = userId });
return response.User;
}
// Server streaming
public async Task<List<User>> ListUsersAsync(int page = 1, int pageSize = 10)
{
var users = new List<User>();
using var call = _client.ListUsers(new ListUsersRequest { Page = page, PageSize = pageSize });
await foreach (var response in call.ResponseStream.ReadAllAsync())
users.Add(response.User);
return users;
}
// Client streaming
public async Task<ListUsersResponse> BatchCreateUsersAsync(IEnumerable<(string Name, string Email)> requests)
{
using var call = _client.BatchCreateUsers();
foreach (var (name, email) in requests)
await call.RequestStream.WriteAsync(new CreateUserRequest { Name = name, Email = email });
await call.RequestStream.CompleteAsync();
return await call.ResponseAsync;
}
// Bidirectional streaming
public async Task SyncUsersAsync(IEnumerable<(int UserId, string Name, string Email)> updates)
{
using var call = _client.SyncUsers();
var readTask = Task.Run(async () =>
{
await foreach (var response in call.ResponseStream.ReadAllAsync())
Console.WriteLine($"Sync response: {response}");
});
foreach (var (userId, name, email) in updates)
await call.RequestStream.WriteAsync(new SyncUserRequest { UserId = userId, Name = name, Email = email });
await call.RequestStream.CompleteAsync();
await readTask;
}
public void Dispose() => _channel.Dispose();
}
// Program.cs usage
using var client = new UserServiceClientExample();
var user = await client.GetUserAsync(1);
Console.WriteLine($"User: {user}");
var users = await client.ListUsersAsync(1, 5);
Console.WriteLine($"Users: {users.Count}");
The Four Communication Patterns
1. Unary RPC (Single Request/Response)
The most common pattern, equivalent to a traditional function call:
Node.js/TypeScript
// Handler
async getUser(call, callback) {
const user = await fetchUser(call.request.userId);
callback(null, { user });
}
// Client
const user = await client.getUser({ userId: 1 });
Java
// Handler
@Override
public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
User user = fetchUser(request.getUserId());
responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
responseObserver.onCompleted();
}
// Client (blocking stub)
GetUserResponse response = blockingStub.getUser(
GetUserRequest.newBuilder().setUserId(1).build()
);
User user = response.getUser();
Python
# Handler
def GetUser(self, request, context):
user = fetch_user(request.user_id)
return GetUserResponse(user=user)
# Client
response = stub.GetUser(GetUserRequest(user_id=1))
user = response.user
C#
// Handler
public override Task<GetUserResponse> GetUser(GetUserRequest request, ServerCallContext context)
{
var user = FetchUser(request.UserId);
return Task.FromResult(new GetUserResponse { User = user });
}
// Client
var response = await client.GetUserAsync(new GetUserRequest { UserId = 1 });
var user = response.User;
Use Cases: Fetching a single resource, creating resources with validation, simple updates.
2. Server Streaming RPC
Server sends multiple responses to a single client request:
Node.js/TypeScript
// Handler
async listUsers(call) {
const users = await fetchAllUsers();
for (const user of users) {
call.write({ user });
}
call.end();
}
// Client
const call = client.listUsers({ pageSize: 10 });
call.on('data', (response) => {
console.log('Received user:', response.user);
});
call.on('end', () => {
console.log('Stream complete');
});
Java
// Handler
@Override
public void listUsers(ListUsersRequest request, StreamObserver<GetUserResponse> responseObserver) {
List<User> users = fetchAllUsers();
for (User user : users) {
responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
}
responseObserver.onCompleted();
}
// Client (blocking iterator)
Iterator<GetUserResponse> responses = blockingStub.listUsers(
ListUsersRequest.newBuilder().setPageSize(10).build()
);
while (responses.hasNext()) {
System.out.println("Received user: " + responses.next().getUser());
}
System.out.println("Stream complete");
Python
# Handler
def ListUsers(self, request, context):
users = fetch_all_users()
for user in users:
yield GetUserResponse(user=user)
# Client
for response in stub.ListUsers(ListUsersRequest(page_size=10)):
print("Received user:", response.user)
print("Stream complete")
C#
// Handler
public override async Task ListUsers(
ListUsersRequest request,
IServerStreamWriter<GetUserResponse> responseStream,
ServerCallContext context)
{
var users = await FetchAllUsersAsync();
foreach (var user in users)
await responseStream.WriteAsync(new GetUserResponse { User = user });
}
// Client
using var call = client.ListUsers(new ListUsersRequest { PageSize = 10 });
await foreach (var response in call.ResponseStream.ReadAllAsync())
Console.WriteLine($"Received user: {response.User}");
Console.WriteLine("Stream complete");
Use Cases: Pagination, real-time data feeds, large dataset exports, log streaming.
3. Client Streaming RPC
Client sends multiple requests to a single server response:
Node.js/TypeScript
// Handler
async batchCreateUsers(call, callback) {
const users = [];
call.on('data', (request) => {
users.push(request);
});
call.on('end', () => {
const saved = saveUsers(users);
callback(null, { users: saved });
});
}
// Client
const call = client.batchCreateUsers((err, response) => {
console.log('All users created:', response.users);
});
call.write({ name: 'Alice', email: 'alice@example.com' });
call.write({ name: 'Bob', email: 'bob@example.com' });
call.end();
Java
// Handler
@Override
public StreamObserver<CreateUserRequest> batchCreateUsers(StreamObserver<ListUsersResponse> responseObserver) {
List<User> users = new ArrayList<>();
return new StreamObserver<>() {
@Override public void onNext(CreateUserRequest req) { users.add(saveUser(req)); }
@Override public void onError(Throwable t) { responseObserver.onError(t); }
@Override public void onCompleted() {
ListUsersResponse resp = ListUsersResponse.newBuilder()
.addAllUsers(users).setTotalCount(users.size()).build();
responseObserver.onNext(resp);
responseObserver.onCompleted();
}
};
}
// Client (async stub)
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<CreateUserRequest> req = asyncStub.batchCreateUsers(new StreamObserver<>() {
@Override public void onNext(ListUsersResponse r) { System.out.println("All users created: " + r.getUsersList()); }
@Override public void onError(Throwable t) { latch.countDown(); }
@Override public void onCompleted() { latch.countDown(); }
});
req.onNext(CreateUserRequest.newBuilder().setName("Alice").setEmail("alice@example.com").build());
req.onNext(CreateUserRequest.newBuilder().setName("Bob").setEmail("bob@example.com").build());
req.onCompleted();
latch.await();
# Handler
def BatchCreateUsers(self, request_iterator, context):
users = []
for request in request_iterator:
users.append(save_user(request))
return ListUsersResponse(users=users, total_count=len(users))
# Client
def request_iterator():
yield CreateUserRequest(name="Alice", email="alice@example.com")
yield CreateUserRequest(name="Bob", email="bob@example.com")
response = stub.BatchCreateUsers(request_iterator())
print("All users created:", list(response.users))
// Handler
public override async Task<ListUsersResponse> BatchCreateUsers(
IAsyncStreamReader<CreateUserRequest> requestStream, ServerCallContext context)
{
var users = new List<User>();
await foreach (var req in requestStream.ReadAllAsync())
users.Add(SaveUser(req));
var response = new ListUsersResponse { TotalCount = users.Count };
response.Users.AddRange(users);
return response;
}
// Client
using var call = client.BatchCreateUsers();
await call.RequestStream.WriteAsync(new CreateUserRequest { Name = "Alice", Email = "alice@example.com" });
await call.RequestStream.WriteAsync(new CreateUserRequest { Name = "Bob", Email = "bob@example.com" });
await call.RequestStream.CompleteAsync();
var response = await call.ResponseAsync;
Console.WriteLine($"All users created: {response.Users.Count}");
Use Cases: Bulk imports, file uploads, batch processing, data synchronization.
4. Bidirectional Streaming RPC
Both client and server send multiple messages over a persistent connection:
// Handler
async syncUsers(call) {
call.on('data', (request) => {
const synced = syncUser(request);
call.write({
userId: synced.id,
status: 'SYNCED',
});
});
call.on('end', () => {
call.end();
});
}
// Client
const call = client.syncUsers();
call.on('data', (response) => {
console.log('Sync result:', response);
});
call.write({ userId: 1, name: 'Alice Updated' });
call.write({ userId: 2, name: 'Bob Updated' });
call.end();
// Handler
@Override
public StreamObserver<SyncUserRequest> syncUsers(StreamObserver<SyncUserResponse> responseObserver) {
return new StreamObserver<>() {
@Override
public void onNext(SyncUserRequest req) {
User synced = syncUser(req);
responseObserver.onNext(SyncUserResponse.newBuilder()
.setUserId(synced.getId()).setStatus("SYNCED").build());
}
@Override public void onError(Throwable t) { responseObserver.onError(t); }
@Override public void onCompleted() { responseObserver.onCompleted(); }
};
}
// Client (async stub)
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<SyncUserRequest> req = asyncStub.syncUsers(new StreamObserver<>() {
@Override public void onNext(SyncUserResponse r) { System.out.println("Sync result: " + r); }
@Override public void onError(Throwable t) { latch.countDown(); }
@Override public void onCompleted() { latch.countDown(); }
});
req.onNext(SyncUserRequest.newBuilder().setUserId(1).setName("Alice Updated").build());
req.onNext(SyncUserRequest.newBuilder().setUserId(2).setName("Bob Updated").build());
req.onCompleted();
latch.await();
# Handler
def SyncUsers(self, request_iterator, context):
for request in request_iterator:
synced = sync_user(request)
yield SyncUserResponse(user_id=synced.id, status="SYNCED")
# Client
def request_iterator():
yield SyncUserRequest(user_id=1, name="Alice Updated")
yield SyncUserRequest(user_id=2, name="Bob Updated")
for response in stub.SyncUsers(request_iterator()):
print("Sync result:", response)
// Handler
public override async Task SyncUsers(
IAsyncStreamReader<SyncUserRequest> requestStream,
IServerStreamWriter<SyncUserResponse> responseStream,
ServerCallContext context)
{
await foreach (var req in requestStream.ReadAllAsync())
{
var synced = SyncUser(req);
await responseStream.WriteAsync(new SyncUserResponse { UserId = synced.Id, Status = "SYNCED" });
}
}
// Client
using var call = client.SyncUsers();
var readTask = Task.Run(async () =>
{
await foreach (var response in call.ResponseStream.ReadAllAsync())
Console.WriteLine($"Sync result: {response}");
});
await call.RequestStream.WriteAsync(new SyncUserRequest { UserId = 1, Name = "Alice Updated" });
await call.RequestStream.WriteAsync(new SyncUserRequest { UserId = 2, Name = "Bob Updated" });
await call.RequestStream.CompleteAsync();
await readTask;
Use Cases: Chat applications, live collaboration, bidirectional notifications, game state sync.
Interceptors and Middleware
Interceptors let you observe and modify RPC calls, enabling cross-cutting concerns such as logging, authentication, metrics, and request/response transformation.
Server-Side Interceptor
// src/middleware/serverInterceptor.ts
import * as grpc from '@grpc/grpc-js';
interface CallMetadata {
userId?: string;
requestId?: string;
startTime: number;
}
export function createServerInterceptor() {
return (
methodDescriptor: any,
call: any
): grpc.Listener => {
const metadata: CallMetadata = {
requestId: call.metadata.get('x-request-id')?.[0] as string,
userId: call.metadata.get('x-user-id')?.[0] as string,
startTime: Date.now(),
};
// Log incoming call
console.log('[gRPC Server]', {
method: methodDescriptor.path,
requestId: metadata.requestId,
userId: metadata.userId,
});
// Wrap call handlers
const originalSendMetadata = call.sendMetadata;
call.sendMetadata = function(responseMetadata: grpc.Metadata) {
responseMetadata.add('x-request-id', metadata.requestId || '');
originalSendMetadata.call(this, responseMetadata);
};
return {
onReceiveMetadata(metadata: grpc.Metadata, next: any) {
next(metadata);
},
onReceiveMessage(message: any, next: any) {
console.log('[gRPC Message]', {
method: methodDescriptor.path,
requestId: metadata.requestId,
payload: JSON.stringify(message).substring(0, 200),
});
next(message);
},
onReceiveHalfClose(next: any) {
next();
},
onCancel(next: any) {
const duration = Date.now() - metadata.startTime;
console.log('[gRPC Cancelled]', {
method: methodDescriptor.path,
requestId: metadata.requestId,
duration,
});
next();
},
};
};
}
// src/main/java/com/example/grpc/ServerLoggingInterceptor.java
package com.example.grpc;
import io.grpc.*;
public class ServerLoggingInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
String requestId = headers.get(Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER));
String userId = headers.get(Metadata.Key.of("x-user-id", Metadata.ASCII_STRING_MARSHALLER));
long startTime = System.currentTimeMillis();
System.out.printf("[gRPC Server] method=%s requestId=%s userId=%s%n",
call.getMethodDescriptor().getFullMethodName(), requestId, userId);
ServerCall<ReqT, RespT> wrappedCall = new ForwardingServerCall.SimpleForwardingServerCall<>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
if (requestId != null) {
responseHeaders.put(
Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER), requestId);
}
super.sendHeaders(responseHeaders);
}
};
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(next.startCall(wrappedCall, headers)) {
@Override
public void onMessage(ReqT message) {
System.out.printf("[gRPC Message] method=%s requestId=%s payload=%s%n",
call.getMethodDescriptor().getFullMethodName(), requestId,
message.toString().substring(0, Math.min(200, message.toString().length())));
super.onMessage(message);
}
@Override
public void onCancel() {
long duration = System.currentTimeMillis() - startTime;
System.out.printf("[gRPC Cancelled] method=%s requestId=%s duration=%dms%n",
call.getMethodDescriptor().getFullMethodName(), requestId, duration);
super.onCancel();
}
};
}
}
# src/interceptors/server_interceptor.py
import grpc
import time
class LoggingServerInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
method = handler_call_details.method
metadata = dict(handler_call_details.invocation_metadata)
request_id = metadata.get("x-request-id")
user_id = metadata.get("x-user-id")
start_time = time.time()
print(f"[gRPC Server] method={method} requestId={request_id} userId={user_id}")
handler = continuation(handler_call_details)
if handler is None:
return None
def wrap_unary(behavior):
def new_behavior(request, context):
try:
payload = str(request)[:200]
print(f"[gRPC Message] method={method} requestId={request_id} payload={payload}")
return behavior(request, context)
except Exception as e:
duration = (time.time() - start_time) * 1000
print(f"[gRPC Error] method={method} requestId={request_id} duration={duration:.0f}ms error={e}")
raise
return new_behavior
if handler.unary_unary:
return grpc.unary_unary_rpc_method_handler(
wrap_unary(handler.unary_unary),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
return handler  # non-unary handlers pass through unwrapped
// src/Interceptors/ServerLoggingInterceptor.cs
using Grpc.Core;
using Grpc.Core.Interceptors;
namespace GrpcServer.Interceptors;
public class ServerLoggingInterceptor : Interceptor
{
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request,
ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
var requestId = context.RequestHeaders.GetValue("x-request-id");
var userId = context.RequestHeaders.GetValue("x-user-id");
var startTime = DateTimeOffset.UtcNow;
Console.WriteLine($"[gRPC Server] method={context.Method} requestId={requestId} userId={userId}");
Console.WriteLine($"[gRPC Message] method={context.Method} requestId={requestId} payload={request?.ToString()?[..Math.Min(200, request.ToString()?.Length ?? 0)]}");
context.ResponseTrailers.Add("x-request-id", requestId ?? string.Empty);
try
{
return await continuation(request, context);
}
catch (Exception)
{
var duration = (DateTimeOffset.UtcNow - startTime).TotalMilliseconds;
Console.WriteLine($"[gRPC Cancelled] method={context.Method} requestId={requestId} duration={duration:F0}ms");
throw;
}
}
}
// Register in Program.cs:
// builder.Services.AddGrpc(o => o.Interceptors.Add<ServerLoggingInterceptor>());
Client-Side Interceptor
// src/middleware/clientInterceptor.ts
import * as grpc from '@grpc/grpc-js';
export interface ClientInterceptorOptions {
requestId?: string;
userId?: string;
authToken?: string;
}
export function createClientInterceptor(options: ClientInterceptorOptions) {
return (methodDescriptor: any, createCall: any) => {
return (metadata: grpc.Metadata, listener: any, next: any) => {
// Add custom metadata
if (options.requestId) {
metadata.add('x-request-id', options.requestId);
}
if (options.userId) {
metadata.add('x-user-id', options.userId);
}
if (options.authToken) {
metadata.add('authorization', `Bearer ${options.authToken}`);
}
console.log('[gRPC Client]', {
method: methodDescriptor.path,
requestId: options.requestId,
});
const startTime = Date.now();
// Create listener wrapper to capture response
const wrappedListener: any = {
onReceiveMetadata(metadata: grpc.Metadata) {
listener.onReceiveMetadata(metadata);
},
onReceiveMessage(message: any) {
console.log('[gRPC Response]', {
duration: Date.now() - startTime,
message: JSON.stringify(message).substring(0, 200),
});
listener.onReceiveMessage(message);
},
onReceiveStatus(status: grpc.StatusObject) {
const duration = Date.now() - startTime;
console.log('[gRPC Status]', {
code: status.code,
details: status.details,
duration,
});
listener.onReceiveStatus(status);
},
};
return next(metadata, wrappedListener);
};
};
}
// src/main/java/com/example/grpc/ClientLoggingInterceptor.java
package com.example.grpc;
import io.grpc.*;
public class ClientLoggingInterceptor implements ClientInterceptor {
private final String requestId;
private final String userId;
private final String authToken;
public ClientLoggingInterceptor(String requestId, String userId, String authToken) {
this.requestId = requestId;
this.userId = userId;
this.authToken = authToken;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
if (requestId != null) headers.put(Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER), requestId);
if (userId != null) headers.put(Metadata.Key.of("x-user-id", Metadata.ASCII_STRING_MARSHALLER), userId);
if (authToken != null) headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "Bearer " + authToken);
System.out.printf("[gRPC Client] method=%s requestId=%s%n", method.getFullMethodName(), requestId);
long startTime = System.currentTimeMillis();
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) {
@Override
public void onMessage(RespT message) {
System.out.printf("[gRPC Response] duration=%dms message=%s%n",
System.currentTimeMillis() - startTime,
message.toString().substring(0, Math.min(200, message.toString().length())));
super.onMessage(message);
}
@Override
public void onClose(Status status, Metadata trailers) {
System.out.printf("[gRPC Status] code=%s details=%s duration=%dms%n",
status.getCode(), status.getDescription(), System.currentTimeMillis() - startTime);
super.onClose(status, trailers);
}
}, headers);
}
};
}
}
// Usage: ManagedChannelBuilder.forTarget(addr).intercept(new ClientLoggingInterceptor(...)).build()
# src/interceptors/client_interceptor.py
import grpc
import time
from grpc import ClientCallDetails
class ClientLoggingInterceptor(grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor):
def __init__(self, request_id: str = None, user_id: str = None, auth_token: str = None):
self.request_id = request_id
self.user_id = user_id
self.auth_token = auth_token
def _add_metadata(self, client_call_details: ClientCallDetails):
metadata = list(client_call_details.metadata or [])
if self.request_id:
metadata.append(("x-request-id", self.request_id))
if self.user_id:
metadata.append(("x-user-id", self.user_id))
if self.auth_token:
metadata.append(("authorization", f"Bearer {self.auth_token}"))
return metadata
def intercept_unary_unary(self, continuation, client_call_details, request):
metadata = self._add_metadata(client_call_details)
new_details = client_call_details._replace(metadata=metadata)
print(f"[gRPC Client] method={client_call_details.method} requestId={self.request_id}")
start = time.time()
response = continuation(new_details, request)
duration = (time.time() - start) * 1000
print(f"[gRPC Response] duration={duration:.0f}ms")
return response
def intercept_unary_stream(self, continuation, client_call_details, request):
metadata = self._add_metadata(client_call_details)
new_details = client_call_details._replace(metadata=metadata)
return continuation(new_details, request)
# Usage:
# channel = grpc.intercept_channel(
# grpc.insecure_channel("localhost:50051"),
# ClientLoggingInterceptor(request_id="abc", auth_token="token")
# )
// src/Interceptors/ClientLoggingInterceptor.cs
using Grpc.Core;
using Grpc.Core.Interceptors;
namespace GrpcClient.Interceptors;
public class ClientLoggingInterceptor : Interceptor
{
private readonly string? _requestId;
private readonly string? _userId;
private readonly string? _authToken;
public ClientLoggingInterceptor(string? requestId = null, string? userId = null, string? authToken = null)
{
_requestId = requestId;
_userId = userId;
_authToken = authToken;
}
private Metadata AddHeaders(Metadata? headers = null)
{
var metadata = headers ?? new Metadata();
if (_requestId is not null) metadata.Add("x-request-id", _requestId);
if (_userId is not null) metadata.Add("x-user-id", _userId);
if (_authToken is not null) metadata.Add("authorization", $"Bearer {_authToken}");
return metadata;
}
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
var newContext = new ClientInterceptorContext<TRequest, TResponse>(
context.Method, context.Host,
context.Options.WithHeaders(AddHeaders(context.Options.Headers)));
Console.WriteLine($"[gRPC Client] method={context.Method.FullName} requestId={_requestId}");
var startTime = DateTimeOffset.UtcNow;
var call = continuation(request, newContext);
return new AsyncUnaryCall<TResponse>(
call.ResponseAsync.ContinueWith(t =>
{
var duration = (DateTimeOffset.UtcNow - startTime).TotalMilliseconds;
Console.WriteLine($"[gRPC Response] duration={duration:F0}ms");
return t.Result;
}),
call.ResponseHeadersAsync, call.GetStatus, call.GetTrailers, call.Dispose);
}
}
// Usage: channel.Intercept(new ClientLoggingInterceptor(requestId: "abc", authToken: "token"))
Error Handling with gRPC Status Codes
gRPC defines 17 standard status codes (OK plus 16 error codes) covering distinct failure scenarios:
// src/errors/grpcErrors.ts
import * as grpc from '@grpc/grpc-js';
export enum GrpcErrorCode {
OK = grpc.status.OK,
CANCELLED = grpc.status.CANCELLED,
UNKNOWN = grpc.status.UNKNOWN,
INVALID_ARGUMENT = grpc.status.INVALID_ARGUMENT,
DEADLINE_EXCEEDED = grpc.status.DEADLINE_EXCEEDED,
NOT_FOUND = grpc.status.NOT_FOUND,
ALREADY_EXISTS = grpc.status.ALREADY_EXISTS,
PERMISSION_DENIED = grpc.status.PERMISSION_DENIED,
RESOURCE_EXHAUSTED = grpc.status.RESOURCE_EXHAUSTED,
FAILED_PRECONDITION = grpc.status.FAILED_PRECONDITION,
ABORTED = grpc.status.ABORTED,
OUT_OF_RANGE = grpc.status.OUT_OF_RANGE,
UNIMPLEMENTED = grpc.status.UNIMPLEMENTED,
INTERNAL = grpc.status.INTERNAL,
UNAVAILABLE = grpc.status.UNAVAILABLE,
DATA_LOSS = grpc.status.DATA_LOSS,
UNAUTHENTICATED = grpc.status.UNAUTHENTICATED,
}
export class GrpcError extends Error {
constructor(
public code: GrpcErrorCode,
message: string,
public metadata?: grpc.Metadata
) {
super(message);
this.name = 'GrpcError';
}
}
// Handler example with proper error responses
async function getUserHandler(
call: grpc.ServerUnaryCall<GetUserRequest, GetUserResponse>,
callback: grpc.sendUnaryData<GetUserResponse>
) {
try {
const { userId } = call.request;
if (!userId || userId <= 0) {
return callback({
code: grpc.status.INVALID_ARGUMENT,
message: 'User ID must be a positive integer',
});
}
const user = await fetchUser(userId);
if (!user) {
return callback({
code: grpc.status.NOT_FOUND,
message: `User ${userId} not found`,
});
}
callback(null, { user });
} catch (error) {
console.error('Unexpected error:', error);
callback({
code: grpc.status.INTERNAL,
message: 'An unexpected error occurred',
});
}
}
// Status code guide
const statusCodeGuide = {
[grpc.status.INVALID_ARGUMENT]: 'Client sent invalid argument',
[grpc.status.NOT_FOUND]: 'Requested resource does not exist',
[grpc.status.ALREADY_EXISTS]: 'Resource already exists',
[grpc.status.PERMISSION_DENIED]: 'Client lacks permission',
[grpc.status.RESOURCE_EXHAUSTED]: 'Server resource exhausted',
[grpc.status.DEADLINE_EXCEEDED]: 'Operation deadline exceeded',
[grpc.status.UNAUTHENTICATED]: 'Request authentication failed',
[grpc.status.UNAVAILABLE]: 'Service temporarily unavailable',
[grpc.status.INTERNAL]: 'Internal server error',
};
// src/main/java/com/example/grpc/UserServiceWithErrorHandling.java
package com.example.grpc;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import users.UserServiceGrpc;
import users.UserServiceProto.*;
public class UserServiceWithErrorHandling extends UserServiceGrpc.UserServiceImplBase {
@Override
public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
try {
int userId = request.getUserId();
if (userId <= 0) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("User ID must be a positive integer")
.asRuntimeException());
return;
}
User user = fetchUser(userId);
if (user == null) {
responseObserver.onError(Status.NOT_FOUND
.withDescription("User " + userId + " not found")
.asRuntimeException());
return;
}
responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
responseObserver.onCompleted();
} catch (Exception e) {
System.err.println("Unexpected error: " + e.getMessage());
responseObserver.onError(Status.INTERNAL
.withDescription("An unexpected error occurred")
.withCause(e)
.asRuntimeException());
}
}
// Status code guide (as constants)
// Status.INVALID_ARGUMENT -> Client sent invalid argument
// Status.NOT_FOUND -> Requested resource does not exist
// Status.ALREADY_EXISTS -> Resource already exists
// Status.PERMISSION_DENIED -> Client lacks permission
// Status.RESOURCE_EXHAUSTED-> Server resource exhausted
// Status.DEADLINE_EXCEEDED -> Operation deadline exceeded
// Status.UNAUTHENTICATED -> Request authentication failed
// Status.UNAVAILABLE -> Service temporarily unavailable
// Status.INTERNAL -> Internal server error
}
# src/errors/grpc_errors.py
import grpc
from enum import IntEnum
class GrpcErrorCode(IntEnum):
OK = grpc.StatusCode.OK.value[0]
CANCELLED = grpc.StatusCode.CANCELLED.value[0]
UNKNOWN = grpc.StatusCode.UNKNOWN.value[0]
INVALID_ARGUMENT = grpc.StatusCode.INVALID_ARGUMENT.value[0]
DEADLINE_EXCEEDED = grpc.StatusCode.DEADLINE_EXCEEDED.value[0]
NOT_FOUND = grpc.StatusCode.NOT_FOUND.value[0]
ALREADY_EXISTS = grpc.StatusCode.ALREADY_EXISTS.value[0]
PERMISSION_DENIED = grpc.StatusCode.PERMISSION_DENIED.value[0]
RESOURCE_EXHAUSTED = grpc.StatusCode.RESOURCE_EXHAUSTED.value[0]
INTERNAL = grpc.StatusCode.INTERNAL.value[0]
UNAVAILABLE = grpc.StatusCode.UNAVAILABLE.value[0]
UNAUTHENTICATED = grpc.StatusCode.UNAUTHENTICATED.value[0]
# Handler with proper error responses.
# Note: context.abort() raises an exception to terminate the RPC, so keep
# abort() calls outside the broad except block to avoid aborting twice.
def get_user_handler(request, context):
    user_id = request.user_id
    if not user_id or user_id <= 0:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, "User ID must be a positive integer")
    try:
        user = fetch_user(user_id)
    except Exception as e:
        print(f"Unexpected error: {e}")
        context.abort(grpc.StatusCode.INTERNAL, "An unexpected error occurred")
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")
    return GetUserResponse(user=user)
STATUS_CODE_GUIDE = {
grpc.StatusCode.INVALID_ARGUMENT: "Client sent invalid argument",
grpc.StatusCode.NOT_FOUND: "Requested resource does not exist",
grpc.StatusCode.ALREADY_EXISTS: "Resource already exists",
grpc.StatusCode.PERMISSION_DENIED: "Client lacks permission",
grpc.StatusCode.RESOURCE_EXHAUSTED: "Server resource exhausted",
grpc.StatusCode.DEADLINE_EXCEEDED: "Operation deadline exceeded",
grpc.StatusCode.UNAUTHENTICATED: "Request authentication failed",
grpc.StatusCode.UNAVAILABLE: "Service temporarily unavailable",
grpc.StatusCode.INTERNAL: "Internal server error",
}
// src/Errors/GrpcErrors.cs
using Grpc.Core;
namespace GrpcServer.Errors;
public static class GrpcErrorHandler
{
// Handler with proper error responses
public static async Task<GetUserResponse> GetUserHandler(GetUserRequest request, ServerCallContext context)
{
try
{
if (request.UserId <= 0)
throw new RpcException(new Status(StatusCode.InvalidArgument,
"User ID must be a positive integer"));
var user = await FetchUserAsync(request.UserId);
if (user is null)
throw new RpcException(new Status(StatusCode.NotFound,
$"User {request.UserId} not found"));
return new GetUserResponse { User = user };
}
catch (RpcException) { throw; }
catch (Exception e)
{
Console.Error.WriteLine($"Unexpected error: {e}");
throw new RpcException(new Status(StatusCode.Internal, "An unexpected error occurred"));
}
}
// Status code guide
public static readonly Dictionary<StatusCode, string> StatusCodeGuide = new()
{
[StatusCode.InvalidArgument] = "Client sent invalid argument",
[StatusCode.NotFound] = "Requested resource does not exist",
[StatusCode.AlreadyExists] = "Resource already exists",
[StatusCode.PermissionDenied] = "Client lacks permission",
[StatusCode.ResourceExhausted]= "Server resource exhausted",
[StatusCode.DeadlineExceeded] = "Operation deadline exceeded",
[StatusCode.Unauthenticated] = "Request authentication failed",
[StatusCode.Unavailable] = "Service temporarily unavailable",
[StatusCode.Internal] = "Internal server error",
};
}
Deadlines and Timeout Propagation
Deadlines ensure operations complete within a specified time frame. Where the deadline rides on an ambient context (Go's context.Context, Java's io.grpc.Context), gRPC propagates it to downstream calls automatically; in other languages you forward the remaining budget yourself:
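Before the per-language examples, the budgeting idea in one sketch (plain Python; the margin and floor values and the downstream_stub in the comment are illustrative assumptions):

```python
def propagated_timeout(time_remaining, margin=0.1, floor=0.05):
    """Derive a downstream timeout from the caller's remaining deadline
    budget: reserve `margin` seconds for our own work, and never drop
    below `floor` so the downstream call is not born already expired.

    time_remaining mirrors grpc's ServicerContext.time_remaining():
    seconds left, or None when the caller set no deadline.
    """
    if time_remaining is None:
        return None  # no upstream deadline, so impose none downstream
    return max(time_remaining - margin, floor)

# Inside a handler (downstream_stub is hypothetical):
# profile = downstream_stub.GetProfile(
#     req, timeout=propagated_timeout(context.time_remaining()))
```

Shrinking the budget at each hop ensures the whole call chain fails fast once the original deadline is spent, instead of doing work whose result the caller can no longer use.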
// Client setting a deadline
async function getUserWithTimeout(userId: number) {
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 5); // 5-second timeout
return new Promise((resolve, reject) => {
const client = new UserServiceClient('localhost:50051');
const call = client.getUser(
{ userId },
{ deadline },
(err, response) => {
if (err) {
if (err.code === grpc.status.DEADLINE_EXCEEDED) {
reject(new Error('Request exceeded 5-second deadline'));
} else {
reject(err);
}
} else {
resolve(response?.user);
}
}
);
});
}
// Server checking deadline
async function getUser(
call: grpc.ServerUnaryCall<GetUserRequest, GetUserResponse>,
callback: grpc.sendUnaryData<GetUserResponse>
) {
const deadline = call.deadline;
const timeRemaining = deadline.getTime() - Date.now();
if (timeRemaining < 1000) {
// Less than 1 second remaining
return callback({
code: grpc.status.DEADLINE_EXCEEDED,
message: 'Insufficient time to process request',
});
}
// ... process request with knowledge of remaining time
}
// Client setting a deadline
import io.grpc.Deadline;
import java.util.concurrent.TimeUnit;
public User getUserWithTimeout(int userId) {
try {
return blockingStub
.withDeadline(Deadline.after(5, TimeUnit.SECONDS))
.getUser(GetUserRequest.newBuilder().setUserId(userId).build())
.getUser();
} catch (io.grpc.StatusRuntimeException e) {
if (e.getStatus().getCode() == io.grpc.Status.Code.DEADLINE_EXCEEDED) {
throw new RuntimeException("Request exceeded 5-second deadline", e);
}
throw e;
}
}
// Server checking deadline
@Override
public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
// Check if context is already cancelled/deadline exceeded
if (Context.current().isCancelled()) {
responseObserver.onError(Status.DEADLINE_EXCEEDED
.withDescription("Insufficient time to process request")
.asRuntimeException());
return;
}
// ... process request
}
# Client setting a deadline
def get_user_with_timeout(user_id: int):
try:
response = stub.GetUser(
GetUserRequest(user_id=user_id),
timeout=5 # 5-second timeout
)
return response.user
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
raise TimeoutError("Request exceeded 5-second deadline")
raise
# Server checking deadline (via context)
def GetUser(self, request, context):
# Check remaining time
remaining = context.time_remaining()  # seconds left, or None if no deadline was set
if remaining is not None and remaining < 1.0:  # less than 1 second remaining
context.abort(grpc.StatusCode.DEADLINE_EXCEEDED,
"Insufficient time to process request")
# ... process request
// Client setting a deadline
public async Task<User> GetUserWithTimeoutAsync(int userId)
{
try
{
var deadline = DateTime.UtcNow.AddSeconds(5); // 5-second timeout
var response = await _client.GetUserAsync(
new GetUserRequest { UserId = userId },
deadline: deadline
);
return response.User;
}
catch (RpcException e) when (e.StatusCode == StatusCode.DeadlineExceeded)
{
throw new TimeoutException("Request exceeded 5-second deadline", e);
}
}
// Server checking deadline
public override Task<GetUserResponse> GetUser(GetUserRequest request, ServerCallContext context)
{
var deadline = context.Deadline;
var timeRemaining = deadline - DateTime.UtcNow;
if (timeRemaining < TimeSpan.FromSeconds(1))
{
throw new RpcException(new Status(StatusCode.DeadlineExceeded,
"Insufficient time to process request"));
}
// ... process request with knowledge of remaining time
return Task.FromResult(new GetUserResponse());
}
Load Balancing Strategies
Client-Side Load Balancing
// src/loadbalancer/clientSideLoadBalancer.ts
import * as grpc from '@grpc/grpc-js';
interface ServerAddress {
host: string;
port: number;
}
export class RoundRobinLoadBalancer {
private currentIndex = 0;
private servers: ServerAddress[];
constructor(servers: ServerAddress[]) {
this.servers = servers;
}
getNextServer(): ServerAddress {
const server = this.servers[this.currentIndex];
this.currentIndex = (this.currentIndex + 1) % this.servers.length;
return server;
}
createClient(): UserServiceClient {
const { host, port } = this.getNextServer();
return new UserServiceClient(
`${host}:${port}`,
grpc.credentials.createInsecure()
);
}
}
// Usage
const lb = new RoundRobinLoadBalancer([
{ host: 'localhost', port: 50051 },
{ host: 'localhost', port: 50052 },
{ host: 'localhost', port: 50053 },
]);
async function callUserService(userId: number) {
  const client = lb.createClient();
  try {
    // Generated @grpc/grpc-js stubs are callback-based; wrap the call in a Promise
    return await new Promise<GetUserResponse>((resolve, reject) => {
      client.getUser({ userId }, (err: grpc.ServiceError | null, response: GetUserResponse) => {
        if (err) reject(err);
        else resolve(response);
      });
    });
  } finally {
    grpc.closeClient(client);
  }
}
// src/main/java/com/example/grpc/RoundRobinLoadBalancer.java
package com.example.grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import users.UserServiceGrpc;
import users.UserServiceProto.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class RoundRobinLoadBalancer {
record ServerAddress(String host, int port) {}
private final List<ServerAddress> servers;
private final AtomicInteger currentIndex = new AtomicInteger(0);
public RoundRobinLoadBalancer(List<ServerAddress> servers) {
this.servers = servers;
}
public ServerAddress getNextServer() {
int index = currentIndex.getAndUpdate(i -> (i + 1) % servers.size());
return servers.get(index);
}
public UserServiceGrpc.UserServiceBlockingStub createStub() {
ServerAddress server = getNextServer();
ManagedChannel channel = ManagedChannelBuilder
.forAddress(server.host(), server.port())
.usePlaintext()
.build();
return UserServiceGrpc.newBlockingStub(channel);
}
// Usage
public static void main(String[] args) {
RoundRobinLoadBalancer lb = new RoundRobinLoadBalancer(List.of(
new ServerAddress("localhost", 50051),
new ServerAddress("localhost", 50052),
new ServerAddress("localhost", 50053)
));
// gRPC also supports built-in round_robin via service config
ManagedChannel channel = ManagedChannelBuilder
.forTarget("dns:///user-service:50051")
.defaultLoadBalancingPolicy("round_robin")
.usePlaintext()
.build();
}
}
# src/loadbalancer/client_side_load_balancer.py
import grpc
import users_pb2_grpc
from dataclasses import dataclass
from itertools import cycle
from threading import Lock
@dataclass
class ServerAddress:
host: str
port: int
class RoundRobinLoadBalancer:
def __init__(self, servers: list[ServerAddress]):
self._servers = servers
self._cycle = cycle(servers)
self._lock = Lock()
def get_next_server(self) -> ServerAddress:
with self._lock:
return next(self._cycle)
def create_stub(self):
server = self.get_next_server()
channel = grpc.insecure_channel(f"{server.host}:{server.port}")
return users_pb2_grpc.UserServiceStub(channel)
# Usage
lb = RoundRobinLoadBalancer([
ServerAddress("localhost", 50051),
ServerAddress("localhost", 50052),
ServerAddress("localhost", 50053),
])
def call_user_service(user_id: int):
stub = lb.create_stub()
return stub.GetUser(users_pb2.GetUserRequest(user_id=user_id)).user
# gRPC also supports built-in round-robin via channel options:
# channel = grpc.insecure_channel(
# "dns:///user-service:50051",
# options=[("grpc.lb_policy_name", "round_robin")]
# )

// src/LoadBalancing/RoundRobinLoadBalancer.cs
using Grpc.Net.Client;
using Users;
namespace GrpcClient.LoadBalancing;
public class RoundRobinLoadBalancer
{
private record ServerAddress(string Host, int Port);
private readonly ServerAddress[] _servers;
private int _currentIndex;
public RoundRobinLoadBalancer(params (string Host, int Port)[] servers)
{
_servers = servers.Select(s => new ServerAddress(s.Host, s.Port)).ToArray();
}
    private ServerAddress GetNextServer()
    {
        // Mask the sign bit so the index stays non-negative even after
        // the counter overflows int.MaxValue
        int index = (Interlocked.Increment(ref _currentIndex) & int.MaxValue) % _servers.Length;
        return _servers[index];
    }
public UserService.UserServiceClient CreateClient()
{
var server = GetNextServer();
var channel = GrpcChannel.ForAddress($"http://{server.Host}:{server.Port}");
return new UserService.UserServiceClient(channel);
}
}
// Usage
var lb = new RoundRobinLoadBalancer(
("localhost", 50051),
("localhost", 50052),
("localhost", 50053)
);
async Task<User> CallUserService(int userId)
{
var client = lb.CreateClient();
var response = await client.GetUserAsync(new GetUserRequest { UserId = userId });
return response.User;
}
// gRPC also supports built-in round_robin via service config JSON on GrpcChannel

Proxy-Based Load Balancing
For production environments, use a dedicated reverse proxy:
# envoy.yaml
admin:
access_log_path: /tmp/admin_access.log
address:
socket_address:
address: 127.0.0.1
port_value: 9901
static_resources:
listeners:
- name: listener_0
address:
socket_address:
address: 0.0.0.0
port_value: 50051
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
access_log:
- name: envoy.access_loggers.stdout
typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
http_filters:
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
route_config:
name: local_route
virtual_hosts:
- name: backend
domains: ["*"]
routes:
- match:
prefix: "/"
route:
cluster: user_service
timeout: 5s
clusters:
- name: user_service
connect_timeout: 1s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: user_service
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: service1.local
port_value: 50051
- endpoint:
address:
socket_address:
address: service2.local
port_value: 50051
- endpoint:
address:
socket_address:
address: service3.local
port_value: 50051
health_checks:
- timeout: 1s
interval: 10s
unhealthy_threshold: 2
healthy_threshold: 2
grpc_health_check:
service_name: users.UserService  # the service whose health status is queried
gRPC vs REST: A Comparison
| Aspect | gRPC | REST |
|---|---|---|
| Serialization | Protocol Buffers (binary) | JSON (text) |
| Transport | HTTP/2 | HTTP/1.1 (typically) |
| Payload Size | 3-10x smaller | Larger |
| Latency | Lower (binary + multiplexing) | Higher |
| Streaming | Native, bidirectional | Polling or WebSockets |
| Browser Support | gRPC-Web required | Native |
| Debugging | Requires tools (grpcurl) | curl, browser |
| Learning Curve | Steeper (proto files) | Gentle |
| Type Safety | Strong typing | Runtime validation |
| Caching | Limited | HTTP caching |
| Use Case | Microservices, real-time | Public APIs, web apps |
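To make the payload-size row above concrete, here is an illustrative Python sketch. It uses the standard-library `struct` module as a stand-in for protobuf's binary encoding (real protobuf uses varints and field tags, so exact sizes differ), purely to show why a typed binary format beats JSON text:

```python
import json
import struct

# A user record as a typical REST/JSON payload
user = {"id": 12345, "age": 34, "score": 0.97}
json_bytes = json.dumps(user).encode("utf-8")

# A hand-packed binary encoding as a protobuf stand-in:
# two int32 fields and one float64 -> a fixed 16 bytes
binary_bytes = struct.pack("<iid", user["id"], user["age"], user["score"])

print(len(json_bytes), len(binary_bytes))  # JSON is several times larger
```

The gap widens further once field names repeat across thousands of messages, since JSON re-sends every key as text while a binary schema sends only values.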
Choose gRPC when:
- Building microservices with strict latency requirements
- Implementing real-time bidirectional communication
- Serving polyglot services that need strong typing
- Handling high throughput with many concurrent connections
Choose REST when:
- Building public-facing APIs
- Supporting browser clients
- Prioritizing simplicity and discoverability
- Relying on standard HTTP caching
Production Deployment Checklist
Pre-Deployment
- Protocol buffer definitions are versioned and documented
- Service interfaces define all error cases with appropriate status codes
- Bidirectional streaming has timeout/heartbeat mechanisms
- Logging captures request ID, duration, and error details
- Metrics (latency, error rate, throughput) are instrumented
- Load tests verify performance under expected load
- Circuit breakers protect against cascading failures
- TLS certificates configured for production encryption
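The circuit-breaker item in the checklist can be sketched in a few lines. Below is a minimal, illustrative breaker in Python; the class name and thresholds are ours, not from any gRPC library, and production systems would more likely lean on a resilience library or Envoy's outlier detection:

```python
import time

class CircuitBreaker:
    """Illustrative circuit breaker: after `failure_threshold` consecutive
    failures the circuit opens and calls fail fast; after `reset_timeout`
    seconds one trial call is allowed through (half-open)."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None  # half-open: let one trial call through
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result
```

Wrapping each stub call in `breaker.call(...)` means a struggling backend stops receiving traffic immediately instead of tying up client threads in timeouts.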
TLS/SSL Configuration
// src/server-secure.ts
import * as fs from 'fs';
import * as grpc from '@grpc/grpc-js';
const credentials = grpc.ServerCredentials.createSsl(
  null, // root CA certs: only needed when verifying client certificates (mTLS)
  [
    {
      cert_chain: fs.readFileSync('./server.crt'),
      private_key: fs.readFileSync('./server.key'),
    },
  ],
  false // do not require client certificates
);
const server = new grpc.Server();
server.bindAsync('0.0.0.0:50051', credentials, (err) => {
if (err) throw err;
server.start();
console.log('Secure gRPC server listening on port 50051');
});
// Client with TLS
const clientCreds = grpc.ChannelCredentials.createSsl(
fs.readFileSync('./ca.crt')
);
const client = new UserServiceClient(
'secure-grpc-service.example.com:50051',
clientCreds
);

// src/main/java/com/example/grpc/SecureServer.java
import io.grpc.Server;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.SslContext;
import java.io.File;
// Server with TLS
SslContext sslContext = GrpcSslContexts.forServer(
new File("server.crt"),
new File("server.key")
).build();
Server server = NettyServerBuilder.forPort(50051)
.sslContext(sslContext)
.addService(new UserServiceImpl())
.build()
.start();
System.out.println("Secure gRPC server listening on port 50051");
// Client with TLS
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
SslContext clientSsl = GrpcSslContexts.forClient()
.trustManager(new File("ca.crt"))
.build();
ManagedChannel channel = NettyChannelBuilder
.forAddress("secure-grpc-service.example.com", 50051)
.sslContext(clientSsl)
    .build();

# src/secure_server.py
import grpc
import users_pb2_grpc
from concurrent import futures
with open("server.key", "rb") as f:
private_key = f.read()
with open("server.crt", "rb") as f:
certificate_chain = f.read()
server_credentials = grpc.ssl_server_credentials(
[(private_key, certificate_chain)]
)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
users_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
server.add_secure_port("0.0.0.0:50051", server_credentials)
server.start()
print("Secure gRPC server listening on port 50051")
# Client with TLS
with open("ca.crt", "rb") as f:
trusted_certs = f.read()
client_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
channel = grpc.secure_channel(
"secure-grpc-service.example.com:50051", client_credentials
)
stub = users_pb2_grpc.UserServiceStub(channel)

// Server with TLS in Program.cs
using Microsoft.AspNetCore.Server.Kestrel.Core;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.WebHost.ConfigureKestrel(options =>
{
options.ListenAnyIP(50051, listenOptions =>
{
listenOptions.Protocols = HttpProtocols.Http2;
listenOptions.UseHttps("server.pfx", "password");
});
});
var app = builder.Build();
app.MapGrpcService<UserServiceImpl>();
app.Run();
// Client with TLS
using Grpc.Net.Client;
var channel = GrpcChannel.ForAddress("https://secure-grpc-service.example.com:50051",
new GrpcChannelOptions
{
HttpHandler = new HttpClientHandler
{
// Optionally: provide custom CA certificate
// ServerCertificateCustomValidationCallback = ...
}
});
var client = new UserService.UserServiceClient(channel);

Health Checks
// src/health.ts
import * as grpc from '@grpc/grpc-js';
import { Health } from './generated/health';
export const healthImpl: any = {
check(
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) {
callback(null, { status: Health.ServingStatus.SERVING });
},
watch(call: grpc.ServerWritableStream<any, any>) {
call.write({ status: Health.ServingStatus.SERVING });
call.end();
},
};

// src/main/java/com/example/grpc/HealthServiceImpl.java
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.protobuf.services.HealthStatusManager;
// Use built-in gRPC health service:
HealthStatusManager healthStatusManager = new HealthStatusManager();
Server server = ServerBuilder.forPort(50051)
.addService(new UserServiceImpl())
.addService(healthStatusManager.getHealthService())
.build();
// Set status for a specific service
healthStatusManager.setStatus("users.UserService",
    HealthCheckResponse.ServingStatus.SERVING);

# src/health.py
import grpc
import users_pb2_grpc
from concurrent import futures
from grpc_health.v1 import health, health_pb2, health_pb2_grpc
health_servicer = health.HealthServicer()
health_servicer.set("users.UserService", health_pb2.HealthCheckResponse.SERVING)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
users_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
server.add_insecure_port("0.0.0.0:50051")
server.start()
server.wait_for_termination()

// src/Services/HealthService.cs
// ASP.NET Core has built-in gRPC health checks via Grpc.AspNetCore.HealthChecks
// In Program.cs:
builder.Services.AddGrpcHealthChecks()
.AddCheck("UserService", () => HealthCheckResult.Healthy());
app.MapGrpcHealthChecksService();
// Or implement manually:
using Grpc.Health.V1;
using Grpc.HealthCheck;
public class HealthServiceImpl : Health.HealthBase
{
public override Task<HealthCheckResponse> Check(
HealthCheckRequest request, ServerCallContext context)
{
return Task.FromResult(new HealthCheckResponse
{
Status = HealthCheckResponse.Types.ServingStatus.Serving
});
}
public override async Task Watch(
HealthCheckRequest request,
IServerStreamWriter<HealthCheckResponse> responseStream,
ServerCallContext context)
{
await responseStream.WriteAsync(new HealthCheckResponse
{
Status = HealthCheckResponse.Types.ServingStatus.Serving
});
}
}

Container and Orchestration
# Dockerfile
FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --omit=dev
COPY dist ./dist
COPY generated ./generated
EXPOSE 50051 9090
HEALTHCHECK --interval=10s --timeout=5s --start-period=5s --retries=3 \
  CMD node -e "require('http').get('http://localhost:9090/health', (r) => {if (r.statusCode !== 200) throw new Error(r.statusCode)})"
# Node cannot execute .ts files directly; run the compiled output
CMD ["node", "dist/server.js"]
# kubernetes.yaml
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
- name: grpc
port: 50051
targetPort: 50051
- name: metrics
port: 9090
targetPort: 9090
type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
- name: user-service
image: user-service:latest
ports:
- name: grpc
containerPort: 50051
- name: metrics
containerPort: 9090
livenessProbe:
grpc:
port: 50051
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
grpc:
port: 50051
initialDelaySeconds: 5
periodSeconds: 5
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
Monitoring and Observability
// src/metrics.ts
import * as promClient from 'prom-client';
import * as grpc from '@grpc/grpc-js';
const grpcRequestDuration = new promClient.Histogram({
name: 'grpc_request_duration_ms',
help: 'Duration of gRPC requests in milliseconds',
labelNames: ['method', 'status'],
buckets: [1, 5, 10, 50, 100, 500, 1000, 5000],
});
const grpcRequestCount = new promClient.Counter({
name: 'grpc_requests_total',
help: 'Total gRPC requests',
labelNames: ['method', 'status'],
});
// A client-side interceptor in the @grpc/grpc-js style: wrap the call,
// then record duration and status when the final status arrives.
export const metricsInterceptor: grpc.Interceptor = (options, nextCall) => {
  const startTime = Date.now();
  const method = options.method_definition.path;
  return new grpc.InterceptingCall(nextCall(options), {
    start(metadata, listener, next) {
      next(metadata, {
        onReceiveStatus(status, nextStatus) {
          const duration = Date.now() - startTime;
          const statusName = grpc.status[status.code];
          grpcRequestDuration.labels(method, statusName).observe(duration);
          grpcRequestCount.labels(method, statusName).inc();
          nextStatus(status);
        },
      });
    },
  });
};
// Usage: pass via client options, e.g.
// new UserServiceClient(addr, creds, { interceptors: [metricsInterceptor] })

// src/main/java/com/example/grpc/MetricsInterceptor.java
package com.example.grpc;
import io.grpc.*;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
public class MetricsInterceptor implements ServerInterceptor {
private final MeterRegistry registry;
public MetricsInterceptor(MeterRegistry registry) {
this.registry = registry;
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
String method = call.getMethodDescriptor().getFullMethodName();
long startTime = System.currentTimeMillis();
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<>(call) {
@Override
public void close(Status status, Metadata trailers) {
long duration = System.currentTimeMillis() - startTime;
String statusCode = status.getCode().name();
registry.timer("grpc.request.duration",
"method", method, "status", statusCode)
.record(duration, java.util.concurrent.TimeUnit.MILLISECONDS);
registry.counter("grpc.requests.total",
"method", method, "status", statusCode)
.increment();
super.close(status, trailers);
}
}, headers)) {};
}
}

# src/metrics.py
import time
import grpc
from prometheus_client import Histogram, Counter
grpc_request_duration = Histogram(
"grpc_request_duration_ms",
"Duration of gRPC requests in milliseconds",
["method", "status"],
buckets=[1, 5, 10, 50, 100, 500, 1000, 5000],
)
grpc_request_count = Counter(
"grpc_requests_total",
"Total gRPC requests",
["method", "status"],
)
class MetricsServerInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
method = handler_call_details.method
handler = continuation(handler_call_details)
if handler is None:
return None
def wrap_unary(behavior):
def new_behavior(request, context):
start = time.time()
status = "OK"
try:
result = behavior(request, context)
return result
            except Exception:
                status = "INTERNAL"
                raise
finally:
duration = (time.time() - start) * 1000
grpc_request_duration.labels(method=method, status=status).observe(duration)
grpc_request_count.labels(method=method, status=status).inc()
return new_behavior
if handler.unary_unary:
return grpc.unary_unary_rpc_method_handler(
wrap_unary(handler.unary_unary),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
        return handler

// src/Interceptors/MetricsInterceptor.cs
using Grpc.Core;
using Grpc.Core.Interceptors;
using Prometheus;
namespace GrpcServer.Interceptors;
public class MetricsInterceptor : Interceptor
{
private static readonly Histogram GrpcRequestDuration = Metrics.CreateHistogram(
"grpc_request_duration_ms",
"Duration of gRPC requests in milliseconds",
new HistogramConfiguration
{
LabelNames = new[] { "method", "status" },
Buckets = new double[] { 1, 5, 10, 50, 100, 500, 1000, 5000 },
});
private static readonly Counter GrpcRequestCount = Metrics.CreateCounter(
"grpc_requests_total",
"Total gRPC requests",
new CounterConfiguration { LabelNames = new[] { "method", "status" } });
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request,
ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
var method = context.Method;
var startTime = DateTimeOffset.UtcNow;
var statusCode = "OK";
try
{
var response = await continuation(request, context);
return response;
}
catch (RpcException e)
{
statusCode = e.StatusCode.ToString();
throw;
}
finally
{
var duration = (DateTimeOffset.UtcNow - startTime).TotalMilliseconds;
GrpcRequestDuration.WithLabels(method, statusCode).Observe(duration);
GrpcRequestCount.WithLabels(method, statusCode).Inc();
}
}
}
// Register in Program.cs:
// builder.Services.AddGrpc(o => o.Interceptors.Add<MetricsInterceptor>());

Conclusion
gRPC is a powerful technology for building high-performance, distributed systems. By leveraging Protocol Buffers and HTTP/2, it delivers exceptional efficiency and developer experience. Follow the patterns and practices in this guide to build production-ready gRPC services that scale reliably.
Key takeaways:
- Define clear service contracts with `.proto` files
- Choose the right communication pattern for your use case
- Implement comprehensive error handling with status codes
- Use interceptors for cross-cutting concerns
- Monitor and observe all production services
- Design for graceful degradation and failure modes
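As a concrete instance of designing for failure, here is a minimal retry-with-exponential-backoff helper in Python. It is a sketch with names and parameters of our choosing; in practice gRPC clients can also enable built-in retries through the service config:

```python
import random
import time

def retry_with_backoff(fn, max_attempts=4, base_delay=0.1, max_delay=2.0):
    """Call fn(), retrying on any exception with exponential backoff.

    Delay doubles each attempt (capped at max_delay), with random jitter
    so many clients retrying at once do not synchronize into a herd.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(delay * random.uniform(0.5, 1.0))
```

Wrapping an idempotent stub call like `retry_with_backoff(lambda: stub.GetUser(req))` smooths over transient `UNAVAILABLE` errors; non-idempotent calls should only be retried when the server guarantees deduplication.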
For more information, visit the official gRPC documentation.