我有一个接收任务并将其放入频道的功能。每个任务都有ID,一些属性以及放置结果的通道。看起来像这样
task.Result = make(chan *TaskResult) queue <- task result := <-task.Result sendReponse(result)
另一个goroutine从通道中获取任务,对其进行处理,然后将结果放入任务的通道中
task := <-queue task.Result <- doExpensiveComputation(task)
此代码可以正常工作。但是现在我要合并任务queue。任务处理是一项非常昂贵的操作,因此我希望一次处理具有相同ID的队列中的所有任务。我看到了两种方法。
queue
首先,不要将具有相同ID的任务放入队列,因此,当现有任务到达时,它将等待其复制完成。这是伪代码
if newTask in queue { existing := queue.getById(newTask.ID) existing.waitForComplete() sendResponse(existing.ProcessingResult) } else { queue.enqueue(newTask) }
因此,我可以使用go通道和map来实现它以实现随机访问+一些同步手段,例如互斥体。我不喜欢这种方式的是,我必须在代码周围携带地图和通道,并使它们的内容保持同步。
第二种方法是将所有任务放入队列,但是在结果到达时从队列中提取任务和所有具有相同ID的任务,然后将结果发送到所有任务。这是伪代码
someTask := queue.dequeue() result := doExpensiveComputation(someTask) someTask.Result <- result moreTasks := queue.getAllWithID(someTask.ID) for _,theSameTask := range moreTasks { theSameTask.Result <- result }
我有一个想法,如何以与上述相同的方式使用chan + map +互斥体来实现此目的。
这是一个问题:是否存在一些内置/现有的数据结构可用于解决此类问题?还有其他(更好)的方法吗?
如果我正确地理解了问题,我想到的最简单的解决方案是在任务发送者(放入queue)和工作人员(从取出queue)之间添加中间层。这(可能是例行程序)将负责存储当前任务(按ID)并将结果广播到每个匹配的任务。
伪代码:
go func() { active := make(map[TaskID][]Task) for { select { case task := <-queue: tasks := active[task.ID] // No tasks with such ID, start heavy work if len(tasks) == 0 { worker <- task } // Save task for the result active[task.ID] = append(active[task.ID], task) case r := <-response: // Broadcast to all tasks for _, task := range active[r.ID] { task.Result <- r.Result } } } }()
不需要互斥体,也可能不需要携带任何东西,工作人员将只需要将所有结果放入中间层,然后可以正确地路由响应。如果冲突ID可能在一段时间内到达,您甚至可以在此处轻松添加缓存。
编辑: 我有一个梦想,上面的代码导致了死锁。如果您一次发送大量请求并阻塞了worker通道,那么这将是一个严重的问题–这个中间层例程正worker <- task等待工作人员完成,但是所有工作人员都可能无法发送到响应通道(因为我们的例程可以收集)。可播放的证明。
worker
worker <- task
可以考虑在通道中添加一些缓冲区,但这不是一个合适的解决方案(除非您可以通过这种方式设计系统,使缓冲区永远不会填满)。有几种方法可以解决此问题;例如,您可以运行一个单独的例程来收集响应,但是随后您需要active使用互斥量保护地图。可行 您还可以worker <- task选择一个选项,该选项将尝试将任务发送给工作人员,接收新任务(如果没有要发送的内容)或收集响应。可以利用以下事实:零通道从未准备好进行通信(被选择忽略),因此您可以在一个选择中选择接收和发送任务。例:
active
go func() { var next Task // received task which needs to be passed to a worker in := queue // incoming channel (new tasks) -- active var out chan Task // outgoing channel (to workers) -- inactive for { select { case t := <-in: next = t // store task, so we can pass to worker in, out = nil, worker // deactivate incoming channel, activate outgoing case out <- next: in, out = queue, nil // deactivate outgoing channel, activate incoming case r := <-response: collect <- r } } }()
玩