DataX Core transport

reader -> queue -> writer

下面是一个极其简化的代码示例，演示 reader -> queue -> writer 的数据传输流程：

    /**
     * Demo exchanger: a minimal reader -> queue -> writer pipeline.
     *
     * <p>The reader thread pushes a fixed number of random records into a bounded
     * queue followed by an end-of-stream marker; the writer thread drains the queue,
     * prints the records in batches, and stops when it sees the marker.
     */
    private class DemoExchanger {
        /** Bounded queue that carries records from the reader to the writer. */
        private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(128);

        /**
         * End-of-stream marker. {@link BlockingQueue#take()} never returns
         * {@code null} and the queue rejects {@code null} elements, so the reader
         * enqueues this unique object to tell the writer that no more records
         * will arrive. It is compared by identity only.
         */
        private final Object terminator = new Object();

        /** Writer: receives records from the queue and prints them in batches. */
        private final Runnable writer = new Runnable() {
            /** Number of records accumulated before a batch is written out. */
            private final int batchSize = 4;

            /**
             * Record receiver.
             *
             * @return the next record, or {@code null} once the end-of-stream
             *         marker has been received
             */
            private Object rev() {
                Object record = doPoll();
                return record == terminator ? null : record;
            }

            /** Prints every record in the batch, then empties it for reuse. */
            private void flush(List<Object> batch) {
                batch.forEach(System.out::println);
                batch.clear();
            }

            @Override
            public void run() {
                System.out.println("writer:");
                Object o;
                // Current write batch.
                List<Object> batch = new ArrayList<>(batchSize);
                while (null != (o = rev())) {
                    batch.add(o);
                    if (batch.size() >= batchSize) {
                        flush(batch);
                    }
                }
                // Write out the trailing partial batch; after the loop the batch
                // always holds fewer than batchSize records, so test for non-empty.
                if (!batch.isEmpty()) {
                    flush(batch);
                }
            }
        };

        /** Reader: generates random records and sends them to the queue. */
        private final Runnable reader = new Runnable() {
            /** Number of records generated before the stream is terminated. */
            private final int recordCount = 13;

            /** Record sender. */
            private void send(Object o) {
                doPush(o);
            }

            @Override
            public void run() {
                Random random = new Random();
                System.out.println("reader:");
                // Generate the data.
                for (int i = 0; i < recordCount; i++) {
                    send(random.nextLong());
                }
                // Signal end of stream so the writer can flush and exit.
                send(terminator);
            }
        };

        /**
         * Reader push: enqueues a record, blocking while the queue is full.
         *
         * @param o record
         * @throws IllegalStateException if the thread is interrupted while waiting;
         *         the interrupt flag is restored before throwing
         */
        private void doPush(Object o) {
            try {
                queue.put(o);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        /**
         * Writer poll: dequeues the next element, blocking while the queue is empty.
         *
         * @return the next element taken from the queue (never {@code null})
         * @throws IllegalStateException if the thread is interrupted while waiting;
         *         the interrupt flag is restored before throwing
         */
        private Object doPoll() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        /**
         * Runs the demo: starts the writer thread first, then the reader thread.
         * Returns immediately without waiting for either thread to finish.
         */
        public void test() {
            Thread writerThread = new Thread(writer);
            Thread readerThread = new Thread(reader);
            // Start the writer first, then the reader.
            writerThread.start();
            readerThread.start();
        }

    }