Commit e9cd693c by 黄杰

flume操作文件的自定义sink提交

parents
# Created by .ignore support plugin (hsz.mobi)
### Example user template template
### Example user template
# IntelliJ project files
.idea
*.iml
out
gen
### 使用apache flume 进行ftp文件采集汇总
### 基础知识
#####flume的优势:<br>
可以高速采集数据,采集的数据能够以想要的文件格式及压缩方式存储在hdfs上<br>
事务功能保证了数据在采集的过程中数据不丢失<br>
部分Source保证了Flume挂了以后重启依旧能够继续在上一次采集点采集数据,真正做到数据零丢失<br>
#####flume的组成
flume有3大组件<br>
source(源端数据采集):Flume提供了各种各样的Source、同时还提供了自定义的Source<br>
Channel(临时存储聚合数据):主要用的是memory channel和File channel(生产最常用),生产中channel的数据一定是要监控的,防止sink挂了,撑爆channel<br>
Sink(移动数据到目标端):如HDFS、KAFKA、DB以及自定义的sink<br>
flume-ng agent -c D:\software\apache-flume-1.8.0-bin\conf -f D:\software\apache-flume-1.8.0-bin\conf\flume-ftp-source-dir.conf -n agent
flume-ng agent -c D:\software\apache-flume-1.8.0-bin\conf -f D:\software\apache-flume-1.8.0-bin\conf\flume-ftp-result.conf -n agent
telnet localhost 44444
flume-ng agent -c conf -conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ihooyah</groupId>
<artifactId>ihooyah-flume</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<version.flume>1.7.0</version.flume>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>${version.flume}</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId>
<version>${version.flume}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.5</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.ihooyah.flume.sink;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 描述
*
* @author fatiaojie
* @date 2020/4/1 16:16
*/
public class DAOClass {
private static final Logger logger = LoggerFactory.getLogger(DAOClass.class);
private static Map<String, String> columnsMap = new HashMap();
private static String INSERT_QUERY;
private Connection connection;
public DAOClass() {
}
public void insertData(Event event, Map<String, String> params) {
try {
String body = new String(event.getBody());
Map<String, String> headers = event.getHeaders();
String fileName = (String)headers.get("fileName");
String databaseName = (String)params.get("databaseName");
String tableName = (String)params.get("tableName");
String partition = (String)params.get("partition");
String iscustom = (String)params.get("iscustom");
if (!"false".equals(iscustom)) {
if ("true".equals(iscustom)) {
;
}
} else {
String columns = "";
Statement st = this.connection.createStatement();
String nowDate;
if (columnsMap.get(tableName) == null) {
nowDate = "select COLUMN_NAME from INFORMATION_SCHEMA.Columns where table_name='" + tableName + "' and table_schema='" + databaseName + "'";
for(ResultSet resultSet = st.executeQuery(nowDate); resultSet.next(); columns = columns + resultSet.getString("COLUMN_NAME") + ",") {
;
}
columns = columns.substring(0, columns.length() - 1);
columnsMap.put(tableName, columns);
} else {
columns = (String)columnsMap.get(tableName);
}
nowDate = getNowDate();
String[] datas = body.split(partition);
List<String> dataList = new ArrayList();
String[] arr$ = datas;
int i1 = datas.length;
for(int i$ = 0; i$ < i1; ++i$) {
String data = arr$[i$];
dataList.add(data);
}
dataList.add(fileName);
dataList.add(nowDate);
String indexs = "";
i1 = 0;
while(true) {
if (i1 >= dataList.size()) {
indexs = indexs.substring(0, indexs.length() - 1);
INSERT_QUERY = "INSERT INTO " + tableName + "(" + columns + ") values (" + indexs + ")";
break;
}
String ind = "";
if (((String)dataList.get(i1)).equals("")) {
ind = "null,";
} else {
ind = "'" + (String)dataList.get(i1) + "',";
}
indexs = indexs + ind;
++i1;
}
}
PreparedStatement insertStmnt = this.connection.prepareStatement(INSERT_QUERY);
insertStmnt.execute();
} catch (SQLException var20) {
var20.printStackTrace();
}
}
public void createConnection(String driver, String db_url, String user, String password) throws IOException {
if (this.connection == null) {
try {
Class.forName(driver);
this.connection = DriverManager.getConnection(db_url, user, password);
} catch (ClassNotFoundException var6) {
var6.printStackTrace();
} catch (SQLException var7) {
var7.printStackTrace();
}
}
}
public void destroyConnection(String db_url, String user) {
if (this.connection != null) {
logger.debug("Destroying connection to: {}:{}", db_url, user);
try {
this.connection.close();
} catch (SQLException var4) {
var4.printStackTrace();
}
}
this.connection = null;
}
public static String escapeSQL(String s) {
return s.replaceAll("'", "\\'");
}
public Connection getConnection() {
return this.connection;
}
private static String getNowDate() {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String INSERT_DATE = df.format(new Date());
return INSERT_DATE;
}
}
package com.ihooyah.flume.sink;
import com.ihooyah.flume.sink.util.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.mortbay.util.ajax.JSON;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
/**
* 描述 检测到flumechannls中的文件并处理生成新文件保存
*
* @author fatiaojie
* @date 2020/4/2 11:07
*/
public class DirFtpSink extends AbstractSink implements Configurable {
/**
* 文件保存地址
*/
private String fileDir;
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel channel = this.getChannel();
Transaction transaction = channel.getTransaction();
try {
transaction.begin();
Event event = channel.take();
if (event != null) {
String body = new String(event.getBody(), "UTF-8");
if (StringUtils.isNotBlank(body)) {
/**
* 处理文件内容并保存为新文件 新文件后缀为_result.json
*/
body += "该数据已被处理过...";
createTempFile(fileDir+File.separator+System.currentTimeMillis()+"_result.json",body.getBytes());
}
System.out.println(body);
}
transaction.commit();
} catch (Exception e) {
transaction.rollback();
status = Status.BACKOFF;
e.printStackTrace();
} finally {
transaction.close();
}
return status;
}
@Override
public void configure(Context context) {
this.fileDir = context.getString("file_dir");
}
public static File createTempFile(String filePath, byte[] data) throws IOException
{
File file = new File(filePath);
if (!file.getParentFile().exists())
{
file.getParentFile().mkdirs();
}
if (!file.exists())
{
file.createNewFile();
}
FileOutputStream fos = new FileOutputStream(file);
fos.write(data, 0, data.length);
fos.flush();
fos.close();
return file;
}
}
package com.ihooyah.flume.sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.mortbay.util.ajax.JSON;
/**
* 描述 检测到flume channls中的文件信息并处理文件内容
*
* @author fatiaojie
* @date 2020/4/2 14:06
*/
public class DirHandleSink extends AbstractSink implements Configurable {
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel channel = this.getChannel();
Transaction transaction = channel.getTransaction();
try {
transaction.begin();
Event event = channel.take();
if (event != null) {
String body = new String(event.getBody(), "UTF-8");
System.out.println("新文件已经获取到内容为:");
System.out.println(body);
}
transaction.commit();
} catch (Exception e) {
transaction.rollback();
status = Status.BACKOFF;
e.printStackTrace();
} finally {
transaction.close();
}
return status;
}
@Override
public void configure(Context context) {
}
}
package com.ihooyah.flume.sink;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.Sink.Status;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 描述
*
* @author fatiaojie
* @date 2020/4/1 16:15
*/
public class MySqlSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(MySqlSink.class);
private String databaseName;
private String tableName;
private String partition;
private String iscustom;
private String password;
private String user;
private String driver;
private String db_url;
private CounterGroup counterGroup = new CounterGroup();
DAOClass daoClass = new DAOClass();
public MySqlSink() {
}
@Override
public void configure(Context context) {
this.db_url = context.getString("url");
this.password = context.getString("password");
this.user = context.getString("user");
this.driver = context.getString("driver");
this.tableName = context.getString("tableName");
this.partition = context.getString("partition");
this.iscustom = context.getString("iscustom");
this.databaseName = context.getString("databaseName");
Preconditions.checkState(this.password != null, "No password specified");
Preconditions.checkState(this.user != null, "No user specified");
}
@Override
public void start() {
logger.info("Mysql sink starting");
try {
this.daoClass.createConnection(this.driver, this.db_url, this.user, this.password);
} catch (Exception var2) {
logger.error("Unable to create MySQL client using url:" + this.db_url + " username:" + this.user + ". Exception follows.", var2);
this.daoClass.destroyConnection(this.db_url, this.user);
return;
}
super.start();
logger.debug("MySQL sink {} started", this.getName());
}
@Override
public void stop() {
logger.info("MySQL sink {} stopping", this.getName());
this.daoClass.destroyConnection(this.db_url, this.user);
super.stop();
logger.debug("MySQL sink {} stopped. Metrics:{}", this.getName(), this.counterGroup);
}
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel channel = this.getChannel();
Transaction transaction = channel.getTransaction();
try {
transaction.begin();
this.daoClass.createConnection(this.driver, this.db_url, this.user, this.password);
Event event = channel.take();
if (event == null) {
this.counterGroup.incrementAndGet("event.empty");
status = Status.BACKOFF;
} else {
Map<String, String> params = new HashMap<>();
params.put("tableName", this.tableName);
params.put("partition", this.partition);
params.put("iscustom", this.iscustom);
params.put("databaseName", this.databaseName);
this.daoClass.insertData(event, params);
this.counterGroup.incrementAndGet("event.mysql");
}
transaction.commit();
} catch (ChannelException var10) {
transaction.rollback();
logger.error("Unable to get event from channel. Exception follows.", var10);
status = Status.BACKOFF;
} catch (Exception var11) {
transaction.rollback();
logger.error("Unable to communicate with MySQL server. Exception follows.", var11);
status = Status.BACKOFF;
this.daoClass.destroyConnection(this.db_url, this.user);
} finally {
transaction.close();
}
return status;
}
}
package com.ihooyah.flume.sink.util;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
import java.net.URLEncoder;
/**
* 文件处理工具类
*
* @author ruoyi
*/
public class FileUtils
{
public static String FILENAME_PATTERN = "[a-zA-Z0-9_\\-\\|\\.\\u4e00-\\u9fa5]+";
/**
* 输出指定文件的byte数组
*
* @param filePath 文件路径
* @param os 输出流
* @return
*/
public static void writeBytes(String filePath, OutputStream os) throws IOException
{
FileInputStream fis = null;
try
{
File file = new File(filePath);
if (!file.exists())
{
throw new FileNotFoundException(filePath);
}
fis = new FileInputStream(file);
byte[] b = new byte[1024];
int length;
while ((length = fis.read(b)) > 0)
{
os.write(b, 0, length);
}
}
catch (IOException e)
{
throw e;
}
finally
{
if (os != null)
{
try
{
os.close();
}
catch (IOException e1)
{
e1.printStackTrace();
}
}
if (fis != null)
{
try
{
fis.close();
}
catch (IOException e1)
{
e1.printStackTrace();
}
}
}
}
/**
* 删除文件
*
* @param filePath 文件
* @return
*/
public static boolean deleteFile(String filePath)
{
boolean flag = false;
File file = new File(filePath);
// 路径为文件且不为空则进行删除
if (file.isFile() && file.exists())
{
file.delete();
flag = true;
}
return flag;
}
/**
* 文件名称验证
*
* @param filename 文件名称
* @return true 正常 false 非法
*/
public static boolean isValidFilename(String filename)
{
return filename.matches(FILENAME_PATTERN);
}
/**
* 下载文件名重新编码
*
* @param request 请求对象
* @param fileName 文件名
* @return 编码后的文件名
*/
public static String setFileDownloadHeader(HttpServletRequest request, String fileName)
throws UnsupportedEncodingException
{
final String agent = request.getHeader("USER-AGENT");
String filename = fileName;
if (agent.contains("MSIE"))
{
// IE浏览器
filename = URLEncoder.encode(filename, "utf-8");
filename = filename.replace("+", " ");
}
else if (agent.contains("Firefox"))
{
// 火狐浏览器
filename = new String(fileName.getBytes(), "ISO8859-1");
}
else if (agent.contains("Chrome"))
{
// google浏览器
filename = URLEncoder.encode(filename, "utf-8");
}
else
{
// 其它浏览器
filename = URLEncoder.encode(filename, "utf-8");
}
return filename;
}
/**
* 获取系统临时目录
* @return
*/
public static String getTemp()
{
return System.getProperty("java.io.tmpdir");
}
/**
* 创建临时文件
* @param filePath
* @param data
* @return
*/
public static File createTempFile(String filePath, byte[] data) throws IOException
{
String temp = getTemp() + filePath;
File file = new File(temp);
if (!file.getParentFile().exists())
{
file.getParentFile().mkdirs();
}
if (!file.exists())
{
file.createNewFile();
}
FileOutputStream fos = new FileOutputStream(file);
fos.write(data, 0, data.length);
fos.flush();
fos.close();
return file;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment