kos/service.go

358 lines
8.3 KiB
Go

package kos
import (
"context"
"errors"
"flag"
"fmt"
"net"
"net/http/pprof"
"os"
"os/signal"
"runtime"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
"git.nspix.com/golang/kos/entry"
"git.nspix.com/golang/kos/entry/cli"
"git.nspix.com/golang/kos/entry/http"
_ "git.nspix.com/golang/kos/pkg/cache"
"git.nspix.com/golang/kos/pkg/log"
"git.nspix.com/golang/kos/util/env"
"github.com/sourcegraph/conc"
)
var (
ErrStopping = errors.New("stopping")
cliFlag = flag.Bool("cli", false, "Go application interactive mode")
)
type (
application struct {
ctx context.Context
cancelFunc context.CancelCauseFunc
opts *Options
gateway *entry.Gateway
http *http.Server
command *cli.Server
uptime time.Time
info *Info
plugins sync.Map
waitGroup conc.WaitGroup
exitFlag int32
}
)
func (app *application) Log() log.Logger {
return log.GetLogger()
}
func (app *application) Healthy() string {
if atomic.LoadInt32(&app.gateway.State().Processing) == 1 && atomic.LoadInt32(&app.gateway.State().Accepting) == 1 {
return StateHealthy
}
if atomic.LoadInt32(&app.gateway.State().Processing) == 1 {
return StateNoAccepting
}
if atomic.LoadInt32(&app.gateway.State().Accepting) == 1 {
return StateNoProgress
}
return StateUnavailable
}
func (app *application) Info() *Info {
return app.info
}
func (app *application) Http() *http.Server {
return app.http
}
func (app *application) Command() *cli.Server {
return app.command
}
func (app *application) Handle(path string, cb HandleFunc) {
if app.http != nil {
app.http.Handle(http.MethodPost, path, func(ctx *http.Context) (err error) {
return cb(ctx)
})
}
if app.command != nil {
app.command.Handle(path, "", func(ctx *cli.Context) (err error) {
return cb(ctx)
})
}
}
func (app *application) httpServe() (err error) {
var (
l net.Listener
)
app.http = http.New(app.ctx)
if l, err = app.gateway.Apply(
entry.Feature(http.MethodGet),
entry.Feature(http.MethodHead),
entry.Feature(http.MethodPost),
entry.Feature(http.MethodPut),
entry.Feature(http.MethodPatch),
entry.Feature(http.MethodDelete),
entry.Feature(http.MethodConnect),
entry.Feature(http.MethodOptions),
entry.Feature(http.MethodTrace),
); err != nil {
return
}
if app.opts.EnableDebug {
app.http.Handle(http.MethodGet, "/debug/pprof/", http.Wrap(pprof.Index))
app.http.Handle(http.MethodGet, "/debug/pprof/goroutine", http.Wrap(pprof.Index))
app.http.Handle(http.MethodGet, "/debug/pprof/heap", http.Wrap(pprof.Index))
app.http.Handle(http.MethodGet, "/debug/pprof/mutex", http.Wrap(pprof.Index))
app.http.Handle(http.MethodGet, "/debug/pprof/threadcreate", http.Wrap(pprof.Index))
app.http.Handle(http.MethodGet, "/debug/pprof/cmdline", http.Wrap(pprof.Cmdline))
app.http.Handle(http.MethodGet, "/debug/pprof/profile", http.Wrap(pprof.Profile))
app.http.Handle(http.MethodGet, "/debug/pprof/symbol", http.Wrap(pprof.Symbol))
app.http.Handle(http.MethodGet, "/debug/pprof/trace", http.Wrap(pprof.Trace))
}
timer := time.NewTimer(time.Millisecond * 200)
defer timer.Stop()
errChan := make(chan error, 1)
app.waitGroup.Go(func() {
select {
case errChan <- app.http.Serve(l):
}
})
select {
case err = <-errChan:
case <-timer.C:
if app.opts.EnableDirectHttp {
app.gateway.Direct(l)
}
}
return
}
func (app *application) commandServe() (err error) {
var (
l net.Listener
)
app.command = cli.New(app.ctx)
if l, err = app.gateway.Apply(
cli.Feature,
); err != nil {
return
}
timer := time.NewTimer(time.Millisecond * 200)
defer timer.Stop()
errChan := make(chan error, 1)
app.waitGroup.Go(func() {
select {
case errChan <- app.command.Serve(l):
}
})
select {
case err = <-errChan:
case <-timer.C:
if app.opts.EnableDirectCommand {
app.gateway.Direct(l)
}
}
return
}
func (app *application) gotoInteractive() (err error) {
var (
client *cli.Client
)
client = cli.NewClient(
app.ctx,
net.JoinHostPort(app.opts.Address, strconv.Itoa(app.opts.Port)),
)
return client.Shell()
}
func (app *application) buildInfo() *Info {
info := &Info{
ID: app.opts.Name,
Name: app.opts.Name,
Version: app.opts.Version,
Status: StateHealthy,
Address: app.opts.Address,
Port: app.opts.Port,
Metadata: app.opts.Metadata,
}
if info.Metadata == nil {
info.Metadata = make(map[string]string)
}
info.Metadata["os"] = runtime.GOOS
info.Metadata["numOfCPU"] = strconv.Itoa(runtime.NumCPU())
info.Metadata["goVersion"] = runtime.Version()
info.Metadata["shortName"] = app.opts.ShortName()
info.Metadata["upTime"] = app.uptime.Format(time.DateTime)
return info
}
func (app *application) preStart() (err error) {
var (
addr string
)
app.ctx, app.cancelFunc = context.WithCancelCause(app.opts.Context)
if *cliFlag && !app.opts.DisableCommand {
if err = app.gotoInteractive(); err != nil {
fmt.Println(err)
os.Exit(1)
}
os.Exit(0)
}
app.info = app.buildInfo()
app.Log().Infof("server starting")
env.Set(EnvAppName, app.opts.ShortName())
env.Set(EnvAppVersion, app.opts.Version)
addr = net.JoinHostPort(app.opts.Address, strconv.Itoa(app.opts.Port))
app.Log().Infof("server listen on: %s", addr)
app.gateway = entry.New(addr)
if err = app.gateway.Start(app.ctx); err != nil {
return
}
if !app.opts.DisableHttp {
if err = app.httpServe(); err != nil {
return
}
}
if !app.opts.DisableCommand {
if err = app.commandServe(); err != nil {
return
}
}
app.plugins.Range(func(key, value any) bool {
if plugin, ok := value.(Plugin); ok {
if err = plugin.BeforeStart(); err != nil {
return false
}
}
return true
})
if app.opts.server != nil {
if err = app.opts.server.Start(app.ctx); err != nil {
app.Log().Warnf("server start error: %s", err.Error())
return
}
}
if !app.opts.DisableStateApi {
app.Handle("/-/run/state", func(ctx Context) (err error) {
return ctx.Success(State{
ID: app.opts.Name,
Name: app.opts.Name,
Version: app.opts.Version,
Uptime: time.Now().Sub(app.uptime).String(),
Gateway: app.gateway.State(),
})
})
app.Handle("/-/healthy", func(ctx Context) (err error) {
return ctx.Success(app.Healthy())
})
}
app.plugins.Range(func(key, value any) bool {
if plugin, ok := value.(Plugin); ok {
if err = plugin.AfterStart(); err != nil {
return false
}
}
return true
})
app.Log().Infof("server started")
return
}
func (app *application) preStop() (err error) {
if !atomic.CompareAndSwapInt32(&app.exitFlag, 0, 1) {
return
}
app.Log().Infof("server stopping")
if app.opts.server != nil {
if err = app.opts.server.Stop(); err != nil {
app.Log().Warnf("app server stop error: %s", err.Error())
}
}
app.cancelFunc(ErrStopping)
app.plugins.Range(func(key, value any) bool {
if plugin, ok := value.(Plugin); ok {
if err = plugin.BeforeStop(); err != nil {
return false
}
}
return true
})
if app.http != nil {
if err = app.http.Shutdown(); err != nil {
app.Log().Warnf("server http shutdown error: %s", err.Error())
}
}
if app.command != nil {
if err = app.command.Shutdown(); err != nil {
app.Log().Warnf("server command shutdown error: %s", err.Error())
}
}
if err = app.gateway.Stop(); err != nil {
app.Log().Warnf("server gateway shutdown error: %s", err.Error())
}
app.plugins.Range(func(key, value any) bool {
if plugin, ok := value.(Plugin); ok {
if err = plugin.AfterStop(); err != nil {
return false
}
}
return true
})
app.waitGroup.Wait()
app.Log().Infof("server stopped")
return
}
func (app *application) Use(plugin Plugin) (err error) {
var (
ok bool
)
if _, ok = app.plugins.Load(plugin.Name()); ok {
return fmt.Errorf("plugin %s already registered", plugin.Name())
}
if err = plugin.Mount(app.ctx); err != nil {
return
}
app.plugins.Store(plugin.Name(), plugin)
return
}
func (app *application) Run() (err error) {
if err = app.preStart(); err != nil {
return
}
ch := make(chan os.Signal, 1)
if app.opts.Signals == nil {
app.opts.Signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL}
}
signal.Notify(ch, app.opts.Signals...)
select {
case <-ch:
case <-app.ctx.Done():
}
return app.preStop()
}
func New(cbs ...Option) *application {
opts := NewOptions()
for _, cb := range cbs {
cb(opts)
}
app := &application{
opts: opts,
uptime: time.Now(),
}
return app
}