init project

This commit is contained in:
Yavolte 2025-06-06 13:31:33 +08:00
commit 7839f7caef
44 changed files with 3819 additions and 0 deletions

47
.gitignore vendored 100644
View File

@ -0,0 +1,47 @@
bin/
.svn/
.godeps
./build
.cover/
dist
_site
_posts
*.dat
.vscode
vendor
cmd/
third_party/
# Go.gitignore
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
storage
.idea
Makefile
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
profile
# vim stuff
*.sw[op]

6
README.md 100644
View File

@ -0,0 +1,6 @@
# AEUS 介绍
`AEUS` 是一个轻量级的golang脚手架框架用于快速开发微服务。

257
app.go 100644
View File

@ -0,0 +1,257 @@
package aeus
import (
"context"
"os"
"os/signal"
"reflect"
"runtime"
"sync/atomic"
"syscall"
"time"
"git.nobla.cn/golang/aeus/pkg/errors"
"git.nobla.cn/golang/aeus/pkg/logger"
"git.nobla.cn/golang/aeus/registry"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)
type Service struct {
ctx context.Context
opts *options
errGroup *errgroup.Group
refValues []reflect.Value
service *registry.Service
exitFlag int32
}
func (s *Service) ID() string {
return s.opts.id
}
func (s *Service) Name() string {
return s.opts.name
}
func (s *Service) Debug() bool {
return false
}
func (s *Service) Version() string {
return s.opts.version
}
func (s *Service) Metadata() map[string]string {
if s.service == nil {
return nil
}
return s.service.Metadata
}
func (s *Service) Endpoint() []string {
if s.service == nil {
return nil
}
return s.service.Endpoints
}
func (s *Service) Logger() logger.Logger {
if s.opts.logger == nil {
return logger.Default()
}
return s.opts.logger
}
func (s *Service) build(ctx context.Context) (*registry.Service, error) {
svr := &registry.Service{
ID: s.ID(),
Name: s.Name(),
Version: s.Version(),
Metadata: s.opts.metadata,
Endpoints: make([]string, 0, 4),
}
if svr.Metadata == nil {
svr.Metadata = make(map[string]string)
}
svr.Metadata["os"] = runtime.GOOS
svr.Metadata["go"] = runtime.Version()
svr.Metadata["hostname"], _ = os.Hostname()
svr.Metadata["uptime"] = time.Now().Format(time.DateTime)
if s.opts.endpoints != nil {
svr.Endpoints = append(svr.Endpoints, s.opts.endpoints...)
}
for _, ptr := range s.opts.servers {
if e, ok := ptr.(Endpointer); ok {
if uri, err := e.Endpoint(ctx); err != nil {
return nil, err
} else {
svr.Endpoints = append(svr.Endpoints, uri)
}
}
}
return svr, nil
}
func (s *Service) injectVars(v any) {
refValue := reflect.Indirect(reflect.ValueOf(v))
refType := refValue.Type()
for i := range refValue.NumField() {
fieldValue := refValue.Field(i)
if !fieldValue.CanSet() {
continue
}
fieldType := refType.Field(i)
if fieldType.Type.Kind() != reflect.Ptr {
continue
}
for _, rv := range s.refValues {
if fieldType.Type == rv.Type() {
refValue.Field(i).Set(rv)
break
}
}
}
}
func (s *Service) preStart(ctx context.Context) (err error) {
s.Logger().Info(s.ctx, "starting")
for _, ptr := range s.opts.servers {
s.refValues = append(s.refValues, reflect.ValueOf(ptr))
}
if s.opts.registry != nil {
s.refValues = append(s.refValues, reflect.ValueOf(s.opts.registry))
}
if s.opts.scope != nil {
s.injectVars(s.opts.scope)
s.opts.scope.Init(ctx)
s.refValues = append(s.refValues, reflect.ValueOf(s.opts.scope))
}
if s.opts.serviceLoader != nil {
s.injectVars(s.opts.serviceLoader)
s.opts.serviceLoader.Init(ctx)
s.refValues = append(s.refValues, reflect.ValueOf(s.opts.serviceLoader))
}
for _, srv := range s.opts.servers {
svr := srv
s.errGroup.Go(func() error {
return svr.Start(ctx)
})
}
if s.opts.scope != nil {
s.errGroup.Go(func() error {
return s.opts.scope.Run(ctx)
})
}
if s.opts.registry != nil {
s.errGroup.Go(func() error {
opts := func(o *registry.RegisterOptions) {
o.Context = ctx
}
if err = s.opts.registry.Register(s.service, opts); err != nil {
return err
}
s.Logger().Info(ctx, "service registered")
duration := s.opts.registrarTimeout
if duration > time.Minute {
duration = time.Minute
} else {
if duration > time.Second*10 {
duration = duration - time.Second*5
}
}
ticker := time.NewTicker(duration)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
s.Logger().Info(ctx, "service registar stopped")
return nil
case <-ticker.C:
if err = s.opts.registry.Register(s.service, func(o *registry.RegisterOptions) {
o.Context = ctx
o.TTL = s.opts.registrarTimeout
}); err != nil {
s.Logger().Warn(ctx, "service register error: %v", err)
}
}
}
})
}
s.Logger().Info(s.ctx, "started")
return
}
func (s *Service) preStop() (err error) {
if !atomic.CompareAndSwapInt32(&s.exitFlag, 0, 1) {
return
}
s.Logger().Info(s.ctx, "stopping")
ctx, cancelFunc := context.WithTimeoutCause(s.ctx, s.opts.stopTimeout, errors.ErrTimeout)
defer func() {
cancelFunc()
}()
for _, srv := range s.opts.servers {
if err = srv.Stop(ctx); err != nil {
s.Logger().Warn(ctx, "server stop error: %v", err)
}
}
if s.opts.registry != nil {
if err = s.opts.registry.Deregister(s.service, func(o *registry.DeregisterOptions) {
o.Context = ctx
}); err != nil {
s.Logger().Warn(ctx, "server deregister error: %v", err)
}
}
s.Logger().Info(ctx, "stopped")
return
}
func (s *Service) Run() (err error) {
var (
ctx context.Context
errCtx context.Context
cancelFunc context.CancelFunc
)
s.ctx = WithContext(s.opts.ctx, s)
if s.service, err = s.build(s.ctx); err != nil {
return
}
ctx, cancelFunc = context.WithCancel(s.ctx)
defer cancelFunc()
s.errGroup, errCtx = errgroup.WithContext(ctx)
if err = s.preStart(errCtx); err != nil {
return
}
s.errGroup.Go(func() error {
ch := make(chan os.Signal, 1)
signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL}
signal.Notify(ch, signals...)
select {
case <-ch:
cancelFunc()
case <-ctx.Done():
case <-s.ctx.Done():
}
return s.preStop()
})
err = s.errGroup.Wait()
return
}
func New(cbs ...Option) *Service {
s := &Service{
refValues: make([]reflect.Value, 0, 20),
opts: &options{
id: uuid.New().String(),
ctx: context.Background(),
logger: logger.Default(),
stopTimeout: time.Second * 10,
registrarTimeout: time.Second * 30,
},
}
s.opts.metadata = make(map[string]string)
for _, cb := range cbs {
cb(s.opts)
}
return s
}

56
go.mod 100644
View File

