登录

java 仿 go channel 队列

Coding
2025-11-21 16:38:15
func main() {
	queue := make(chan any, 1024)

	wg := sync.WaitGroup{}

	wg.Go(func() {
		for data := range queue {
			fmt.Println(data)
		}
	})

	for i := 0; i < 1024; i++ {
		queue <- i
	}
	close(queue)
	wg.Wait()
}
package com.example.jtool;

import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class ImitateGoroutinue {

    Gson g           = new Gson();
    int  selectLimit = 333;

    @Data
    public static class Params {
        Long startId;
        Long endId;
    }

    @Data
    @AllArgsConstructor
    public static class TaskParams {
        Long   id;
        String type;
    }

    public void execute(String s) throws Exception {
        Params                    params    = g.fromJson(s, Params.class);
        int                       maxThread = 9;
        BlockingQueue<TaskParams> queue     = new LinkedBlockingQueue<>(1000);
        ExecutorService           executor  = Executors.newFixedThreadPool(maxThread);
        AtomicBoolean             done      = new AtomicBoolean(false);
        Thread                    producer  = getThread(params, queue, done);
        for (int i = 1; i <= maxThread; i++) {
            executor.submit(() -> {
                try {
                    while (!done.get() || !queue.isEmpty()) {
                        TaskParams item = queue.poll(1, TimeUnit.SECONDS);
                        if (item == null) {
                            System.out.println("获取失败");
                            continue;
                        }
                        System.out.println("taskItem finish " + item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 保持中断状态
                }
                System.out.println("消费退出");
            });
        }
        producer.join();
        executor.shutdown();
        while (!executor.isTerminated()) {
            System.out.println("等待任务结束");
            Thread.sleep(300);
        }
        System.out.println("运行结束");
        return;
    }


    private Thread getThread(Params params, BlockingQueue<TaskParams> queue, AtomicBoolean done) {
        Thread producer = new Thread(() -> {
            try {
                for (Integer i = 1; i<=100;i++ ) {
                    queue.put(new TaskParams(i.longValue(),"action" ));

                }
                done.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        return producer;
    }

    public static void main(String[] args) throws Exception {
        (new ImitateGoroutinue()).execute("{}");
    }
}

加入讨论

登录或注册以发表评论