aeus/transport/grpc/server.go

117 lines
2.8 KiB
Go

package grpc
import (
"context"
"net"
"net/url"
"git.nobla.cn/golang/aeus/metadata"
"git.nobla.cn/golang/aeus/middleware"
"git.nobla.cn/golang/aeus/pkg/logger"
netutil "git.nobla.cn/golang/aeus/pkg/net"
"github.com/google/uuid"
"google.golang.org/grpc"
grpcmd "google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
)
// Server is a gRPC transport server that wraps a *grpc.Server together with
// its listener, advertised URI and the middleware chain applied to unary calls.
type Server struct {
// ctx is the context handed to Start; Stop reuses it when logging.
ctx context.Context
// opts holds the configuration assembled in New (network, address, logger, gRPC options).
opts *options
// uri is the advertised endpoint; its Host is filled in once the listener exists.
uri *url.URL
// serve is the underlying gRPC server instance.
serve *grpc.Server
// listener is created lazily by createListener.
listener net.Listener
// middlewares are chained around every unary call by unaryServerInterceptor.
middlewares []middleware.Middleware
}
// createListener opens the network listener on first use and records the
// resulting host in s.uri.Host. When a listener already exists it does nothing.
//
// It returns the error from net.Listen, if any; in that case the server state
// is left without a listener so a later call can try again.
func (s *Server) createListener() error {
	if s.listener != nil {
		return nil
	}
	ln, err := net.Listen(s.opts.network, s.opts.address)
	if err != nil {
		return err
	}
	s.listener = ln
	// NOTE(review): TrulyAddr presumably resolves the address actually bound
	// (e.g. for ":0") into an advertisable host:port — confirm in pkg/net.
	s.uri.Host = netutil.TrulyAddr(s.opts.address, ln)
	return nil
}
// unaryServerInterceptor returns the gRPC unary interceptor that runs the
// server's middleware chain around every unary call.
//
// For each call it:
//  1. copies the first value of every incoming gRPC metadata key into a fresh
//     metadata.Metadata;
//  2. generates a request ID only when the caller did not supply one;
//  3. writes the request path (info.FullMethod) and protocol last, so these
//     server-derived entries are not replaced by caller-supplied metadata;
//  4. merges the metadata into the context, stores the raw request under
//     requestValueContextKey and invokes the middleware chain, whose innermost
//     step is the real handler.
//
// The handler's response is captured through the named result resp; the error
// returned by the chain becomes the call's error.
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
		// next is the innermost step of the chain: it calls the actual handler
		// and publishes its response through the enclosing named result.
		next := func(ctx context.Context) (err error) {
			resp, err = handler(ctx, req)
			return
		}
		h := middleware.Chain(s.middlewares...)(next)

		md := make(metadata.Metadata)
		// Incoming metadata goes in first so the request-ID check below can
		// actually observe a caller-provided ID.
		if gmd, ok := grpcmd.FromIncomingContext(ctx); ok {
			for k, v := range gmd {
				if len(v) > 0 {
					md.Set(k, v[0])
				}
			}
		}
		// Only mint a new ID when none was propagated by the caller.
		if !md.Has(metadata.RequestIDKey) {
			md.Set(metadata.RequestIDKey, uuid.New().String())
		}
		// Server-derived values are set last so they take precedence over
		// anything the caller sent under the same keys.
		md.Set(metadata.RequestPathKey, info.FullMethod)
		md.Set(metadata.RequestProtocolKey, Protocol)

		ctx = metadata.MergeContext(ctx, md, true)
		ctx = context.WithValue(ctx, requestValueContextKey{}, req)
		err = h(ctx)
		return
	}
}
// Use appends the given middlewares to the server's chain. They are applied
// to unary calls in the order they were registered.
func (s *Server) Use(middlewares ...middleware.Middleware) {
	if len(middlewares) == 0 {
		// Nothing to register.
		return
	}
	s.middlewares = append(s.middlewares, middlewares...)
}
// Start creates the listener if necessary, registers the gRPC reflection
// service and serves requests until the server is stopped.
//
// ctx is remembered on the server (Stop uses it for logging). The call blocks
// for the lifetime of the server and returns either the listener-creation
// error or the error reported by grpc.Server.Serve; previously the Serve error
// was silently dropped, so a failed server looked like a clean exit.
func (s *Server) Start(ctx context.Context) error {
	s.ctx = ctx
	if err := s.createListener(); err != nil {
		return err
	}
	s.opts.logger.Info(ctx, "grpc server listen on: %s", s.uri.Host)
	reflection.Register(s.serve)
	// Propagate the Serve error so callers can tell a failed server from a
	// clean shutdown.
	return s.serve.Serve(s.listener)
}
// Endpoint returns the server's URI as a string. The listener is created
// first if it does not exist yet, so the host part is populated before the
// URI is rendered. The ctx argument is not used.
func (s *Server) Endpoint(ctx context.Context) (string, error) {
	err := s.createListener()
	if err != nil {
		return "", err
	}
	endpoint := s.uri.String()
	return endpoint, nil
}
// RegisterService registers the service description sd and its implementation
// ss with the underlying gRPC server by delegating to grpc.Server.RegisterService.
func (s *Server) RegisterService(sd *grpc.ServiceDesc, ss any) {
s.serve.RegisterService(sd, ss)
}
// Stop gracefully stops the underlying gRPC server and logs that it stopped.
//
// The ctx argument is not consulted; the log entry is written with the context
// that was saved by Start. Stop always returns nil.
func (s *Server) Stop(ctx context.Context) error {
	s.serve.GracefulStop()
	s.opts.logger.Info(s.ctx, "grpc server stopped")
	return nil
}
// New builds a Server from the supplied options.
//
// Defaults are network "tcp", address ":0" and the default logger; each Option
// is applied to these defaults in order. The server's unary interceptor is
// appended after the caller-supplied gRPC options, and the underlying
// *grpc.Server is created from the combined option list.
func New(cbs ...Option) *Server {
	opts := &options{
		network:  "tcp",
		logger:   logger.Default(),
		address:  ":0",
		grpcOpts: make([]grpc.ServerOption, 0, 10),
	}
	svr := &Server{
		opts:        opts,
		uri:         &url.URL{Scheme: "grpc"},
		middlewares: make([]middleware.Middleware, 0, 10),
	}
	for i := range cbs {
		cbs[i](opts)
	}
	// The interceptor closes over svr, so middlewares registered later via Use
	// are still picked up at call time.
	interceptor := grpc.ChainUnaryInterceptor(svr.unaryServerInterceptor())
	opts.grpcOpts = append(opts.grpcOpts, interceptor)
	svr.serve = grpc.NewServer(opts.grpcOpts...)
	return svr
}