@ -0,0 +1,56 @@
module git.nobla.cn/golang/aeus
go 1.23.0
toolchain go1.23.9
require (
github.com/gin-gonic/gin v1.10.1
github.com/google/uuid v1.6.0
github.com/mattn/go-runewidth v0.0.16
github.com/peterh/liner v1.2.2
go.etcd.io/etcd/api/v3 v3.6.0
go.etcd.io/etcd/client/v3 v3.6.0
golang.org/x/sync v0.12.0
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb
google.golang.org/grpc v1.72.2
google.golang.org/protobuf v1.36.5
)
require (
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

181
go.sum 100644
View File

@ -0,0 +1,181 @@
github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4=
github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.10.1 h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ=
github.com/gin-gonic/gin v1.10.1/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/peterh/liner v1.2.2 h1:aJ4AOodmL+JxOZZEL2u9iJf8omNRpqHc/EbrK+3mAXw=
github.com/peterh/liner v1.2.2/go.mod h1:xFwJyiKIXJZUKItq5dGHZSTBRAuG/CpeNpWLyiNRNwI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/etcd/api/v3 v3.6.0 h1:vdbkcUBGLf1vfopoGE/uS3Nv0KPyIpUV/HM6w9yx2kM=
go.etcd.io/etcd/api/v3 v3.6.0/go.mod h1:Wt5yZqEmxgTNJGHob7mTVBJDZNXiHPtXTcPab37iFOw=
go.etcd.io/etcd/client/pkg/v3 v3.6.0 h1:nchnPqpuxvv3UuGGHaz0DQKYi5EIW5wOYsgUNRc365k=
go.etcd.io/etcd/client/pkg/v3 v3.6.0/go.mod h1:Jv5SFWMnGvIBn8o3OaBq/PnT0jjsX8iNokAUessNjoA=
go.etcd.io/etcd/client/v3 v3.6.0 h1:/yjKzD+HW5v/3DVj9tpwFxzNbu8hjcKID183ug9duWk=
go.etcd.io/etcd/client/v3 v3.6.0/go.mod h1:Jzk/Knqe06pkOZPHXsQ0+vNDvMQrgIqJ0W8DwPdMJMg=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb h1:p31xT4yrYrSM/G4Sn2+TNUkVhFCbG9y8itM2S6Th950=
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:jbe3Bkdp+Dh2IrslsFCklNhweNTBgSYanP1UXhJDhKg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I=
google.golang.org/grpc v1.72.2 h1:TdbGzwb82ty4OusHWepvFWGLgIbNo1/SUynEN0ssqv8=
google.golang.org/grpc v1.72.2/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=

View File

@ -0,0 +1,16 @@
package middleware
import "context"
type Handler func(ctx context.Context) error
type Middleware func(Handler) Handler
func Chain(m ...Middleware) Middleware {
return func(next Handler) Handler {
for i := len(m) - 1; i >= 0; i-- {
next = m[i](next)
}
return next
}
}

96
options.go 100644
View File

@ -0,0 +1,96 @@
package aeus
import (
"context"
"time"
"git.nobla.cn/golang/aeus/pkg/logger"
"git.nobla.cn/golang/aeus/registry"
)
type options struct {
id string
name string
version string
metadata map[string]string
ctx context.Context
logger logger.Logger
servers []Server
endpoints []string
scope Scope
registrarTimeout time.Duration
registry registry.Registry
serviceLoader ServiceLoader
stopTimeout time.Duration
}
func WithName(name string) Option {
return func(o *options) {
o.name = name
}
}
func WithVersion(version string) Option {
return func(o *options) {
o.version = version
}
}
func WithMetadata(metadata map[string]string) Option {
return func(o *options) {
if o.metadata == nil {
o.metadata = make(map[string]string)
}
for k, v := range metadata {
o.metadata[k] = v
}
}
}
func WithServer(servers ...Server) Option {
return func(o *options) {
o.servers = append(o.servers, servers...)
}
}
func WithEndpoint(endpoints ...string) Option {
return func(o *options) {
o.endpoints = append(o.endpoints, endpoints...)
}
}
func WithLogger(logger logger.Logger) Option {
return func(o *options) {
o.logger = logger
}
}
func WithScope(scope Scope) Option {
return func(o *options) {
o.scope = scope
}
}
func WithServiceLoader(loader ServiceLoader) Option {
return func(o *options) {
o.serviceLoader = loader
}
}
func WithStopTimeout(timeout time.Duration) Option {
return func(o *options) {
o.stopTimeout = timeout
}
}
func WithRegistry(registry registry.Registry) Option {
return func(o *options) {
o.registry = registry
}
}
func WithRegistrarTimeout(timeout time.Duration) Option {
return func(o *options) {
o.registrarTimeout = timeout
}
}

38
pkg/cache/cache.go vendored 100644
View File

@ -0,0 +1,38 @@
package cache
import (
"context"
"time"
"git.nobla.cn/golang/aeus/pkg/cache/memory"
)
var (
std = memory.NewCache()
)
type Cache interface {
// Get gets a cached value by key.
Get(ctx context.Context, key string) (any, time.Time, error)
// Put stores a key-value pair into cache.
Put(ctx context.Context, key string, val any, d time.Duration) error
// Delete removes a key from cache.
Delete(ctx context.Context, key string) error
// String returns the name of the implementation.
String() string
}
// Get gets a cached value by key.
func Get(ctx context.Context, key string) (any, time.Time, error) {
return std.Get(ctx, key)
}
// Put stores a key-value pair into cache.
func Put(ctx context.Context, key string, val any, d time.Duration) error {
return std.Put(ctx, key, val, d)
}
// String returns the name of the implementation.
func Delete(ctx context.Context, key string) error {
return std.Delete(ctx, key)
}

68
pkg/cache/memory/cache.go vendored 100644
View File

@ -0,0 +1,68 @@
package memory
import (
"context"
"sync"
"time"
"git.nobla.cn/golang/aeus/pkg/errors"
)
type memCache struct {
opts Options
items map[string]Item
sync.RWMutex
}
func (c *memCache) Get(ctx context.Context, key string) (interface{}, time.Time, error) {
c.RWMutex.RLock()
defer c.RWMutex.RUnlock()
item, found := c.items[key]
if !found {
return nil, time.Time{}, errors.ErrNotFound
}
if item.Expired() {
return nil, time.Time{}, errors.ErrExpired
}
return item.Value, time.Unix(0, item.Expiration), nil
}
func (c *memCache) Put(ctx context.Context, key string, val interface{}, d time.Duration) error {
var e int64
if d == DefaultExpiration {
d = c.opts.Expiration
}
if d > 0 {
e = time.Now().Add(d).UnixNano()
}
c.RWMutex.Lock()
defer c.RWMutex.Unlock()
c.items[key] = Item{
Value: val,
Expiration: e,
}
return nil
}
func (c *memCache) Delete(ctx context.Context, key string) error {
c.RWMutex.Lock()
defer c.RWMutex.Unlock()
_, found := c.items[key]
if !found {
return errors.ErrNotFound
}
delete(c.items, key)
return nil
}
func (m *memCache) String() string {
return "memory"
}

33
pkg/cache/memory/item.go vendored 100644
View File

@ -0,0 +1,33 @@
package memory
import "time"
// Item represents an item stored in the cache.
type Item struct {
Value interface{}
Expiration int64
}
// Expired returns true if the item has expired.
func (i *Item) Expired() bool {
if i.Expiration == 0 {
return false
}
return time.Now().UnixNano() > i.Expiration
}
// NewCache returns a new cache.
func NewCache(opts ...Option) *memCache {
options := NewOptions(opts...)
items := make(map[string]Item)
if len(options.Items) > 0 {
items = options.Items
}
return &memCache{
opts: options,
items: items,
}
}

77
pkg/cache/memory/types.go vendored 100644
View File

@ -0,0 +1,77 @@
package memory
import (
"context"
"time"
"git.nobla.cn/golang/aeus/pkg/logger"
)
var (
DefaultExpiration time.Duration = 0
)
// Options represents the options for the cache.
type Options struct {
// Context should contain all implementation specific options, using context.WithValue.
Context context.Context
// Logger is the be used logger
Logger logger.Logger
Items map[string]Item
// Address represents the address or other connection information of the cache service.
Address string
Expiration time.Duration
}
// Option manipulates the Options passed.
type Option func(o *Options)
// Expiration sets the duration for items stored in the cache to expire.
func Expiration(d time.Duration) Option {
return func(o *Options) {
o.Expiration = d
}
}
// Items initializes the cache with preconfigured items.
func Items(i map[string]Item) Option {
return func(o *Options) {
o.Items = i
}
}
// WithAddress sets the cache service address or connection information.
func WithAddress(addr string) Option {
return func(o *Options) {
o.Address = addr
}
}
// WithContext sets the cache context, for any extra configuration.
func WithContext(c context.Context) Option {
return func(o *Options) {
o.Context = c
}
}
// WithLogger sets underline logger.
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// NewOptions returns a new options struct.
func NewOptions(opts ...Option) Options {
options := Options{
Expiration: DefaultExpiration,
Items: make(map[string]Item),
Logger: logger.Default(),
}
for _, o := range opts {
o(&options)
}
return options
}

View File

@ -0,0 +1,24 @@
package errors
const (
OK = 0 //success
Exit = 1000 //normal exit
Invalid = 1001 //payload invalid
Timeout = 1002 //timeout
Expired = 1003 //expired
AccessDenied = 4005 //access denied
PermissionDenied = 4003 //permission denied
NotFound = 4004 //not found
Unavailable = 5000 //service unavailable
)
var (
ErrExit = New(Exit, "normal exit")
ErrTimeout = New(Timeout, "timeout")
ErrExpired = New(Expired, "expired")
ErrValidate = New(Invalid, "invalid payload")
ErrNotFound = New(NotFound, "not found")
ErrAccessDenied = New(AccessDenied, "access denied")
ErrPermissionDenied = New(PermissionDenied, "permission denied")
ErrUnavailable = New(Unavailable, "service unavailable")
)

View File

@ -0,0 +1,33 @@
package errors
import (
"errors"
"fmt"
)
type Error struct {
Code int
Message string
}
func (e *Error) Error() string {
return fmt.Sprintf("code: %d, message: %s", e.Code, e.Message)
}
func Format(code int, msg string, args ...any) Error {
return Error{
Code: code,
Message: fmt.Sprintf(msg, args...),
}
}
func Is(err, target error) bool {
return errors.Is(err, target)
}
func New(code int, message string) *Error {
return &Error{
Code: code,
Message: message,
}
}

View File

@ -0,0 +1,33 @@
package logger
import (
"context"
"fmt"
"log/slog"
)
type logger struct {
log *slog.Logger
}
func (l *logger) Debug(ctx context.Context, msg string, args ...any) {
l.log.DebugContext(ctx, fmt.Sprintf(msg, args...))
}
func (l *logger) Info(ctx context.Context, msg string, args ...any) {
l.log.InfoContext(ctx, fmt.Sprintf(msg, args...))
}
func (l *logger) Warn(ctx context.Context, msg string, args ...any) {
l.log.WarnContext(ctx, fmt.Sprintf(msg, args...))
}
func (l *logger) Error(ctx context.Context, msg string, args ...any) {
l.log.ErrorContext(ctx, fmt.Sprintf(msg, args...))
}
func NewLogger(log *slog.Logger) Logger {
return &logger{
log: log,
}
}

View File

@ -0,0 +1,41 @@
package logger
import (
"context"
"log/slog"
)
var (
log Logger
)
func init() {
log = NewLogger(slog.Default())
}
type Logger interface {
Debug(ctx context.Context, format string, args ...any)
Info(ctx context.Context, format string, args ...any)
Warn(ctx context.Context, format string, args ...any)
Error(ctx context.Context, format string, args ...any)
}
func Debug(ctx context.Context, format string, args ...any) {
log.Debug(ctx, format, args...)
}
func Info(ctx context.Context, format string, args ...any) {
log.Debug(ctx, format, args...)
}
func Warn(ctx context.Context, format string, args ...any) {
log.Debug(ctx, format, args...)
}
func Error(ctx context.Context, format string, args ...any) {
log.Debug(ctx, format, args...)
}
func Default() Logger {
return log
}

45
pkg/net/ip.go 100644
View File

@ -0,0 +1,45 @@
package net
import (
"net"
"strings"
)
// get internal ip
func LocalIP() string {
var (
err error
addrs []net.Addr
inters []net.Interface
)
if inters, err = net.Interfaces(); err != nil {
return ""
}
for _, inter := range inters {
if inter.Flags&net.FlagUp != net.FlagUp {
continue
}
if !strings.HasPrefix(inter.Name, "lo") {
if addrs, err = inter.Addrs(); err != nil {
continue
}
for _, addr := range addrs {
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
return ipNet.IP.String()
}
}
}
}
}
return "localhost"
}
// extract ip address
func Extract(addr string) (string, error) {
// if addr is already specified then it's directly returned
if len(addr) > 0 && (addr != "0.0.0.0" && addr != "[::]" && addr != "::") {
return addr, nil
}
return LocalIP(), nil
}

22
pkg/net/net.go 100644
View File

@ -0,0 +1,22 @@
package net
import (
"fmt"
"net"
"strings"
)
func HostPort(addr string, port any) string {
host := addr
if strings.Count(addr, ":") > 0 {
host = fmt.Sprintf("[%s]", addr)
}
// when port is blank or 0, host is a queue name
if v, ok := port.(string); ok && v == "" {
return host
} else if v, ok := port.(int); ok && v == 0 && net.ParseIP(host) == nil {
return host
}
return fmt.Sprintf("%s:%v", host, port)
}

