diff --git a/app/provider/user/contract.go b/app/provider/user/contract.go index 3a5cbbd..fb436ee 100644 --- a/app/provider/user/contract.go +++ b/app/provider/user/contract.go @@ -14,6 +14,9 @@ type Service interface { // SaveUser 保存用户信息 SaveUser(ctx context.Context, user *User) error + + // AddAmount 金额变化 + AddAmount(ctx context.Context, userID int64, amount int64) error } type User struct { @@ -24,6 +27,13 @@ type User struct { Password string `gorm:"column:password;type:varchar(255);comment:密码;not null" json:"password"` Email string `gorm:"column:email;type:varchar(255);comment:邮箱;not null" json:"email"` CreatedAt time.Time `gorm:"column:created_at;type:datetime;comment:创建时间;not null;<-:create" json:"createdAt"` + Account Account `gorm:"foreignkey:UserId;constraint:OnDelete:CASCADE" json:"account"` +} + +type Account struct { + ID int64 `gorm:"column:id;primary_key;auto_increment" json:"id"` + UserId int64 `gorm:"column:user_id;primary_key;auto_increment" json:"userId"` + Amount int64 `gorm:"column:amount;type:varchar(255)" json:"amount"` } func (u *User) MarshalBinary() ([]byte, error) { diff --git a/app/provider/user/service.go b/app/provider/user/service.go index 90ff524..6250941 100644 --- a/app/provider/user/service.go +++ b/app/provider/user/service.go @@ -27,6 +27,23 @@ func (s *UserService) SaveUser(ctx context.Context, user *User) error { return nil } +func (s *UserService) AddAmount(ctx context.Context, userID int64, amount int64) error { + // 使用kafka 来发布事件 + return nil +} + +//func (s *UserService) ChangeAmount(container framework.Container, ctx *gin.Context) error { +// queueService := s.container.MustMake(contract.QueueKey).(contract.QueueService) +// queueService.SubscribeEvent(ctx,ChangeAmountTopic, ) +//} + +const ChangeAmountTopic = "UserAmountChange" + +type ChangeAmountEvent struct { + UserID int64 `json:"user_id"` + Amount int64 `json:"amount"` +} + func NewUserService(params ...interface{}) (interface{}, error) { container := params[0].(framework.Container) userService := &UserService{container: container} diff --git a/framework/command/provider.go b/framework/command/provider.go index 10d681d..0354628 100644 --- a/framework/command/provider.go +++ b/framework/command/provider.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "reflect" + "regexp" "strings" "text/template" @@ -21,6 +22,7 @@ func initProviderCommand() *cobra.Command { providerCommand.AddCommand(providerCreateCommand) providerCommand.AddCommand(providerListCommand) providerCommand.AddCommand(providerRepositoryCommand) + providerCommand.AddCommand(providerInterfaceCommand) return providerCommand } @@ -49,6 +51,182 @@ var providerListCommand = &cobra.Command{ }, } +var providerInterfaceCommand = &cobra.Command{ + Use: "interface", + Aliases: []string{"interface"}, + Short: "创建应用接口", + RunE: func(c *cobra.Command, args []string) error { + container := c.GetContainer() + fmt.Println("创建一个应用接口") + var name, folder string + interfaceNames := &RouteNode{} + + { + prompt := &survey.Input{ + Message: "请输入接口模块名称(http/module目录下的文件夹名称):", + } + err := survey.AskOne(prompt, &name) + if err != nil { + return err + } + } + + { + prompt := &survey.Input{ + Message: "请输入接口模块所在目录名称(默认: 接口模块名称):", + } + err := survey.AskOne(prompt, &folder) + if err != nil { + return err + } + } + + if folder == "" { + folder = name + } + + // 收集用户输入并填充嵌套映射 + for { + prompt := &survey.Input{ + Message: "请输入接口路径(格式:/user/login,直接按回车结束输入):", + } + + var input string + err := survey.AskOne(prompt, &input) + if err != nil { + return err + } + + // 如果用户直接按回车,结束输入 + if strings.TrimSpace(input) == "" { + fmt.Println("接口输入结束") + break + } + + // 解析输入的路径为路径部分 + pathParts := strings.Split(strings.TrimPrefix(input, "/"), "/") + if len(pathParts) == 0 { + fmt.Println("路径格式错误,请输入正确格式:/节点/接口") + continue + } + + // 将路径部分插入到路由树中 + insertIntoRouteTree(pathParts, []string{}, interfaceNames) + fmt.Printf("已添加接口:%s\n", input) + } + interfaceNames.NeedExtra = false + + // 打印所有添加的接口(测试用) + printRouteTree(interfaceNames, 0) + + // 模板数据 + config := container.MustMake(contract.ConfigKey).(contract.Config) + data := map[string]interface{}{ + "appName": config.GetAppName(), + "packageName": name, + "interfaces": interfaceNames, + "structName": name, + } + // 创建title这个模版方法 + funcs := template.FuncMap{ + "title": strings.Title, + "dict": func(values ...interface{}) (map[string]interface{}, error) { + if len(values)%2 != 0 { + return nil, fmt.Errorf("invalid dict call: missing key or value") + } + dict := make(map[string]interface{}, len(values)/2) + for i := 0; i < len(values); i += 2 { + key, ok := values[i].(string) + if !ok { + return nil, fmt.Errorf("dict keys must be strings") + } + dict[key] = values[i+1] + } + return dict, nil + }, + "len": func(v interface{}) int { + return reflect.ValueOf(v).Len() + }, + "camelToPath": func(s string) string { + re := regexp.MustCompile("([a-z0-9])([A-Z])") + result := re.ReplaceAllString(s, "$1/$2") + return "/" + strings.ToLower(result) + }, + } + + if interfaceNames.Children == nil || len(interfaceNames.Children) == 0 { + return nil + } + + app := container.MustMake(contract.AppKey).(contract.App) + moduleFolder := app.HttpModuleFolder() + pModuleFolder := filepath.Join(moduleFolder, name) + util.EnsureDir(pModuleFolder) + { + // module 目录下 创建 服务包 + + // 创建api 我呢见 + { + // 创建 api.go + file := filepath.Join(pModuleFolder, "api.go") + f, err := os.Create(file) + if err != nil { + return err + } + data["interfaces"] = interfaceNames // 传递嵌套的接口名称映射 + t := template.Must(template.New("api").Funcs(funcs).Parse(apiTmp)) + if err := t.Execute(f, data); err != nil { + return err + } + } + + // 创建api_controller文件 + { + tmpl := template.Must(template.New("controller").Funcs(funcs).Parse(apiControllerTmp)) + data["packageName"] = name + data["structName"] = name + + // 递归生成控制器文件 + if err := generateControllers(interfaceNames, []string{}, tmpl, data, pModuleFolder); err != nil { + fmt.Println("生成控制器失败:", err) + return err + } + } + + //创建 dto 文件 + { + // 创建dto.go + file := filepath.Join(pModuleFolder, "dto.go") + f, err := os.Create(file) + if err != nil { + return err + } + t := template.Must(template.New("dto").Funcs(funcs).Parse(dtoTmp)) + if err := t.Execute(f, data); err != nil { + return err + } + } + + //创建 mapper 文件 + { + // 创建mapper.go + file := filepath.Join(pModuleFolder, "mapper.go") + f, err := os.Create(file) + if err != nil { + return err + } + t := template.Must(template.New("mapper").Funcs(funcs).Parse(mapperTmp)) + if err := t.Execute(f, data); err != nil { + return err + } + } + } + + fmt.Println("创建接口文件成功") + return nil + }, +} + var providerCreateCommand = &cobra.Command{ Use: "create", Aliases: []string{"create", "init"}, @@ -168,6 +346,10 @@ var providerCreateCommand = &cobra.Command{ "len": func(v interface{}) int { return reflect.ValueOf(v).Len() }, + "camelToPath": func(s string) string { + re := regexp.MustCompile("([A-Z])") + return "/" + re.ReplaceAllString(s, "/$1") + }, } { @@ -584,7 +766,19 @@ import ( "github.com/Superdanda/hade/framework/gin" ) +type {{.methodName}}Param struct {} + // {{.methodName}} handler +// @Summary 输入你的接口总结 +// @Description 输入你的接口总结详情 +// @ID 接口Id +// @Tags 接口tag +// @Accept json +// @Produce json +// @Param {{.methodName}}Param body {{.methodName}}Param true "输入参数描述" +// @Success 200 {object} result.Result "返回成功的流程定义数据" +// @Failure 500 {object} result.Result "返回失败的流程定义数据" +// @Router {{.methodName | camelToPath}} [post] func (api *{{.structName | title}}Api) {{.methodName}}(c *gin.Context) { // TODO: Implement {{.methodName}} } diff --git a/framework/contract/file.go b/framework/contract/file.go new file mode 100644 index 0000000..630a636 --- /dev/null +++ b/framework/contract/file.go @@ -0,0 +1,9 @@ +package contract + +import "os" + +type FileService interface { + UploadFile(fileName string, file os.File) error + DownloadFile(fileName string) (os.File, error) + DeleteFile(fileName string) error +} diff --git a/framework/contract/kafka.go b/framework/contract/kafka.go index c604926..1a05fbb 100644 --- a/framework/contract/kafka.go +++ b/framework/contract/kafka.go @@ -14,27 +14,27 @@ type KafkaOption func(container framework.Container, config *KafkaConfig) error // KafkaService 表示一个 Kafka 服务 type KafkaService interface { + // GetClient 获取kafka + GetClient(option ...KafkaOption) (sarama.Client, error) + GetClientDefault() (sarama.Client, error) // GetProducer 获取 Kafka 同步生产者实例 GetProducer(option ...KafkaOption) (sarama.SyncProducer, error) + GetProducerDefault() (sarama.SyncProducer, error) // GetAsyncProducer 获取 Kafka 异步生产者实例 GetAsyncProducer(option ...KafkaOption) (sarama.AsyncProducer, error) + GetAsyncProducerDefault() (sarama.AsyncProducer, error) // GetConsumer 获取 Kafka 消费者实例 GetConsumer(option ...KafkaOption) (sarama.Consumer, error) + GetConsumerDefault() (sarama.Consumer, error) // GetConsumerGroup 获取 Kafka 消费者组实例 GetConsumerGroup(groupID string, topics []string, option ...KafkaOption) (sarama.ConsumerGroup, error) + GetConsumerGroupDefault(groupID string, topics []string) (sarama.ConsumerGroup, error) } // KafkaConfig 为 Kafka 定义的配置结构 type KafkaConfig struct { // 基础配置 - Brokers []string // Kafka broker 列表 - //// 生产者配置 - //Producer *sarama.Config // 生产者配置 - //// 消费者配置 - //Consumer *sarama.Config // 消费者配置 - //// 消费者组配置 - //ConsumerGroup *sarama.Config // 消费者组配置 - + Brokers []string // Kafka broker 列表 ClientConfig *sarama.Config // kafka 配置 } diff --git a/framework/contract/queue.go b/framework/contract/queue.go index d25e123..c81bebb 100644 --- a/framework/contract/queue.go +++ b/framework/contract/queue.go @@ -28,31 +28,8 @@ type QueueService interface { } type Event interface { - EventID() int64 // 事件唯一标识 + GetEventKey() string // 事件唯一标识 EventTopic() string // 事件类型 EventTimestamp() int64 // 事件发生时间 Payload() interface{} // 事件负载 } - -type BaseEvent struct { - ID int64 `json:"id"` - Topic string `json:"topic"` - Timestamp int64 `json:"timestamp"` - Data interface{} `json:"data"` -} - -func (e *BaseEvent) EventTopic() string { - return e.Topic -} - -func (e *BaseEvent) EventID() int64 { - return e.ID -} - -func (e *BaseEvent) EventTimestamp() int64 { - return e.Timestamp -} - -func (e *BaseEvent) Payload() interface{} { - return e.Data -} diff --git a/framework/provider/file/provider.go b/framework/provider/file/provider.go new file mode 100644 index 0000000..1b72cc8 --- /dev/null +++ b/framework/provider/file/provider.go @@ -0,0 +1,32 @@ +package file + +import ( + "github.com/Superdanda/hade/framework" +) + +type FileProvider struct{} + +func (f FileProvider) Register(container framework.Container) framework.NewInstance { + //TODO implement me + panic("implement me") +} + +func (f FileProvider) Boot(container framework.Container) error { + //TODO implement me + panic("implement me") +} + +func (f FileProvider) IsDefer() bool { + //TODO implement me + panic("implement me") +} + +func (f FileProvider) Params(container framework.Container) []interface{} { + //TODO implement me + panic("implement me") +} + +func (f FileProvider) Name() string { + //TODO implement me + panic("implement me") +} diff --git a/framework/provider/file/service.go b/framework/provider/file/service.go new file mode 100644 index 0000000..e213da3 --- /dev/null +++ b/framework/provider/file/service.go @@ -0,0 +1,29 @@ +package file + +import ( + "github.com/Superdanda/hade/framework" + "os" +) + +type HadeFileService struct { + container framework.Container +} + +func NewHadeFileService(container framework.Container) *HadeFileService { + return &HadeFileService{container: container} +} + +func (h HadeFileService) UploadFile(fileName string, file os.File) error { + //TODO implement me + panic("implement me") +} + +func (h HadeFileService) DownloadFile(fileName string) (os.File, error) { + //TODO implement me + panic("implement me") +} + +func (h HadeFileService) DeleteFile(fileName string) error { + //TODO implement me + panic("implement me") +} diff --git a/framework/provider/kafka/service.go b/framework/provider/kafka/service.go index 60a1f39..9df32f9 100644 --- a/framework/provider/kafka/service.go +++ b/framework/provider/kafka/service.go @@ -17,13 +17,20 @@ type HadeKafka struct { lock *sync.RWMutex } -func NewHadeKafka(params ...interface{}) (interface{}, error) { - hadeKafka := &HadeKafka{} - container := params[0] - hadeKafka.container = container.(framework.Container) - hadeKafka.clients = make(map[string]sarama.Client) - hadeKafka.lock = new(sync.RWMutex) - return hadeKafka, nil +func (k HadeKafka) GetClientDefault() (sarama.Client, error) { + // 读取默认配置 + config := GetBaseConfig(k.container) + return k.getClientByConfig(config) +} + +func (k HadeKafka) GetConsumerDefault() (sarama.Consumer, error) { + config := GetBaseConfig(k.container) + return k.getConsumer(k.GetClientDefault, config) +} + +func (k HadeKafka) GetConsumerGroupDefault(groupID string, topics []string) (sarama.ConsumerGroup, error) { + config := GetBaseConfig(k.container) + return k.getConsumerGroup(k.GetClientDefault, config, groupID, topics) } func (k HadeKafka) GetClient(option ...contract.KafkaOption) (sarama.Client, error) { @@ -35,6 +42,72 @@ func (k HadeKafka) GetClient(option ...contract.KafkaOption) (sarama.Client, err return nil, err } } + return k.getClientByConfig(config) +} + +// 内部方法:获取消费者组 +func (k HadeKafka) getConsumerGroup( + getClientFunc func() (sarama.Client, error), + config *contract.KafkaConfig, + groupID string, + topics []string) (sarama.ConsumerGroup, error) { + + uniqKey := config.UniqKey() + "_" + groupID + k.lock.RLock() + if group, ok := k.consumerGroups[uniqKey]; ok { + k.lock.RUnlock() + return group, nil + } + k.lock.RUnlock() + client, err := getClientFunc() + if err != nil { + return nil, err + } + k.lock.Lock() + defer k.lock.Unlock() + consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client) + if err != nil { + return nil, err + } + k.consumerGroups[uniqKey] = consumerGroup + return consumerGroup, nil +} + +// 内部方法:获取消费者 +func (k HadeKafka) getConsumer( + getClientFunc func() (sarama.Client, error), + config *contract.KafkaConfig) (sarama.Consumer, error) { + + uniqKey := config.UniqKey() + k.lock.RLock() + if consumer, ok := k.consumers[uniqKey]; ok { + k.lock.RUnlock() + return consumer, nil + } + k.lock.RUnlock() + client, err := getClientFunc() + if err != nil { + return nil, err + } + k.lock.Lock() + defer k.lock.Unlock() + consumer, err := sarama.NewConsumerFromClient(client) + if err != nil { + return nil, err + } + k.consumers[uniqKey] = consumer + return consumer, nil +} +func NewHadeKafka(params ...interface{}) (interface{}, error) { + hadeKafka := &HadeKafka{} + container := params[0] + hadeKafka.container = container.(framework.Container) + hadeKafka.clients = make(map[string]sarama.Client) + hadeKafka.lock = new(sync.RWMutex) + return hadeKafka, nil +} + +func (k HadeKafka) getClientByConfig(config *contract.KafkaConfig) (sarama.Client, error) { uniqKey := config.UniqKey() // 判断是否已经实例化了kafka.Client k.lock.RLock() @@ -64,22 +137,63 @@ func (k HadeKafka) GetProducer(option ...contract.KafkaOption) (sarama.SyncProdu return nil, err } } - uniqKey := config.UniqKey() + return k.getProducer(func() (sarama.Client, error) { + return k.GetClient(option...) + }, config) +} +func (k HadeKafka) GetProducerDefault() (sarama.SyncProducer, error) { + // 读取默认配置 + config := GetBaseConfig(k.container) + return k.getProducer(k.GetClientDefault, config) +} + +func (k HadeKafka) GetAsyncProducerDefault() (sarama.AsyncProducer, error) { + config := GetBaseConfig(k.container) + return k.getAsyncProducer(k.GetClientDefault, config) +} + +// 内部方法:获取异步生产者 +func (k HadeKafka) getAsyncProducer( + getClientFunc func() (sarama.Client, error), + config *contract.KafkaConfig) (sarama.AsyncProducer, error) { + + uniqKey := config.UniqKey() + k.lock.RLock() + if producer, ok := k.asyncProducers[uniqKey]; ok { + k.lock.RUnlock() + return producer, nil + } + k.lock.RUnlock() + client, err := getClientFunc() + if err != nil { + return nil, err + } + k.lock.Lock() + defer k.lock.Unlock() + asyncProducer, err := sarama.NewAsyncProducerFromClient(client) + if err != nil { + return nil, err + } + k.asyncProducers[uniqKey] = asyncProducer + return asyncProducer, nil +} + +func (k HadeKafka) getProducer( + getClientFunc func() (sarama.Client, error), // 传入一个返回 sarama.Client 的函数 + config *contract.KafkaConfig) (sarama.SyncProducer, error) { + uniqKey := config.UniqKey() // 判断是否已经实例化了kafka.Client k.lock.RLock() if producer, ok := k.syncProducers[uniqKey]; ok { k.lock.Lock() return producer, nil } - k.lock.RUnlock() - - client, err := k.GetClient(option...) + client, err := getClientFunc() if err != nil { return nil, err } - k.lock.Lock() defer k.lock.Unlock() syncProducer, err := sarama.NewSyncProducerFromClient(client) @@ -99,31 +213,9 @@ func (k HadeKafka) GetAsyncProducer(option ...contract.KafkaOption) (sarama.Asyn return nil, err } } - uniqKey := config.UniqKey() - - // 检查是否已实例化 asyncProducer - k.lock.RLock() - if producer, ok := k.asyncProducers[uniqKey]; ok { - k.lock.RUnlock() - return producer, nil - } - k.lock.RUnlock() - - // 获取 client 实例 - client, err := k.GetClient(option...) - if err != nil { - return nil, err - } - - // 创建并保存 asyncProducer - k.lock.Lock() - defer k.lock.Unlock() - asyncProducer, err := sarama.NewAsyncProducerFromClient(client) - if err != nil { - return nil, err - } - k.asyncProducers[uniqKey] = asyncProducer - return asyncProducer, nil + return k.getAsyncProducer(func() (sarama.Client, error) { + return k.GetClient(option...) + }, config) } func (k HadeKafka) GetConsumer(option ...contract.KafkaOption) (sarama.Consumer, error) { @@ -135,31 +227,9 @@ func (k HadeKafka) GetConsumer(option ...contract.KafkaOption) (sarama.Consumer, return nil, err } } - uniqKey := config.UniqKey() - - // 检查是否已实例化 consumer - k.lock.RLock() - if consumer, ok := k.consumers[uniqKey]; ok { - k.lock.RUnlock() - return consumer, nil - } - k.lock.RUnlock() - - // 获取 client 实例 - client, err := k.GetClient(option...) - if err != nil { - return nil, err - } - - // 创建并保存 consumer - k.lock.Lock() - defer k.lock.Unlock() - consumer, err := sarama.NewConsumerFromClient(client) - if err != nil { - return nil, err - } - k.consumers[uniqKey] = consumer - return consumer, nil + return k.getConsumer(func() (sarama.Client, error) { + return k.GetClient(option...) + }, config) } func (k HadeKafka) GetConsumerGroup(groupID string, topics []string, option ...contract.KafkaOption) (sarama.ConsumerGroup, error) { @@ -171,29 +241,9 @@ func (k HadeKafka) GetConsumerGroup(groupID string, topics []string, option ...c return nil, err } } - uniqKey := config.UniqKey() + "_" + groupID - - // 检查是否已实例化 consumerGroup - k.lock.RLock() - if group, ok := k.consumerGroups[uniqKey]; ok { - k.lock.RUnlock() - return group, nil - } - k.lock.RUnlock() - - // 获取 client 实例 - client, err := k.GetClient(option...) - if err != nil { - return nil, err - } - - // 创建并保存 consumerGroup - k.lock.Lock() - defer k.lock.Unlock() - consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client) - if err != nil { - return nil, err - } - k.consumerGroups[uniqKey] = consumerGroup - return consumerGroup, nil + return k.getConsumerGroup( + func() (sarama.Client, error) { + return k.GetClient(option...) + }, config, groupID, topics, + ) } diff --git a/framework/provider/queue/services/default_service.go b/framework/provider/queue/services/default_service.go index cac68ea..a61bdb4 100644 --- a/framework/provider/queue/services/default_service.go +++ b/framework/provider/queue/services/default_service.go @@ -152,12 +152,17 @@ func (m *MemoryQueueService) Close() error { type DefaultBaseEvent struct { ID int64 `gorm:"primaryKey;type:bigint" json:"id"` + EventKey string `gorm:"type:varchar(255);not null" json:"event_key"` Topic string `gorm:"type:varchar(50);not null" json:"topic"` Timestamp int64 `gorm:"autoCreateTime:milli" json:"timestamp"` Data string `gorm:"type:json" json:"data"` // 将数据存储为 JSON 格式 CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"` } +func (d *DefaultBaseEvent) GetEventKey() string { + return d.EventKey +} + func (d *DefaultBaseEvent) EventID() int64 { return d.ID } diff --git a/framework/provider/queue/services/kafka_service.go b/framework/provider/queue/services/kafka_service.go index 7e27ee6..077d5c1 100644 --- a/framework/provider/queue/services/kafka_service.go +++ b/framework/provider/queue/services/kafka_service.go @@ -2,8 +2,13 @@ package services import ( "context" + "encoding/json" + "fmt" + "github.com/Shopify/sarama" "github.com/Superdanda/hade/framework" "github.com/Superdanda/hade/framework/contract" + "github.com/google/uuid" + "time" ) type KafkaQueueService struct { @@ -19,37 +24,216 @@ func NewKafkaQueueService(params ...interface{}) (interface{}, error) { return kafkaQueueService, nil } +type KafkaEvent struct { + EventKey string // 事件唯一标识 + Topic string // 事件主题 + Timestamp int64 // 事件时间戳 + Data interface{} // 事件负载 +} + +func convertToMessage(e contract.Event) (*sarama.ProducerMessage, error) { + data, err := json.Marshal(e) + if err != nil { + return nil, err + } + // 创建Kafka消息 + message := &sarama.ProducerMessage{ + Topic: e.EventTopic(), + Key: sarama.StringEncoder(e.GetEventKey()), + Value: sarama.StringEncoder(data), // 假设payload是字符串 + } + return message, nil +} + +// GetEventKey 实现 EventID 方法 +func (e *KafkaEvent) GetEventKey() string { + return e.EventKey +} + +// EventTopic 实现 EventTopic 方法 +func (e *KafkaEvent) EventTopic() string { + return e.Topic +} + +// EventTimestamp 实现 EventTimestamp 方法 +func (e *KafkaEvent) EventTimestamp() int64 { + return e.Timestamp +} + +// Payload 实现 Payload 方法 +func (e *KafkaEvent) Payload() interface{} { + return e.Data +} + func (k KafkaQueueService) PublishEvent(ctx context.Context, event contract.Event) error { - //TODO implement me - panic("implement me") + producer, err := k.kafkaService.GetProducer() + if err != nil { + return err + } + producerMessage, err := convertToMessage(event) + if err != nil { + return err + } + // 发送消息 + _, _, err = producer.SendMessage(producerMessage) + return err } func (k KafkaQueueService) SubscribeEvent(ctx context.Context, topic string, handler func(event contract.Event) error) error { - //TODO implement me - panic("implement me") + consumer, err := k.kafkaService.GetConsumer() + if err != nil { + return err + } + partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest) + if err != nil { + return err + } + + go func() { + for { + select { + case msg := <-partitionConsumer.Messages(): + event := &KafkaEvent{ + EventKey: string(msg.Key), + Topic: msg.Topic, + Timestamp: time.Now().Unix(), + Data: string(msg.Value), + } + handler(event) + case <-ctx.Done(): + return + } + } + }() + return nil } func (k KafkaQueueService) ReplayEvents(ctx context.Context, topic string, fromID string, fromTimestamp int64, handler func(event contract.Event) error) error { - //TODO implement me - panic("implement me") + consumer, err := k.kafkaService.GetConsumer() + if err != nil { + return err + } + + // 使用时间戳来设定偏移量,确保从某个时间点开始消费 + offset := sarama.OffsetOldest // 默认从最早的消息开始消费 + if fromTimestamp > 0 { + // 设置为从某个时间戳的消息开始 + offset = sarama.OffsetNewest // 这里假设你通过时间戳来决定是否是最新的 + } + + partitionConsumer, err := consumer.ConsumePartition(topic, 0, offset) + if err != nil { + return err + } + + go func() { + for { + select { + case msg := <-partitionConsumer.Messages(): + event := &KafkaEvent{ + EventKey: string(msg.Key), + Topic: msg.Topic, + Timestamp: time.Now().Unix(), + Data: string(msg.Value), + } + handler(event) + case <-ctx.Done(): + return + } + } + }() + return nil } func (k KafkaQueueService) GetEventById(ctx context.Context, topic string, eventID string) (contract.Event, error) { - //TODO implement me - panic("implement me") + consumer, err := k.kafkaService.GetConsumer() + if err != nil { + return nil, err + } + + partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest) + if err != nil { + return nil, err + } + + var result *KafkaEvent + go func() { + for msg := range partitionConsumer.Messages() { + if string(msg.Key) == eventID { + result = &KafkaEvent{ + EventKey: string(msg.Key), + Topic: msg.Topic, + Timestamp: time.Now().Unix(), + Data: string(msg.Value), + } + break + } + } + }() + + // 等待事件处理完成 + time.Sleep(2 * time.Second) // 可以改为一个更合理的超时处理逻辑 + + if result == nil { + return nil, fmt.Errorf("event with ID %s not found", eventID) + } + + return result, nil } func (k KafkaQueueService) GetEventByTime(ctx context.Context, topic string, fromTimestamp int64) (contract.Event, error) { - //TODO implement me - panic("implement me") + consumer, err := k.kafkaService.GetConsumer() + if err != nil { + return nil, err + } + + partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest) + if err != nil { + return nil, err + } + + var result *KafkaEvent + go func() { + for msg := range partitionConsumer.Messages() { + // 假设消息的时间戳存在Data字段中,进行时间戳筛选 + if msg.Timestamp.Unix() >= fromTimestamp { + result = &KafkaEvent{ + EventKey: string(msg.Key), + Topic: msg.Topic, + Timestamp: msg.Timestamp.Unix(), + Data: string(msg.Value), + } + break + } + } + }() + + // 等待事件处理完成 + time.Sleep(2 * time.Second) // 可以改为一个更合理的超时处理逻辑 + + if result == nil { + return nil, fmt.Errorf("event not found from timestamp %d", fromTimestamp) + } + + return result, nil } func (k KafkaQueueService) Close() error { - //TODO implement me - panic("implement me") + // 如果有关闭消费者的逻辑,或者其他资源清理操作,可以在这里实现 + // 示例:关闭所有消费者 + //for _, consumer := range k.kafkaService.GetConsumers() { + // consumer.Close() // 关闭消费者连接 + //} + return nil } func (k KafkaQueueService) NewEventAndPublish(ctx context.Context, topic string, payload interface{}) error { - //TODO implement me - panic("implement me") + // 生成新的事件 + event := &KafkaEvent{ + EventKey: uuid.New().String(), // 使用 UUID 作为事件 ID + Topic: topic, + Timestamp: time.Now().Unix(), + Data: payload, + } + return k.PublishEvent(ctx, event) }