All articles
Architecture Backend gRPC Microservices

gRPC Communication Server-to-Server Communication Technologies

Palakorn Voramongkol
April 16, 2025 14 min read

“A comprehensive guide to gRPC for server-to-server communication — covering Protocol Buffers, service definitions, streaming patterns, interceptors, error handling, load balancing, and production deployment.”

Deep Dive: gRPC Communication

gRPC has become a cornerstone technology for building high-performance, scalable microservices. Unlike REST APIs that rely on JSON over HTTP/1.1, gRPC leverages Protocol Buffers and HTTP/2 to deliver exceptional speed, efficiency, and developer experience. This guide covers everything you need to know about gRPC for server-to-server communication.

What is gRPC?

gRPC (gRPC Remote Procedure Call) is Google’s open-source framework for building high-performance RPC systems. It abstracts away the complexity of distributed systems and allows you to define services and messages in a language-agnostic way using Protocol Buffers.

Key Characteristics

  • Binary Serialization: Uses Protocol Buffers instead of JSON, reducing payload size by 3-10x
  • HTTP/2 Transport: Enables multiplexing, server push, and header compression
  • Code Generation: Automatically generates client and server code from .proto definitions
  • Streaming Support: Native support for bidirectional streaming over a single connection
  • Type Safety: Strong typing across language boundaries
  • Cross-Language: Works seamlessly across 10+ programming languages

Communication Flow

Below is a sequence diagram showing how a unary gRPC call flows between two services:

sequenceDiagram
    participant Client as Service A<br/>(gRPC Client)
    participant Transport as HTTP/2<br/>Network
    participant Server as Service B<br/>(gRPC Server)
    
    Client->>Client: 1. Create request message<br/>(UserRequest proto)
    Client->>Client: 2. Serialize to protobuf<br/>(binary format)
    Client->>Transport: 3. Send HTTP/2 frame<br/>(method: GetUser)
    Transport->>Server: 4. Receive frame
    Server->>Server: 5. Deserialize message
    Server->>Server: 6. Invoke handler logic
    Server->>Server: 7. Serialize response<br/>(UserResponse proto)
    Server->>Transport: 8. Send HTTP/2 frame
    Transport->>Client: 9. Receive frame
    Client->>Client: 10. Deserialize response
    Client->>Client: 11. Return typed object

Core Principles

  • Efficiency: Binary format and HTTP/2 multiplexing minimize latency and bandwidth
  • Simplicity: Declarative service definitions eliminate boilerplate
  • Reliability: Strong typing and explicit error handling reduce runtime surprises
  • Scalability: Streaming and multiplexing handle thousands of concurrent connections
  • Interoperability: Language-agnostic design enables polyglot architectures

Protocol Buffers: The Foundation

Protocol Buffers (protobuf) are the serialization format underlying gRPC. They provide a compact, language-neutral mechanism for serializing structured data.

Defining Messages

// user.proto
syntax = "proto3";

package users;

// Simple scalar types
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
  bool is_active = 4;
  double account_balance = 5;
  
  // Timestamps use google.protobuf types
  google.protobuf.Timestamp created_at = 6;
  
  // Repeated fields (arrays)
  repeated string tags = 7;
}

// Nested messages
message Address {
  string street = 1;
  string city = 2;
  string postal_code = 3;
  Country country = 4;
}

message UserProfile {
  User user = 1;
  Address address = 2;
  repeated PhoneNumber phone_numbers = 3;
}

message PhoneNumber {
  enum Type {
    TYPE_UNSPECIFIED = 0;
    MOBILE = 1;
    HOME = 2;
    WORK = 3;
  }
  
  Type type = 1;
  string number = 2;
}

// Enums for type-safe options
enum Status {
  STATUS_UNSPECIFIED = 0;
  ACTIVE = 1;
  INACTIVE = 2;
  SUSPENDED = 3;
}

// Oneof for mutually exclusive fields
message Contact {
  string name = 1;
  oneof contact_info {
    string email = 2;
    string phone = 3;
    string website = 4;
  }
}

Scalar Types Reference

TypeSizeUse Case
int32 / int644 / 8 bytesIntegers (use sint32 for negative numbers)
float / double4 / 8 bytesDecimals
bool1 byteBooleans
stringVariableUTF-8 strings
bytesVariableBinary data
google.protobuf.Timestamp12 bytesTimestamps
google.protobuf.Duration12 bytesTime durations

Service Definitions and Code Generation

Defining gRPC Services

// user_service.proto
syntax = "proto3";

package users;

import "google/protobuf/empty.proto";

message GetUserRequest {
  int32 user_id = 1;
}

message GetUserResponse {
  User user = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
}

message ListUsersRequest {
  int32 page = 1;
  int32 page_size = 2;
}

message ListUsersResponse {
  repeated User users = 1;
  int32 total_count = 2;
}

message DeleteUserRequest {
  int32 user_id = 1;
}

service UserService {
  // Unary RPC: single request, single response
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (GetUserResponse);
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
  
  // Server streaming: single request, stream of responses
  rpc ListUsers(ListUsersRequest) returns (stream GetUserResponse);
  
  // Client streaming: stream of requests, single response
  rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
  
  // Bidirectional streaming: stream of requests and responses
  rpc SyncUsers(stream SyncUserRequest) returns (stream SyncUserResponse);
}

Generating Code

# Install protoc compiler
# macOS: brew install protobuf
# Linux: apt-get install protobuf-compiler
# Windows: choco install protoc

# Install Node.js/TypeScript plugins
npm install --save-dev grpc-tools @grpc/grpc-js @grpc/proto-loader

# Generate TypeScript code
protoc \
  --plugin=protoc-gen-ts_proto=./node_modules/.bin/protoc-gen-ts_proto \
  --ts_proto_out=./src/generated \
  --ts_proto_opt=outputServices=grpc-js \
  --ts_proto_opt=useExactProtoTypes=true \
  user_service.proto user.proto

Implementing a gRPC Server

Node.js/TypeScript Server

// src/server.ts
import * as grpc from '@grpc/grpc-js';
import { UserServiceService, IUserServiceServer } from './generated/user_service';
import { User, GetUserResponse, ListUsersResponse } from './generated/user';
import { GetUserRequest, CreateUserRequest, ListUsersRequest } from './generated/user_service';

// Mock database
const usersDb = new Map<number, any>([
  [1, { id: 1, name: 'Alice', email: 'alice@example.com', isActive: true }],
  [2, { id: 2, name: 'Bob', email: 'bob@example.com', isActive: true }],
]);

let nextUserId = 3;

// Implement service handlers
const userServiceImpl: IUserServiceServer = {
  // Unary RPC
  async getUser(
    call: grpc.ServerUnaryCall<GetUserRequest, GetUserResponse>,
    callback: grpc.sendUnaryData<GetUserResponse>
  ) {
    const { userId } = call.request;
    const user = usersDb.get(userId);

    if (!user) {
      return callback({
        code: grpc.status.NOT_FOUND,
        message: `User ${userId} not found`,
      });
    }

    callback(null, { user });
  },

  // Unary RPC with validation
  async createUser(
    call: grpc.ServerUnaryCall<CreateUserRequest, GetUserResponse>,
    callback: grpc.sendUnaryData<GetUserResponse>
  ) {
    const { name, email } = call.request;

    if (!name || !email) {
      return callback({
        code: grpc.status.INVALID_ARGUMENT,
        message: 'Name and email are required',
      });
    }

    const user = {
      id: nextUserId++,
      name,
      email,
      isActive: true,
      createdAt: new Date(),
    };

    usersDb.set(user.id, user);
    callback(null, { user });
  },

  // Server streaming
  async listUsers(call: grpc.ServerWritableStream<ListUsersRequest, GetUserResponse>) {
    const { page = 1, pageSize = 10 } = call.request;
    const users = Array.from(usersDb.values());
    const start = (page - 1) * pageSize;
    const paginatedUsers = users.slice(start, start + pageSize);

    for (const user of paginatedUsers) {
      call.write({ user });
    }

    call.end();
  },

  // Client streaming
  async batchCreateUsers(
    call: grpc.ServerReadableStream<CreateUserRequest, ListUsersResponse>,
    callback: grpc.sendUnaryData<ListUsersResponse>
  ) {
    const users: any[] = [];

    call.on('data', (request: CreateUserRequest) => {
      const user = {
        id: nextUserId++,
        name: request.name,
        email: request.email,
        isActive: true,
      };
      users.push(user);
      usersDb.set(user.id, user);
    });

    call.on('end', () => {
      callback(null, { users, totalCount: users.length });
    });

    call.on('error', (err) => {
      console.error('Stream error:', err);
      callback({
        code: grpc.status.INTERNAL,
        message: 'Stream error',
      });
    });
  },

  // Bidirectional streaming
  async syncUsers(call: grpc.ServerDuplexStream<any, any>) {
    call.on('data', (request: any) => {
      const user = {
        id: request.userId || nextUserId++,
        name: request.name,
        email: request.email,
        isActive: true,
      };

      usersDb.set(user.id, user);
      call.write({
        userId: user.id,
        status: 'SYNCED',
        message: `User ${user.name} synced successfully`,
      });
    });

    call.on('end', () => {
      call.end();
    });

    call.on('error', (err) => {
      console.error('Sync error:', err);
    });
  },

  async deleteUser(
    call: grpc.ServerUnaryCall<any, any>,
    callback: grpc.sendUnaryData<any>
  ) {
    const { userId } = call.request;

    if (usersDb.has(userId)) {
      usersDb.delete(userId);
      return callback(null, {});
    }

    callback({
      code: grpc.status.NOT_FOUND,
      message: `User ${userId} not found`,
    });
  },
};

// Create and start server
function startServer() {
  const server = new grpc.Server();

  server.addService(UserServiceService, userServiceImpl);

  const PORT = process.env.PORT || 50051;
  server.bindAsync(`0.0.0.0:${PORT}`, grpc.ServerCredentials.createInsecure(), (err) => {
    if (err) {
      console.error('Failed to bind server:', err);
      process.exit(1);
    }

    server.start();
    console.log(`gRPC server running on port ${PORT}`);
  });
}

startServer();
// src/main/java/com/example/grpc/UserServiceServer.java
package com.example.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import users.UserServiceGrpc;
import users.UserServiceProto.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class UserServiceServer {

    private static final Map<Integer, User> usersDb = new ConcurrentHashMap<>();
    private static final AtomicInteger nextUserId = new AtomicInteger(3);

    static {
        usersDb.put(1, User.newBuilder().setId(1).setName("Alice").setEmail("alice@example.com").setIsActive(true).build());
        usersDb.put(2, User.newBuilder().setId(2).setName("Bob").setEmail("bob@example.com").setIsActive(true).build());
    }

    static class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {

        @Override
        public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
            int userId = request.getUserId();
            User user = usersDb.get(userId);

            if (user == null) {
                responseObserver.onError(Status.NOT_FOUND
                    .withDescription("User " + userId + " not found")
                    .asRuntimeException());
                return;
            }

            responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
            responseObserver.onCompleted();
        }

        @Override
        public void createUser(CreateUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
            if (request.getName().isEmpty() || request.getEmail().isEmpty()) {
                responseObserver.onError(Status.INVALID_ARGUMENT
                    .withDescription("Name and email are required")
                    .asRuntimeException());
                return;
            }

            int id = nextUserId.getAndIncrement();
            User user = User.newBuilder()
                .setId(id)
                .setName(request.getName())
                .setEmail(request.getEmail())
                .setIsActive(true)
                .build();

            usersDb.put(id, user);
            responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
            responseObserver.onCompleted();
        }

        @Override
        public void listUsers(ListUsersRequest request, StreamObserver<GetUserResponse> responseObserver) {
            int page = request.getPage() > 0 ? request.getPage() : 1;
            int pageSize = request.getPageSize() > 0 ? request.getPageSize() : 10;
            List<User> users = new ArrayList<>(usersDb.values());
            int start = (page - 1) * pageSize;
            int end = Math.min(start + pageSize, users.size());

            for (int i = start; i < end; i++) {
                responseObserver.onNext(GetUserResponse.newBuilder().setUser(users.get(i)).build());
            }
            responseObserver.onCompleted();
        }

        @Override
        public StreamObserver<CreateUserRequest> batchCreateUsers(StreamObserver<ListUsersResponse> responseObserver) {
            List<User> createdUsers = new ArrayList<>();

            return new StreamObserver<>() {
                @Override
                public void onNext(CreateUserRequest request) {
                    int id = nextUserId.getAndIncrement();
                    User user = User.newBuilder()
                        .setId(id)
                        .setName(request.getName())
                        .setEmail(request.getEmail())
                        .setIsActive(true)
                        .build();
                    createdUsers.add(user);
                    usersDb.put(id, user);
                }

                @Override
                public void onError(Throwable t) {
                    responseObserver.onError(Status.INTERNAL.withDescription("Stream error").asRuntimeException());
                }

                @Override
                public void onCompleted() {
                    responseObserver.onNext(ListUsersResponse.newBuilder()
                        .addAllUsers(createdUsers)
                        .setTotalCount(createdUsers.size())
                        .build());
                    responseObserver.onCompleted();
                }
            };
        }

        @Override
        public StreamObserver<SyncUserRequest> syncUsers(StreamObserver<SyncUserResponse> responseObserver) {
            return new StreamObserver<>() {
                @Override
                public void onNext(SyncUserRequest request) {
                    int id = request.getUserId() > 0 ? request.getUserId() : nextUserId.getAndIncrement();
                    User user = User.newBuilder()
                        .setId(id).setName(request.getName()).setEmail(request.getEmail()).setIsActive(true)
                        .build();
                    usersDb.put(id, user);
                    responseObserver.onNext(SyncUserResponse.newBuilder()
                        .setUserId(id).setStatus("SYNCED")
                        .setMessage("User " + user.getName() + " synced successfully")
                        .build());
                }

                @Override public void onError(Throwable t) { System.err.println("Sync error: " + t.getMessage()); }
                @Override public void onCompleted() { responseObserver.onCompleted(); }
            };
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        int port = Integer.parseInt(System.getenv().getOrDefault("PORT", "50051"));
        Server server = ServerBuilder.forPort(port)
            .addService(new UserServiceImpl())
            .build()
            .start();

        System.out.println("gRPC server running on port " + port);
        server.awaitTermination();
    }
}
# src/server.py
import grpc
import users_pb2
import users_pb2_grpc
from concurrent import futures
import os

# Mock database
users_db: dict[int, dict] = {
    1: {"id": 1, "name": "Alice", "email": "alice@example.com", "is_active": True},
    2: {"id": 2, "name": "Bob", "email": "bob@example.com", "is_active": True},
}
next_user_id = 3


class UserServiceServicer(users_pb2_grpc.UserServiceServicer):

    def GetUser(self, request, context):
        user_data = users_db.get(request.user_id)
        if not user_data:
            context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")
        user = users_pb2.User(**user_data)
        return users_pb2.GetUserResponse(user=user)

    def CreateUser(self, request, context):
        global next_user_id
        if not request.name or not request.email:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Name and email are required")
        user_data = {"id": next_user_id, "name": request.name, "email": request.email, "is_active": True}
        users_db[next_user_id] = user_data
        next_user_id += 1
        return users_pb2.GetUserResponse(user=users_pb2.User(**user_data))

    def ListUsers(self, request, context):
        page = request.page or 1
        page_size = request.page_size or 10
        users = list(users_db.values())
        start = (page - 1) * page_size
        for u in users[start:start + page_size]:
            yield users_pb2.GetUserResponse(user=users_pb2.User(**u))

    def BatchCreateUsers(self, request_iterator, context):
        global next_user_id
        created = []
        for req in request_iterator:
            user_data = {"id": next_user_id, "name": req.name, "email": req.email, "is_active": True}
            users_db[next_user_id] = user_data
            next_user_id += 1
            created.append(users_pb2.User(**user_data))
        return users_pb2.ListUsersResponse(users=created, total_count=len(created))

    def SyncUsers(self, request_iterator, context):
        global next_user_id
        for req in request_iterator:
            uid = req.user_id or next_user_id
            user_data = {"id": uid, "name": req.name, "email": req.email, "is_active": True}
            users_db[uid] = user_data
            if uid == next_user_id:
                next_user_id += 1
            yield users_pb2.SyncUserResponse(
                user_id=uid, status="SYNCED",
                message=f"User {req.name} synced successfully"
            )

    def DeleteUser(self, request, context):
        if request.user_id in users_db:
            del users_db[request.user_id]
            return users_pb2.Empty()
        context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")


def serve():
    port = os.getenv("PORT", "50051")
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    users_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
    server.add_insecure_port(f"0.0.0.0:{port}")
    server.start()
    print(f"gRPC server running on port {port}")
    server.wait_for_termination()


if __name__ == "__main__":
    serve()
// src/Services/UserService.cs
using Grpc.Core;
using Users;

namespace GrpcServer.Services;

public class UserServiceImpl : UserService.UserServiceBase
{
    private static readonly Dictionary<int, User> UsersDb = new()
    {
        [1] = new User { Id = 1, Name = "Alice", Email = "alice@example.com", IsActive = true },
        [2] = new User { Id = 2, Name = "Bob", Email = "bob@example.com", IsActive = true },
    };
    private static int _nextUserId = 3;

    public override Task<GetUserResponse> GetUser(GetUserRequest request, ServerCallContext context)
    {
        if (!UsersDb.TryGetValue(request.UserId, out var user))
            throw new RpcException(new Status(StatusCode.NotFound, $"User {request.UserId} not found"));

        return Task.FromResult(new GetUserResponse { User = user });
    }

    public override Task<GetUserResponse> CreateUser(CreateUserRequest request, ServerCallContext context)
    {
        if (string.IsNullOrEmpty(request.Name) || string.IsNullOrEmpty(request.Email))
            throw new RpcException(new Status(StatusCode.InvalidArgument, "Name and email are required"));

        var user = new User { Id = _nextUserId++, Name = request.Name, Email = request.Email, IsActive = true };
        UsersDb[user.Id] = user;
        return Task.FromResult(new GetUserResponse { User = user });
    }

    public override async Task ListUsers(ListUsersRequest request, IServerStreamWriter<GetUserResponse> responseStream, ServerCallContext context)
    {
        int page = request.Page > 0 ? request.Page : 1;
        int pageSize = request.PageSize > 0 ? request.PageSize : 10;
        var users = UsersDb.Values.Skip((page - 1) * pageSize).Take(pageSize);

        foreach (var user in users)
            await responseStream.WriteAsync(new GetUserResponse { User = user });
    }

    public override async Task<ListUsersResponse> BatchCreateUsers(
        IAsyncStreamReader<CreateUserRequest> requestStream, ServerCallContext context)
    {
        var created = new List<User>();
        await foreach (var req in requestStream.ReadAllAsync())
        {
            var user = new User { Id = _nextUserId++, Name = req.Name, Email = req.Email, IsActive = true };
            UsersDb[user.Id] = user;
            created.Add(user);
        }
        var response = new ListUsersResponse { TotalCount = created.Count };
        response.Users.AddRange(created);
        return response;
    }

    public override async Task SyncUsers(
        IAsyncStreamReader<SyncUserRequest> requestStream,
        IServerStreamWriter<SyncUserResponse> responseStream,
        ServerCallContext context)
    {
        await foreach (var req in requestStream.ReadAllAsync())
        {
            int uid = req.UserId > 0 ? req.UserId : _nextUserId++;
            UsersDb[uid] = new User { Id = uid, Name = req.Name, Email = req.Email, IsActive = true };
            await responseStream.WriteAsync(new SyncUserResponse
            {
                UserId = uid, Status = "SYNCED",
                Message = $"User {req.Name} synced successfully"
            });
        }
    }

    public override Task<Empty> DeleteUser(DeleteUserRequest request, ServerCallContext context)
    {
        if (!UsersDb.Remove(request.UserId))
            throw new RpcException(new Status(StatusCode.NotFound, $"User {request.UserId} not found"));

        return Task.FromResult(new Empty());
    }
}

// Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
var app = builder.Build();
app.MapGrpcService<UserServiceImpl>();
int port = int.Parse(Environment.GetEnvironmentVariable("PORT") ?? "50051");
app.Run($"http://0.0.0.0:{port}");

Implementing a gRPC Client

Basic Client Implementation

// src/client.ts
import * as grpc from '@grpc/grpc-js';
import { UserServiceClient } from './generated/user_service';

class UserServiceClient {
  private client: UserServiceClient;

  constructor(address: string = 'localhost:50051') {
    this.client = new UserServiceClient(
      address,
      grpc.credentials.createInsecure()
    );
  }

  // Unary call
  async getUser(userId: number): Promise<any> {
    return new Promise((resolve, reject) => {
      this.client.getUser({ userId }, (err, response) => {
        if (err) reject(err);
        else resolve(response?.user);
      });
    });
  }

  // Server streaming
  async listUsers(page: number = 1, pageSize: number = 10): Promise<any[]> {
    return new Promise((resolve, reject) => {
      const users: any[] = [];
      const call = this.client.listUsers({ page, pageSize });

      call.on('data', (response) => {
        users.push(response.user);
      });

      call.on('end', () => {
        resolve(users);
      });

      call.on('error', reject);
    });
  }

  // Client streaming
  async batchCreateUsers(requests: any[]): Promise<any> {
    return new Promise((resolve, reject) => {
      const call = this.client.batchCreateUsers((err, response) => {
        if (err) reject(err);
        else resolve(response);
      });

      for (const request of requests) {
        call.write(request);
      }

      call.end();
    });
  }

  // Bidirectional streaming
  async syncUsers(userUpdates: any[]): Promise<void> {
    return new Promise((resolve, reject) => {
      const call = this.client.syncUsers();

      call.on('data', (response) => {
        console.log('Sync response:', response);
      });

      call.on('end', resolve);
      call.on('error', reject);

      for (const update of userUpdates) {
        call.write(update);
      }

      call.end();
    });
  }

  close() {
    grpc.closeClient(this.client);
  }
}

// Usage
async function main() {
  const client = new UserServiceClient();

  try {
    const user = await client.getUser(1);
    console.log('User:', user);

    const users = await client.listUsers(1, 5);
    console.log('Users:', users);
  } catch (error) {
    console.error('Error:', error);
  } finally {
    client.close();
  }
}

main();
// src/main/java/com/example/grpc/UserServiceClientExample.java
package com.example.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import users.UserServiceGrpc;
import users.UserServiceProto.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class UserServiceClientExample {

    private final ManagedChannel channel;
    private final UserServiceGrpc.UserServiceBlockingStub blockingStub;
    private final UserServiceGrpc.UserServiceStub asyncStub;

    public UserServiceClientExample(String address) {
        channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
        blockingStub = UserServiceGrpc.newBlockingStub(channel);
        asyncStub = UserServiceGrpc.newStub(channel);
    }

    // Unary call
    public User getUser(int userId) {
        GetUserResponse response = blockingStub.getUser(
            GetUserRequest.newBuilder().setUserId(userId).build()
        );
        return response.getUser();
    }

    // Server streaming
    public List<User> listUsers(int page, int pageSize) {
        List<User> users = new ArrayList<>();
        blockingStub.listUsers(ListUsersRequest.newBuilder().setPage(page).setPageSize(pageSize).build())
            .forEachRemaining(r -> users.add(r.getUser()));
        return users;
    }

    // Client streaming
    public ListUsersResponse batchCreateUsers(List<CreateUserRequest> requests) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        List<ListUsersResponse> result = new ArrayList<>();

        StreamObserver<CreateUserRequest> requestObserver = asyncStub.batchCreateUsers(
            new StreamObserver<>() {
                @Override public void onNext(ListUsersResponse r) { result.add(r); }
                @Override public void onError(Throwable t) { latch.countDown(); }
                @Override public void onCompleted() { latch.countDown(); }
            }
        );

        for (CreateUserRequest req : requests) requestObserver.onNext(req);
        requestObserver.onCompleted();
        latch.await(30, TimeUnit.SECONDS);
        return result.isEmpty() ? null : result.get(0);
    }

    // Bidirectional streaming
    public void syncUsers(List<SyncUserRequest> updates) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        StreamObserver<SyncUserRequest> requestObserver = asyncStub.syncUsers(
            new StreamObserver<>() {
                @Override public void onNext(SyncUserResponse r) { System.out.println("Sync response: " + r); }
                @Override public void onError(Throwable t) { latch.countDown(); }
                @Override public void onCompleted() { latch.countDown(); }
            }
        );

        for (SyncUserRequest update : updates) requestObserver.onNext(update);
        requestObserver.onCompleted();
        latch.await(30, TimeUnit.SECONDS);
    }

    public void close() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        UserServiceClientExample client = new UserServiceClientExample("localhost:50051");
        try {
            User user = client.getUser(1);
            System.out.println("User: " + user);

            List<User> users = client.listUsers(1, 5);
            System.out.println("Users: " + users);
        } finally {
            client.close();
        }
    }
}
# src/client.py
import grpc
import users_pb2
import users_pb2_grpc
from typing import Iterator


class UserServiceClient:
    def __init__(self, address: str = "localhost:50051"):
        self.channel = grpc.insecure_channel(address)
        self.stub = users_pb2_grpc.UserServiceStub(self.channel)

    def get_user(self, user_id: int):
        response = self.stub.GetUser(users_pb2.GetUserRequest(user_id=user_id))
        return response.user

    def list_users(self, page: int = 1, page_size: int = 10) -> list:
        users = []
        for response in self.stub.ListUsers(
            users_pb2.ListUsersRequest(page=page, page_size=page_size)
        ):
            users.append(response.user)
        return users

    def batch_create_users(self, requests: list[dict]):
        def request_iterator() -> Iterator[users_pb2.CreateUserRequest]:
            for r in requests:
                yield users_pb2.CreateUserRequest(name=r["name"], email=r["email"])

        return self.stub.BatchCreateUsers(request_iterator())

    def sync_users(self, updates: list[dict]) -> None:
        def request_iterator() -> Iterator[users_pb2.SyncUserRequest]:
            for u in updates:
                yield users_pb2.SyncUserRequest(
                    user_id=u.get("user_id", 0), name=u["name"], email=u["email"]
                )

        for response in self.stub.SyncUsers(request_iterator()):
            print("Sync response:", response)

    def close(self):
        self.channel.close()


def main():
    client = UserServiceClient()
    try:
        user = client.get_user(1)
        print("User:", user)

        users = client.list_users(1, 5)
        print("Users:", users)
    except grpc.RpcError as e:
        print("Error:", e)
    finally:
        client.close()


if __name__ == "__main__":
    main()
// src/Clients/UserServiceClient.cs
using Grpc.Net.Client;
using Users;

namespace GrpcClient;

public class UserServiceClientExample : IDisposable
{
    private readonly GrpcChannel _channel;
    private readonly UserService.UserServiceClient _client;

    public UserServiceClientExample(string address = "http://localhost:50051")
    {
        _channel = GrpcChannel.ForAddress(address);
        _client = new UserService.UserServiceClient(_channel);
    }

    // Unary call
    public async Task<User> GetUserAsync(int userId)
    {
        var response = await _client.GetUserAsync(new GetUserRequest { UserId = userId });
        return response.User;
    }

    // Server streaming
    public async Task<List<User>> ListUsersAsync(int page = 1, int pageSize = 10)
    {
        var users = new List<User>();
        using var call = _client.ListUsers(new ListUsersRequest { Page = page, PageSize = pageSize });
        await foreach (var response in call.ResponseStream.ReadAllAsync())
            users.Add(response.User);
        return users;
    }

    // Client streaming
    public async Task<ListUsersResponse> BatchCreateUsersAsync(IEnumerable<(string Name, string Email)> requests)
    {
        using var call = _client.BatchCreateUsers();
        foreach (var (name, email) in requests)
            await call.RequestStream.WriteAsync(new CreateUserRequest { Name = name, Email = email });
        await call.RequestStream.CompleteAsync();
        return await call.ResponseAsync;
    }

    // Bidirectional streaming
    public async Task SyncUsersAsync(IEnumerable<(int UserId, string Name, string Email)> updates)
    {
        using var call = _client.SyncUsers();
        var readTask = Task.Run(async () =>
        {
            await foreach (var response in call.ResponseStream.ReadAllAsync())
                Console.WriteLine($"Sync response: {response}");
        });

        foreach (var (userId, name, email) in updates)
            await call.RequestStream.WriteAsync(new SyncUserRequest { UserId = userId, Name = name, Email = email });

        await call.RequestStream.CompleteAsync();
        await readTask;
    }

    public void Dispose() => _channel.Dispose();
}

// Program.cs usage
using var client = new UserServiceClientExample();

var user = await client.GetUserAsync(1);
Console.WriteLine($"User: {user}");

var users = await client.ListUsersAsync(1, 5);
Console.WriteLine($"Users: {users.Count}");

The Four Communication Patterns

1. Unary RPC (Single Request/Response)

The most common pattern, equivalent to a traditional function call:

// Handler
async getUser(call, callback) {
  const user = await fetchUser(call.request.userId);
  callback(null, { user });
}

// Client
const user = await client.getUser({ userId: 1 });
// Handler
@Override
public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
    User user = fetchUser(request.getUserId());
    responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
    responseObserver.onCompleted();
}

// Client (blocking stub)
GetUserResponse response = blockingStub.getUser(
    GetUserRequest.newBuilder().setUserId(1).build()
);
User user = response.getUser();
# Handler
def GetUser(self, request, context):
    user = fetch_user(request.user_id)
    return GetUserResponse(user=user)

# Client
response = stub.GetUser(GetUserRequest(user_id=1))
user = response.user
// Handler
public override Task<GetUserResponse> GetUser(GetUserRequest request, ServerCallContext context)
{
    var user = FetchUser(request.UserId);
    return Task.FromResult(new GetUserResponse { User = user });
}

// Client
var response = await client.GetUserAsync(new GetUserRequest { UserId = 1 });
var user = response.User;

Use Cases: Fetching a single resource, creating resources with validation, simple updates.

2. Server Streaming RPC

Server sends multiple responses to a single client request:

// Handler
async listUsers(call) {
  const users = await fetchAllUsers();
  
  for (const user of users) {
    call.write({ user });
  }
  
  call.end();
}

// Client
const call = client.listUsers({ pageSize: 10 });

call.on('data', (response) => {
  console.log('Received user:', response.user);
});

call.on('end', () => {
  console.log('Stream complete');
});
// Handler
@Override
public void listUsers(ListUsersRequest request, StreamObserver<GetUserResponse> responseObserver) {
    List<User> users = fetchAllUsers();
    for (User user : users) {
        responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
    }
    responseObserver.onCompleted();
}

// Client (blocking iterator)
Iterator<GetUserResponse> responses = blockingStub.listUsers(
    ListUsersRequest.newBuilder().setPageSize(10).build()
);
while (responses.hasNext()) {
    System.out.println("Received user: " + responses.next().getUser());
}
System.out.println("Stream complete");
# Handler
def ListUsers(self, request, context):
    users = fetch_all_users()
    for user in users:
        yield GetUserResponse(user=user)