24
pkg/net/utils.go 100644
View File

@ -0,0 +1,24 @@
package net
import (
"net"
"strconv"
)
// TrulyAddr returns the truly address of the listener.
func TrulyAddr(addr string, l net.Listener) string {
host, port, err := net.SplitHostPort(addr)
if err != nil && l == nil {
return ""
}
if l != nil {
if taddr, ok := l.Addr().(*net.TCPAddr); ok {
port = strconv.Itoa(taddr.Port)
}
}
if len(host) > 0 && (host != "0.0.0.0" && host != "[::]" && host != "::") {
return net.JoinHostPort(host, port)
}
host = LocalIP()
return net.JoinHostPort(host, port)
}

22
pkg/pool/buffer.go 100644
View File

@ -0,0 +1,22 @@
package pool
import (
"bytes"
"sync"
)
var (
bufferPool sync.Pool
)
func GetBuffer() *bytes.Buffer {
if v := bufferPool.Get(); v != nil {
return v.(*bytes.Buffer)
}
return bytes.NewBuffer([]byte{})
}
func PutBuffer(b *bytes.Buffer) {
b.Reset()
bufferPool.Put(b)
}

50
pkg/pool/bytes.go 100644
View File

@ -0,0 +1,50 @@
package pool
import "sync"
var (
bufPool5k sync.Pool
bufPool2k sync.Pool
bufPool1k sync.Pool
bufPool sync.Pool
)
func GetBytes(size int) []byte {
if size <= 0 {
return nil
}
var x interface{}
if size >= 5*1024 {
x = bufPool5k.Get()
} else if size >= 2*1024 {
x = bufPool2k.Get()
} else if size >= 1*1024 {
x = bufPool1k.Get()
} else {
x = bufPool.Get()
}
if x == nil {
return make([]byte, size)
}
buf := x.([]byte)
if cap(buf) < size {
return make([]byte, size)
}
return buf[:size]
}
func PutBytes(buf []byte) {
size := cap(buf)
if size <= 0 {
return
}
if size >= 5*1024 {
bufPool5k.Put(buf)
} else if size >= 2*1024 {
bufPool2k.Put(buf)
} else if size >= 1*1024 {
bufPool1k.Put(buf)
} else {
bufPool.Put(buf)
}
}

View File

@ -0,0 +1,85 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: command.proto
package command
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
descriptorpb "google.golang.org/protobuf/types/descriptorpb"
reflect "reflect"
unsafe "unsafe"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
var file_aeus_command_proto_extTypes = []protoimpl.ExtensionInfo{
{
ExtendedType: (*descriptorpb.MethodOptions)(nil),
ExtensionType: (*string)(nil),
Field: 50001,
Name: "aeus.command",
Tag: "bytes,50001,opt,name=command",
Filename: "aeus/command.proto",
},
}
// Extension fields to descriptorpb.MethodOptions.
var (
// The 'command' option will be a string.
// Choose a field number in the reserved range (50000-99999) to avoid conflicts.
//
// optional string command = 50001;
E_Command = &file_aeus_command_proto_extTypes[0]
)
var File_aeus_command_proto protoreflect.FileDescriptor
const file_aeus_command_proto_rawDesc = "" +
"\n" +
"\x12aeus/command.proto\x12\x04aeus\x1a google/protobuf/descriptor.proto:=\n" +
"\acommand\x12\x1e.google.protobuf.MethodOptions\x18ц\x03 \x01(\tR\acommand\x88\x01\x01B5Z3git.nobla.cn/golang/aeus/pkg/protoc/command;commandb\x06proto3"
var file_aeus_command_proto_goTypes = []any{
(*descriptorpb.MethodOptions)(nil), // 0: google.protobuf.MethodOptions
}
var file_aeus_command_proto_depIdxs = []int32{
0, // 0: aeus.command:extendee -> google.protobuf.MethodOptions
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
0, // [0:1] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_aeus_command_proto_init() }
func file_aeus_command_proto_init() {
if File_aeus_command_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_aeus_command_proto_rawDesc), len(file_aeus_command_proto_rawDesc)),
NumEnums: 0,
NumMessages: 0,
NumExtensions: 1,
NumServices: 0,
},
GoTypes: file_aeus_command_proto_goTypes,
DependencyIndexes: file_aeus_command_proto_depIdxs,
ExtensionInfos: file_aeus_command_proto_extTypes,
}.Build()
File_aeus_command_proto = out.File
file_aeus_command_proto_goTypes = nil
file_aeus_command_proto_depIdxs = nil
}

View File

@ -0,0 +1,147 @@
package etcd
import (
"crypto/tls"
"encoding/json"
"path"
"sync"
"git.nobla.cn/golang/aeus/pkg/errors"
"git.nobla.cn/golang/aeus/registry"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
)
type etcdRegistry struct {
prefix string
client *clientv3.Client
options *registry.RegistryOptions
leaseID clientv3.LeaseID
sync.RWMutex
}
func (e *etcdRegistry) prefixKey(name string) string {
return path.Join(e.prefix, name) + "/"
}
func (e *etcdRegistry) buildKey(s *registry.Service) string {
return path.Join(e.prefix, s.Name, s.ID)
}
func (e *etcdRegistry) marshalService(s *registry.Service) string {
buf, err := json.Marshal(s)
if err == nil {
return string(buf)
}
return ""
}
func (e *etcdRegistry) unmarshalService(b []byte) (*registry.Service, error) {
var service registry.Service
err := json.Unmarshal(b, &service)
if err == nil {
return &service, nil
}
return nil, err
}
func (e *etcdRegistry) Name() string {
return Name
}
func (e *etcdRegistry) Init(opts ...registry.RegistryOption) (err error) {
for _, cb := range opts {
cb(e.options)
}
cfg := clientv3.Config{}
if e.options.Context != nil {
clientOpts := FromContext(e.options.Context)
if clientOpts != nil {
cfg.Username = clientOpts.Username
cfg.Password = clientOpts.Password
cfg.DialTimeout = clientOpts.DialTimeout
if clientOpts.Prefix != "" {
e.prefix = clientOpts.Prefix
}
}
}
if e.options.TLSConfig != nil {
cfg.TLS = e.options.TLSConfig
} else if e.options.Secure {
cfg.TLS = &tls.Config{
InsecureSkipVerify: true,
}
}
if e.prefix == "" {
e.prefix = "/micro/service/"
}
cfg.Endpoints = append(cfg.Endpoints, e.options.Addrs...)
e.client, err = clientv3.New(cfg)
return nil
}
func (e *etcdRegistry) Register(s *registry.Service, cbs ...registry.RegisterOption) (err error) {
var lgr *clientv3.LeaseGrantResponse
opts := registry.NewRegisterOption(cbs...)
if opts.TTL.Seconds() > 0 {
if e.leaseID > 0 {
if _, err = e.client.KeepAliveOnce(opts.Context, e.leaseID); err == nil {
return
} else {
if err != rpctypes.ErrLeaseNotFound {
return err
}
}
}
lgr, err = e.client.Grant(opts.Context, int64(opts.TTL.Seconds()))
if err != nil {
return err
}
}
if lgr != nil {
if _, err = e.client.Put(opts.Context, e.buildKey(s), e.marshalService(s), clientv3.WithLease(lgr.ID)); err == nil {
e.leaseID = lgr.ID
}
} else {
_, err = e.client.Put(opts.Context, e.buildKey(s), e.marshalService(s))
}
if err != nil {
return err
}
return nil
}
func (e *etcdRegistry) Deregister(s *registry.Service, cbs ...registry.DeregisterOption) error {
opts := registry.NewDeregisterOption(cbs...)
e.client.Delete(opts.Context, e.buildKey(s))
return nil
}
func (e *etcdRegistry) GetService(name string, cbs ...registry.GetOption) ([]*registry.Service, error) {
opts := registry.NewGetOption(cbs...)
rsp, err := e.client.Get(opts.Context, e.prefixKey(name), clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
if len(rsp.Kvs) == 0 {
return nil, errors.ErrNotFound
}
ss := make([]*registry.Service, 0, len(rsp.Kvs))
for _, n := range rsp.Kvs {
if s, err := e.unmarshalService(n.Value); err == nil {
ss = append(ss, s)
}
}
return ss, nil
}
func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
return newWatcher(e, opts...)
}
func NewEtcdRegistry() registry.Registry {
e := &etcdRegistry{
options: &registry.RegistryOptions{},
}
return e
}

View File

@ -0,0 +1,36 @@
package etcd
import (
"context"
"time"
)
const (
Name = "etcd"
)
type (
etcdContextKey struct{}
)
type (
ClientOptions struct {
Prefix string
Username string
Password string
DialTimeout time.Duration
}
)
func FromContext(ctx context.Context) *ClientOptions {
if v := ctx.Value(etcdContextKey{}); v != nil {
if v, ok := v.(*ClientOptions); ok {
return v
}
}
return nil
}
func WithContext(ctx context.Context, opts *ClientOptions) context.Context {
return context.WithValue(ctx, etcdContextKey{}, opts)
}

View File

@ -0,0 +1,66 @@
package etcd
import (
"context"
"io"
"sync/atomic"
"git.nobla.cn/golang/aeus/registry"
clientv3 "go.etcd.io/etcd/client/v3"
)
type etcdWatcher struct {
ctx context.Context
cancelFunc context.CancelFunc
watchChan clientv3.WatchChan
watcher clientv3.Watcher
registry *etcdRegistry
opts *registry.WatchOptions
setupFlag int32
}
func newWatcher(r *etcdRegistry, cbs ...registry.WatchOption) (watcher registry.Watcher, err error) {
w := &etcdWatcher{
registry: r,
}
w.opts = registry.NewWatchOption(cbs...)
w.ctx, w.cancelFunc = context.WithCancel(w.opts.Context)
w.watcher = clientv3.NewWatcher(r.client)
w.watchChan = w.watcher.Watch(w.ctx, r.prefixKey(w.opts.Service), clientv3.WithPrefix(), clientv3.WithRev(0), clientv3.WithKeysOnly())
if err = w.watcher.RequestProgress(w.ctx); err != nil {
w.opts.Logger.Warn(w.ctx, "etcd request progress error: %v", err)
}
return w, err
}
func (w *etcdWatcher) getServices() ([]*registry.Service, error) {
return w.registry.GetService(w.opts.Service)
}
func (w *etcdWatcher) Next() ([]*registry.Service, error) {
if atomic.CompareAndSwapInt32(&w.setupFlag, 0, 1) {
return w.getServices()
}
select {
case <-w.ctx.Done():
return nil, w.ctx.Err()
case res, ok := <-w.watchChan:
if ok {
if res.Err() == nil {
return w.getServices()
} else {
w.opts.Logger.Warn(w.ctx, "etcd watcher read error: %v", res.Err())
return nil, res.Err()
}
}
}
return nil, io.ErrClosedPipe
}
func (w *etcdWatcher) Stop() (err error) {
if err = w.watcher.Close(); err != nil {
w.opts.Logger.Warn(w.ctx, "etcd watcher close error: %v", err)
}
w.cancelFunc()
return err
}

