190 lines
4.0 KiB
Go
190 lines
4.0 KiB
Go
package aeusadmin
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"git.nobla.cn/golang/aeus-admin/models"
|
|
"git.nobla.cn/golang/rest"
|
|
"git.nobla.cn/golang/rest/types"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type (
|
|
recorder struct {
|
|
Domain string `json:"domain"`
|
|
User string `json:"user"`
|
|
Module string `json:"module"`
|
|
Table string `json:"table"`
|
|
Action string `json:"action"`
|
|
PrimaryKey any `json:"primary_key"`
|
|
Diffs []*types.DiffAttr `json:"diffs"`
|
|
}
|
|
)
|
|
|
|
type ActivityRecorder struct {
|
|
db *gorm.DB
|
|
ctx context.Context
|
|
batchTimeout time.Duration
|
|
BatchSize int
|
|
ch chan *models.Activity
|
|
}
|
|
|
|
func (s *ActivityRecorder) onAfterCreate(ctx context.Context, tx *gorm.DB, model any, diff []*types.DiffAttr) {
|
|
v := ctx.Value(rest.RuntimeScopeKey)
|
|
if v == nil {
|
|
return
|
|
}
|
|
runtimeScope, ok := v.(*types.RuntimeScope)
|
|
if !ok {
|
|
return
|
|
}
|
|
data := &models.Activity{}
|
|
data.Uid = runtimeScope.User
|
|
data.Module = runtimeScope.ModuleName
|
|
data.Table = runtimeScope.TableName
|
|
data.Action = types.ScenarioCreate
|
|
if buf, err := json.Marshal(diff); err == nil {
|
|
data.Data = string(buf)
|
|
}
|
|
select {
|
|
case s.ch <- data:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (s *ActivityRecorder) onAfterUpdate(ctx context.Context, tx *gorm.DB, model any, diff []*types.DiffAttr) {
|
|
v := ctx.Value(rest.RuntimeScopeKey)
|
|
if v == nil {
|
|
return
|
|
}
|
|
runtimeScope, ok := v.(*types.RuntimeScope)
|
|
if !ok {
|
|
return
|
|
}
|
|
data := &models.Activity{}
|
|
data.Uid = runtimeScope.User
|
|
data.Module = runtimeScope.ModuleName
|
|
data.Table = runtimeScope.TableName
|
|
data.Action = types.ScenarioUpdate
|
|
if diff == nil {
|
|
diff = make([]*types.DiffAttr, 0)
|
|
}
|
|
diff = append(diff, &types.DiffAttr{
|
|
Column: "primary_key",
|
|
NewValue: runtimeScope.PrimaryKeyValue,
|
|
})
|
|
if buf, err := json.Marshal(diff); err == nil {
|
|
data.Data = string(buf)
|
|
}
|
|
select {
|
|
case s.ch <- data:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (s *ActivityRecorder) onAfterDelete(ctx context.Context, tx *gorm.DB, model any) {
|
|
v := ctx.Value(rest.RuntimeScopeKey)
|
|
if v == nil {
|
|
return
|
|
}
|
|
runtimeScope, ok := v.(*types.RuntimeScope)
|
|
if !ok {
|
|
return
|
|
}
|
|
data := &models.Activity{}
|
|
data.Uid = runtimeScope.User
|
|
data.Module = runtimeScope.ModuleName
|
|
data.Table = runtimeScope.TableName
|
|
data.Action = types.ScenarioDelete
|
|
diff := make([]*types.DiffAttr, 0, 1)
|
|
diff = append(diff, &types.DiffAttr{
|
|
Column: "primary_key",
|
|
NewValue: runtimeScope.PrimaryKeyValue,
|
|
})
|
|
if buf, err := json.Marshal(diff); err == nil {
|
|
data.Data = string(buf)
|
|
}
|
|
select {
|
|
case s.ch <- data:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (s *ActivityRecorder) batchWrite(values []*models.Activity) {
|
|
var (
|
|
err error
|
|
)
|
|
if len(values) <= 0 {
|
|
return
|
|
}
|
|
if err = s.db.Create(values).Error; err != nil {
|
|
for _, row := range values {
|
|
s.db.Create(row)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *ActivityRecorder) writeLoop() {
|
|
var (
|
|
values []*models.Activity
|
|
)
|
|
timer := time.NewTimer(s.batchTimeout)
|
|
defer func() {
|
|
timer.Stop()
|
|
if len(values) > 0 {
|
|
s.batchWrite(values)
|
|
}
|
|
}()
|
|
for {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return
|
|
case <-timer.C:
|
|
if len(values) > 0 {
|
|
s.batchWrite(values)
|
|
values = nil
|
|
}
|
|
timer.Reset(s.batchTimeout)
|
|
case msg, ok := <-s.ch:
|
|
if ok {
|
|
values = append(values, msg)
|
|
if len(values) > s.BatchSize {
|
|
s.batchWrite(values)
|
|
values = nil
|
|
timer.Reset(s.batchTimeout)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *ActivityRecorder) Recoder(scenes ...string) {
|
|
if len(scenes) == 0 {
|
|
scenes = []string{types.ScenarioCreate, types.ScenarioUpdate, types.ScenarioDelete}
|
|
}
|
|
for _, str := range scenes {
|
|
switch str {
|
|
case types.ScenarioCreate:
|
|
rest.OnAfterCreate(s.onAfterCreate)
|
|
case types.ScenarioUpdate:
|
|
rest.OnAfterUpdate(s.onAfterUpdate)
|
|
case types.ScenarioDelete:
|
|
rest.OnAfterDelete(s.onAfterDelete)
|
|
}
|
|
}
|
|
}
|
|
|
|
func NewActivityRecorder(ctx context.Context, db *gorm.DB) *ActivityRecorder {
|
|
s := &ActivityRecorder{
|
|
db: db,
|
|
ctx: ctx,
|
|
batchTimeout: time.Second,
|
|
BatchSize: 50,
|
|
ch: make(chan *models.Activity, 100),
|
|
}
|
|
go s.writeLoop()
|
|
return s
|
|
}
|