# Client
for response in stub.ListUsers(ListUsersRequest(page_size=10)):
    print("Received user:", response.user)
print("Stream complete")
// Handler
public override async Task ListUsers(
    ListUsersRequest request,
    IServerStreamWriter<GetUserResponse> responseStream,
    ServerCallContext context)
{
    var users = await FetchAllUsersAsync();
    foreach (var user in users)
        await responseStream.WriteAsync(new GetUserResponse { User = user });
}

// Client
using var call = client.ListUsers(new ListUsersRequest { PageSize = 10 });
await foreach (var response in call.ResponseStream.ReadAllAsync())
    Console.WriteLine($"Received user: {response.User}");
Console.WriteLine("Stream complete");

Use Cases: Pagination, real-time data feeds, large dataset exports, log streaming.

3. Client Streaming RPC

Client sends multiple requests to a single server response:

// Handler
async batchCreateUsers(call, callback) {
  const users = [];
  
  call.on('data', (request) => {
    users.push(request);
  });
  
  call.on('end', () => {
    const saved = saveUsers(users);
    callback(null, { users: saved });
  });
}

// Client
const call = client.batchCreateUsers((err, response) => {
  console.log('All users created:', response.users);
});

call.write({ name: 'Alice', email: 'alice@example.com' });
call.write({ name: 'Bob', email: 'bob@example.com' });
call.end();
// Handler
@Override
public StreamObserver<CreateUserRequest> batchCreateUsers(StreamObserver<ListUsersResponse> responseObserver) {
    List<User> users = new ArrayList<>();
    return new StreamObserver<>() {
        @Override public void onNext(CreateUserRequest req) { users.add(saveUser(req)); }
        @Override public void onError(Throwable t) { responseObserver.onError(t); }
        @Override public void onCompleted() {
            ListUsersResponse resp = ListUsersResponse.newBuilder()
                .addAllUsers(users).setTotalCount(users.size()).build();
            responseObserver.onNext(resp);
            responseObserver.onCompleted();
        }
    };
}

// Client (async stub)
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<CreateUserRequest> req = asyncStub.batchCreateUsers(new StreamObserver<>() {
    @Override public void onNext(ListUsersResponse r) { System.out.println("All users created: " + r.getUsersList()); }
    @Override public void onError(Throwable t) { latch.countDown(); }
    @Override public void onCompleted() { latch.countDown(); }
});
req.onNext(CreateUserRequest.newBuilder().setName("Alice").setEmail("alice@example.com").build());
req.onNext(CreateUserRequest.newBuilder().setName("Bob").setEmail("bob@example.com").build());
req.onCompleted();
latch.await();
# Handler
def BatchCreateUsers(self, request_iterator, context):
    users = []
    for request in request_iterator:
        users.append(save_user(request))
    return ListUsersResponse(users=users, total_count=len(users))

# Client
def request_iterator():
    yield CreateUserRequest(name="Alice", email="alice@example.com")
    yield CreateUserRequest(name="Bob", email="bob@example.com")

response = stub.BatchCreateUsers(request_iterator())
print("All users created:", list(response.users))
// Handler
public override async Task<ListUsersResponse> BatchCreateUsers(
    IAsyncStreamReader<CreateUserRequest> requestStream, ServerCallContext context)
{
    var users = new List<User>();
    await foreach (var req in requestStream.ReadAllAsync())
        users.Add(SaveUser(req));

    var response = new ListUsersResponse { TotalCount = users.Count };
    response.Users.AddRange(users);
    return response;
}

// Client
using var call = client.BatchCreateUsers();
await call.RequestStream.WriteAsync(new CreateUserRequest { Name = "Alice", Email = "alice@example.com" });
await call.RequestStream.WriteAsync(new CreateUserRequest { Name = "Bob", Email = "bob@example.com" });
await call.RequestStream.CompleteAsync();
var response = await call.ResponseAsync;
Console.WriteLine($"All users created: {response.Users.Count}");

Use Cases: Bulk imports, file uploads, batch processing, data synchronization.

4. Bidirectional Streaming RPC

Both client and server send multiple messages over a persistent connection:

// Handler
async syncUsers(call) {
  call.on('data', (request) => {
    const synced = syncUser(request);
    call.write({
      userId: synced.id,
      status: 'SYNCED',
    });
  });
  
  call.on('end', () => {
    call.end();
  });
}

// Client
const call = client.syncUsers();

call.on('data', (response) => {
  console.log('Sync result:', response);
});

call.write({ userId: 1, name: 'Alice Updated' });
call.write({ userId: 2, name: 'Bob Updated' });
call.end();
// Handler
@Override
public StreamObserver<SyncUserRequest> syncUsers(StreamObserver<SyncUserResponse> responseObserver) {
    return new StreamObserver<>() {
        @Override
        public void onNext(SyncUserRequest req) {
            User synced = syncUser(req);
            responseObserver.onNext(SyncUserResponse.newBuilder()
                .setUserId(synced.getId()).setStatus("SYNCED").build());
        }
        @Override public void onError(Throwable t) { responseObserver.onError(t); }
        @Override public void onCompleted() { responseObserver.onCompleted(); }
    };
}

// Client (async stub)
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<SyncUserRequest> req = asyncStub.syncUsers(new StreamObserver<>() {
    @Override public void onNext(SyncUserResponse r) { System.out.println("Sync result: " + r); }
    @Override public void onError(Throwable t) { latch.countDown(); }
    @Override public void onCompleted() { latch.countDown(); }
});
req.onNext(SyncUserRequest.newBuilder().setUserId(1).setName("Alice Updated").build());
req.onNext(SyncUserRequest.newBuilder().setUserId(2).setName("Bob Updated").build());
req.onCompleted();
latch.await();
# Handler
def SyncUsers(self, request_iterator, context):
    for request in request_iterator:
        synced = sync_user(request)
        yield SyncUserResponse(user_id=synced.id, status="SYNCED")

# Client
def request_iterator():
    yield SyncUserRequest(user_id=1, name="Alice Updated")
    yield SyncUserRequest(user_id=2, name="Bob Updated")

for response in stub.SyncUsers(request_iterator()):
    print("Sync result:", response)
// Handler
public override async Task SyncUsers(
    IAsyncStreamReader<SyncUserRequest> requestStream,
    IServerStreamWriter<SyncUserResponse> responseStream,
    ServerCallContext context)
{
    await foreach (var req in requestStream.ReadAllAsync())
    {
        var synced = SyncUser(req);
        await responseStream.WriteAsync(new SyncUserResponse { UserId = synced.Id, Status = "SYNCED" });
    }
}

// Client
using var call = client.SyncUsers();
var readTask = Task.Run(async () =>
{
    await foreach (var response in call.ResponseStream.ReadAllAsync())
        Console.WriteLine($"Sync result: {response}");
});

await call.RequestStream.WriteAsync(new SyncUserRequest { UserId = 1, Name = "Alice Updated" });
await call.RequestStream.WriteAsync(new SyncUserRequest { UserId = 2, Name = "Bob Updated" });
await call.RequestStream.CompleteAsync();
await readTask;

Use Cases: Chat applications, live collaboration, bidirectional notifications, game state sync.

Interceptors and Middleware

Interceptors allow you to intercept and inspect RPC calls, enabling logging, authentication, metrics, and request/response modification.

Server-Side Interceptor

// src/middleware/serverInterceptor.ts
import * as grpc from '@grpc/grpc-js';

interface CallMetadata {
  userId?: string;
  requestId?: string;
  startTime: number;
}

export function createServerInterceptor() {
  return (
    methodDescriptor: any,
    call: any
  ): grpc.Listener => {
    const metadata: CallMetadata = {
      requestId: call.metadata.get('x-request-id')?.[0] as string,
      userId: call.metadata.get('x-user-id')?.[0] as string,
      startTime: Date.now(),
    };

    // Log incoming call
    console.log('[gRPC Server]', {
      method: methodDescriptor.path,
      requestId: metadata.requestId,
      userId: metadata.userId,
    });

    // Wrap call handlers
    const originalSendMetadata = call.sendMetadata;
    call.sendMetadata = function(responseMetadata: grpc.Metadata) {
      responseMetadata.add('x-request-id', metadata.requestId || '');
      originalSendMetadata.call(this, responseMetadata);
    };

    return {
      onReceiveMetadata(metadata: grpc.Metadata, next: any) {
        next(metadata);
      },
      onReceiveMessage(message: any, next: any) {
        console.log('[gRPC Message]', {
          method: methodDescriptor.path,
          requestId: metadata.requestId,
          payload: JSON.stringify(message).substring(0, 200),
        });
        next(message);
      },
      onReceiveHalfClose(next: any) {
        next();
      },
      onCancel(next: any) {
        const duration = Date.now() - metadata.startTime;
        console.log('[gRPC Cancelled]', {
          method: methodDescriptor.path,
          requestId: metadata.requestId,
          duration,
        });
        next();
      },
    };
  };
}
// src/main/java/com/example/grpc/ServerLoggingInterceptor.java
package com.example.grpc;

import io.grpc.*;

public class ServerLoggingInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {

        String requestId = headers.get(Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER));
        String userId = headers.get(Metadata.Key.of("x-user-id", Metadata.ASCII_STRING_MARSHALLER));
        long startTime = System.currentTimeMillis();

        System.out.printf("[gRPC Server] method=%s requestId=%s userId=%s%n",
            call.getMethodDescriptor().getFullMethodName(), requestId, userId);

        ServerCall<ReqT, RespT> wrappedCall = new ForwardingServerCall.SimpleForwardingServerCall<>(call) {
            @Override
            public void sendHeaders(Metadata responseHeaders) {
                if (requestId != null) {
                    responseHeaders.put(
                        Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER), requestId);
                }
                super.sendHeaders(responseHeaders);
            }
        };

        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(next.startCall(wrappedCall, headers)) {
            @Override
            public void onMessage(ReqT message) {
                System.out.printf("[gRPC Message] method=%s requestId=%s payload=%s%n",
                    call.getMethodDescriptor().getFullMethodName(), requestId,
                    message.toString().substring(0, Math.min(200, message.toString().length())));
                super.onMessage(message);
            }

            @Override
            public void onCancel() {
                long duration = System.currentTimeMillis() - startTime;
                System.out.printf("[gRPC Cancelled] method=%s requestId=%s duration=%dms%n",
                    call.getMethodDescriptor().getFullMethodName(), requestId, duration);
                super.onCancel();
            }
        };
    }
}
# src/interceptors/server_interceptor.py
import grpc
import time
import json
from grpc import ServerInterceptor


class LoggingServerInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method
        metadata = dict(handler_call_details.invocation_metadata)
        request_id = metadata.get("x-request-id")
        user_id = metadata.get("x-user-id")
        start_time = time.time()

        print(f"[gRPC Server] method={method} requestId={request_id} userId={user_id}")

        handler = continuation(handler_call_details)
        if handler is None:
            return None

        def wrap_unary(behavior):
            def new_behavior(request, context):
                try:
                    payload = str(request)[:200]
                    print(f"[gRPC Message] method={method} requestId={request_id} payload={payload}")
                    return behavior(request, context)
                except Exception as e:
                    duration = (time.time() - start_time) * 1000
                    print(f"[gRPC Error] method={method} requestId={request_id} duration={duration:.0f}ms error={e}")
                    raise
            return new_behavior

        if handler.unary_unary:
            return grpc.unary_unary_rpc_method_handler(
                wrap_unary(handler.unary_unary),
                request_deserializer=handler.request_deserializer,
                response_serializer=handler.response_serializer,
            )
        return handler
// src/Interceptors/ServerLoggingInterceptor.cs
using Grpc.Core;
using Grpc.Core.Interceptors;

namespace GrpcServer.Interceptors;

public class ServerLoggingInterceptor : Interceptor
{
    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var requestId = context.RequestHeaders.GetValue("x-request-id");
        var userId = context.RequestHeaders.GetValue("x-user-id");
        var startTime = DateTimeOffset.UtcNow;

        Console.WriteLine($"[gRPC Server] method={context.Method} requestId={requestId} userId={userId}");
        Console.WriteLine($"[gRPC Message] method={context.Method} requestId={requestId} payload={request?.ToString()?[..Math.Min(200, request.ToString()?.Length ?? 0)]}");

        context.ResponseTrailers.Add("x-request-id", requestId ?? string.Empty);

        try
        {
            return await continuation(request, context);
        }
        catch (Exception)
        {
            var duration = (DateTimeOffset.UtcNow - startTime).TotalMilliseconds;
            Console.WriteLine($"[gRPC Cancelled] method={context.Method} requestId={requestId} duration={duration:F0}ms");
            throw;
        }
    }
}

// Register in Program.cs:
// builder.Services.AddGrpc(o => o.Interceptors.Add<ServerLoggingInterceptor>());

Client-Side Interceptor

// src/middleware/clientInterceptor.ts
import * as grpc from '@grpc/grpc-js';

export interface ClientInterceptorOptions {
  requestId?: string;
  userId?: string;
  authToken?: string;
}

export function createClientInterceptor(options: ClientInterceptorOptions) {
  return (methodDescriptor: any, createCall: any) => {
    return (metadata: grpc.Metadata, listener: any, next: any) => {
      // Add custom metadata
      if (options.requestId) {
        metadata.add('x-request-id', options.requestId);
      }
      if (options.userId) {
        metadata.add('x-user-id', options.userId);
      }
      if (options.authToken) {
        metadata.add('authorization', `Bearer ${options.authToken}`);
      }

      console.log('[gRPC Client]', {
        method: methodDescriptor.path,
        requestId: options.requestId,
      });

      const startTime = Date.now();

      // Create listener wrapper to capture response
      const wrappedListener: any = {
        onReceiveMetadata(metadata: grpc.Metadata) {
          listener.onReceiveMetadata(metadata);
        },
        onReceiveMessage(message: any) {
          console.log('[gRPC Response]', {
            duration: Date.now() - startTime,
            message: JSON.stringify(message).substring(0, 200),
          });
          listener.onReceiveMessage(message);
        },
        onReceiveStatus(status: grpc.Status) {
          const duration = Date.now() - startTime;
          console.log('[gRPC Status]', {
            code: status.code,
            details: status.details,
            duration,
          });
          listener.onReceiveStatus(status);
        },
      };

      return next(metadata, wrappedListener);
    };
  };
}
// src/main/java/com/example/grpc/ClientLoggingInterceptor.java
package com.example.grpc;

import io.grpc.*;

public class ClientLoggingInterceptor implements ClientInterceptor {
    private final String requestId;
    private final String userId;
    private final String authToken;

    public ClientLoggingInterceptor(String requestId, String userId, String authToken) {
        this.requestId = requestId;
        this.userId = userId;
        this.authToken = authToken;
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {

        return new ForwardingClientCall.SimpleForwardingClientCall<>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                if (requestId != null) headers.put(Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER), requestId);
                if (userId != null) headers.put(Metadata.Key.of("x-user-id", Metadata.ASCII_STRING_MARSHALLER), userId);
                if (authToken != null) headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "Bearer " + authToken);

                System.out.printf("[gRPC Client] method=%s requestId=%s%n", method.getFullMethodName(), requestId);
                long startTime = System.currentTimeMillis();

                super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) {
                    @Override
                    public void onMessage(RespT message) {
                        System.out.printf("[gRPC Response] duration=%dms message=%s%n",
                            System.currentTimeMillis() - startTime,
                            message.toString().substring(0, Math.min(200, message.toString().length())));
                        super.onMessage(message);
                    }

                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        System.out.printf("[gRPC Status] code=%s details=%s duration=%dms%n",
                            status.getCode(), status.getDescription(), System.currentTimeMillis() - startTime);
                        super.onClose(status, trailers);
                    }
                }, headers);
            }
        };
    }
}
// Usage: ManagedChannelBuilder.forTarget(addr).intercept(new ClientLoggingInterceptor(...)).build()
# src/interceptors/client_interceptor.py
import grpc
import time
from grpc import UnaryUnaryClientInterceptor, ClientCallDetails


class ClientLoggingInterceptor(grpc.UnaryUnaryClientInterceptor,
                                grpc.UnaryStreamClientInterceptor):
    def __init__(self, request_id: str = None, user_id: str = None, auth_token: str = None):
        self.request_id = request_id
        self.user_id = user_id
        self.auth_token = auth_token

    def _add_metadata(self, client_call_details: ClientCallDetails):
        metadata = list(client_call_details.metadata or [])
        if self.request_id:
            metadata.append(("x-request-id", self.request_id))
        if self.user_id:
            metadata.append(("x-user-id", self.user_id))
        if self.auth_token:
            metadata.append(("authorization", f"Bearer {self.auth_token}"))
        return metadata

    def intercept_unary_unary(self, continuation, client_call_details, request):
        metadata = self._add_metadata(client_call_details)
        new_details = client_call_details._replace(metadata=metadata)
        print(f"[gRPC Client] method={client_call_details.method} requestId={self.request_id}")
        start = time.time()
        response = continuation(new_details, request)
        duration = (time.time() - start) * 1000
        print(f"[gRPC Response] duration={duration:.0f}ms")
        return response

    def intercept_unary_stream(self, continuation, client_call_details, request):
        metadata = self._add_metadata(client_call_details)
        new_details = client_call_details._replace(metadata=metadata)
        return continuation(new_details, request)

# Usage:
# channel = grpc.intercept_channel(
#     grpc.insecure_channel("localhost:50051"),
#     ClientLoggingInterceptor(request_id="abc", auth_token="token")
# )
// src/Interceptors/ClientLoggingInterceptor.cs
using Grpc.Core;
using Grpc.Core.Interceptors;

namespace GrpcClient.Interceptors;

public class ClientLoggingInterceptor : Interceptor
{
    private readonly string? _requestId;
    private readonly string? _userId;
    private readonly string? _authToken;

    public ClientLoggingInterceptor(string? requestId = null, string? userId = null, string? authToken = null)
    {
        _requestId = requestId;
        _userId = userId;
        _authToken = authToken;
    }

    private Metadata AddHeaders(Metadata? headers = null)
    {
        var metadata = headers ?? new Metadata();
        if (_requestId is not null) metadata.Add("x-request-id", _requestId);
        if (_userId is not null) metadata.Add("x-user-id", _userId);
        if (_authToken is not null) metadata.Add("authorization", $"Bearer {_authToken}");
        return metadata;
    }

    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
        TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
        AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        var newContext = new ClientInterceptorContext<TRequest, TResponse>(
            context.Method, context.Host,
            context.Options.WithHeaders(AddHeaders(context.Options.Headers)));

        Console.WriteLine($"[gRPC Client] method={context.Method.FullName} requestId={_requestId}");
        var startTime = DateTimeOffset.UtcNow;
        var call = continuation(request, newContext);

        return new AsyncUnaryCall<TResponse>(
            call.ResponseAsync.ContinueWith(t =>
            {
                var duration = (DateTimeOffset.UtcNow - startTime).TotalMilliseconds;
                Console.WriteLine($"[gRPC Response] duration={duration:F0}ms");
                return t.Result;
            }),
            call.ResponseHeadersAsync, call.GetStatus, call.GetTrailers, call.Dispose);
    }
}

// Usage: channel.Intercept(new ClientLoggingInterceptor(requestId: "abc", authToken: "token"))

Error Handling with gRPC Status Codes

gRPC defines 16 standard error codes for different failure scenarios:

// src/errors/grpcErrors.ts
import * as grpc from '@grpc/grpc-js';

export enum GrpcErrorCode {
  OK = grpc.status.OK,
  CANCELLED = grpc.status.CANCELLED,
  UNKNOWN = grpc.status.UNKNOWN,
  INVALID_ARGUMENT = grpc.status.INVALID_ARGUMENT,
  DEADLINE_EXCEEDED = grpc.status.DEADLINE_EXCEEDED,
  NOT_FOUND = grpc.status.NOT_FOUND,
  ALREADY_EXISTS = grpc.status.ALREADY_EXISTS,
  PERMISSION_DENIED = grpc.status.PERMISSION_DENIED,
  RESOURCE_EXHAUSTED = grpc.status.RESOURCE_EXHAUSTED,
  FAILED_PRECONDITION = grpc.status.FAILED_PRECONDITION,
  ABORTED = grpc.status.ABORTED,
  OUT_OF_RANGE = grpc.status.OUT_OF_RANGE,
  UNIMPLEMENTED = grpc.status.UNIMPLEMENTED,
  INTERNAL = grpc.status.INTERNAL,
  UNAVAILABLE = grpc.status.UNAVAILABLE,
  DATA_LOSS = grpc.status.DATA_LOSS,
  UNAUTHENTICATED = grpc.status.UNAUTHENTICATED,
}

export class GrpcError extends Error {
  constructor(
    public code: GrpcErrorCode,
    message: string,
    public metadata?: grpc.Metadata
  ) {
    super(message);
    this.name = 'GrpcError';
  }
}

// Handler example with proper error responses
async function getUserHandler(
  call: grpc.ServerUnaryCall<GetUserRequest, GetUserResponse>,
  callback: grpc.sendUnaryData<GetUserResponse>
) {
  try {
    const { userId } = call.request;

    if (!userId || userId <= 0) {
      return callback({
        code: grpc.status.INVALID_ARGUMENT,
        message: 'User ID must be a positive integer',
      });
    }

    const user = await fetchUser(userId);

    if (!user) {
      return callback({
        code: grpc.status.NOT_FOUND,
        message: `User ${userId} not found`,
      });
    }

    callback(null, { user });
  } catch (error) {
    console.error('Unexpected error:', error);
    callback({
      code: grpc.status.INTERNAL,
      message: 'An unexpected error occurred',
    });
  }
}

// Status code guide
const statusCodeGuide = {
  [grpc.status.INVALID_ARGUMENT]: 'Client sent invalid argument',
  [grpc.status.NOT_FOUND]: 'Requested resource does not exist',
  [grpc.status.ALREADY_EXISTS]: 'Resource already exists',
  [grpc.status.PERMISSION_DENIED]: 'Client lacks permission',
  [grpc.status.RESOURCE_EXHAUSTED]: 'Server resource exhausted',
  [grpc.status.DEADLINE_EXCEEDED]: 'Operation deadline exceeded',
  [grpc.status.UNAUTHENTICATED]: 'Request authentication failed',
  [grpc.status.UNAVAILABLE]: 'Service temporarily unavailable',
  [grpc.status.INTERNAL]: 'Internal server error',
};
// src/main/java/com/example/grpc/UserServiceWithErrorHandling.java
package com.example.grpc;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import users.UserServiceGrpc;
import users.UserServiceProto.*;

public class UserServiceWithErrorHandling extends UserServiceGrpc.UserServiceImplBase {

    @Override
    public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
        try {
            int userId = request.getUserId();

            if (userId <= 0) {
                responseObserver.onError(Status.INVALID_ARGUMENT
                    .withDescription("User ID must be a positive integer")
                    .asRuntimeException());
                return;
            }

            User user = fetchUser(userId);

            if (user == null) {
                responseObserver.onError(Status.NOT_FOUND
                    .withDescription("User " + userId + " not found")
                    .asRuntimeException());
                return;
            }

            responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
            responseObserver.onCompleted();

        } catch (Exception e) {
            System.err.println("Unexpected error: " + e.getMessage());
            responseObserver.onError(Status.INTERNAL
                .withDescription("An unexpected error occurred")
                .withCause(e)
                .asRuntimeException());
        }
    }

    // Status code guide (as constants)
    // Status.INVALID_ARGUMENT  -> Client sent invalid argument
    // Status.NOT_FOUND         -> Requested resource does not exist
    // Status.ALREADY_EXISTS    -> Resource already exists
    // Status.PERMISSION_DENIED -> Client lacks permission
    // Status.RESOURCE_EXHAUSTED-> Server resource exhausted
    // Status.DEADLINE_EXCEEDED -> Operation deadline exceeded
    // Status.UNAUTHENTICATED   -> Request authentication failed
    // Status.UNAVAILABLE       -> Service temporarily unavailable
    // Status.INTERNAL          -> Internal server error
}
# src/errors/grpc_errors.py
import grpc
from enum import IntEnum


class GrpcErrorCode(IntEnum):
    OK = grpc.StatusCode.OK.value[0]
    CANCELLED = grpc.StatusCode.CANCELLED.value[0]
    UNKNOWN = grpc.StatusCode.UNKNOWN.value[0]
    INVALID_ARGUMENT = grpc.StatusCode.INVALID_ARGUMENT.value[0]
    DEADLINE_EXCEEDED = grpc.StatusCode.DEADLINE_EXCEEDED.value[0]
    NOT_FOUND = grpc.StatusCode.NOT_FOUND.value[0]
    ALREADY_EXISTS = grpc.StatusCode.ALREADY_EXISTS.value[0]
    PERMISSION_DENIED = grpc.StatusCode.PERMISSION_DENIED.value[0]
    RESOURCE_EXHAUSTED = grpc.StatusCode.RESOURCE_EXHAUSTED.value[0]
    INTERNAL = grpc.StatusCode.INTERNAL.value[0]
    UNAVAILABLE = grpc.StatusCode.UNAVAILABLE.value[0]
    UNAUTHENTICATED = grpc.StatusCode.UNAUTHENTICATED.value[0]


# Handler with proper error responses
def get_user_handler(request, context):
    try:
        user_id = request.user_id

        if not user_id or user_id <= 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "User ID must be a positive integer")

        user = fetch_user(user_id)

        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")

        return GetUserResponse(user=user)

    except grpc.RpcError:
        raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        context.abort(grpc.StatusCode.INTERNAL, "An unexpected error occurred")


STATUS_CODE_GUIDE = {
    grpc.StatusCode.INVALID_ARGUMENT: "Client sent invalid argument",
    grpc.StatusCode.NOT_FOUND: "Requested resource does not exist",
    grpc.StatusCode.ALREADY_EXISTS: "Resource already exists",
    grpc.StatusCode.PERMISSION_DENIED: "Client lacks permission",
    grpc.StatusCode.RESOURCE_EXHAUSTED: "Server resource exhausted",
    grpc.StatusCode.DEADLINE_EXCEEDED: "Operation deadline exceeded",
    grpc.StatusCode.UNAUTHENTICATED: "Request authentication failed",
    grpc.StatusCode.UNAVAILABLE: "Service temporarily unavailable",
    grpc.StatusCode.INTERNAL: "Internal server error",
}
// src/Errors/GrpcErrors.cs
using Grpc.Core;

namespace GrpcServer.Errors;

public static class GrpcErrorHandler
{
    // Handler with proper error responses
    public static async Task<GetUserResponse> GetUserHandler(GetUserRequest request, ServerCallContext context)
    {
        try
        {
            if (request.UserId <= 0)
                throw new RpcException(new Status(StatusCode.InvalidArgument,
                    "User ID must be a positive integer"));

            var user = await FetchUserAsync(request.UserId);

            if (user is null)
                throw new RpcException(new Status(StatusCode.NotFound,
                    $"User {request.UserId} not found"));

            return new GetUserResponse { User = user };
        }
        catch (RpcException) { throw; }
        catch (Exception e)
        {
            Console.Error.WriteLine($"Unexpected error: {e}");
            throw new RpcException(new Status(StatusCode.Internal, "An unexpected error occurred"));
        }
    }

    // Status code guide
    public static readonly Dictionary<StatusCode, string> StatusCodeGuide = new()
    {
        [StatusCode.InvalidArgument]  = "Client sent invalid argument",
        [StatusCode.NotFound]         = "Requested resource does not exist",
        [StatusCode.AlreadyExists]    = "Resource already exists",
        [StatusCode.PermissionDenied] = "Client lacks permission",
        [StatusCode.ResourceExhausted]= "Server resource exhausted",
        [StatusCode.DeadlineExceeded] = "Operation deadline exceeded",
        [StatusCode.Unauthenticated]  = "Request authentication failed",
        [StatusCode.Unavailable]      = "Service temporarily unavailable",
        [StatusCode.Internal]         = "Internal server error",
    };
}

Deadlines and Timeout Propagation

Deadlines ensure operations complete within a specified time frame. gRPC automatically propagates deadlines across service calls:

// Client setting a deadline
async function getUserWithTimeout(userId: number) {
  const deadline = new Date();
  deadline.setSeconds(deadline.getSeconds() + 5); // 5-second timeout

  return new Promise((resolve, reject) => {
    const client = new UserServiceClient('localhost:50051');
    const call = client.getUser(
      { userId },
      { deadline },
      (err, response) => {
        if (err) {
          if (err.code === grpc.status.DEADLINE_EXCEEDED) {
            reject(new Error('Request exceeded 5-second deadline'));
          } else {
            reject(err);
          }
        } else {
          resolve(response?.user);
        }
      }
    );
  });
}

// Server checking deadline
async function getUser(
  call: grpc.ServerUnaryCall<GetUserRequest, GetUserResponse>,
  callback: grpc.sendUnaryData<GetUserResponse>
) {
  const deadline = call.deadline;
  const timeRemaining = deadline.getTime() - Date.now();

  if (timeRemaining < 1000) {
    // Less than 1 second remaining
    return callback({
      code: grpc.status.DEADLINE_EXCEEDED,
      message: 'Insufficient time to process request',
    });
  }

  // ... process request with knowledge of remaining time
}
// Client setting a deadline
import io.grpc.Deadline;
import java.util.concurrent.TimeUnit;

public User getUserWithTimeout(int userId) {
    try {
        return blockingStub
            .withDeadline(Deadline.after(5, TimeUnit.SECONDS))
            .getUser(GetUserRequest.newBuilder().setUserId(userId).build())
            .getUser();
    } catch (io.grpc.StatusRuntimeException e) {
        if (e.getStatus().getCode() == io.grpc.Status.Code.DEADLINE_EXCEEDED) {
            throw new RuntimeException("Request exceeded 5-second deadline", e);
        }
        throw e;
    }
}

// Server checking deadline
@Override
public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
    // Check if context is already cancelled/deadline exceeded
    if (Context.current().isCancelled()) {
        responseObserver.onError(Status.DEADLINE_EXCEEDED
            .withDescription("Insufficient time to process request")
            .asRuntimeException());
        return;
    }
    // ... process request
}
# Client setting a deadline
def get_user_with_timeout(user_id: int):
    try:
        response = stub.GetUser(
            GetUserRequest(user_id=user_id),
            timeout=5  # 5-second timeout
        )
        return response.user
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise TimeoutError("Request exceeded 5-second deadline")
        raise

# Server checking deadline (via context)
def GetUser(self, request, context):
    # Check remaining time
    deadline = context.time_remaining()
    if deadline is not None and deadline < 1.0:  # Less than 1 second remaining
        context.abort(grpc.StatusCode.DEADLINE_EXCEEDED,
                      "Insufficient time to process request")
    # ... process request
// Client setting a deadline
public async Task<User> GetUserWithTimeoutAsync(int userId)
{
    try
    {
        var deadline = DateTime.UtcNow.AddSeconds(5); // 5-second timeout
        var response = await _client.GetUserAsync(
            new GetUserRequest { UserId = userId },
            deadline: deadline
        );
        return response.User;
    }
    catch (RpcException e) when (e.StatusCode == StatusCode.DeadlineExceeded)
    {
        throw new TimeoutException("Request exceeded 5-second deadline", e);
    }
}

// Server checking deadline
public override Task<GetUserResponse> GetUser(GetUserRequest request, ServerCallContext context)
{
    var deadline = context.Deadline;
    var timeRemaining = deadline - DateTime.UtcNow;

    if (timeRemaining < TimeSpan.FromSeconds(1))
    {
        throw new RpcException(new Status(StatusCode.DeadlineExceeded,
            "Insufficient time to process request"));
    }

    // ... process request with knowledge of remaining time
    return Task.FromResult(new GetUserResponse());
}

Load Balancing Strategies

Client-Side Load Balancing

// src/loadbalancer/clientSideLoadBalancer.ts
import * as grpc from '@grpc/grpc-js';

interface ServerAddress {
  host: string;
  port: number;
}

export class RoundRobinLoadBalancer {
  private currentIndex = 0;
  private servers: ServerAddress[];

  constructor(servers: ServerAddress[]) {
    this.servers = servers;
  }

  getNextServer(): ServerAddress {
    const server = this.servers[this.currentIndex];
    this.currentIndex = (this.currentIndex + 1) % this.servers.length;
    return server;
  }

  createClient(): UserServiceClient {
    const { host, port } = this.getNextServer();
    return new UserServiceClient(
      `${host}:${port}`,
      grpc.credentials.createInsecure()
    );
  }
}

// Usage
const lb = new RoundRobinLoadBalancer([
  { host: 'localhost', port: 50051 },
  { host: 'localhost', port: 50052 },
  { host: 'localhost', port: 50053 },
]);

async function callUserService(userId: number) {
  const client = lb.createClient();
  try {
    return await client.getUser(userId);
  } finally {
    grpc.closeClient(client);
  }
}
// src/main/java/com/example/grpc/RoundRobinLoadBalancer.java
package com.example.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import users.UserServiceGrpc;
import users.UserServiceProto.*;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinLoadBalancer {

    record ServerAddress(String host, int port) {}

    private final List<ServerAddress> servers;
    private final AtomicInteger currentIndex = new AtomicInteger(0);

    public RoundRobinLoadBalancer(List<ServerAddress> servers) {
        this.servers = servers;
    }

    public ServerAddress getNextServer() {
        int index = currentIndex.getAndUpdate(i -> (i + 1) % servers.size());
        return servers.get(index);
    }

    public UserServiceGrpc.UserServiceBlockingStub createStub() {
        ServerAddress server = getNextServer();
        ManagedChannel channel = ManagedChannelBuilder
            .forAddress(server.host(), server.port())
            .usePlaintext()
            .build();
        return UserServiceGrpc.newBlockingStub(channel);
    }

    // Usage
    public static void main(String[] args) {
        RoundRobinLoadBalancer lb = new RoundRobinLoadBalancer(List.of(
            new ServerAddress("localhost", 50051),
            new ServerAddress("localhost", 50052),
            new ServerAddress("localhost", 50053)
        ));

        // gRPC also supports built-in round_robin via service config
        ManagedChannel channel = ManagedChannelBuilder
            .forTarget("dns:///user-service:50051")
            .defaultLoadBalancingPolicy("round_robin")
            .usePlaintext()
            .build();
    }
}
# src/loadbalancer/client_side_load_balancer.py
import grpc
import users_pb2_grpc
from dataclasses import dataclass
from itertools import cycle
from threading import Lock


@dataclass
class ServerAddress:
    host: str
    port: int


class RoundRobinLoadBalancer:
    def __init__(self, servers: list[ServerAddress]):
        self._servers = servers
        self._cycle = cycle(servers)
        self._lock = Lock()

    def get_next_server(self) -> ServerAddress:
        with self._lock:
            return next(self._cycle)

    def create_stub(self):
        server = self.get_next_server()
        channel = grpc.insecure_channel(f"{server.host}:{server.port}")
        return users_pb2_grpc.UserServiceStub(channel)


# Usage
lb = RoundRobinLoadBalancer([
    ServerAddress("localhost", 50051),
    ServerAddress("localhost", 50052),
    ServerAddress("localhost", 50053),
])

def call_user_service(user_id: int):
    stub = lb.create_stub()
    return stub.GetUser(users_pb2.GetUserRequest(user_id=user_id)).user

# gRPC also supports built-in round-robin via channel options:
# channel = grpc.insecure_channel(
#     "dns:///user-service:50051",
#     options=[("grpc.lb_policy_name", "round_robin")]
# )
// src/LoadBalancing/RoundRobinLoadBalancer.cs
using Grpc.Net.Client;
using Users;

namespace GrpcClient.LoadBalancing;

public class RoundRobinLoadBalancer
{
    private record ServerAddress(string Host, int Port);

    private readonly ServerAddress[] _servers;
    private int _currentIndex;

    public RoundRobinLoadBalancer(params (string Host, int Port)[] servers)
    {
        _servers = servers.Select(s => new ServerAddress(s.Host, s.Port)).ToArray();
    }

    private ServerAddress GetNextServer()
    {
        int index = Interlocked.Increment(ref _currentIndex) % _servers.Length;
        return _servers[Math.Abs(index)];
    }

    public UserService.UserServiceClient CreateClient()
    {
        var server = GetNextServer();
        var channel = GrpcChannel.ForAddress($"http://{server.Host}:{server.Port}");
        return new UserService.UserServiceClient(channel);
    }
}

// Usage
var lb = new RoundRobinLoadBalancer(
    ("localhost", 50051),
    ("localhost", 50052),
    ("localhost", 50053)
);

async Task<User> CallUserService(int userId)
{
    var client = lb.CreateClient();
    var response = await client.GetUserAsync(new GetUserRequest { UserId = userId });
    return response.User;
}

// gRPC also supports built-in round_robin via service config JSON on GrpcChannel

Proxy-Based Load Balancing

For production environments, use a dedicated reverse proxy:

# envoy.yaml
admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address:
      address: 127.0.0.1
      port_value: 9901

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 50051
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          access_log:
          - name: envoy.access_loggers.stdout
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
          http_filters:
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
          route_config:
            name: local_route
            virtual_hosts:
            - name: backend
              domains: ["*"]
              routes:
              - match:
                  prefix: "/"
                route:
                  cluster: user_service
                  timeout: 5s

  clusters:
  - name: user_service
    connect_timeout: 1s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: user_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: service1.local
                port_value: 50051
        - endpoint:
            address:
              socket_address:
                address: service2.local
                port_value: 50051
        - endpoint:
            address:
              socket_address:
                address: service3.local
                port_value: 50051
    health_checks:
    - timeout: 1s
      interval: 10s
      unhealthy_threshold: 2
      healthy_threshold: 2
      grpc_health_check:
        service_name: grpc.health.v1.Health

gRPC vs REST: A Comparison

AspectgRPCREST
SerializationProtocol Buffers (binary)JSON (text)
TransportHTTP/2HTTP/1.1
Payload Size3-10x smallerLarger
LatencyLower (binary + multiplexing)Higher
StreamingNative, bidirectionalPolling or WebSockets
Browser SupportgRPC-Web requiredNative
DebuggingRequires tools (grpcurl)curl, browser
Learning CurveSteeper (proto files)Gentle
Type SafetyStrong typingRuntime validation
CachingLimitedHTTP caching
Use CaseMicroservices, real-timePublic APIs, web apps

Choose gRPC when:

  • Building microservices with strict latency requirements
  • Implementing real-time bidirectional communication
  • Serving polyglot services that need strong typing
  • Handling high throughput with many concurrent connections

Choose REST when:

  • Building public-facing APIs
  • Supporting browser clients
  • Prioritizing simplicity and discoverability
  • Need standard HTTP caching

Production Deployment Checklist

Pre-Deployment

  • Protocol buffer definitions are versioned and documented
  • Service interfaces define all error cases with appropriate status codes
  • Bidirectional streaming has timeout/heartbeat mechanisms
  • Logging captures request ID, duration, and error details
  • Metrics (latency, error rate, throughput) are instrumented
  • Load tests verify performance under expected load
  • Circuit breakers protect against cascading failures
  • TLS certificates configured for production encryption

TLS/SSL Configuration

// src/server-secure.ts
import * as fs from 'fs';
import * as grpc from '@grpc/grpc-js';

const credentials = grpc.ServerCredentials.createSsl(
  fs.readFileSync('./server.crt'),
  [
    {
      cert_chain: fs.readFileSync('./server.crt'),
      private_key: fs.readFileSync('./server.key'),
    },
  ],
  false
);

const server = new grpc.Server();
server.bindAsync('0.0.0.0:50051', credentials, (err) => {
  if (err) throw err;
  server.start();
  console.log('Secure gRPC server listening on port 50051');
});

// Client with TLS
const clientCreds = grpc.ChannelCredentials.createSsl(
  fs.readFileSync('./ca.crt')
);

const client = new UserServiceClient(
  'secure-grpc-service.example.com:50051',
  clientCreds
);
// src/main/java/com/example/grpc/SecureServer.java
import io.grpc.Server;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.SslContext;

import java.io.File;

// Server with TLS
SslContext sslContext = GrpcSslContexts.forServer(
    new File("server.crt"),
    new File("server.key")
).build();

Server server = NettyServerBuilder.forPort(50051)
    .sslContext(sslContext)
    .addService(new UserServiceImpl())
    .build()
    .start();

System.out.println("Secure gRPC server listening on port 50051");

// Client with TLS
import io.grpc.netty.NettyChannelBuilder;

SslContext clientSsl = GrpcSslContexts.forClient()
    .trustManager(new File("ca.crt"))
    .build();

ManagedChannel channel = NettyChannelBuilder
    .forAddress("secure-grpc-service.example.com", 50051)
    .sslContext(clientSsl)
    .build();
# src/secure_server.py
import grpc
from concurrent import futures

with open("server.key", "rb") as f:
    private_key = f.read()
with open("server.crt", "rb") as f:
    certificate_chain = f.read()

