aeus/app.go

264 lines
6.0 KiB
Go

package aeus
import (
"context"
"os"
"os/signal"
"reflect"
"runtime"
"sync/atomic"
"syscall"
"time"
"git.nobla.cn/golang/aeus/pkg/errors"
"git.nobla.cn/golang/aeus/pkg/logger"
"git.nobla.cn/golang/aeus/registry"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)
// Service is the application runtime created by New and driven by Run. It
// bundles the configured options, the goroutine group that runs the servers
// and background loops, and the registry record describing this instance.
type Service struct {
	// ctx is the root context; Run assigns it from the configured context.
	ctx context.Context
	// opts holds the configuration assembled by New from the Option callbacks.
	opts *options
	// errGroup runs the servers and background goroutines; Run creates it.
	errGroup *errgroup.Group
	// refValues are the values injectVars may assign to matching pointer fields.
	refValues []reflect.Value
	// service is the registry record produced by build; nil until Run builds it.
	service *registry.Service
	// exitFlag is flipped from 0 to 1 (atomically) the first time preStop runs.
	exitFlag int32
}
// ID returns the instance identifier held in the options. New seeds it with
// a freshly generated UUID string.
func (s *Service) ID() string {
	return s.opts.id
}
// Name returns the service name held in the options.
func (s *Service) Name() string {
	return s.opts.name
}
// Debug reports whether the service runs in debug mode. The current
// implementation is hard-wired and always reports false.
func (s *Service) Debug() bool {
	return false
}
// Version returns the version string held in the options.
func (s *Service) Version() string {
	return s.opts.version
}
// Metadata returns the metadata map of the registry record, or nil when the
// record has not been built yet. The map is returned as-is, not copied.
func (s *Service) Metadata() map[string]string {
	if record := s.service; record != nil {
		return record.Metadata
	}
	return nil
}
// Endpoint returns the endpoint list of the registry record, or nil when the
// record has not been built yet. The slice is returned as-is, not copied.
func (s *Service) Endpoint() []string {
	if record := s.service; record != nil {
		return record.Endpoints
	}
	return nil
}
// Logger returns the logger set in the options, falling back to
// logger.Default() when none is configured.
func (s *Service) Logger() logger.Logger {
	if configured := s.opts.logger; configured != nil {
		return configured
	}
	return logger.Default()
}
// build assembles the registry record for this instance.
//
// The metadata map from the options is reused (created if nil) and stamped
// with the OS, Go version, hostname and the current time under "uptime".
// Endpoints are the statically configured ones followed by the endpoint of
// every server implementing Endpointer, in server order. The first error
// returned by an Endpointer aborts the build.
func (s *Service) build(ctx context.Context) (*registry.Service, error) {
	meta := s.opts.metadata
	if meta == nil {
		meta = make(map[string]string)
	}
	meta["os"] = runtime.GOOS
	meta["go"] = runtime.Version()
	// Best effort: a failed lookup leaves the hostname empty.
	meta["hostname"], _ = os.Hostname()
	meta["uptime"] = time.Now().Format(time.DateTime)

	endpoints := make([]string, 0, 4)
	// Appending a nil slice is a no-op, so no nil check is required.
	endpoints = append(endpoints, s.opts.endpoints...)
	for _, server := range s.opts.servers {
		exposer, ok := server.(Endpointer)
		if !ok {
			continue
		}
		uri, err := exposer.Endpoint(ctx)
		if err != nil {
			return nil, err
		}
		endpoints = append(endpoints, uri)
	}

	return &registry.Service{
		ID:        s.ID(),
		Name:      s.Name(),
		Version:   s.Version(),
		Metadata:  meta,
		Endpoints: endpoints,
	}, nil
}
// injectVars performs a small, type-driven injection into v.
//
// v is expected to be a pointer to a struct. Every settable field of pointer
// kind is assigned the first entry of s.refValues whose type is identical to
// the field's type; fields without a match are left untouched. Unexported
// fields, non-pointer fields and fields of a non-addressable struct are
// skipped.
//
// A nil v, a nil pointer, or a value that is not a struct is ignored instead
// of panicking inside the reflect package.
func (s *Service) injectVars(v any) {
	target := reflect.Indirect(reflect.ValueOf(v))
	// Indirect yields the zero Value (Kind Invalid) for a nil interface or a
	// nil pointer, and NumField panics on anything that is not a struct.
	if target.Kind() != reflect.Struct {
		return
	}
	targetType := target.Type()
	for i := range target.NumField() {
		field := target.Field(i)
		if !field.CanSet() || field.Kind() != reflect.Pointer {
			continue
		}
		wanted := targetType.Field(i).Type
		for _, candidate := range s.refValues {
			// An invalid Value (e.g. reflect.ValueOf(nil)) has no type.
			if !candidate.IsValid() {
				continue
			}
			if candidate.Type() == wanted {
				field.Set(candidate)
				break
			}
		}
	}
}
// preStart wires up and launches everything the service runs.
//
// In order, it:
//  1. records the servers and the registry (if any) as injectable values;
//  2. injects into and initialises the scope, then the service loader, adding
//     each to the injectable values once its Init has succeeded;
//  3. starts every server, the service loader's Run and the registry
//     keep-alive loop on s.errGroup.
//
// ctx is passed to every Init, Start and Run call. The returned error is the
// first Init failure; in that case no goroutine has been started. Failures
// inside the started goroutines are reported by s.errGroup.Wait, not here.
func (s *Service) preStart(ctx context.Context) (err error) {
	s.Logger().Info(s.ctx, "starting")

	for _, ptr := range s.opts.servers {
		s.refValues = append(s.refValues, reflect.ValueOf(ptr))
	}
	if s.opts.registry != nil {
		s.refValues = append(s.refValues, reflect.ValueOf(s.opts.registry))
	}

	if s.opts.scope != nil {
		s.injectVars(s.opts.scope)
		if err = s.opts.scope.Init(ctx); err != nil {
			return err
		}
		s.refValues = append(s.refValues, reflect.ValueOf(s.opts.scope))
	}
	if s.opts.serviceLoader != nil {
		s.injectVars(s.opts.serviceLoader)
		if err = s.opts.serviceLoader.Init(ctx); err != nil {
			return err
		}
		s.refValues = append(s.refValues, reflect.ValueOf(s.opts.serviceLoader))
	}

	for _, srv := range s.opts.servers {
		svr := srv // per-iteration copy; redundant since Go 1.22 but harmless
		s.errGroup.Go(func() error {
			return svr.Start(ctx)
		})
	}
	if s.opts.serviceLoader != nil {
		s.errGroup.Go(func() error {
			return s.opts.serviceLoader.Run(ctx)
		})
	}
	if s.opts.registry != nil {
		s.errGroup.Go(func() error {
			return s.keepRegistered(ctx)
		})
	}

	s.Logger().Info(s.ctx, "started")
	return nil
}

// keepRegistered registers the service with the registry and then refreshes
// the registration on a timer until ctx is cancelled.
//
// The initial registration is bounded by registrarTimeout and its failure is
// returned. Refresh failures are only logged so a transient registry outage
// does not take the service down. All errors are kept in local variables:
// this runs on its own goroutine and must not write to the caller's results.
func (s *Service) keepRegistered(ctx context.Context) error {
	timeout := s.opts.registrarTimeout

	registerCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	// NOTE(review): unlike the refreshes below, the initial call leaves
	// o.TTL unset — confirm this is intended.
	if err := s.opts.registry.Register(s.service, func(o *registry.RegisterOptions) {
		o.Context = registerCtx
	}); err != nil {
		return err
	}
	s.Logger().Info(ctx, "service registered")

	// Refresh period: capped at one minute; for timeouts between ten seconds
	// and a minute, refresh five seconds before the timeout would elapse.
	interval := timeout
	switch {
	case interval > time.Minute:
		interval = time.Minute
	case interval > 10*time.Second:
		interval -= 5 * time.Second
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			s.Logger().Info(ctx, "service registar stopped")
			return nil
		case <-ticker.C:
			if err := s.opts.registry.Register(s.service, func(o *registry.RegisterOptions) {
				o.Context = ctx
				o.TTL = s.opts.registrarTimeout
			}); err != nil {
				s.Logger().Warn(ctx, "service register error: %v", err)
			}
		}
	}
}
// preStop shuts the service down: it stops every server and then removes the
// service from the registry (when one is configured).
//
// Only the first call does any work; later calls return nil immediately
// (guarded by an atomic compare-and-swap on exitFlag). Each failure is logged
// as a warning and shutdown continues. The returned error is the result of
// the last Stop/Deregister call attempted, so an earlier failure followed by
// a later success yields nil.
func (s *Service) preStop() (err error) {
	if !atomic.CompareAndSwapInt32(&s.exitFlag, 0, 1) {
		return nil
	}
	s.Logger().Info(s.ctx, "stopping")

	// Shutdown may have been triggered by cancellation of s.ctx itself. Detach
	// from that cancellation (keeping the context values) so Stop and
	// Deregister still get the full stopTimeout instead of a dead context.
	ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(s.ctx), s.opts.stopTimeout, errors.ErrTimeout)
	defer cancel()

	for _, srv := range s.opts.servers {
		if err = srv.Stop(ctx); err != nil {
			s.Logger().Warn(ctx, "server stop error: %v", err)
		}
	}
	if s.opts.registry != nil {
		if err = s.opts.registry.Deregister(s.service, func(o *registry.DeregisterOptions) {
			o.Context = ctx
		}); err != nil {
			s.Logger().Warn(ctx, "server deregister error: %v", err)
		}
	}

	s.Logger().Info(ctx, "stopped")
	return err
}
// Run builds the registry record, starts all components and blocks until the
// service has shut down.
//
// Shutdown (preStop) begins when any of the following happens:
//   - SIGTERM, SIGINT or SIGQUIT is received;
//   - the configured parent context is cancelled;
//   - any goroutine of the group returns an error, which cancels the group
//     context.
//
// It returns the build or preStart error if start-up fails, otherwise the
// first non-nil error returned by a goroutine of the group (nil on a clean
// shutdown).
func (s *Service) Run() (err error) {
	s.ctx = WithContext(s.opts.ctx, s)
	if s.service, err = s.build(s.ctx); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(s.ctx)
	defer cancel()

	var groupCtx context.Context
	s.errGroup, groupCtx = errgroup.WithContext(ctx)
	if err = s.preStart(groupCtx); err != nil {
		return err
	}

	s.errGroup.Go(func() error {
		ch := make(chan os.Signal, 1)
		// SIGKILL cannot be caught, so it is not requested.
		signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
		defer signal.Stop(ch)

		select {
		case <-ch:
			cancel()
		// groupCtx derives from ctx and s.ctx, so this also covers their
		// cancellation, and additionally fires when a sibling goroutine
		// fails. Without it a failed server would leave Wait blocked until
		// a signal arrives.
		case <-groupCtx.Done():
		}
		return s.preStop()
	})

	return s.errGroup.Wait()
}
// New creates a Service with default options and then applies cbs in order.
//
// Defaults: a random UUID as id, context.Background() as parent context, the
// default logger, an empty metadata map, a 10s stop timeout and a 30s
// registrar timeout.
func New(cbs ...Option) *Service {
	opts := &options{
		id:               uuid.New().String(),
		ctx:              context.Background(),
		logger:           logger.Default(),
		metadata:         make(map[string]string),
		stopTimeout:      10 * time.Second,
		registrarTimeout: 30 * time.Second,
	}
	for i := range cbs {
		cbs[i](opts)
	}
	return &Service{
		opts:      opts,
		refValues: make([]reflect.Value, 0, 20),
	}
}