Commit ee331933 by txy

阿里云

parent 2a4bb7e1
...@@ -31,6 +31,11 @@ ...@@ -31,6 +31,11 @@
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
<version>6.0.5</version> <version>6.0.5</version>
</dependency> </dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>3.15.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
......
package com.ihooyah.flume.sink; package com.ihooyah.flume.sink;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.PutObjectRequest;
import com.aliyun.oss.model.PutObjectResult;
import com.ihooyah.flume.sink.util.HttpRequestUtil;
import com.ihooyah.flume.sink.util.aliyunOssUtil;
import org.apache.flume.*; import org.apache.flume.*;
import org.apache.flume.conf.BatchSizeSupported; import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable; import org.apache.flume.conf.Configurable;
...@@ -8,9 +17,7 @@ import org.apache.flume.sink.AbstractSink; ...@@ -8,9 +17,7 @@ import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import java.io.*;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -32,35 +39,39 @@ public class YunnanFileSink extends AbstractSink implements Configurable, BatchS ...@@ -32,35 +39,39 @@ public class YunnanFileSink extends AbstractSink implements Configurable, BatchS
private String fileReadName; private String fileReadName;
private HttpRequestUtil httpRequestUtil=new HttpRequestUtil();
private CounterGroup counterGroup = new CounterGroup(); private CounterGroup counterGroup = new CounterGroup();
@Override public Sink.Status process() throws EventDeliveryException {
public Status process() throws EventDeliveryException { Map<String, List<Event>> eventMap = new HashMap<>();
Map<String , List<Event>> eventMap=new HashMap<>(); Sink.Status status = Sink.Status.READY;
Status status = Status.READY;
Channel channel = getChannel(); Channel channel = getChannel();
Transaction transaction = channel.getTransaction(); Transaction transaction = channel.getTransaction();
transaction.begin(); transaction.begin();
try { try {
for (int i = 0; i < batchSize; i++) { for (int i = 0; i < this.batchSize; i++) {
Event event = channel.take(); Event event = channel.take();
if (event == null) { if (event == null)
break; break;
this.fileReadName = ((String) event.getHeaders().get("fileName")).toString();
if (!this.fileReadName.endsWith(".json")) {
this.counterGroup.incrementAndGet("event.file");
List<Event> eventList = eventMap.get(this.fileReadName);
if (eventList == null)
eventList = new ArrayList<>();
eventList.add(event);
eventMap.put(this.fileReadName, eventList);
} }
fileReadName=event.getHeaders().get("fileName").toString();
if(fileReadName.endsWith(".json")){
continue;
}
this.counterGroup.incrementAndGet("event.file");
List<Event> eventList=eventMap.get(fileReadName);
if (eventList==null){
eventList=new ArrayList<>();
}
eventList.add(event);
eventMap.put(fileReadName,eventList);
} }
for (Map.Entry<String , List<Event>> data:eventMap.entrySet()) { for (Map.Entry<String, List<Event>> data : eventMap.entrySet()) {
dealData(data.getKey(),data.getValue()); try {
dealData(data.getKey(), data.getValue());
} catch (Exception e) {
System.out.printf("aliyun::%s%n", e.getMessage());
throw new RuntimeException(e);
}
} }
transaction.commit(); transaction.commit();
} catch (Exception e) { } catch (Exception e) {
...@@ -72,36 +83,43 @@ public class YunnanFileSink extends AbstractSink implements Configurable, BatchS ...@@ -72,36 +83,43 @@ public class YunnanFileSink extends AbstractSink implements Configurable, BatchS
return status; return status;
} }
@Override
public void configure(Context context) { public void configure(Context context) {
batchSize = context.getInteger("batchSize", DEFAULT_BATCH_SIZE); this.batchSize = context.getInteger("batchSize", Integer.valueOf(1000)).intValue();
} }
public void dealData(String fileName,List<Event> eventList) throws IOException { public void dealData(String fileName, List<Event> eventList) throws IOException {
if (eventList.size() == 0) { if (eventList.size() == 0)
return; return;
} fileName = fileName.replaceAll(",,", ",").replaceAll(",", "\\/");
String uploadDir = "/" + fileName;
fileName=fileName.replaceAll(",,",",").replaceAll(",","\\/"); String dir = uploadDir.substring(0, uploadDir.lastIndexOf("/"));
String uploadDir="/"+fileName; logger.info("uploadDir:" + uploadDir + " dir:" + dir);
String dir=uploadDir.substring(0,uploadDir.lastIndexOf("/")); File isDir = new File(dir);
logger.info("uploadDir:"+uploadDir+" dir:"+dir); if (!isDir.exists())
File isDir=new File(dir);
if (!isDir.exists()){
isDir.mkdirs(); isDir.mkdirs();
} logger.info("deal data numbers: {}", Integer.valueOf(eventList.size()));
logger.info("deal data numbers: {}", eventList.size());
File localFile = new File(uploadDir); File localFile = new File(uploadDir);
FileOutputStream fos = new FileOutputStream(localFile); FileOutputStream fos = new FileOutputStream(localFile);
for (int i = 0; i < eventList.size(); i++) { for (int i = 0; i < eventList.size(); i++)
fos.write(eventList.get(i).getBody()); fos.write(((Event) eventList.get(i)).getBody());
}
String url="http://86.1.41.66:8765/api/common/common/uploadFileToAliYun";
JSONObject param = new JSONObject();
param.put("filePath", uploadDir);
String res= HttpRequestUtil.doPostJson(url, param.toJSONString(), null);
JSONObject resJsonObect = JSONObject.parseObject(res);
String code = resJsonObect.getString("code");
String msg=resJsonObect.getString("msg");
logger.info("调用三方接口的code"+code);
logger.info("调用三方接口的msg"+msg);
System.out.println(code);
System.out.println(msg);
fos.flush(); fos.flush();
fos.close(); fos.close();
} }
@Override
public long getBatchSize() { public long getBatchSize() {
return batchSize; return this.batchSize;
} }
} }
package com.ihooyah.flume.sink.util;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.OSSObject;
import com.aliyun.oss.model.PutObjectRequest;
import com.aliyun.oss.model.PutObjectResult;
import com.ihooyah.flume.sink.LocalUploadSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
/**
* @Description: 阿里云对象存储工具类
**/
public class aliyunOssUtil {
private static final Logger logger = LoggerFactory.getLogger(LocalUploadSink.class);
public static OSS ossClient(){
// Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。关于其他Region对应的Endpoint信息,请参见访问域名和数据中心。
//String endpoint = "oss-cn-kunming-kmga-d01-a.ops.galthfkcloud.com";
logger.info("inputStreamOSS");
String endpoint="http://86.1.6.240:7001";
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
String accessKeyId = "fHSixNT3v5xb8qoV";
String accessKeySecret = "KuYB1KeRNose7teueCm5TnrUPXTzQa";
OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
return ossClient;
}
/**
* @Description: 用输出流的方式往阿里云上传文件
**/
public static void OssUtils(InputStream inputStream, String path) {
// 填写Bucket名称,例如examplebucket。
String bucketName = "beiming-bucket01";
// 填写Object完整路径,完整路径中不能包含Bucket名称,例如exampledir/exampleobject.txt。
// String objectName = "exampledir/exampleobject.txt";
logger.info("inputStream" + inputStream.hashCode());
logger.info("path" + path);
String objectName=path;
// 创建OSSClient实例。
logger.info("准备启动客户端");
OSS ossClient =ossClient();
logger.info("启动客户端成功");
Integer code = 0;
try {
//InputStream inputStream = new URL(url).openStream();
// 创建PutObjectRequest对象。
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, inputStream);
// 设置该属性可以返回response。如果不设置,则返回的response为空。
putObjectRequest.setProcess("true");
// 创建PutObject请求。
PutObjectResult result = ossClient.putObject(putObjectRequest);
// 如果上传成功,则返回200。
System.out.println(result.getResponse().getStatusCode());
code = result.getResponse().getStatusCode();
} catch (OSSException oe) {
System.out.println("Caught an OSSException, which means your request made it to OSS, "
+ "but was rejected with an error response for some reason.");
System.out.println("Error Message:" + oe.getErrorMessage());
System.out.println("Error Code:" + oe.getErrorCode());
System.out.println("Request ID:" + oe.getRequestId());
System.out.println("Host ID:" + oe.getHostId());
} catch (ClientException ce) {
System.out.println("Caught an ClientException, which means the client encountered "
+ "a serious internal problem while trying to communicate with OSS, "
+ "such as not being able to access the network.");
System.out.println("Error Message:" + ce.getMessage());
} finally {
if (ossClient != null) {
ossClient.shutdown();
}
}
}
/**
* @Description: 以流的方式下载文件
**/
public static String downLoadForStream(String filePath) {
// 填写Bucket名称,例如examplebucket。
String bucketName = "beiming-bucket01";
// 填写Object完整路径,例如exampledir/exampleobject.txt。Object完整路径中不能包含Bucket名称。
String objectName = filePath;
String line="";
// 创建OSSClient实例。
OSS ossClient = ossClient();
try {
// ossObject包含文件所在的存储空间名称、文件名称、文件元信息以及一个输入流。
OSSObject ossObject = ossClient.getObject(bucketName, objectName);
// 读取文件内容。
System.out.println("Object content:");
BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()));
while (true) {
line = reader.readLine();
if (line == null) break;
System.out.println("\n" + line);
}
// 数据读取完成后,获取的流必须关闭,否则会造成连接泄漏,导致请求无连接可用,程序无法正常工作。
reader.close();
// ossObject对象使用完毕后必须关闭,否则会造成连接泄漏,导致请求无连接可用,程序无法正常工作。
ossObject.close();
} catch (OSSException oe) {
System.out.println("Caught an OSSException, which means your request made it to OSS, "
+ "but was rejected with an error response for some reason.");
System.out.println("Error Message:" + oe.getErrorMessage());
System.out.println("Error Code:" + oe.getErrorCode());
System.out.println("Request ID:" + oe.getRequestId());
System.out.println("Host ID:" + oe.getHostId());
} catch (Throwable ce) {
System.out.println("Caught an ClientException, which means the client encountered "
+ "a serious internal problem while trying to communicate with OSS, "
+ "such as not being able to access the network.");
System.out.println("Error Message:" + ce.getMessage());
} finally {
if (ossClient != null) {
ossClient.shutdown();
}
}
return line;
}
/**
* @Description: 从阿里云上下载文件到本地指定位置
**/
public static void downLoadFile(String filePath) throws Exception {
// 填写Bucket名称,例如examplebucket。
String bucketName = "beiming-bucket01";
// 填写不包含Bucket名称在内的Object完整路径,例如testfolder/exampleobject.txt。
String objectName = filePath;
String pathName = "D:/opt/a.jpg";
// 创建OSSClient实例。
OSS ossClient = ossClient();
try {
// 下载Object到本地文件,并保存到指定的本地路径中。如果指定的本地文件存在会覆盖,不存在则新建。
// 如果未指定本地路径,则下载后的文件默认保存到示例程序所属项目对应本地路径中。
ossClient.getObject(new GetObjectRequest(bucketName, objectName), new File(pathName));
} catch (OSSException oe) {
System.out.println("Caught an OSSException, which means your request made it to OSS, "
+ "but was rejected with an error response for some reason.");
System.out.println("Error Message:" + oe.getErrorMessage());
System.out.println("Error Code:" + oe.getErrorCode());
System.out.println("Request ID:" + oe.getRequestId());
System.out.println("Host ID:" + oe.getHostId());
} catch (ClientException ce) {
System.out.println("Caught an ClientException, which means the client encountered "
+ "a serious internal problem while trying to communicate with OSS, "
+ "such as not being able to access the network.");
System.out.println("Error Message:" + ce.getMessage());
} finally {
if (ossClient != null) {
ossClient.shutdown();
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment