Commit 3eaec6e3 by 吴凯波

云南同步文件代码

parent 99cf28ce
package com.ihooyah.flume.sink;
import org.apache.flume.*;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 云南项目读取文件到指定目录
*/
public class YunnanFileSink extends AbstractSink implements Configurable, BatchSizeSupported {
private static final Logger logger = LoggerFactory.getLogger(LocalUploadSink.class);
private int batchSize;
private static final int DEFAULT_BATCH_SIZE = 1000;
private SinkCounter sinkCounter;
private String fileReadName;
private CounterGroup counterGroup = new CounterGroup();
@Override
public Status process() throws EventDeliveryException {
Map<String , List<Event>> eventMap=new HashMap<>();
Status status = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
for (int i = 0; i < batchSize; i++) {
Event event = channel.take();
if (event == null) {
break;
}
fileReadName=event.getHeaders().get("fileName").toString();
if(fileReadName.endsWith(".json")){
continue;
}
this.counterGroup.incrementAndGet("event.file");
List<Event> eventList=eventMap.get(fileReadName);
if (eventList==null){
eventList=new ArrayList<>();
}
eventList.add(event);
eventMap.put(fileReadName,eventList);
}
for (Map.Entry<String , List<Event>> data:eventMap.entrySet()) {
dealData(data.getKey(),data.getValue());
}
transaction.commit();
} catch (Exception e) {
transaction.rollback();
e.printStackTrace();
} finally {
transaction.close();
}
return status;
}
@Override
public void configure(Context context) {
batchSize = context.getInteger("batchSize", DEFAULT_BATCH_SIZE);
}
public void dealData(String fileName,List<Event> eventList) throws IOException {
if (eventList.size() == 0) {
return;
}
fileName=fileName.replaceAll(",","\\/");
String uploadDir="/"+fileName;
logger.info("deal data numbers: {}", eventList.size());
File localFile = new File(uploadDir);
FileOutputStream fos = new FileOutputStream(localFile);
for (int i = 0; i < eventList.size(); i++) {
fos.write(eventList.get(i).getBody());
}
fos.flush();
fos.close();
}
@Override
public long getBatchSize() {
return batchSize;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment