กลับไปที่บทความ
Architecture Backend gRPC Microservices

การสื่อสาร gRPC เทคโนโลยีการสื่อสารระหว่างเซิร์ฟเวอร์

พลากร วรมงคล
16 เมษายน 2568 14 นาที

“คำแนะนำอย่างครอบคลุมเกี่ยวกับ gRPC สำหรับการสื่อสารแบบเซิร์ฟเวอร์ต่อเซิร์ฟเวอร์ — ครอบคลุม Protocol Buffers, การกำหนดบริการ, รูปแบบการสตรีมมิง, ตัวสกัดกั้น, การจัดการข้อผิดพลาด, การปรับสมดุลโหลด และการปรับใช้เพื่อการผลิต”

บทวิเคราะห์เชิงลึก: การสื่อสาร gRPC

gRPC ได้กลายเป็นเทคโนโลยีหลักสำหรับการสร้าง microservices ที่มีประสิทธิภาพสูงและสามารถปรับขนาดได้ ต่างจาก REST APIs ที่อาศัย JSON มากกว่า HTTP/1.1, gRPC ใช้ประโยชน์จาก Protocol Buffers และ HTTP/2 เพื่อให้ได้ความเร็ว, ประสิทธิภาพ และประสบการณ์ผู้พัฒนาที่ยอดเยี่ยม คำแนะนำนี้ครอบคลุมทุกสิ่งที่คุณจำเป็นต้องรู้เกี่ยวกับ gRPC สำหรับการสื่อสารแบบเซิร์ฟเวอร์ต่อเซิร์ฟเวอร์

gRPC คืออะไร?

gRPC (gRPC Remote Procedure Call) คือกรอบการทำงานแบบโอเพนซอร์สของ Google สำหรับการสร้างระบบ RPC ที่มีประสิทธิภาพสูง มันเป็นการนำเสนอความซับซ้อนของระบบแบบกระจายอำนาจและช่วยให้คุณกำหนดบริการและข้อความในแบบที่เป็นอิสระจากภาษาโดยใช้ Protocol Buffers

ลักษณะสำคัญ

  • Binary Serialization: ใช้ Protocol Buffers แทน JSON, ลดขนาดเพย์โหลด 3-10 เท่า
  • HTTP/2 Transport: เปิดใช้งาน multiplexing, server push และการบีบอัดส่วนหัว
  • Code Generation: สร้างโค้ดไคลเอนต์และเซิร์ฟเวอร์โดยอัตโนมัติจาก .proto definitions
  • Streaming Support: การสนับสนุนดั้งเดิมสำหรับการสตรีมมิงแบบสองทิศทางในการเชื่อมต่อเดียว
  • Type Safety: การพิมพ์อักษรที่แข็งแกร่งในขอบเขตของภาษา
  • Cross-Language: ทำงานได้อย่างไร้รอยต่อในโปรแกรมมิ่ง 10+ ภาษา

การไหลของการสื่อสาร

ด้านล่างเป็นแผนภาพลำดับที่แสดงว่าการเรียก gRPC แบบ unary ไหลระหว่างสองบริการ:

sequenceDiagram
    participant Client as Service A<br/>(gRPC Client)
    participant Transport as HTTP/2<br/>Network
    participant Server as Service B<br/>(gRPC Server)
    
    Client->>Client: 1. Create request message<br/>(UserRequest proto)
    Client->>Client: 2. Serialize to protobuf<br/>(binary format)
    Client->>Transport: 3. Send HTTP/2 frame<br/>(method: GetUser)
    Transport->>Server: 4. Receive frame
    Server->>Server: 5. Deserialize message
    Server->>Server: 6. Invoke handler logic
    Server->>Server: 7. Serialize response<br/>(UserResponse proto)
    Server->>Transport: 8. Send HTTP/2 frame
    Transport->>Client: 9. Receive frame
    Client->>Client: 10. Deserialize response
    Client->>Client: 11. Return typed object

หลักการหลัก

  • Efficiency: รูปแบบไบนารีและการ multiplexing แบบ HTTP/2 ลดเวลาแฝงและแบนด์วิดธ์
  • Simplicity: คำจำกัดความบริการที่เป็นอธิบาย ลบกำจัด boilerplate
  • Reliability: การพิมพ์ที่แข็งแกร่งและการจัดการข้อผิดพลาดที่ชัดเจนลดความประหลาดใจที่ runtime
  • Scalability: การสตรีมมิงและการ multiplexing จัดการการเชื่อมต่อพร้อมกันหลายพัน
  • Interoperability: การออกแบบที่เป็นอิสระจากภาษาช่วยให้สถาปัตยกรรม polyglot

Protocol Buffers: พื้นฐาน

Protocol Buffers (protobuf) คือรูปแบบการ serialization ที่อยู่เบื้องหลัง gRPC มันให้กลไกที่กะทัดรัดและเป็นอิสระจากภาษาสำหรับการ serialization ข้อมูลที่มีโครงสร้าง

การกำหนดข้อความ

// user.proto
syntax = "proto3";

package users;

// Simple scalar types
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
  bool is_active = 4;
  double account_balance = 5;
  
  // Timestamps use google.protobuf types
  google.protobuf.Timestamp created_at = 6;
  
  // Repeated fields (arrays)
  repeated string tags = 7;
}

// Nested messages
message Address {
  string street = 1;
  string city = 2;
  string postal_code = 3;
  Country country = 4;
}

message UserProfile {
  User user = 1;
  Address address = 2;
  repeated PhoneNumber phone_numbers = 3;
}

message PhoneNumber {
  enum Type {
    TYPE_UNSPECIFIED = 0;
    MOBILE = 1;
    HOME = 2;
    WORK = 3;
  }
  
  Type type = 1;
  string number = 2;
}

// Enums for type-safe options
enum Status {
  STATUS_UNSPECIFIED = 0;
  ACTIVE = 1;
  INACTIVE = 2;
  SUSPENDED = 3;
}

// Oneof for mutually exclusive fields
message Contact {
  string name = 1;
  oneof contact_info {
    string email = 2;
    string phone = 3;
    string website = 4;
  }
}

ตารางอ้างอิงประเภท Scalar

ประเภทขนาดกรณีการใช้งาน
int32 / int644 / 8 bytesจำนวนเต็ม (ใช้ sint32 สำหรับตัวเลขลบ)
float / double4 / 8 bytesทศนิยม
bool1 byteบูลีน
stringVariableสตริง UTF-8
bytesVariableข้อมูลไบนารี
google.protobuf.Timestamp12 bytesTimestamps
google.protobuf.Duration12 bytesระยะเวลา

คำจำกัดความบริการและการสร้างโค้ด

การกำหนด gRPC Services

// user_service.proto
syntax = "proto3";

package users;

import "google/protobuf/empty.proto";

message GetUserRequest {
  int32 user_id = 1;
}

message GetUserResponse {
  User user = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
}

message ListUsersRequest {
  int32 page = 1;
  int32 page_size = 2;
}

message ListUsersResponse {
  repeated User users = 1;
  int32 total_count = 2;
}

message DeleteUserRequest {
  int32 user_id = 1;
}

service UserService {
  // Unary RPC: single request, single response
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (GetUserResponse);
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
  
  // Server streaming: single request, stream of responses
  rpc ListUsers(ListUsersRequest) returns (stream GetUserResponse);
  
  // Client streaming: stream of requests, single response
  rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
  
  // Bidirectional streaming: stream of requests and responses
  rpc SyncUsers(stream SyncUserRequest) returns (stream SyncUserResponse);
}

การสร้างโค้ด

# Install protoc compiler
# macOS: brew install protobuf
# Linux: apt-get install protobuf-compiler
# Windows: choco install protoc

# Install Node.js/TypeScript plugins
npm install --save-dev grpc-tools @grpc/grpc-js @grpc/proto-loader

# Generate TypeScript code
protoc \
  --plugin=protoc-gen-ts_proto=./node_modules/.bin/protoc-gen-ts_proto \
  --ts_proto_out=./src/generated \
  --ts_proto_opt=outputServices=grpc-js \
  --ts_proto_opt=useExactProtoTypes=true \
  user_service.proto user.proto

การใช้งาน gRPC Server

Node.js/TypeScript Server

// src/server.ts
import * as grpc from '@grpc/grpc-js';
import { UserServiceService, IUserServiceServer } from './generated/user_service';
import { User, GetUserResponse, ListUsersResponse } from './generated/user';
import { GetUserRequest, CreateUserRequest, ListUsersRequest } from './generated/user_service';

// Mock database
const usersDb = new Map<number, any>([
  [1, { id: 1, name: 'Alice', email: 'alice@example.com', isActive: true }],
  [2, { id: 2, name: 'Bob', email: 'bob@example.com', isActive: true }],
]);

let nextUserId = 3;

// Implement service handlers
const userServiceImpl: IUserServiceServer = {
  // Unary RPC
  async getUser(
    call: grpc.ServerUnaryCall<GetUserRequest, GetUserResponse>,
    callback: grpc.sendUnaryData<GetUserResponse>
  ) {
    const { userId } = call.request;
    const user = usersDb.get(userId);

    if (!user) {
      return callback({
        code: grpc.status.NOT_FOUND,
        message: `User ${userId} not found`,
      });
    }

    callback(null, { user });
  },

  // Unary RPC with validation
  async createUser(
    call: grpc.ServerUnaryCall<CreateUserRequest, GetUserResponse>,
    callback: grpc.sendUnaryData<GetUserResponse>
  ) {
    const { name, email } = call.request;

    if (!name || !email) {
      return callback({
        code: grpc.status.INVALID_ARGUMENT,
        message: 'Name and email are required',
      });
    }

    const user = {
      id: nextUserId++,
      name,
      email,
      isActive: true,
      createdAt: new Date(),
    };

    usersDb.set(user.id, user);
    callback(null, { user });
  },

  // Server streaming
  async listUsers(call: grpc.ServerWritableStream<ListUsersRequest, GetUserResponse>) {
    const { page = 1, pageSize = 10 } = call.request;
    const users = Array.from(usersDb.values());
    const start = (page - 1) * pageSize;
    const paginatedUsers = users.slice(start, start + pageSize);

    for (const user of paginatedUsers) {
      call.write({ user });
    }

    call.end();
  },

  // Client streaming
  async batchCreateUsers(
    call: grpc.ServerReadableStream<CreateUserRequest, ListUsersResponse>,
    callback: grpc.sendUnaryData<ListUsersResponse>
  ) {
    const users: any[] = [];

    call.on('data', (request: CreateUserRequest) => {
      const user = {
        id: nextUserId++,
        name: request.name,
        email: request.email,
        isActive: true,
      };
      users.push(user);
      usersDb.set(user.id, user);
    });

    call.on('end', () => {
      callback(null, { users, totalCount: users.length });
    });

    call.on('error', (err) => {
      console.error('Stream error:', err);
      callback({
        code: grpc.status.INTERNAL,
        message: 'Stream error',
      });
    });
  },

  // Bidirectional streaming
  async syncUsers(call: grpc.ServerDuplexStream<any, any>) {
    call.on('data', (request: any) => {
      const user = {
        id: request.userId || nextUserId++,
        name: request.name,
        email: request.email,
        isActive: true,
      };

      usersDb.set(user.id, user);
      call.write({
        userId: user.id,
        status: 'SYNCED',
        message: `User ${user.name} synced successfully`,
      });
    });

    call.on('end', () => {
      call.end();
    });

    call.on('error', (err) => {
      console.error('Sync error:', err);
    });
  },

  async deleteUser(
    call: grpc.ServerUnaryCall<any, any>,
    callback: grpc.sendUnaryData<any>
  ) {
    const { userId } = call.request;

    if (usersDb.has(userId)) {
      usersDb.delete(userId);
      return callback(null, {});
    }

    callback({
      code: grpc.status.NOT_FOUND,
      message: `User ${userId} not found`,
    });
  },
};

// Create and start server
function startServer() {
  const server = new grpc.Server();

  server.addService(UserServiceService, userServiceImpl);

  const PORT = process.env.PORT || 50051;
  server.bindAsync(`0.0.0.0:${PORT}`, grpc.ServerCredentials.createInsecure(), (err) => {
    if (err) {
      console.error('Failed to bind server:', err);
      process.exit(1);
    }

    server.start();
    console.log(`gRPC server running on port ${PORT}`);
  });
}

startServer();
// src/main/java/com/example/grpc/UserServiceServer.java
package com.example.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import users.UserServiceGrpc;
import users.UserServiceProto.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class UserServiceServer {

    private static final Map<Integer, User> usersDb = new ConcurrentHashMap<>();
    private static final AtomicInteger nextUserId = new AtomicInteger(3);

    static {
        usersDb.put(1, User.newBuilder().setId(1).setName("Alice").setEmail("alice@example.com").setIsActive(true).build());
        usersDb.put(2, User.newBuilder().setId(2).setName("Bob").setEmail("bob@example.com").setIsActive(true).build());
    }

    static class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {

        @Override
        public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
            int userId = request.getUserId();
            User user = usersDb.get(userId);

            if (user == null) {
                responseObserver.onError(Status.NOT_FOUND
                    .withDescription("User " + userId + " not found")
                    .asRuntimeException());
                return;
            }

            responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
            responseObserver.onCompleted();
        }

        @Override
        public void createUser(CreateUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
            if (request.getName().isEmpty() || request.getEmail().isEmpty()) {
                responseObserver.onError(Status.INVALID_ARGUMENT
                    .withDescription("Name and email are required")
                    .asRuntimeException());
                return;
            }

            int id = nextUserId.getAndIncrement();
            User user = User.newBuilder()
                .setId(id)
                .setName(request.getName())
                .setEmail(request.getEmail())
                .setIsActive(true)
                .build();

            usersDb.put(id, user);
            responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
            responseObserver.onCompleted();
        }

        @Override
        public void listUsers(ListUsersRequest request, StreamObserver<GetUserResponse> responseObserver) {
            int page = request.getPage() > 0 ? request.getPage() : 1;
            int pageSize = request.getPageSize() > 0 ? request.getPageSize() : 10;
            List<User> users = new ArrayList<>(usersDb.values());
            int start = (page - 1) * pageSize;
            int end = Math.min(start + pageSize, users.size());

            for (int i = start; i < end; i++) {
                responseObserver.onNext(GetUserResponse.newBuilder().setUser(users.get(i)).build());
            }
            responseObserver.onCompleted();
        }

        @Override
        public StreamObserver<CreateUserRequest> batchCreateUsers(StreamObserver<ListUsersResponse> responseObserver) {
            List<User> createdUsers = new ArrayList<>();

            return new StreamObserver<>() {
                @Override
                public void onNext(CreateUserRequest request) {
                    int id = nextUserId.getAndIncrement();
                    User user = User.newBuilder()
                        .setId(id)
                        .setName(request.getName())
                        .setEmail(request.getEmail())
                        .setIsActive(true)
                        .build();
                    createdUsers.add(user);
                    usersDb.put(id, user);
                }

                @Override
                public void onError(Throwable t) {
                    responseObserver.onError(Status.INTERNAL.withDescription("Stream error").asRuntimeException());
                }

                @Override
                public void onCompleted() {
                    responseObserver.onNext(ListUsersResponse.newBuilder()
                        .addAllUsers(createdUsers)
                        .setTotalCount(createdUsers.size())
                        .build());
                    responseObserver.onCompleted();
                }
            };
        }

        @Override
        public StreamObserver<SyncUserRequest> syncUsers(StreamObserver<SyncUserResponse> responseObserver) {
            return new StreamObserver<>() {
                @Override
                public void onNext(SyncUserRequest request) {
                    int id = request.getUserId() > 0 ? request.getUserId() : nextUserId.getAndIncrement();
                    User user = User.newBuilder()
                        .setId(id).setName(request.getName()).setEmail(request.getEmail()).setIsActive(true)
                        .build();
                    usersDb.put(id, user);
                    responseObserver.onNext(SyncUserResponse.newBuilder()
                        .setUserId(id).setStatus("SYNCED")
                        .setMessage("User " + user.getName() + " synced successfully")
                        .build());
                }

                @Override public void onError(Throwable t) { System.err.println("Sync error: " + t.getMessage()); }
                @Override public void onCompleted() { responseObserver.onCompleted(); }
            };
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        int port = Integer.parseInt(System.getenv().getOrDefault("PORT", "50051"));
        Server server = ServerBuilder.forPort(port)
            .addService(new UserServiceImpl())
            .build()
            .start();

        System.out.println("gRPC server running on port " + port);
        server.awaitTermination();
    }
}
# src/server.py
import grpc
import users_pb2
import users_pb2_grpc
from concurrent import futures
import os

# Mock database
users_db: dict[int, dict] = {
    1: {"id": 1, "name": "Alice", "email": "alice@example.com", "is_active": True},
    2: {"id": 2, "name": "Bob", "email": "bob@example.com", "is_active": True},
}
next_user_id = 3


class UserServiceServicer(users_pb2_grpc.UserServiceServicer):

    def GetUser(self, request, context):
        user_data = users_db.get(request.user_id)
        if not user_data:
            context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")
        user = users_pb2.User(**user_data)
        return users_pb2.GetUserResponse(user=user)

    def CreateUser(self, request, context):
        global next_user_id
        if not request.name or not request.email:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Name and email are required")
        user_data = {"id": next_user_id, "name": request.name, "email": request.email, "is_active": True}
        users_db[next_user_id] = user_data
        next_user_id += 1
        return users_pb2.GetUserResponse(user=users_pb2.User(**user_data))

    def ListUsers(self, request, context):
        page = request.page or 1
        page_size = request.page_size or 10
        users = list(users_db.values())
        start = (page - 1) * page_size
        for u in users[start:start + page_size]:
            yield users_pb2.GetUserResponse(user=users_pb2.User(**u))

    def BatchCreateUsers(self, request_iterator, context):
        global next_user_id
        created = []
        for req in request_iterator:
            user_data = {"id": next_user_id, "name": req.name, "email": req.email, "is_active": True}
            users_db[next_user_id] = user_data
            next_user_id += 1
            created.append(users_pb2.User(**user_data))
        return users_pb2.ListUsersResponse(users=created, total_count=len(created))

    def SyncUsers(self, request_iterator, context):
        global next_user_id
        for req in request_iterator:
            uid = req.user_id or next_user_id
            user_data = {"id": uid, "name": req.name, "email": req.email, "is_active": True}
            users_db[uid] = user_data
            if uid == next_user_id:
                next_user_id += 1
            yield users_pb2.SyncUserResponse(
                user_id=uid, status="SYNCED",
                message=f"User {req.name} synced successfully"
            )

    def DeleteUser(self, request, context):
        if request.user_id in users_db:
            del users_db[request.user_id]
            return users_pb2.Empty()
        context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.user_id} not found")


def serve():
    port = os.getenv("PORT", "50051")
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    users_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
    server.add_insecure_port(f"0.0.0.0:{port}")
    server.start()
    print(f"gRPC server running on port {port}")
    server.wait_for_termination()


if __name__ == "__main__":
    serve()
// src/Services/UserService.cs
using Grpc.Core;
using Users;

namespace GrpcServer.Services;

public class UserServiceImpl : UserService.UserServiceBase
{
    private static readonly Dictionary<int, User> UsersDb = new()
    {
        [1] = new User { Id = 1, Name = "Alice", Email = "alice@example.com", IsActive = true },
        [2] = new User { Id = 2, Name = "Bob", Email = "bob@example.com", IsActive = true },
    };
    private static int _nextUserId = 3;

    public override Task<GetUserResponse> GetUser(GetUserRequest request, ServerCallContext context)
    {
        if (!UsersDb.TryGetValue(request.UserId, out var user))
            throw new RpcException(new Status(StatusCode.NotFound, $"User {request.UserId} not found"));

        return Task.FromResult(new GetUserResponse { User = user });
    }

    public override Task<GetUserResponse> CreateUser(CreateUserRequest request, ServerCallContext context)
    {
        if (string.IsNullOrEmpty(request.Name) || string.IsNullOrEmpty(request.Email))
            throw new RpcException(new Status(StatusCode.InvalidArgument, "Name and email are required"));

        var user = new User { Id = _nextUserId++, Name = request.Name, Email = request.Email, IsActive = true };
        UsersDb[user.Id] = user;
        return Task.FromResult(new GetUserResponse { User = user });
    }

    public override async Task ListUsers(ListUsersRequest request, IServerStreamWriter<GetUserResponse> responseStream, ServerCallContext context)
    {
        int page = request.Page > 0 ? request.Page : 1;
        int pageSize = request.PageSize > 0 ? request.PageSize : 10;
        var users = UsersDb.Values.Skip((page - 1) * pageSize).Take(pageSize);

        foreach (var user in users)
            await responseStream.WriteAsync(new GetUserResponse { User = user });
    }

    public override async Task<ListUsersResponse> BatchCreateUsers(
        IAsyncStreamReader<CreateUserRequest> requestStream, ServerCallContext context)
    {
        var created = new List<User>();
        await foreach (var req in requestStream.ReadAllAsync())
        {
            var user = new User { Id = _nextUserId++, Name = req.Name, Email = req.Email, IsActive = true };
            UsersDb[user.Id] = user;
            created.Add(user);
        }
        var response = new ListUsersResponse { TotalCount = created.Count };
        response.Users.AddRange(created);
        return response;
    }

    public override async Task SyncUsers(
        IAsyncStreamReader<SyncUserRequest> requestStream,
        IServerStreamWriter<SyncUserResponse> responseStream,
        ServerCallContext context)
    {
        await foreach (var req in requestStream.ReadAllAsync())
        {
            int uid = req.UserId > 0 ? req.UserId : _nextUserId++;
            UsersDb[uid] = new User { Id = uid, Name = req.Name, Email = req.Email, IsActive = true };
            await responseStream.WriteAsync(new SyncUserResponse
            {
                UserId = uid, Status = "SYNCED",
                Message = $"User {req.Name} synced successfully"
            });
        }
    }

    public override Task<Empty> DeleteUser(DeleteUserRequest request, ServerCallContext context)
    {
        if (!UsersDb.Remove(request.UserId))
            throw new RpcException(new Status(StatusCode.NotFound, $"User {request.UserId} not found"));

        return Task.FromResult(new Empty());
    }
}

// Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
var app = builder.Build();
app.MapGrpcService<UserServiceImpl>();
int port = int.Parse(Environment.GetEnvironmentVariable("PORT") ?? "50051");
app.Run($"http://0.0.0.0:{port}");

การใช้งาน gRPC Client

การใช้งาน Client พื้นฐาน

// src/client.ts
import * as grpc from '@grpc/grpc-js';
import { UserServiceClient } from './generated/user_service';

class UserServiceClient {
  private client: UserServiceClient;

  constructor(address: string = 'localhost:50051') {
    this.client = new UserServiceClient(
      address,
      grpc.credentials.createInsecure()
    );
  }

  // Unary call
  async getUser(userId: number): Promise<any> {
    return new Promise((resolve, reject) => {
      this.client.getUser({ userId }, (err, response) => {
        if (err) reject(err);
        else resolve(response?.user);
      });
    });
  }

  // Server streaming
  async listUsers(page: number = 1, pageSize: number = 10): Promise<any[]> {
    return new Promise((resolve, reject) => {
      const users: any[] = [];
      const call = this.client.listUsers({ page, pageSize });

      call.on('data', (response) => {
        users.push(response.user);
      });

      call.on('end', () => {
        resolve(users);
      });

      call.on('error', reject);
    });
  }

  // Client streaming
  async batchCreateUsers(requests: any[]): Promise<any> {
    return new Promise((resolve, reject) => {
      const call = this.client.batchCreateUsers((err, response) => {
        if (err) reject(err);
        else resolve(response);
      });

      for (const request of requests) {
        call.write(request);
      }

      call.end();
    });
  }

  // Bidirectional streaming
  async syncUsers(userUpdates: any[]): Promise<void> {
    return new Promise((resolve, reject) => {
      const call = this.client.syncUsers();

      call.on('data', (response) => {
        console.log('Sync response:', response);
      });

      call.on('end', resolve);
      call.on('error', reject);

      for (const update of userUpdates) {
        call.write(update);
      }

      call.end();
    });
  }

  close() {
    grpc.closeClient(this.client);
  }
}

// Usage
async function main() {
  const client = new UserServiceClient();

  try {
    const user = await client.getUser(1);
    console.log('User:', user);

    const users = await client.listUsers(1, 5);
    console.log('Users:', users);
  } catch (error) {
    console.error('Error:', error);
  } finally {
    client.close();
  }
}

main();
// src/main/java/com/example/grpc/UserServiceClientExample.java
package com.example.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import users.UserServiceGrpc;
import users.UserServiceProto.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class UserServiceClientExample {

    private final ManagedChannel channel;
    private final UserServiceGrpc.UserServiceBlockingStub blockingStub;
    private final UserServiceGrpc.UserServiceStub asyncStub;

    public UserServiceClientExample(String address) {
        channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
        blockingStub = UserServiceGrpc.newBlockingStub(channel);
        asyncStub = UserServiceGrpc.newStub(channel);
    }

    // Unary call
    public User getUser(int userId) {
        GetUserResponse response = blockingStub.getUser(
            GetUserRequest.newBuilder().setUserId(userId).build()
        );
        return response.getUser();
    }

    // Server streaming
    public List<User> listUsers(int page, int pageSize) {
        List<User> users = new ArrayList<>();
        blockingStub.listUsers(ListUsersRequest.newBuilder().setPage(page).setPageSize(pageSize).build())
            .forEachRemaining(r -> users.add(r.getUser()));
        return users;
    }

    // Client streaming
    public ListUsersResponse batchCreateUsers(List<CreateUserRequest> requests) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        List<ListUsersResponse> result = new ArrayList<>();

        StreamObserver<CreateUserRequest> requestObserver = asyncStub.batchCreateUsers(
            new StreamObserver<>() {
                @Override public void onNext(ListUsersResponse r) { result.add(r); }
                @Override public void onError(Throwable t) { latch.countDown(); }
                @Override public void onCompleted() { latch.countDown(); }
            }
        );

        for (CreateUserRequest req : requests) requestObserver.onNext(req);
        requestObserver.onCompleted();
        latch.await(30, TimeUnit.SECONDS);
        return result.isEmpty() ? null : result.get(0);
    }

    // Bidirectional streaming
    public void syncUsers(List<SyncUserRequest> updates) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        StreamObserver<SyncUserRequest> requestObserver = asyncStub.syncUsers(
            new StreamObserver<>() {
                @Override public void onNext(SyncUserResponse r) { System.out.println("Sync response: " + r); }
                @Override public void onError(Throwable t) { latch.countDown(); }
                @Override public void onCompleted() { latch.countDown(); }
            }
        );

        for (SyncUserRequest update : updates) requestObserver.onNext(update);
        requestObserver.onCompleted();
        latch.await(30, TimeUnit.SECONDS);
    }

    public void close() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        UserServiceClientExample client = new UserServiceClientExample("localhost:50051");
        try {
            User user = client.getUser(1);
            System.out.println("User: " + user);

            List<User> users = client.listUsers(1, 5);
            System.out.println("Users: " + users);
        } finally {
            client.close();
        }
    }
}
# src/client.py
import grpc
import users_pb2
import users_pb2_grpc
from typing import Iterator


class UserServiceClient:
    def __init__(self, address: str = "localhost:50051"):
        self.channel = grpc.insecure_channel(address)
        self.stub = users_pb2_grpc.UserServiceStub(self.channel)

    def get_user(self, user_id: int):
        response = self.stub.GetUser(users_pb2.GetUserRequest(user_id=user_id))
        return response.user

    def list_users(self, page: int = 1, page_size: int = 10) -> list:
        users = []
        for response in self.stub.ListUsers(
            users_pb2.ListUsersRequest(page=page, page_size=page_size)
        ):
            users.append(response.user)
        return users

    def batch_create_users(self, requests: list[dict]):
        def request_iterator() -> Iterator[users_pb2.CreateUserRequest]:
            for r in requests:
                yield users_pb2.CreateUserRequest(name=r["name"], email=r["email"])

        return self.stub.BatchCreateUsers(request_iterator())

    def sync_users(self, updates: list[dict]) -> None:
        def request_iterator() -> Iterator[users_pb2.SyncUserRequest]:
            for u in updates:
                yield users_pb2.SyncUserRequest(
                    user_id=u.get("user_id", 0), name=u["name"], email=u["email"]
                )

        for response in self.stub.SyncUsers(request_iterator()):
            print("Sync response:", response)

    def close(self):
        self.channel.close()


def main():
    client = UserServiceClient()
    try:
        user = client.get_user(1)
        print("User:", user)

        users = client.list_users(1, 5)
        print("Users:", users)
    except grpc.RpcError as e:
        print("Error:", e)
    finally:
        client.close()


if __name__ == "__main__":
    main()
// src/Clients/UserServiceClient.cs
using Grpc.Net.Client;
using Users;

namespace GrpcClient;

public class UserServiceClientExample : IDisposable
{
    private readonly GrpcChannel _channel;
    private readonly UserService.UserServiceClient _client;

    public UserServiceClientExample(string address = "http://localhost:50051")
    {
        _channel = GrpcChannel.ForAddress(address);
        _client = new UserService.UserServiceClient(_channel);
    }

    // Unary call
    public async Task<User> GetUserAsync(int userId)
    {
        var response = await _client.GetUserAsync(new GetUserRequest { UserId = userId });
        return response.User;
    }

    // Server streaming
    public async Task<List<User>> ListUsersAsync(int page = 1, int pageSize = 10)
    {
        var users = new List<User>();
        using var call = _client.ListUsers(new ListUsersRequest { Page = page, PageSize = pageSize });
        await foreach (var response in call.ResponseStream.ReadAllAsync())
            users.Add(response.User);
        return users;
    }

    // Client streaming
    public async Task<ListUsersResponse> BatchCreateUsersAsync(IEnumerable<(string Name, string Email)> requests)
    {
        using var call = _client.BatchCreateUsers();
        foreach (var (name, email) in requests)
            await call.RequestStream.WriteAsync(new CreateUserRequest { Name = name, Email = email });
        await call.RequestStream.CompleteAsync();
        return await call.ResponseAsync;
    }

    // Bidirectional streaming
    public async Task SyncUsersAsync(IEnumerable<(int UserId, string Name, string Email)> updates)
    {
        using var call = _client.SyncUsers();
        var readTask = Task.Run(async () =>
        {
            await foreach (var response in call.ResponseStream.ReadAllAsync())
                Console.WriteLine($"Sync response: {response}");
        });

        foreach (var (userId, name, email) in updates)
            await call.RequestStream.WriteAsync(new SyncUserRequest { UserId = userId, Name = name, Email = email });

        await call.RequestStream.CompleteAsync();
        await readTask;
    }

    public void Dispose() => _channel.Dispose();
}

// Program.cs usage
using var client = new UserServiceClientExample();

var user = await client.GetUserAsync(1);
Console.WriteLine($"User: {user}");

var users = await client.ListUsersAsync(1, 5);
Console.WriteLine($"Users: {users.Count}");

รูปแบบการสื่อสารสี่แบบ

1. Unary RPC (Single Request/Response)

รูปแบบที่พบบ่อยที่สุด เทียบเท่ากับการเรียกใช้ฟังก์ชันแบบดั้งเดิม:

// Handler
async getUser(call, callback) {
  const user = await fetchUser(call.request.userId);
  callback(null, { user });
}

// Client
const user = await client.getUser({ userId: 1 });
// Handler
@Override
public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
    User user = fetchUser(request.getUserId());
    responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
    responseObserver.onCompleted();
}

// Client (blocking stub)
GetUserResponse response = blockingStub.getUser(
    GetUserRequest.newBuilder().setUserId(1).build()
);
User user = response.getUser();
# Handler
def GetUser(self, request, context):
    user = fetch_user(request.user_id)
    return GetUserResponse(user=user)

# Client
response = stub.GetUser(GetUserRequest(user_id=1))
user = response.user
// Handler
public override Task<GetUserResponse> GetUser(GetUserRequest request, ServerCallContext context)
{
    var user = FetchUser(request.UserId);
    return Task.FromResult(new GetUserResponse { User = user });
}

// Client
var response = await client.GetUserAsync(new GetUserRequest { UserId = 1 });
var user = response.User;

Use Cases: การดึงทรัพยากรเดียว, การสร้างทรัพยากรพร้อมการตรวจสอบความถูกต้อง, การอัปเดตอย่างง่าย

2. Server Streaming RPC

เซิร์ฟเวอร์ส่งการตอบสนองหลายครั้งไปยังคำขอไคลเอนต์เดียว:

// Handler
async listUsers(call) {
  const users = await fetchAllUsers();
  
  for (const user of users) {
    call.write({ user });
  }
  
  call.end();
}

// Client
const call = client.listUsers({ pageSize: 10 });

call.on('data', (response) => {
  console.log('Received user:', response.user);
});

call.on('end', () => {
  console.log('Stream complete');
});
// Handler
@Override
public void listUsers(ListUsersRequest request, StreamObserver<GetUserResponse> responseObserver) {
    List<User> users = fetchAllUsers();
    for (User user : users) {
        responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
    }
    responseObserver.onCompleted();
}

// Client (blocking iterator)
Iterator<GetUserResponse> responses = blockingStub.listUsers(
    ListUsersRequest.newBuilder().setPageSize(10).build()
);
while (responses.hasNext()) {
    System.out.println("Received user: " + responses.next().getUser());
}
System.out.println("Stream complete");
# Handler
def ListUsers(self, request, context):
    users = fetch_all_users()
    for user in users:
        yield GetUserResponse(user=user)

# Client
for response in stub.ListUsers(ListUsersRequest(page_size=10)):
    print("Received user:", response.user)
print("Stream complete")
// Handler
public override async Task ListUsers(
    ListUsersRequest request,
    IServerStreamWriter<GetUserResponse> responseStream,
    ServerCallContext context)
{
    var users = await FetchAllUsersAsync();
    foreach (var user in users)
        await responseStream.WriteAsync(new GetUserResponse { User = user });
}

// Client
using var call = client.ListUsers(new ListUsersRequest { PageSize = 10 });
await foreach (var response in call.ResponseStream.ReadAllAsync())
    Console.WriteLine($"Received user: {response.User}");
Console.WriteLine("Stream complete");

Use Cases: Pagination, ฟีดข้อมูล real-time, การส่งออกชุดข้อมูลขนาดใหญ่, log streaming

3. Client Streaming RPC

ไคลเอนต์ส่งคำขอหลายครั้งไปยังการตอบสนองเซิร์ฟเวอร์เดียว:

// Handler
async batchCreateUsers(call, callback) {
  const users = [];
  
  call.on('data', (request) => {
    users.push(request);
  });
  
  call.on('end', () => {
    const saved = saveUsers(users);
    callback(null, { users: saved });
  });
}

// Client
const call = client.batchCreateUsers((err, response) => {
  console.log('All users created:', response.users);
});

call.write({ name: 'Alice', email: 'alice@example.com' });
call.write({ name: 'Bob', email: 'bob@example.com' });
call.end();
// Handler
@Override
public StreamObserver<CreateUserRequest> batchCreateUsers(StreamObserver<ListUsersResponse> responseObserver) {
    List<User> users = new ArrayList<>();
    return new StreamObserver<>() {
        @Override public void onNext(CreateUserRequest req) { users.add(saveUser(req)); }
        @Override public void onError(Throwable t) { responseObserver.onError(t); }
        @Override public void onCompleted() {
            ListUsersResponse resp = ListUsersResponse.newBuilder()
                .addAllUsers(users).setTotalCount(users.size()).build();
            responseObserver.onNext(resp);
            responseObserver.onCompleted();
        }
    };
}

// Client (async stub)
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<CreateUserRequest> req = asyncStub.batchCreateUsers(new StreamObserver<>() {
    @Override public void onNext(ListUsersResponse r) { System.out.println("All users created: " + r.getUsersList()); }
    @Override public void onError(Throwable t) { latch.countDown(); }
    @Override public void onCompleted() { latch.countDown(); }
});
req.onNext(CreateUserRequest.newBuilder().setName("Alice").setEmail("alice@example.com").build());
req.onNext(CreateUserRequest.newBuilder().setName("Bob").setEmail("bob@example.com").build());
req.onCompleted();
latch.await();
# Handler
def BatchCreateUsers(self, request_iterator, context):
    users = []
    for request in request_iterator:
        users.append(save_user(request))
    return ListUsersResponse(users=users, total_count=len(users))

# Client
def request_iterator():
    yield CreateUserRequest(name="Alice", email="alice@example.com")
    yield CreateUserRequest(name="Bob", email="bob@example.com")

response = stub.BatchCreateUsers(request_iterator())
print("All users created:", list(response.users))
// Handler
public override async Task<ListUsersResponse> BatchCreateUsers(
    IAsyncStreamReader<CreateUserRequest> requestStream, ServerCallContext context)
{
    var users = new List<User>();
    await foreach (var req in requestStream.ReadAllAsync())
        users.Add(SaveUser(req));

    var response = new ListUsersResponse { TotalCount = users.Count };
    response.Users.AddRange(users);
    return response;
}

// Client
using var call = client.BatchCreateUsers();
await call.RequestStream.WriteAsync(new CreateUserRequest { Name = "Alice", Email = "alice@example.com" });
await call.RequestStream.WriteAsync(new CreateUserRequest { Name = "Bob", Email = "bob@example.com" });
await call.RequestStream.CompleteAsync();
var response = await call.ResponseAsync;
Console.WriteLine($"All users created: {response.Users.Count}");

Use Cases: การนำเข้าเป็นกลุ่มใหญ่, การอัปโหลดไฟล์, การประมวลผลเป็นกลุ่ม, การซิงค์ข้อมูล

4. Bidirectional Streaming RPC

ทั้งไคลเอนต์และเซิร์ฟเวอร์ส่งข้อความหลายครั้งผ่านการเชื่อมต่อที่ยั่งยืน:

// Handler
async syncUsers(call) {
  call.on('data', (request) => {
    const synced = syncUser(request);
    call.write({
      userId: synced.id,
      status: 'SYNCED',
    });
  });
  
  call.on('end', () => {
    call.end();
  });
}

// Client
const call = client.syncUsers();

call.on('data', (response) => {
  console.log('Sync result:', response);
});

call.write({ userId: 1, name: 'Alice Updated' });
call.write({ userId: 2, name: 'Bob Updated' });
call.end();
// Handler
@Override
public StreamObserver<SyncUserRequest> syncUsers(StreamObserver<SyncUserResponse> responseObserver) {
    return new StreamObserver<>() {
        @Override
        public void onNext(SyncUserRequest req) {
            User synced = syncUser(req);
            responseObserver.onNext(SyncUserResponse.newBuilder()
                .setUserId(synced.getId()).setStatus("SYNCED").build());
        }
        @Override public void onError(Throwable t) { responseObserver.onError(t); }
        @Override public void onCompleted() { responseObserver.onCompleted(); }
    };
}

// Client (async stub)
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<SyncUserRequest> req = asyncStub.syncUsers(new StreamObserver<>() {
    @Override public void onNext(SyncUserResponse r) { System.out.println("Sync result: " + r); }
    @Override public void onError(Throwable t) { latch.countDown(); }
    @Override public void onCompleted() { latch.countDown(); }
});
req.onNext(SyncUserRequest.newBuilder().setUserId(1).setName("Alice Updated").build());
req.onNext(SyncUserRequest.newBuilder().setUserId(2).setName("Bob Updated").build());
req.onCompleted();
latch.await();
# Handler
def SyncUsers(self, request_iterator, context):
    for request in request_iterator:
        synced = sync_user(request)
        yield SyncUserResponse(user_id=synced.id, status="SYNCED")

# Client
def request_iterator():
    yield SyncUserRequest(user_id=1, name="Alice Updated")
    yield SyncUserRequest(user_id=2, name="Bob Updated")

for response in stub.SyncUsers(request_iterator()):
    print("Sync result:", response)
// Handler
public override async Task SyncUsers(
    IAsyncStreamReader<SyncUserRequest> requestStream,
    IServerStreamWriter<SyncUserResponse> responseStream,
    ServerCallContext context)
{
    await foreach (var req in requestStream.ReadAllAsync())
    {
        var synced = SyncUser(req);
        await responseStream.WriteAsync(new SyncUserResponse { UserId = synced.Id, Status = "SYNCED" });
    }
}

// Client
using var call = client.SyncUsers();
var readTask = Task.Run(async () =>
{
    await foreach (var response in call.ResponseStream.ReadAllAsync())
        Console.WriteLine($"Sync result: {response}");
});

await call.RequestStream.WriteAsync(new SyncUserRequest { UserId = 1, Name = "Alice Updated" });
await call.RequestStream.WriteAsync(new SyncUserRequest { UserId = 2, Name = "Bob Updated" });
await call.RequestStream.CompleteAsync();
await readTask;

Use Cases: แอปพลิเคชัน Chat, การทำงานร่วมกัน real-time, การแจ้งเตือน bidirectional, การซิงค์สถานะเกม

ตัวสกัดกั้นและ Middleware

ตัวสกัดกั้นช่วยให้คุณสกัดกั้นและตรวจสอบการเรียก RPC, เปิดใช้งานการบันทึก, การตรวจสอบสิทธิ์, เมตริกส์ และการปรับเปลี่ยนคำขอ/การตอบสนอง

ตัวสกัดกั้นด้านเซิร์ฟเวอร์

// src/middleware/serverInterceptor.ts
import * as grpc from '@grpc/grpc-js';

interface CallMetadata {
  userId?: string;
  requestId?: string;
  startTime: number;
}

export function createServerInterceptor() {
  return (
    methodDescriptor: any,
    call: any
  ): grpc.Listener => {
    const metadata: CallMetadata = {
      requestId: call.metadata.get('x-request-id')?.[0] as string,
      userId: call.metadata.get('x-user-id')?.[0] as string,
      startTime: Date.now(),
    };

    // Log incoming call
    console.log('[gRPC Server]', {
      method: methodDescriptor.path,
      requestId: metadata.requestId,
      userId: metadata.userId,
    });

    // Wrap call handlers
    const originalSendMetadata = call.sendMetadata;
    call.sendMetadata = function(responseMetadata: grpc.Metadata) {
      responseMetadata.add('x-request-id', metadata.requestId || '');
      originalSendMetadata.call(this, responseMetadata);
    };

    return {
      onReceiveMetadata(metadata: grpc.Metadata, next: any) {
        next(metadata);
      },
      onReceiveMessage(message: any, next: any) {
        console.log('[gRPC Message]', {
          method: methodDescriptor.path,
          requestId: metadata.requestId,
          payload: JSON.stringify(message).substring(0, 200),
        });
        next(message);
      },
      onReceiveHalfClose(next: any) {
        next();
      },
      onCancel(next: any) {
        const duration = Date.now() - metadata.startTime;
        console.log('[gRPC Cancelled]', {
          method: methodDescriptor.path,
          requestId: metadata.requestId,
          duration,
        });
        next();
      },
    };
  };
}
// src/main/java/com/example/grpc/ServerLoggingInterceptor.java
package com.example.grpc;

import io.grpc.*;

public class ServerLoggingInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {

        String requestId = headers.get(Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER));
        String userId = headers.get(Metadata.Key.of("x-user-id", Metadata.ASCII_STRING_MARSHALLER));
        long startTime = System.currentTimeMillis();

        System.out.printf("[gRPC Server] method=%s requestId=%s userId=%s%n",
            call.getMethodDescriptor().getFullMethodName(), requestId, userId);

        ServerCall<ReqT, RespT> wrappedCall = new ForwardingServerCall.SimpleForwardingServerCall<>(call) {
            @Override
            public void sendHeaders(Metadata responseHeaders) {
                if (requestId != null) {
                    responseHeaders.put(
                        Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER), requestId);
                }
                super.sendHeaders(responseHeaders);
            }
        };

        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(next.startCall(wrappedCall, headers)) {
            @Override
            public void onMessage(ReqT message) {
                System.out.printf("[gRPC Message] method=%s requestId=%s payload=%s%n",
                    call.getMethodDescriptor().getFullMethodName(), requestId,
                    message.toString().substring(0, Math.min(200, message.toString().length())));
                super.onMessage(message);
            }

            @Override
            public void onCancel() {
                long duration = System.currentTimeMillis() - startTime;
                System.out.printf("[gRPC Cancelled] method=%s requestId=%s duration=%dms%n",
                    call.getMethodDescriptor().getFullMethodName(), requestId, duration);
                super.onCancel();
            }
        };
    }
}
# src/interceptors/server_interceptor.py
import grpc
import time
import json
from grpc import ServerInterceptor


class LoggingServerInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method
        metadata = dict(handler_call_details.invocation_metadata)
        request_id = metadata.get("x-request-id")
        user_id = metadata.get("x-user-id")
        start_time = time.time()

        print(f"[gRPC Server] method={method} requestId={request_id} userId={user_id}")

        handler = continuation(handler_call_details)
        if handler is None:
            return None

        def wrap_unary(behavior):
            def new_behavior(request, context):
                try:
                    payload = str(request)[:200]
                    print(f"[gRPC Message] method={method} requestId={request_id} payload={payload}")
                    return behavior(request, context)
                except Exception as e:
                    duration = (time.time() - start_time) * 1000
                    print(f"[gRPC Error] method={method} requestId={request_id} duration={duration:.0f}ms error={e}")
                    raise
            return new_behavior

        if handler.unary_unary:
            return grpc.unary_unary_rpc_method_handler(
                wrap_unary(handler.unary_unary),
                request_deserializer=handler.request_deserializer,
                response_serializer=handler.response_serializer,
            )
        return handler
// src/Interceptors/ServerLoggingInterceptor.cs
using Grpc.Core;
using Grpc.Core.Interceptors;

namespace GrpcServer.Interceptors;

public class ServerLoggingInterceptor : Interceptor
{
    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var requestId = context.RequestHeaders.GetValue("x-request-id");
        var userId = context.RequestHeaders.GetValue("x-user-id");
        var startTime = DateTimeOffset.UtcNow;

        Console.WriteLine($"[gRPC Server] method={context.Method} requestId={requestId} userId={userId}");
        Console.WriteLine($"[gRPC Message] method={context.Method} requestId={requestId} payload={request?.ToString()?[..Math.Min(200, request.ToString()?.Length ?? 0)]}");

        context.ResponseTrailers.Add("x-request-id", requestId ?? string.Empty);

        try
        {
            return await continuation(request, context);
        }
        catch (Exception)
        {
            var duration = (DateTimeOffset.UtcNow - startTime).TotalMilliseconds;
            Console.WriteLine($"[gRPC Cancelled] method={context.Method} requestId={requestId} duration={duration:F0}ms");
            throw;
        }
    }
}

// Register in Program.cs:
// builder.Services.AddGrpc(o => o.Interceptors.Add<ServerLoggingInterceptor>());

ตัวสกัดกั้นด้านไคลเอนต์

// src/middleware/clientInterceptor.ts
import * as grpc from '@grpc/grpc-js';

export interface ClientInterceptorOptions {
  requestId?: string;
  userId?: string;
  authToken?: string;
}

export function createClientInterceptor(options: ClientInterceptorOptions) {
  return (methodDescriptor: any, createCall: any) => {
    return (metadata: grpc.Metadata, listener: any, next: any) => {
      // Add custom metadata
      if (options.requestId) {
        metadata.add('x-request-id', options.requestId);
      }
      if (options.userId) {
        metadata.add('x-user-id', options.userId);
      }
      if (options.authToken) {
        metadata.add('authorization', `Bearer ${options.authToken}`);
      }

      console.log('[gRPC Client]', {
        method: methodDescriptor.path,
        requestId: options.requestId,
      });

      const startTime = Date.now();

      // Create listener wrapper to capture response
      const wrappedListener: any = {
        onReceiveMetadata(metadata: grpc.Metadata) {
          listener.onReceiveMetadata(metadata);
        },
        onReceiveMessage(message: any) {
          console.log('[gRPC Response]', {
            duration: Date.now() - startTime,
            message: JSON.stringify(message).substring(0, 200),
          });
          listener.onReceiveMessage(message);
        },
        onReceiveStatus(status: grpc.Status) {
          const duration = Date.now() - startTime;
          console.log('[gRPC Status]', {
            code: status.code,
            details: status.details,
            duration,
          });
          listener.onReceiveStatus(status);
        },
      };

      return next(metadata, wrappedListener);
    };
  };
}
// src/main/java/com/example/grpc/ClientLoggingInterceptor.java
package com.example.grpc;

import io.grpc.*;

public class ClientLoggingInterceptor implements ClientInterceptor {
    private final String requestId;
    private final String userId;
    private final String authToken;

    public ClientLoggingInterceptor(String requestId, String userId, String authToken) {
        this.requestId = requestId;
        this.userId = userId;
        this.authToken = authToken;
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {

        return new ForwardingClientCall.SimpleForwardingClientCall<>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                if (requestId != null) headers.put(Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER), requestId);
                if (userId != null) headers.put(Metadata.Key.of("x-user-id", Metadata.ASCII_STRING_MARSHALLER), userId);
                if (authToken != null) headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "Bearer " + authToken);

                System.out.printf("[gRPC Client] method=%s requestId=%s%n", method.getFullMethodName(), requestId);
                long startTime = System.currentTimeMillis();

                super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) {
                    @Override
                    public void onMessage(RespT message) {
                        System.out.printf("[gRPC Response] duration=%dms message=%s%n",
                            System.currentTimeMillis() - startTime,
                            message.toString().substring(0, Math.min(200, message.toString().length())));
                        super.onMessage(message);
                    }

                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        System.out.printf("[gRPC Status] code=%s details=%s duration=%dms%n",
                            status.getCode(), status.getDescription(), System.currentTimeMillis() - startTime);
                        super.onClose(status, trailers);
                    }
                }, headers);
            }
        };
    }
}
// Usage: ManagedChannelBuilder.forTarget(addr).intercept(new ClientLoggingInterceptor(...)).build()
# src/interceptors/client_interceptor.py
import grpc
import time
from grpc import UnaryUnaryClientInterceptor, ClientCallDetails


class ClientLoggingInterceptor(grpc.UnaryUnaryClientInterceptor,
                                grpc.UnaryStreamClientInterceptor):
    def __init__(self, request_id: str = None, user_id: str = None, auth_token: str = None):
        self.request_id = request_id
        self.user_id = user_id
        self.auth_token = auth_token

    def _add_metadata(self, client_call_details: ClientCallDetails):
        metadata = list(client_call_details.metadata or [])
        if self.request_id:
            metadata.append(("x-request-id", self.request_id))
        if self.user_id:
            metadata.append(("x-user-id", self.user_id))
        if self.auth_token:
            metadata.append(("authorization", f"Bearer {self.auth_token}"))
        return metadata

    def intercept_unary_unary(self, continuation, client_call_details, request):
        metadata = self._add_metadata(client_call_details)
        new_details = client_call_details._replace(metadata=metadata)
        print(f"[gRPC Client] method={client_call_details.method} requestId={self.request_id}")
        start = time.time()
        response = continuation(new_details, request)
        duration = (time.time() - start) * 1000
        print(f"[gRPC Response] duration={duration:.0f}ms")
        return response

    def intercept_unary_stream(self, continuation, client_call_details, request):
        metadata = self._add_metadata(client_call_details)
        new_details = client_call_details._replace(metadata=metadata)
        return continuation(new_details, request)

# Usage:
# channel = grpc.intercept_channel(
#     grpc.insecure_channel("localhost:50051"),
#     ClientLoggingInterceptor(request_id="abc", auth_token="token")
# )
// src/Interceptors/ClientLoggingInterceptor.cs
using Grpc.Core;
using Grpc.Core.Interceptors;

namespace GrpcClient.Interceptors;

public class ClientLoggingInterceptor : Interceptor
{
    private readonly string? _requestId;
    private readonly string? _userId;
    private readonly string? _authToken;

    public ClientLoggingInterceptor(string? requestId = null, string? userId = null, string? authToken = null)
    {
        _requestId = requestId;
        _userId = userId;
        _authToken = authToken;
    }

    private Metadata AddHeaders(Metadata? headers = null)
    {
        var metadata = headers ?? new Metadata();
        if (_requestId is not null) metadata.Add("x-request-id", _requestId);
        if (_userId is not null) metadata.Add("x-user-id", _userId);
        if (_authToken is not null) metadata.Add("authorization", $"Bearer {_authToken}");
        return metadata;
    }

    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
        TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
        AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        var newContext = new ClientInterceptorContext<TRequest, TResponse>(
            context.Method, context.Host,
            context.Options.WithHeaders(AddHeaders(context.Options.Headers)));

        Console.WriteLine($"[gRPC Client] method={context.Method.FullName} requestId={_requestId}");
        var startTime = DateTimeOffset.UtcNow;
        var call = continuation(request, newContext);

        return new AsyncUnaryCall<TResponse>(
            call.ResponseAsync.ContinueWith(t =>
            {
                var duration = (DateTimeOffset.UtcNow - startTime).TotalMilliseconds;
                Console.WriteLine($"[gRPC Response] duration={duration:F0}ms");
                return t.Result;
            }),
            call.ResponseHeadersAsync, call.GetStatus, call.GetTrailers, call.Dispose);
    }
}

// Usage: channel.Intercept(new ClientLoggingInterceptor(requestId: "abc", authToken: "token"))

การจัดการข้อผิดพลาดด้วย gRPC Status Codes

gRPC กำหนดรหัสข้อผิดพลาดมาตรฐาน 16 รหัสสำหรับสถานการณ์ล้มเหลวต่างๆ:

// src/errors/grpcErrors.ts
import * as grpc from '@grpc/grpc-js';

export enum GrpcErrorCode {
  OK = grpc.status.OK,
  CANCELLED = grpc.status.CANCELLED,
  UNKNOWN = grpc.status.UNKNOWN,
  INVALID_ARGUMENT = grpc.status.INVALID_ARGUMENT,
  DEADLINE_EXCEEDED = grpc.status.DEADLINE_EXCEEDED,
  NOT_FOUND = grpc.status.NOT_FOUND,
  ALREADY_EXISTS = grpc.status.ALREADY_EXISTS,
  PERMISSION_DENIED = grpc.status.PERMISSION_DENIED,
  RESOURCE_EXHAUSTED = grpc.status.RESOURCE_EXHAUSTED,
  FAILED_PRECONDITION = grpc.status.FAILED_PRECONDITION,
  ABORTED = grpc.status.ABORTED,
  OUT_OF_RANGE = grpc.status.OUT_OF_RANGE,
  UNIMPLEMENTED = grpc.status.UNIMPLEMENTED,
  INTERNAL = grpc.status.INTERNAL,
  UNAVAILABLE = grpc.status.UNAVAILABLE,
  DATA_LOSS = grpc.status.DATA_LOSS,
  UNAUTHENTICATED = grpc.status.UNAUTHENTICATED,
}