View File

@ -0,0 +1,13 @@
package registry
// The registry provides an interface for service discovery
// and an abstraction over varying implementations
// {consul, etcd, zookeeper, ...}.
type Registry interface {
Name() string //return registry name
Init(...RegistryOption) error //init registry
Register(*Service, ...RegisterOption) error //register service
Deregister(*Service, ...DeregisterOption) error //deregister service
GetService(string, ...GetOption) ([]*Service, error) //get service list
Watch(...WatchOption) (Watcher, error) //watch service
}

198
registry/types.go 100644
View File

@ -0,0 +1,198 @@
package registry
import (
"context"
"crypto/tls"
"time"
"git.nobla.cn/golang/aeus/pkg/logger"
)
type (
Service struct {
ID string `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Metadata map[string]string `json:"metadata"`
Endpoints []string `json:"endpoints"`
}
)
type (
Watcher interface {
// Next returns services in the following two cases:
// 1.the first time to watch and the service instance list is not empty.
// 2.any service instance changes found.
// if the above two conditions are not met, it will block until context deadline exceeded or canceled
Next() ([]*Service, error)
// Stop close the watcher.
Stop() error
}
)
type (
RegistryOptions struct {
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
TLSConfig *tls.Config
Addrs []string
Timeout time.Duration
Secure bool
}
RegistryOption func(o *RegistryOptions)
RegisterOptions struct {
Context context.Context
// register ttl
TTL time.Duration
}
RegisterOption func(o *RegisterOptions)
WatchOptions struct {
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
// Specify a service to watch
// If blank, the watch is for all services
Service string
// Specify a logger
Logger logger.Logger
}
WatchOption func(o *WatchOptions)
DeregisterOptions struct {
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
DeregisterOption func(o *DeregisterOptions)
GetOptions struct {
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
GetOption func(o *GetOptions)
)
func WithRegistryContext(ctx context.Context) RegistryOption {
return func(o *RegistryOptions) {
o.Context = ctx
}
}
func WithAddress(address string) RegistryOption {
return func(o *RegistryOptions) {
o.Addrs = append(o.Addrs, address)
}
}
func WithTLS(tls *tls.Config) RegistryOption {
return func(o *RegistryOptions) {
o.TLSConfig = tls
}
}
func WithTimeout(t time.Duration) RegistryOption {
return func(o *RegistryOptions) {
o.Timeout = t
}
}
func NewRegistryOption(cbs ...RegistryOption) *RegistryOptions {
opts := &RegistryOptions{
Context: context.Background(),
}
for _, o := range cbs {
o(opts)
}
return opts
}
func WithTTL(t time.Duration) RegisterOption {
return func(o *RegisterOptions) {
o.TTL = t
}
}
func WithRegsterContext(ctx context.Context) RegisterOption {
return func(o *RegisterOptions) {
o.Context = ctx
}
}
func NewRegisterOption(cbs ...RegisterOption) *RegisterOptions {
opts := &RegisterOptions{
Context: context.Background(),
}
for _, o := range cbs {
o(opts)
}
return opts
}
func WithDeregisterContext(ctx context.Context) DeregisterOption {
return func(o *DeregisterOptions) {
o.Context = ctx
}
}
func NewDeregisterOption(cbs ...DeregisterOption) *DeregisterOptions {
opts := &DeregisterOptions{
Context: context.Background(),
}
for _, o := range cbs {
o(opts)
}
return opts
}
func WithService(service string) WatchOption {
return func(o *WatchOptions) {
o.Service = service
}
}
func WithLogger(logger logger.Logger) WatchOption {
return func(o *WatchOptions) {
o.Logger = logger
}
}
func WithWatchContext(ctx context.Context) WatchOption {
return func(o *WatchOptions) {
o.Context = ctx
}
}
func NewWatchOption(cbs ...WatchOption) *WatchOptions {
opts := &WatchOptions{
Context: context.Background(),
Logger: logger.Default(),
}
for _, o := range cbs {
o(opts)
}
return opts
}
func WithGetContext(ctx context.Context) GetOption {
return func(o *GetOptions) {
o.Context = ctx
}
}
func NewGetOption(cbs ...GetOption) *GetOptions {
opts := &GetOptions{
Context: context.Background(),
}
for _, o := range cbs {
o(opts)
}
return opts
}

View File

@ -0,0 +1,262 @@
package cli
import (
"context"
"encoding/json"
"fmt"
"io"
"math"
"net"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/peterh/liner"
)
type Client struct {
name string
ctx context.Context
address string
sequence uint16
conn net.Conn
liner *liner.State
mutex sync.Mutex
exitChan chan struct{}
readyChan chan struct{}
commandChan chan *Frame
completerChan chan *Frame
Timeout time.Duration
exitFlag int32
}
func (client *Client) getSequence() uint16 {
client.mutex.Lock()
defer client.mutex.Unlock()
if client.sequence >= math.MaxUint16 {
client.sequence = 0
}
client.sequence++
n := client.sequence
return n
}
func (client *Client) dialContext(ctx context.Context, address string) (conn net.Conn, err error) {
var (
pos int
network string
dialer net.Dialer
)
if pos = strings.Index(address, "://"); pos > -1 {
network = address[:pos]
address = address[pos+3:]
} else {
network = "tcp"
}
if conn, err = dialer.DialContext(ctx, network, address); err != nil {
return
}
return
}
func (client *Client) renderBanner(info *handshake) {
client.name = info.Name
fmt.Printf("Welcome to the %s(%s) monitor\n", info.Name, info.Version)
fmt.Printf("Your connection id is %d\n", info.ID)
fmt.Printf("Last login: %s from %s\n", info.ServerTime.Format(time.RFC822), info.RemoteAddr)
fmt.Printf("Type 'help' for help. Type 'exit' for quit. Type 'cls' to clear input statement.\n")
}
func (client *Client) ioLoop(r io.Reader) {
defer func() {
_ = client.Close()
}()
for {
frame, err := readFrame(r)
if err != nil {
return
}
switch frame.Type {
case PacketTypeHandshake:
info := &handshake{}
if err = json.Unmarshal(frame.Data, info); err == nil {
client.renderBanner(info)
}
select {
case client.readyChan <- struct{}{}:
case <-client.exitChan:
return
}
case PacketTypeCompleter:
select {
case client.completerChan <- frame:
case <-client.exitChan:
return
}
case PacketTypeCommand:
select {
case client.commandChan <- frame:
case <-client.exitChan:
return
}
}
}
}
func (client *Client) waitResponse(seq uint16, timeout time.Duration) {
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
fmt.Println("timeout waiting for response")
return
case <-client.exitChan:
return
case res, ok := <-client.commandChan:
if !ok {
break
}
if res.Seq == seq {
if res.Error != "" {
fmt.Print(res.Error)
} else {
fmt.Print(string(res.Data))
}
if res.Flag == FlagComplete {
fmt.Println("")
return
}
}
}
}
}
func (client *Client) completer(str string) (ss []string) {
var (
err error
seq uint16
)
ss = make([]string, 0)
seq = client.getSequence()
if err = writeFrame(client.conn, newFrame(PacketTypeCompleter, FlagComplete, seq, client.Timeout, []byte(str))); err != nil {
return
}
select {
case <-time.After(time.Second * 5):
case frame, ok := <-client.completerChan:
if ok {
err = json.Unmarshal(frame.Data, &ss)
}
}
return
}
func (client *Client) Execute(s string) (err error) {
var (
seq uint16
)
if client.conn, err = client.dialContext(client.ctx, client.address); err != nil {
return err
}
defer func() {
_ = client.Close()
}()
go client.ioLoop(client.conn)
seq = client.getSequence()
if err = writeFrame(client.conn, newFrame(PacketTypeCommand, FlagComplete, seq, client.Timeout, []byte(s))); err != nil {
return err
}
client.waitResponse(seq, client.Timeout)
return
}
func (client *Client) Shell() (err error) {
var (
seq uint16
line string
)
client.liner.SetCtrlCAborts(true)
if client.conn, err = client.dialContext(client.ctx, client.address); err != nil {
return err
}
defer func() {
_ = client.Close()
}()
if err = writeFrame(client.conn, newFrame(PacketTypeHandshake, FlagComplete, client.getSequence(), client.Timeout, nil)); err != nil {
return
}
go client.ioLoop(client.conn)
select {
case <-client.readyChan:
case <-client.ctx.Done():
return
}
client.liner.SetCompleter(client.completer)
for {
if line, err = client.liner.Prompt(client.name + "> "); err != nil {
break
}
if atomic.LoadInt32(&client.exitFlag) == 1 {
fmt.Println(Bye)
break
}
line = strings.TrimSpace(line)
if line == "" {
continue
}
if strings.ToLower(line) == "exit" || strings.ToLower(line) == "quit" {
fmt.Println(Bye)
return
}
if strings.ToLower(line) == "clear" || strings.ToLower(line) == "cls" {
fmt.Print("\033[2J")
continue
}
seq = client.getSequence()
if err = writeFrame(client.conn, newFrame(PacketTypeCommand, FlagComplete, seq, client.Timeout, []byte(line))); err != nil {
break
}
client.liner.AppendHistory(line)
client.waitResponse(seq, client.Timeout)
}
return
}
func (client *Client) Close() (err error) {
if !atomic.CompareAndSwapInt32(&client.exitFlag, 0, 1) {
return
}
close(client.exitChan)
if client.conn != nil {
err = client.conn.Close()
}
if client.liner != nil {
err = client.liner.Close()
}
return
}
func NewClient(ctx context.Context, addr string) *Client {
var (
timeout time.Duration
)
if ctx == nil {
ctx = context.Background()
}
timeout = time.Second * 30
return &Client{
ctx: ctx,
address: addr,
name: filepath.Base(os.Args[0]),
Timeout: timeout,
liner: liner.NewLiner(),
readyChan: make(chan struct{}, 1),
exitChan: make(chan struct{}),
commandChan: make(chan *Frame, 5),
completerChan: make(chan *Frame, 5),
}
}

View File

@ -0,0 +1,148 @@
package cli
import (
"context"
"fmt"
"io"
"math"
"net/url"
"sync"
"github.com/gin-gonic/gin/binding"
)
type Context struct {
Id int64
seq uint16
ctx context.Context
wc io.WriteCloser
params map[string]string
locker sync.RWMutex
variables map[string]any
args []string
}
func (ctx *Context) reset(id int64, wc io.WriteCloser) {
ctx.Id = id
ctx.wc = wc
ctx.seq = 0
ctx.ctx = context.Background()
ctx.args = make([]string, 0)
ctx.params = make(map[string]string)
ctx.variables = make(map[string]any)
}
func (ctx *Context) setArgs(args []string) {
ctx.args = args
}
func (ctx *Context) setParam(ps map[string]string) {
ctx.params = ps
}
func (ctx *Context) Bind(val any) (err error) {
qs := url.Values{}
for k, v := range ctx.params {
qs.Set(k, v)
}
return binding.MapFormWithTag(val, qs, "json")
}
func (ctx *Context) setContext(c context.Context) {
ctx.ctx = c
}
func (ctx *Context) Context() context.Context {
return ctx.ctx
}
func (ctx *Context) Argument(index int) string {
if index >= len(ctx.args) || index < 0 {
return ""
}
return ctx.args[index]
}
func (ctx *Context) Param(s string) string {
if v, ok := ctx.params[s]; ok {
return v
}
return ""
}
func (ctx *Context) SetValue(name string, value any) {
ctx.locker.Lock()
if ctx.variables == nil {
ctx.variables = make(map[string]any)
}
ctx.variables[name] = value
ctx.locker.Unlock()
}
func (ctx *Context) GetValue(name string) (val any, ok bool) {
ctx.locker.RLock()
defer ctx.locker.RUnlock()
val, ok = ctx.variables[name]
return
}
func (ctx *Context) Success(v any) (err error) {
return ctx.send(responsePayload{Type: PacketTypeCommand, Data: v})
}
func (ctx *Context) Error(code int, reason string) (err error) {
return ctx.send(responsePayload{Type: PacketTypeCommand, Code: code, Reason: reason})
}
func (ctx *Context) Close() (err error) {
return ctx.wc.Close()
}
func (ctx *Context) send(res responsePayload) (err error) {
var (
ok bool
buf []byte
marshal encoder
)
if res.Code > 0 {
err = writeFrame(ctx.wc, &Frame{
Feature: Feature,
Type: res.Type,
Seq: ctx.seq,
Flag: FlagComplete,
Error: fmt.Sprintf("ERROR(%d): %s", res.Code, res.Reason),
})
return
}
if res.Data == nil {
buf = OK
goto __END
}
if marshal, ok = res.Data.(encoder); ok {
buf, err = marshal.Marshal()
goto __END
}
buf, err = serialize(res.Data)
__END:
if err != nil {
return
}
offset := 0
chunkSize := math.MaxInt16 - 1
n := len(buf) / chunkSize
for i := 0; i < n; i++ {
if err = writeFrame(ctx.wc, newFrame(res.Type, FlagPortion, ctx.seq, 0, buf[offset:chunkSize+offset])); err != nil {
return
}
offset += chunkSize
}
err = writeFrame(ctx.wc, newFrame(res.Type, FlagComplete, ctx.seq, 0, buf[offset:]))
return
}
func newContext(id int64, wc io.WriteCloser) *Context {
return &Context{
Id: id,
wc: wc,
}
}

View File

@ -0,0 +1,162 @@
package cli
import (
"bytes"
"encoding/binary"
"io"
"math"
"time"
)
const (
PacketTypeCompleter byte = 0x01
PacketTypeCommand = 0x02
PacketTypeHandshake = 0x03
)
const (
FlagPortion = 0x00
FlagComplete = 0x01
)
type (
Frame struct {
Feature []byte
Type byte `json:"type"`
Flag byte `json:"flag"`
Seq uint16 `json:"seq"`
Data []byte `json:"data"`
Error string `json:"error"`
Timeout int64 `json:"timeout"`
Timestamp int64 `json:"timestamp"`
}
)
func readFrame(r io.Reader) (frame *Frame, err error) {
var (
n int
dataLength uint16
errorLength uint16
errBuf []byte
)
frame = &Frame{Feature: make([]byte, 3)}
if _, err = io.ReadFull(r, frame.Feature); err != nil {
return
}
if !bytes.Equal(frame.Feature, Feature) {
err = io.ErrUnexpectedEOF
return
}
if err = binary.Read(r, binary.LittleEndian, &frame.Type); err != nil {
return
}
if err = binary.Read(r, binary.LittleEndian, &frame.Flag); err != nil {
return
}
if err = binary.Read(r, binary.LittleEndian, &frame.Seq); err != nil {
return
}
if err = binary.Read(r, binary.LittleEndian, &frame.Timeout); err != nil {
return
}
if err = binary.Read(r, binary.LittleEndian, &frame.Timestamp); err != nil {
return
}
if err = binary.Read(r, binary.LittleEndian, &dataLength); err != nil {
return
}
if err = binary.Read(r, binary.LittleEndian, &errorLength); err != nil {
return
}
if dataLength > 0 {
frame.Data = make([]byte, dataLength)
if n, err = io.ReadFull(r, frame.Data); err == nil {
if n < int(dataLength) {
err = io.ErrShortBuffer
}
}
}
if errorLength > 0 {
errBuf = make([]byte, errorLength)
if n, err = io.ReadFull(r, errBuf); err == nil {
if n < int(dataLength) {
err = io.ErrShortBuffer
} else {
frame.Error = string(errBuf)
}
}
}
return
}
func writeFrame(w io.Writer, frame *Frame) (err error) {
var (
n int
dl int
dataLength uint16
errorLength uint16
errBuf []byte
)
if _, err = w.Write(Feature); err != nil {
return
}
if frame.Data != nil {
dl = len(frame.Data)
if dl > math.MaxUint16 {
return io.ErrNoProgress
}
dataLength = uint16(dl)
}
if frame.Error != "" {
errBuf = []byte(frame.Error)
errorLength = uint16(len(errBuf))
}
if err = binary.Write(w, binary.LittleEndian, frame.Type); err != nil {
return
}
if err = binary.Write(w, binary.LittleEndian, frame.Flag); err != nil {
return
}
if err = binary.Write(w, binary.LittleEndian, frame.Seq); err != nil {
return
}
if err = binary.Write(w, binary.LittleEndian, frame.Timeout); err != nil {
return
}
if err = binary.Write(w, binary.LittleEndian, frame.Timestamp); err != nil {
return
}
if err = binary.Write(w, binary.LittleEndian, dataLength); err != nil {
return
}
if err = binary.Write(w, binary.LittleEndian, errorLength); err != nil {
return
}
if dataLength > 0 {
if n, err = w.Write(frame.Data); err == nil {
if n < int(dataLength) {
err = io.ErrShortWrite
}
}
}
if errorLength > 0 {
if n, err = w.Write(errBuf); err == nil {
if n < int(errorLength) {
err = io.ErrShortWrite
}
}
}
return
}
func newFrame(t, f byte, seq uint16, timeout time.Duration, data []byte) *Frame {
return &Frame{
Feature: Feature,
Type: t,
Flag: f,
Seq: seq,
Data: data,
Timeout: int64(timeout),
Timestamp: time.Now().Unix(),
}
}

View File

@ -0,0 +1,181 @@
package cli
import (
"errors"
"fmt"
"strconv"
"strings"
"github.com/mattn/go-runewidth"
)
var (
ErrNotFound = errors.New("not found")
)
type Router struct {
name string
path []string
children []*Router
command Command
params []string
}
func (r *Router) getChildren(name string) *Router {
for _, child := range r.children {
if child.name == name {
return child
}
}
return nil
}
func (r *Router) Completer(tokens ...string) []string {
ss := make([]string, 0, 10)
if len(tokens) == 0 {
for _, child := range r.children {
ss = append(ss, strings.Join(child.path, " "))
}
return ss
}
children := r.getChildren(tokens[0])
if children == nil {
token := tokens[0]
for _, child := range r.children {
if strings.HasPrefix(child.name, token) {
ss = append(ss, strings.Join(child.path, " "))
}
}
return ss
}
return children.Completer(tokens[1:]...)
}
func (r *Router) Usage() string {
if len(r.path) <= 0 {
return ""
}
var (
sb strings.Builder
)
sb.WriteString("Usage: ")
sb.WriteString(strings.Join(r.path, " "))
if len(r.params) > 0 {
for _, s := range r.params {
sb.WriteString(" {" + s + "}")
}
}
return sb.String()
}
func (r *Router) Handle(path string, command Command) {
var (
pos int
name string
)
if strings.HasSuffix(path, "/") {
path = strings.TrimSuffix(path, "/")
}
if strings.HasPrefix(path, "/") {
path = strings.TrimPrefix(path, "/")
}
if path == "" {
r.command = command
return
}
if path[0] == ':' {
ss := strings.Split(path, "/")
for _, s := range ss {
r.params = append(r.params, strings.TrimPrefix(s, ":"))
}
r.command = command
return
}
if pos = strings.IndexByte(path, '/'); pos > -1 {
name = path[:pos]
path = path[pos:]
} else {
name = path
path = ""
}
if name == "-" {
name = "app"
}
children := r.getChildren(name)
if children == nil {
children = newRouter(name)
if len(r.path) == 0 {
children.path = append(children.path, name)
} else {
children.path = append(children.path, r.path...)
children.path = append(children.path, name)
}
r.children = append(r.children, children)
}
if children.command.Handle != nil {
panic("a handle is already registered for path /" + strings.Join(children.path, "/"))
}
children.Handle(path, command)
}
func (r *Router) Lookup(tokens []string) (router *Router, args []string, err error) {
if len(tokens) > 0 {
children := r.getChildren(tokens[0])
if children != nil {
return children.Lookup(tokens[1:])
}
}
if r.command.Handle == nil {
err = ErrNotFound
return
}
router = r
args = tokens
return
}
func (r *Router) String() string {
var (
sb strings.Builder
width int
maxWidth int
walkFunc func(router *Router) []commander
)
walkFunc = func(router *Router) []commander {
vs := make([]commander, 0, 5)
if router.command.Handle != nil {
vs = append(vs, commander{
Name: router.name,
Path: strings.Join(router.path, " "),
Description: router.command.Description,
})
} else {
if len(router.children) > 0 {
for _, child := range router.children {
vs = append(vs, walkFunc(child)...)
}
}
}
return vs
}
vs := walkFunc(r)
for _, v := range vs {
width = runewidth.StringWidth(v.Path)
if width > maxWidth {
maxWidth = width
}
}
for _, v := range vs {
sb.WriteString(fmt.Sprintf("%-"+strconv.Itoa(maxWidth+4)+"s %s\n", v.Path, v.Description))
}
return sb.String()
}
func newRouter(name string) *Router {
return &Router{
name: name,
path: make([]string, 0, 4),
params: make([]string, 0, 4),
children: make([]*Router, 0, 10),
}
}

View File

@ -0,0 +1,263 @@
package cli
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"slices"
"strconv"
"strings"
"time"
"git.nobla.cn/golang/aeus/pkg/pool"
"github.com/mattn/go-runewidth"
)
func isNormalKind(kind reflect.Kind) bool {
normalKinds := []reflect.Kind{
reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Int,
reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uint,
reflect.Float32, reflect.Float64,
reflect.String,
}
for _, k := range normalKinds {
if k == kind {
return true
}
}
return false
}
func serializeMap(val map[any]any) ([]byte, error) {
var (
canFormat bool
width int
maxWidth int
)
canFormat = true
for k, v := range val {
if !isNormalKind(reflect.Indirect(reflect.ValueOf(k)).Kind()) || !isNormalKind(reflect.Indirect(reflect.ValueOf(v)).Kind()) {
canFormat = false
break
}
}
if !canFormat {
return json.MarshalIndent(val, "", "\t")
}
ms := make(map[string]string)
for k, v := range val {
sk := fmt.Sprint(k)
ms[sk] = fmt.Sprint(v)
width = runewidth.StringWidth(sk)
if width > maxWidth {
maxWidth = width
}
}
buffer := pool.GetBuffer()
defer pool.PutBuffer(buffer)
for k, v := range ms {
buffer.WriteString(fmt.Sprintf("%-"+strconv.Itoa(maxWidth+4)+"s %s\n", k, v))
}
return buffer.Bytes(), nil
}
func printBorder(w *bytes.Buffer, ws []int) {
for _, l := range ws {
w.WriteString("+")
w.WriteString(strings.Repeat("-", l+2))
}
w.WriteString("+\n")
}
func toString(v any) string {
switch t := v.(type) {
case float32, float64:
return fmt.Sprintf("%.2f", t)
case time.Time:
return t.Format("2006-01-02 15:04:05")
default:
return fmt.Sprint(v)
}
}
func printArray(vals [][]any) (buf []byte) {
var (
cell string
str string
widths []int
maxLength int
width int
rows [][]string
)
rows = make([][]string, 0, len(vals))
for _, value := range vals {
if len(value) > maxLength {
maxLength = len(value)
}
}
widths = make([]int, maxLength)
for _, vs := range vals {
rl := len(vs)
row := make([]string, rl)
for i, val := range vs {
str = toString(val)
if rl > 1 {
width = runewidth.StringWidth(str)
if width > widths[i] {
widths[i] = width
}
}
row[i] = str
}
rows = append(rows, row)
}
buffer := pool.GetBuffer()
defer pool.PutBuffer(buffer)
printBorder(buffer, widths)
for index, row := range rows {
size := len(row)
for i, w := range widths {
cell = ""
buffer.WriteString("|")
if size > i {
cell = row[i]
}
buffer.WriteString(" ")
buffer.WriteString(cell)
cl := runewidth.StringWidth(cell)
if w > cl {
buffer.WriteString(strings.Repeat(" ", w-cl))
}
buffer.WriteString(" ")
}
buffer.WriteString("|\n")
if index == 0 {
printBorder(buffer, widths)
}
}
printBorder(buffer, widths)
return buffer.Bytes()
}
func serializeArray(val []any) (buf []byte, err error) {
var (
ok bool
vs [][]any
normalFormat bool
isArrayElement bool
isStructElement bool
columnName string
)
normalFormat = true
for _, row := range val {
kind := reflect.Indirect(reflect.ValueOf(row)).Kind()
if !isNormalKind(kind) {
normalFormat = false
}
if kind == reflect.Array || kind == reflect.Slice {
isArrayElement = true
}
if kind == reflect.Struct {
isStructElement = true
}
}
if normalFormat {
goto __END
}
if isArrayElement {
vs = make([][]any, 0, len(val))
for _, v := range val {
rv := reflect.Indirect(reflect.ValueOf(v))
if rv.Kind() == reflect.Array || rv.Kind() == reflect.Slice {
row := make([]any, 0, rv.Len())
for i := 0; i < rv.Len(); i++ {
if isNormalKind(rv.Index(i).Kind()) || rv.Index(i).Interface() == nil {
row = append(row, rv.Index(i).Interface())
} else {
goto __END
}
}
vs = append(vs, row)
} else {
goto __END
}
}
}
if isStructElement {
vs = make([][]any, 0, len(val))
indexes := make([]int, 0)
for i, v := range val {
rv := reflect.Indirect(reflect.ValueOf(v))
if rv.Kind() == reflect.Struct {
if i == 0 {
row := make([]any, 0, rv.Type().NumField())
for j := 0; j < rv.Type().NumField(); j++ {
st := rv.Type().Field(j).Tag
if columnName, ok = st.Lookup("kos"); !ok {
columnName = strings.ToUpper(rv.Type().Field(j).Name)
} else {
if columnName == "-" {
continue
}
}
if !rv.Type().Field(j).IsExported() {
continue
}
indexes = append(indexes, j)
row = append(row, columnName)
}
vs = append(vs, row)
}
row := make([]any, 0, rv.Type().NumField())
for j := 0; j < rv.Type().NumField(); j++ {
if slices.Index(indexes, j) > -1 {
row = append(row, rv.Field(j).Interface())
}
}
vs = append(vs, row)
} else {
goto __END
}
}
}
buf = printArray(vs)
return
__END:
return json.MarshalIndent(val, "", "\t")
}
func serialize(val any) (buf []byte, err error) {
var (
refVal reflect.Value
)
refVal = reflect.Indirect(reflect.ValueOf(val))
switch refVal.Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
buf = []byte(strconv.FormatInt(refVal.Int(), 10))
case reflect.Float32, reflect.Float64:
buf = []byte(strconv.FormatFloat(refVal.Float(), 'f', -1, 64))
case reflect.String:
buf = []byte(refVal.String())
case reflect.Slice, reflect.Array:
if refVal.Type().Elem().Kind() == reflect.Uint8 {
buf = refVal.Bytes()
} else {
as := make([]any, 0, refVal.Len())
for i := 0; i < refVal.Len(); i++ {
as = append(as, refVal.Index(i).Interface())
}
buf, err = serializeArray(as)
}
case reflect.Map:
ms := make(map[any]any)
keys := refVal.MapKeys()
for _, key := range keys {
ms[key.Interface()] = refVal.MapIndex(key).Interface()
}
buf, err = serializeMap(ms)
default:
buf, err = json.MarshalIndent(refVal.Interface(), "", "\t")
}
return
}

View File

@ -0,0 +1,233 @@
package cli
import (
"context"
"fmt"
"math"
"net"
"net/url"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"git.nobla.cn/golang/aeus/pkg/errors"
"git.nobla.cn/golang/aeus/pkg/logger"
netutil "git.nobla.cn/golang/aeus/pkg/net"
)
type Server struct {
ctx context.Context
router *Router
opts *options
listener net.Listener
sequenceLocker sync.Mutex
sequence int64
ctxMap sync.Map
uri *url.URL
exitFlag int32
}
func (svr *Server) Handle(pathname string, desc string, cb HandleFunc) {
svr.router.Handle(pathname, svr.wrapCommand(pathname, desc, cb))
}
func (svr *Server) wrapCommand(pathname, desc string, cb HandleFunc) Command {
h := func(ctx *Context) (err error) {
return cb(ctx)
}
if desc == "" {
desc = strings.Join(strings.Split(strings.TrimPrefix(pathname, "/"), "/"), " ")
}
return Command{
Path: pathname,
Handle: h,
Description: desc,
}
}
func (s *Server) createListener() (err error) {
if s.listener != nil {
return
}
if s.listener, err = net.Listen(s.opts.network, s.opts.address); err == nil {
s.uri.Host = netutil.TrulyAddr(s.opts.address, s.listener)
}
return
}
func (s *Server) applyContext() *Context {
if v := ctxPool.Get(); v != nil {
if ctx, ok := v.(*Context); ok {
return ctx
}
}
return &Context{}
}
func (s *Server) releaseContext(ctx *Context) {
ctxPool.Put(ctx)
}
func (s *Server) execute(ctx *Context, frame *Frame) (err error) {
var (
params map[string]string
tokens []string
args []string
r *Router
)
cmd := string(frame.Data)
tokens = strings.Fields(cmd)
if frame.Timeout > 0 {
childCtx, cancelFunc := context.WithTimeout(s.ctx, time.Duration(frame.Timeout))
ctx.setContext(childCtx)
defer func() {
cancelFunc()
}()
} else {
ctx.setContext(s.ctx)
}
if r, args, err = s.router.Lookup(tokens); err != nil {
if errors.Is(err, ErrNotFound) {
err = ctx.Error(errNotFound, fmt.Sprintf("Command %s not found", cmd))
} else {
err = ctx.Error(errExecuteFailed, err.Error())
}
} else {
if len(r.params) > len(args) {
err = ctx.Error(errExecuteFailed, r.Usage())
return
}
if len(r.params) > 0 {
params = make(map[string]string)
for i, s := range r.params {
params[s] = args[i]
}
}
ctx.setArgs(args)
ctx.setParam(params)
err = r.command.Handle(ctx)
}
return
}
func (svr *Server) nextSequence() int64 {
svr.sequenceLocker.Lock()
defer svr.sequenceLocker.Unlock()
if svr.sequence >= math.MaxInt64 {
svr.sequence = 1
}
svr.sequence++
return svr.sequence
}
func (svr *Server) process(conn net.Conn) {
var (
err error
ctx *Context
frame *Frame
)
ctx = svr.applyContext()
ctx.reset(svr.nextSequence(), conn)
svr.ctxMap.Store(ctx.Id, ctx)
defer func() {
_ = conn.Close()
svr.ctxMap.Delete(ctx.Id)
svr.releaseContext(ctx)
}()
for {
if frame, err = readFrame(conn); err != nil {
break
}
//reset frame
ctx.seq = frame.Seq
switch frame.Type {
case PacketTypeHandshake:
if err = ctx.send(responsePayload{
Type: PacketTypeHandshake,
Data: &handshake{
ID: ctx.Id,
Name: "",
Version: "",
OS: runtime.GOOS,
ServerTime: time.Now(),
RemoteAddr: conn.RemoteAddr().String(),
},
}); err != nil {
break
}
case PacketTypeCompleter:
if err = ctx.send(responsePayload{
Type: PacketTypeCompleter,
Data: svr.router.Completer(strings.Fields(string(frame.Data))...),
}); err != nil {
break
}
case PacketTypeCommand:
if err = svr.execute(ctx, frame); err != nil {
break
}
default:
break
}
}
}
func (s *Server) serve() (err error) {
for {
conn, err := s.listener.Accept()
if err != nil {
if atomic.LoadInt32(&s.exitFlag) == 1 {
return nil
}
return err
}
go s.process(conn)
}
}
func (s *Server) Start(ctx context.Context) (err error) {
s.ctx = ctx
if err = s.createListener(); err != nil {
return
}
s.opts.logger.Info(ctx, "cli server listen on: %s", s.uri.Host)
s.Handle("/help", "Display help information", func(ctx *Context) (err error) {
return ctx.Success(s.router.String())
})
err = s.serve()
return
}
func (s *Server) Stop(ctx context.Context) (err error) {
if !atomic.CompareAndSwapInt32(&s.exitFlag, 0, 1) {
return
}
if s.listener != nil {
err = s.listener.Close()
}
s.ctxMap.Range(func(key, value any) bool {
if ctx, ok := value.(*Context); ok {
err = ctx.Close()
}
return true
})
return
}
func New(cbs ...Option) *Server {
srv := &Server{
opts: &options{
network: "tcp",
address: ":0",
logger: logger.Default(),
},
uri: &url.URL{Scheme: "cli"},
router: newRouter(""),
}
for _, cb := range cbs {
cb(srv.opts)
}
return srv
}

View File

@ -0,0 +1,87 @@
package cli
import (
"context"
"crypto/tls"
"sync"
"time"
"git.nobla.cn/golang/aeus/pkg/logger"
)
var (
Feature = []byte{67, 76, 73}
OK = []byte("OK")
Bye = "Bye Bye"
)
const (
errNotFound = 4004
errExecuteFailed = 4005
)
var (
ctxPool sync.Pool
)
type Param struct {
Key string
Value string
}
type Params []Param
type HandleFunc func(ctx *Context) (err error)
type (
Option func(*options)
options struct {
network string
address string
logger logger.Logger
context context.Context
}
clientOptions struct {
tls *tls.Config
}
ClientOption func(*clientOptions)
)
type (
encoder interface {
Marshal() ([]byte, error)
}
responsePayload struct {
Type uint8 `json:"-"`
Code int `json:"code"`
Reason string `json:"reason,omitempty"`
Data any `json:"data,omitempty"`
}
)
type (
Command struct {
Path string
Handle HandleFunc
Description string
}
commander struct {
Name string
Path string
Description string
}
handshake struct {
ID int64 `json:"id"`
OS string `json:"os"`
Name string `json:"name"`
Version string `json:"version"`
ServerTime time.Time `json:"server_time"`
RemoteAddr string `json:"remote_addr"`
}
)

View File

@ -0,0 +1,39 @@
package grpc
import (
"context"
"git.nobla.cn/golang/aeus/transport/grpc/resolver"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
grpcinsecure "google.golang.org/grpc/credentials/insecure"
)
func Dial(ctx context.Context, target string, cbs ...ClientOption) (conn *grpc.ClientConn, err error) {
grpc.WithResolvers()
opts := &clientOptions{}
for _, cb := range cbs {
cb(opts)
}
dialOpts := make([]grpc.DialOption, 0, 10)
if opts.registry != nil {
dialOpts = append(
dialOpts,
grpc.WithResolvers(
resolver.New(
resolver.WithRegistry(opts.registry),
),
),
)
}
if opts.tls != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(opts.tls)))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials()))
}
if opts.dialOptions != nil {
dialOpts = append(dialOpts, opts.dialOptions...)
}
conn, err = grpc.NewClient(target, dialOpts...)
return
}

View File

@ -0,0 +1,34 @@
package resolver
import (
"git.nobla.cn/golang/aeus/registry"
"google.golang.org/grpc/resolver"
)
type Builder struct {
opts *options
}
func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
var (
err error
watcher registry.Watcher
)
if watcher, err = b.opts.registry.Watch(func(o *registry.WatchOptions) {
o.Context = b.opts.context
o.Service = target.URL.Host
}); err != nil {
return nil, err
}
return newResolver(cc, watcher), nil
}
func (b *Builder) Scheme() string {
return "service"
}
func New(cbs ...Option) resolver.Builder {
return &Builder{
opts: newOptions(cbs...),
}
}

View File

@ -0,0 +1,70 @@
package resolver
import (
"net/url"
"strings"
"sync/atomic"
"git.nobla.cn/golang/aeus/registry"
"google.golang.org/grpc/resolver"
)
type Resolver struct {
conn resolver.ClientConn
watcher registry.Watcher
closeChan chan struct{}
closeFlag int32
}
func (r *Resolver) update(items []*registry.Service) {
addrs := make([]resolver.Address, 0, 10)
for _, item := range items {
for _, endpoint := range item.Endpoints {
if !strings.HasPrefix(endpoint, "grpc://") {
continue
}
if uri, err := url.Parse(endpoint); err == nil {
addrs = append(addrs, resolver.Address{
Addr: uri.Host,
})
}
}
}
r.conn.UpdateState(resolver.State{Addresses: addrs})
}
func (r *Resolver) eventLoop() {
for {
select {
case <-r.closeChan:
return
default:
}
if items, err := r.watcher.Next(); err == nil {
r.update(items)
}
}
}
func (r *Resolver) ResolveNow(opts resolver.ResolveNowOptions) {
}
// Close closes the resolver.
func (r *Resolver) Close() {
if !atomic.CompareAndSwapInt32(&r.closeFlag, 0, 1) {
return
}
close(r.closeChan)
r.watcher.Stop()
}
func newResolver(cc resolver.ClientConn, watcher registry.Watcher) *Resolver {
r := &Resolver{
conn: cc,
watcher: watcher,
closeChan: make(chan struct{}),
}
go r.eventLoop()
return r
}

View File

@ -0,0 +1,38 @@
package resolver
import (
"context"
"git.nobla.cn/golang/aeus/registry"
)
type (
options struct {
context context.Context
registry registry.Registry
}
Option func(*options)
)
func WithContext(ctx context.Context) Option {
return func(o *options) {
o.context = ctx
}
}
func WithRegistry(registry registry.Registry) Option {
return func(o *options) {
o.registry = registry
}
}
func newOptions(cbs ...Option) *options {
opts := &options{
context: context.Background(),
}
for _, cb := range cbs {
cb(opts)
}
return opts
}

View File

@ -0,0 +1,77 @@
package grpc
import (
"context"
"net"
"net/url"
"git.nobla.cn/golang/aeus/pkg/logger"
netutil "git.nobla.cn/golang/aeus/pkg/net"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
type Server struct {
ctx context.Context
opts *options
uri *url.URL
serve *grpc.Server
listener net.Listener
}
func (s *Server) createListener() (err error) {
if s.listener == nil {
if s.listener, err = net.Listen(s.opts.network, s.opts.address); err != nil {
return
}
s.uri.Host = netutil.TrulyAddr(s.opts.address, s.listener)
}
return
}
func (s *Server) Start(ctx context.Context) (err error) {
s.ctx = ctx
if err = s.createListener(); err != nil {
return
}
s.opts.logger.Info(ctx, "grpc server listen on: %s", s.uri.Host)
reflection.Register(s.serve)
s.serve.Serve(s.listener)
return
}
func (s *Server) Endpoint(ctx context.Context) (string, error) {
if err := s.createListener(); err != nil {
return "", err
}
return s.uri.String(), nil
}
func (s *Server) RegisterService(sd *grpc.ServiceDesc, ss any) {
s.serve.RegisterService(sd, ss)
}
func (s *Server) Stop(ctx context.Context) (err error) {
s.serve.GracefulStop()
s.opts.logger.Info(s.ctx, "grpc server stopped")
return
}
func New(cbs ...Option) *Server {
svr := &Server{
opts: &options{
network: "tcp",
logger: logger.Default(),
address: ":0",
},
uri: &url.URL{
Scheme: "grpc",
},
}
for _, cb := range cbs {
cb(svr.opts)
}
gopts := []grpc.ServerOption{}
svr.serve = grpc.NewServer(gopts...)
return svr
}

View File

@ -0,0 +1,71 @@
package grpc
import (
"context"
"crypto/tls"
"git.nobla.cn/golang/aeus/pkg/logger"
"git.nobla.cn/golang/aeus/registry"
"google.golang.org/grpc"
)
type (
Option func(*options)
options struct {
network string
address string
logger logger.Logger
context context.Context
}
clientOptions struct {
tls *tls.Config
registry registry.Registry
dialOptions []grpc.DialOption
}
ClientOption func(*clientOptions)
)
func WithNetwork(network string) Option {
return func(o *options) {
o.network = network
}
}
func WithAddress(address string) Option {
return func(o *options) {
o.address = address
}
}
func WithContext(ctx context.Context) Option {
return func(o *options) {
o.context = ctx
}
}
func WithLogger(lg logger.Logger) Option {
return func(o *options) {
o.logger = lg
}
}
func WithTLS(tls *tls.Config) ClientOption {
return func(o *clientOptions) {
o.tls = tls
}
}
func WithRegistry(reg registry.Registry) ClientOption {
return func(o *clientOptions) {
o.registry = reg
}
}
func WithGrpcDialOptions(opts ...grpc.DialOption) ClientOption {
return func(o *clientOptions) {
o.dialOptions = opts
}
}

View File

@ -0,0 +1,91 @@
package http
import (
"context"
"net/http"
"net/url"
"sync"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
)
var (
ctxPool sync.Pool
)
type Context struct {
ctx *gin.Context
}
func (c *Context) Context() context.Context {
return c.ctx.Request.Context()
}
func (c *Context) Request() *http.Request {
return c.ctx.Request
}
func (c *Context) Response() http.ResponseWriter {
return c.ctx.Writer
}
func (c *Context) ClientIP() string {
return c.ctx.ClientIP()
}
func (c *Context) Param(key string) string {
return c.ctx.Param(key)
}
func (c *Context) Bind(val any) (err error) {
// if params exists, try bind params first
if len(c.ctx.Params) > 0 {
qs := url.Values{}
for _, p := range c.ctx.Params {
qs.Set(p.Key, p.Value)
}
if err = binding.MapFormWithTag(val, qs, "json"); err != nil {
return err
}
}
if c.Request().Method == http.MethodGet {
values := c.Request().URL.Query()
return binding.MapFormWithTag(val, values, "json")
}
return c.ctx.Bind(val)
}
func (c *Context) Error(code int, reason string) (err error) {
r := newResponse(code, reason, nil)
c.ctx.JSON(http.StatusOK, r)
return
}
func (c *Context) Success(value any) (err error) {
r := newResponse(http.StatusOK, "", value)
c.ctx.JSON(http.StatusOK, r)
return
}
func (c *Context) reset(ctx *gin.Context) {
c.ctx = ctx
}
func newContext(ctx *gin.Context) *Context {
v := ctxPool.Get()
if v == nil {
return &Context{
ctx: ctx,
}
}
if c, ok := v.(*Context); ok {
c.reset(ctx)
return c
}
return &Context{ctx: ctx}
}
func putContext(c *Context) {
ctxPool.Put(c)
}

View File

@ -0,0 +1,33 @@
package http
type Response interface {
SetCode(int)
SetReason(string)
SetData(any)
}
type response struct {
Code int `json:"code,omitempty" xml:"code,omitempty" yaml:"code,omitempty"`
Reason string `json:"reason,omitempty" xml:"reason,omitempty" yaml:"reason,omitempty"`
Data any `json:"data,omitempty" xml:"data,omitempty" yaml:"data,omitempty"`
}
func (r *response) SetCode(code int) {
r.Code = code
}
func (r *response) SetReason(reason string) {
r.Reason = reason
}
func (r *response) SetData(data any) {
r.Data = data
}
func newResponse(code int, reason string, data any) Response {
return &response{
Code: code,
Reason: reason,
Data: data,
}
}

View File

@ -0,0 +1,179 @@
package http
import (
"context"
"net"
"net/http"
"net/http/pprof"
"net/url"
"sync"
"git.nobla.cn/golang/aeus/middleware"
"git.nobla.cn/golang/aeus/pkg/errors"
"git.nobla.cn/golang/aeus/pkg/logger"
netutil "git.nobla.cn/golang/aeus/pkg/net"
"github.com/gin-gonic/gin"
)
type Server struct {
ctx context.Context
opts *options
uri *url.URL
serve *http.Server
engine *gin.Engine
once sync.Once
listener net.Listener
}
func (s *Server) Endpoint(ctx context.Context) (string, error) {
if err := s.createListener(); err != nil {
return "", err
}
return s.uri.String(), nil
}
func (s *Server) GET(pattern string, h HandleFunc) {
s.engine.GET(pattern, func(ctx *gin.Context) {
childCtx := newContext(ctx)
h(childCtx)
putContext(childCtx)
})
}
func (s *Server) PUT(pattern string, h HandleFunc) {
s.engine.PUT(pattern, func(ctx *gin.Context) {
childCtx := newContext(ctx)
h(childCtx)
putContext(childCtx)
})
}
func (s *Server) POST(pattern string, h HandleFunc) {
s.engine.POST(pattern, func(ctx *gin.Context) {
childCtx := newContext(ctx)
h(childCtx)
putContext(childCtx)
})
}
func (s *Server) HEAD(pattern string, h HandleFunc) {
s.engine.HEAD(pattern, func(ctx *gin.Context) {
childCtx := newContext(ctx)
h(childCtx)
putContext(childCtx)
})
}
func (s *Server) PATCH(pattern string, h HandleFunc) {
s.engine.PATCH(pattern, func(ctx *gin.Context) {
childCtx := newContext(ctx)
h(childCtx)
putContext(childCtx)
})
}
func (s *Server) DELETE(pattern string, h HandleFunc) {
s.engine.DELETE(pattern, func(ctx *gin.Context) {
childCtx := newContext(ctx)
h(childCtx)
putContext(childCtx)
})
}
func (s *Server) Use(middlewares ...middleware.Middleware) {
for _, m := range middlewares {
s.engine.Use(s.warpMiddleware(m))
}
}
func (s *Server) warpMiddleware(m middleware.Middleware) gin.HandlerFunc {
return func(ginCtx *gin.Context) {
ctx := ginCtx.Request.Context()
handler := func(ctx context.Context) error {
ginCtx.Next()
if err := ginCtx.Errors.Last(); err != nil {
return err.Err
}
return nil
}
wrappedHandler := m(handler)
if err := wrappedHandler(ctx); err != nil {
ginCtx.AbortWithError(http.StatusServiceUnavailable, err)
return
}
}
}
func (s *Server) wrapHandle(f http.HandlerFunc) gin.HandlerFunc {
return func(c *gin.Context) {
f(c.Writer, c.Request)
}
}
func (s *Server) createListener() (err error) {
if s.listener == nil {
if s.listener, err = net.Listen(s.opts.network, s.opts.address); err != nil {
return
}
s.uri.Host = netutil.TrulyAddr(s.opts.address, s.listener)
}
return
}
func (s *Server) Start(ctx context.Context) (err error) {
s.serve = &http.Server{
Addr: s.opts.address,
Handler: s.engine,
}
s.ctx = ctx
if s.opts.debug {
s.engine.Handle(http.MethodGet, "/debug/pprof/", s.wrapHandle(pprof.Index))
s.engine.Handle(http.MethodGet, "/debug/pprof/goroutine", s.wrapHandle(pprof.Index))
s.engine.Handle(http.MethodGet, "/debug/pprof/heap", s.wrapHandle(pprof.Index))
s.engine.Handle(http.MethodGet, "/debug/pprof/mutex", s.wrapHandle(pprof.Index))
s.engine.Handle(http.MethodGet, "/debug/pprof/threadcreate", s.wrapHandle(pprof.Index))
s.engine.Handle(http.MethodGet, "/debug/pprof/cmdline", s.wrapHandle(pprof.Cmdline))
s.engine.Handle(http.MethodGet, "/debug/pprof/profile", s.wrapHandle(pprof.Profile))
s.engine.Handle(http.MethodGet, "/debug/pprof/symbol", s.wrapHandle(pprof.Symbol))
s.engine.Handle(http.MethodGet, "/debug/pprof/trace", s.wrapHandle(pprof.Trace))
}
if err = s.createListener(); err != nil {
return
}
s.opts.logger.Info(ctx, "http server listen on: %s", s.uri.Host)
if s.opts.certFile != "" && s.opts.keyFile != "" {
s.uri.Scheme = "https"
err = s.serve.ServeTLS(s.listener, s.opts.certFile, s.opts.keyFile)
} else {
err = s.serve.Serve(s.listener)
}
if !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
}
func (s *Server) Stop(ctx context.Context) (err error) {
err = s.serve.Shutdown(ctx)
s.opts.logger.Info(ctx, "http server stopped")
return
}
func New(cbs ...Option) *Server {
svr := &Server{
uri: &url.URL{Scheme: "http"},
opts: &options{
network: "tcp",
logger: logger.Default(),
address: ":0",
},
}
for _, cb := range cbs {
cb(svr.opts)
}
if !svr.opts.debug {
gin.SetMode(gin.ReleaseMode)
}
svr.engine = gin.New(svr.opts.ginOptions...)
return svr
}

View File

@ -0,0 +1,83 @@
package http
import (
"context"
"net/http"
"git.nobla.cn/golang/aeus/pkg/logger"
"github.com/gin-gonic/gin"
)
type (
Option func(*options)
options struct {
network string
address string
certFile string
keyFile string
debug bool
hander http.Handler
logger logger.Logger
context context.Context
ginOptions []gin.OptionFunc
}
HandleFunc func(ctx *Context) (err error)
Middleware func(http.Handler) http.Handler
)
func WithNetwork(network string) Option {
return func(o *options) {
o.network = network
}
}
func WithAddress(address string) Option {
return func(o *options) {
o.address = address
}
}
func WithCertFile(certFile string) Option {
return func(o *options) {
o.certFile = certFile
}
}
func WithKeyFile(keyFile string) Option {
return func(o *options) {
o.keyFile = keyFile
}
}
func WithLogger(lg logger.Logger) Option {
return func(o *options) {
o.logger = lg
}
}
func WithContext(ctx context.Context) Option {
return func(o *options) {
o.context = ctx
}
}
func WithDebug(debug bool) Option {
return func(o *options) {
o.debug = debug
}
}
func WithHandler(h http.Handler) Option {
return func(o *options) {
o.hander = h
}
}
func WithGinOptions(opts ...gin.OptionFunc) Option {
return func(o *options) {
o.ginOptions = opts
}
}

54
types.go 100644
View File

@ -0,0 +1,54 @@
package aeus
import "context"
type (
applicationKey struct{}
// application context value.
Application interface {
ID() string
Name() string
Version() string
Debug() bool
Metadata() map[string]string
Endpoint() []string
}
// application scope interface.
Scope interface {
Init(ctx context.Context)
Run(ctx context.Context) (err error)
}
// service loader interface.
ServiceLoader interface {
Init(ctx context.Context)
}
// endpoint interface.
Endpointer interface {
Endpoint(ctx context.Context) (string, error)
}
// option callback
Option func(*options)
// server interface.
Server interface {
Start(ctx context.Context) (err error)
Stop(ctx context.Context) (err error)
}
)
func WithContext(ctx context.Context, app Application) context.Context {
return context.WithValue(ctx, applicationKey{}, app)
}
func FromContext(ctx context.Context) Application {
app, ok := ctx.Value(applicationKey{}).(Application)
if !ok {
return nil
}
return app
}