356 lines
8.4 KiB
Go
356 lines
8.4 KiB
Go
package kos
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"net"
|
|
"net/http/pprof"
|
|
"os"
|
|
"os/signal"
|
|
"runtime"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
"git.nspix.com/golang/kos/entry"
|
|
"git.nspix.com/golang/kos/entry/cli"
|
|
"git.nspix.com/golang/kos/entry/http"
|
|
_ "git.nspix.com/golang/kos/pkg/cache"
|
|
"git.nspix.com/golang/kos/pkg/log"
|
|
"git.nspix.com/golang/kos/util/env"
|
|
"github.com/sourcegraph/conc"
|
|
)
|
|
|
|
// ErrStopping is the cause passed to the application's cancel function when
// shutdown begins.
var ErrStopping = errors.New("stopping")

// cliFlag is the -cli command-line switch. When set (and the command entry is
// not disabled) the process runs as an interactive client instead of a server.
var cliFlag = flag.Bool("cli", false, "Go application interactive mode")
|
|
|
|
type (
|
|
application struct {
|
|
ctx context.Context
|
|
cancelFunc context.CancelCauseFunc
|
|
opts *Options
|
|
gateway *entry.Gateway
|
|
http *http.Server
|
|
command *cli.Server
|
|
uptime time.Time
|
|
info *Info
|
|
plugins sync.Map
|
|
waitGroup conc.WaitGroup
|
|
exitFlag int32
|
|
}
|
|
)
|
|
|
|
func (app *application) Log() log.Logger {
|
|
return log.GetLogger()
|
|
}
|
|
|
|
func (app *application) Healthy() string {
|
|
if atomic.LoadInt32(&app.gateway.State().Processing) == 1 && atomic.LoadInt32(&app.gateway.State().Accepting) == 1 {
|
|
return StateHealthy
|
|
}
|
|
if atomic.LoadInt32(&app.gateway.State().Processing) == 1 {
|
|
return StateNoAccepting
|
|
}
|
|
if atomic.LoadInt32(&app.gateway.State().Accepting) == 1 {
|
|
return StateNoProgress
|
|
}
|
|
return StateUnavailable
|
|
}
|
|
|
|
func (app *application) Info() *Info {
|
|
return app.info
|
|
}
|
|
|
|
func (app *application) Http() *http.Server {
|
|
return app.http
|
|
}
|
|
|
|
func (app *application) Command() *cli.Server {
|
|
return app.command
|
|
}
|
|
|
|
func (app *application) Handle(path string, cb HandleFunc, cbs ...HandleOption) {
|
|
opts := newHandleOptions(cbs...)
|
|
if app.http != nil {
|
|
app.http.Handle(http.MethodPost, path, func(ctx *http.Context) (err error) {
|
|
return cb(ctx)
|
|
})
|
|
}
|
|
if app.command != nil {
|
|
app.command.Handle(path, opts.description, func(ctx *cli.Context) (err error) {
|
|
return cb(ctx)
|
|
})
|
|
}
|
|
}
|
|
|
|
func (app *application) httpServe() (err error) {
|
|
var (
|
|
l net.Listener
|
|
)
|
|
app.http = http.New(app.ctx)
|
|
if l, err = app.gateway.Apply(
|
|
entry.Feature(http.MethodGet),
|
|
entry.Feature(http.MethodHead),
|
|
entry.Feature(http.MethodPost),
|
|
entry.Feature(http.MethodPut),
|
|
entry.Feature(http.MethodPatch),
|
|
entry.Feature(http.MethodDelete),
|
|
entry.Feature(http.MethodConnect),
|
|
entry.Feature(http.MethodOptions),
|
|
entry.Feature(http.MethodTrace),
|
|
); err != nil {
|
|
return
|
|
}
|
|
if app.opts.EnableDebug {
|
|
app.http.Handle(http.MethodGet, "/debug/pprof/", http.Wrap(pprof.Index))
|
|
app.http.Handle(http.MethodGet, "/debug/pprof/goroutine", http.Wrap(pprof.Index))
|
|
app.http.Handle(http.MethodGet, "/debug/pprof/heap", http.Wrap(pprof.Index))
|
|
app.http.Handle(http.MethodGet, "/debug/pprof/mutex", http.Wrap(pprof.Index))
|
|
app.http.Handle(http.MethodGet, "/debug/pprof/threadcreate", http.Wrap(pprof.Index))
|
|
app.http.Handle(http.MethodGet, "/debug/pprof/cmdline", http.Wrap(pprof.Cmdline))
|
|
app.http.Handle(http.MethodGet, "/debug/pprof/profile", http.Wrap(pprof.Profile))
|
|
app.http.Handle(http.MethodGet, "/debug/pprof/symbol", http.Wrap(pprof.Symbol))
|
|
app.http.Handle(http.MethodGet, "/debug/pprof/trace", http.Wrap(pprof.Trace))
|
|
}
|
|
timer := time.NewTimer(time.Millisecond * 200)
|
|
defer timer.Stop()
|
|
errChan := make(chan error, 1)
|
|
app.waitGroup.Go(func() {
|
|
select {
|
|
case errChan <- app.http.Serve(l):
|
|
}
|
|
})
|
|
select {
|
|
case err = <-errChan:
|
|
case <-timer.C:
|
|
if app.opts.EnableDirectHttp {
|
|
app.gateway.Direct(l)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (app *application) commandServe() (err error) {
|
|
var (
|
|
l net.Listener
|
|
)
|
|
app.command = cli.New(app.ctx)
|
|
if l, err = app.gateway.Apply(
|
|
cli.Feature,
|
|
); err != nil {
|
|
return
|
|
}
|
|
timer := time.NewTimer(time.Millisecond * 200)
|
|
defer timer.Stop()
|
|
errChan := make(chan error, 1)
|
|
app.waitGroup.Go(func() {
|
|
select {
|
|
case errChan <- app.command.Serve(l):
|
|
}
|
|
})
|
|
select {
|
|
case err = <-errChan:
|
|
case <-timer.C:
|
|
if app.opts.EnableDirectCommand {
|
|
app.gateway.Direct(l)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (app *application) gotoInteractive() (err error) {
|
|
var (
|
|
client *cli.Client
|
|
)
|
|
client = cli.NewClient(
|
|
app.ctx,
|
|
net.JoinHostPort(app.opts.Address, strconv.Itoa(app.opts.Port)),
|
|
)
|
|
return client.Shell()
|
|
}
|
|
|
|
func (app *application) buildInfo() *Info {
|
|
info := &Info{
|
|
ID: app.opts.Name,
|
|
Name: app.opts.Name,
|
|
Version: app.opts.Version,
|
|
Status: StateHealthy,
|
|
Address: app.opts.Address,
|
|
Port: app.opts.Port,
|
|
Metadata: app.opts.Metadata,
|
|
}
|
|
if info.Metadata == nil {
|
|
info.Metadata = make(map[string]string)
|
|
}
|
|
info.Metadata["os"] = runtime.GOOS
|
|
info.Metadata["numOfCPU"] = strconv.Itoa(runtime.NumCPU())
|
|
info.Metadata["goVersion"] = runtime.Version()
|
|
info.Metadata["shortName"] = app.opts.ShortName()
|
|
info.Metadata["upTime"] = app.uptime.Format(time.DateTime)
|
|
return info
|
|
}
|
|
|
|
func (app *application) preStart() (err error) {
|
|
var (
|
|
addr string
|
|
)
|
|
app.ctx, app.cancelFunc = context.WithCancelCause(app.opts.Context)
|
|
if *cliFlag && !app.opts.DisableCommand {
|
|
if err = app.gotoInteractive(); err != nil {
|
|
fmt.Println(err)
|
|
os.Exit(1)
|
|
}
|
|
os.Exit(0)
|
|
}
|
|
app.info = app.buildInfo()
|
|
app.Log().Infof("server starting")
|
|
env.Set(EnvAppName, app.opts.ShortName())
|
|
env.Set(EnvAppVersion, app.opts.Version)
|
|
addr = net.JoinHostPort(app.opts.Address, strconv.Itoa(app.opts.Port))
|
|
app.Log().Infof("server listen on: %s", addr)
|
|
app.gateway = entry.New(addr)
|
|
if err = app.gateway.Start(app.ctx); err != nil {
|
|
return
|
|
}
|
|
if !app.opts.DisableHttp {
|
|
if err = app.httpServe(); err != nil {
|
|
return
|
|
}
|
|
}
|
|
if !app.opts.DisableCommand {
|
|
if err = app.commandServe(); err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
app.plugins.Range(func(key, value any) bool {
|
|
if plugin, ok := value.(Plugin); ok {
|
|
if err = plugin.BeforeStart(); err != nil {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
if app.opts.server != nil {
|
|
if err = app.opts.server.Start(app.ctx); err != nil {
|
|
app.Log().Warnf("server start error: %s", err.Error())
|
|
return
|
|
}
|
|
}
|
|
if !app.opts.DisableStateApi {
|
|
app.Handle("/-/run/state", func(ctx Context) (err error) {
|
|
return ctx.Success(State{
|
|
ID: app.opts.Name,
|
|
Name: app.opts.Name,
|
|
Version: app.opts.Version,
|
|
Uptime: time.Now().Sub(app.uptime).String(),
|
|
Gateway: app.gateway.State(),
|
|
})
|
|
}, WithHandleDescription("Display application state"))
|
|
app.Handle("/-/healthy", func(ctx Context) (err error) {
|
|
return ctx.Success(app.Healthy())
|
|
}, WithHandleDescription("Display application healthy"))
|
|
}
|
|
app.plugins.Range(func(key, value any) bool {
|
|
if plugin, ok := value.(Plugin); ok {
|
|
if err = plugin.AfterStart(); err != nil {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
app.Log().Infof("server started")
|
|
return
|
|
}
|
|
|
|
func (app *application) preStop() (err error) {
|
|
if !atomic.CompareAndSwapInt32(&app.exitFlag, 0, 1) {
|
|
return
|
|
}
|
|
app.Log().Infof("server stopping")
|
|
if app.opts.server != nil {
|
|
if err = app.opts.server.Stop(); err != nil {
|
|
app.Log().Warnf("app server stop error: %s", err.Error())
|
|
}
|
|
}
|
|
app.cancelFunc(ErrStopping)
|
|
app.plugins.Range(func(key, value any) bool {
|
|
if plugin, ok := value.(Plugin); ok {
|
|
if err = plugin.BeforeStop(); err != nil {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
if app.http != nil {
|
|
if err = app.http.Shutdown(); err != nil {
|
|
app.Log().Warnf("server http shutdown error: %s", err.Error())
|
|
}
|
|
}
|
|
if app.command != nil {
|
|
if err = app.command.Shutdown(); err != nil {
|
|
app.Log().Warnf("server command shutdown error: %s", err.Error())
|
|
}
|
|
}
|
|
if err = app.gateway.Stop(); err != nil {
|
|
app.Log().Warnf("server gateway shutdown error: %s", err.Error())
|
|
}
|
|
app.plugins.Range(func(key, value any) bool {
|
|
if plugin, ok := value.(Plugin); ok {
|
|
if err = plugin.AfterStop(); err != nil {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
app.waitGroup.Wait()
|
|
app.Log().Infof("server stopped")
|
|
return
|
|
}
|
|
|
|
func (app *application) Use(plugin Plugin) (err error) {
|
|
var (
|
|
ok bool
|
|
)
|
|
if _, ok = app.plugins.Load(plugin.Name()); ok {
|
|
return fmt.Errorf("plugin %s already registered", plugin.Name())
|
|
}
|
|
if err = plugin.Mount(app.ctx); err != nil {
|
|
return
|
|
}
|
|
app.plugins.Store(plugin.Name(), plugin)
|
|
return
|
|
}
|
|
|
|
func (app *application) Run() (err error) {
|
|
if err = app.preStart(); err != nil {
|
|
return
|
|
}
|
|
ch := make(chan os.Signal, 1)
|
|
if app.opts.Signals == nil {
|
|
app.opts.Signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL}
|
|
}
|
|
signal.Notify(ch, app.opts.Signals...)
|
|
select {
|
|
case <-ch:
|
|
case <-app.ctx.Done():
|
|
}
|
|
return app.preStop()
|
|
}
|
|
|
|
func New(cbs ...Option) *application {
|
|
opts := NewOptions(cbs...)
|
|
app := &application{
|
|
opts: opts,
|
|
uptime: time.Now(),
|
|
}
|
|
return app
|
|
}
|