264 lines
6.0 KiB
Go
264 lines
6.0 KiB
Go
package aeus
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"os/signal"
|
|
"reflect"
|
|
"runtime"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
"git.nobla.cn/golang/aeus/pkg/errors"
|
|
"git.nobla.cn/golang/aeus/pkg/logger"
|
|
"git.nobla.cn/golang/aeus/registry"
|
|
"github.com/google/uuid"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
type Service struct {
|
|
ctx context.Context
|
|
opts *options
|
|
errGroup *errgroup.Group
|
|
refValues []reflect.Value
|
|
service *registry.Service
|
|
exitFlag int32
|
|
}
|
|
|
|
func (s *Service) ID() string {
|
|
return s.opts.id
|
|
}
|
|
|
|
func (s *Service) Name() string {
|
|
return s.opts.name
|
|
}
|
|
|
|
func (s *Service) Debug() bool {
|
|
return false
|
|
}
|
|
|
|
func (s *Service) Version() string {
|
|
return s.opts.version
|
|
}
|
|
func (s *Service) Metadata() map[string]string {
|
|
if s.service == nil {
|
|
return nil
|
|
}
|
|
return s.service.Metadata
|
|
}
|
|
|
|
func (s *Service) Endpoint() []string {
|
|
if s.service == nil {
|
|
return nil
|
|
}
|
|
return s.service.Endpoints
|
|
}
|
|
|
|
func (s *Service) Logger() logger.Logger {
|
|
if s.opts.logger == nil {
|
|
return logger.Default()
|
|
}
|
|
return s.opts.logger
|
|
}
|
|
|
|
func (s *Service) build(ctx context.Context) (*registry.Service, error) {
|
|
svr := ®istry.Service{
|
|
ID: s.ID(),
|
|
Name: s.Name(),
|
|
Version: s.Version(),
|
|
Metadata: s.opts.metadata,
|
|
Endpoints: make([]string, 0, 4),
|
|
}
|
|
if svr.Metadata == nil {
|
|
svr.Metadata = make(map[string]string)
|
|
}
|
|
svr.Metadata["os"] = runtime.GOOS
|
|
svr.Metadata["go"] = runtime.Version()
|
|
svr.Metadata["hostname"], _ = os.Hostname()
|
|
svr.Metadata["uptime"] = time.Now().Format(time.DateTime)
|
|
if s.opts.endpoints != nil {
|
|
svr.Endpoints = append(svr.Endpoints, s.opts.endpoints...)
|
|
}
|
|
for _, ptr := range s.opts.servers {
|
|
if e, ok := ptr.(Endpointer); ok {
|
|
if uri, err := e.Endpoint(ctx); err != nil {
|
|
return nil, err
|
|
} else {
|
|
svr.Endpoints = append(svr.Endpoints, uri)
|
|
}
|
|
}
|
|
}
|
|
return svr, nil
|
|
}
|
|
|
|
func (s *Service) injectVars(v any) {
|
|
refValue := reflect.Indirect(reflect.ValueOf(v))
|
|
refType := refValue.Type()
|
|
for i := range refValue.NumField() {
|
|
fieldValue := refValue.Field(i)
|
|
if !fieldValue.CanSet() {
|
|
continue
|
|
}
|
|
fieldType := refType.Field(i)
|
|
if fieldType.Type.Kind() != reflect.Ptr {
|
|
continue
|
|
}
|
|
for _, rv := range s.refValues {
|
|
if fieldType.Type == rv.Type() {
|
|
refValue.Field(i).Set(rv)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) preStart(ctx context.Context) (err error) {
|
|
s.Logger().Info(s.ctx, "starting")
|
|
for _, ptr := range s.opts.servers {
|
|
s.refValues = append(s.refValues, reflect.ValueOf(ptr))
|
|
}
|
|
if s.opts.registry != nil {
|
|
s.refValues = append(s.refValues, reflect.ValueOf(s.opts.registry))
|
|
}
|
|
if s.opts.scope != nil {
|
|
s.injectVars(s.opts.scope)
|
|
if err = s.opts.scope.Init(ctx); err != nil {
|
|
return
|
|
}
|
|
s.refValues = append(s.refValues, reflect.ValueOf(s.opts.scope))
|
|
}
|
|
if s.opts.serviceLoader != nil {
|
|
s.injectVars(s.opts.serviceLoader)
|
|
if err = s.opts.serviceLoader.Init(ctx); err != nil {
|
|
return
|
|
}
|
|
s.refValues = append(s.refValues, reflect.ValueOf(s.opts.serviceLoader))
|
|
}
|
|
for _, srv := range s.opts.servers {
|
|
svr := srv
|
|
s.errGroup.Go(func() error {
|
|
return svr.Start(ctx)
|
|
})
|
|
}
|
|
if s.opts.serviceLoader != nil {
|
|
s.errGroup.Go(func() error {
|
|
return s.opts.serviceLoader.Run(ctx)
|
|
})
|
|
}
|
|
if s.opts.registry != nil {
|
|
s.errGroup.Go(func() error {
|
|
childCtx, cancel := context.WithTimeout(ctx, s.opts.registrarTimeout)
|
|
defer cancel()
|
|
opts := func(o *registry.RegisterOptions) {
|
|
o.Context = childCtx
|
|
}
|
|
if err = s.opts.registry.Register(s.service, opts); err != nil {
|
|
return err
|
|
}
|
|
s.Logger().Info(ctx, "service registered")
|
|
duration := s.opts.registrarTimeout
|
|
if duration > time.Minute {
|
|
duration = time.Minute
|
|
} else {
|
|
if duration > time.Second*10 {
|
|
duration = duration - time.Second*5
|
|
}
|
|
}
|
|
ticker := time.NewTicker(duration)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
s.Logger().Info(ctx, "service registar stopped")
|
|
return nil
|
|
case <-ticker.C:
|
|
if err = s.opts.registry.Register(s.service, func(o *registry.RegisterOptions) {
|
|
o.Context = ctx
|
|
o.TTL = s.opts.registrarTimeout
|
|
}); err != nil {
|
|
s.Logger().Warn(ctx, "service register error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
s.Logger().Info(s.ctx, "started")
|
|
return
|
|
}
|
|
|
|
func (s *Service) preStop() (err error) {
|
|
if !atomic.CompareAndSwapInt32(&s.exitFlag, 0, 1) {
|
|
return
|
|
}
|
|
s.Logger().Info(s.ctx, "stopping")
|
|
ctx, cancelFunc := context.WithTimeoutCause(s.ctx, s.opts.stopTimeout, errors.ErrTimeout)
|
|
defer func() {
|
|
cancelFunc()
|
|
}()
|
|
for _, srv := range s.opts.servers {
|
|
if err = srv.Stop(ctx); err != nil {
|
|
s.Logger().Warn(ctx, "server stop error: %v", err)
|
|
}
|
|
}
|
|
if s.opts.registry != nil {
|
|
if err = s.opts.registry.Deregister(s.service, func(o *registry.DeregisterOptions) {
|
|
o.Context = ctx
|
|
}); err != nil {
|
|
s.Logger().Warn(ctx, "server deregister error: %v", err)
|
|
}
|
|
}
|
|
s.Logger().Info(ctx, "stopped")
|
|
return
|
|
}
|
|
|
|
func (s *Service) Run() (err error) {
|
|
var (
|
|
ctx context.Context
|
|
errCtx context.Context
|
|
cancelFunc context.CancelFunc
|
|
)
|
|
s.ctx = WithContext(s.opts.ctx, s)
|
|
if s.service, err = s.build(s.ctx); err != nil {
|
|
return
|
|
}
|
|
ctx, cancelFunc = context.WithCancel(s.ctx)
|
|
defer cancelFunc()
|
|
s.errGroup, errCtx = errgroup.WithContext(ctx)
|
|
if err = s.preStart(errCtx); err != nil {
|
|
return
|
|
}
|
|
s.errGroup.Go(func() error {
|
|
ch := make(chan os.Signal, 1)
|
|
signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL}
|
|
signal.Notify(ch, signals...)
|
|
select {
|
|
case <-ch:
|
|
cancelFunc()
|
|
case <-ctx.Done():
|
|
case <-s.ctx.Done():
|
|
}
|
|
return s.preStop()
|
|
})
|
|
err = s.errGroup.Wait()
|
|
return
|
|
}
|
|
|
|
func New(cbs ...Option) *Service {
|
|
s := &Service{
|
|
refValues: make([]reflect.Value, 0, 20),
|
|
opts: &options{
|
|
id: uuid.New().String(),
|
|
ctx: context.Background(),
|
|
logger: logger.Default(),
|
|
stopTimeout: time.Second * 10,
|
|
registrarTimeout: time.Second * 30,
|
|
},
|
|
}
|
|
s.opts.metadata = make(map[string]string)
|
|
for _, cb := range cbs {
|
|
cb(s.opts)
|
|
}
|
|
return s
|
|
}
|