export class GrpcError extends Error {
  constructor(
    public code: GrpcErrorCode,
    message: string,
    public metadata?: grpc.Metadata
  ) {
    super(message);
    this.name = 'GrpcError';
  }
}

// Handler example with proper error responses
async function getUserHandler(
  call: grpc.ServerUnaryCall<GetUserRequest, GetUserResponse>,
  callback: grpc.sendUnaryData<GetUserResponse>
) {
  try {
    const { userId } = call.request;

    if (!userId || userId <= 0) {
      return callback({
        code: grpc.status.INVALID_ARGUMENT,
        message: 'User ID must be a positive integer',
      });
    }

    const user = await fetchUser(userId);

    if (!user) {
      return callback({
        code: grpc.status.NOT_FOUND,
        message: `User ${userId} not found`,
      });
    }

    callback(null, { user });
  } catch (error) {
    console.error('Unexpected error:', error);
    callback({
      code: grpc.status.INTERNAL,
      message: 'An unexpected error occurred',
    });
  }
}

// Status code guide
const statusCodeGuide = {
  [grpc.status.INVALID_ARGUMENT]: 'Client sent invalid argument',
  [grpc.status.NOT_FOUND]: 'Requested resource does not exist',
  [grpc.status.ALREADY_EXISTS]: 'Resource already exists',
  [grpc.status.PERMISSION_DENIED]: 'Client lacks permission',
  [grpc.status.RESOURCE_EXHAUSTED]: 'Server resource exhausted',
  [grpc.status.DEADLINE_EXCEEDED]: 'Operation deadline exceeded',
  [grpc.status.UNAUTHENTICATED]: 'Request authentication failed',
  [grpc.status.UNAVAILABLE]: 'Service temporarily unavailable',
  [grpc.status.INTERNAL]: 'Internal server error',
};
// src/main/java/com/example/grpc/UserServiceWithErrorHandling.java
package com.example.grpc;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import users.UserServiceGrpc;
import users.UserServiceProto.*;

public class UserServiceWithErrorHandling extends UserServiceGrpc.UserServiceImplBase {

    @Override
    public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
        try {
            int userId = request.getUserId();

            if (userId <= 0) {
                responseObserver.onError(Status.INVALID_ARGUMENT
                    .withDescription("User ID must be a positive integer")
                    .asRuntimeException());
                return;
            }

            User user = fetchUser(userId);

            if (user == null) {
                responseObserver.onError(Status.NOT_FOUND
                    .withDescription("User " + userId + " not found")
                    .asRuntimeException());
                return;
            }

            responseObserver.onNext(GetUserResponse.newBuilder().setUser(user).build());
            responseObserver.onCompleted();

        } catch (Exception e) {
            System.err.println("Unexpected error: " + e.getMessage());
            responseObserver.onError(Status.INTERNAL
                .withDescription("An unexpected error occurred")
                .withCause(e)
                .asRuntimeException());
        }
    }

    // Status code guide (as constants)
    // Status.INVALID_ARGUMENT  -> Client sent invalid argument
    // Status.NOT_FOUND         -> Requested resource does not exist
    // Status.ALREADY_EXISTS    -> Resource already exists
    // Status.PERMISSION_DENIED -> Client lacks permission
    // Status.RESOURCE_EXHAUSTED-> Server resource exhausted
    // Status.DEADLINE_EXCEEDED -> Operation deadline exceeded
    // Status.UNAUTHENTICATED   -> Request authentication failed
    // Status.UNAVAILABLE       -> Service temporarily unavailable
    // Status.INTERNAL          -> Internal server error
}
# src/errors/grpc_errors.py
import grpc
from enum import IntEnum


class GrpcErrorCode(IntEnum):
    OK = grpc.StatusCode.OK.value[0]
    CANCELLED = grpc.StatusCode.CANCELLED.value[0]
    UNKNOWN = grpc.StatusCode.UNKNOWN.value[0]
    INVALID_ARGUMENT = grpc.StatusCode.INVALID_ARGUMENT.value[0]
    DEADLINE_EXCEEDED = grpc.StatusCode.DEADLINE_EXCEEDED.value[0]
    NOT_FOUND = grpc.StatusCode.NOT_FOUND.value[0]
    ALREADY_EXISTS = grpc.StatusCode.ALREADY_EXISTS.value[0]
    PERMISSION_DENIED = grpc.StatusCode.PERMISSION_DENIED.value[0]
    RESOURCE_EXHAUSTED = grpc.StatusCode.RESOURCE_EXHAUSTED.value[0]
    INTERNAL = grpc.StatusCode.INTERNAL.value[0]
    UNAVAILABLE = grpc.StatusCode.UNAVAILABLE.value[0]
    UNAUTHENTICATED = grpc.StatusCode.UNAUTHENTICATED.value[0]


# Handler with proper error responses
def get_user_handler(request, context):
    try:
        user_id = request.user_id

        if not user_id or user_id <= 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "User ID must be a positive integer")

        user = fetch_user(user_id)

        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")

        return GetUserResponse(user=user)

    except grpc.RpcError:
        raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        context.abort(grpc.StatusCode.INTERNAL, "An unexpected error occurred")


STATUS_CODE_GUIDE = {
    grpc.StatusCode.INVALID_ARGUMENT: "Client sent invalid argument",
    grpc.StatusCode.NOT_FOUND: "Requested resource does not exist",
    grpc.StatusCode.ALREADY_EXISTS: "Resource already exists",
    grpc.StatusCode.PERMISSION_DENIED: "Client lacks permission",
    grpc.StatusCode.RESOURCE_EXHAUSTED: "Server resource exhausted",
    grpc.StatusCode.DEADLINE_EXCEEDED: "Operation deadline exceeded",
    grpc.StatusCode.UNAUTHENTICATED: "Request authentication failed",
    grpc.StatusCode.UNAVAILABLE: "Service temporarily unavailable",
    grpc.StatusCode.INTERNAL: "Internal server error",
}
// src/Errors/GrpcErrors.cs
using Grpc.Core;

namespace GrpcServer.Errors;

public static class GrpcErrorHandler
{
    // Handler with proper error responses
    public static async Task<GetUserResponse> GetUserHandler(GetUserRequest request, ServerCallContext context)
    {
        try
        {
            if (request.UserId <= 0)
                throw new RpcException(new Status(StatusCode.InvalidArgument,
                    "User ID must be a positive integer"));

            var user = await FetchUserAsync(request.UserId);

            if (user is null)
                throw new RpcException(new Status(StatusCode.NotFound,
                    $"User {request.UserId} not found"));

            return new GetUserResponse { User = user };
        }
        catch (RpcException) { throw; }
        catch (Exception e)
        {
            Console.Error.WriteLine($"Unexpected error: {e}");
            throw new RpcException(new Status(StatusCode.Internal, "An unexpected error occurred"));
        }
    }

    // Status code guide
    public static readonly Dictionary<StatusCode, string> StatusCodeGuide = new()
    {
        [StatusCode.InvalidArgument]  = "Client sent invalid argument",
        [StatusCode.NotFound]         = "Requested resource does not exist",
        [StatusCode.AlreadyExists]    = "Resource already exists",
        [StatusCode.PermissionDenied] = "Client lacks permission",
        [StatusCode.ResourceExhausted]= "Server resource exhausted",
        [StatusCode.DeadlineExceeded] = "Operation deadline exceeded",
        [StatusCode.Unauthenticated]  = "Request authentication failed",
        [StatusCode.Unavailable]      = "Service temporarily unavailable",
        [StatusCode.Internal]         = "Internal server error",
    };
}

Deadlines และ Timeout Propagation

Deadlines รับประกันว่าการดำเนินการจะเสร็จสิ้นภายในกรอบเวลาที่ระบุ gRPC จะแพร่กระจาย deadlines อัตโนมัติในการเรียกบริการ:

// Client setting a deadline
async function getUserWithTimeout(userId: number) {
  const deadline = new Date();
  deadline.setSeconds(deadline.getSeconds() + 5); // 5-second timeout

  return new Promise((resolve, reject) => {
    const client = new UserServiceClient('localhost:50051');
    const call = client.getUser(
      { userId },
      { deadline },
      (err, response) => {
        if (err) {
          if (err.code === grpc.status.DEADLINE_EXCEEDED) {
            reject(new Error('Request exceeded 5-second deadline'));
          } else {
            reject(err);
          }
        } else {
          resolve(response?.user);
        }
      }
    );
  });
}

// Server checking deadline
async function getUser(
  call: grpc.ServerUnaryCall<GetUserRequest, GetUserResponse>,
  callback: grpc.sendUnaryData<GetUserResponse>
) {
  const deadline = call.deadline;
  const timeRemaining = deadline.getTime() - Date.now();

  if (timeRemaining < 1000) {
    // Less than 1 second remaining
    return callback({
      code: grpc.status.DEADLINE_EXCEEDED,
      message: 'Insufficient time to process request',
    });
  }

  // ... process request with knowledge of remaining time
}
// Client setting a deadline
import io.grpc.Deadline;
import java.util.concurrent.TimeUnit;

public User getUserWithTimeout(int userId) {
    try {
        return blockingStub
            .withDeadline(Deadline.after(5, TimeUnit.SECONDS))
            .getUser(GetUserRequest.newBuilder().setUserId(userId).build())
            .getUser();
    } catch (io.grpc.StatusRuntimeException e) {
        if (e.getStatus().getCode() == io.grpc.Status.Code.DEADLINE_EXCEEDED) {
            throw new RuntimeException("Request exceeded 5-second deadline", e);
        }
        throw e;
    }
}

// Server checking deadline
@Override
public void getUser(GetUserRequest request, StreamObserver<GetUserResponse> responseObserver) {
    // Check if context is already cancelled/deadline exceeded
    if (Context.current().isCancelled()) {
        responseObserver.onError(Status.DEADLINE_EXCEEDED
            .withDescription("Insufficient time to process request")
            .asRuntimeException());
        return;
    }
    // ... process request
}
# Client setting a deadline
def get_user_with_timeout(user_id: int):
    try:
        response = stub.GetUser(
            GetUserRequest(user_id=user_id),
            timeout=5  # 5-second timeout
        )
        return response.user
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise TimeoutError("Request exceeded 5-second deadline")
        raise

# Server checking deadline (via context)
def GetUser(self, request, context):
    # Check remaining time
    deadline = context.time_remaining()
    if deadline is not None and deadline < 1.0:  # Less than 1 second remaining
        context.abort(grpc.StatusCode.DEADLINE_EXCEEDED,
                      "Insufficient time to process request")
    # ... process request
// Client setting a deadline
public async Task<User> GetUserWithTimeoutAsync(int userId)
{
    try
    {
        var deadline = DateTime.UtcNow.AddSeconds(5); // 5-second timeout
        var response = await _client.GetUserAsync(
            new GetUserRequest { UserId = userId },
            deadline: deadline
        );
        return response.User;
    }
    catch (RpcException e) when (e.StatusCode == StatusCode.DeadlineExceeded)
    {
        throw new TimeoutException("Request exceeded 5-second deadline", e);
    }
}

