Transport Layer
This guide covers the transport layer in Swit, which provides a unified abstraction for managing HTTP and gRPC transports with service registration, lifecycle management, and error handling.
Overview
The transport layer (`pkg/transport`) implements a pluggable architecture that allows services to register with multiple transport types seamlessly through a centralized coordination system. It provides unified lifecycle management, error handling, and service health monitoring across different transport protocols.
Architecture Components
NetworkTransport Interface
The base interface for all transport implementations:
// NetworkTransport is the base interface that every transport implementation
// satisfies. It covers starting and stopping the transport and reporting its
// name and listening address.
type NetworkTransport interface {
Start(ctx context.Context) error // Start the transport server
Stop(ctx context.Context) error // Gracefully stop the transport
GetName() string // Returns transport name ("http" or "grpc")
GetAddress() string // Returns listening address
}
TransportCoordinator
Central coordinator managing multiple transport instances:
// Create transport coordinator
coordinator := transport.NewTransportCoordinator()

// Register transports
coordinator.Register(httpTransport)
coordinator.Register(grpcTransport)

// Start all transports. The error is checked here so that a failed start is
// not masked by the result of the later Stop call.
if err := coordinator.Start(ctx); err != nil {
	log.Fatalf("Failed to start transports: %v", err)
}

// Stop all transports with timeout
if err := coordinator.Stop(30 * time.Second); err != nil {
	log.Printf("Failed to stop transports: %v", err)
}
Service Handler System
Unified service registration interface:
// TransportServiceHandler is the unified registration interface a service
// implements so it can be attached to HTTP and/or gRPC transports. It also
// carries the service's metadata, health reporting, and lifecycle hooks.
type TransportServiceHandler interface {
RegisterHTTP(router *gin.Engine) error // Register HTTP routes
RegisterGRPC(server *grpc.Server) error // Register gRPC services
GetMetadata() *HandlerMetadata // Service metadata
GetHealthEndpoint() string // Health check endpoint
IsHealthy(ctx context.Context) (*types.HealthStatus, error) // Health check
Initialize(ctx context.Context) error // Service initialization
Shutdown(ctx context.Context) error // Service shutdown
}
HTTP Transport
Basic HTTP Transport Setup
import (
"github.com/innovationmech/swit/pkg/transport"
)
// Create HTTP transport with default settings
httpTransport := transport.NewHTTPTransport(":8080")

// Alternatively, create the HTTP transport from a custom configuration.
// Plain assignment (=) is used because httpTransport is already declared
// above; a second := in the same scope does not compile.
// NOTE(review): assumes both constructors return the same type — confirm.
httpConfig := &transport.HTTPTransportConfig{
	Address:     ":8080",
	TestMode:    false,
	EnableReady: true,
}
httpTransport = transport.NewHTTPTransportWithConfig(httpConfig)

// Register with coordinator
coordinator.Register(httpTransport)
HTTP Service Implementation
// UserHTTPService is an example HTTP-only service that exposes user
// management routes under /api/v1/users.
type UserHTTPService struct {
// userRepo is the data store used by the handlers and by the health check.
userRepo *UserRepository
// logger records lifecycle events and handler failures.
logger *zap.Logger
}
// RegisterHTTP mounts the user CRUD routes under /api/v1/users on the given
// router. It always returns nil.
func (s *UserHTTPService) RegisterHTTP(router *gin.Engine) error {
	// Nest the users group inside the versioned API group.
	userRoutes := router.Group("/api/v1").Group("/users")

	userRoutes.GET("", s.getUsers)
	userRoutes.POST("", s.createUser)
	userRoutes.GET("/:id", s.getUser)
	userRoutes.PUT("/:id", s.updateUser)
	userRoutes.DELETE("/:id", s.deleteUser)

	return nil
}
// RegisterGRPC is a no-op: this service exposes HTTP routes only, so nothing
// is registered on the gRPC server.
func (s *UserHTTPService) RegisterGRPC(server *grpc.Server) error {
	return nil
}
// GetMetadata returns the descriptive metadata for the user HTTP service.
// The health endpoint is taken from GetHealthEndpoint so both always agree.
func (s *UserHTTPService) GetMetadata() *transport.HandlerMetadata {
	return &transport.HandlerMetadata{
		Name:           "user-http-service",
		Version:        "v1.0.0",
		Description:    "User management HTTP API",
		HealthEndpoint: s.GetHealthEndpoint(),
		Tags:           []string{"users", "http", "api"},
		Dependencies:   []string{"database", "redis"},
	}
}

// GetHealthEndpoint returns the path of this service's health check endpoint.
// It is required by the TransportServiceHandler interface.
func (s *UserHTTPService) GetHealthEndpoint() string {
	return "/api/v1/users/health"
}
// IsHealthy pings the user repository and reports the result. On failure it
// returns an "unhealthy" status together with the ping error; otherwise it
// returns a "healthy" status and a nil error.
func (s *UserHTTPService) IsHealthy(ctx context.Context) (*types.HealthStatus, error) {
	pingErr := s.userRepo.Ping(ctx)
	if pingErr == nil {
		return &types.HealthStatus{
			Status:  "healthy",
			Message: "All dependencies are healthy",
		}, nil
	}

	// The repository is unreachable.
	return &types.HealthStatus{
		Status:  "unhealthy",
		Message: "Database connection failed",
	}, pingErr
}
// Initialize logs the start of initialization and then initializes the user
// repository, returning whatever error the repository reports.
func (s *UserHTTPService) Initialize(ctx context.Context) error {
	s.logger.Info("Initializing user HTTP service")

	initErr := s.userRepo.Initialize(ctx)
	return initErr
}
// Shutdown logs the shutdown and closes the user repository, returning the
// repository's close error. The context is not consulted.
func (s *UserHTTPService) Shutdown(ctx context.Context) error {
	s.logger.Info("Shutting down user HTTP service")

	closeErr := s.userRepo.Close()
	return closeErr
}
HTTP Route Handlers
// getUsers handles the user listing route. It reads the optional "limit"
// (default "10") and "offset" (default "0") query parameters, fetches the
// matching page from the repository, and responds with the users, the total
// count, and the pagination values used.
//
// Malformed or negative pagination values are rejected with 400 Bad Request
// instead of being silently treated as zero. A repository failure is logged
// and reported as 500 Internal Server Error.
func (s *UserHTTPService) getUsers(c *gin.Context) {
	ctx := c.Request.Context()

	// Parse and validate query parameters.
	limitInt, err := strconv.Atoi(c.DefaultQuery("limit", "10"))
	if err != nil || limitInt < 0 {
		c.JSON(http.StatusBadRequest, gin.H{
			"error": "Invalid limit parameter",
		})
		return
	}
	offsetInt, err := strconv.Atoi(c.DefaultQuery("offset", "0"))
	if err != nil || offsetInt < 0 {
		c.JSON(http.StatusBadRequest, gin.H{
			"error": "Invalid offset parameter",
		})
		return
	}

	// Get users from repository
	users, total, err := s.userRepo.GetUsers(ctx, limitInt, offsetInt)
	if err != nil {
		s.logger.Error("Failed to get users", zap.Error(err))
		c.JSON(http.StatusInternalServerError, gin.H{
			"error": "Failed to retrieve users",
		})
		return
	}

	c.JSON(http.StatusOK, gin.H{
		"users":  users,
		"total":  total,
		"limit":  limitInt,
		"offset": offsetInt,
	})
}
// createUser handles the user creation route. It binds the JSON body,
// validates it, and stores the user. Binding and validation failures produce
// 400 Bad Request with the error text in "details"; a repository failure is
// logged and produces 500; success produces 201 with the created user.
func (s *UserHTTPService) createUser(c *gin.Context) {
	// reject writes a 400 response with a summary and the underlying error text.
	reject := func(summary string, cause error) {
		c.JSON(http.StatusBadRequest, gin.H{
			"error":   summary,
			"details": cause.Error(),
		})
	}

	var payload CreateUserRequest
	if bindErr := c.ShouldBindJSON(&payload); bindErr != nil {
		reject("Invalid request format", bindErr)
		return
	}
	if invalid := payload.Validate(); invalid != nil {
		reject("Validation failed", invalid)
		return
	}

	created, createErr := s.userRepo.CreateUser(c.Request.Context(), &payload)
	if createErr != nil {
		s.logger.Error("Failed to create user", zap.Error(createErr))
		c.JSON(http.StatusInternalServerError, gin.H{
			"error": "Failed to create user",
		})
		return
	}

	c.JSON(http.StatusCreated, gin.H{
		"user": created,
	})
}
gRPC Transport
Basic gRPC Transport Setup
// Create gRPC transport with default settings
grpcTransport := transport.NewGRPCTransport(":9080")

// Alternatively, create the gRPC transport from a custom configuration.
// Plain assignment (=) is used because grpcTransport is already declared
// above; a second := in the same scope does not compile.
// NOTE(review): assumes both constructors return the same type — confirm.
grpcConfig := &transport.GRPCTransportConfig{
	Address:             ":9080",
	EnableKeepalive:     true,
	EnableReflection:    true,
	EnableHealthService: true,
	MaxRecvMsgSize:      4 * 1024 * 1024, // 4MB
	MaxSendMsgSize:      4 * 1024 * 1024, // 4MB
	KeepaliveParams: &keepalive.ServerParameters{
		MaxConnectionIdle:     15 * time.Second,
		MaxConnectionAge:      30 * time.Second,
		MaxConnectionAgeGrace: 5 * time.Second,
		Time:                  5 * time.Second,
		Timeout:               1 * time.Second,
	},
}
grpcTransport = transport.NewGRPCTransportWithConfig(grpcConfig)

// Register with coordinator
coordinator.Register(grpcTransport)
gRPC Service Implementation
// AuthGRPCService is an example gRPC-only service that provides login and
// token validation.
type AuthGRPCService struct {
// Embedded generated base type for the auth service server.
authpb.UnimplementedAuthServiceServer
// tokenManager issues and validates tokens.
tokenManager *TokenManager
// userRepo validates user credentials.
userRepo *UserRepository
// logger records authentication and token failures.
logger *zap.Logger
}
// RegisterHTTP is a no-op: this service is exposed over gRPC only, so no
// routes are added to the router.
func (s *AuthGRPCService) RegisterHTTP(router *gin.Engine) error {
	return nil
}
// RegisterGRPC registers this service as the auth service implementation on
// the given gRPC server. It always returns nil.
func (s *AuthGRPCService) RegisterGRPC(server *grpc.Server) error {
	// The service itself is the handler for the generated auth server interface.
	authpb.RegisterAuthServiceServer(server, s)

	return nil
}
// GetMetadata returns the descriptive metadata for the auth gRPC service.
// The health endpoint is taken from GetHealthEndpoint so both always agree.
func (s *AuthGRPCService) GetMetadata() *transport.HandlerMetadata {
	return &transport.HandlerMetadata{
		Name:           "auth-grpc-service",
		Version:        "v1.0.0",
		Description:    "Authentication gRPC service",
		HealthEndpoint: s.GetHealthEndpoint(),
		Tags:           []string{"auth", "grpc", "security"},
		Dependencies:   []string{"database", "redis"},
	}
}

// GetHealthEndpoint returns the gRPC health check method path for this
// service. It is required by the TransportServiceHandler interface.
func (s *AuthGRPCService) GetHealthEndpoint() string {
	return "/grpc.health.v1.Health/Check"
}
// IsHealthy pings the token manager and then the user repository, in that
// order. The first failing dependency yields an "unhealthy" status together
// with its error; if both respond, a "healthy" status is returned.
func (s *AuthGRPCService) IsHealthy(ctx context.Context) (*types.HealthStatus, error) {
	// unhealthy builds the failure status for a dependency.
	unhealthy := func(reason string) *types.HealthStatus {
		return &types.HealthStatus{
			Status:  "unhealthy",
			Message: reason,
		}
	}

	if tokenErr := s.tokenManager.Ping(ctx); tokenErr != nil {
		return unhealthy("Token manager not available"), tokenErr
	}
	if repoErr := s.userRepo.Ping(ctx); repoErr != nil {
		return unhealthy("User repository not available"), repoErr
	}

	return &types.HealthStatus{
		Status:  "healthy",
		Message: "All dependencies are healthy",
	}, nil
}
gRPC Method Implementation
// Login authenticates a user and issues a token.
//
// It returns InvalidArgument when the username or password is empty,
// Unauthenticated when credential validation fails, and Internal when token
// generation fails. On success the response carries the token, its expiry as
// a Unix timestamp, and the user's profile fields.
func (s *AuthGRPCService) Login(ctx context.Context, req *authpb.LoginRequest) (*authpb.LoginResponse, error) {
	// Both credentials must be present before the repository is consulted.
	if req.Username == "" || req.Password == "" {
		return nil, status.Error(codes.InvalidArgument, "username and password are required")
	}

	account, authErr := s.userRepo.ValidateCredentials(ctx, req.Username, req.Password)
	if authErr != nil {
		s.logger.Error("Authentication failed",
			zap.String("username", req.Username),
			zap.Error(authErr))
		return nil, status.Error(codes.Unauthenticated, "invalid credentials")
	}

	issued, expiry, tokenErr := s.tokenManager.GenerateToken(ctx, account.ID)
	if tokenErr != nil {
		s.logger.Error("Token generation failed",
			zap.String("userID", account.ID),
			zap.Error(tokenErr))
		return nil, status.Error(codes.Internal, "failed to generate token")
	}

	profile := &authpb.User{
		Id:       account.ID,
		Username: account.Username,
		Email:    account.Email,
		Role:     account.Role,
	}
	return &authpb.LoginResponse{
		Token:     issued,
		ExpiresAt: expiry.Unix(),
		User:      profile,
	}, nil
}
// ValidateToken checks a token with the token manager. It returns
// InvalidArgument for an empty token and Unauthenticated when validation
// fails; otherwise the response is marked valid and carries the user ID and
// role taken from the token's claims.
func (s *AuthGRPCService) ValidateToken(ctx context.Context, req *authpb.ValidateTokenRequest) (*authpb.ValidateTokenResponse, error) {
	if req.Token == "" {
		return nil, status.Error(codes.InvalidArgument, "token is required")
	}

	parsed, validateErr := s.tokenManager.ValidateToken(ctx, req.Token)
	if validateErr != nil {
		return nil, status.Error(codes.Unauthenticated, "invalid token")
	}

	result := &authpb.ValidateTokenResponse{
		Valid:  true,
		UserId: parsed.UserID,
		Role:   parsed.Role,
	}
	return result, nil
}
Multi-Transport Services
Hybrid Service Implementation
Services can support both HTTP and gRPC transports:
// UserManagementService is an example hybrid service that registers the same
// user operations on both the HTTP and gRPC transports.
type UserManagementService struct {
// Embedded generated base type for the user service server.
userpb.UnimplementedUserServiceServer
// userRepo is the user data store.
userRepo *UserRepository
// logger is the service's structured logger.
logger *zap.Logger
}
// RegisterHTTP mounts the HTTP variants of the user operations under
// /api/v1/users on the given router. It always returns nil.
func (s *UserManagementService) RegisterHTTP(router *gin.Engine) error {
	userRoutes := router.Group("/api/v1/users")

	userRoutes.GET("", s.getUsersHTTP)
	userRoutes.POST("", s.createUserHTTP)
	userRoutes.GET("/:id", s.getUserHTTP)
	userRoutes.PUT("/:id", s.updateUserHTTP)
	userRoutes.DELETE("/:id", s.deleteUserHTTP)

	return nil
}
// RegisterGRPC registers this service as the user service implementation on
// the given gRPC server. It always returns nil.
func (s *UserManagementService) RegisterGRPC(server *grpc.Server) error {
	// The service itself is the handler for the generated user server interface.
	userpb.RegisterUserServiceServer(server, s)

	return nil
}
// HTTP handlers

// getUsersHTTP is the HTTP handler registered for GET /api/v1/users.
// The body is intentionally left as a placeholder in this guide.
func (s *UserManagementService) getUsersHTTP(c *gin.Context) {
// HTTP-specific implementation
}
// createUserHTTP is the HTTP handler registered for POST /api/v1/users.
// The body is intentionally left as a placeholder in this guide.
func (s *UserManagementService) createUserHTTP(c *gin.Context) {
// HTTP-specific implementation
}
// gRPC handlers

// GetUsers is the gRPC counterpart of the user listing operation.
// It is a placeholder in this guide: it returns an Unimplemented status so
// that the example compiles (a function with result types must return).
func (s *UserManagementService) GetUsers(ctx context.Context, req *userpb.GetUsersRequest) (*userpb.GetUsersResponse, error) {
	// gRPC-specific implementation
	return nil, status.Errorf(codes.Unimplemented, "GetUsers is not implemented")
}
// CreateUser is the gRPC counterpart of the user creation operation.
// It is a placeholder in this guide: it returns an Unimplemented status so
// that the example compiles (a function with result types must return).
func (s *UserManagementService) CreateUser(ctx context.Context, req *userpb.CreateUserRequest) (*userpb.CreateUserResponse, error) {
	// gRPC-specific implementation
	return nil, status.Errorf(codes.Unimplemented, "CreateUser is not implemented")
}
Service Registration and Lifecycle
Complete Service Lifecycle
// Create transport coordinator
coordinator := transport.NewTransportCoordinator()

// Create and register transports
httpTransport := transport.NewHTTPTransport(":8080")
grpcTransport := transport.NewGRPCTransport(":9080")
coordinator.Register(httpTransport)
coordinator.Register(grpcTransport)

// Register services
userService := &UserManagementService{
	userRepo: userRepo,
	logger:   logger,
}
authService := &AuthGRPCService{
	tokenManager: tokenManager,
	userRepo:     userRepo,
	logger:       logger,
}
coordinator.RegisterHTTPService(userService)
coordinator.RegisterGRPCService(userService)
coordinator.RegisterGRPCService(authService)

// Start transports
ctx := context.Background()
if err := coordinator.Start(ctx); err != nil {
	log.Fatalf("Failed to start transports: %v", err)
}

// Initialize services
if err := coordinator.InitializeTransportServices(ctx); err != nil {
	log.Fatalf("Failed to initialize services: %v", err)
}

// Bind services to transports
httpRouter := gin.New()
if err := coordinator.BindAllHTTPEndpoints(httpRouter); err != nil {
	log.Fatalf("Failed to bind HTTP endpoints: %v", err)
}
grpcServer := grpc.NewServer()
if err := coordinator.BindAllGRPCServices(grpcServer); err != nil {
	log.Fatalf("Failed to bind gRPC services: %v", err)
}

// Health checks
healthStatus := coordinator.CheckAllServicesHealth(ctx)
log.Printf("Service health: %+v", healthStatus)

// Graceful shutdown: services are shut down first, then the transports.
defer func() {
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	coordinator.ShutdownAllServices(shutdownCtx)

	// Report a failed stop instead of silently discarding the error.
	if err := coordinator.Stop(30 * time.Second); err != nil {
		log.Printf("Failed to stop transports: %v", err)
	}
}()
Error Handling
Transport Error Types
// Single transport error
// TransportError pairs an error with the name of the transport it came from.
type TransportError struct {
// TransportName identifies the transport that produced the error.
TransportName string
// Err is the underlying error.
Err error
}
// Multiple transport errors
// MultiError collects the errors reported by several transports.
type MultiError struct {
// Errors holds one entry per failing transport.
Errors []TransportError
}
Error Handling Patterns
if err := coordinator.Stop(timeout); err != nil {
if multiErr, ok := err.(*transport.MultiError); ok {
// Handle multiple transport errors
for _, transportErr := range multiErr.Errors {
log.Printf("Transport %s error: %v", transportErr.TransportName, transportErr.Err)
}
// Check specific transport errors
if httpErr := multiErr.GetErrorByTransport("http"); httpErr != nil {
log.Printf("HTTP transport error: %v", httpErr.Err)
}
if grpcErr := multiErr.GetErrorByTransport("grpc"); grpcErr != nil {
log.Printf("gRPC transport error: %v", grpcErr.Err)
}
}
}
// Use utility functions
if transport.IsStopError(err) {
stopErrors := transport.ExtractStopErrors(err)
for _, stopErr := range stopErrors {
log.Printf("Stop error in %s: %v", stopErr.TransportName, stopErr.Err)
}
}
Testing Transport Layer
HTTP Transport Testing
// TestHTTPTransport starts an HTTP transport on a dynamically allocated port,
// waits until it is ready, and checks that /health answers 200 OK.
func TestHTTPTransport(t *testing.T) {
	// Create test transport with dynamic port. The local is named
	// httpTransport so it does not shadow the transport package.
	httpTransport := transport.NewHTTPTransportWithConfig(&transport.HTTPTransportConfig{
		Address:     ":0", // Dynamic port allocation
		TestMode:    true,
		EnableReady: true,
	})

	// Start transport
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err := httpTransport.Start(ctx)
	require.NoError(t, err)
	defer httpTransport.Stop(ctx)

	// Wait for transport to be ready
	err = httpTransport.WaitReady()
	require.NoError(t, err)

	// Test HTTP requests
	address := httpTransport.GetAddress()
	resp, err := http.Get(fmt.Sprintf("http://localhost%s/health", address))
	require.NoError(t, err)
	// Close the body so the underlying connection is released.
	defer resp.Body.Close()

	assert.Equal(t, http.StatusOK, resp.StatusCode)
}
gRPC Transport Testing
// TestGRPCTransport starts a gRPC transport on a dynamically allocated port,
// dials it without transport security, and checks that the health service
// reports SERVING.
func TestGRPCTransport(t *testing.T) {
	// Dynamic port allocation via ":0".
	cfg := &transport.GRPCTransportConfig{
		Address:             ":0",
		EnableHealthService: true,
		EnableReflection:    true,
	}
	grpcTransport := transport.NewGRPCTransportWithConfig(cfg)

	// Bound the whole test to five seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	startErr := grpcTransport.Start(ctx)
	require.NoError(t, startErr)
	defer grpcTransport.Stop(ctx)

	// Connect to the address the transport reports.
	conn, dialErr := grpc.Dial(grpcTransport.GetAddress(), grpc.WithInsecure())
	require.NoError(t, dialErr)
	defer conn.Close()

	// Query the health service with an empty request.
	health, checkErr := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{})
	require.NoError(t, checkErr)
	assert.Equal(t, healthpb.HealthCheckResponse_SERVING, health.Status)
}
Service Integration Testing
// TestServiceIntegration registers one service on both an HTTP and a gRPC
// transport through a coordinator, starts everything on dynamically allocated
// ports, and exercises the service over each transport.
func TestServiceIntegration(t *testing.T) {
	// Create coordinator
	coordinator := transport.NewTransportCoordinator()

	// Create test transports
	httpTransport := transport.NewHTTPTransportWithConfig(&transport.HTTPTransportConfig{
		Address:  ":0",
		TestMode: true,
	})
	grpcTransport := transport.NewGRPCTransportWithConfig(&transport.GRPCTransportConfig{
		Address:             ":0",
		EnableHealthService: true,
	})
	coordinator.Register(httpTransport)
	coordinator.Register(grpcTransport)

	// Register test service
	testService := &TestMultiTransportService{}
	coordinator.RegisterHTTPService(testService)
	coordinator.RegisterGRPCService(testService)

	// Start everything
	ctx := context.Background()
	err := coordinator.Start(ctx)
	require.NoError(t, err)
	defer coordinator.Stop(30 * time.Second)

	err = coordinator.InitializeTransportServices(ctx)
	require.NoError(t, err)

	// Test both transports
	httpAddr := httpTransport.GetAddress()
	grpcAddr := grpcTransport.GetAddress()

	// Test HTTP
	resp, err := http.Get(fmt.Sprintf("http://localhost%s/api/test", httpAddr))
	require.NoError(t, err)
	// Close the body so the underlying connection is released.
	defer resp.Body.Close()
	assert.Equal(t, http.StatusOK, resp.StatusCode)

	// Test gRPC
	conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
	require.NoError(t, err)
	defer conn.Close()

	client := testpb.NewTestServiceClient(conn)
	grpcResp, err := client.TestMethod(ctx, &testpb.TestRequest{})
	require.NoError(t, err)
	assert.NotNil(t, grpcResp)
}
Best Practices
Service Design
- Transport Independence - Design services to work with either transport
- Proper Error Handling - Return appropriate HTTP status codes and gRPC status codes
- Context Propagation - Always use and propagate context
- Health Checks - Implement meaningful health checks
- Metadata Management - Provide accurate service metadata
Performance Optimization
- Connection Pooling - Use connection pooling for database and external services
- Resource Management - Properly close resources and handle cleanup
- Timeout Configuration - Set appropriate timeouts for operations
- Message Size Limits - Configure appropriate message size limits for gRPC
- Keepalive Settings - Tune keepalive parameters for your network conditions
Testing Strategy
- Unit Testing - Test service logic independently of transports
- Integration Testing - Test with actual transport implementations
- Dynamic Ports - Use dynamic port allocation for concurrent test runs
- Mock Dependencies - Mock external dependencies for reliable testing
- Health Check Testing - Test health check implementations
Production Readiness
- Graceful Shutdown - Implement proper shutdown handling
- Error Recovery - Handle transport errors gracefully
- Monitoring Integration - Integrate with monitoring systems
- Security Configuration - Configure TLS and authentication properly
- Resource Limits - Set appropriate resource limits and timeouts
This transport layer guide provides comprehensive coverage of HTTP and gRPC transport implementations, service registration patterns, error handling, testing strategies, and production deployment considerations.