package aeus

import (
	"context"
	stderrors "errors"
	"os"
	"os/signal"
	"reflect"
	"runtime"
	"sync/atomic"
	"syscall"
	"time"

	"git.nobla.cn/golang/aeus/pkg/errors"
	"git.nobla.cn/golang/aeus/pkg/logger"
	"git.nobla.cn/golang/aeus/registry"
	"github.com/google/uuid"
	"golang.org/x/sync/errgroup"
)

// defaultRegistrarTimeout is the registrar timeout installed by New and the
// fallback used when a non-positive timeout has been configured.
const defaultRegistrarTimeout = time.Second * 30

// Service bundles the configured servers, the optional registry and the
// optional scope/service loader, and drives their start/stop lifecycle.
type Service struct {
	ctx       context.Context
	opts      *options
	errGroup  *errgroup.Group
	refValues []reflect.Value // values offered to injectVars for pointer-field injection
	service   *registry.Service
	exitFlag  int32 // set to 1 by the first preStop call; accessed atomically
}

// ID returns the configured service identifier.
func (s *Service) ID() string {
	return s.opts.id
}

// Name returns the configured service name.
func (s *Service) Name() string {
	return s.opts.name
}

// Debug reports whether debug mode is enabled. This implementation always
// returns false.
func (s *Service) Debug() bool {
	return false
}

// Version returns the configured service version.
func (s *Service) Version() string {
	return s.opts.version
}

// Metadata returns the metadata of the registry entry built by Run, or nil
// when the entry has not been built yet.
func (s *Service) Metadata() map[string]string {
	if s.service == nil {
		return nil
	}
	return s.service.Metadata
}

// Endpoint returns the endpoints of the registry entry built by Run, or nil
// when the entry has not been built yet.
func (s *Service) Endpoint() []string {
	if s.service == nil {
		return nil
	}
	return s.service.Endpoints
}

// Logger returns the configured logger, falling back to logger.Default when
// none is set.
func (s *Service) Logger() logger.Logger {
	if s.opts.logger == nil {
		return logger.Default()
	}
	return s.opts.logger
}

// build assembles the registry entry for this service: identity, metadata
// (augmented with os, go, hostname and uptime keys) and the endpoints taken
// from the options plus every server that implements Endpointer.
//
// It returns the first error reported by a server's Endpoint method.
func (s *Service) build(ctx context.Context) (*registry.Service, error) {
	svr := &registry.Service{
		ID:        s.ID(),
		Name:      s.Name(),
		Version:   s.Version(),
		Metadata:  s.opts.metadata,
		Endpoints: make([]string, 0, 4),
	}
	if svr.Metadata == nil {
		svr.Metadata = make(map[string]string)
	}
	svr.Metadata["os"] = runtime.GOOS
	svr.Metadata["go"] = runtime.Version()
	// Best effort: an empty hostname is recorded when the lookup fails.
	svr.Metadata["hostname"], _ = os.Hostname()
	svr.Metadata["uptime"] = time.Now().Format(time.DateTime)
	svr.Endpoints = append(svr.Endpoints, s.opts.endpoints...)
	for _, ptr := range s.opts.servers {
		e, ok := ptr.(Endpointer)
		if !ok {
			continue
		}
		uri, err := e.Endpoint(ctx)
		if err != nil {
			return nil, err
		}
		svr.Endpoints = append(svr.Endpoints, uri)
	}
	return svr, nil
}

// injectVars assigns to every settable pointer-typed field of the struct that
// v is (or points to) the first collected value in s.refValues whose type is
// identical to the field type. Targets that are nil or not structs are
// ignored instead of panicking inside reflect.
func (s *Service) injectVars(v any) {
	refValue := reflect.Indirect(reflect.ValueOf(v))
	if refValue.Kind() != reflect.Struct {
		return
	}
	refType := refValue.Type()
	for i := range refValue.NumField() {
		fieldValue := refValue.Field(i)
		if !fieldValue.CanSet() {
			continue
		}
		fieldType := refType.Field(i).Type
		if fieldType.Kind() != reflect.Ptr {
			continue
		}
		for _, rv := range s.refValues {
			// A nil server/registry yields an invalid Value whose Type() panics.
			if rv.IsValid() && rv.Type() == fieldType {
				fieldValue.Set(rv)
				break
			}
		}
	}
}

// preStart collects the injectable values, initialises the optional scope and
// service loader, and launches every server, the scope and the registry
// keep-alive loop on the error group. ctx is handed to all of them.
//
// It currently always returns nil; failures surface through the error group.
func (s *Service) preStart(ctx context.Context) error {
	s.Logger().Info(s.ctx, "starting")
	for _, ptr := range s.opts.servers {
		s.refValues = append(s.refValues, reflect.ValueOf(ptr))
	}
	if s.opts.registry != nil {
		s.refValues = append(s.refValues, reflect.ValueOf(s.opts.registry))
	}
	// Order matters: the scope is injected and initialised before it becomes
	// injectable itself, so the service loader can receive it.
	if s.opts.scope != nil {
		s.injectVars(s.opts.scope)
		s.opts.scope.Init(ctx)
		s.refValues = append(s.refValues, reflect.ValueOf(s.opts.scope))
	}
	if s.opts.serviceLoader != nil {
		s.injectVars(s.opts.serviceLoader)
		s.opts.serviceLoader.Init(ctx)
		s.refValues = append(s.refValues, reflect.ValueOf(s.opts.serviceLoader))
	}
	for _, srv := range s.opts.servers {
		s.errGroup.Go(func() error {
			return srv.Start(ctx)
		})
	}
	if s.opts.scope != nil {
		s.errGroup.Go(func() error {
			return s.opts.scope.Run(ctx)
		})
	}
	if s.opts.registry != nil {
		s.errGroup.Go(func() error {
			return s.keepRegistered(ctx)
		})
	}
	s.Logger().Info(s.ctx, "started")
	return nil
}

// refreshInterval derives the re-registration period from the registrar
// timeout: at most one minute, and five seconds ahead of the timeout when the
// timeout exceeds ten seconds. A non-positive timeout falls back to
// defaultRegistrarTimeout because time.NewTicker panics on such values.
func refreshInterval(timeout time.Duration) time.Duration {
	if timeout <= 0 {
		timeout = defaultRegistrarTimeout
	}
	switch {
	case timeout > time.Minute:
		return time.Minute
	case timeout > time.Second*10:
		return timeout - time.Second*5
	default:
		return timeout
	}
}

// keepRegistered registers the service and then re-registers it periodically
// until ctx is done. Only a failure of the initial registration is returned;
// refresh failures are logged and retried on the next tick.
func (s *Service) keepRegistered(ctx context.Context) error {
	// NOTE(review): the initial registration does not set a TTL while the
	// refreshes do; kept as in the original, confirm whether this is intended.
	if err := s.opts.registry.Register(s.service, func(o *registry.RegisterOptions) {
		o.Context = ctx
	}); err != nil {
		return err
	}
	s.Logger().Info(ctx, "service registered")

	ticker := time.NewTicker(refreshInterval(s.opts.registrarTimeout))
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			s.Logger().Info(ctx, "service registar stopped")
			return nil
		case <-ticker.C:
			if err := s.opts.registry.Register(s.service, func(o *registry.RegisterOptions) {
				o.Context = ctx
				o.TTL = s.opts.registrarTimeout
			}); err != nil {
				s.Logger().Warn(ctx, "service register error: %v", err)
			}
		}
	}
}

// preStop stops every server and deregisters the service, bounded by the
// configured stop timeout. Only the first call does any work; later calls
// return nil immediately.
//
// Every failure is logged and all of them are returned joined together, so a
// later success no longer hides an earlier error.
func (s *Service) preStop() error {
	if !atomic.CompareAndSwapInt32(&s.exitFlag, 0, 1) {
		return nil
	}
	s.Logger().Info(s.ctx, "stopping")
	// s.ctx may already be cancelled when shutdown was triggered by the parent
	// context; detach from that cancellation so the stop timeout still applies.
	ctx, cancelFunc := context.WithTimeoutCause(context.WithoutCancel(s.ctx), s.opts.stopTimeout, errors.ErrTimeout)
	defer cancelFunc()

	var errs []error
	for _, srv := range s.opts.servers {
		if err := srv.Stop(ctx); err != nil {
			s.Logger().Warn(ctx, "server stop error: %v", err)
			errs = append(errs, err)
		}
	}
	if s.opts.registry != nil {
		if err := s.opts.registry.Deregister(s.service, func(o *registry.DeregisterOptions) {
			o.Context = ctx
		}); err != nil {
			s.Logger().Warn(ctx, "server deregister error: %v", err)
			errs = append(errs, err)
		}
	}
	s.Logger().Info(ctx, "stopped")
	return stderrors.Join(errs...)
}

// Run builds the registry entry, starts all components and blocks until they
// have finished. Shutdown begins when SIGTERM, SIGINT or SIGQUIT is received,
// when the configured parent context is cancelled, or when any component
// returns an error.
//
// It returns the build error, or the first non-nil error reported by the
// error group.
func (s *Service) Run() (err error) {
	s.ctx = WithContext(s.opts.ctx, s)
	if s.service, err = s.build(s.ctx); err != nil {
		return err
	}
	ctx, cancelFunc := context.WithCancel(s.ctx)
	defer cancelFunc()

	var errCtx context.Context
	s.errGroup, errCtx = errgroup.WithContext(ctx)
	if err = s.preStart(errCtx); err != nil {
		return err
	}
	s.errGroup.Go(func() error {
		ch := make(chan os.Signal, 1)
		// SIGKILL cannot be trapped, so it is not requested.
		signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
		defer signal.Stop(ch)
		select {
		case <-ch:
			cancelFunc()
		case <-errCtx.Done():
			// errCtx derives from ctx and s.ctx and is additionally cancelled
			// when a component fails, so the remaining servers are stopped
			// instead of Wait blocking until a signal arrives.
		}
		return s.preStop()
	})
	return s.errGroup.Wait()
}

// New creates a Service with a random UUID as its ID, a background context,
// the default logger, a 10s stop timeout and a 30s registrar timeout, then
// applies the given options in order. Nil options are skipped.
func New(cbs ...Option) *Service {
	s := &Service{
		refValues: make([]reflect.Value, 0, 20),
		opts: &options{
			id:               uuid.New().String(),
			ctx:              context.Background(),
			logger:           logger.Default(),
			stopTimeout:      time.Second * 10,
			registrarTimeout: defaultRegistrarTimeout,
			metadata:         make(map[string]string),
		},
	}
	for _, cb := range cbs {
		if cb != nil {
			cb(s.opts)
		}
	}
	return s
}