// Server checking deadline
public override Task<GetUserResponse> GetUser(GetUserRequest request, ServerCallContext context)
{
    var deadline = context.Deadline;
    var timeRemaining = deadline - DateTime.UtcNow;

    if (timeRemaining < TimeSpan.FromSeconds(1))
    {
        throw new RpcException(new Status(StatusCode.DeadlineExceeded,
            "Insufficient time to process request"));
    }

    // ... process request with knowledge of remaining time
    return Task.FromResult(new GetUserResponse());
}

กลยุทธ์การปรับสมดุลโหลด

Client-Side Load Balancing

// src/loadbalancer/clientSideLoadBalancer.ts
import * as grpc from '@grpc/grpc-js';

interface ServerAddress {
  host: string;
  port: number;
}

export class RoundRobinLoadBalancer {
  private currentIndex = 0;
  private servers: ServerAddress[];

  constructor(servers: ServerAddress[]) {
    this.servers = servers;
  }

  getNextServer(): ServerAddress {
    const server = this.servers[this.currentIndex];
    this.currentIndex = (this.currentIndex + 1) % this.servers.length;
    return server;
  }

  createClient(): UserServiceClient {
    const { host, port } = this.getNextServer();
    return new UserServiceClient(
      `${host}:${port}`,
      grpc.credentials.createInsecure()
    );
  }
}

// Usage
const lb = new RoundRobinLoadBalancer([
  { host: 'localhost', port: 50051 },
  { host: 'localhost', port: 50052 },
  { host: 'localhost', port: 50053 },
]);

async function callUserService(userId: number) {
  const client = lb.createClient();
  try {
    return await client.getUser(userId);
  } finally {
    grpc.closeClient(client);
  }
}
// src/main/java/com/example/grpc/RoundRobinLoadBalancer.java
package com.example.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import users.UserServiceGrpc;
import users.UserServiceProto.*;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinLoadBalancer {

    record ServerAddress(String host, int port) {}

    private final List<ServerAddress> servers;
    private final AtomicInteger currentIndex = new AtomicInteger(0);

    public RoundRobinLoadBalancer(List<ServerAddress> servers) {
        this.servers = servers;
    }

    public ServerAddress getNextServer() {
        int index = currentIndex.getAndUpdate(i -> (i + 1) % servers.size());
        return servers.get(index);
    }

    public UserServiceGrpc.UserServiceBlockingStub createStub() {
        ServerAddress server = getNextServer();
        ManagedChannel channel = ManagedChannelBuilder
            .forAddress(server.host(), server.port())
            .usePlaintext()
            .build();
        return UserServiceGrpc.newBlockingStub(channel);
    }

    // Usage
    public static void main(String[] args) {
        RoundRobinLoadBalancer lb = new RoundRobinLoadBalancer(List.of(
            new ServerAddress("localhost", 50051),
            new ServerAddress("localhost", 50052),
            new ServerAddress("localhost", 50053)
        ));

        // gRPC also supports built-in round_robin via service config
        ManagedChannel channel = ManagedChannelBuilder
            .forTarget("dns:///user-service:50051")
            .defaultLoadBalancingPolicy("round_robin")
            .usePlaintext()
            .build();
    }
}
# src/loadbalancer/client_side_load_balancer.py
import grpc
import users_pb2_grpc
from dataclasses import dataclass
from itertools import cycle
from threading import Lock


@dataclass
class ServerAddress:
    host: str
    port: int


class RoundRobinLoadBalancer:
    def __init__(self, servers: list[ServerAddress]):
        self._servers = servers
        self._cycle = cycle(servers)
        self._lock = Lock()

    def get_next_server(self) -> ServerAddress:
        with self._lock:
            return next(self._cycle)

    def create_stub(self):
        server = self.get_next_server()
        channel = grpc.insecure_channel(f"{server.host}:{server.port}")
        return users_pb2_grpc.UserServiceStub(channel)


# Usage
lb = RoundRobinLoadBalancer([
    ServerAddress("localhost", 50051),
    ServerAddress("localhost", 50052),
    ServerAddress("localhost", 50053),
])

def call_user_service(user_id: int):
    stub = lb.create_stub()
    return stub.GetUser(users_pb2.GetUserRequest(user_id=user_id)).user

# gRPC also supports built-in round-robin via channel options:
# channel = grpc.insecure_channel(
#     "dns:///user-service:50051",
#     options=[("grpc.lb_policy_name", "round_robin")]
# )
// src/LoadBalancing/RoundRobinLoadBalancer.cs
using Grpc.Net.Client;
using Users;

namespace GrpcClient.LoadBalancing;

public class RoundRobinLoadBalancer
{
    private record ServerAddress(string Host, int Port);

    private readonly ServerAddress[] _servers;
    private int _currentIndex;

    public RoundRobinLoadBalancer(params (string Host, int Port)[] servers)
    {
        _servers = servers.Select(s => new ServerAddress(s.Host, s.Port)).ToArray();
    }

    private ServerAddress GetNextServer()
    {
        int index = Interlocked.Increment(ref _currentIndex) % _servers.Length;
        return _servers[Math.Abs(index)];
    }

    public UserService.UserServiceClient CreateClient()
    {
        var server = GetNextServer();
        var channel = GrpcChannel.ForAddress($"http://{server.Host}:{server.Port}");
        return new UserService.UserServiceClient(channel);
    }
}

// Usage
var lb = new RoundRobinLoadBalancer(
    ("localhost", 50051),
    ("localhost", 50052),
    ("localhost", 50053)
);

async Task<User> CallUserService(int userId)
{
    var client = lb.CreateClient();
    var response = await client.GetUserAsync(new GetUserRequest { UserId = userId });
    return response.User;
}

// gRPC also supports built-in round_robin via service config JSON on GrpcChannel

Proxy-Based Load Balancing

สำหรับสภาพแวดล้อมการผลิต ให้ใช้ dedicated reverse proxy:

# envoy.yaml
admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address:
      address: 127.0.0.1
      port_value: 9901

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 50051
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          access_log:
          - name: envoy.access_loggers.stdout
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
          http_filters:
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
          route_config:
            name: local_route
            virtual_hosts:
            - name: backend
              domains: ["*"]
              routes:
              - match:
                  prefix: "/"
                route:
                  cluster: user_service
                  timeout: 5s

  clusters:
  - name: user_service
    connect_timeout: 1s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: user_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: service1.local
                port_value: 50051
        - endpoint:
            address:
              socket_address:
                address: service2.local
                port_value: 50051
        - endpoint:
            address:
              socket_address:
                address: service3.local
                port_value: 50051
    health_checks:
    - timeout: 1s
      interval: 10s
      unhealthy_threshold: 2
      healthy_threshold: 2
      grpc_health_check:
        service_name: grpc.health.v1.Health

gRPC กับ REST: การเปรียบเทียบ

ลักษณะgRPCREST
SerializationProtocol Buffers (binary)JSON (text)
TransportHTTP/2HTTP/1.1
Payload Size3-10x ขนาดเล็กกว่าใหญ่กว่า
Latencyต่ำกว่า (binary + multiplexing)สูงกว่า
Streamingดั้งเดิม, bidirectionalPolling หรือ WebSockets
Browser Supportต้องใช้ gRPC-Webดั้งเดิม
Debuggingต้องใช้เครื่องมือ (grpcurl)curl, browser
Learning Curveชันมากขึ้น (proto files)อ่อนโยน
Type Safetyการพิมพ์ที่แข็งแกร่งตรวจสอบ runtime
CachingจำกัดHTTP caching
Use CaseMicroservices, real-timePublic APIs, web apps

เลือก gRPC เมื่อ:

  • การสร้าง microservices ที่มีข้อกำหนด latency ที่เข้มงวด
  • การใช้งานการสื่อสาร bidirectional real-time
  • การให้บริการบริการ polyglot ที่ต้องการการพิมพ์ที่แข็งแกร่ง
  • การจัดการ throughput สูง ด้วยการเชื่อมต่อพร้อมกัน

เลือก REST เมื่อ:

  • การสร้าง public-facing APIs
  • การสนับสนุนไคลเอนต์เบราว์เซอร์
  • การให้ความสำคัญกับความเรียบง่ายและการค้นพบ
  • ต้องการ HTTP caching มาตรฐาน

Checklist การปรับใช้เพื่อการผลิต

ก่อนการปรับใช้

  • คำจำกัดความของ Protocol buffer มีการระบุเวอร์ชันและเอกสาร
  • อินเทอร์เฟซบริการกำหนดกรณีข้อผิดพลาดทั้งหมดด้วยรหัสสถานะที่เหมาะสม
  • Bidirectional streaming มี timeout/heartbeat mechanisms
  • Logging รวบรวม request ID, duration และรายละเอียดข้อผิดพลาด
  • Metrics (latency, error rate, throughput) ได้รับการ instrumented
  • Load tests ตรวจสอบประสิทธิภาพภายใต้โหลดที่คาดหวัง
  • Circuit breakers ป้องกันการล้มเหลวที่ชะลาช
  • TLS certificates ตั้งค่าสำหรับการเข้ารหัสการผลิต

TLS/SSL Configuration

// src/server-secure.ts
import * as fs from 'fs';
import * as grpc from '@grpc/grpc-js';

const credentials = grpc.ServerCredentials.createSsl(
  fs.readFileSync('./server.crt'),
  [
    {
      cert_chain: fs.readFileSync('./server.crt'),
      private_key: fs.readFileSync('./server.key'),
    },
  ],
  false
);

const server = new grpc.Server();
server.bindAsync('0.0.0.0:50051', credentials, (err) => {
  if (err) throw err;
  server.start();
  console.log('Secure gRPC server listening on port 50051');
});

// Client with TLS
const clientCreds = grpc.ChannelCredentials.createSsl(
  fs.readFileSync('./ca.crt')
);

const client = new UserServiceClient(
  'secure-grpc-service.example.com:50051',
  clientCreds
);
// src/main/java/com/example/grpc/SecureServer.java
import io.grpc.Server;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.SslContext;

import java.io.File;

// Server with TLS
SslContext sslContext = GrpcSslContexts.forServer(
    new File("server.crt"),
    new File("server.key")
).build();

Server server = NettyServerBuilder.forPort(50051)
    .sslContext(sslContext)
    .addService(new UserServiceImpl())
    .build()
    .start();

System.out.println("Secure gRPC server listening on port 50051");

// Client with TLS
import io.grpc.netty.NettyChannelBuilder;

SslContext clientSsl = GrpcSslContexts.forClient()
    .trustManager(new File("ca.crt"))
    .build();

ManagedChannel channel = NettyChannelBuilder
    .forAddress("secure-grpc-service.example.com", 50051)
    .sslContext(clientSsl)
    .build();
# src/secure_server.py
import grpc
from concurrent import futures

with open("server.key", "rb") as f:
    private_key = f.read()
with open("server.crt", "rb") as f:
    certificate_chain = f.read()

server_credentials = grpc.ssl_server_credentials(
    [(private_key, certificate_chain)]
)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
users_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
server.add_secure_port("0.0.0.0:50051", server_credentials)
server.start()
print("Secure gRPC server listening on port 50051")

# Client with TLS
with open("ca.crt", "rb") as f:
    trusted_certs = f.read()

