aeus-admin/activity.go

190 lines
4.0 KiB
Go

package aeusadmin
import (
"context"
"encoding/json"
"time"
"git.nobla.cn/golang/aeus-admin/models"
"git.nobla.cn/golang/rest"
"git.nobla.cn/golang/rest/types"
"gorm.io/gorm"
)
// recorder is a JSON-serialisable record describing a change: where it
// happened (domain, module, table), who made it, which action it was, the
// primary key of the affected row and the list of attribute differences.
//
// NOTE(review): nothing in the visible part of this file references this
// type, and the field meanings above are inferred from the field names and
// JSON tags only — confirm against the code that populates it.
type recorder struct {
	Domain     string            `json:"domain"`
	User       string            `json:"user"`
	Module     string            `json:"module"`
	Table      string            `json:"table"`
	Action     string            `json:"action"`
	PrimaryKey any               `json:"primary_key"`
	Diffs      []*types.DiffAttr `json:"diffs"`
}
// ActivityRecorder collects activity entries produced by the create, update
// and delete hooks and writes them to the database in batches from a
// background loop (see writeLoop).
type ActivityRecorder struct {
	// db is the handle batchWrite uses to insert activity rows.
	db *gorm.DB
	// ctx bounds the lifetime of writeLoop; the loop exits once it is done.
	ctx context.Context

	// batchTimeout is the interval of the flush timer in writeLoop.
	batchTimeout time.Duration
	// BatchSize is the pending-entry threshold that makes writeLoop flush
	// without waiting for the timer.
	BatchSize int

	// ch is the queue between the hooks (senders) and writeLoop (receiver).
	// The hooks send without blocking, so entries are dropped when it is full.
	ch chan *models.Activity
}
// onAfterCreate is the hook registered for the create scenario.
//
// It reads the *types.RuntimeScope stored in ctx under rest.RuntimeScopeKey
// and, when one is present, queues an activity entry carrying the scope's
// user, module and table together with the JSON encoding of diff. If the
// scope is missing or of another type, nothing is recorded. tx and model are
// not used.
func (s *ActivityRecorder) onAfterCreate(ctx context.Context, tx *gorm.DB, model any, diff []*types.DiffAttr) {
	// A failed assertion covers both "no value" and "value of another type".
	scope, found := ctx.Value(rest.RuntimeScopeKey).(*types.RuntimeScope)
	if !found {
		return
	}

	entry := new(models.Activity)
	entry.Action = types.ScenarioCreate
	entry.Table = scope.TableName
	entry.Module = scope.ModuleName
	entry.Uid = scope.User

	// When the diff cannot be encoded the entry is still queued, with Data
	// left at its zero value.
	payload, err := json.Marshal(diff)
	if err == nil {
		entry.Data = string(payload)
	}

	// Never block the request path: drop the entry if the queue is full.
	select {
	case s.ch <- entry:
	default:
	}
}
// onAfterUpdate is the hook registered for the update scenario.
//
// It reads the *types.RuntimeScope stored in ctx under rest.RuntimeScopeKey
// and, when one is present, queues an activity entry carrying the scope's
// user, module and table. The entry's Data is the JSON encoding of diff with
// one extra trailing element whose Column is "primary_key" and whose NewValue
// is the scope's PrimaryKeyValue. If the scope is missing or of another type,
// nothing is recorded. tx and model are not used.
func (s *ActivityRecorder) onAfterUpdate(ctx context.Context, tx *gorm.DB, model any, diff []*types.DiffAttr) {
	// A failed assertion covers both "no value" and "value of another type".
	runtimeScope, ok := ctx.Value(rest.RuntimeScopeKey).(*types.RuntimeScope)
	if !ok {
		return
	}

	data := &models.Activity{}
	data.Uid = runtimeScope.User
	data.Module = runtimeScope.ModuleName
	data.Table = runtimeScope.TableName
	data.Action = types.ScenarioUpdate

	// Build the payload in a fresh slice. Appending directly to diff could
	// write into the caller's backing array when it has spare capacity, and
	// the caller (or another hook handed the same slice) may still use it.
	attrs := make([]*types.DiffAttr, 0, len(diff)+1)
	attrs = append(attrs, diff...)
	attrs = append(attrs, &types.DiffAttr{
		Column:   "primary_key",
		NewValue: runtimeScope.PrimaryKeyValue,
	})

	// When the payload cannot be encoded the entry is still queued, with
	// Data left at its zero value.
	if buf, err := json.Marshal(attrs); err == nil {
		data.Data = string(buf)
	}

	// Never block the request path: drop the entry if the queue is full.
	select {
	case s.ch <- data:
	default:
	}
}
// onAfterDelete is the hook registered for the delete scenario.
//
// It reads the *types.RuntimeScope stored in ctx under rest.RuntimeScopeKey
// and, when one is present, queues an activity entry carrying the scope's
// user, module and table. The entry's Data is the JSON encoding of a
// one-element list whose Column is "primary_key" and whose NewValue is the
// scope's PrimaryKeyValue. If the scope is missing or of another type,
// nothing is recorded. tx and model are not used.
func (s *ActivityRecorder) onAfterDelete(ctx context.Context, tx *gorm.DB, model any) {
	// A failed assertion covers both "no value" and "value of another type".
	scope, found := ctx.Value(rest.RuntimeScopeKey).(*types.RuntimeScope)
	if !found {
		return
	}

	entry := new(models.Activity)
	entry.Action = types.ScenarioDelete
	entry.Table = scope.TableName
	entry.Module = scope.ModuleName
	entry.Uid = scope.User

	attrs := []*types.DiffAttr{
		{Column: "primary_key", NewValue: scope.PrimaryKeyValue},
	}
	// When the payload cannot be encoded the entry is still queued, with
	// Data left at its zero value.
	payload, err := json.Marshal(attrs)
	if err == nil {
		entry.Data = string(payload)
	}

	// Never block the request path: drop the entry if the queue is full.
	select {
	case s.ch <- entry:
	default:
	}
}
// batchWrite inserts values through s.db with a single Create call. If that
// call reports an error, every entry is retried with its own Create call.
// An empty or nil slice is a no-op.
//
// Errors from the per-entry retries are discarded, so an entry that fails
// again is silently lost.
func (s *ActivityRecorder) batchWrite(values []*models.Activity) {
	if len(values) == 0 {
		return
	}
	if s.db.Create(values).Error == nil {
		return
	}
	// The batch insert failed: fall back to one insert per entry.
	for i := range values {
		s.db.Create(values[i])
	}
}
// writeLoop receives queued activities from s.ch and persists them in
// batches via batchWrite. It runs until s.ctx is done or s.ch is closed.
//
// Pending entries are flushed when their count reaches s.BatchSize, or when
// the flush timer (interval s.batchTimeout, restarted after every tick and
// after every size-triggered flush) fires. Entries still pending when the
// loop exits are flushed by the deferred function.
func (s *ActivityRecorder) writeLoop() {
	var pending []*models.Activity

	timer := time.NewTimer(s.batchTimeout)
	defer func() {
		timer.Stop()
		if len(pending) > 0 {
			s.batchWrite(pending)
		}
	}()

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-timer.C:
			if len(pending) > 0 {
				s.batchWrite(pending)
				pending = nil
			}
			// The tick was just received, so the timer is expired and
			// drained; Reset is safe without a Stop.
			timer.Reset(s.batchTimeout)
		case msg, ok := <-s.ch:
			if !ok {
				// A closed channel is always ready to receive; returning
				// here keeps the loop from spinning on it forever.
				return
			}
			pending = append(pending, msg)
			if len(pending) < s.BatchSize {
				continue
			}
			// Flush as soon as the batch holds BatchSize entries (not
			// BatchSize+1).
			s.batchWrite(pending)
			pending = nil
			// The timer may have fired while the batch was being written.
			// Stop it and drain any stale tick before Reset so the next
			// interval starts clean on every Go version.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(s.batchTimeout)
		}
	}
}
// Recoder registers the recorder's hooks with the rest package for the given
// scenes. Recognised scenes are types.ScenarioCreate, types.ScenarioUpdate
// and types.ScenarioDelete; any other value is ignored. With no arguments,
// all three are registered. Hooks are registered in the order the scenes are
// listed, and a scene listed twice is registered twice.
//
// NOTE(review): the name looks like a misspelling of "Recorder"; it is left
// unchanged because it is part of the exported API.
func (s *ActivityRecorder) Recoder(scenes ...string) {
	if len(scenes) == 0 {
		scenes = []string{types.ScenarioCreate, types.ScenarioUpdate, types.ScenarioDelete}
	}

	// One registration action per recognised scene.
	register := map[string]func(){
		types.ScenarioCreate: func() { rest.OnAfterCreate(s.onAfterCreate) },
		types.ScenarioUpdate: func() { rest.OnAfterUpdate(s.onAfterUpdate) },
		types.ScenarioDelete: func() { rest.OnAfterDelete(s.onAfterDelete) },
	}
	for _, scene := range scenes {
		if hook, known := register[scene]; known {
			hook()
		}
	}
}
// NewActivityRecorder builds an ActivityRecorder that writes through db and
// immediately starts its background write loop in a new goroutine. The loop
// stops once ctx is done.
//
// Defaults: a one-second flush interval, a BatchSize of 50 and a queue
// capacity of 100 entries.
func NewActivityRecorder(ctx context.Context, db *gorm.DB) *ActivityRecorder {
	const (
		defaultBatchSize = 50
		defaultQueueCap  = 100
	)

	ar := new(ActivityRecorder)
	ar.ctx = ctx
	ar.db = db
	ar.BatchSize = defaultBatchSize
	ar.batchTimeout = time.Second
	ar.ch = make(chan *models.Activity, defaultQueueCap)

	go ar.writeLoop()
	return ar
}