备份代码

This commit is contained in:
dandan 2024-11-13 23:01:41 +08:00
parent c953c127fc
commit f25b94db91
11 changed files with 640 additions and 133 deletions

View File

@ -14,6 +14,9 @@ type Service interface {
// SaveUser 保存用户信息
SaveUser(ctx context.Context, user *User) error
// AddAmount 金额变化
AddAmount(ctx context.Context, userID int64, amount int64) error
}
type User struct {
@ -24,6 +27,13 @@ type User struct {
Password string `gorm:"column:password;type:varchar(255);comment:密码;not null" json:"password"`
Email string `gorm:"column:email;type:varchar(255);comment:邮箱;not null" json:"email"`
CreatedAt time.Time `gorm:"column:created_at;type:datetime;comment:创建时间;not null;<-:create" json:"createdAt"`
Account Account `gorm:"foreignkey:UserId;constraint:OnDelete:CASCADE" json:"account"`
}
type Account struct {
ID int64 `gorm:"column:id;primary_key;auto_increment" json:"id"`
UserId int64 `gorm:"column:user_id;primary_key;auto_increment" json:"userId"`
Amount int64 `gorm:"column:amount;type:varchar(255)" json:"amount"`
}
func (u *User) MarshalBinary() ([]byte, error) {

View File

@ -27,6 +27,23 @@ func (s *UserService) SaveUser(ctx context.Context, user *User) error {
return nil
}
func (s *UserService) AddAmount(ctx context.Context, userID int64, amount int64) error {
// 使用kafka 来发布事件
return nil
}
//func (s *UserService) ChangeAmount(container framework.Container, ctx *gin.Context) error {
// queueService := s.container.MustMake(contract.QueueKey).(contract.QueueService)
// queueService.SubscribeEvent(ctx,ChangeAmountTopic, )
//}
const ChangeAmountTopic = "UserAmountChange"
type ChangeAmountEvent struct {
UserID int64 `json:"user_id"`
Amount int64 `json:"amount"`
}
func NewUserService(params ...interface{}) (interface{}, error) {
container := params[0].(framework.Container)
userService := &UserService{container: container}

View File

@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"reflect"
"regexp"
"strings"
"text/template"
@ -21,6 +22,7 @@ func initProviderCommand() *cobra.Command {
providerCommand.AddCommand(providerCreateCommand)
providerCommand.AddCommand(providerListCommand)
providerCommand.AddCommand(providerRepositoryCommand)
providerCommand.AddCommand(providerInterfaceCommand)
return providerCommand
}
@ -49,6 +51,182 @@ var providerListCommand = &cobra.Command{
},
}
var providerInterfaceCommand = &cobra.Command{
Use: "interface",
Aliases: []string{"interface"},
Short: "创建应用接口",
RunE: func(c *cobra.Command, args []string) error {
container := c.GetContainer()
fmt.Println("创建一个应用接口")
var name, folder string
interfaceNames := &RouteNode{}
{
prompt := &survey.Input{
Message: "请输入接口模块名称(http/module目录下的文件夹名称)",
}
err := survey.AskOne(prompt, &name)
if err != nil {
return err
}
}
{
prompt := &survey.Input{
Message: "请输入接口模块所在目录名称(默认: 接口模块名称):",
}
err := survey.AskOne(prompt, &folder)
if err != nil {
return err
}
}
if folder == "" {
folder = name
}
// 收集用户输入并填充嵌套映射
for {
prompt := &survey.Input{
Message: "请输入接口路径(格式:/user/login直接按回车结束输入",
}
var input string
err := survey.AskOne(prompt, &input)
if err != nil {
return err
}
// 如果用户直接按回车,结束输入
if strings.TrimSpace(input) == "" {
fmt.Println("接口输入结束")
break
}
// 解析输入的路径为路径部分
pathParts := strings.Split(strings.TrimPrefix(input, "/"), "/")
if len(pathParts) == 0 {
fmt.Println("路径格式错误,请输入正确格式:/节点/接口")
continue
}
// 将路径部分插入到路由树中
insertIntoRouteTree(pathParts, []string{}, interfaceNames)
fmt.Printf("已添加接口:%s\n", input)
}
interfaceNames.NeedExtra = false
// 打印所有添加的接口(测试用)
printRouteTree(interfaceNames, 0)
// 模板数据
config := container.MustMake(contract.ConfigKey).(contract.Config)
data := map[string]interface{}{
"appName": config.GetAppName(),
"packageName": name,
"interfaces": interfaceNames,
"structName": name,
}
// 创建title这个模版方法
funcs := template.FuncMap{
"title": strings.Title,
"dict": func(values ...interface{}) (map[string]interface{}, error) {
if len(values)%2 != 0 {
return nil, fmt.Errorf("invalid dict call: missing key or value")
}
dict := make(map[string]interface{}, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, ok := values[i].(string)
if !ok {
return nil, fmt.Errorf("dict keys must be strings")
}
dict[key] = values[i+1]
}
return dict, nil
},
"len": func(v interface{}) int {
return reflect.ValueOf(v).Len()
},
"camelToPath": func(s string) string {
re := regexp.MustCompile("([a-z0-9])([A-Z])")
result := re.ReplaceAllString(s, "$1/$2")
return "/" + strings.ToLower(result)
},
}
if interfaceNames.Children == nil || len(interfaceNames.Children) == 0 {
return nil
}
app := container.MustMake(contract.AppKey).(contract.App)
moduleFolder := app.HttpModuleFolder()
pModuleFolder := filepath.Join(moduleFolder, name)
util.EnsureDir(pModuleFolder)
{
// module 目录下 创建 服务包
// 创建api 我呢见
{
// 创建 api.go
file := filepath.Join(pModuleFolder, "api.go")
f, err := os.Create(file)
if err != nil {
return err
}
data["interfaces"] = interfaceNames // 传递嵌套的接口名称映射
t := template.Must(template.New("api").Funcs(funcs).Parse(apiTmp))
if err := t.Execute(f, data); err != nil {
return err
}
}
// 创建api_controller文件
{
tmpl := template.Must(template.New("controller").Funcs(funcs).Parse(apiControllerTmp))
data["packageName"] = name
data["structName"] = name
// 递归生成控制器文件
if err := generateControllers(interfaceNames, []string{}, tmpl, data, pModuleFolder); err != nil {
fmt.Println("生成控制器失败:", err)
return err
}
}
//创建 dto 文件
{
// 创建dto.go
file := filepath.Join(pModuleFolder, "dto.go")
f, err := os.Create(file)
if err != nil {
return err
}
t := template.Must(template.New("dto").Funcs(funcs).Parse(dtoTmp))
if err := t.Execute(f, data); err != nil {
return err
}
}
//创建 mapper 文件
{
// 创建mapper.go
file := filepath.Join(pModuleFolder, "mapper.go")
f, err := os.Create(file)
if err != nil {
return err
}
t := template.Must(template.New("mapper").Funcs(funcs).Parse(mapperTmp))
if err := t.Execute(f, data); err != nil {
return err
}
}
}
fmt.Println("创建接口文件成功")
return nil
},
}
var providerCreateCommand = &cobra.Command{
Use: "create",
Aliases: []string{"create", "init"},
@ -168,6 +346,10 @@ var providerCreateCommand = &cobra.Command{
"len": func(v interface{}) int {
return reflect.ValueOf(v).Len()
},
"camelToPath": func(s string) string {
re := regexp.MustCompile("([A-Z])")
return "/" + re.ReplaceAllString(s, "/$1")
},
}
{
@ -584,7 +766,19 @@ import (
"github.com/Superdanda/hade/framework/gin"
)
type {{.methodName}}Param struct {}
// {{.methodName}} handler
// @Summary 输入你的接口总结
// @Description 输入你的接口总结详情
// @ID 接口Id
// @Tags 接口tag
// @Accept json
// @Produce json
// @Param {{.methodName}}Param body {{.methodName}}Param true "输入参数描述"
// @Success 200 {object} result.Result "返回成功的流程定义数据"
// @Failure 500 {object} result.Result "返回失败的流程定义数据"
// @Router {{.methodName | camelToPath}} [post]
func (api *{{.structName | title}}Api) {{.methodName}}(c *gin.Context) {
// TODO: Implement {{.methodName}}
}

View File

@ -0,0 +1,9 @@
package contract
import "os"
type FileService interface {
UploadFile(fileName string, file os.File) error
DownloadFile(fileName string) (os.File, error)
DeleteFile(fileName string) error
}

View File

@ -14,27 +14,27 @@ type KafkaOption func(container framework.Container, config *KafkaConfig) error
// KafkaService 表示一个 Kafka 服务
type KafkaService interface {
// GetClient 获取kafka
GetClient(option ...KafkaOption) (sarama.Client, error)
GetClientDefault() (sarama.Client, error)
// GetProducer 获取 Kafka 同步生产者实例
GetProducer(option ...KafkaOption) (sarama.SyncProducer, error)
GetProducerDefault() (sarama.SyncProducer, error)
// GetAsyncProducer 获取 Kafka 异步生产者实例
GetAsyncProducer(option ...KafkaOption) (sarama.AsyncProducer, error)
GetAsyncProducerDefault() (sarama.AsyncProducer, error)
// GetConsumer 获取 Kafka 消费者实例
GetConsumer(option ...KafkaOption) (sarama.Consumer, error)
GetConsumerDefault() (sarama.Consumer, error)
// GetConsumerGroup 获取 Kafka 消费者组实例
GetConsumerGroup(groupID string, topics []string, option ...KafkaOption) (sarama.ConsumerGroup, error)
GetConsumerGroupDefault(groupID string, topics []string) (sarama.ConsumerGroup, error)
}
// KafkaConfig 为 Kafka 定义的配置结构
type KafkaConfig struct {
// 基础配置
Brokers []string // Kafka broker 列表
//// 生产者配置
//Producer *sarama.Config // 生产者配置
//// 消费者配置
//Consumer *sarama.Config // 消费者配置
//// 消费者组配置
//ConsumerGroup *sarama.Config // 消费者组配置
ClientConfig *sarama.Config // kafka 配置
}

View File

@ -28,31 +28,8 @@ type QueueService interface {
}
type Event interface {
EventID() int64 // 事件唯一标识
GetEventKey() string // 事件唯一标识
EventTopic() string // 事件类型
EventTimestamp() int64 // 事件发生时间
Payload() interface{} // 事件负载
}
type BaseEvent struct {
ID int64 `json:"id"`
Topic string `json:"topic"`
Timestamp int64 `json:"timestamp"`
Data interface{} `json:"data"`
}
func (e *BaseEvent) EventTopic() string {
return e.Topic
}
func (e *BaseEvent) EventID() int64 {
return e.ID
}
func (e *BaseEvent) EventTimestamp() int64 {
return e.Timestamp
}
func (e *BaseEvent) Payload() interface{} {
return e.Data
}

View File

@ -0,0 +1,32 @@
package file
import (
"github.com/Superdanda/hade/framework"
)
type FileProvider struct{}
func (f FileProvider) Register(container framework.Container) framework.NewInstance {
//TODO implement me
panic("implement me")
}
func (f FileProvider) Boot(container framework.Container) error {
//TODO implement me
panic("implement me")
}
func (f FileProvider) IsDefer() bool {
//TODO implement me
panic("implement me")
}
func (f FileProvider) Params(container framework.Container) []interface{} {
//TODO implement me
panic("implement me")
}
func (f FileProvider) Name() string {
//TODO implement me
panic("implement me")
}

View File

@ -0,0 +1,29 @@
package file
import (
"github.com/Superdanda/hade/framework"
"os"
)
type HadeFileService struct {
container framework.Container
}
func NewHadeFileService(container framework.Container) *HadeFileService {
return &HadeFileService{container: container}
}
func (h HadeFileService) UploadFile(fileName string, file os.File) error {
//TODO implement me
panic("implement me")
}
func (h HadeFileService) DownloadFile(fileName string) (os.File, error) {
//TODO implement me
panic("implement me")
}
func (h HadeFileService) DeleteFile(fileName string) error {
//TODO implement me
panic("implement me")
}

View File

@ -17,13 +17,20 @@ type HadeKafka struct {
lock *sync.RWMutex
}
func NewHadeKafka(params ...interface{}) (interface{}, error) {
hadeKafka := &HadeKafka{}
container := params[0]
hadeKafka.container = container.(framework.Container)
hadeKafka.clients = make(map[string]sarama.Client)
hadeKafka.lock = new(sync.RWMutex)
return hadeKafka, nil
func (k HadeKafka) GetClientDefault() (sarama.Client, error) {
// 读取默认配置
config := GetBaseConfig(k.container)
return k.getClientByConfig(config)
}
func (k HadeKafka) GetConsumerDefault() (sarama.Consumer, error) {
config := GetBaseConfig(k.container)
return k.getConsumer(k.GetClientDefault, config)
}
func (k HadeKafka) GetConsumerGroupDefault(groupID string, topics []string) (sarama.ConsumerGroup, error) {
config := GetBaseConfig(k.container)
return k.getConsumerGroup(k.GetClientDefault, config, groupID, topics)
}
func (k HadeKafka) GetClient(option ...contract.KafkaOption) (sarama.Client, error) {
@ -35,6 +42,72 @@ func (k HadeKafka) GetClient(option ...contract.KafkaOption) (sarama.Client, err
return nil, err
}
}
return k.getClientByConfig(config)
}
// 内部方法:获取消费者组
func (k HadeKafka) getConsumerGroup(
getClientFunc func() (sarama.Client, error),
config *contract.KafkaConfig,
groupID string,
topics []string) (sarama.ConsumerGroup, error) {
uniqKey := config.UniqKey() + "_" + groupID
k.lock.RLock()
if group, ok := k.consumerGroups[uniqKey]; ok {
k.lock.RUnlock()
return group, nil
}
k.lock.RUnlock()
client, err := getClientFunc()
if err != nil {
return nil, err
}
k.lock.Lock()
defer k.lock.Unlock()
consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
if err != nil {
return nil, err
}
k.consumerGroups[uniqKey] = consumerGroup
return consumerGroup, nil
}
// 内部方法:获取消费者
func (k HadeKafka) getConsumer(
getClientFunc func() (sarama.Client, error),
config *contract.KafkaConfig) (sarama.Consumer, error) {
uniqKey := config.UniqKey()
k.lock.RLock()
if consumer, ok := k.consumers[uniqKey]; ok {
k.lock.RUnlock()
return consumer, nil
}
k.lock.RUnlock()
client, err := getClientFunc()
if err != nil {
return nil, err
}
k.lock.Lock()
defer k.lock.Unlock()
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
return nil, err
}
k.consumers[uniqKey] = consumer
return consumer, nil
}
func NewHadeKafka(params ...interface{}) (interface{}, error) {
hadeKafka := &HadeKafka{}
container := params[0]
hadeKafka.container = container.(framework.Container)
hadeKafka.clients = make(map[string]sarama.Client)
hadeKafka.lock = new(sync.RWMutex)
return hadeKafka, nil
}
func (k HadeKafka) getClientByConfig(config *contract.KafkaConfig) (sarama.Client, error) {
uniqKey := config.UniqKey()
// 判断是否已经实例化了kafka.Client
k.lock.RLock()
@ -64,22 +137,63 @@ func (k HadeKafka) GetProducer(option ...contract.KafkaOption) (sarama.SyncProdu
return nil, err
}
}
uniqKey := config.UniqKey()
return k.getProducer(func() (sarama.Client, error) {
return k.GetClient(option...)
}, config)
}
func (k HadeKafka) GetProducerDefault() (sarama.SyncProducer, error) {
// 读取默认配置
config := GetBaseConfig(k.container)
return k.getProducer(k.GetClientDefault, config)
}
func (k HadeKafka) GetAsyncProducerDefault() (sarama.AsyncProducer, error) {
config := GetBaseConfig(k.container)
return k.getAsyncProducer(k.GetClientDefault, config)
}
// 内部方法:获取异步生产者
func (k HadeKafka) getAsyncProducer(
getClientFunc func() (sarama.Client, error),
config *contract.KafkaConfig) (sarama.AsyncProducer, error) {
uniqKey := config.UniqKey()
k.lock.RLock()
if producer, ok := k.asyncProducers[uniqKey]; ok {
k.lock.RUnlock()
return producer, nil
}
k.lock.RUnlock()
client, err := getClientFunc()
if err != nil {
return nil, err
}
k.lock.Lock()
defer k.lock.Unlock()
asyncProducer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
return nil, err
}
k.asyncProducers[uniqKey] = asyncProducer
return asyncProducer, nil
}
func (k HadeKafka) getProducer(
getClientFunc func() (sarama.Client, error), // 传入一个返回 sarama.Client 的函数
config *contract.KafkaConfig) (sarama.SyncProducer, error) {
uniqKey := config.UniqKey()
// 判断是否已经实例化了kafka.Client
k.lock.RLock()
if producer, ok := k.syncProducers[uniqKey]; ok {
k.lock.Lock()
return producer, nil
}
k.lock.RUnlock()
client, err := k.GetClient(option...)
client, err := getClientFunc()
if err != nil {
return nil, err
}
k.lock.Lock()
defer k.lock.Unlock()
syncProducer, err := sarama.NewSyncProducerFromClient(client)
@ -99,31 +213,9 @@ func (k HadeKafka) GetAsyncProducer(option ...contract.KafkaOption) (sarama.Asyn
return nil, err
}
}
uniqKey := config.UniqKey()
// 检查是否已实例化 asyncProducer
k.lock.RLock()
if producer, ok := k.asyncProducers[uniqKey]; ok {
k.lock.RUnlock()
return producer, nil
}
k.lock.RUnlock()
// 获取 client 实例
client, err := k.GetClient(option...)
if err != nil {
return nil, err
}
// 创建并保存 asyncProducer
k.lock.Lock()
defer k.lock.Unlock()
asyncProducer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
return nil, err
}
k.asyncProducers[uniqKey] = asyncProducer
return asyncProducer, nil
return k.getAsyncProducer(func() (sarama.Client, error) {
return k.GetClient(option...)
}, config)
}
func (k HadeKafka) GetConsumer(option ...contract.KafkaOption) (sarama.Consumer, error) {
@ -135,31 +227,9 @@ func (k HadeKafka) GetConsumer(option ...contract.KafkaOption) (sarama.Consumer,
return nil, err
}
}
uniqKey := config.UniqKey()
// 检查是否已实例化 consumer
k.lock.RLock()
if consumer, ok := k.consumers[uniqKey]; ok {
k.lock.RUnlock()
return consumer, nil
}
k.lock.RUnlock()
// 获取 client 实例
client, err := k.GetClient(option...)
if err != nil {
return nil, err
}
// 创建并保存 consumer
k.lock.Lock()
defer k.lock.Unlock()
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
return nil, err
}
k.consumers[uniqKey] = consumer
return consumer, nil
return k.getConsumer(func() (sarama.Client, error) {
return k.GetClient(option...)
}, config)
}
func (k HadeKafka) GetConsumerGroup(groupID string, topics []string, option ...contract.KafkaOption) (sarama.ConsumerGroup, error) {
@ -171,29 +241,9 @@ func (k HadeKafka) GetConsumerGroup(groupID string, topics []string, option ...c
return nil, err
}
}
uniqKey := config.UniqKey() + "_" + groupID
// 检查是否已实例化 consumerGroup
k.lock.RLock()
if group, ok := k.consumerGroups[uniqKey]; ok {
k.lock.RUnlock()
return group, nil
}
k.lock.RUnlock()
// 获取 client 实例
client, err := k.GetClient(option...)
if err != nil {
return nil, err
}
// 创建并保存 consumerGroup
k.lock.Lock()
defer k.lock.Unlock()
consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
if err != nil {
return nil, err
}
k.consumerGroups[uniqKey] = consumerGroup
return consumerGroup, nil
return k.getConsumerGroup(
func() (sarama.Client, error) {
return k.GetClient(option...)
}, config, groupID, topics,
)
}

View File

@ -152,12 +152,17 @@ func (m *MemoryQueueService) Close() error {
type DefaultBaseEvent struct {
ID int64 `gorm:"primaryKey;type:bigint" json:"id"`
EventKey string `gorm:"type:varchar(255);not null" json:"event_key"`
Topic string `gorm:"type:varchar(50);not null" json:"topic"`
Timestamp int64 `gorm:"autoCreateTime:milli" json:"timestamp"`
Data string `gorm:"type:json" json:"data"` // 将数据存储为 JSON 格式
CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"`
}
func (d *DefaultBaseEvent) GetEventKey() string {
return d.EventKey
}
func (d *DefaultBaseEvent) EventID() int64 {
return d.ID
}

View File

@ -2,8 +2,13 @@ package services
import (
"context"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/Superdanda/hade/framework"
"github.com/Superdanda/hade/framework/contract"
"github.com/google/uuid"
"time"
)
type KafkaQueueService struct {
@ -19,37 +24,216 @@ func NewKafkaQueueService(params ...interface{}) (interface{}, error) {
return kafkaQueueService, nil
}
type KafkaEvent struct {
EventKey string // 事件唯一标识
Topic string // 事件主题
Timestamp int64 // 事件时间戳
Data interface{} // 事件负载
}
func convertToMessage(e contract.Event) (*sarama.ProducerMessage, error) {
data, err := json.Marshal(e)
if err != nil {
return nil, err
}
// 创建Kafka消息
message := &sarama.ProducerMessage{
Topic: e.EventTopic(),
Key: sarama.StringEncoder(e.GetEventKey()),
Value: sarama.StringEncoder(data), // 假设payload是字符串
}
return message, nil
}
// GetEventKey 实现 EventID 方法
func (e *KafkaEvent) GetEventKey() string {
return e.EventKey
}
// EventTopic 实现 EventTopic 方法
func (e *KafkaEvent) EventTopic() string {
return e.Topic
}
// EventTimestamp 实现 EventTimestamp 方法
func (e *KafkaEvent) EventTimestamp() int64 {
return e.Timestamp
}
// Payload 实现 Payload 方法
func (e *KafkaEvent) Payload() interface{} {
return e.Data
}
func (k KafkaQueueService) PublishEvent(ctx context.Context, event contract.Event) error {
//TODO implement me
panic("implement me")
producer, err := k.kafkaService.GetProducer()
if err != nil {
return err
}
producerMessage, err := convertToMessage(event)
if err != nil {
return err
}
// 发送消息
_, _, err = producer.SendMessage(producerMessage)
return err
}
func (k KafkaQueueService) SubscribeEvent(ctx context.Context, topic string, handler func(event contract.Event) error) error {
//TODO implement me
panic("implement me")
consumer, err := k.kafkaService.GetConsumer()
if err != nil {
return err
}
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
return err
}
go func() {
for {
select {
case msg := <-partitionConsumer.Messages():
event := &KafkaEvent{
EventKey: string(msg.Key),
Topic: msg.Topic,
Timestamp: time.Now().Unix(),
Data: string(msg.Value),
}
handler(event)
case <-ctx.Done():
return
}
}
}()
return nil
}
func (k KafkaQueueService) ReplayEvents(ctx context.Context, topic string, fromID string, fromTimestamp int64, handler func(event contract.Event) error) error {
//TODO implement me
panic("implement me")
consumer, err := k.kafkaService.GetConsumer()
if err != nil {
return err
}
// 使用时间戳来设定偏移量,确保从某个时间点开始消费
offset := sarama.OffsetOldest // 默认从最早的消息开始消费
if fromTimestamp > 0 {
// 设置为从某个时间戳的消息开始
offset = sarama.OffsetNewest // 这里假设你通过时间戳来决定是否是最新的
}
partitionConsumer, err := consumer.ConsumePartition(topic, 0, offset)
if err != nil {
return err
}
go func() {
for {
select {
case msg := <-partitionConsumer.Messages():
event := &KafkaEvent{
EventKey: string(msg.Key),
Topic: msg.Topic,
Timestamp: time.Now().Unix(),
Data: string(msg.Value),
}
handler(event)
case <-ctx.Done():
return
}
}
}()
return nil
}
func (k KafkaQueueService) GetEventById(ctx context.Context, topic string, eventID string) (contract.Event, error) {
//TODO implement me
panic("implement me")
consumer, err := k.kafkaService.GetConsumer()
if err != nil {
return nil, err
}
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
return nil, err
}
var result *KafkaEvent
go func() {
for msg := range partitionConsumer.Messages() {
if string(msg.Key) == eventID {
result = &KafkaEvent{
EventKey: string(msg.Key),
Topic: msg.Topic,
Timestamp: time.Now().Unix(),
Data: string(msg.Value),
}
break
}
}
}()
// 等待事件处理完成
time.Sleep(2 * time.Second) // 可以改为一个更合理的超时处理逻辑
if result == nil {
return nil, fmt.Errorf("event with ID %s not found", eventID)
}
return result, nil
}
func (k KafkaQueueService) GetEventByTime(ctx context.Context, topic string, fromTimestamp int64) (contract.Event, error) {
//TODO implement me
panic("implement me")
consumer, err := k.kafkaService.GetConsumer()
if err != nil {
return nil, err
}
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
return nil, err
}
var result *KafkaEvent
go func() {
for msg := range partitionConsumer.Messages() {
// 假设消息的时间戳存在Data字段中进行时间戳筛选
if msg.Timestamp.Unix() >= fromTimestamp {
result = &KafkaEvent{
EventKey: string(msg.Key),
Topic: msg.Topic,
Timestamp: msg.Timestamp.Unix(),
Data: string(msg.Value),
}
break
}
}
}()
// 等待事件处理完成
time.Sleep(2 * time.Second) // 可以改为一个更合理的超时处理逻辑
if result == nil {
return nil, fmt.Errorf("event not found from timestamp %d", fromTimestamp)
}
return result, nil
}
func (k KafkaQueueService) Close() error {
//TODO implement me
panic("implement me")
// 如果有关闭消费者的逻辑,或者其他资源清理操作,可以在这里实现
// 示例:关闭所有消费者
//for _, consumer := range k.kafkaService.GetConsumers() {
// consumer.Close() // 关闭消费者连接
//}
return nil
}
func (k KafkaQueueService) NewEventAndPublish(ctx context.Context, topic string, payload interface{}) error {
//TODO implement me
panic("implement me")
// 生成新的事件
event := &KafkaEvent{
EventKey: uuid.New().String(), // 使用 UUID 作为事件 ID
Topic: topic,
Timestamp: time.Now().Unix(),
Data: payload,
}
return k.PublishEvent(ctx, event)
}