client_credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
channel = grpc.secure_channel(
    "secure-grpc-service.example.com:50051", client_credentials
)
stub = users_pb2_grpc.UserServiceStub(channel)
// Server with TLS in Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.WebHost.ConfigureKestrel(options =>
{
    options.ListenAnyIP(50051, listenOptions =>
    {
        listenOptions.Protocols = HttpProtocols.Http2;
        listenOptions.UseHttps("server.pfx", "password");
    });
});

var app = builder.Build();
app.MapGrpcService<UserServiceImpl>();
app.Run();

// Client with TLS
using Grpc.Net.Client;

var channel = GrpcChannel.ForAddress("https://secure-grpc-service.example.com:50051",
    new GrpcChannelOptions
    {
        HttpHandler = new HttpClientHandler
        {
            // Optionally: provide custom CA certificate
            // ServerCertificateCustomValidationCallback = ...
        }
    });
var client = new UserService.UserServiceClient(channel);

Health Checks

// src/health.ts
import * as grpc from '@grpc/grpc-js';
import { Health } from './generated/health';

export const healthImpl: any = {
  check(
    call: grpc.ServerUnaryCall<any, any>,
    callback: grpc.sendUnaryData<any>
  ) {
    callback(null, { status: Health.ServingStatus.SERVING });
  },

  watch(call: grpc.ServerWritableStream<any, any>) {
    call.write({ status: Health.ServingStatus.SERVING });
    call.end();
  },
};
// src/main/java/com/example/grpc/HealthServiceImpl.java
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.StreamObserver;
import io.grpc.protobuf.services.HealthStatusManager;

// Use built-in gRPC health service:
HealthStatusManager healthStatusManager = new HealthStatusManager();
Server server = ServerBuilder.forPort(50051)
    .addService(new UserServiceImpl())
    .addService(healthStatusManager.getHealthService())
    .build();

// Set status for a specific service
healthStatusManager.setStatus("users.UserService",
    HealthCheckResponse.ServingStatus.SERVING);
# src/health.py
from grpc_health.v1 import health, health_pb2, health_pb2_grpc
from concurrent import futures
import grpc

health_servicer = health.HealthServicer()
health_servicer.set("users.UserService", health_pb2.HealthCheckResponse.SERVING)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
users_pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
server.add_insecure_port("0.0.0.0:50051")
server.start()
// src/Services/HealthService.cs
// ASP.NET Core has built-in gRPC health checks via Grpc.AspNetCore.HealthChecks

// In Program.cs:
builder.Services.AddGrpcHealthChecks()
    .AddCheck("UserService", () => HealthCheckResult.Healthy());

app.MapGrpcHealthChecksService();

// Or implement manually:
using Grpc.Health.V1;
using Grpc.HealthCheck;

public class HealthServiceImpl : Health.HealthBase
{
    public override Task<HealthCheckResponse> Check(
        HealthCheckRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HealthCheckResponse
        {
            Status = HealthCheckResponse.Types.ServingStatus.Serving
        });
    }

    public override async Task Watch(
        HealthCheckRequest request,
        IServerStreamWriter<HealthCheckResponse> responseStream,
        ServerCallContext context)
    {
        await responseStream.WriteAsync(new HealthCheckResponse
        {
            Status = HealthCheckResponse.Types.ServingStatus.Serving
        });
    }
}

Container and Orchestration

# Dockerfile
FROM node:20-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --only=production

COPY src ./src
COPY generated ./generated

EXPOSE 50051 9090

HEALTHCHECK --interval=10s --timeout=5s --start-period=5s --retries=3 \
  CMD node -e "require('http').get('http://localhost:9090/health', (r) => {if (r.statusCode !== 200) throw new Error(r.statusCode)})"

CMD ["node", "src/server.ts"]
# kubernetes.yaml
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
  - name: grpc
    port: 50051
    targetPort: 50051
  - name: metrics
    port: 9090
    targetPort: 9090
  type: ClusterIP

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        ports:
        - name: grpc
          containerPort: 50051
        - name: metrics
          containerPort: 9090
        livenessProbe:
          grpc:
            port: 50051
          initialDelaySeconds: 10
          periodSeconds: 10
        readinessProbe:
          grpc:
            port: 50051
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi

Monitoring and Observability

// src/metrics.ts
import * as promClient from 'prom-client';
import * as grpc from '@grpc/grpc-js';

const grpcRequestDuration = new promClient.Histogram({
  name: 'grpc_request_duration_ms',
  help: 'Duration of gRPC requests in milliseconds',
  labelNames: ['method', 'status'],
  buckets: [1, 5, 10, 50, 100, 500, 1000, 5000],
});

const grpcRequestCount = new promClient.Counter({
  name: 'grpc_requests_total',
  help: 'Total gRPC requests',
  labelNames: ['method', 'status'],
});

export function metricsInterceptor() {
  return (methodDescriptor: any, createCall: any) => {
    return (metadata: grpc.Metadata, listener: any, next: any) => {
      const startTime = Date.now();
      const method = methodDescriptor.path;

      const wrappedListener: any = {
        onReceiveMetadata(metadata: grpc.Metadata) {
          listener.onReceiveMetadata(metadata);
        },
        onReceiveMessage(message: any) {
          listener.onReceiveMessage(message);
        },
        onReceiveStatus(status: grpc.Status) {
          const duration = Date.now() - startTime;
          const statusName = grpc.status[status.code];

          grpcRequestDuration
            .labels(method, statusName)
            .observe(duration);

          grpcRequestCount
            .labels(method, statusName)
            .inc();

          listener.onReceiveStatus(status);
        },
      };

      return next(metadata, wrappedListener);
    };
  };
}
// src/main/java/com/example/grpc/MetricsInterceptor.java
package com.example.grpc;

import io.grpc.*;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

public class MetricsInterceptor implements ServerInterceptor {

    private final MeterRegistry registry;

    public MetricsInterceptor(MeterRegistry registry) {
        this.registry = registry;
    }

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {

        String method = call.getMethodDescriptor().getFullMethodName();
        long startTime = System.currentTimeMillis();

        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
                next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<>(call) {
                    @Override
                    public void close(Status status, Metadata trailers) {
                        long duration = System.currentTimeMillis() - startTime;
                        String statusCode = status.getCode().name();

                        registry.timer("grpc.request.duration",
                            "method", method, "status", statusCode)
                            .record(duration, java.util.concurrent.TimeUnit.MILLISECONDS);

                        registry.counter("grpc.requests.total",
                            "method", method, "status", statusCode)
                            .increment();

                        super.close(status, trailers);
                    }
                }, headers)) {};
    }
}
# src/metrics.py
import time
from prometheus_client import Histogram, Counter

grpc_request_duration = Histogram(
    "grpc_request_duration_ms",
    "Duration of gRPC requests in milliseconds",
    ["method", "status"],
    buckets=[1, 5, 10, 50, 100, 500, 1000, 5000],
)

grpc_request_count = Counter(
    "grpc_requests_total",
    "Total gRPC requests",
    ["method", "status"],
)


class MetricsServerInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method
        handler = continuation(handler_call_details)
        if handler is None:
            return None

        def wrap_unary(behavior):
            def new_behavior(request, context):
                start = time.time()
                status = "OK"
                try:
                    result = behavior(request, context)
                    return result
                except Exception as e:
                    status = "INTERNAL"
                    raise
                finally:
                    duration = (time.time() - start) * 1000
                    grpc_request_duration.labels(method=method, status=status).observe(duration)
                    grpc_request_count.labels(method=method, status=status).inc()
            return new_behavior

        if handler.unary_unary:
            return grpc.unary_unary_rpc_method_handler(
                wrap_unary(handler.unary_unary),
                request_deserializer=handler.request_deserializer,
                response_serializer=handler.response_serializer,
            )
        return handler
// src/Interceptors/MetricsInterceptor.cs
using Grpc.Core;
using Grpc.Core.Interceptors;
using Prometheus;

namespace GrpcServer.Interceptors;

public class MetricsInterceptor : Interceptor
{
    private static readonly Histogram GrpcRequestDuration = Metrics.CreateHistogram(
        "grpc_request_duration_ms",
        "Duration of gRPC requests in milliseconds",
        new HistogramConfiguration
        {
            LabelNames = new[] { "method", "status" },
            Buckets = new double[] { 1, 5, 10, 50, 100, 500, 1000, 5000 },
        });

    private static readonly Counter GrpcRequestCount = Metrics.CreateCounter(
        "grpc_requests_total",
        "Total gRPC requests",
        new CounterConfiguration { LabelNames = new[] { "method", "status" } });

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var method = context.Method;
        var startTime = DateTimeOffset.UtcNow;
        var statusCode = "OK";

        try
        {
            var response = await continuation(request, context);
            return response;
        }
        catch (RpcException e)
        {
            statusCode = e.StatusCode.ToString();
            throw;
        }
        finally
        {
            var duration = (DateTimeOffset.UtcNow - startTime).TotalMilliseconds;
            GrpcRequestDuration.WithLabels(method, statusCode).Observe(duration);
            GrpcRequestCount.WithLabels(method, statusCode).Inc();
        }
    }
}

// Register in Program.cs:
// builder.Services.AddGrpc(o => o.Interceptors.Add<MetricsInterceptor>());

บทสรุป

gRPC คือเทคโนโลยีที่มีประสิทธิภาพสำหรับการสร้างระบบแบบกระจายอำนาจที่มีประสิทธิภาพสูง โดยการใช้ประโยชน์จาก Protocol Buffers และ HTTP/2 มันให้ประสิทธิภาพและประสบการณ์ผู้พัฒนาที่ยอดเยี่ยม ทำตามรูปแบบและแนวปฏิบัติในคำแนะนำนี้เพื่อสร้างบริการ gRPC ที่พร้อมสำหรับการผลิตและมีความน่าเชื่อถือ

ประเด็นหลัก:

  1. กำหนดสัญญาบริการที่ชัดเจนด้วยไฟล์ .proto
  2. เลือกรูปแบบการสื่อสารที่เหมาะสมสำหรับกรณีการใช้งาน
  3. ใช้การจัดการข้อผิดพลาดอย่างครอบคลุมด้วยรหัสสถานะ
  4. ใช้ตัวสกัดกั้นสำหรับข้อกังวลที่ตัดขวาง
  5. ตรวจสอบและสังเกตการณ์บริการการผลิตทั้งหมด
  6. ออกแบบสำหรับการเสื่อมลงอย่างสง่างามและโหมดความล้มเหลว

สำหรับข้อมูลเพิ่มเติม โปรดไปที่ official gRPC documentation


กลับไป: เทคโนโลยีการสื่อสารแบบเซิร์ฟเวอร์ต่อเซิร์ฟเวอร์

Comments powered by Giscus are not yet configured. Set PUBLIC_GISCUS_REPO_ID and PUBLIC_GISCUS_CATEGORY_ID in apps/web/.env to enable.

PV

เขียนโดย พลากร วรมงคล

Software Engineer Specialist ประสบการณ์กว่า 20 ปี เขียนเกี่ยวกับ Architecture, Performance และการสร้างระบบ Production

เพิ่มเติมเกี่ยวกับผม

บทความที่เกี่ยวข้อง