java 仿 go channel 队列
Coding
2025-11-21 16:38:15
func main() {
queue := make(chan any, 1024)
wg := sync.WaitGroup{}
wg.Go(func() {
for data := range queue {
fmt.Println(data)
}
})
for i := 0; i < 1024; i++ {
queue <- i
}
close(queue)
wg.Wait()
}
package com.example.jtool;
import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class ImitateGoroutinue {
Gson g = new Gson();
int selectLimit = 333;
@Data
public static class Params {
Long startId;
Long endId;
}
@Data
@AllArgsConstructor
public static class TaskParams {
Long id;
String type;
}
public void execute(String s) throws Exception {
Params params = g.fromJson(s, Params.class);
int maxThread = 9;
BlockingQueue<TaskParams> queue = new LinkedBlockingQueue<>(1000);
ExecutorService executor = Executors.newFixedThreadPool(maxThread);
AtomicBoolean done = new AtomicBoolean(false);
Thread producer = getThread(params, queue, done);
for (int i = 1; i <= maxThread; i++) {
executor.submit(() -> {
try {
while (!done.get() || !queue.isEmpty()) {
TaskParams item = queue.poll(1, TimeUnit.SECONDS);
if (item == null) {
System.out.println("获取失败");
continue;
}
System.out.println("taskItem finish " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 保持中断状态
}
System.out.println("消费退出");
});
}
producer.join();
executor.shutdown();
while (!executor.isTerminated()) {
System.out.println("等待任务结束");
Thread.sleep(300);
}
System.out.println("运行结束");
return;
}
private Thread getThread(Params params, BlockingQueue<TaskParams> queue, AtomicBoolean done) {
Thread producer = new Thread(() -> {
try {
for (Integer i = 1; i<=100;i++ ) {
queue.put(new TaskParams(i.longValue(),"action" ));
}
done.set(true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
return producer;
}
public static void main(String[] args) throws Exception {
(new ImitateGoroutinue()).execute("{}");
}
}