Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
I
ihooyah-flume
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
黄杰
ihooyah-flume
Commits
9cea6875
Commit
9cea6875
authored
May 14, 2021
by
吴凯波
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
readme更新
parent
bc1f087b
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
76 additions
and
76 deletions
+76
-76
README.md
README.md
+76
-76
No files found.
README.md
View file @
9cea6875
...
@@ -24,96 +24,96 @@
...
@@ -24,96 +24,96 @@
flume-ng agent -c conf -conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console
flume-ng agent -c conf -conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console
#####flume实战案例一:从ftp环境读取json文件,数据存入mysql
#####flume实战案例一:从ftp环境读取json文件,数据存入mysql
参考链接:https://blog.csdn.net/qq_40015759/article/details/82429117
参考链接:https://blog.csdn.net/qq_40015759/article/details/82429117
<br>
前提条件:jdk1.8
前提条件:jdk1.8
<br>
flume1.9的安装包
flume1.9的安装包
<br>
mysql数据库
mysql数据库
<br>
ftp环境
ftp环境
<br>
步骤:
步骤:
<br>
一:解压flume1.9的压缩包,配置环境变量(linux环境需要做)
一:解压flume1.9的压缩包,配置环境变量(linux环境需要做)
<br>
二:添加flume读取ftp环境中文件的三方包(https://github.com/keedio/flume-ftp-source)
二:添加flume读取ftp环境中文件的三方包(https://github.com/keedio/flume-ftp-source)
<br>
flume本身并没有类型去读取ftp文件的功能,需要引入三方包去实现
flume本身并没有类型去读取ftp文件的功能,需要引入三方包去实现
<br>
三:commons-net-3.3.jar,jsch-0.1.54.jar ,mysql-connector-java-6.0.5.jar,
三:commons-net-3.3.jar,jsch-0.1.54.jar ,mysql-connector-java-6.0.5.jar,
<br>
flume-ftp-source-2.2.0.jar(步骤二打成的jar包)共四个jar包加入之前解压的flume中的lib 中
flume-ftp-source-2.2.0.jar(步骤二打成的jar包)共四个jar包加入之前解压的flume中的lib 中
<br>
四:在flume的conf中新增配置文件ftp.conf
四:在flume的conf中新增配置文件ftp.conf
<br>
具体配置如下
具体配置如下
<br>
# flume监控ftp文件功能的配置如下
# flume监控ftp文件功能的配置如下
agent.sources = ftp1
agent.sources = ftp1
<br>
agent.sinks = k1
agent.sinks = k1
<br>
agent.channels = ch1
agent.channels = ch1
<br>
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
<br>
agent.sources.ftp1.client.source = ftp
agent.sources.ftp1.client.source = ftp
<br>
#ftp服务地址
#ftp服务地址
agent.sources.ftp1.name.server = 61.159.214.198
agent.sources.ftp1.name.server = 61.159.214.198
<br>
#端口
#端口
agent.sources.ftp1.port = 50021
agent.sources.ftp1.port = 50021
<br>
#ftp用户名
#ftp用户名
agent.sources.ftp1.user = flume
agent.sources.ftp1.user = flume
<br>
#密码
#密码
agent.sources.ftp1.password = sSt23dDmcikTNiw8
agent.sources.ftp1.password = sSt23dDmcikTNiw8
<br>
#文件所在目录 (相对于根目录)
#文件所在目录 (相对于根目录)
agent.sources.ftp1.working.directory = /
agent.sources.ftp1.working.directory = /
<br>
#文件的名称匹配 (java正则表达式)
#文件的名称匹配 (java正则表达式)
agent.sources.ftp1.filter.pattern = .+
\.
json
agent.sources.ftp1.filter.pattern = .+
\.
json
<br>
#ftp的根目录
#ftp的根目录
agent.sources.ftp1.folder = /flume
agent.sources.ftp1.folder = /flume
<br>
agent.sources.ftp1.run.discover.delay=5000
agent.sources.ftp1.run.discover.delay=5000
<br>
agent.sources.ftp1.flushlines = true
agent.sources.ftp1.flushlines = true
<br>
agent.sources.ftp1.search.recursive = true
agent.sources.ftp1.search.recursive = true
<br>
agent.sources.ftp1.processInUse = false
agent.sources.ftp1.processInUse = false
<br>
agent.sources.ftp1.processInUseTimeout = 30
agent.sources.ftp1.processInUseTimeout = 30
<br>
agent.sources.ftp1.channels = ch1
agent.sources.ftp1.channels = ch1
<br>
agent.sources.ftp1.inputCharset=utf8
agent.sources.ftp1.inputCharset=utf8
<br>
agent.channels.ch1.type = memory
agent.channels.ch1.type = memory
<br>
agent.channels.ch1.capacity = 100000000
agent.channels.ch1.capacity = 100000000
<br>
agent.channels.ch1.transactionCapacity = 100000000
agent.channels.ch1.transactionCapacity = 100000000
<br>
agent.sinks.k1.type = com.ihooyah.flume.sink.MySqlSink
agent.sinks.k1.type = com.ihooyah.flume.sink.MySqlSink
<br>
agent.sinks.k1.url = jdbc:mysql://127.0.0.1:3306/flume?autoReconnect=true&failOverReadOnly=false&serverTimezone=UTC&characterEncoding=UTF-8&useUnicode=true
agent.sinks.k1.url = jdbc:mysql://127.0.0.1:3306/flume?autoReconnect=true&failOverReadOnly=false&serverTimezone=UTC&characterEncoding=UTF-8&useUnicode=true
agent.sinks.k1.user= root
agent.sinks.k1.user= root
<br>
agent.sinks.k1.password= root
agent.sinks.k1.password= root
<br>
agent.sinks.k1.driver= com.mysql.cj.jdbc.Driver
agent.sinks.k1.driver= com.mysql.cj.jdbc.Driver
<br>
agent.sinks.k1.databaseName = flume
agent.sinks.k1.databaseName = flume
<br>
agent.sinks.k1.tableName = flume_test
agent.sinks.k1.tableName = flume_test
<br>
agent.sinks.k1.iscustom = false
agent.sinks.k1.iscustom = false
<br>
agent.sinks.k1.channel = ch1
agent.sinks.k1.channel = ch1
<br>
#####参数解释
#####参数解释
sources配置的数据来源
sources配置的数据来源
<br>
agent.sources.ftp1.type填的是三方jar中的方法去开启读取ftp文件 必填项
agent.sources.ftp1.type填的是三方jar中的方法去开启读取ftp文件 必填项
<br>
agent.sources.ftp1.folder填的是读取日志文件生成目录(记录那些文件已读取过 避免重复读取)
agent.sources.ftp1.folder填的是读取日志文件生成目录(记录那些文件已读取过 避免重复读取)
<br>
agent.sources.ftp1.flushlines true代表用行去读,false用代码块去读
agent.sources.ftp1.flushlines true代表用行去读,false用代码块去读
<br>
agent.sources.ftp1.channels = ch1此配置必填不然无法去读取sink
agent.sources.ftp1.channels = ch1此配置必填不然无法去读取sink
<br>
sinks配置的数据接受项(此配置可以自己定义参数,后台通过context对象获取到)
sinks配置的数据接受项(此配置可以自己定义参数,后台通过context对象获取到)
<br>
agent.sinks.k1.type = com.ihooyah.flume.sink.MySqlSink 此配置填写项目中的执行类(该类继承 AbstractSink)
agent.sinks.k1.type = com.ihooyah.flume.sink.MySqlSink 此配置填写项目中的执行类(该类继承 AbstractSink)
<br>
agent.sinks.k1.channel = ch1此配置必填 关联数据源
agent.sinks.k1.channel = ch1此配置必填 关联数据源
<br>
#####代码讲解
#####代码讲解
处理数据逻辑参考com.ihooyah.flume.sink.MySqlSink这个类
处理数据逻辑参考com.ihooyah.flume.sink.MySqlSink这个类
<br>
event对象里面中body读取的是文件内容 headers读取的是文件名称
event对象里面中body读取的是文件内容 headers读取的是文件名称
<br>
需要注意body的byte数组转换成string是需要指定编码格式 防止乱码
需要注意body的byte数组转换成string是需要指定编码格式 防止乱码
<br>
读取的文件也需要确定编码格式utf8否则会出现乱码
读取的文件也需要确定编码格式utf8否则会出现乱码
<br>
####注意事项
####注意事项
process方法中 必须要有事务的代码不然会报错
process方法中 必须要有事务的代码不然会报错
<br>
Transaction transaction = channel.getTransaction();
Transaction transaction = channel.getTransaction();
<br>
transaction.begin();
transaction.begin();
<br>
transaction.commit();
transaction.commit();
<br>
transaction.rollback();
transaction.rollback();
<br>
transaction.close();
transaction.close();
<br>
否则事务报错会回退
否则事务报错会回退
<br>
详情请见源代码
详情请见源代码
<br>
五:将本次项目打包放入lib目录下
五:将本次项目打包放入lib目录下
<br>
共五个jar
共五个jar
<br>
进入D:
\f
lume
\a
pache-flume-1.9.0-bin
\b
in此目录 打开控制台
进入D:
\f
lume
\a
pache-flume-1.9.0-bin
\b
in此目录 打开控制台
<br>
启动命令flume-ng agent --conf-file ../conf/ftp.conf --name agent -property flume.root.logger=INFO,console
启动命令flume-ng agent --conf-file ../conf/ftp.conf --name agent -property flume.root.logger=INFO,console
<br>
#####flume实战案例二:从ftp环境读取文件存到本地服务器路径
#####flume实战案例二:从ftp环境读取文件存到本地服务器路径
conf文件示例
conf文件示例
...
@@ -153,11 +153,11 @@
...
@@ -153,11 +153,11 @@
agent.sinks.k1.type = com.ihooyah.flume.sink.LocalUploadSink
agent.sinks.k1.type = com.ihooyah.flume.sink.LocalUploadSink
agent.sinks.k1.uploadDir = D:
\\
agent.sinks.k1.uploadDir = D:
\\
agent.sinks.k1.batchSize = 10000
agent.sinks.k1.batchSize = 10000
agent.sinks.k1.channel = ch1
agent.sinks.k1.channel = ch1
<br>
与实战一区别的配置
与实战一区别的配置
<br>
agent.sources.ftp1.flushlines = false 图片需要按照块去读不能按行读取否则会出现字节丢失导致图片损坏打不开
agent.sources.ftp1.flushlines = false 图片需要按照块去读不能按行读取否则会出现字节丢失导致图片损坏打不开
<br>
agent.sinks.k1.type = com.ihooyah.flume.sink.LocalUploadSink 重新写的一个sink
agent.sinks.k1.type = com.ihooyah.flume.sink.LocalUploadSink 重新写的一个sink
<br>
event读取文件的时候是多次读取的也就是说不能只读到一个event就去生成文件 这是不完整的
event读取文件的时候是多次读取的也就是说不能只读到一个event就去生成文件 这是不完整的
<br>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment