commit 7839f7caefc85a5f4bb9191c542062c4b27a98b0 Author: Yavolte Date: Fri Jun 6 13:31:33 2025 +0800 init project diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1962ccd --- /dev/null +++ b/.gitignore @@ -0,0 +1,47 @@ +bin/ + +.svn/ +.godeps +./build +.cover/ +dist +_site +_posts +*.dat +.vscode +vendor +cmd/ +third_party/ + +# Go.gitignore + +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test +storage +.idea +Makefile + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe + +profile + +# vim stuff +*.sw[op] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..5c64670 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# AEUS 介绍 + +`AEUS` 是一个轻量级的golang脚手架框架,用于快速开发微服务。 + + + diff --git a/app.go b/app.go new file mode 100644 index 0000000..279d532 --- /dev/null +++ b/app.go @@ -0,0 +1,257 @@ +package aeus + +import ( + "context" + "os" + "os/signal" + "reflect" + "runtime" + "sync/atomic" + "syscall" + "time" + + "git.nobla.cn/golang/aeus/pkg/errors" + "git.nobla.cn/golang/aeus/pkg/logger" + "git.nobla.cn/golang/aeus/registry" + "github.com/google/uuid" + "golang.org/x/sync/errgroup" +) + +type Service struct { + ctx context.Context + opts *options + errGroup *errgroup.Group + refValues []reflect.Value + service *registry.Service + exitFlag int32 +} + +func (s *Service) ID() string { + return s.opts.id +} + +func (s *Service) Name() string { + return s.opts.name +} + +func (s *Service) Debug() bool { + return false +} + +func (s *Service) Version() string { + return s.opts.version +} +func (s *Service) Metadata() map[string]string { + if s.service == nil { + return nil + } + return s.service.Metadata +} + +func (s *Service) Endpoint() []string { + if s.service == nil { + return nil + } + return s.service.Endpoints +} + +func (s *Service) Logger() logger.Logger { + if s.opts.logger == nil { + return logger.Default() + } + return s.opts.logger +} + +func (s *Service) build(ctx context.Context) (*registry.Service, error) { + svr := ®istry.Service{ + ID: s.ID(), + Name: s.Name(), + Version: s.Version(), + Metadata: s.opts.metadata, + Endpoints: make([]string, 0, 4), + } + if svr.Metadata == nil { + svr.Metadata = make(map[string]string) + } + svr.Metadata["os"] = runtime.GOOS + svr.Metadata["go"] = runtime.Version() + svr.Metadata["hostname"], _ = os.Hostname() + svr.Metadata["uptime"] = time.Now().Format(time.DateTime) + if s.opts.endpoints != nil { + svr.Endpoints = append(svr.Endpoints, s.opts.endpoints...) + } + for _, ptr := range s.opts.servers { + if e, ok := ptr.(Endpointer); ok { + if uri, err := e.Endpoint(ctx); err != nil { + return nil, err + } else { + svr.Endpoints = append(svr.Endpoints, uri) + } + } + } + return svr, nil +} + +func (s *Service) injectVars(v any) { + refValue := reflect.Indirect(reflect.ValueOf(v)) + refType := refValue.Type() + for i := range refValue.NumField() { + fieldValue := refValue.Field(i) + if !fieldValue.CanSet() { + continue + } + fieldType := refType.Field(i) + if fieldType.Type.Kind() != reflect.Ptr { + continue + } + for _, rv := range s.refValues { + if fieldType.Type == rv.Type() { + refValue.Field(i).Set(rv) + break + } + } + } +} + +func (s *Service) preStart(ctx context.Context) (err error) { + s.Logger().Info(s.ctx, "starting") + for _, ptr := range s.opts.servers { + s.refValues = append(s.refValues, reflect.ValueOf(ptr)) + } + if s.opts.registry != nil { + s.refValues = append(s.refValues, reflect.ValueOf(s.opts.registry)) + } + if s.opts.scope != nil { + s.injectVars(s.opts.scope) + s.opts.scope.Init(ctx) + s.refValues = append(s.refValues, reflect.ValueOf(s.opts.scope)) + } + if s.opts.serviceLoader != nil { + s.injectVars(s.opts.serviceLoader) + s.opts.serviceLoader.Init(ctx) + s.refValues = append(s.refValues, reflect.ValueOf(s.opts.serviceLoader)) + } + for _, srv := range s.opts.servers { + svr := srv + s.errGroup.Go(func() error { + return svr.Start(ctx) + }) + } + if s.opts.scope != nil { + s.errGroup.Go(func() error { + return s.opts.scope.Run(ctx) + }) + } + if s.opts.registry != nil { + s.errGroup.Go(func() error { + opts := func(o *registry.RegisterOptions) { + o.Context = ctx + } + if err = s.opts.registry.Register(s.service, opts); err != nil { + return err + } + s.Logger().Info(ctx, "service registered") + duration := s.opts.registrarTimeout + if duration > time.Minute { + duration = time.Minute + } else { + if duration > time.Second*10 { + duration = duration - time.Second*5 + } + } + ticker := time.NewTicker(duration) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + s.Logger().Info(ctx, "service registar stopped") + return nil + case <-ticker.C: + if err = s.opts.registry.Register(s.service, func(o *registry.RegisterOptions) { + o.Context = ctx + o.TTL = s.opts.registrarTimeout + }); err != nil { + s.Logger().Warn(ctx, "service register error: %v", err) + } + } + } + }) + } + s.Logger().Info(s.ctx, "started") + return +} + +func (s *Service) preStop() (err error) { + if !atomic.CompareAndSwapInt32(&s.exitFlag, 0, 1) { + return + } + s.Logger().Info(s.ctx, "stopping") + ctx, cancelFunc := context.WithTimeoutCause(s.ctx, s.opts.stopTimeout, errors.ErrTimeout) + defer func() { + cancelFunc() + }() + for _, srv := range s.opts.servers { + if err = srv.Stop(ctx); err != nil { + s.Logger().Warn(ctx, "server stop error: %v", err) + } + } + if s.opts.registry != nil { + if err = s.opts.registry.Deregister(s.service, func(o *registry.DeregisterOptions) { + o.Context = ctx + }); err != nil { + s.Logger().Warn(ctx, "server deregister error: %v", err) + } + } + s.Logger().Info(ctx, "stopped") + return +} + +func (s *Service) Run() (err error) { + var ( + ctx context.Context + errCtx context.Context + cancelFunc context.CancelFunc + ) + s.ctx = WithContext(s.opts.ctx, s) + if s.service, err = s.build(s.ctx); err != nil { + return + } + ctx, cancelFunc = context.WithCancel(s.ctx) + defer cancelFunc() + s.errGroup, errCtx = errgroup.WithContext(ctx) + if err = s.preStart(errCtx); err != nil { + return + } + s.errGroup.Go(func() error { + ch := make(chan os.Signal, 1) + signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL} + signal.Notify(ch, signals...) + select { + case <-ch: + cancelFunc() + case <-ctx.Done(): + case <-s.ctx.Done(): + } + return s.preStop() + }) + err = s.errGroup.Wait() + return +} + +func New(cbs ...Option) *Service { + s := &Service{ + refValues: make([]reflect.Value, 0, 20), + opts: &options{ + id: uuid.New().String(), + ctx: context.Background(), + logger: logger.Default(), + stopTimeout: time.Second * 10, + registrarTimeout: time.Second * 30, + }, + } + s.opts.metadata = make(map[string]string) + for _, cb := range cbs { + cb(s.opts) + } + return s +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..204c7c2 --- /dev/null +++ b/go.mod @@ -0,0 +1,56 @@ +module git.nobla.cn/golang/aeus + +go 1.23.0 + +toolchain go1.23.9 + +require ( + github.com/gin-gonic/gin v1.10.1 + github.com/google/uuid v1.6.0 + github.com/mattn/go-runewidth v0.0.16 + github.com/peterh/liner v1.2.2 + go.etcd.io/etcd/api/v3 v3.6.0 + go.etcd.io/etcd/client/v3 v3.6.0 + golang.org/x/sync v0.12.0 + google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb + google.golang.org/grpc v1.72.2 + google.golang.org/protobuf v1.36.5 +) + +require ( + github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect + github.com/coreos/go-semver v0.3.1 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.20.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.6.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/arch v0.8.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/net v0.38.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8c9ea05 --- /dev/null +++ b/go.sum @@ -0,0 +1,181 @@ +github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= +github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= +github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= +github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= +github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= +github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.10.1 h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ= +github.com/gin-gonic/gin v1.10.1/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= +github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= +github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/peterh/liner v1.2.2 h1:aJ4AOodmL+JxOZZEL2u9iJf8omNRpqHc/EbrK+3mAXw= +github.com/peterh/liner v1.2.2/go.mod h1:xFwJyiKIXJZUKItq5dGHZSTBRAuG/CpeNpWLyiNRNwI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= +github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.etcd.io/etcd/api/v3 v3.6.0 h1:vdbkcUBGLf1vfopoGE/uS3Nv0KPyIpUV/HM6w9yx2kM= +go.etcd.io/etcd/api/v3 v3.6.0/go.mod h1:Wt5yZqEmxgTNJGHob7mTVBJDZNXiHPtXTcPab37iFOw= +go.etcd.io/etcd/client/pkg/v3 v3.6.0 h1:nchnPqpuxvv3UuGGHaz0DQKYi5EIW5wOYsgUNRc365k= +go.etcd.io/etcd/client/pkg/v3 v3.6.0/go.mod h1:Jv5SFWMnGvIBn8o3OaBq/PnT0jjsX8iNokAUessNjoA= +go.etcd.io/etcd/client/v3 v3.6.0 h1:/yjKzD+HW5v/3DVj9tpwFxzNbu8hjcKID183ug9duWk= +go.etcd.io/etcd/client/v3 v3.6.0/go.mod h1:Jzk/Knqe06pkOZPHXsQ0+vNDvMQrgIqJ0W8DwPdMJMg= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= +golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb h1:p31xT4yrYrSM/G4Sn2+TNUkVhFCbG9y8itM2S6Th950= +google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:jbe3Bkdp+Dh2IrslsFCklNhweNTBgSYanP1UXhJDhKg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= +google.golang.org/grpc v1.72.2 h1:TdbGzwb82ty4OusHWepvFWGLgIbNo1/SUynEN0ssqv8= +google.golang.org/grpc v1.72.2/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/middleware/middleware.go b/middleware/middleware.go new file mode 100644 index 0000000..f0ec479 --- /dev/null +++ b/middleware/middleware.go @@ -0,0 +1,16 @@ +package middleware + +import "context" + +type Handler func(ctx context.Context) error + +type Middleware func(Handler) Handler + +func Chain(m ...Middleware) Middleware { + return func(next Handler) Handler { + for i := len(m) - 1; i >= 0; i-- { + next = m[i](next) + } + return next + } +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..10e437f --- /dev/null +++ b/options.go @@ -0,0 +1,96 @@ +package aeus + +import ( + "context" + "time" + + "git.nobla.cn/golang/aeus/pkg/logger" + "git.nobla.cn/golang/aeus/registry" +) + +type options struct { + id string + name string + version string + metadata map[string]string + ctx context.Context + logger logger.Logger + servers []Server + endpoints []string + scope Scope + registrarTimeout time.Duration + registry registry.Registry + serviceLoader ServiceLoader + stopTimeout time.Duration +} + +func WithName(name string) Option { + return func(o *options) { + o.name = name + } +} + +func WithVersion(version string) Option { + return func(o *options) { + o.version = version + } +} + +func WithMetadata(metadata map[string]string) Option { + return func(o *options) { + if o.metadata == nil { + o.metadata = make(map[string]string) + } + for k, v := range metadata { + o.metadata[k] = v + } + } +} + +func WithServer(servers ...Server) Option { + return func(o *options) { + o.servers = append(o.servers, servers...) + } +} + +func WithEndpoint(endpoints ...string) Option { + return func(o *options) { + o.endpoints = append(o.endpoints, endpoints...) + } +} + +func WithLogger(logger logger.Logger) Option { + return func(o *options) { + o.logger = logger + } +} + +func WithScope(scope Scope) Option { + return func(o *options) { + o.scope = scope + } +} + +func WithServiceLoader(loader ServiceLoader) Option { + return func(o *options) { + o.serviceLoader = loader + } +} + +func WithStopTimeout(timeout time.Duration) Option { + return func(o *options) { + o.stopTimeout = timeout + } +} + +func WithRegistry(registry registry.Registry) Option { + return func(o *options) { + o.registry = registry + } +} + +func WithRegistrarTimeout(timeout time.Duration) Option { + return func(o *options) { + o.registrarTimeout = timeout + } +} diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go new file mode 100644 index 0000000..08b9edb --- /dev/null +++ b/pkg/cache/cache.go @@ -0,0 +1,38 @@ +package cache + +import ( + "context" + "time" + + "git.nobla.cn/golang/aeus/pkg/cache/memory" +) + +var ( + std = memory.NewCache() +) + +type Cache interface { + // Get gets a cached value by key. + Get(ctx context.Context, key string) (any, time.Time, error) + // Put stores a key-value pair into cache. + Put(ctx context.Context, key string, val any, d time.Duration) error + // Delete removes a key from cache. + Delete(ctx context.Context, key string) error + // String returns the name of the implementation. + String() string +} + +// Get gets a cached value by key. +func Get(ctx context.Context, key string) (any, time.Time, error) { + return std.Get(ctx, key) +} + +// Put stores a key-value pair into cache. +func Put(ctx context.Context, key string, val any, d time.Duration) error { + return std.Put(ctx, key, val, d) +} + +// String returns the name of the implementation. +func Delete(ctx context.Context, key string) error { + return std.Delete(ctx, key) +} diff --git a/pkg/cache/memory/cache.go b/pkg/cache/memory/cache.go new file mode 100644 index 0000000..fc57169 --- /dev/null +++ b/pkg/cache/memory/cache.go @@ -0,0 +1,68 @@ +package memory + +import ( + "context" + "sync" + "time" + + "git.nobla.cn/golang/aeus/pkg/errors" +) + +type memCache struct { + opts Options + + items map[string]Item + sync.RWMutex +} + +func (c *memCache) Get(ctx context.Context, key string) (interface{}, time.Time, error) { + c.RWMutex.RLock() + defer c.RWMutex.RUnlock() + + item, found := c.items[key] + if !found { + return nil, time.Time{}, errors.ErrNotFound + } + if item.Expired() { + return nil, time.Time{}, errors.ErrExpired + } + + return item.Value, time.Unix(0, item.Expiration), nil +} + +func (c *memCache) Put(ctx context.Context, key string, val interface{}, d time.Duration) error { + var e int64 + if d == DefaultExpiration { + d = c.opts.Expiration + } + if d > 0 { + e = time.Now().Add(d).UnixNano() + } + + c.RWMutex.Lock() + defer c.RWMutex.Unlock() + + c.items[key] = Item{ + Value: val, + Expiration: e, + } + + return nil +} + +func (c *memCache) Delete(ctx context.Context, key string) error { + c.RWMutex.Lock() + defer c.RWMutex.Unlock() + + _, found := c.items[key] + if !found { + return errors.ErrNotFound + } + + delete(c.items, key) + return nil +} + +func (m *memCache) String() string { + return "memory" +} diff --git a/pkg/cache/memory/item.go b/pkg/cache/memory/item.go new file mode 100644 index 0000000..c35aebf --- /dev/null +++ b/pkg/cache/memory/item.go @@ -0,0 +1,33 @@ +package memory + +import "time" + +// Item represents an item stored in the cache. +type Item struct { + Value interface{} + Expiration int64 +} + +// Expired returns true if the item has expired. +func (i *Item) Expired() bool { + if i.Expiration == 0 { + return false + } + + return time.Now().UnixNano() > i.Expiration +} + +// NewCache returns a new cache. +func NewCache(opts ...Option) *memCache { + options := NewOptions(opts...) + items := make(map[string]Item) + + if len(options.Items) > 0 { + items = options.Items + } + + return &memCache{ + opts: options, + items: items, + } +} diff --git a/pkg/cache/memory/types.go b/pkg/cache/memory/types.go new file mode 100644 index 0000000..1157cfc --- /dev/null +++ b/pkg/cache/memory/types.go @@ -0,0 +1,77 @@ +package memory + +import ( + "context" + "time" + + "git.nobla.cn/golang/aeus/pkg/logger" +) + +var ( + DefaultExpiration time.Duration = 0 +) + +// Options represents the options for the cache. +type Options struct { + // Context should contain all implementation specific options, using context.WithValue. + Context context.Context + // Logger is the be used logger + Logger logger.Logger + Items map[string]Item + // Address represents the address or other connection information of the cache service. + Address string + Expiration time.Duration +} + +// Option manipulates the Options passed. +type Option func(o *Options) + +// Expiration sets the duration for items stored in the cache to expire. +func Expiration(d time.Duration) Option { + return func(o *Options) { + o.Expiration = d + } +} + +// Items initializes the cache with preconfigured items. +func Items(i map[string]Item) Option { + return func(o *Options) { + o.Items = i + } +} + +// WithAddress sets the cache service address or connection information. +func WithAddress(addr string) Option { + return func(o *Options) { + o.Address = addr + } +} + +// WithContext sets the cache context, for any extra configuration. +func WithContext(c context.Context) Option { + return func(o *Options) { + o.Context = c + } +} + +// WithLogger sets underline logger. +func WithLogger(l logger.Logger) Option { + return func(o *Options) { + o.Logger = l + } +} + +// NewOptions returns a new options struct. +func NewOptions(opts ...Option) Options { + options := Options{ + Expiration: DefaultExpiration, + Items: make(map[string]Item), + Logger: logger.Default(), + } + + for _, o := range opts { + o(&options) + } + + return options +} diff --git a/pkg/errors/const.go b/pkg/errors/const.go new file mode 100644 index 0000000..7e0ef19 --- /dev/null +++ b/pkg/errors/const.go @@ -0,0 +1,24 @@ +package errors + +const ( + OK = 0 //success + Exit = 1000 //normal exit + Invalid = 1001 //payload invalid + Timeout = 1002 //timeout + Expired = 1003 //expired + AccessDenied = 4005 //access denied + PermissionDenied = 4003 //permission denied + NotFound = 4004 //not found + Unavailable = 5000 //service unavailable +) + +var ( + ErrExit = New(Exit, "normal exit") + ErrTimeout = New(Timeout, "timeout") + ErrExpired = New(Expired, "expired") + ErrValidate = New(Invalid, "invalid payload") + ErrNotFound = New(NotFound, "not found") + ErrAccessDenied = New(AccessDenied, "access denied") + ErrPermissionDenied = New(PermissionDenied, "permission denied") + ErrUnavailable = New(Unavailable, "service unavailable") +) diff --git a/pkg/errors/error.go b/pkg/errors/error.go new file mode 100644 index 0000000..bfa01b3 --- /dev/null +++ b/pkg/errors/error.go @@ -0,0 +1,33 @@ +package errors + +import ( + "errors" + "fmt" +) + +type Error struct { + Code int + Message string +} + +func (e *Error) Error() string { + return fmt.Sprintf("code: %d, message: %s", e.Code, e.Message) +} + +func Format(code int, msg string, args ...any) Error { + return Error{ + Code: code, + Message: fmt.Sprintf(msg, args...), + } +} + +func Is(err, target error) bool { + return errors.Is(err, target) +} + +func New(code int, message string) *Error { + return &Error{ + Code: code, + Message: message, + } +} diff --git a/pkg/logger/default.go b/pkg/logger/default.go new file mode 100644 index 0000000..59b57a0 --- /dev/null +++ b/pkg/logger/default.go @@ -0,0 +1,33 @@ +package logger + +import ( + "context" + "fmt" + "log/slog" +) + +type logger struct { + log *slog.Logger +} + +func (l *logger) Debug(ctx context.Context, msg string, args ...any) { + l.log.DebugContext(ctx, fmt.Sprintf(msg, args...)) +} + +func (l *logger) Info(ctx context.Context, msg string, args ...any) { + l.log.InfoContext(ctx, fmt.Sprintf(msg, args...)) +} + +func (l *logger) Warn(ctx context.Context, msg string, args ...any) { + l.log.WarnContext(ctx, fmt.Sprintf(msg, args...)) +} + +func (l *logger) Error(ctx context.Context, msg string, args ...any) { + l.log.ErrorContext(ctx, fmt.Sprintf(msg, args...)) +} + +func NewLogger(log *slog.Logger) Logger { + return &logger{ + log: log, + } +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go new file mode 100644 index 0000000..6ffeb33 --- /dev/null +++ b/pkg/logger/logger.go @@ -0,0 +1,41 @@ +package logger + +import ( + "context" + "log/slog" +) + +var ( + log Logger +) + +func init() { + log = NewLogger(slog.Default()) +} + +type Logger interface { + Debug(ctx context.Context, format string, args ...any) + Info(ctx context.Context, format string, args ...any) + Warn(ctx context.Context, format string, args ...any) + Error(ctx context.Context, format string, args ...any) +} + +func Debug(ctx context.Context, format string, args ...any) { + log.Debug(ctx, format, args...) +} + +func Info(ctx context.Context, format string, args ...any) { + log.Debug(ctx, format, args...) +} + +func Warn(ctx context.Context, format string, args ...any) { + log.Debug(ctx, format, args...) +} + +func Error(ctx context.Context, format string, args ...any) { + log.Debug(ctx, format, args...) +} + +func Default() Logger { + return log +} diff --git a/pkg/net/ip.go b/pkg/net/ip.go new file mode 100644 index 0000000..5626e65 --- /dev/null +++ b/pkg/net/ip.go @@ -0,0 +1,45 @@ +package net + +import ( + "net" + "strings" +) + +// get internal ip +func LocalIP() string { + var ( + err error + addrs []net.Addr + inters []net.Interface + ) + if inters, err = net.Interfaces(); err != nil { + return "" + } + for _, inter := range inters { + if inter.Flags&net.FlagUp != net.FlagUp { + continue + } + if !strings.HasPrefix(inter.Name, "lo") { + if addrs, err = inter.Addrs(); err != nil { + continue + } + for _, addr := range addrs { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + if ipNet.IP.To4() != nil { + return ipNet.IP.String() + } + } + } + } + } + return "localhost" +} + +// extract ip address +func Extract(addr string) (string, error) { + // if addr is already specified then it's directly returned + if len(addr) > 0 && (addr != "0.0.0.0" && addr != "[::]" && addr != "::") { + return addr, nil + } + return LocalIP(), nil +} diff --git a/pkg/net/net.go b/pkg/net/net.go new file mode 100644 index 0000000..ee6ea36 --- /dev/null +++ b/pkg/net/net.go @@ -0,0 +1,22 @@ +package net + +import ( + "fmt" + "net" + "strings" +) + +func HostPort(addr string, port any) string { + host := addr + if strings.Count(addr, ":") > 0 { + host = fmt.Sprintf("[%s]", addr) + } + // when port is blank or 0, host is a queue name + if v, ok := port.(string); ok && v == "" { + return host + } else if v, ok := port.(int); ok && v == 0 && net.ParseIP(host) == nil { + return host + } + + return fmt.Sprintf("%s:%v", host, port) +} diff --git a/pkg/net/utils.go b/pkg/net/utils.go new file mode 100644 index 0000000..392a571 --- /dev/null +++ b/pkg/net/utils.go @@ -0,0 +1,24 @@ +package net + +import ( + "net" + "strconv" +) + +// TrulyAddr returns the truly address of the listener. +func TrulyAddr(addr string, l net.Listener) string { + host, port, err := net.SplitHostPort(addr) + if err != nil && l == nil { + return "" + } + if l != nil { + if taddr, ok := l.Addr().(*net.TCPAddr); ok { + port = strconv.Itoa(taddr.Port) + } + } + if len(host) > 0 && (host != "0.0.0.0" && host != "[::]" && host != "::") { + return net.JoinHostPort(host, port) + } + host = LocalIP() + return net.JoinHostPort(host, port) +} diff --git a/pkg/pool/buffer.go b/pkg/pool/buffer.go new file mode 100644 index 0000000..9e58cce --- /dev/null +++ b/pkg/pool/buffer.go @@ -0,0 +1,22 @@ +package pool + +import ( + "bytes" + "sync" +) + +var ( + bufferPool sync.Pool +) + +func GetBuffer() *bytes.Buffer { + if v := bufferPool.Get(); v != nil { + return v.(*bytes.Buffer) + } + return bytes.NewBuffer([]byte{}) +} + +func PutBuffer(b *bytes.Buffer) { + b.Reset() + bufferPool.Put(b) +} diff --git a/pkg/pool/bytes.go b/pkg/pool/bytes.go new file mode 100644 index 0000000..cdcfcdf --- /dev/null +++ b/pkg/pool/bytes.go @@ -0,0 +1,50 @@ +package pool + +import "sync" + +var ( + bufPool5k sync.Pool + bufPool2k sync.Pool + bufPool1k sync.Pool + bufPool sync.Pool +) + +func GetBytes(size int) []byte { + if size <= 0 { + return nil + } + var x interface{} + if size >= 5*1024 { + x = bufPool5k.Get() + } else if size >= 2*1024 { + x = bufPool2k.Get() + } else if size >= 1*1024 { + x = bufPool1k.Get() + } else { + x = bufPool.Get() + } + if x == nil { + return make([]byte, size) + } + buf := x.([]byte) + if cap(buf) < size { + return make([]byte, size) + } + return buf[:size] +} + +func PutBytes(buf []byte) { + size := cap(buf) + if size <= 0 { + return + } + if size >= 5*1024 { + bufPool5k.Put(buf) + } else if size >= 2*1024 { + bufPool2k.Put(buf) + } else if size >= 1*1024 { + bufPool1k.Put(buf) + } else { + bufPool.Put(buf) + } +} diff --git a/pkg/protoc/command/command.pb.go b/pkg/protoc/command/command.pb.go new file mode 100644 index 0000000..cad963c --- /dev/null +++ b/pkg/protoc/command/command.pb.go @@ -0,0 +1,85 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.6 +// protoc v5.29.3 +// source: command.proto + +package command + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + descriptorpb "google.golang.org/protobuf/types/descriptorpb" + reflect "reflect" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +var file_aeus_command_proto_extTypes = []protoimpl.ExtensionInfo{ + { + ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtensionType: (*string)(nil), + Field: 50001, + Name: "aeus.command", + Tag: "bytes,50001,opt,name=command", + Filename: "aeus/command.proto", + }, +} + +// Extension fields to descriptorpb.MethodOptions. +var ( + // The 'command' option will be a string. + // Choose a field number in the reserved range (50000-99999) to avoid conflicts. + // + // optional string command = 50001; + E_Command = &file_aeus_command_proto_extTypes[0] +) + +var File_aeus_command_proto protoreflect.FileDescriptor + +const file_aeus_command_proto_rawDesc = "" + + "\n" + + "\x12aeus/command.proto\x12\x04aeus\x1a google/protobuf/descriptor.proto:=\n" + + "\acommand\x12\x1e.google.protobuf.MethodOptions\x18ц\x03 \x01(\tR\acommand\x88\x01\x01B5Z3git.nobla.cn/golang/aeus/pkg/protoc/command;commandb\x06proto3" + +var file_aeus_command_proto_goTypes = []any{ + (*descriptorpb.MethodOptions)(nil), // 0: google.protobuf.MethodOptions +} +var file_aeus_command_proto_depIdxs = []int32{ + 0, // 0: aeus.command:extendee -> google.protobuf.MethodOptions + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 0, // [0:1] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_aeus_command_proto_init() } +func file_aeus_command_proto_init() { + if File_aeus_command_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_aeus_command_proto_rawDesc), len(file_aeus_command_proto_rawDesc)), + NumEnums: 0, + NumMessages: 0, + NumExtensions: 1, + NumServices: 0, + }, + GoTypes: file_aeus_command_proto_goTypes, + DependencyIndexes: file_aeus_command_proto_depIdxs, + ExtensionInfos: file_aeus_command_proto_extTypes, + }.Build() + File_aeus_command_proto = out.File + file_aeus_command_proto_goTypes = nil + file_aeus_command_proto_depIdxs = nil +} diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go new file mode 100644 index 0000000..7dfe3e6 --- /dev/null +++ b/registry/etcd/etcd.go @@ -0,0 +1,147 @@ +package etcd + +import ( + "crypto/tls" + "encoding/json" + "path" + "sync" + + "git.nobla.cn/golang/aeus/pkg/errors" + "git.nobla.cn/golang/aeus/registry" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type etcdRegistry struct { + prefix string + client *clientv3.Client + options *registry.RegistryOptions + leaseID clientv3.LeaseID + sync.RWMutex +} + +func (e *etcdRegistry) prefixKey(name string) string { + return path.Join(e.prefix, name) + "/" +} + +func (e *etcdRegistry) buildKey(s *registry.Service) string { + return path.Join(e.prefix, s.Name, s.ID) +} + +func (e *etcdRegistry) marshalService(s *registry.Service) string { + buf, err := json.Marshal(s) + if err == nil { + return string(buf) + } + return "" +} + +func (e *etcdRegistry) unmarshalService(b []byte) (*registry.Service, error) { + var service registry.Service + err := json.Unmarshal(b, &service) + if err == nil { + return &service, nil + } + return nil, err +} + +func (e *etcdRegistry) Name() string { + return Name +} + +func (e *etcdRegistry) Init(opts ...registry.RegistryOption) (err error) { + for _, cb := range opts { + cb(e.options) + } + cfg := clientv3.Config{} + if e.options.Context != nil { + clientOpts := FromContext(e.options.Context) + if clientOpts != nil { + cfg.Username = clientOpts.Username + cfg.Password = clientOpts.Password + cfg.DialTimeout = clientOpts.DialTimeout + if clientOpts.Prefix != "" { + e.prefix = clientOpts.Prefix + } + } + } + if e.options.TLSConfig != nil { + cfg.TLS = e.options.TLSConfig + } else if e.options.Secure { + cfg.TLS = &tls.Config{ + InsecureSkipVerify: true, + } + } + if e.prefix == "" { + e.prefix = "/micro/service/" + } + cfg.Endpoints = append(cfg.Endpoints, e.options.Addrs...) + e.client, err = clientv3.New(cfg) + return nil +} + +func (e *etcdRegistry) Register(s *registry.Service, cbs ...registry.RegisterOption) (err error) { + var lgr *clientv3.LeaseGrantResponse + opts := registry.NewRegisterOption(cbs...) + if opts.TTL.Seconds() > 0 { + if e.leaseID > 0 { + if _, err = e.client.KeepAliveOnce(opts.Context, e.leaseID); err == nil { + return + } else { + if err != rpctypes.ErrLeaseNotFound { + return err + } + } + } + lgr, err = e.client.Grant(opts.Context, int64(opts.TTL.Seconds())) + if err != nil { + return err + } + } + if lgr != nil { + if _, err = e.client.Put(opts.Context, e.buildKey(s), e.marshalService(s), clientv3.WithLease(lgr.ID)); err == nil { + e.leaseID = lgr.ID + } + } else { + _, err = e.client.Put(opts.Context, e.buildKey(s), e.marshalService(s)) + } + if err != nil { + return err + } + return nil +} + +func (e *etcdRegistry) Deregister(s *registry.Service, cbs ...registry.DeregisterOption) error { + opts := registry.NewDeregisterOption(cbs...) + e.client.Delete(opts.Context, e.buildKey(s)) + return nil +} + +func (e *etcdRegistry) GetService(name string, cbs ...registry.GetOption) ([]*registry.Service, error) { + opts := registry.NewGetOption(cbs...) + rsp, err := e.client.Get(opts.Context, e.prefixKey(name), clientv3.WithPrefix(), clientv3.WithSerializable()) + if err != nil { + return nil, err + } + if len(rsp.Kvs) == 0 { + return nil, errors.ErrNotFound + } + ss := make([]*registry.Service, 0, len(rsp.Kvs)) + for _, n := range rsp.Kvs { + if s, err := e.unmarshalService(n.Value); err == nil { + ss = append(ss, s) + } + } + return ss, nil +} + +func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { + return newWatcher(e, opts...) +} + +func NewEtcdRegistry() registry.Registry { + e := &etcdRegistry{ + options: ®istry.RegistryOptions{}, + } + return e +} diff --git a/registry/etcd/types.go b/registry/etcd/types.go new file mode 100644 index 0000000..7a7c723 --- /dev/null +++ b/registry/etcd/types.go @@ -0,0 +1,36 @@ +package etcd + +import ( + "context" + "time" +) + +const ( + Name = "etcd" +) + +type ( + etcdContextKey struct{} +) + +type ( + ClientOptions struct { + Prefix string + Username string + Password string + DialTimeout time.Duration + } +) + +func FromContext(ctx context.Context) *ClientOptions { + if v := ctx.Value(etcdContextKey{}); v != nil { + if v, ok := v.(*ClientOptions); ok { + return v + } + } + return nil +} + +func WithContext(ctx context.Context, opts *ClientOptions) context.Context { + return context.WithValue(ctx, etcdContextKey{}, opts) +} diff --git a/registry/etcd/watcher.go b/registry/etcd/watcher.go new file mode 100644 index 0000000..12dff21 --- /dev/null +++ b/registry/etcd/watcher.go @@ -0,0 +1,66 @@ +package etcd + +import ( + "context" + "io" + "sync/atomic" + + "git.nobla.cn/golang/aeus/registry" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type etcdWatcher struct { + ctx context.Context + cancelFunc context.CancelFunc + watchChan clientv3.WatchChan + watcher clientv3.Watcher + registry *etcdRegistry + opts *registry.WatchOptions + setupFlag int32 +} + +func newWatcher(r *etcdRegistry, cbs ...registry.WatchOption) (watcher registry.Watcher, err error) { + w := &etcdWatcher{ + registry: r, + } + w.opts = registry.NewWatchOption(cbs...) + w.ctx, w.cancelFunc = context.WithCancel(w.opts.Context) + w.watcher = clientv3.NewWatcher(r.client) + w.watchChan = w.watcher.Watch(w.ctx, r.prefixKey(w.opts.Service), clientv3.WithPrefix(), clientv3.WithRev(0), clientv3.WithKeysOnly()) + if err = w.watcher.RequestProgress(w.ctx); err != nil { + w.opts.Logger.Warn(w.ctx, "etcd request progress error: %v", err) + } + return w, err +} + +func (w *etcdWatcher) getServices() ([]*registry.Service, error) { + return w.registry.GetService(w.opts.Service) +} + +func (w *etcdWatcher) Next() ([]*registry.Service, error) { + if atomic.CompareAndSwapInt32(&w.setupFlag, 0, 1) { + return w.getServices() + } + select { + case <-w.ctx.Done(): + return nil, w.ctx.Err() + case res, ok := <-w.watchChan: + if ok { + if res.Err() == nil { + return w.getServices() + } else { + w.opts.Logger.Warn(w.ctx, "etcd watcher read error: %v", res.Err()) + return nil, res.Err() + } + } + } + return nil, io.ErrClosedPipe +} + +func (w *etcdWatcher) Stop() (err error) { + if err = w.watcher.Close(); err != nil { + w.opts.Logger.Warn(w.ctx, "etcd watcher close error: %v", err) + } + w.cancelFunc() + return err +} diff --git a/registry/registry.go b/registry/registry.go new file mode 100644 index 0000000..69ada7a --- /dev/null +++ b/registry/registry.go @@ -0,0 +1,13 @@ +package registry + +// The registry provides an interface for service discovery +// and an abstraction over varying implementations +// {consul, etcd, zookeeper, ...}. +type Registry interface { + Name() string //return registry name + Init(...RegistryOption) error //init registry + Register(*Service, ...RegisterOption) error //register service + Deregister(*Service, ...DeregisterOption) error //deregister service + GetService(string, ...GetOption) ([]*Service, error) //get service list + Watch(...WatchOption) (Watcher, error) //watch service +} diff --git a/registry/types.go b/registry/types.go new file mode 100644 index 0000000..522a99a --- /dev/null +++ b/registry/types.go @@ -0,0 +1,198 @@ +package registry + +import ( + "context" + "crypto/tls" + "time" + + "git.nobla.cn/golang/aeus/pkg/logger" +) + +type ( + Service struct { + ID string `json:"id"` + Name string `json:"name"` + Version string `json:"version"` + Metadata map[string]string `json:"metadata"` + Endpoints []string `json:"endpoints"` + } +) + +type ( + Watcher interface { + // Next returns services in the following two cases: + // 1.the first time to watch and the service instance list is not empty. + // 2.any service instance changes found. + // if the above two conditions are not met, it will block until context deadline exceeded or canceled + Next() ([]*Service, error) + // Stop close the watcher. + Stop() error + } +) + +type ( + RegistryOptions struct { + // Other options for implementations of the interface + // can be stored in a context + Context context.Context + TLSConfig *tls.Config + Addrs []string + Timeout time.Duration + Secure bool + } + + RegistryOption func(o *RegistryOptions) + + RegisterOptions struct { + Context context.Context + // register ttl + TTL time.Duration + } + + RegisterOption func(o *RegisterOptions) + + WatchOptions struct { + // Other options for implementations of the interface + // can be stored in a context + Context context.Context + // Specify a service to watch + // If blank, the watch is for all services + Service string + // Specify a logger + Logger logger.Logger + } + + WatchOption func(o *WatchOptions) + + DeregisterOptions struct { + // Other options for implementations of the interface + // can be stored in a context + Context context.Context + } + + DeregisterOption func(o *DeregisterOptions) + + GetOptions struct { + // Other options for implementations of the interface + // can be stored in a context + Context context.Context + } + + GetOption func(o *GetOptions) +) + +func WithRegistryContext(ctx context.Context) RegistryOption { + return func(o *RegistryOptions) { + o.Context = ctx + } +} + +func WithAddress(address string) RegistryOption { + return func(o *RegistryOptions) { + o.Addrs = append(o.Addrs, address) + } +} + +func WithTLS(tls *tls.Config) RegistryOption { + return func(o *RegistryOptions) { + o.TLSConfig = tls + } +} + +func WithTimeout(t time.Duration) RegistryOption { + return func(o *RegistryOptions) { + o.Timeout = t + } +} + +func NewRegistryOption(cbs ...RegistryOption) *RegistryOptions { + opts := &RegistryOptions{ + Context: context.Background(), + } + for _, o := range cbs { + o(opts) + } + return opts +} + +func WithTTL(t time.Duration) RegisterOption { + return func(o *RegisterOptions) { + o.TTL = t + } +} + +func WithRegsterContext(ctx context.Context) RegisterOption { + return func(o *RegisterOptions) { + o.Context = ctx + } +} +func NewRegisterOption(cbs ...RegisterOption) *RegisterOptions { + opts := &RegisterOptions{ + Context: context.Background(), + } + for _, o := range cbs { + o(opts) + } + return opts +} + +func WithDeregisterContext(ctx context.Context) DeregisterOption { + return func(o *DeregisterOptions) { + o.Context = ctx + } +} + +func NewDeregisterOption(cbs ...DeregisterOption) *DeregisterOptions { + opts := &DeregisterOptions{ + Context: context.Background(), + } + for _, o := range cbs { + o(opts) + } + return opts +} + +func WithService(service string) WatchOption { + return func(o *WatchOptions) { + o.Service = service + } +} + +func WithLogger(logger logger.Logger) WatchOption { + return func(o *WatchOptions) { + o.Logger = logger + } +} + +func WithWatchContext(ctx context.Context) WatchOption { + return func(o *WatchOptions) { + o.Context = ctx + } +} + +func NewWatchOption(cbs ...WatchOption) *WatchOptions { + opts := &WatchOptions{ + Context: context.Background(), + Logger: logger.Default(), + } + for _, o := range cbs { + o(opts) + } + return opts +} + +func WithGetContext(ctx context.Context) GetOption { + return func(o *GetOptions) { + o.Context = ctx + } +} + +func NewGetOption(cbs ...GetOption) *GetOptions { + opts := &GetOptions{ + Context: context.Background(), + } + for _, o := range cbs { + o(opts) + } + return opts +} diff --git a/transport/cli/client.go b/transport/cli/client.go new file mode 100644 index 0000000..5b34e42 --- /dev/null +++ b/transport/cli/client.go @@ -0,0 +1,262 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "io" + "math" + "net" + "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/peterh/liner" +) + +type Client struct { + name string + ctx context.Context + address string + sequence uint16 + conn net.Conn + liner *liner.State + mutex sync.Mutex + exitChan chan struct{} + readyChan chan struct{} + commandChan chan *Frame + completerChan chan *Frame + Timeout time.Duration + exitFlag int32 +} + +func (client *Client) getSequence() uint16 { + client.mutex.Lock() + defer client.mutex.Unlock() + if client.sequence >= math.MaxUint16 { + client.sequence = 0 + } + client.sequence++ + n := client.sequence + return n +} + +func (client *Client) dialContext(ctx context.Context, address string) (conn net.Conn, err error) { + var ( + pos int + network string + dialer net.Dialer + ) + if pos = strings.Index(address, "://"); pos > -1 { + network = address[:pos] + address = address[pos+3:] + } else { + network = "tcp" + } + if conn, err = dialer.DialContext(ctx, network, address); err != nil { + return + } + return +} + +func (client *Client) renderBanner(info *handshake) { + client.name = info.Name + fmt.Printf("Welcome to the %s(%s) monitor\n", info.Name, info.Version) + fmt.Printf("Your connection id is %d\n", info.ID) + fmt.Printf("Last login: %s from %s\n", info.ServerTime.Format(time.RFC822), info.RemoteAddr) + fmt.Printf("Type 'help' for help. Type 'exit' for quit. Type 'cls' to clear input statement.\n") +} + +func (client *Client) ioLoop(r io.Reader) { + defer func() { + _ = client.Close() + }() + for { + frame, err := readFrame(r) + if err != nil { + return + } + switch frame.Type { + case PacketTypeHandshake: + info := &handshake{} + if err = json.Unmarshal(frame.Data, info); err == nil { + client.renderBanner(info) + } + select { + case client.readyChan <- struct{}{}: + case <-client.exitChan: + return + } + case PacketTypeCompleter: + select { + case client.completerChan <- frame: + case <-client.exitChan: + return + } + case PacketTypeCommand: + select { + case client.commandChan <- frame: + case <-client.exitChan: + return + } + } + } +} + +func (client *Client) waitResponse(seq uint16, timeout time.Duration) { + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + select { + case <-timer.C: + fmt.Println("timeout waiting for response") + return + case <-client.exitChan: + return + case res, ok := <-client.commandChan: + if !ok { + break + } + if res.Seq == seq { + if res.Error != "" { + fmt.Print(res.Error) + } else { + fmt.Print(string(res.Data)) + } + if res.Flag == FlagComplete { + fmt.Println("") + return + } + } + } + } +} + +func (client *Client) completer(str string) (ss []string) { + var ( + err error + seq uint16 + ) + ss = make([]string, 0) + seq = client.getSequence() + if err = writeFrame(client.conn, newFrame(PacketTypeCompleter, FlagComplete, seq, client.Timeout, []byte(str))); err != nil { + return + } + select { + case <-time.After(time.Second * 5): + case frame, ok := <-client.completerChan: + if ok { + err = json.Unmarshal(frame.Data, &ss) + } + } + return +} + +func (client *Client) Execute(s string) (err error) { + var ( + seq uint16 + ) + if client.conn, err = client.dialContext(client.ctx, client.address); err != nil { + return err + } + defer func() { + _ = client.Close() + }() + go client.ioLoop(client.conn) + seq = client.getSequence() + if err = writeFrame(client.conn, newFrame(PacketTypeCommand, FlagComplete, seq, client.Timeout, []byte(s))); err != nil { + return err + } + client.waitResponse(seq, client.Timeout) + return +} + +func (client *Client) Shell() (err error) { + var ( + seq uint16 + line string + ) + client.liner.SetCtrlCAborts(true) + if client.conn, err = client.dialContext(client.ctx, client.address); err != nil { + return err + } + defer func() { + _ = client.Close() + }() + if err = writeFrame(client.conn, newFrame(PacketTypeHandshake, FlagComplete, client.getSequence(), client.Timeout, nil)); err != nil { + return + } + go client.ioLoop(client.conn) + select { + case <-client.readyChan: + case <-client.ctx.Done(): + return + } + client.liner.SetCompleter(client.completer) + for { + if line, err = client.liner.Prompt(client.name + "> "); err != nil { + break + } + if atomic.LoadInt32(&client.exitFlag) == 1 { + fmt.Println(Bye) + break + } + line = strings.TrimSpace(line) + if line == "" { + continue + } + if strings.ToLower(line) == "exit" || strings.ToLower(line) == "quit" { + fmt.Println(Bye) + return + } + if strings.ToLower(line) == "clear" || strings.ToLower(line) == "cls" { + fmt.Print("\033[2J") + continue + } + seq = client.getSequence() + if err = writeFrame(client.conn, newFrame(PacketTypeCommand, FlagComplete, seq, client.Timeout, []byte(line))); err != nil { + break + } + client.liner.AppendHistory(line) + client.waitResponse(seq, client.Timeout) + } + return +} + +func (client *Client) Close() (err error) { + if !atomic.CompareAndSwapInt32(&client.exitFlag, 0, 1) { + return + } + close(client.exitChan) + if client.conn != nil { + err = client.conn.Close() + } + if client.liner != nil { + err = client.liner.Close() + } + return +} + +func NewClient(ctx context.Context, addr string) *Client { + var ( + timeout time.Duration + ) + if ctx == nil { + ctx = context.Background() + } + timeout = time.Second * 30 + return &Client{ + ctx: ctx, + address: addr, + name: filepath.Base(os.Args[0]), + Timeout: timeout, + liner: liner.NewLiner(), + readyChan: make(chan struct{}, 1), + exitChan: make(chan struct{}), + commandChan: make(chan *Frame, 5), + completerChan: make(chan *Frame, 5), + } +} diff --git a/transport/cli/context.go b/transport/cli/context.go new file mode 100644 index 0000000..433c71d --- /dev/null +++ b/transport/cli/context.go @@ -0,0 +1,148 @@ +package cli + +import ( + "context" + "fmt" + "io" + "math" + "net/url" + "sync" + + "github.com/gin-gonic/gin/binding" +) + +type Context struct { + Id int64 + seq uint16 + ctx context.Context + wc io.WriteCloser + params map[string]string + locker sync.RWMutex + variables map[string]any + args []string +} + +func (ctx *Context) reset(id int64, wc io.WriteCloser) { + ctx.Id = id + ctx.wc = wc + ctx.seq = 0 + ctx.ctx = context.Background() + ctx.args = make([]string, 0) + ctx.params = make(map[string]string) + ctx.variables = make(map[string]any) +} + +func (ctx *Context) setArgs(args []string) { + ctx.args = args +} + +func (ctx *Context) setParam(ps map[string]string) { + ctx.params = ps +} + +func (ctx *Context) Bind(val any) (err error) { + qs := url.Values{} + for k, v := range ctx.params { + qs.Set(k, v) + } + return binding.MapFormWithTag(val, qs, "json") +} + +func (ctx *Context) setContext(c context.Context) { + ctx.ctx = c +} + +func (ctx *Context) Context() context.Context { + return ctx.ctx +} + +func (ctx *Context) Argument(index int) string { + if index >= len(ctx.args) || index < 0 { + return "" + } + return ctx.args[index] +} + +func (ctx *Context) Param(s string) string { + if v, ok := ctx.params[s]; ok { + return v + } + return "" +} + +func (ctx *Context) SetValue(name string, value any) { + ctx.locker.Lock() + if ctx.variables == nil { + ctx.variables = make(map[string]any) + } + ctx.variables[name] = value + ctx.locker.Unlock() +} + +func (ctx *Context) GetValue(name string) (val any, ok bool) { + ctx.locker.RLock() + defer ctx.locker.RUnlock() + val, ok = ctx.variables[name] + return +} + +func (ctx *Context) Success(v any) (err error) { + return ctx.send(responsePayload{Type: PacketTypeCommand, Data: v}) +} + +func (ctx *Context) Error(code int, reason string) (err error) { + return ctx.send(responsePayload{Type: PacketTypeCommand, Code: code, Reason: reason}) +} + +func (ctx *Context) Close() (err error) { + return ctx.wc.Close() +} + +func (ctx *Context) send(res responsePayload) (err error) { + var ( + ok bool + buf []byte + marshal encoder + ) + if res.Code > 0 { + err = writeFrame(ctx.wc, &Frame{ + Feature: Feature, + Type: res.Type, + Seq: ctx.seq, + Flag: FlagComplete, + Error: fmt.Sprintf("ERROR(%d): %s", res.Code, res.Reason), + }) + return + } + if res.Data == nil { + buf = OK + goto __END + } + if marshal, ok = res.Data.(encoder); ok { + buf, err = marshal.Marshal() + goto __END + } + buf, err = serialize(res.Data) +__END: + if err != nil { + return + } + offset := 0 + chunkSize := math.MaxInt16 - 1 + n := len(buf) / chunkSize + for i := 0; i < n; i++ { + if err = writeFrame(ctx.wc, newFrame(res.Type, FlagPortion, ctx.seq, 0, buf[offset:chunkSize+offset])); err != nil { + return + } + offset += chunkSize + } + err = writeFrame(ctx.wc, newFrame(res.Type, FlagComplete, ctx.seq, 0, buf[offset:])) + return +} + +func newContext(id int64, wc io.WriteCloser) *Context { + return &Context{ + Id: id, + wc: wc, + } +} diff --git a/transport/cli/frame.go b/transport/cli/frame.go new file mode 100644 index 0000000..bc0b451 --- /dev/null +++ b/transport/cli/frame.go @@ -0,0 +1,162 @@ +package cli + +import ( + "bytes" + "encoding/binary" + "io" + "math" + "time" +) + +const ( + PacketTypeCompleter byte = 0x01 + PacketTypeCommand = 0x02 + PacketTypeHandshake = 0x03 +) + +const ( + FlagPortion = 0x00 + FlagComplete = 0x01 +) + +type ( + Frame struct { + Feature []byte + Type byte `json:"type"` + Flag byte `json:"flag"` + Seq uint16 `json:"seq"` + Data []byte `json:"data"` + Error string `json:"error"` + Timeout int64 `json:"timeout"` + Timestamp int64 `json:"timestamp"` + } +) + +func readFrame(r io.Reader) (frame *Frame, err error) { + var ( + n int + dataLength uint16 + errorLength uint16 + errBuf []byte + ) + frame = &Frame{Feature: make([]byte, 3)} + if _, err = io.ReadFull(r, frame.Feature); err != nil { + return + } + if !bytes.Equal(frame.Feature, Feature) { + err = io.ErrUnexpectedEOF + return + } + if err = binary.Read(r, binary.LittleEndian, &frame.Type); err != nil { + return + } + if err = binary.Read(r, binary.LittleEndian, &frame.Flag); err != nil { + return + } + if err = binary.Read(r, binary.LittleEndian, &frame.Seq); err != nil { + return + } + if err = binary.Read(r, binary.LittleEndian, &frame.Timeout); err != nil { + return + } + if err = binary.Read(r, binary.LittleEndian, &frame.Timestamp); err != nil { + return + } + if err = binary.Read(r, binary.LittleEndian, &dataLength); err != nil { + return + } + if err = binary.Read(r, binary.LittleEndian, &errorLength); err != nil { + return + } + if dataLength > 0 { + frame.Data = make([]byte, dataLength) + if n, err = io.ReadFull(r, frame.Data); err == nil { + if n < int(dataLength) { + err = io.ErrShortBuffer + } + } + } + if errorLength > 0 { + errBuf = make([]byte, errorLength) + if n, err = io.ReadFull(r, errBuf); err == nil { + if n < int(dataLength) { + err = io.ErrShortBuffer + } else { + frame.Error = string(errBuf) + } + } + } + return +} + +func writeFrame(w io.Writer, frame *Frame) (err error) { + var ( + n int + dl int + dataLength uint16 + errorLength uint16 + errBuf []byte + ) + if _, err = w.Write(Feature); err != nil { + return + } + if frame.Data != nil { + dl = len(frame.Data) + if dl > math.MaxUint16 { + return io.ErrNoProgress + } + dataLength = uint16(dl) + } + if frame.Error != "" { + errBuf = []byte(frame.Error) + errorLength = uint16(len(errBuf)) + } + if err = binary.Write(w, binary.LittleEndian, frame.Type); err != nil { + return + } + if err = binary.Write(w, binary.LittleEndian, frame.Flag); err != nil { + return + } + if err = binary.Write(w, binary.LittleEndian, frame.Seq); err != nil { + return + } + if err = binary.Write(w, binary.LittleEndian, frame.Timeout); err != nil { + return + } + if err = binary.Write(w, binary.LittleEndian, frame.Timestamp); err != nil { + return + } + if err = binary.Write(w, binary.LittleEndian, dataLength); err != nil { + return + } + if err = binary.Write(w, binary.LittleEndian, errorLength); err != nil { + return + } + if dataLength > 0 { + if n, err = w.Write(frame.Data); err == nil { + if n < int(dataLength) { + err = io.ErrShortWrite + } + } + } + if errorLength > 0 { + if n, err = w.Write(errBuf); err == nil { + if n < int(errorLength) { + err = io.ErrShortWrite + } + } + } + return +} + +func newFrame(t, f byte, seq uint16, timeout time.Duration, data []byte) *Frame { + return &Frame{ + Feature: Feature, + Type: t, + Flag: f, + Seq: seq, + Data: data, + Timeout: int64(timeout), + Timestamp: time.Now().Unix(), + } +} diff --git a/transport/cli/router.go b/transport/cli/router.go new file mode 100644 index 0000000..c496e38 --- /dev/null +++ b/transport/cli/router.go @@ -0,0 +1,181 @@ +package cli + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "github.com/mattn/go-runewidth" +) + +var ( + ErrNotFound = errors.New("not found") +) + +type Router struct { + name string + path []string + children []*Router + command Command + params []string +} + +func (r *Router) getChildren(name string) *Router { + for _, child := range r.children { + if child.name == name { + return child + } + } + return nil +} + +func (r *Router) Completer(tokens ...string) []string { + ss := make([]string, 0, 10) + if len(tokens) == 0 { + for _, child := range r.children { + ss = append(ss, strings.Join(child.path, " ")) + } + return ss + } + children := r.getChildren(tokens[0]) + if children == nil { + token := tokens[0] + for _, child := range r.children { + if strings.HasPrefix(child.name, token) { + ss = append(ss, strings.Join(child.path, " ")) + } + } + return ss + } + return children.Completer(tokens[1:]...) +} + +func (r *Router) Usage() string { + if len(r.path) <= 0 { + return "" + } + var ( + sb strings.Builder + ) + sb.WriteString("Usage: ") + sb.WriteString(strings.Join(r.path, " ")) + if len(r.params) > 0 { + for _, s := range r.params { + sb.WriteString(" {" + s + "}") + } + } + return sb.String() +} + +func (r *Router) Handle(path string, command Command) { + var ( + pos int + name string + ) + if strings.HasSuffix(path, "/") { + path = strings.TrimSuffix(path, "/") + } + if strings.HasPrefix(path, "/") { + path = strings.TrimPrefix(path, "/") + } + if path == "" { + r.command = command + return + } + if path[0] == ':' { + ss := strings.Split(path, "/") + for _, s := range ss { + r.params = append(r.params, strings.TrimPrefix(s, ":")) + } + r.command = command + return + } + if pos = strings.IndexByte(path, '/'); pos > -1 { + name = path[:pos] + path = path[pos:] + } else { + name = path + path = "" + } + if name == "-" { + name = "app" + } + children := r.getChildren(name) + if children == nil { + children = newRouter(name) + if len(r.path) == 0 { + children.path = append(children.path, name) + } else { + children.path = append(children.path, r.path...) + children.path = append(children.path, name) + } + r.children = append(r.children, children) + } + if children.command.Handle != nil { + panic("a handle is already registered for path /" + strings.Join(children.path, "/")) + } + children.Handle(path, command) +} + +func (r *Router) Lookup(tokens []string) (router *Router, args []string, err error) { + if len(tokens) > 0 { + children := r.getChildren(tokens[0]) + if children != nil { + return children.Lookup(tokens[1:]) + } + } + if r.command.Handle == nil { + err = ErrNotFound + return + } + router = r + args = tokens + return +} + +func (r *Router) String() string { + var ( + sb strings.Builder + width int + maxWidth int + walkFunc func(router *Router) []commander + ) + walkFunc = func(router *Router) []commander { + vs := make([]commander, 0, 5) + if router.command.Handle != nil { + vs = append(vs, commander{ + Name: router.name, + Path: strings.Join(router.path, " "), + Description: router.command.Description, + }) + } else { + if len(router.children) > 0 { + for _, child := range router.children { + vs = append(vs, walkFunc(child)...) + } + } + } + return vs + } + vs := walkFunc(r) + for _, v := range vs { + width = runewidth.StringWidth(v.Path) + if width > maxWidth { + maxWidth = width + } + } + for _, v := range vs { + sb.WriteString(fmt.Sprintf("%-"+strconv.Itoa(maxWidth+4)+"s %s\n", v.Path, v.Description)) + } + return sb.String() +} + +func newRouter(name string) *Router { + return &Router{ + name: name, + path: make([]string, 0, 4), + params: make([]string, 0, 4), + children: make([]*Router, 0, 10), + } +} diff --git a/transport/cli/serialize.go b/transport/cli/serialize.go new file mode 100644 index 0000000..4f60467 --- /dev/null +++ b/transport/cli/serialize.go @@ -0,0 +1,263 @@ +package cli + +import ( + "bytes" + "encoding/json" + "fmt" + "reflect" + "slices" + "strconv" + "strings" + "time" + + "git.nobla.cn/golang/aeus/pkg/pool" + "github.com/mattn/go-runewidth" +) + +func isNormalKind(kind reflect.Kind) bool { + normalKinds := []reflect.Kind{ + reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Int, + reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uint, + reflect.Float32, reflect.Float64, + reflect.String, + } + for _, k := range normalKinds { + if k == kind { + return true + } + } + return false +} + +func serializeMap(val map[any]any) ([]byte, error) { + var ( + canFormat bool + width int + maxWidth int + ) + canFormat = true + for k, v := range val { + if !isNormalKind(reflect.Indirect(reflect.ValueOf(k)).Kind()) || !isNormalKind(reflect.Indirect(reflect.ValueOf(v)).Kind()) { + canFormat = false + break + } + } + if !canFormat { + return json.MarshalIndent(val, "", "\t") + } + ms := make(map[string]string) + for k, v := range val { + sk := fmt.Sprint(k) + ms[sk] = fmt.Sprint(v) + width = runewidth.StringWidth(sk) + if width > maxWidth { + maxWidth = width + } + } + buffer := pool.GetBuffer() + defer pool.PutBuffer(buffer) + for k, v := range ms { + buffer.WriteString(fmt.Sprintf("%-"+strconv.Itoa(maxWidth+4)+"s %s\n", k, v)) + } + return buffer.Bytes(), nil +} + +func printBorder(w *bytes.Buffer, ws []int) { + for _, l := range ws { + w.WriteString("+") + w.WriteString(strings.Repeat("-", l+2)) + } + w.WriteString("+\n") +} + +func toString(v any) string { + switch t := v.(type) { + case float32, float64: + return fmt.Sprintf("%.2f", t) + case time.Time: + return t.Format("2006-01-02 15:04:05") + default: + return fmt.Sprint(v) + } +} + +func printArray(vals [][]any) (buf []byte) { + var ( + cell string + str string + widths []int + maxLength int + width int + rows [][]string + ) + rows = make([][]string, 0, len(vals)) + for _, value := range vals { + if len(value) > maxLength { + maxLength = len(value) + } + } + widths = make([]int, maxLength) + for _, vs := range vals { + rl := len(vs) + row := make([]string, rl) + for i, val := range vs { + str = toString(val) + if rl > 1 { + width = runewidth.StringWidth(str) + if width > widths[i] { + widths[i] = width + } + } + row[i] = str + } + rows = append(rows, row) + } + buffer := pool.GetBuffer() + defer pool.PutBuffer(buffer) + printBorder(buffer, widths) + for index, row := range rows { + size := len(row) + for i, w := range widths { + cell = "" + buffer.WriteString("|") + if size > i { + cell = row[i] + } + buffer.WriteString(" ") + buffer.WriteString(cell) + cl := runewidth.StringWidth(cell) + if w > cl { + buffer.WriteString(strings.Repeat(" ", w-cl)) + } + buffer.WriteString(" ") + } + buffer.WriteString("|\n") + if index == 0 { + printBorder(buffer, widths) + } + } + printBorder(buffer, widths) + return buffer.Bytes() +} + +func serializeArray(val []any) (buf []byte, err error) { + var ( + ok bool + vs [][]any + normalFormat bool + isArrayElement bool + isStructElement bool + columnName string + ) + normalFormat = true + for _, row := range val { + kind := reflect.Indirect(reflect.ValueOf(row)).Kind() + if !isNormalKind(kind) { + normalFormat = false + } + if kind == reflect.Array || kind == reflect.Slice { + isArrayElement = true + } + if kind == reflect.Struct { + isStructElement = true + } + } + if normalFormat { + goto __END + } + if isArrayElement { + vs = make([][]any, 0, len(val)) + for _, v := range val { + rv := reflect.Indirect(reflect.ValueOf(v)) + if rv.Kind() == reflect.Array || rv.Kind() == reflect.Slice { + row := make([]any, 0, rv.Len()) + for i := 0; i < rv.Len(); i++ { + if isNormalKind(rv.Index(i).Kind()) || rv.Index(i).Interface() == nil { + row = append(row, rv.Index(i).Interface()) + } else { + goto __END + } + } + vs = append(vs, row) + } else { + goto __END + } + } + } + if isStructElement { + vs = make([][]any, 0, len(val)) + indexes := make([]int, 0) + for i, v := range val { + rv := reflect.Indirect(reflect.ValueOf(v)) + if rv.Kind() == reflect.Struct { + if i == 0 { + row := make([]any, 0, rv.Type().NumField()) + for j := 0; j < rv.Type().NumField(); j++ { + st := rv.Type().Field(j).Tag + if columnName, ok = st.Lookup("kos"); !ok { + columnName = strings.ToUpper(rv.Type().Field(j).Name) + } else { + if columnName == "-" { + continue + } + } + if !rv.Type().Field(j).IsExported() { + continue + } + indexes = append(indexes, j) + row = append(row, columnName) + } + vs = append(vs, row) + } + row := make([]any, 0, rv.Type().NumField()) + for j := 0; j < rv.Type().NumField(); j++ { + if slices.Index(indexes, j) > -1 { + row = append(row, rv.Field(j).Interface()) + } + } + vs = append(vs, row) + } else { + goto __END + } + } + } + buf = printArray(vs) + return +__END: + return json.MarshalIndent(val, "", "\t") +} + +func serialize(val any) (buf []byte, err error) { + var ( + refVal reflect.Value + ) + refVal = reflect.Indirect(reflect.ValueOf(val)) + switch refVal.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + buf = []byte(strconv.FormatInt(refVal.Int(), 10)) + case reflect.Float32, reflect.Float64: + buf = []byte(strconv.FormatFloat(refVal.Float(), 'f', -1, 64)) + case reflect.String: + buf = []byte(refVal.String()) + case reflect.Slice, reflect.Array: + if refVal.Type().Elem().Kind() == reflect.Uint8 { + buf = refVal.Bytes() + } else { + as := make([]any, 0, refVal.Len()) + for i := 0; i < refVal.Len(); i++ { + as = append(as, refVal.Index(i).Interface()) + } + buf, err = serializeArray(as) + } + case reflect.Map: + ms := make(map[any]any) + keys := refVal.MapKeys() + for _, key := range keys { + ms[key.Interface()] = refVal.MapIndex(key).Interface() + } + buf, err = serializeMap(ms) + default: + buf, err = json.MarshalIndent(refVal.Interface(), "", "\t") + } + return +} diff --git a/transport/cli/server.go b/transport/cli/server.go new file mode 100644 index 0000000..90318a8 --- /dev/null +++ b/transport/cli/server.go @@ -0,0 +1,233 @@ +package cli + +import ( + "context" + "fmt" + "math" + "net" + "net/url" + "runtime" + "strings" + "sync" + "sync/atomic" + "time" + + "git.nobla.cn/golang/aeus/pkg/errors" + "git.nobla.cn/golang/aeus/pkg/logger" + netutil "git.nobla.cn/golang/aeus/pkg/net" +) + +type Server struct { + ctx context.Context + router *Router + opts *options + listener net.Listener + sequenceLocker sync.Mutex + sequence int64 + ctxMap sync.Map + uri *url.URL + exitFlag int32 +} + +func (svr *Server) Handle(pathname string, desc string, cb HandleFunc) { + svr.router.Handle(pathname, svr.wrapCommand(pathname, desc, cb)) +} + +func (svr *Server) wrapCommand(pathname, desc string, cb HandleFunc) Command { + h := func(ctx *Context) (err error) { + return cb(ctx) + } + if desc == "" { + desc = strings.Join(strings.Split(strings.TrimPrefix(pathname, "/"), "/"), " ") + } + return Command{ + Path: pathname, + Handle: h, + Description: desc, + } +} + +func (s *Server) createListener() (err error) { + if s.listener != nil { + return + } + if s.listener, err = net.Listen(s.opts.network, s.opts.address); err == nil { + s.uri.Host = netutil.TrulyAddr(s.opts.address, s.listener) + } + return +} + +func (s *Server) applyContext() *Context { + if v := ctxPool.Get(); v != nil { + if ctx, ok := v.(*Context); ok { + return ctx + } + } + return &Context{} +} + +func (s *Server) releaseContext(ctx *Context) { + ctxPool.Put(ctx) +} + +func (s *Server) execute(ctx *Context, frame *Frame) (err error) { + var ( + params map[string]string + tokens []string + args []string + r *Router + ) + cmd := string(frame.Data) + tokens = strings.Fields(cmd) + if frame.Timeout > 0 { + childCtx, cancelFunc := context.WithTimeout(s.ctx, time.Duration(frame.Timeout)) + ctx.setContext(childCtx) + defer func() { + cancelFunc() + }() + } else { + ctx.setContext(s.ctx) + } + if r, args, err = s.router.Lookup(tokens); err != nil { + if errors.Is(err, ErrNotFound) { + err = ctx.Error(errNotFound, fmt.Sprintf("Command %s not found", cmd)) + } else { + err = ctx.Error(errExecuteFailed, err.Error()) + } + } else { + if len(r.params) > len(args) { + err = ctx.Error(errExecuteFailed, r.Usage()) + return + } + if len(r.params) > 0 { + params = make(map[string]string) + for i, s := range r.params { + params[s] = args[i] + } + } + ctx.setArgs(args) + ctx.setParam(params) + err = r.command.Handle(ctx) + } + return +} + +func (svr *Server) nextSequence() int64 { + svr.sequenceLocker.Lock() + defer svr.sequenceLocker.Unlock() + if svr.sequence >= math.MaxInt64 { + svr.sequence = 1 + } + svr.sequence++ + return svr.sequence +} + +func (svr *Server) process(conn net.Conn) { + var ( + err error + ctx *Context + frame *Frame + ) + ctx = svr.applyContext() + ctx.reset(svr.nextSequence(), conn) + svr.ctxMap.Store(ctx.Id, ctx) + defer func() { + _ = conn.Close() + svr.ctxMap.Delete(ctx.Id) + svr.releaseContext(ctx) + }() + for { + if frame, err = readFrame(conn); err != nil { + break + } + //reset frame + ctx.seq = frame.Seq + switch frame.Type { + case PacketTypeHandshake: + if err = ctx.send(responsePayload{ + Type: PacketTypeHandshake, + Data: &handshake{ + ID: ctx.Id, + Name: "", + Version: "", + OS: runtime.GOOS, + ServerTime: time.Now(), + RemoteAddr: conn.RemoteAddr().String(), + }, + }); err != nil { + break + } + case PacketTypeCompleter: + if err = ctx.send(responsePayload{ + Type: PacketTypeCompleter, + Data: svr.router.Completer(strings.Fields(string(frame.Data))...), + }); err != nil { + break + } + case PacketTypeCommand: + if err = svr.execute(ctx, frame); err != nil { + break + } + default: + break + } + } +} + +func (s *Server) serve() (err error) { + for { + conn, err := s.listener.Accept() + if err != nil { + if atomic.LoadInt32(&s.exitFlag) == 1 { + return nil + } + return err + } + go s.process(conn) + } +} + +func (s *Server) Start(ctx context.Context) (err error) { + s.ctx = ctx + if err = s.createListener(); err != nil { + return + } + s.opts.logger.Info(ctx, "cli server listen on: %s", s.uri.Host) + s.Handle("/help", "Display help information", func(ctx *Context) (err error) { + return ctx.Success(s.router.String()) + }) + err = s.serve() + return +} + +func (s *Server) Stop(ctx context.Context) (err error) { + if !atomic.CompareAndSwapInt32(&s.exitFlag, 0, 1) { + return + } + if s.listener != nil { + err = s.listener.Close() + } + s.ctxMap.Range(func(key, value any) bool { + if ctx, ok := value.(*Context); ok { + err = ctx.Close() + } + return true + }) + return +} + +func New(cbs ...Option) *Server { + srv := &Server{ + opts: &options{ + network: "tcp", + address: ":0", + logger: logger.Default(), + }, + uri: &url.URL{Scheme: "cli"}, + router: newRouter(""), + } + for _, cb := range cbs { + cb(srv.opts) + } + return srv +} diff --git a/transport/cli/types.go b/transport/cli/types.go new file mode 100644 index 0000000..84743b2 --- /dev/null +++ b/transport/cli/types.go @@ -0,0 +1,87 @@ +package cli + +import ( + "context" + "crypto/tls" + "sync" + "time" + + "git.nobla.cn/golang/aeus/pkg/logger" +) + +var ( + Feature = []byte{67, 76, 73} + OK = []byte("OK") + Bye = "Bye Bye" +) + +const ( + errNotFound = 4004 + errExecuteFailed = 4005 +) + +var ( + ctxPool sync.Pool +) + +type Param struct { + Key string + Value string +} + +type Params []Param + +type HandleFunc func(ctx *Context) (err error) + +type ( + Option func(*options) + + options struct { + network string + address string + logger logger.Logger + context context.Context + } + + clientOptions struct { + tls *tls.Config + } + + ClientOption func(*clientOptions) +) + +type ( + encoder interface { + Marshal() ([]byte, error) + } + + responsePayload struct { + Type uint8 `json:"-"` + Code int `json:"code"` + Reason string `json:"reason,omitempty"` + Data any `json:"data,omitempty"` + } +) + +type ( + Command struct { + Path string + Handle HandleFunc + Description string + } + + commander struct { + Name string + Path string + Description string + } + + handshake struct { + ID int64 `json:"id"` + OS string `json:"os"` + Name string `json:"name"` + Version string `json:"version"` + ServerTime time.Time `json:"server_time"` + RemoteAddr string `json:"remote_addr"` + } +) diff --git a/transport/grpc/client.go b/transport/grpc/client.go new file mode 100644 index 0000000..d334555 --- /dev/null +++ b/transport/grpc/client.go @@ -0,0 +1,39 @@ +package grpc + +import ( + "context" + + "git.nobla.cn/golang/aeus/transport/grpc/resolver" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + grpcinsecure "google.golang.org/grpc/credentials/insecure" +) + +func Dial(ctx context.Context, target string, cbs ...ClientOption) (conn *grpc.ClientConn, err error) { + grpc.WithResolvers() + opts := &clientOptions{} + for _, cb := range cbs { + cb(opts) + } + dialOpts := make([]grpc.DialOption, 0, 10) + if opts.registry != nil { + dialOpts = append( + dialOpts, + grpc.WithResolvers( + resolver.New( + resolver.WithRegistry(opts.registry), + ), + ), + ) + } + if opts.tls != nil { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(opts.tls))) + } else { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials())) + } + if opts.dialOptions != nil { + dialOpts = append(dialOpts, opts.dialOptions...) + } + conn, err = grpc.NewClient(target, dialOpts...) + return +} diff --git a/transport/grpc/resolver/builder.go b/transport/grpc/resolver/builder.go new file mode 100644 index 0000000..121da6f --- /dev/null +++ b/transport/grpc/resolver/builder.go @@ -0,0 +1,34 @@ +package resolver + +import ( + "git.nobla.cn/golang/aeus/registry" + "google.golang.org/grpc/resolver" +) + +type Builder struct { + opts *options +} + +func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + var ( + err error + watcher registry.Watcher + ) + if watcher, err = b.opts.registry.Watch(func(o *registry.WatchOptions) { + o.Context = b.opts.context + o.Service = target.URL.Host + }); err != nil { + return nil, err + } + return newResolver(cc, watcher), nil +} + +func (b *Builder) Scheme() string { + return "service" +} + +func New(cbs ...Option) resolver.Builder { + return &Builder{ + opts: newOptions(cbs...), + } +} diff --git a/transport/grpc/resolver/resolver.go b/transport/grpc/resolver/resolver.go new file mode 100644 index 0000000..54b58a2 --- /dev/null +++ b/transport/grpc/resolver/resolver.go @@ -0,0 +1,70 @@ +package resolver + +import ( + "net/url" + "strings" + "sync/atomic" + + "git.nobla.cn/golang/aeus/registry" + "google.golang.org/grpc/resolver" +) + +type Resolver struct { + conn resolver.ClientConn + watcher registry.Watcher + closeChan chan struct{} + closeFlag int32 +} + +func (r *Resolver) update(items []*registry.Service) { + addrs := make([]resolver.Address, 0, 10) + for _, item := range items { + for _, endpoint := range item.Endpoints { + if !strings.HasPrefix(endpoint, "grpc://") { + continue + } + if uri, err := url.Parse(endpoint); err == nil { + addrs = append(addrs, resolver.Address{ + Addr: uri.Host, + }) + } + } + } + r.conn.UpdateState(resolver.State{Addresses: addrs}) +} + +func (r *Resolver) eventLoop() { + for { + select { + case <-r.closeChan: + return + default: + } + if items, err := r.watcher.Next(); err == nil { + r.update(items) + } + } +} + +func (r *Resolver) ResolveNow(opts resolver.ResolveNowOptions) { + +} + +// Close closes the resolver. +func (r *Resolver) Close() { + if !atomic.CompareAndSwapInt32(&r.closeFlag, 0, 1) { + return + } + close(r.closeChan) + r.watcher.Stop() +} + +func newResolver(cc resolver.ClientConn, watcher registry.Watcher) *Resolver { + r := &Resolver{ + conn: cc, + watcher: watcher, + closeChan: make(chan struct{}), + } + go r.eventLoop() + return r +} diff --git a/transport/grpc/resolver/types.go b/transport/grpc/resolver/types.go new file mode 100644 index 0000000..b1ed2e0 --- /dev/null +++ b/transport/grpc/resolver/types.go @@ -0,0 +1,38 @@ +package resolver + +import ( + "context" + + "git.nobla.cn/golang/aeus/registry" +) + +type ( + options struct { + context context.Context + registry registry.Registry + } + + Option func(*options) +) + +func WithContext(ctx context.Context) Option { + return func(o *options) { + o.context = ctx + } +} + +func WithRegistry(registry registry.Registry) Option { + return func(o *options) { + o.registry = registry + } +} + +func newOptions(cbs ...Option) *options { + opts := &options{ + context: context.Background(), + } + for _, cb := range cbs { + cb(opts) + } + return opts +} diff --git a/transport/grpc/server.go b/transport/grpc/server.go new file mode 100644 index 0000000..901fcf4 --- /dev/null +++ b/transport/grpc/server.go @@ -0,0 +1,77 @@ +package grpc + +import ( + "context" + "net" + "net/url" + + "git.nobla.cn/golang/aeus/pkg/logger" + netutil "git.nobla.cn/golang/aeus/pkg/net" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +type Server struct { + ctx context.Context + opts *options + uri *url.URL + serve *grpc.Server + listener net.Listener +} + +func (s *Server) createListener() (err error) { + if s.listener == nil { + if s.listener, err = net.Listen(s.opts.network, s.opts.address); err != nil { + return + } + s.uri.Host = netutil.TrulyAddr(s.opts.address, s.listener) + } + return +} + +func (s *Server) Start(ctx context.Context) (err error) { + s.ctx = ctx + if err = s.createListener(); err != nil { + return + } + s.opts.logger.Info(ctx, "grpc server listen on: %s", s.uri.Host) + reflection.Register(s.serve) + s.serve.Serve(s.listener) + return +} + +func (s *Server) Endpoint(ctx context.Context) (string, error) { + if err := s.createListener(); err != nil { + return "", err + } + return s.uri.String(), nil +} + +func (s *Server) RegisterService(sd *grpc.ServiceDesc, ss any) { + s.serve.RegisterService(sd, ss) +} + +func (s *Server) Stop(ctx context.Context) (err error) { + s.serve.GracefulStop() + s.opts.logger.Info(s.ctx, "grpc server stopped") + return +} + +func New(cbs ...Option) *Server { + svr := &Server{ + opts: &options{ + network: "tcp", + logger: logger.Default(), + address: ":0", + }, + uri: &url.URL{ + Scheme: "grpc", + }, + } + for _, cb := range cbs { + cb(svr.opts) + } + gopts := []grpc.ServerOption{} + svr.serve = grpc.NewServer(gopts...) + return svr +} diff --git a/transport/grpc/types.go b/transport/grpc/types.go new file mode 100644 index 0000000..e373540 --- /dev/null +++ b/transport/grpc/types.go @@ -0,0 +1,71 @@ +package grpc + +import ( + "context" + "crypto/tls" + + "git.nobla.cn/golang/aeus/pkg/logger" + "git.nobla.cn/golang/aeus/registry" + "google.golang.org/grpc" +) + +type ( + Option func(*options) + + options struct { + network string + address string + logger logger.Logger + context context.Context + } + + clientOptions struct { + tls *tls.Config + registry registry.Registry + dialOptions []grpc.DialOption + } + + ClientOption func(*clientOptions) +) + +func WithNetwork(network string) Option { + return func(o *options) { + o.network = network + } +} + +func WithAddress(address string) Option { + return func(o *options) { + o.address = address + } +} + +func WithContext(ctx context.Context) Option { + return func(o *options) { + o.context = ctx + } +} + +func WithLogger(lg logger.Logger) Option { + return func(o *options) { + o.logger = lg + } +} + +func WithTLS(tls *tls.Config) ClientOption { + return func(o *clientOptions) { + o.tls = tls + } +} + +func WithRegistry(reg registry.Registry) ClientOption { + return func(o *clientOptions) { + o.registry = reg + } +} + +func WithGrpcDialOptions(opts ...grpc.DialOption) ClientOption { + return func(o *clientOptions) { + o.dialOptions = opts + } +} diff --git a/transport/http/context.go b/transport/http/context.go new file mode 100644 index 0000000..ca51749 --- /dev/null +++ b/transport/http/context.go @@ -0,0 +1,91 @@ +package http + +import ( + "context" + "net/http" + "net/url" + "sync" + + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" +) + +var ( + ctxPool sync.Pool +) + +type Context struct { + ctx *gin.Context +} + +func (c *Context) Context() context.Context { + return c.ctx.Request.Context() +} + +func (c *Context) Request() *http.Request { + return c.ctx.Request +} + +func (c *Context) Response() http.ResponseWriter { + return c.ctx.Writer +} + +func (c *Context) ClientIP() string { + return c.ctx.ClientIP() +} + +func (c *Context) Param(key string) string { + return c.ctx.Param(key) +} + +func (c *Context) Bind(val any) (err error) { + // if params exists, try bind params first + if len(c.ctx.Params) > 0 { + qs := url.Values{} + for _, p := range c.ctx.Params { + qs.Set(p.Key, p.Value) + } + if err = binding.MapFormWithTag(val, qs, "json"); err != nil { + return err + } + } + if c.Request().Method == http.MethodGet { + values := c.Request().URL.Query() + return binding.MapFormWithTag(val, values, "json") + } + return c.ctx.Bind(val) +} + +func (c *Context) Error(code int, reason string) (err error) { + r := newResponse(code, reason, nil) + c.ctx.JSON(http.StatusOK, r) + return +} + +func (c *Context) Success(value any) (err error) { + r := newResponse(http.StatusOK, "", value) + c.ctx.JSON(http.StatusOK, r) + return +} + +func (c *Context) reset(ctx *gin.Context) { + c.ctx = ctx +} + +func newContext(ctx *gin.Context) *Context { + v := ctxPool.Get() + if v == nil { + return &Context{ + ctx: ctx, + } + } + if c, ok := v.(*Context); ok { + c.reset(ctx) + return c + } + return &Context{ctx: ctx} +} + +func putContext(c *Context) { + ctxPool.Put(c) +} diff --git a/transport/http/response.go b/transport/http/response.go new file mode 100644 index 0000000..214a79d --- /dev/null +++ b/transport/http/response.go @@ -0,0 +1,33 @@ +package http + +type Response interface { + SetCode(int) + SetReason(string) + SetData(any) +} + +type response struct { + Code int `json:"code,omitempty" xml:"code,omitempty" yaml:"code,omitempty"` + Reason string `json:"reason,omitempty" xml:"reason,omitempty" yaml:"reason,omitempty"` + Data any `json:"data,omitempty" xml:"data,omitempty" yaml:"data,omitempty"` +} + +func (r *response) SetCode(code int) { + r.Code = code +} + +func (r *response) SetReason(reason string) { + r.Reason = reason +} + +func (r *response) SetData(data any) { + r.Data = data +} + +func newResponse(code int, reason string, data any) Response { + return &response{ + Code: code, + Reason: reason, + Data: data, + } +} diff --git a/transport/http/server.go b/transport/http/server.go new file mode 100644 index 0000000..a0d1da2 --- /dev/null +++ b/transport/http/server.go @@ -0,0 +1,179 @@ +package http + +import ( + "context" + "net" + "net/http" + "net/http/pprof" + "net/url" + "sync" + + "git.nobla.cn/golang/aeus/middleware" + "git.nobla.cn/golang/aeus/pkg/errors" + "git.nobla.cn/golang/aeus/pkg/logger" + netutil "git.nobla.cn/golang/aeus/pkg/net" + "github.com/gin-gonic/gin" +) + +type Server struct { + ctx context.Context + opts *options + uri *url.URL + serve *http.Server + engine *gin.Engine + once sync.Once + listener net.Listener +} + +func (s *Server) Endpoint(ctx context.Context) (string, error) { + if err := s.createListener(); err != nil { + return "", err + } + return s.uri.String(), nil +} + +func (s *Server) GET(pattern string, h HandleFunc) { + s.engine.GET(pattern, func(ctx *gin.Context) { + childCtx := newContext(ctx) + h(childCtx) + putContext(childCtx) + }) +} + +func (s *Server) PUT(pattern string, h HandleFunc) { + s.engine.PUT(pattern, func(ctx *gin.Context) { + childCtx := newContext(ctx) + h(childCtx) + putContext(childCtx) + }) +} + +func (s *Server) POST(pattern string, h HandleFunc) { + s.engine.POST(pattern, func(ctx *gin.Context) { + childCtx := newContext(ctx) + h(childCtx) + putContext(childCtx) + }) +} + +func (s *Server) HEAD(pattern string, h HandleFunc) { + s.engine.HEAD(pattern, func(ctx *gin.Context) { + childCtx := newContext(ctx) + h(childCtx) + putContext(childCtx) + }) +} + +func (s *Server) PATCH(pattern string, h HandleFunc) { + s.engine.PATCH(pattern, func(ctx *gin.Context) { + childCtx := newContext(ctx) + h(childCtx) + putContext(childCtx) + }) +} + +func (s *Server) DELETE(pattern string, h HandleFunc) { + s.engine.DELETE(pattern, func(ctx *gin.Context) { + childCtx := newContext(ctx) + h(childCtx) + putContext(childCtx) + }) +} + +func (s *Server) Use(middlewares ...middleware.Middleware) { + for _, m := range middlewares { + s.engine.Use(s.warpMiddleware(m)) + } +} + +func (s *Server) warpMiddleware(m middleware.Middleware) gin.HandlerFunc { + return func(ginCtx *gin.Context) { + ctx := ginCtx.Request.Context() + handler := func(ctx context.Context) error { + ginCtx.Next() + if err := ginCtx.Errors.Last(); err != nil { + return err.Err + } + return nil + } + wrappedHandler := m(handler) + if err := wrappedHandler(ctx); err != nil { + ginCtx.AbortWithError(http.StatusServiceUnavailable, err) + return + } + } +} + +func (s *Server) wrapHandle(f http.HandlerFunc) gin.HandlerFunc { + return func(c *gin.Context) { + f(c.Writer, c.Request) + } +} + +func (s *Server) createListener() (err error) { + if s.listener == nil { + if s.listener, err = net.Listen(s.opts.network, s.opts.address); err != nil { + return + } + s.uri.Host = netutil.TrulyAddr(s.opts.address, s.listener) + } + return +} + +func (s *Server) Start(ctx context.Context) (err error) { + s.serve = &http.Server{ + Addr: s.opts.address, + Handler: s.engine, + } + s.ctx = ctx + if s.opts.debug { + s.engine.Handle(http.MethodGet, "/debug/pprof/", s.wrapHandle(pprof.Index)) + s.engine.Handle(http.MethodGet, "/debug/pprof/goroutine", s.wrapHandle(pprof.Index)) + s.engine.Handle(http.MethodGet, "/debug/pprof/heap", s.wrapHandle(pprof.Index)) + s.engine.Handle(http.MethodGet, "/debug/pprof/mutex", s.wrapHandle(pprof.Index)) + s.engine.Handle(http.MethodGet, "/debug/pprof/threadcreate", s.wrapHandle(pprof.Index)) + s.engine.Handle(http.MethodGet, "/debug/pprof/cmdline", s.wrapHandle(pprof.Cmdline)) + s.engine.Handle(http.MethodGet, "/debug/pprof/profile", s.wrapHandle(pprof.Profile)) + s.engine.Handle(http.MethodGet, "/debug/pprof/symbol", s.wrapHandle(pprof.Symbol)) + s.engine.Handle(http.MethodGet, "/debug/pprof/trace", s.wrapHandle(pprof.Trace)) + } + if err = s.createListener(); err != nil { + return + } + s.opts.logger.Info(ctx, "http server listen on: %s", s.uri.Host) + if s.opts.certFile != "" && s.opts.keyFile != "" { + s.uri.Scheme = "https" + err = s.serve.ServeTLS(s.listener, s.opts.certFile, s.opts.keyFile) + } else { + err = s.serve.Serve(s.listener) + } + if !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil +} + +func (s *Server) Stop(ctx context.Context) (err error) { + err = s.serve.Shutdown(ctx) + s.opts.logger.Info(ctx, "http server stopped") + return +} + +func New(cbs ...Option) *Server { + svr := &Server{ + uri: &url.URL{Scheme: "http"}, + opts: &options{ + network: "tcp", + logger: logger.Default(), + address: ":0", + }, + } + for _, cb := range cbs { + cb(svr.opts) + } + if !svr.opts.debug { + gin.SetMode(gin.ReleaseMode) + } + svr.engine = gin.New(svr.opts.ginOptions...) + return svr +} diff --git a/transport/http/types.go b/transport/http/types.go new file mode 100644 index 0000000..31ccbba --- /dev/null +++ b/transport/http/types.go @@ -0,0 +1,83 @@ +package http + +import ( + "context" + "net/http" + + "git.nobla.cn/golang/aeus/pkg/logger" + "github.com/gin-gonic/gin" +) + +type ( + Option func(*options) + + options struct { + network string + address string + certFile string + keyFile string + debug bool + hander http.Handler + logger logger.Logger + context context.Context + ginOptions []gin.OptionFunc + } + + HandleFunc func(ctx *Context) (err error) + + Middleware func(http.Handler) http.Handler +) + +func WithNetwork(network string) Option { + return func(o *options) { + o.network = network + } +} + +func WithAddress(address string) Option { + return func(o *options) { + o.address = address + } +} + +func WithCertFile(certFile string) Option { + return func(o *options) { + o.certFile = certFile + } +} + +func WithKeyFile(keyFile string) Option { + return func(o *options) { + o.keyFile = keyFile + } +} + +func WithLogger(lg logger.Logger) Option { + return func(o *options) { + o.logger = lg + } +} + +func WithContext(ctx context.Context) Option { + return func(o *options) { + o.context = ctx + } +} + +func WithDebug(debug bool) Option { + return func(o *options) { + o.debug = debug + } +} + +func WithHandler(h http.Handler) Option { + return func(o *options) { + o.hander = h + } +} + +func WithGinOptions(opts ...gin.OptionFunc) Option { + return func(o *options) { + o.ginOptions = opts + } +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..896ea2a --- /dev/null +++ b/types.go @@ -0,0 +1,54 @@ +package aeus + +import "context" + +type ( + applicationKey struct{} + + // application context value. + Application interface { + ID() string + Name() string + Version() string + Debug() bool + Metadata() map[string]string + Endpoint() []string + } + + // application scope interface. + Scope interface { + Init(ctx context.Context) + Run(ctx context.Context) (err error) + } + + // service loader interface. + ServiceLoader interface { + Init(ctx context.Context) + } + + // endpoint interface. + Endpointer interface { + Endpoint(ctx context.Context) (string, error) + } + + // option callback + Option func(*options) + + // server interface. + Server interface { + Start(ctx context.Context) (err error) + Stop(ctx context.Context) (err error) + } +) + +func WithContext(ctx context.Context, app Application) context.Context { + return context.WithValue(ctx, applicationKey{}, app) +} + +func FromContext(ctx context.Context) Application { + app, ok := ctx.Value(applicationKey{}).(Application) + if !ok { + return nil + } + return app +}