From 94cd32b520a0dfc18507a42237150203a7905ae4 Mon Sep 17 00:00:00 2001 From: dandan <1033719135@qq.com> Date: Sun, 17 Nov 2024 21:38:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=87=E4=BB=BD=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/http/base/param.go | 7 ++ app/http/base/res/res.go | 23 ++++++ app/http/base/response.go | 77 +++++++++++++++++++ app/http/base/result.go | 46 +++++++++++ app/http/kernel.go | 5 +- app/http/module/user/api.go | 1 + app/http/module/user/api_user_amount.go | 36 +++++++++ app/http/subscribe.go | 18 +++++ app/http/swagger/docs.go | 67 ++++++++++++++++ app/http/swagger/swagger.json | 67 ++++++++++++++++ app/http/swagger/swagger.yaml | 44 +++++++++++ app/provider/user/contract.go | 9 ++- app/provider/user/model_test.go | 17 ++++ app/provider/user/service.go | 38 ++++++--- app/utils/collection.go | 12 +++ app/utils/convert.go | 47 +++++++++++ config/development/app.yaml | 4 +- config/development/kafka.yaml | 66 ++++++++-------- config/development/queue.yaml | 2 +- framework/command/app.go | 59 ++++++++------ framework/contract/queue.go | 17 ++++ framework/provider/kafka/config.go | 12 +-- framework/provider/kafka/service.go | 11 +-- .../queue/services/default_service.go | 32 ++++++-- .../provider/queue/services/kafka_service.go | 33 +++++++- test/env.go | 36 ++++++++- 26 files changed, 697 insertions(+), 89 deletions(-) create mode 100644 app/http/base/param.go create mode 100644 app/http/base/res/res.go create mode 100644 app/http/base/response.go create mode 100644 app/http/base/result.go create mode 100644 app/http/module/user/api_user_amount.go create mode 100644 app/http/subscribe.go create mode 100644 app/provider/user/model_test.go create mode 100644 app/utils/collection.go create mode 100644 app/utils/convert.go diff --git a/app/http/base/param.go b/app/http/base/param.go new file mode 100644 index 0000000..dd71c60 --- /dev/null +++ b/app/http/base/param.go @@ -0,0 +1,7 @@ +package base + +// PageRequest 分页请求结构体 +type PageRequest struct { + PageNumber int `json:"page_number"` // 当前页码,默认是第一页 + PageSize int `json:"page_size"` // 每页记录数,默认是10条 +} diff --git a/app/http/base/res/res.go b/app/http/base/res/res.go new file mode 100644 index 0000000..b509f28 --- /dev/null +++ b/app/http/base/res/res.go @@ -0,0 +1,23 @@ +package res + +import ( + "github.com/Superdanda/hade/app/http/base" + "github.com/Superdanda/hade/framework/gin" + "net/http" +) + +func FailWithErr(ctx *gin.Context, err error) { + ctx.ISetStatus(http.StatusInternalServerError).IJson(base.Fail(err.Error())) +} + +func Fail(ctx *gin.Context) { + ctx.ISetStatus(http.StatusInternalServerError).IJson(base.Fail("操作失败")) +} + +func Success(ctx *gin.Context) { + ctx.ISetStatus(http.StatusOK).IJson(base.SuccessWithOKMessage()) +} + +func SuccessWithData(ctx *gin.Context, data interface{}) { + ctx.ISetStatus(http.StatusOK).IJson(base.Success(data)) +} diff --git a/app/http/base/response.go b/app/http/base/response.go new file mode 100644 index 0000000..af7215e --- /dev/null +++ b/app/http/base/response.go @@ -0,0 +1,77 @@ +package base + +import "fmt" + +// PageResponse 分页响应结构体 +type PageResponse struct { + TotalRecords int64 `json:"total_records"` // 总记录数 + TotalPages int `json:"total_pages"` // 总页数 + CurrentPage int `json:"current_page"` // 当前页码 + RecordsPerPage int `json:"records_per_page"` // 每页记录数 + Data interface{} `json:"data"` // 当前页的数据,可以是任意类型的切片 +} + +// NewPageResponseWithPageRequest PageResponse构造方法 +func NewPageResponseWithPageRequest(totalRecords int64, pageRequest PageRequest, data interface{}) *PageResponse { + pr := &PageResponse{ + TotalRecords: totalRecords, + RecordsPerPage: pageRequest.PageSize, + CurrentPage: pageRequest.PageNumber, + Data: data, + } + // 计算总页数 + pr.CalculateTotalPages() + return pr +} + +// NewPageResponse PageResponse构造方法 +func NewPageResponse(totalRecords int64, recordsPerPage int, currentPage int, data interface{}) *PageResponse { + pr := &PageResponse{ + TotalRecords: totalRecords, + RecordsPerPage: recordsPerPage, + CurrentPage: currentPage, + Data: data, + } + // 计算总页数 + pr.CalculateTotalPages() + return pr +} + +// CalculateTotalPages 计算总页数 +func (pr *PageResponse) CalculateTotalPages() { + if pr.TotalRecords == 0 { + pr.TotalPages = 0 + } else { + pr.TotalPages = int((pr.TotalRecords + int64(pr.RecordsPerPage) - 1) / int64(pr.RecordsPerPage)) + } +} + +// HasNextPage 判断当前页是否有下一页 +func (pr *PageResponse) HasNextPage() bool { + return pr.CurrentPage < pr.TotalPages +} + +// HasPrevPage 判断当前页是否有上一页 +func (pr *PageResponse) HasPrevPage() bool { + return pr.CurrentPage > 1 +} + +// GetStartIndex 获取分页的起始索引 +func (pr *PageResponse) GetStartIndex() int { + return (pr.CurrentPage - 1) * pr.RecordsPerPage +} + +// GetEndIndex 获取分页的结束索引 +func (pr *PageResponse) GetEndIndex() int { + endIndex := pr.CurrentPage * pr.RecordsPerPage + if int64(endIndex) > pr.TotalRecords { + endIndex = int(pr.TotalRecords) + } + return endIndex +} + +// PrintPageInfo 打印分页信息 +func (pr *PageResponse) PrintPageInfo() { + fmt.Printf("Page %d of %d pages. Showing records %d to %d of %d total records.\n", + pr.CurrentPage, pr.TotalPages, pr.GetStartIndex()+1, pr.GetEndIndex(), pr.TotalRecords) +} diff --git a/app/http/base/result.go b/app/http/base/result.go new file mode 100644 index 0000000..84c9532 --- /dev/null +++ b/app/http/base/result.go @@ -0,0 +1,46 @@ +package base + +type Result struct { + Code int `json:"code"` + Success bool `json:"success"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +// Success 方法,封装成功响应的结构体 +func Success(data interface{}) Result { + return Result{ + Code: 1, + Success: true, + Message: "Success", + Data: data, + } +} + +// SuccessWithMessage 方法,封装成功响应的结构体 +func SuccessWithMessage(message string) Result { + return Result{ + Code: 1, + Success: true, + Message: message, + } +} + +// SuccessWithOKMessage 方法,封装成功响应的结构体, message 操作成功 +func SuccessWithOKMessage() Result { + return Result{ + Code: 1, + Success: true, + Message: "操作成功", + } +} + +// Fail 方法,封装失败响应的结构体 +func Fail(message string) Result { + return Result{ + Code: 0, + Success: false, + Message: message, + Data: nil, + } +} diff --git a/app/http/kernel.go b/app/http/kernel.go index fddccda..e8ccfb3 100644 --- a/app/http/kernel.go +++ b/app/http/kernel.go @@ -8,7 +8,7 @@ import ( ) // NewHttpEngine 创建了一个绑定了路由的Web引擎 -func NewHttpEngine(container *framework.HadeContainer) (*gin.Engine, error) { +func NewHttpEngine(container framework.Container) (*gin.Engine, error) { // 设置为Release,为的是默认在启动中不输出调试信息 gin.SetMode(gin.ReleaseMode) // 默认启动一个Web引擎 @@ -28,5 +28,8 @@ func NewHttpEngine(container *framework.HadeContainer) (*gin.Engine, error) { // 业务绑定路由操作 Routes(r) + + //注册消息队列事件 + SubscribeEvent(container) return r, nil } diff --git a/app/http/module/user/api.go b/app/http/module/user/api.go index b9ece44..238717e 100644 --- a/app/http/module/user/api.go +++ b/app/http/module/user/api.go @@ -25,6 +25,7 @@ func RegisterRoutes(r *gin.Engine) error { userGroup.POST("/login", api.UserLogin) userGroup.POST("/get", api.UserGet) userGroup.POST("/save", api.UserSave) + userGroup.POST("/amount", api.ChangeAmount) } } diff --git a/app/http/module/user/api_user_amount.go b/app/http/module/user/api_user_amount.go new file mode 100644 index 0000000..232195b --- /dev/null +++ b/app/http/module/user/api_user_amount.go @@ -0,0 +1,36 @@ +package user + +import ( + "github.com/Superdanda/hade/app/provider/user" + "github.com/Superdanda/hade/app/utils" + "github.com/Superdanda/hade/framework/gin" + "net/http" +) + +type ChangeAmountParam struct { + UserId int64 `json:"user_id"` + Amount int64 `json:"amount"` +} + +// ChangeAmount 更改金额 +// @Summary 更改金额 +// @Description 更改金额 +// @ID ChangeAmount +// @Tags ChangeAmount +// @Accept json +// @Produce json +// @Param ChangeAmountParam body ChangeAmountParam true "查询详情请求参数" +// @Success 200 {object} base.Result "返回成功的流程定义数据" +// @Failure 500 {object} base.Result "操作失败" +// @Router /user/amount [post] +func (api *UserApi) ChangeAmount(context *gin.Context) { + + param := utils.QuickBind[ChangeAmountParam](context) + + userService := context.MustMake(user.UserKey).(user.Service) + err := userService.AddAmount(context, param.UserId, param.Amount) + if err != nil { + return + } + context.JSON(http.StatusOK, gin.H{"code": 0, "data": nil}) +} diff --git a/app/http/subscribe.go b/app/http/subscribe.go new file mode 100644 index 0000000..b04e10a --- /dev/null +++ b/app/http/subscribe.go @@ -0,0 +1,18 @@ +package http + +import ( + "github.com/Superdanda/hade/app/provider/user" + "github.com/Superdanda/hade/framework" + "github.com/Superdanda/hade/framework/contract" +) + +func SubscribeEvent(container framework.Container) error { + queueService := container.MustMake(contract.QueueKey).(contract.QueueService) + userService := container.MustMake(user.UserKey).(user.Service) + + queueService.RegisterSubscribe(user.ChangeAmountTopic, func(event contract.Event) error { + return userService.ChangeAmount(queueService.GetContext(), event.Payload()) + }) + + return nil +} diff --git a/app/http/swagger/docs.go b/app/http/swagger/docs.go index 05c5dc9..3b785b5 100644 --- a/app/http/swagger/docs.go +++ b/app/http/swagger/docs.go @@ -70,9 +70,65 @@ const docTemplate = `{ } } } + }, + "/user/amount": { + "post": { + "description": "更改金额", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "ChangeAmount" + ], + "summary": "更改金额", + "operationId": "ChangeAmount", + "parameters": [ + { + "description": "查询详情请求参数", + "name": "ChangeAmountParam", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/user.ChangeAmountParam" + } + } + ], + "responses": { + "200": { + "description": "返回成功的流程定义数据", + "schema": { + "$ref": "#/definitions/base.Result" + } + }, + "500": { + "description": "操作失败", + "schema": { + "$ref": "#/definitions/base.Result" + } + } + } + } } }, "definitions": { + "base.Result": { + "type": "object", + "properties": { + "code": { + "type": "integer" + }, + "data": {}, + "message": { + "type": "string" + }, + "success": { + "type": "boolean" + } + } + }, "demo.UserDTO": { "type": "object", "properties": { @@ -83,6 +139,17 @@ const docTemplate = `{ "type": "string" } } + }, + "user.ChangeAmountParam": { + "type": "object", + "properties": { + "amount": { + "type": "integer" + }, + "user_id": { + "type": "integer" + } + } } }, "securityDefinitions": { diff --git a/app/http/swagger/swagger.json b/app/http/swagger/swagger.json index 49fd507..214c4e2 100644 --- a/app/http/swagger/swagger.json +++ b/app/http/swagger/swagger.json @@ -63,9 +63,65 @@ } } } + }, + "/user/amount": { + "post": { + "description": "更改金额", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "ChangeAmount" + ], + "summary": "更改金额", + "operationId": "ChangeAmount", + "parameters": [ + { + "description": "查询详情请求参数", + "name": "ChangeAmountParam", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/user.ChangeAmountParam" + } + } + ], + "responses": { + "200": { + "description": "返回成功的流程定义数据", + "schema": { + "$ref": "#/definitions/base.Result" + } + }, + "500": { + "description": "操作失败", + "schema": { + "$ref": "#/definitions/base.Result" + } + } + } + } } }, "definitions": { + "base.Result": { + "type": "object", + "properties": { + "code": { + "type": "integer" + }, + "data": {}, + "message": { + "type": "string" + }, + "success": { + "type": "boolean" + } + } + }, "demo.UserDTO": { "type": "object", "properties": { @@ -76,6 +132,17 @@ "type": "string" } } + }, + "user.ChangeAmountParam": { + "type": "object", + "properties": { + "amount": { + "type": "integer" + }, + "user_id": { + "type": "integer" + } + } } }, "securityDefinitions": { diff --git a/app/http/swagger/swagger.yaml b/app/http/swagger/swagger.yaml index adaf575..fd4a0a4 100644 --- a/app/http/swagger/swagger.yaml +++ b/app/http/swagger/swagger.yaml @@ -1,5 +1,15 @@ basePath: / definitions: + base.Result: + properties: + code: + type: integer + data: {} + message: + type: string + success: + type: boolean + type: object demo.UserDTO: properties: id: @@ -7,6 +17,13 @@ definitions: name: type: string type: object + user.ChangeAmountParam: + properties: + amount: + type: integer + user_id: + type: integer + type: object info: contact: email: yejianfeng @@ -50,6 +67,33 @@ paths: summary: 获取所有学生 tags: - demo + /user/amount: + post: + consumes: + - application/json + description: 更改金额 + operationId: ChangeAmount + parameters: + - description: 查询详情请求参数 + in: body + name: ChangeAmountParam + required: true + schema: + $ref: '#/definitions/user.ChangeAmountParam' + produces: + - application/json + responses: + "200": + description: 返回成功的流程定义数据 + schema: + $ref: '#/definitions/base.Result' + "500": + description: 操作失败 + schema: + $ref: '#/definitions/base.Result' + summary: 更改金额 + tags: + - ChangeAmount securityDefinitions: ApiKeyAuth: in: header diff --git a/app/provider/user/contract.go b/app/provider/user/contract.go index fb436ee..e4d7131 100644 --- a/app/provider/user/contract.go +++ b/app/provider/user/contract.go @@ -8,6 +8,8 @@ import ( const UserKey = "user" +const ChangeAmountTopic = "UserAmountChange" + type Service interface { // GetUser 获取用户信息 GetUser(ctx context.Context, userID int64) (*User, error) @@ -17,6 +19,9 @@ type Service interface { // AddAmount 金额变化 AddAmount(ctx context.Context, userID int64, amount int64) error + + // ChangeAmount 订阅ChangeAmountTopic事件 + ChangeAmount(ctx context.Context, payLoad interface{}) error } type User struct { @@ -27,13 +32,13 @@ type User struct { Password string `gorm:"column:password;type:varchar(255);comment:密码;not null" json:"password"` Email string `gorm:"column:email;type:varchar(255);comment:邮箱;not null" json:"email"` CreatedAt time.Time `gorm:"column:created_at;type:datetime;comment:创建时间;not null;<-:create" json:"createdAt"` - Account Account `gorm:"foreignkey:UserId;constraint:OnDelete:CASCADE" json:"account"` + Account *Account `gorm:"foreignkey:UserId;constraint:OnDelete:CASCADE" json:"account"` } type Account struct { ID int64 `gorm:"column:id;primary_key;auto_increment" json:"id"` UserId int64 `gorm:"column:user_id;primary_key;auto_increment" json:"userId"` - Amount int64 `gorm:"column:amount;type:varchar(255)" json:"amount"` + Amount int64 `gorm:"column:amount;type:bigint" json:"amount"` } func (u *User) MarshalBinary() ([]byte, error) { diff --git a/app/provider/user/model_test.go b/app/provider/user/model_test.go new file mode 100644 index 0000000..2f5e020 --- /dev/null +++ b/app/provider/user/model_test.go @@ -0,0 +1,17 @@ +package user + +import ( + "github.com/Superdanda/hade/app/provider/database_connect" + tests "github.com/Superdanda/hade/test" + + "testing" +) + +func TestLoadModel(test *testing.T) { + container := tests.InitBaseContainer() + databaseConnectService := container.MustMake(database_connect.DatabaseConnectKey).(database_connect.Service) + + db := databaseConnectService.DefaultDatabaseConnect() + db.AutoMigrate(User{}, Account{}) + +} diff --git a/app/provider/user/service.go b/app/provider/user/service.go index 6250941..42c5719 100644 --- a/app/provider/user/service.go +++ b/app/provider/user/service.go @@ -2,6 +2,7 @@ package user import ( "context" + "encoding/json" "github.com/Superdanda/hade/framework" "github.com/Superdanda/hade/framework/contract" ) @@ -28,20 +29,37 @@ func (s *UserService) SaveUser(ctx context.Context, user *User) error { } func (s *UserService) AddAmount(ctx context.Context, userID int64, amount int64) error { - // 使用kafka 来发布事件 - return nil + // 来发布事件 + queueService := s.container.MustMake(contract.QueueKey).(contract.QueueService) + return queueService.NewEventAndPublish(ctx, ChangeAmountTopic, NewChangeAmountEvent(userID, amount)) } -//func (s *UserService) ChangeAmount(container framework.Container, ctx *gin.Context) error { -// queueService := s.container.MustMake(contract.QueueKey).(contract.QueueService) -// queueService.SubscribeEvent(ctx,ChangeAmountTopic, ) -//} - -const ChangeAmountTopic = "UserAmountChange" +func (s *UserService) ChangeAmount(ctx context.Context, payLoad interface{}) error { + amountEvent := &ChangeAmountEvent{} + json.Unmarshal([]byte(payLoad.(string)), &ChangeAmountEvent{}) + user, err := s.GetUser(ctx, amountEvent.UserID) + if err != nil { + return err + } + if user.Account == nil { + user.Account = &Account{} + } + user.Account.Amount += amountEvent.Amount + return s.repository.Save(ctx, user) +} type ChangeAmountEvent struct { - UserID int64 `json:"user_id"` - Amount int64 `json:"amount"` + Topic string `json:"topic"` + UserID int64 `json:"user_id"` + Amount int64 `json:"amount"` +} + +func NewChangeAmountEvent(userID int64, amount int64) *ChangeAmountEvent { + return &ChangeAmountEvent{ + Topic: ChangeAmountTopic, + UserID: userID, + Amount: amount, + } } func NewUserService(params ...interface{}) (interface{}, error) { diff --git a/app/utils/collection.go b/app/utils/collection.go new file mode 100644 index 0000000..9eda3ee --- /dev/null +++ b/app/utils/collection.go @@ -0,0 +1,12 @@ +package utils + +// Filter 泛型过滤函数 +func Filter[T any](slice []T, predicate func(T) bool) []T { + var result []T + for _, item := range slice { + if predicate(item) { + result = append(result, item) // 仅添加符合条件的元素 + } + } + return result +} diff --git a/app/utils/convert.go b/app/utils/convert.go new file mode 100644 index 0000000..67224c2 --- /dev/null +++ b/app/utils/convert.go @@ -0,0 +1,47 @@ +package utils + +import ( + "encoding/json" + "fmt" + "github.com/Superdanda/hade/framework/gin" +) + +func ConvertToSpecificType[T any, I interface{}](items []I, convertFunc func(I) (T, bool)) ([]T, error) { + specificItems := make([]T, 0, len(items)) + for i, item := range items { + if specificItem, ok := convertFunc(item); ok { + specificItems = append(specificItems, specificItem) + } else { + return nil, fmt.Errorf("invalid type at index %d", i) + } + } + return specificItems, nil +} + +func ConvertToAbstractNodes[T any, I interface{}](items []T, toInterface func(T) I) []I { + nodes := make([]I, len(items)) + for i, item := range items { + nodes[i] = toInterface(item) + } + return nodes +} + +func QuickBind[T any](c *gin.Context) *T { + var params T + if err := c.ShouldBindJSON(¶ms); err != nil { + return nil + } + return ¶ms +} + +func Convert(origin interface{}, target interface{}) error { + data, err := json.Marshal(origin) + if err != nil { + return err + } + err = json.Unmarshal(data, target) + if err != nil { + return err + } + return nil +} diff --git a/config/development/app.yaml b/config/development/app.yaml index ac74771..d10ed30 100644 --- a/config/development/app.yaml +++ b/config/development/app.yaml @@ -1,6 +1,6 @@ url: http://127.0.0.1:8066 -name: github.com/Superdanda/hade +name: hade swagger_open: true @@ -8,6 +8,8 @@ dev_fresh: 1 swagger: true +queue_subscribe: true # 是否开启事件订阅 + dev: # 调试模式 port: 8888 # 调试模式最终监听的端口,默认为8070 backend: # 后端调试模式配置 diff --git a/config/development/kafka.yaml b/config/development/kafka.yaml index ed8968a..aa764bb 100644 --- a/config/development/kafka.yaml +++ b/config/development/kafka.yaml @@ -2,36 +2,40 @@ name: default default: base: - version: "2.5.0" # Kafka 版本 - brokers: [ "localhost:9092" ] # Kafka broker 列表 - Net_SASL_Enable: true # 是否启用 SASL 认证 - Net_SASL_User: "your_user" # SASL 用户名 - Net_SASL_Password: "your_password" # SASL 密码 - Net_TLS_Enable: false # 是否启用 TLS,若为 true,需要进一步配置 TLS 证书信息 - Net_TLS_Config_InsecureSkipVerify: true # 是否跳过 TLS 证书验证(仅在 TLS 启用时配置) + version: "3.8.1" # kafka 版本 + brokers: [ "127.0.0.1:9092" ] # kafka broker 列表 +# admin_timeout: 10s +# net_sasl_enable: false # 是否启用 sasl 认证 +# net_sasl_user: "admin" # sasl 用户名 +# net_sasl_password: "admin" # sasl 密码 +# net_tls_enable: false # 是否启用 tls,若为 true,需要进一步配置 tls 证书信息 +# net_tls_config_insecureskipverify: true # 是否跳过 tls 证书验证(仅在 tls 启用时配置) +# net_max_open_requests: 10 # 限制 Kafka 客户端在等待服务器响应时可以同时发送的最大请求数量 +# net_dial_timeout: 10s +# net_read_timeout: 10s +# net_write_timeout: 10s producer: - RequiredAcks: -1 # 生产者应答机制(-1 表示等待所有副本应答) - Retry_Max: 5 # 最大重试次数 - Return_Successes: true # 是否返回成功发送确认 - Partitioner: "random" # 分区策略("random", "roundrobin", "hash" 等) - Compression: "snappy" # 消息压缩类型("none", "gzip", "snappy", "lz4", "zstd") - Flush_Frequency: 500ms # 消息刷写频率 - Flush_Messages: 10 # 每批次最大消息数 - Flush_Bytes: 1024 # 每批次字节数 - - consumer: - Offsets_Initial: -2 # 初始偏移量(-2 表示从最早的偏移量开始消费) - Return_Errors: true # 是否返回消费错误 - Fetch_Default: 1048576 # 每次拉取消息的默认大小(1MB) - MaxWaitTime: 250ms # 拉取消息的最大等待时间 - MaxProcessingTime: 100ms # 每条消息的最大处理时间 - - consumerGroup: - Group_Rebalance_Strategy: "roundrobin" # 消费者组分区再平衡策略(roundrobin, range, sticky) - Group_Rebalance_Timeout: 60s # 分区再平衡的超时时间 - Group_Rebalance_Retry_Max: 4 # 再平衡的最大重试次数 - Group_Heartbeat_Interval: 3s # 消费者组心跳间隔 - Offsets_Initial: -2 # 初始偏移量(-2 表示从最早的偏移量开始消费) - Return_Errors: true # 是否返回消费错误 - +# requiredacks: -1 # 生产者应答机制(-1 表示等待所有副本应答) +# retry_max: 5 # 最大重试次数 + return_successes: true # 是否返回成功发送确认 +# partitioner: "random" # 分区策略("random", "roundrobin", "hash" 等) +# compression: "snappy" # 消息压缩类型("none", "gzip", "snappy", "lz4", "zstd") +# flush_frequency: 500ms # 消息刷写频率 +# flush_messages: 10 # 每批次最大消息数 +# flush_bytes: 1024 # 每批次字节数 +# +# consumer: +# offsets_initial: -2 # 初始偏移量(-2 表示从最早的偏移量开始消费) +# return_errors: true # 是否返回消费错误 +# fetch_default: 1048576 # 每次拉取消息的默认大小(1mb) +# maxwaittime: 250ms # 拉取消息的最大等待时间 +# maxprocessingtime: 100ms # 每条消息的最大处理时间 +# +# consumergroup: +# group_rebalance_strategy: "roundrobin" # 消费者组分区再平衡策略(roundrobin, range, sticky) +# group_rebalance_timeout: 60s # 分区再平衡的超时时间 +# group_rebalance_retry_max: 4 # 再平衡的最大重试次数 +# group_heartbeat_interval: 3s # 消费者组心跳间隔 +# offsets_initial: -2 # 初始偏移量(-2 表示从最早的偏移量开始消费) +# return_errors: true # 是否返回消费错误 diff --git a/config/development/queue.yaml b/config/development/queue.yaml index da761ec..ab6e053 100644 --- a/config/development/queue.yaml +++ b/config/development/queue.yaml @@ -1,4 +1,4 @@ -driver: default +driver: kafka diff --git a/framework/command/app.go b/framework/command/app.go index 4f9152d..c787fb8 100644 --- a/framework/command/app.go +++ b/framework/command/app.go @@ -44,22 +44,38 @@ var appCommand = &cobra.Command{ } // 启动AppServer, 这个函数会将当前goroutine阻塞 -func startAppServe(server *http.Server, c framework.Container) error { - // 这个goroutine是启动服务的goroutine +func startAppServe(server *http.Server, c framework.Container, command *cobra.Command) error { + errChan := make(chan error, 1) + + // 启动服务器 go func() { - server.ListenAndServe() + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + errChan <- err + } }() - // 当前的goroutine等待信号量 - quit := make(chan os.Signal) - // 监控信号:SIGINT, SIGTERM, SIGQUIT + // 等待信号或错误 + quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - // 这里会阻塞当前goroutine等待信号 - <-quit - // 调用Server.Shutdown graceful结束 - closeWait := 5 configService := c.MustMake(contract.ConfigKey).(contract.Config) + if configService.GetBool("app.queue_subscribe") { + queueService := c.MustMake(contract.QueueKey).(contract.QueueService) + queueService.SetContext(command.Context()) + queueService.ProcessSubscribe() + } + + select { + case <-quit: + // 进行优雅关闭 + case err := <-errChan: + // 处理服务器错误 + return err + } + + // 调用Server.Shutdown进行优雅关闭 + closeWait := 5 + if configService.IsExist("app.close_wait") { closeWait = configService.GetInt("app.close_wait") } @@ -102,11 +118,6 @@ var appStartCommand = &cobra.Command{ Addr: appAddress, } - // 这个goroutine是启动服务的goroutine - go func() { - server.ListenAndServe() - }() - appService := container.MustMake(contract.AppKey).(contract.App) pidFolder := appService.RuntimeFolder() @@ -157,26 +168,28 @@ var appStartCommand = &cobra.Command{ } defer cntxt.Release() // 子进程执行真正的app启动操作 - fmt.Println("deamon started") - //gspt.SetProcTitle("hade app") - if err := startAppServe(server, container); err != nil { + fmt.Println("daemon started") + // 设置进程名称,假设有这样的函数 + // gspt.SetProcTitle("hade app") + if err := startAppServe(server, container, c); err != nil { fmt.Println(err) } return nil } - // 非deamon模式,直接执行 + // 非daemon模式,直接执行 content := strconv.Itoa(os.Getpid()) fmt.Println("[PID]", content) - err := ioutil.WriteFile(serverPidFile, []byte(content), 0644) + err := os.WriteFile(serverPidFile, []byte(content), 0644) if err != nil { return err } - //gspt.SetProcTitle("hade app") + // 设置进程名称,假设有这样的函数 + // gspt.SetProcTitle("hade app") fmt.Println("app serve url:", appAddress) - if err := startAppServe(server, container); err != nil { - fmt.Println(err) + if err := startAppServe(server, container, c); err != nil { + return err } return nil }, diff --git a/framework/contract/queue.go b/framework/contract/queue.go index c81bebb..75d2f15 100644 --- a/framework/contract/queue.go +++ b/framework/contract/queue.go @@ -11,9 +11,18 @@ type QueueService interface { // SubscribeEvent 订阅事件 SubscribeEvent(ctx context.Context, topic string, handler func(event Event) error) error + // ProcessSubscribe 执行订阅 + ProcessSubscribe() + + // RegisterSubscribe 注册订阅 订阅事件 + RegisterSubscribe(topic string, handler func(event Event) error) error + // ReplayEvents 从指定的时间点或事件ID开始重放事件 ReplayEvents(ctx context.Context, topic string, fromID string, fromTimestamp int64, handler func(event Event) error) error + // GetRegisterSubscribe 根据主题获取已经注册的订阅 + GetRegisterSubscribe(topic string) []EventHandler + // GetEventById 根据事件ID获取事件 GetEventById(ctx context.Context, topic string, eventID string) (Event, error) @@ -25,8 +34,16 @@ type QueueService interface { // NewEventAndPublish 创建并推送事件方法 NewEventAndPublish(ctx context.Context, topic string, payload interface{}) error + + // SetContext 为订阅设置上下文 + SetContext(ctx context.Context) + + // GetContext 为订阅设置上下文 + GetContext() context.Context } +type EventHandler func(event Event) error + type Event interface { GetEventKey() string // 事件唯一标识 EventTopic() string // 事件类型 diff --git a/framework/provider/kafka/config.go b/framework/provider/kafka/config.go index 7c28825..df26886 100644 --- a/framework/provider/kafka/config.go +++ b/framework/provider/kafka/config.go @@ -27,11 +27,12 @@ func GetBaseConfig(c framework.Container) *contract.KafkaConfig { func WithConfigPath(configPath string) contract.KafkaOption { return func(container framework.Container, config *contract.KafkaConfig) error { + configService := container.MustMake(contract.ConfigKey).(contract.Config) - baseConfigPath := configPath + ".base" - producerConfigPath := configPath + ".producer" - consumerConfigPath := configPath + ".consumer" - consumerGroupConfigPath := configPath + ".consumerGroup" + baseConfigPath := "kafka." + configPath + ".base" + producerConfigPath := "kafka." + configPath + ".producer" + consumerConfigPath := "kafka." + configPath + ".consumer" + consumerGroupConfigPath := "kafka." + configPath + ".consumerGroup" baseConfMap := configService.GetStringMapString(baseConfigPath) producerConfMap := configService.GetStringMapString(producerConfigPath) @@ -41,7 +42,7 @@ func WithConfigPath(configPath string) contract.KafkaOption { brokers := configService.GetStringSlice(baseConfigPath + ".brokers") config.Brokers = brokers - saramaConfig := &sarama.Config{} + saramaConfig := sarama.NewConfig() version, ok := baseConfMap["version"] if ok { @@ -120,6 +121,7 @@ func WithConfigPath(configPath string) contract.KafkaOption { } // 配置 Producer (生产者) + if requiredAcks, ok := producerConfMap["required_acks"]; ok { if acks, err := strconv.Atoi(requiredAcks); err == nil { saramaConfig.Producer.RequiredAcks = sarama.RequiredAcks(acks) diff --git a/framework/provider/kafka/service.go b/framework/provider/kafka/service.go index 9df32f9..c92cd1b 100644 --- a/framework/provider/kafka/service.go +++ b/framework/provider/kafka/service.go @@ -103,6 +103,10 @@ func NewHadeKafka(params ...interface{}) (interface{}, error) { container := params[0] hadeKafka.container = container.(framework.Container) hadeKafka.clients = make(map[string]sarama.Client) + hadeKafka.syncProducers = make(map[string]sarama.SyncProducer) + hadeKafka.asyncProducers = make(map[string]sarama.AsyncProducer) + hadeKafka.consumers = make(map[string]sarama.Consumer) + hadeKafka.consumerGroups = make(map[string]sarama.ConsumerGroup) hadeKafka.lock = new(sync.RWMutex) return hadeKafka, nil } @@ -184,18 +188,15 @@ func (k HadeKafka) getProducer( config *contract.KafkaConfig) (sarama.SyncProducer, error) { uniqKey := config.UniqKey() // 判断是否已经实例化了kafka.Client - k.lock.RLock() if producer, ok := k.syncProducers[uniqKey]; ok { - k.lock.Lock() return producer, nil } - k.lock.RUnlock() client, err := getClientFunc() if err != nil { return nil, err } - k.lock.Lock() - defer k.lock.Unlock() + k.lock.RLock() + defer k.lock.RUnlock() syncProducer, err := sarama.NewSyncProducerFromClient(client) if err != nil { return nil, err diff --git a/framework/provider/queue/services/default_service.go b/framework/provider/queue/services/default_service.go index a61bdb4..1eb8af2 100644 --- a/framework/provider/queue/services/default_service.go +++ b/framework/provider/queue/services/default_service.go @@ -14,11 +14,30 @@ import ( ) type MemoryQueueService struct { - container framework.Container - subscribers map[string][]func(event contract.Event) error - subscriberMu sync.RWMutex - eventStore EventStore - eventQueue chan contract.Event // 使用 channel 作为 FIFO 队列 + container framework.Container + subscribers map[string][]func(event contract.Event) error + subscriberMu sync.RWMutex + eventStore EventStore + eventQueue chan contract.Event // 使用 channel 作为 FIFO 队列 + RegisterSubscribed map[string][]contract.EventHandler + context context.Context +} + +func (m *MemoryQueueService) SetContext(ctx context.Context) { + m.context = ctx +} + +func (m *MemoryQueueService) ProcessSubscribe() { + +} + +func (m *MemoryQueueService) RegisterSubscribe(topic string, handler func(event contract.Event) error) error { + m.RegisterSubscribed[topic] = append(m.RegisterSubscribed[topic], handler) + return nil +} + +func (m *MemoryQueueService) GetRegisterSubscribe(topic string) []contract.EventHandler { + return m.RegisterSubscribed[topic] } func (m *MemoryQueueService) NewEventAndPublish(ctx context.Context, topic string, payload interface{}) error { @@ -42,7 +61,6 @@ func (m *MemoryQueueService) NewEventAndPublish(ctx context.Context, topic strin if err != nil { return err } - return nil } @@ -62,6 +80,8 @@ func NewMemoryQueueService(params ...interface{}) (interface{}, error) { memoryQueueService.eventStore = eventStore go memoryQueueService.processEvents() // 启动事件处理协程 + + memoryQueueService.RegisterSubscribed = make(map[string][]contract.EventHandler) return memoryQueueService, nil } diff --git a/framework/provider/queue/services/kafka_service.go b/framework/provider/queue/services/kafka_service.go index 077d5c1..1bdfc89 100644 --- a/framework/provider/queue/services/kafka_service.go +++ b/framework/provider/queue/services/kafka_service.go @@ -12,8 +12,28 @@ import ( ) type KafkaQueueService struct { - container framework.Container - kafkaService contract.KafkaService + container framework.Container + kafkaService contract.KafkaService + RegisterSubscribed map[string][]contract.EventHandler + context context.Context +} + +// GetContext 为订阅设置上下文 +func (k *KafkaQueueService) GetContext() context.Context { + return k.context +} + +func (k *KafkaQueueService) SetContext(ctx context.Context) { + k.context = ctx +} + +func (k *KafkaQueueService) RegisterSubscribe(topic string, handler func(event contract.Event) error) error { + k.RegisterSubscribed[topic] = append(k.RegisterSubscribed[topic], handler) + return nil +} + +func (k *KafkaQueueService) GetRegisterSubscribe(topic string) []contract.EventHandler { + return k.RegisterSubscribed[topic] } func NewKafkaQueueService(params ...interface{}) (interface{}, error) { @@ -21,6 +41,7 @@ func NewKafkaQueueService(params ...interface{}) (interface{}, error) { kafkaQueueService.container = params[0].(framework.Container) kafkaService := kafkaQueueService.container.MustMake(contract.KafkaKey).(contract.KafkaService) kafkaQueueService.kafkaService = kafkaService + kafkaQueueService.RegisterSubscribed = make(map[string][]contract.EventHandler) return kafkaQueueService, nil } @@ -45,6 +66,14 @@ func convertToMessage(e contract.Event) (*sarama.ProducerMessage, error) { return message, nil } +func (k *KafkaQueueService) ProcessSubscribe() { + for topic, handlers := range k.RegisterSubscribed { + for _, handler := range handlers { + k.SubscribeEvent(k.context, topic, handler) + } + } +} + // GetEventKey 实现 EventID 方法 func (e *KafkaEvent) GetEventKey() string { return e.EventKey diff --git a/test/env.go b/test/env.go index 3513104..9c9f6f1 100644 --- a/test/env.go +++ b/test/env.go @@ -1,13 +1,25 @@ package test import ( + "github.com/Superdanda/hade/app/provider/database_connect" "github.com/Superdanda/hade/framework" "github.com/Superdanda/hade/framework/provider/app" + "github.com/Superdanda/hade/framework/provider/cache" + "github.com/Superdanda/hade/framework/provider/config" + "github.com/Superdanda/hade/framework/provider/distributed" "github.com/Superdanda/hade/framework/provider/env" + "github.com/Superdanda/hade/framework/provider/kafka" + "github.com/Superdanda/hade/framework/provider/log" + "github.com/Superdanda/hade/framework/provider/orm" + "github.com/Superdanda/hade/framework/provider/queue" + "github.com/Superdanda/hade/framework/provider/redis" + "github.com/Superdanda/hade/framework/provider/repository" + "github.com/Superdanda/hade/framework/provider/ssh" + "github.com/Superdanda/hade/framework/provider/type_register" ) const ( - BasePath = "C:\\Users\\lulz1\\GolandProjects\\framework1" + BasePath = "C:\\Users\\a1033\\GolandProjects\\framework1" ) func InitBaseContainer() framework.Container { @@ -16,6 +28,26 @@ func InitBaseContainer() framework.Container { // 绑定App服务提供者 container.Bind(&app.HadeAppProvider{BaseFolder: BasePath}) // 后续初始化需要绑定的服务提供者... - container.Bind(&env.HadeTestingEnvProvider{}) + container.Bind(&env.HadeEnvProvider{}) + container.Bind(&distributed.LocalDistributedProvider{}) + container.Bind(&config.HadeConfigProvider{}) + //container.Bind(&id.HadeIDProvider{}) + //container.Bind(&trace.HadeTraceProvider{}) + container.Bind(&log.HadeLogServiceProvider{}) + container.Bind(&orm.GormProvider{}) + container.Bind(&redis.RedisProvider{}) + container.Bind(&cache.HadeCacheProvider{}) + container.Bind(&ssh.SSHProvider{}) + container.Bind(&type_register.TypeRegisterProvider{}) + //container.Bind(&infrastructure.InfrastructureProvider{}) + container.Bind(&repository.RepositoryProvider{}) + container.Bind(&kafka.KafkaProvider{}) + container.Bind(&queue.QueueProvider{}) + // 将HTTP引擎初始化,并且作为服务提供者绑定到服务容器中 + //if engine, err := http.NewHttpEngine(container); err == nil { + // container.Bind(&kernel.HadeKernelProvider{HttpEngine: engine}) + //} + + container.Bind(&database_connect.DatabaseConnectProvider{}) return container }