Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Q&A #17

Open
qinguoyi opened this issue Jan 6, 2024 · 3 comments
Open

Q&A #17

qinguoyi opened this issue Jan 6, 2024 · 3 comments
Labels
documentation Improvements or additions to documentation

Comments

@qinguoyi
Copy link
Owner

qinguoyi commented Jan 6, 2024

  1. 为什么需要分布式转发?
    (1) 文件需要先在本地校验合并后,再上传到对象存储;大文件分片时,数据需要转发到一个机器上方便合并。
    (2) 如果文件上传失败,对象存储中不存在脏数据

  2. 如果不在本地中转,直接代理到对象存储是否可行?
    可以,s3协议的对象存储支持分片合并,同时会清理长时间不使用的数据。

  3. 这个项目的主要目的是什么?
    屏蔽底层对象存储,方便底层数据迁移。

@qinguoyi qinguoyi added the documentation Improvements or additions to documentation label Jan 6, 2024
@bao11seven
Copy link
Contributor

作者你好,本项目若在集群部署,那么每一个节点将会有自己的任务生产者与消费者,但我发现每次生产者定时查询DB获取任务时,只根据任务状态进行查询,那么是否可能会获取到其它节点的任务,导致handler在处理时,当前节点并没有文件而导致任务失败。我认为是否应该在发布任务时,插入当前节点ip到DB,而在生产者获取任务时根据status与node_ip进行获取任务进行处理,使得当前节点消费者只处理当前节点的任务。

@qinguoyi
Copy link
Owner Author

作者你好,本项目若在集群部署,那么每一个节点将会有自己的任务生产者与消费者,但我发现每次生产者定时查询DB获取任务时,只根据任务状态进行查询,那么是否可能会获取到其它节点的任务,导致handler在处理时,当前节点并没有文件而导致任务失败。我认为是否应该在发布任务时,插入当前节点ip到DB,而在生产者获取任务时根据status与node_ip进行获取任务进行处理,使得当前节点消费者只处理当前节点的任务。

感谢关注,这是一个好问题,因为文件分片相关的任务,需要和node_ip来关联执行。

我们从问题思路,项目设计,代码优化几个方面来看下,

问题思路

这里从你的疑问点出发,解释下为什么不使用node_ip。

首先,使用容器部署,每次重新部署后,node_ip都会发生变化(你可以使用固定ip,但这个应用没必要),会导致有些未执行的任务不被执行。当然,你可以另外写逻辑,使用脚本来兜底。

另外,这是一个通用的异步任务设计,如果不涉及文件分片相关的任务,希望能多副本抢占任务,而不是只能单个副本+node_ip执行,比如发送邮件等。

项目设计

这里从现有逻辑来看,解释下为什么可以正常运行。

集群部署时,每个副本都会启动本地生产者和消费者,两者通过JobQueue这个chan来通信,启动时,生产者从数据库中获取所有任务,通过preProcess(预处理)来判断是否文件在本地,再去抢占任务,这样的话,消费者拿到的都是文件在本地的任务。

case <-timer.C:
var lgDB = new(plugins.LangGoDB).Use("default").NewDB()
undoTaskList, _ := repo.NewTaskRepo().FindByStatus(lgDB, utils.TaskStatusUndo)
for _, i := range undoTaskList {
// 抢占前处理
preProcess := event.NewEventsHandler().GetPreProcess(i.TaskType)
if preProcess != nil {
if f := preProcess(i.ID); !f {
continue
}
}
// 抢占任务
affectRow := repo.NewTaskRepo().PreemptiveTaskByID(lgDB, i.ID, ip)
if affectRow != 0 {
JobQueue <- Job{
TaskID: i.ID,
TaskType: i.TaskType,
}
}
}

代码优化

分析完之后,发现partDelete任务的预处理确实有点问题,需要增加判断uid是否在本地的逻辑,在下面的代码中:

func preProcessPartDelete(i interface{}) bool {
lgDB := new(plugins.LangGoDB).Use("default").NewDB()
taskID := i.(int64)
taskInfo, err := repo.NewTaskRepo().GetByID(lgDB, taskID)
if err != nil {
fmt.Printf("任务不存在%v", err)
return false
}
// 反序列extraData
var msg models.MergeInfo
if err := json.Unmarshal([]byte(taskInfo.ExtraData), &msg); err != nil {
fmt.Printf("任务不存在%v", err)
return false
}
return true
}

新增partMerge类似逻辑的代码:
dirName := path.Join(utils.LocalStore, fmt.Sprintf("%d", msg.StorageUid))
if _, err := os.Stat(dirName); os.IsNotExist(err) {
return false
} else {
return true
}

请问,你能提交一个commit来修复这个问题吗?非常感谢。

@bao11seven
Copy link
Contributor

bao11seven commented Sep 22, 2024

u> > 作者你好,本项目若在集群部署,那么每一个节点将会有自己的任务生产者与消费者,但我发现每次生产者定时查询DB获取任务时,只根据任务状态进行查询,那么是否可能会获取到其它节点的任务,导致handler在处理时,当前节点并没有文件而导致任务失败。我认为是否应该在发布任务时,插入当前节点ip到DB,而在生产者获取任务时根据status与node_ip进行获取任务进行处理,使得当前节点消费者只处理当前节点的任务。

感谢关注,这是一个好问题,因为文件分片相关的任务,需要和node_ip来关联执行。

我们从问题思路,项目设计,代码优化几个方面来看下,

问题思路

这里从你的疑问点出发,解释下为什么不使用node_ip。

首先,使用容器部署,每次重新部署后,node_ip都会发生变化(你可以使用固定ip,但这个应用没必要),会导致有些未执行的任务不被执行。当然,你可以另外写逻辑,使用脚本来兜底。

另外,这是一个通用的异步任务设计,如果不涉及文件分片相关的任务,希望能多副本抢占任务,而不是只能单个副本+node_ip执行,比如发送邮件等。

项目设计

这里从现有逻辑来看,解释下为什么可以正常运行。

集群部署时,每个副本都会启动本地生产者和消费者,两者通过JobQueue这个chan来通信,启动时,生产者从数据库中获取所有任务,通过preProcess(预处理)来判断是否文件在本地,再去抢占任务,这样的话,消费者拿到的都是文件在本地的任务。

case <-timer.C:
var lgDB = new(plugins.LangGoDB).Use("default").NewDB()
undoTaskList, _ := repo.NewTaskRepo().FindByStatus(lgDB, utils.TaskStatusUndo)
for _, i := range undoTaskList {
// 抢占前处理
preProcess := event.NewEventsHandler().GetPreProcess(i.TaskType)
if preProcess != nil {
if f := preProcess(i.ID); !f {
continue
}
}
// 抢占任务
affectRow := repo.NewTaskRepo().PreemptiveTaskByID(lgDB, i.ID, ip)
if affectRow != 0 {
JobQueue <- Job{
TaskID: i.ID,
TaskType: i.TaskType,
}
}
}

代码优化

分析完之后,发现partDelete任务的预处理确实有点问题,需要增加判断uid是否在本地的逻辑,在下面的代码中:

func preProcessPartDelete(i interface{}) bool {
lgDB := new(plugins.LangGoDB).Use("default").NewDB()
taskID := i.(int64)
taskInfo, err := repo.NewTaskRepo().GetByID(lgDB, taskID)
if err != nil {
fmt.Printf("任务不存在%v", err)
return false
}
// 反序列extraData
var msg models.MergeInfo
if err := json.Unmarshal([]byte(taskInfo.ExtraData), &msg); err != nil {
fmt.Printf("任务不存在%v", err)
return false
}
return true
}

新增partMerge类似逻辑的代码:

dirName := path.Join(utils.LocalStore, fmt.Sprintf("%d", msg.StorageUid))
if _, err := os.Stat(dirName); os.IsNotExist(err) {
return false
} else {
return true
}

请问,你能提交一个commit来修复这个问题吗?非常感谢。

当然可以,这是我的荣幸,也非常感谢您解答我的疑惑

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation
Projects
None yet
Development

No branches or pull requests

2 participants