DataX Core transport
reader -> queue -> writer
这是一个极其简化的代码示例 reader -> queue -> writer
// demo Exchanger
private class DemoExchanger {
// 用于从reader到writer传输record的队列
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(128);
// writer
private final Runnable writer = new Runnable() {
private final int batchSize = 4;
// record接收器
private Object rev(){
return doPoll();
}
@Override
public void run() {
System.out.println("writer:");
Object o;
// 写入批
List<Object> batch = new ArrayList<>(batchSize);
while (null != (o = rev())) {
batch.add(o);
if (batch.size() >= batchSize) {
batch.forEach(System.out::println);
batch.clear();
}
}
if (batch.size() >= batchSize) {
batch.forEach(System.out::println);
batch.clear();
}
}
};
// reader
private final Runnable reader = new Runnable() {
// record 发送器
private void send(Object o){
doPush(o);
}
@Override
public void run() {
Random random = new Random();
int i = 0;
System.out.println("reader:");
// 生成数据
while (true) {
if (i > 12) {
i = 0;
}
send(random.nextLong());
i++;
}
}
};
/**
* reader push
* @param o record
*/
private void doPush(Object o) {
try {
queue.put(o);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
/**
* writer poll
* @return
*/
private Object doPoll() {
try {
return queue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
public void test(){
Thread writerThread = new Thread(writer);
Thread readerThread = new Thread(reader);
//先启动writer 再启动reader
writerThread.start();
readerThread.start();
}
}