server_credentials = grpc.ssl_server_credentials(
    [(private_key, certificate_chain)]
)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
users_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
server.add_secure_port("0.0.0.0:50051", server_credentials)
server.start()
print("Secure gRPC server listening on port 50051")

# Client with TLS
with open("ca.crt", "rb") as f:
    trusted_certs = f.read()

client_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
channel = grpc.secure_channel(
    "secure-grpc-service.example.com:50051", client_credentials
)
stub = users_pb2_grpc.UserServiceStub(channel)
// Server with TLS in Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.WebHost.ConfigureKestrel(options =>
{
    options.ListenAnyIP(50051, listenOptions =>
    {
        listenOptions.Protocols = HttpProtocols.Http2;
        listenOptions.UseHttps("server.pfx", "password");
    });
});

var app = builder.Build();
app.MapGrpcService<UserServiceImpl>();
app.Run();

// Client with TLS
using Grpc.Net.Client;

var channel = GrpcChannel.ForAddress("https://secure-grpc-service.example.com:50051",
    new GrpcChannelOptions
    {
        HttpHandler = new HttpClientHandler
        {
            // Optionally: provide custom CA certificate
            // ServerCertificateCustomValidationCallback = ...
        }
    });
var client = new UserService.UserServiceClient(channel);

Health Checks

// src/health.ts
import * as grpc from '@grpc/grpc-js';
import { Health } from './generated/health';

export const healthImpl: any = {
  check(
    call: grpc.ServerUnaryCall<any, any>,
    callback: grpc.sendUnaryData<any>
  ) {
    callback(null, { status: Health.ServingStatus.SERVING });
  },

  watch(call: grpc.ServerWritableStream<any, any>) {
    call.write({ status: Health.ServingStatus.SERVING });
    call.end();
  },
};
// src/main/java/com/example/grpc/HealthServiceImpl.java
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.StreamObserver;
import io.grpc.protobuf.services.HealthStatusManager;

// Use built-in gRPC health service:
HealthStatusManager healthStatusManager = new HealthStatusManager();
Server server = ServerBuilder.forPort(50051)
    .addService(new UserServiceImpl())
    .addService(healthStatusManager.getHealthService())
    .build();

// Set status for a specific service
healthStatusManager.setStatus("users.UserService",
    HealthCheckResponse.ServingStatus.SERVING);
# src/health.py
from grpc_health.v1 import health, health_pb2, health_pb2_grpc
from concurrent import futures
import grpc

health_servicer = health.HealthServicer()
health_servicer.set("users.UserService", health_pb2.HealthCheckResponse.SERVING)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
users_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
server.add_insecure_port("0.0.0.0:50051")
server.start()
// src/Services/HealthService.cs
// ASP.NET Core has built-in gRPC health checks via Grpc.AspNetCore.HealthChecks

// In Program.cs:
builder.Services.AddGrpcHealthChecks()
    .AddCheck("UserService", () => HealthCheckResult.Healthy());

app.MapGrpcHealthChecksService();

// Or implement manually:
using Grpc.Health.V1;
using Grpc.HealthCheck;

public class HealthServiceImpl : Health.HealthBase
{
    public override Task<HealthCheckResponse> Check(
        HealthCheckRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HealthCheckResponse
        {
            Status = HealthCheckResponse.Types.ServingStatus.Serving
        });
    }

    public override async Task Watch(
        HealthCheckRequest request,
        IServerStreamWriter<HealthCheckResponse> responseStream,
        ServerCallContext context)
    {
        await responseStream.WriteAsync(new HealthCheckResponse
        {
            Status = HealthCheckResponse.Types.ServingStatus.Serving
        });
    }
}

Container and Orchestration

# Dockerfile
FROM node:20-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --only=production

COPY src ./src
COPY generated ./generated

EXPOSE 50051 9090

HEALTHCHECK --interval=10s --timeout=5s --start-period=5s --retries=3 \
  CMD node -e "require('http').get('http://localhost:9090/health', (r) => {if (r.statusCode !== 200) throw new Error(r.statusCode)})"

CMD ["node", "src/server.ts"]
# kubernetes.yaml
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
  - name: grpc
    port: 50051
    targetPort: 50051
  - name: metrics
    port: 9090
    targetPort: 9090
  type: ClusterIP

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        ports:
        - name: grpc
          containerPort: 50051
        - name: metrics
          containerPort: 9090
        livenessProbe:
          grpc:
            port: 50051
          initialDelaySeconds: 10
          periodSeconds: 10
        readinessProbe:
          grpc:
            port: 50051
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi

Monitoring and Observability

// src/metrics.ts
import * as promClient from 'prom-client';
import * as grpc from '@grpc/grpc-js';

const grpcRequestDuration = new promClient.Histogram({
  name: 'grpc_request_duration_ms',
  help: 'Duration of gRPC requests in milliseconds',
  labelNames: ['method', 'status'],
  buckets: [1, 5, 10, 50, 100, 500, 1000, 5000],
});

const grpcRequestCount = new promClient.Counter({
  name: 'grpc_requests_total',
  help: 'Total gRPC requests',
  labelNames: ['method', 'status'],
});

export function metricsInterceptor() {
  return (methodDescriptor: any, createCall: any) => {
    return (metadata: grpc.Metadata, listener: any, next: any) => {
      const startTime = Date.now();
      const method = methodDescriptor.path;

      const wrappedListener: any = {
        onReceiveMetadata(metadata: grpc.Metadata) {
          listener.onReceiveMetadata(metadata);
        },
        onReceiveMessage(message: any) {
          listener.onReceiveMessage(message);
        },
        onReceiveStatus(status: grpc.Status) {
          const duration = Date.now() - startTime;
          const statusName = grpc.status[status.code];

          grpcRequestDuration
            .labels(method, statusName)
            .observe(duration);

          grpcRequestCount
            .labels(method, statusName)
            .inc();

          listener.onReceiveStatus(status);
        },
      };

      return next(metadata, wrappedListener);
    };
  };
}
// src/main/java/com/example/grpc/MetricsInterceptor.java
package com.example.grpc;

import io.grpc.*;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

public class MetricsInterceptor implements ServerInterceptor {

    private final MeterRegistry registry;

    public MetricsInterceptor(MeterRegistry registry) {
        this.registry = registry;
    }

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {

        String method = call.getMethodDescriptor().getFullMethodName();
        long startTime = System.currentTimeMillis();

        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
                next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<>(call) {
                    @Override
                    public void close(Status status, Metadata trailers) {
                        long duration = System.currentTimeMillis() - startTime;
                        String statusCode = status.getCode().name();

                        registry.timer("grpc.request.duration",
                            "method", method, "status", statusCode)
                            .record(duration, java.util.concurrent.TimeUnit.MILLISECONDS);

                        registry.counter("grpc.requests.total",
                            "method", method, "status", statusCode)
                            .increment();

                        super.close(status, trailers);
                    }
                }, headers)) {};
    }
}
# src/metrics.py
import time
from prometheus_client import Histogram, Counter

grpc_request_duration = Histogram(
    "grpc_request_duration_ms",
    "Duration of gRPC requests in milliseconds",
    ["method", "status"],
    buckets=[1, 5, 10, 50, 100, 500, 1000, 5000],
)

grpc_request_count = Counter(
    "grpc_requests_total",
    "Total gRPC requests",
    ["method", "status"],
)


class MetricsServerInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method
        handler = continuation(handler_call_details)
        if handler is None:
            return None

        def wrap_unary(behavior):
            def new_behavior(request, context):
                start = time.time()
                status = "OK"
                try:
                    result = behavior(request, context)
                    return result
                except Exception as e:
                    status = "INTERNAL"
                    raise
                finally:
                    duration = (time.time() - start) * 1000
                    grpc_request_duration.labels(method=method, status=status).observe(duration)
                    grpc_request_count.labels(method=method, status=status).inc()
            return new_behavior

        if handler.unary_unary:
            return grpc.unary_unary_rpc_method_handler(
                wrap_unary(handler.unary_unary),
                request_deserializer=handler.request_deserializer,
                response_serializer=handler.response_serializer,
            )
        return handler
// src/Interceptors/MetricsInterceptor.cs
using Grpc.Core;
using Grpc.Core.Interceptors;
using Prometheus;

namespace GrpcServer.Interceptors;

public class MetricsInterceptor : Interceptor
{
    private static readonly Histogram GrpcRequestDuration = Metrics.CreateHistogram(
        "grpc_request_duration_ms",
        "Duration of gRPC requests in milliseconds",
        new HistogramConfiguration
        {
            LabelNames = new[] { "method", "status" },
            Buckets = new double[] { 1, 5, 10, 50, 100, 500, 1000, 5000 },
        });

    private static readonly Counter GrpcRequestCount = Metrics.CreateCounter(
        "grpc_requests_total",
        "Total gRPC requests",
        new CounterConfiguration { LabelNames = new[] { "method", "status" } });

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var method = context.Method;
        var startTime = DateTimeOffset.UtcNow;
        var statusCode = "OK";

        try
        {
            var response = await continuation(request, context);
            return response;
        }
        catch (RpcException e)
        {
            statusCode = e.StatusCode.ToString();
            throw;
        }
        finally
        {
            var duration = (DateTimeOffset.UtcNow - startTime).TotalMilliseconds;
            GrpcRequestDuration.WithLabels(method, statusCode).Observe(duration);
            GrpcRequestCount.WithLabels(method, statusCode).Inc();
        }
    }
}

// Register in Program.cs:
// builder.Services.AddGrpc(o => o.Interceptors.Add<MetricsInterceptor>());

Conclusion

gRPC is a powerful technology for building high-performance, distributed systems. By leveraging Protocol Buffers and HTTP/2, it delivers exceptional efficiency and developer experience. Follow the patterns and practices in this guide to build production-ready gRPC services that scale reliably.

Key takeaways:

  1. Define clear service contracts with .proto files
  2. Choose the right communication pattern for your use case
  3. Implement comprehensive error handling with status codes
  4. Use interceptors for cross-cutting concerns
  5. Monitor and observe all production services
  6. Design for graceful degradation and failure modes

For more information, visit the official gRPC documentation.

Comments powered by Giscus are not yet configured. Set PUBLIC_GISCUS_REPO_ID and PUBLIC_GISCUS_CATEGORY_ID in apps/web/.env to enable.

PV

Written by Palakorn Voramongkol

Software Engineer Specialist with 20+ years of experience. Writing about architecture, performance, and building production systems.

More about me

Continue Reading