approveflow/app/provider/flow_instance/model/approval_instance.go

628 lines
19 KiB
Go

package model
import (
"approveflow/app/base"
"approveflow/app/provider/abstract/connect"
"approveflow/app/provider/flow_definition"
"approveflow/app/utils"
"context"
"encoding/json"
"fmt"
"github.com/Superdanda/hade/framework/gin"
"github.com/pkg/errors"
"sync"
)
// Status values recorded in an approval instance's status events.
const (
StatusCreated = "Created" // the approval instance has been created
StatusReject = "Reject" // the approval instance has been rejected (recorded by AddRejectEvent; Start accepts it as a restartable state)
StatusInProgress = "InProgress" // approval is in progress
StatusCompleted = "Completed" // approval has completed
StatusCancelled = "Cancelled" // approval has been cancelled
)
// ApprovalInstance is the approval instance table model.
type ApprovalInstance struct {
ID int64 `gorm:"primaryKey;autoIncrement" json:"id"` // primary key ID
FlowID int64 `gorm:"type:bigint;index;not null" json:"flow_id"` // flow ID
ApplicantKey string `gorm:"type:bigint;index;not null" json:"approver_id"` // applicant ID. NOTE(review): Go type is string but the column type is bigint and the JSON name is approver_id — confirm this is intended
CreatorKey string `gorm:"type:bigint;not null" json:"creator_id"` // creator ID. NOTE(review): string field with a bigint column type — confirm
Status string `gorm:"-" json:"status"` // approval status; excluded from persistence and refreshed by AddStatusEvent
CurrentStepIDs []*CurrentStep `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE" json:"current_step_ids"` // IDs of the steps currently awaiting processing
Steps []*InstanceStep `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE" json:"steps"` // instance steps
StatusEvents []*InstanceStatusEvent `gorm:"foreignKey:InstanceID;constraint:OnDelete:CASCADE" json:"status_events"` // status history; the last element is the current status (see GetStatus)
// Data holds the approval form data serialized as JSON (written by NewApprovalInstance, parsed by DataToMap).
Data string `gorm:"type:json" json:"data"`
NodeMap map[string]connect.AbstractNode `gorm:"-" json:"node_map"` // node lookup map; excluded from persistence and lazily created by GetNodeMap
// TenantKey is the tenant key taken from the submitted data.
TenantKey string `gorm:"type:varchar(50);not null" json:"tenant_key"`
// AssociateKey is the associate key taken from the submitted data.
AssociateKey string `gorm:"type:varchar(50);not null" json:"associate_key"`
// The embedded RWMutex is taken by Start and MoveToNextStep.
sync.RWMutex
// Embedded model fields from package base.
base.Model
}
// InstanceStatusEvent is the approval instance status table model: one entry
// per status change appended by ApprovalInstance.AddStatusEvent.
type InstanceStatusEvent struct {
InstanceID int64 `gorm:"type:bigint;not null" json:"instance_id"` // ID of the owning approval instance
Status string `gorm:"type:varchar(50);not null" json:"status"` // status value recorded by this event
Extension string `gorm:"type:varchar(50);not null" json:"extension"` // extra information; AddStatusEvent always stores an empty string
// Embedded model fields from package base.
base.Model
}
// CheckIfComplete reports whether the approval instance has finished.
//
// It returns true when the latest status is already StatusCompleted. Otherwise,
// if exactly one step is current and that step is the flow's end step, it
// records a completed event on that step and on the instance and returns true.
// In every other case it returns false without changing anything.
func (instance *ApprovalInstance) CheckIfComplete() bool {
	active := instance.GetCurrentSteps()

	if instance.GetStatus() == StatusCompleted {
		return true
	}

	atEnd := len(active) == 1 && active[0].StepCode == flow_definition.StepEnd
	if !atEnd {
		return false
	}

	endStep := active[0]
	endStep.AddCompletedEvent()
	instance.AddCompletedEvent()
	return true
}
// MarshalBinary encodes the instance as JSON and returns the encoded bytes
// together with any error reported by json.Marshal.
func (instance *ApprovalInstance) MarshalBinary() ([]byte, error) {
	encoded, err := json.Marshal(instance)
	return encoded, err
}
// UnmarshalBinary decodes JSON-encoded data into the receiver and returns any
// error reported by json.Unmarshal.
func (instance *ApprovalInstance) UnmarshalBinary(data []byte) error {
	err := json.Unmarshal(data, instance)
	return err
}
// GetNodes returns the instance's steps as a slice of connect.AbstractNode,
// produced by utils.ConvertToAbstractNodes with each *InstanceStep passed
// through unchanged.
func (instance *ApprovalInstance) GetNodes() []connect.AbstractNode {
	asNode := func(step *InstanceStep) connect.AbstractNode {
		return step
	}
	return utils.ConvertToAbstractNodes(instance.Steps, asNode)
}
// SetNodes replaces the instance's steps with nodes, each converted back to
// *InstanceStep through utils.ConvertToSpecificType.
//
// NOTE(review): the conversion uses an unchecked type assertion, so a node that
// is not an *InstanceStep panics, and the second result of
// ConvertToSpecificType is discarded — confirm both are intended.
func (instance *ApprovalInstance) SetNodes(nodes []connect.AbstractNode) {
	toStep := func(node connect.AbstractNode) (*InstanceStep, bool) {
		return node.(*InstanceStep), true
	}
	steps, _ := utils.ConvertToSpecificType[*InstanceStep, connect.AbstractNode](nodes, toStep)
	instance.Steps = steps
}
// GetNodeMap returns the instance's node map. On first use (NodeMap is nil) it
// allocates an empty map and hands the instance to connect.InitializeNodeMap
// before returning it.
func (instance *ApprovalInstance) GetNodeMap() map[string]connect.AbstractNode {
	if instance.NodeMap != nil {
		return instance.NodeMap
	}
	instance.NodeMap = make(map[string]connect.AbstractNode)
	connect.InitializeNodeMap(instance)
	return instance.NodeMap
}
// NewNodePathConfig builds a dynamic path config from the step identified by
// fromNodeKey to the node identified by toNodeKey. It returns nil when no step
// with fromNodeKey exists in this instance.
func (instance *ApprovalInstance) NewNodePathConfig(fromNodeKey, toNodeKey string) connect.AbstractNodePathConfig {
	origin, lookupErr := instance.GetStepByKey(fromNodeKey)
	if lookupErr == nil {
		return NewDynamicPathConfig(instance.ID, origin.ID, fromNodeKey, toNodeKey, false)
	}
	return nil
}
// CurrentStep marks a step that is currently under approval in an instance.
type CurrentStep struct {
CurrentStepId int64 `gorm:"type:bigint;index;not null" json:"current_step_id"` // ID of the InstanceStep that is current (compared against InstanceStep.ID)
InstanceID int64 `gorm:"type:bigint;index;not null" json:"instance_id"` // ID of the owning approval instance
// Embedded model fields from package base.
base.Model
}
// MarshalBinary encodes the current-step record as JSON and returns the encoded
// bytes together with any error reported by json.Marshal.
func (current *CurrentStep) MarshalBinary() ([]byte, error) {
	encoded, err := json.Marshal(current)
	return encoded, err
}
// UnmarshalBinary decodes JSON-encoded data into the receiver and returns any
// error reported by json.Unmarshal.
func (current *CurrentStep) UnmarshalBinary(data []byte) error {
	err := json.Unmarshal(data, current)
	return err
}
// NewApprovalInstance creates an approval instance for approvalFlow from the
// submitted data.
//
// data is serialized to JSON and stored in the instance's Data field. It must
// contain string values under base.ApplicantKey and base.CreatorKey. The
// entries under base.TenantKey and base.AssociateKey are optional, but when
// present they must be strings too. A missing required key or a non-string
// value is reported as an error instead of panicking.
//
// The instance steps are built from the flow's first step onwards via
// buildInstanceSteps, and a created status event is recorded before the
// instance is returned.
func NewApprovalInstance(
	ctx *gin.Context,
	approvalFlow *flow_definition.ApprovalFlow,
	data map[string]interface{}) (*ApprovalInstance, error) {
	payload, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}

	// stringValue reads data[key] as a string. An absent key is an error only
	// when required is true; a present value of any other type is always an
	// error.
	stringValue := func(key string, required bool) (string, error) {
		raw, present := data[key]
		if !present {
			if required {
				return "", fmt.Errorf("approval data is missing required key %q", key)
			}
			return "", nil
		}
		text, isString := raw.(string)
		if !isString {
			return "", fmt.Errorf("approval data key %q must be a string, got %T", key, raw)
		}
		return text, nil
	}

	applicantKey, err := stringValue(base.ApplicantKey, true)
	if err != nil {
		return nil, err
	}
	creatorKey, err := stringValue(base.CreatorKey, true)
	if err != nil {
		return nil, err
	}
	// Save the tenant key and the associate key. The associate key goes into
	// AssociateKey; it must not overwrite TenantKey.
	tenantKey, err := stringValue(base.TenantKey, false)
	if err != nil {
		return nil, err
	}
	associateKey, err := stringValue(base.AssociateKey, false)
	if err != nil {
		return nil, err
	}

	instance := &ApprovalInstance{
		FlowID:       approvalFlow.ID,
		ApplicantKey: applicantKey,
		CreatorKey:   creatorKey,
		Steps:        []*InstanceStep{},
		Data:         string(payload),
		TenantKey:    tenantKey,
		AssociateKey: associateKey,
	}

	// Get the first step of the approval flow.
	firstStep, err := approvalFlow.FirstStep()
	if err != nil {
		return nil, err
	}

	// Build the steps recursively, tracking visited flow steps by key.
	processedSteps := make(map[string]*InstanceStep)
	if err := buildInstanceSteps(ctx, approvalFlow, instance, firstStep, nil, data, processedSteps); err != nil {
		return nil, err
	}

	// Record the created status.
	instance.AddCreatedEvent()
	return instance, nil
}
// buildInstanceSteps recursively builds instance steps from approval steps.
//
// Starting at currentApprovalStep, it creates an InstanceStep, links it into
// instance (first, or after previousInstanceStep), and then recurses into every
// next step returned by connect.GetNextNodes. processedSteps maps flow-step
// keys to the instance steps already created, so a step reached a second time
// is only linked after previousInstanceStep and not rebuilt, which also stops
// cycles.
//
// It returns an error when the flow step cannot be looked up, when the next
// nodes cannot be resolved, or when a next node is not a
// *flow_definition.ApprovalStep.
func buildInstanceSteps(ctx *gin.Context, approvalFlow *flow_definition.ApprovalFlow,
	instance *ApprovalInstance, currentApprovalStep *flow_definition.ApprovalStep,
	previousInstanceStep *InstanceStep, data map[string]interface{}, processedSteps map[string]*InstanceStep) error {
	// Check if this step has already been processed to avoid cycles.
	if existing, seen := processedSteps[currentApprovalStep.Key]; seen {
		if previousInstanceStep != nil {
			connect.InsertNodeAfter(instance, previousInstanceStep, existing)
		}
		return nil
	}

	// Resolving the approver is best-effort: without a dynamic config, or when
	// the lookup fails, the step is created with an empty approver key.
	var approverKey string
	if currentApprovalStep.DynamicConfig != nil {
		if approver, err := currentApprovalStep.DynamicConfig.GetApprover(ctx, data); err == nil {
			approverKey = approver.Key
		}
	}
	currentInstanceStep := NewInstanceStep(currentApprovalStep, approverKey, false)

	// Insert the instance step into the instance.
	if previousInstanceStep == nil {
		connect.InsertNodeFirst(instance, currentInstanceStep)
	} else {
		connect.InsertNodeAfter(instance, previousInstanceStep, currentInstanceStep)
	}

	// Mark this step as processed before recursing so cycles terminate.
	processedSteps[currentApprovalStep.Key] = currentInstanceStep

	// Look up the flow step; its error must be checked before the step is used.
	flowStep, err := approvalFlow.GetStep(currentInstanceStep.StepID)
	if err != nil {
		return fmt.Errorf("looking up flow step %v: %w", currentInstanceStep.StepID, err)
	}
	nextFlowSteps, err := connect.GetNextNodes(approvalFlow, flowStep)
	if err != nil {
		return err
	}

	// Recursively build every following step.
	for _, nextStep := range nextFlowSteps {
		nextApprovalStep, isApprovalStep := nextStep.(*flow_definition.ApprovalStep)
		if !isApprovalStep {
			return fmt.Errorf("unexpected next node type %T after step %q", nextStep, currentApprovalStep.Key)
		}
		if err := buildInstanceSteps(ctx, approvalFlow, instance, nextApprovalStep, currentInstanceStep, data, processedSteps); err != nil {
			return err
		}
	}
	return nil
}
// SendApprovalNotification is meant to send the approval notification to the
// approver of step. The implementation is currently disabled: the body below is
// commented out, both parameters are unused, and the method always returns nil.
func (instance *ApprovalInstance) SendApprovalNotification(step *InstanceStep, ctx context.Context) error {
// Send the approval-started event to the message queue (disabled code kept for reference).
//userService := container.MustMake(userProvider.Key).(userProvider.Service)
//approver, err := userService.GetUserByID(step.ApproverID)
//if err != nil {
// return fmt.Errorf("获取审批人失败: %v", err)
//}
//
//// Build the notification content
//notification := fmt.Sprintf("审批流程已启动,请您审批步骤: %s", step.Name)
//
//// Send the notification
//err = sendNotificationToUser(approver, notification)
//if err != nil {
// return fmt.Errorf("发送通知失败: %v", err)
//}
return nil
}
// Start begins the approval flow: the start step becomes the current step, an
// in-progress status event is recorded, and the approval notification for that
// step is sent.
//
// It returns an error when the instance's status is neither StatusCreated nor
// StatusReject, when no start step exists, or when sending the notification
// fails.
func (instance *ApprovalInstance) Start(ctx context.Context) error {
	// Start appends to CurrentStepIDs and StatusEvents, so it must hold the
	// write lock; a read lock would let concurrent callers mutate the instance
	// at the same time.
	instance.Lock()
	defer instance.Unlock()

	if status := instance.GetStatus(); status != StatusCreated && status != StatusReject {
		return fmt.Errorf("审批流程已启动或已完成,无法再次启动")
	}

	// Find the first approval step.
	firstStep, err := instance.getFirstStep()
	if err != nil {
		return err
	}

	// Make it the current step and switch the instance to in-progress.
	instance.AddCurrentStepID(firstStep)
	instance.AddInProgressEvent()

	return instance.SendApprovalNotification(firstStep, ctx)
}
// getFirstStep returns the first step whose StepCode is the flow's start code,
// or an error when the instance has no such step.
func (instance *ApprovalInstance) getFirstStep() (*InstanceStep, error) {
	for idx := range instance.Steps {
		if candidate := instance.Steps[idx]; candidate.StepCode == flow_definition.StepStart {
			return candidate, nil
		}
	}
	return nil, errors.New("未找到起点")
}
// MoveToNextStep advances the instance from its first current step to the next
// step, holding the instance's write lock for the whole operation.
//
// The next step is resolved from the current step's path configs against the
// instance data. Exactly one next step is required:
//   - an error while resolving the next steps is returned as is;
//   - no matching next step is reported as an error;
//   - more than one matching next step is rejected, because parallel approval
//     is not supported yet.
//
// Nothing is modified unless the move can be carried out. On success the
// current step receives a completed event, is replaced as current step by the
// next step, and the next step receives a pending event.
func (instance *ApprovalInstance) MoveToNextStep() error {
	instance.Lock()
	defer instance.Unlock()

	currentSteps := instance.GetCurrentSteps()
	if len(currentSteps) == 0 {
		return fmt.Errorf("当前步骤不存在")
	}
	dataMap, err := instance.DataToMap()
	if err != nil {
		return err
	}
	currentStep := currentSteps[0]

	// Resolve the next step first. The error has to be checked before the
	// result is inspected, otherwise a lookup failure is misreported as
	// "parallel approval not supported".
	nextSteps, err := instance.getNextStep(currentStep, dataMap)
	if err != nil {
		return err
	}
	switch len(nextSteps) {
	case 1:
		// Exactly one successor: proceed.
	case 0:
		return errors.New("未找到满足条件的下一个步骤")
	default:
		return errors.New("暂时不支持并行审批")
	}
	nextStep := nextSteps[0]

	// Complete the current step only once the move is known to be possible.
	currentStep.AddCompletedEvent()

	// Install the new current step.
	instance.RemoveCurrentStepID(currentStep.ID)
	instance.AddCurrentStepID(nextStep)
	nextStep.AddPendingEvent()
	return nil
}
// getNextStep collects the steps reachable from currentStep.
//
// Each of the step's path configs is evaluated against data; a path whose
// condition is not met, or whose evaluation fails, is skipped. For every
// remaining path the target step is looked up by key, and a failed lookup
// aborts with that error. The result is nil when no path matches.
func (instance *ApprovalInstance) getNextStep(currentStep *InstanceStep, data map[string]interface{}) ([]*InstanceStep, error) {
	var reachable []*InstanceStep
	for _, path := range currentStep.InstancePathConfigs {
		if satisfied, evalErr := path.IsConditionMet(data); evalErr != nil || !satisfied {
			continue
		}
		target, lookupErr := instance.GetStepByKey(path.GetToNodeKey())
		if lookupErr != nil {
			return nil, lookupErr
		}
		reachable = append(reachable, target)
	}
	return reachable, nil
}
// findStepByID returns the first step whose ID equals id. When no step matches
// it returns (nil, nil); the error result is always nil.
func (instance *ApprovalInstance) findStepByID(id int64) (*InstanceStep, error) {
	var match *InstanceStep
	for idx := range instance.Steps {
		if instance.Steps[idx].ID == id {
			match = instance.Steps[idx]
			break
		}
	}
	return match, nil
}
// findStepByKey returns the first step whose Key equals key. When no step
// matches it returns (nil, nil); the error result is always nil.
func (instance *ApprovalInstance) findStepByKey(key string) (*InstanceStep, error) {
	var match *InstanceStep
	for idx := range instance.Steps {
		if instance.Steps[idx].Key == key {
			match = instance.Steps[idx]
			break
		}
	}
	return match, nil
}
// getStepByFlowStepID returns the first instance step whose StepID (the ID of
// the flow step it was created from) equals flowStepId, or nil when there is
// none.
func (instance *ApprovalInstance) getStepByFlowStepID(flowStepId int64) *InstanceStep {
	for idx := range instance.Steps {
		candidate := instance.Steps[idx]
		if candidate.StepID != flowStepId {
			continue
		}
		return candidate
	}
	return nil
}
// DataToMap parses the Data field (a JSON document) into a
// map[string]interface{}.
//
// Every call decodes Data again and returns a fresh map. A decoding failure is
// returned wrapped, so callers can still inspect the underlying JSON error with
// errors.Is / errors.As.
func (instance *ApprovalInstance) DataToMap() (map[string]interface{}, error) {
	var dataMap map[string]interface{}
	if err := json.Unmarshal([]byte(instance.Data), &dataMap); err != nil {
		return nil, fmt.Errorf("解析 Data 字段失败: %w", err)
	}
	return dataMap, nil
}
// GetStep returns the first step whose ID equals id, or an error when the
// instance has no such step.
func (instance *ApprovalInstance) GetStep(id int64) (*InstanceStep, error) {
	for idx := range instance.Steps {
		if candidate := instance.Steps[idx]; candidate.ID == id {
			return candidate, nil
		}
	}
	return nil, fmt.Errorf("step with ID %d not found in approval flow", id)
}
// GetStepByKey returns the first step whose Key equals key, or an error when
// the instance has no such step.
func (instance *ApprovalInstance) GetStepByKey(key string) (*InstanceStep, error) {
	for idx := range instance.Steps {
		if candidate := instance.Steps[idx]; candidate.Key == key {
			return candidate, nil
		}
	}
	return nil, fmt.Errorf("step with key %s not found in approval flow", key)
}
// AddCurrentStepID marks step as a current step by appending a CurrentStep
// entry (step ID plus this instance's ID) to CurrentStepIDs. It does nothing
// when an entry for the step's ID is already present.
func (instance *ApprovalInstance) AddCurrentStepID(step *InstanceStep) {
	if instance.HasCurrentStepID(step.ID) {
		// Already tracked as current; keep the list free of duplicates.
		return
	}
	entry := &CurrentStep{CurrentStepId: step.ID, InstanceID: instance.ID}
	instance.CurrentStepIDs = append(instance.CurrentStepIDs, entry)
}
// RemoveCurrentStepID removes the first CurrentStep entry whose CurrentStepId
// equals stepID. The entry's Delete method is called before it is cut out of
// CurrentStepIDs. Nothing happens when no entry matches.
func (instance *ApprovalInstance) RemoveCurrentStepID(stepID int64) {
	entries := instance.CurrentStepIDs
	for pos := range entries {
		if entries[pos].CurrentStepId != stepID {
			continue
		}
		entries[pos].Delete()
		// Close the gap in place, reusing the existing backing array.
		instance.CurrentStepIDs = append(entries[:pos], entries[pos+1:]...)
		return
	}
}
// HasCurrentStepID reports whether CurrentStepIDs contains an entry whose
// CurrentStepId equals stepID.
func (instance *ApprovalInstance) HasCurrentStepID(stepID int64) bool {
	found := false
	for idx := 0; idx < len(instance.CurrentStepIDs) && !found; idx++ {
		found = instance.CurrentStepIDs[idx].CurrentStepId == stepID
	}
	return found
}
// GetCurrentSteps returns, in Steps order, the instance steps whose ID is
// listed in CurrentStepIDs. The result is a newly built slice (nil when no step
// is current); the *InstanceStep elements are the instance's own steps, not
// copies.
func (instance *ApprovalInstance) GetCurrentSteps() []*InstanceStep {
	var active []*InstanceStep
	for idx := range instance.Steps {
		candidate := instance.Steps[idx]
		if !instance.HasCurrentStepID(candidate.ID) {
			continue
		}
		active = append(active, candidate)
	}
	return active
}
// GetCurrentStepsById returns the current step whose ID equals id, or nil when
// that step is not among the current steps.
func (instance *ApprovalInstance) GetCurrentStepsById(id int64) *InstanceStep {
	for _, candidate := range instance.GetCurrentSteps() {
		if candidate.ID != id {
			continue
		}
		return candidate
	}
	return nil
}
// ExecuteApprovalReversal applies reversal to the instance.
//
// The step named by reversal.StepKey must exist and must be one of the current
// steps, otherwise an error is returned. What happens next depends on
// reversal.FixAction:
//
//   - FixActionReApproveAndUpdateData: the current step is rejected with
//     reversal.Reason, every current step is removed, every step whose status
//     is StepStatusApproved receives a pending event again, and the step named
//     by reversal.ReversedStepKey becomes the only current step.
//   - any other action: nothing is changed and nil is returned.
//
// The target step is resolved before anything is modified, so an unknown
// ReversedStepKey fails without leaving the instance rejected and without a
// current step.
func (instance *ApprovalInstance) ExecuteApprovalReversal(reversal *ApprovalReversal) error {
	step, err := instance.GetStepByKey(reversal.StepKey)
	if err != nil {
		return err
	}
	currentStep := instance.GetCurrentStepsById(step.ID)
	if currentStep == nil {
		return errors.New("当前反转节点不是正在审批的节点")
	}

	switch reversal.FixAction {
	case FixActionReApproveAndUpdateData:
		// Validate the target first; everything below mutates the instance.
		reversalStep, err := instance.GetStepByKey(reversal.ReversedStepKey)
		if err != nil {
			return err
		}
		if err := currentStep.Reject(reversal.Reason); err != nil {
			return err
		}
		// Remove all current steps; the reversed step replaces them below.
		for _, active := range instance.GetCurrentSteps() {
			instance.RemoveCurrentStepID(active.ID)
		}
		// Steps that were already approved have to be approved again.
		for _, candidate := range instance.Steps {
			if candidate.GetStatus() == StepStatusApproved {
				candidate.AddPendingEvent()
			}
		}
		instance.AddCurrentStepID(reversalStep)
	}
	return nil
}
// Reversal creates a reversal record for step (pointing back at reversedStep
// with the given reason and fix action), attaches it to step, records a reject
// status event on the instance, and then executes the reversal. The result of
// ExecuteApprovalReversal is returned.
func (instance *ApprovalInstance) Reversal(step *InstanceStep, reversedStep *InstanceStep, reason string, fixAction string) error {
	record := NewReversal(step, reversedStep, reason, fixAction)
	step.addApprovalReversal(record)

	// The reject event is recorded before the reversal itself is executed.
	instance.AddRejectEvent()

	err := instance.ExecuteApprovalReversal(record)
	return err
}
// ExecuteApprovalStep runs the flow's step rules against the current steps.
//
// Only current steps in pending status are considered. For each of them the
// matching flow step is looked up in approvalFlow (steps that cannot be found
// are skipped) and its rules are evaluated in order against the instance data.
// The first rule whose condition is met decides the outcome:
//
//   - flow_definition.ActionAutoApprove approves the step and returns
//     (nil on success, the approval error otherwise);
//   - any other action returns an "unsupported action" error.
//
// An error while evaluating a rule condition is returned immediately, whereas
// a rule is skipped when the instance data cannot be parsed. When no rule
// fires, the result of CheckIfMoveToNextStep for the current steps is returned.
func (instance *ApprovalInstance) ExecuteApprovalStep(approvalFlow *flow_definition.ApprovalFlow) error {
	currentSteps := instance.GetCurrentSteps()
	for _, pendingStep := range currentSteps {
		// Only steps waiting for approval are processed.
		if pendingStep.GetStatus() != StepStatusPending {
			continue
		}
		definition, lookupErr := approvalFlow.GetStep(pendingStep.StepID)
		if lookupErr != nil {
			continue
		}
		for _, rule := range definition.Rules {
			// The data is parsed anew for every rule, so each rule gets its
			// own map.
			formData, parseErr := instance.DataToMap()
			if parseErr != nil {
				continue
			}
			// Evaluate whether the rule's condition expression is satisfied.
			matched, evalErr := rule.EvaluateCondition(formData)
			if evalErr != nil {
				return evalErr
			}
			if !matched {
				continue
			}
			switch rule.Action {
			case flow_definition.ActionAutoApprove:
				// Automatic approval.
				if approveErr := pendingStep.Approve("自动通过"); approveErr != nil {
					return approveErr
				}
				return nil
			// Disabled actions, kept for reference:
			//case ActionAutoReject:
			// // Automatic rejection
			// err := instanceService.RejectStep(ctx, instance, step, "自动驳回")
			// if err != nil {
			// return err
			// }
			// return nil
			//case ActionReassignApprover:
			// // Reassign the approver
			// newApprover, err := getNewApprover(container, data)
			// if err != nil {
			// return err
			// }
			// step.ApproverID = newApprover.ID
			// // Update the step information
			// err = updateApprovalStep(step)
			// if err != nil {
			// return err
			// }
			// return nil
			// More Action handlers can be added here.
			default:
				return fmt.Errorf("unsupported action: %s", rule.Action)
			}
		}
	}
	return instance.CheckIfMoveToNextStep(currentSteps)
}
// CheckIfMoveToNextStep advances the instance when every step in currentSteps
// has been handled.
//
// It returns an error when the instance is already complete. Otherwise, if all
// given steps are in completed or approved status (trivially true for an empty
// slice), it calls MoveToNextStep, returns its error if any, and afterwards
// re-runs CheckIfComplete so that reaching the end step completes the
// instance. If at least one step is still open it returns nil without moving.
func (instance *ApprovalInstance) CheckIfMoveToNextStep(currentSteps []*InstanceStep) error {
	if instance.CheckIfComplete() {
		return errors.New("当前审批流已结束")
	}

	// The instance only moves on once every current step has been processed.
	allHandled := true
	for _, step := range currentSteps {
		status := step.GetStatus()
		if status != StepStatusCompleted && status != StepStatusApproved {
			allHandled = false
		}
	}
	if !allHandled {
		return nil
	}

	if err := instance.MoveToNextStep(); err != nil {
		return err
	}
	instance.CheckIfComplete()
	return nil
}
// GetPathByFromStepKey is currently a stub: it ignores fromStepKey and always
// returns (nil, nil).
// NOTE(review): presumably meant to look up the path config leaving the given
// step — confirm before relying on it.
func (instance *ApprovalInstance) GetPathByFromStepKey(fromStepKey string) (*InstancePathConfig, error) {
return nil, nil
}
// GetStartStep returns the first step whose StepCode is the flow's start code,
// or nil when the instance has no such step.
func (instance *ApprovalInstance) GetStartStep() *InstanceStep {
	for idx := range instance.Steps {
		candidate := instance.Steps[idx]
		if candidate.StepCode != flow_definition.StepStart {
			continue
		}
		return candidate
	}
	return nil
}
// GetEndStep returns the first step whose StepCode is the flow's end code, or
// nil when the instance has no such step.
func (instance *ApprovalInstance) GetEndStep() *InstanceStep {
	for idx := range instance.Steps {
		candidate := instance.Steps[idx]
		if candidate.StepCode != flow_definition.StepEnd {
			continue
		}
		return candidate
	}
	return nil
}
// GetStatus returns the instance's current status, i.e. the status of the most
// recent entry in StatusEvents.
//
// When no status event is available (for example on a zero-value instance) it
// falls back to the Status field, which AddStatusEvent keeps in sync with the
// latest event and which is empty if no status was ever recorded. Indexing the
// empty event list would otherwise panic.
func (instance *ApprovalInstance) GetStatus() string {
	if len(instance.StatusEvents) == 0 {
		return instance.Status
	}
	return instance.StatusEvents[len(instance.StatusEvents)-1].Status
}
// AddStatusEvent records status as the instance's new status: it appends an
// InstanceStatusEvent (with this instance's ID and an empty Extension) to
// StatusEvents and mirrors the value into the Status field. When status equals
// the current status nothing is appended and Status is left untouched.
func (instance *ApprovalInstance) AddStatusEvent(status string) {
	if instance.StatusEvents == nil {
		instance.StatusEvents = []*InstanceStatusEvent{}
	}

	// Consecutive duplicates are not recorded.
	unchanged := len(instance.StatusEvents) > 0 && instance.GetStatus() == status
	if unchanged {
		return
	}

	event := &InstanceStatusEvent{
		InstanceID: instance.ID,
		Status:     status,
		Extension:  "",
	}
	instance.StatusEvents = append(instance.StatusEvents, event)
	instance.Status = status
}
// AddCreatedEvent records StatusCreated as the instance's latest status via
// AddStatusEvent.
func (instance *ApprovalInstance) AddCreatedEvent() {
instance.AddStatusEvent(StatusCreated)
}
// AddRejectEvent records StatusReject as the instance's latest status via
// AddStatusEvent.
func (instance *ApprovalInstance) AddRejectEvent() {
instance.AddStatusEvent(StatusReject)
}
// AddInProgressEvent records StatusInProgress as the instance's latest status
// via AddStatusEvent.
func (instance *ApprovalInstance) AddInProgressEvent() {
instance.AddStatusEvent(StatusInProgress)
}
// AddCompletedEvent records StatusCompleted as the instance's latest status via
// AddStatusEvent.
func (instance *ApprovalInstance) AddCompletedEvent() {
instance.AddStatusEvent(StatusCompleted)
}
// AddCancelledEvent records StatusCancelled as the instance's latest status via
// AddStatusEvent.
func (instance *ApprovalInstance) AddCancelledEvent() {
instance.AddStatusEvent(StatusCancelled)
}