Commit 13c14bc9 by 吴凯波

flume监控ftp保存文件到服务器

parent c3ad374e
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
<dependency> <dependency>
<groupId>org.apache.flume</groupId> <groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId> <artifactId>flume-ng-core</artifactId>
<version>${version.flume}</version> <version>1.9.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flume</groupId> <groupId>org.apache.flume</groupId>
...@@ -31,6 +31,17 @@ ...@@ -31,6 +31,17 @@
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
<version>6.0.5</version> <version>6.0.5</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.3</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -10,10 +10,11 @@ import org.slf4j.LoggerFactory; ...@@ -10,10 +10,11 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
public class LocalUploadSink extends AbstractSink implements Configurable, BatchSizeSupported { public class LocalUploadSink extends AbstractSink implements Configurable, BatchSizeSupported {
...@@ -29,13 +30,15 @@ public class LocalUploadSink extends AbstractSink implements Configurable, Batch ...@@ -29,13 +30,15 @@ public class LocalUploadSink extends AbstractSink implements Configurable, Batch
private String fileReadName; private String fileReadName;
private CounterGroup counterGroup = new CounterGroup();
@Override @Override
public Status process() throws EventDeliveryException { public Status process() throws EventDeliveryException {
Map<String , List<Event>> eventMap=new HashMap<>();
Status status = Status.READY; Status status = Status.READY;
Channel channel = getChannel(); Channel channel = getChannel();
Transaction transaction = channel.getTransaction(); Transaction transaction = channel.getTransaction();
transaction.begin(); transaction.begin();
List<Event> batch = new ArrayList<>();
try { try {
for (int i = 0; i < batchSize; i++) { for (int i = 0; i < batchSize; i++) {
Event event = channel.take(); Event event = channel.take();
...@@ -43,13 +46,21 @@ public class LocalUploadSink extends AbstractSink implements Configurable, Batch ...@@ -43,13 +46,21 @@ public class LocalUploadSink extends AbstractSink implements Configurable, Batch
break; break;
} }
fileReadName=event.getHeaders().get("fileName").toString(); fileReadName=event.getHeaders().get("fileName").toString();
System.out.println("读取的文件名:"+fileReadName); //System.out.println("读取文件名:"+fileReadName+" 文件大小:"+event.getBody().length);
batch.add(event); this.counterGroup.incrementAndGet("event.file");
List<Event> eventList=eventMap.get(fileReadName);
if (eventList==null){
eventList=new ArrayList<>();
}
eventList.add(event);
eventMap.put(fileReadName,eventList);
}
for (Map.Entry<String , List<Event>> data:eventMap.entrySet()) {
//System.out.println("文件名称:"+data.getKey()+"集合大小:"+data.getValue().size());
dealData(data.getKey(),data.getValue());
} }
dealData(batch);
transaction.commit(); transaction.commit();
} catch (IOException e) { } catch (Exception e) {
transaction.rollback(); transaction.rollback();
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
...@@ -64,13 +75,13 @@ public class LocalUploadSink extends AbstractSink implements Configurable, Batch ...@@ -64,13 +75,13 @@ public class LocalUploadSink extends AbstractSink implements Configurable, Batch
batchSize = context.getInteger("batchSize", DEFAULT_BATCH_SIZE); batchSize = context.getInteger("batchSize", DEFAULT_BATCH_SIZE);
} }
public void dealData(List<Event> eventList) throws IOException { public void dealData(String fileName,List<Event> eventList) throws IOException {
if (eventList.size() == 0) { if (eventList.size() == 0) {
return; return;
} }
logger.info("deal data numbers: {}", eventList.size()); logger.info("deal data numbers: {}", eventList.size());
String filename = System.currentTimeMillis() + ".png"; String filename = System.currentTimeMillis() +fileName;
File localFile = new File(uploadDir + filename); File localFile = new File(uploadDir + filename);
FileOutputStream fos = new FileOutputStream(localFile); FileOutputStream fos = new FileOutputStream(localFile);
for (int i = 0; i < eventList.size(); i++) { for (int i = 0; i < eventList.size(); i++) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment