java_thread_queue 任务不退出问题
coding
之前一直用go写的多消费者,因为go存在channel,所以可以用有缓channel模拟队列进行消费和退出,但是java不存在channel,所以要是用一些内存queue,结果由于退出条件的判断不一样,导致java的代码进入空等待。一只没有释放资源。
go 原始代码如下
func main(){
type Item struct {
value string
}
queue:= make(chan Item,100)
task := StartTogether(func(){
for item:= range queue{
fmt.Println(item)
}
},10)
for i:=1;i<=10000;i++{
queue<-Item{
value: fmt.Sprintf("%v",i),
}
}
close(queue)
task.Wait()
}
func StartTogether(job func(), counter int) *sync.WaitGroup {
var wg sync.WaitGroup
for i := 1; i <= counter; i++ {
wg.Add(1)
go func() {
defer wg.Done()
job()
}()
}
return &wg
}
一开始改造的java代码如下,这份代码会出现偶尔无法退出的情况。最后定位到是因为 Object item = queue.take();
发生了阻塞。
public class ThreatQueue {
private static final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(2000);
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
private static final AtomicBoolean done = new AtomicBoolean(false);
public static void main(String[] args) throws InterruptedException {
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 2; i++) {
queue.put(i);
}
done.set(true);
System.out.println("Produced all items");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
for (int i = 1; i <= 10; i++) {
executor.submit(() -> {
try {
while (!done.get() || !queue.isEmpty()) {
Object item = queue.take();
System.out.println(Thread.currentThread().getName() + "Consumed: " + item);
}
System.out.println("Consumed all items");
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 保持中断状态
}
});
}
producer.join();
executor.shutdown();
while (!executor.isTerminated()){
System.out.println("waiting");
Thread.sleep(100);
}
System.out.println("finish");
}
}
和go代码不同,go代码是range在消费完channel后,如果channel已经close,则会直接退出循环。那么当前goroutine退出,并执行wg.done()。整个过程完全借助于go本身的,所以整个托管过程是比较顺畅的。
对比之下由于java版本的代码要实现上面的功能。需要自己实现queue空判断,和任务结束。但是取值是在空判断之后,这就出现在并发场景下,可能判断非空,但是取值的时候已经没有任务了。导致 Object item = queue.take();
进入无尽的等待。从而!executor.isTerminated()
始终不处于结束
while (!done.get() || !queue.isEmpty()) {
Object item = queue.take(); // 问题的关键在于本行代码, queue.isEmpty 进入前为false,但是进入本行后queue不在有数据,数据被其他消费者消费完毕了。
关于此处的代码更改,可以尝试破除queue的阻塞取值逻辑,即更换为以下代码
Object item = queue.poll(1, TimeUnit.SECONDS);
if (item == null) {
System.out.println(Thread.currentThread().getName() + " 获取失败");
continue;
}
区别在于queue.poll(1, TimeUnit.SECONDS)
不会无限制的等待获取队列中的内容,而是1秒后如果取不到那么就返回一个null,我们需要判断一下是否为null,如果为null跳出本次循环,进入while()
判断,如果当前处于生产者不在产生数据,并且队列为就可以正常退出程序啦。
评论列表