黄杰 / ihooyah-flume
Commit 2a4bb7e1, authored Sep 22, 2021 by 陈科朋
Modify the startup command
parent 303c181b
Showing 1 changed file with 30 additions and 30 deletions

README.md
...
@@ -5,31 +5,31 @@
It can collect data at high speed, and the collected data can be stored on HDFS in the file format and compression codec you choose.<br>
Transaction support ensures that no data is lost during collection.<br>
Some Sources let Flume, after a crash and restart, resume collecting from the last collection point, so no data is lost.<br>
#####Components of Flume
Flume has three main components:<br>
Source (collects data at the origin): Flume provides a wide variety of Sources and also supports custom Sources.<br>
Channel (temporarily stores and aggregates data): the main choices are the memory channel and the file channel (the one most commonly used in production). In production the amount of data in the channel must be monitored, so that a failed sink does not cause the channel to overflow.<br>
Sink (moves data to the destination): for example HDFS, Kafka, a database, or a custom sink.<br>
flume-ng agent -c D:\software\apache-flume-1.8.0-bin\conf -f D:\software\apache-flume-1.8.0-bin\conf\flume-ftp-source-dir.conf -n agent
flume-ng agent -c D:\software\apache-flume-1.8.0-bin\conf -f D:\software\apache-flume-1.8.0-bin\conf\flume-ftp-result.conf -n agent
telnet localhost 44444
-flume-ng agent -c conf -conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console
+flume-ng agent -c conf -f conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console
#####Flume in practice, case 1: read JSON files from an FTP server and store the data in MySQL
Reference: https://blog.csdn.net/qq_40015759/article/details/82429117<br>
Prerequisites: JDK 1.8<br>
the Flume 1.9 installation package<br>
a MySQL database<br>
an FTP server<br>
Steps:<br>
Step 1: Unpack the Flume 1.9 archive and configure the environment variables (required on Linux).<br>
Step 2: Add the third-party package that lets Flume read files from an FTP server (https://github.com/keedio/flume-ftp-source).<br>
...
@@ -42,10 +42,10 @@
agent.sources = ftp1<br>
agent.sinks = k1<br>
agent.channels = ch1<br>
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source<br>
agent.sources.ftp1.client.source = ftp<br>
#FTP server address
agent.sources.ftp1.name.server = 61.159.214.198<br>
#Port
...
@@ -58,7 +58,7 @@
agent.sources.ftp1.working.directory = /<br>
#File name filter (Java regular expression)
agent.sources.ftp1.filter.pattern = .+\.json<br>
#FTP root directory
agent.sources.ftp1.folder = /flume<br>
agent.sources.ftp1.run.discover.delay=5000<br>
...
@@ -68,11 +68,11 @@
agent.sources.ftp1.processInUseTimeout = 30<br>
agent.sources.ftp1.channels = ch1<br>
agent.sources.ftp1.inputCharset=utf8<br>
agent.channels.ch1.type = memory<br>
agent.channels.ch1.capacity = 100000000<br>
agent.channels.ch1.transactionCapacity = 100000000<br>
agent.sinks.k1.type = com.ihooyah.flume.sink.MySqlSink<br>
agent.sinks.k1.url = jdbc:mysql://127.0.0.1:3306/flume?autoReconnect=true&failOverReadOnly=false&serverTimezone=UTC&characterEncoding=UTF-8&useUnicode=true
agent.sinks.k1.user= root<br>
...
@@ -82,24 +82,24 @@
agent.sinks.k1.tableName = flume_test<br>
agent.sinks.k1.iscustom = false<br>
agent.sinks.k1.channel = ch1<br>
#####Parameter notes
sources configures where the data comes from.<br>
agent.sources.ftp1.type names the class in the third-party jar that reads files from FTP. Required.<br>
agent.sources.ftp1.folder is the directory where the read-tracking file is written (it records which files have already been read, to avoid reading them again).<br>
agent.sources.ftp1.flushlines: true reads the file line by line, false reads it in chunks.<br>
agent.sources.ftp1.channels = ch1 is required; without it the source is not attached to the channel that the sink reads from.<br>
sinks configures where the data is delivered (you can define your own parameters here; the code reads them through the Context object).<br>
agent.sinks.k1.type = com.ihooyah.flume.sink.MySqlSink is the implementing class in this project (it extends AbstractSink).<br>
agent.sinks.k1.channel = ch1 is required; it links the sink to the channel fed by the source.<br>
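
Because `filter.pattern` is described as a Java regular expression, a candidate value can be tried out before it goes into the conf file. The following is a minimal sketch, assuming the source matches the pattern against the whole file name (the keedio source may apply it differently). The class name `FilterPatternCheck` and the sample file names are made up for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical helper for trying out a filter.pattern value;
// it is not part of Flume or flume-ftp-source.
class FilterPatternCheck {
    // The value used in the conf file: a non-empty name followed by ".json".
    static final Pattern JSON_FILES = Pattern.compile(".+\\.json");

    // Assumption: the whole file name has to match the pattern.
    static boolean accepts(String fileName) {
        return JSON_FILES.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        String[] samples = {"orders.json", "orders.json.bak", ".json", "photo.png"};
        for (String name : samples) {
            System.out.println(name + " -> " + accepts(name));
        }
    }
}
```

Under that assumption only names that end in `.json` and have at least one character before the extension are accepted.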
#####Code walkthrough
For the data-handling logic, see the class com.ihooyah.flume.sink.MySqlSink.<br>
In the event object, the body holds the file content and the headers hold the file name.<br>
When converting the body byte array to a String, specify the character encoding explicitly to avoid garbled text.<br>
The files being read must also be UTF-8 encoded; otherwise the text will be garbled.<br>
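
The encoding point can be illustrated without Flume. This is a stand-alone sketch, not the code in MySqlSink: the class `BodyDecoding` and the sample JSON are assumptions, and in a real sink the byte array would come from `event.getBody()`.

```java
import java.nio.charset.StandardCharsets;

// Stand-alone illustration; in a real sink the bytes would come from event.getBody().
class BodyDecoding {
    // Decode with an explicit charset instead of relying on the platform default.
    static String decodeUtf8(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sample UTF-8 JSON containing two Chinese characters (written as escapes).
        byte[] body = "{\"city\":\"\u5357\u4eac\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(body));
        // Decoding the same bytes with a different charset yields garbled text.
        System.out.println(new String(body, StandardCharsets.ISO_8859_1));
    }
}
```

Passing the charset explicitly keeps the result the same on machines whose default encoding is not UTF-8, which is a common situation on Windows.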
####Notes
The process method must contain the transaction code; otherwise an error is raised.<br>
Transaction transaction = channel.getTransaction();<br>
...
@@ -109,12 +109,12 @@
transaction.close();<br>
Otherwise the transaction reports an error and is rolled back.<br>
See the source code for details.<br>
Step 5: Package this project and put it in the lib directory.<br>
Five jars in total.<br>
Go to the directory D:\flume\apache-flume-1.9.0-bin\bin and open a console.<br>
Startup command: flume-ng agent --conf-file ../conf/ftp.conf --name agent -property flume.root.logger=INFO,console<br>
#####Flume in practice, case 2: read files from an FTP server and save them to a path on the local server
Example conf file<br>
# Configuration for having Flume monitor files on FTP
...
@@ -123,7 +123,7 @@
agent.channels = ch1<br>
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source<br>
agent.sources.ftp1.client.source = ftp<br>
#FTP server address
agent.sources.ftp1.name.server = 61.159.214.198<br>
#Port
...
@@ -145,11 +145,11 @@
agent.sources.ftp1.processInUseTimeout = 30<br>
agent.sources.ftp1.channels = ch1<br>
agent.sources.ftp1.chunk.size = 102400<br>
agent.channels.ch1.type = memory<br>
agent.channels.ch1.capacity = 100000000<br>
agent.channels.ch1.transactionCapacity = 100000000<br>
agent.sinks.k1.type = com.ihooyah.flume.sink.LocalUploadSink<br>
agent.sinks.k1.uploadDir = D:\\<br>
agent.sinks.k1.batchSize = 10000<br>
...
@@ -158,14 +158,14 @@
agent.sources.ftp1.flushlines = false: images must be read in chunks, not line by line; otherwise bytes are lost and the image is corrupted and cannot be opened.<br>
agent.sinks.k1.type = com.ihooyah.flume.sink.LocalUploadSink is a newly written sink.<br>
A file is delivered as multiple events, so the sink must not write out the file after receiving only one event; the result would be incomplete.<br>
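
One way to handle a file that arrives in several events is to append each event body to the same target file instead of overwriting it. This is a sketch of that idea only; the project's LocalUploadSink may work differently. The class `ChunkAppender`, the method `appendChunk`, and the file name are hypothetical, and the sketch assumes the file name has already been taken from the event headers.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper; the project's LocalUploadSink may be implemented differently.
class ChunkAppender {
    // Append one event body to the target file, creating the file on the first chunk.
    static void appendChunk(Path uploadDir, String fileName, byte[] chunk) throws IOException {
        Path target = uploadDir.resolve(fileName);
        Files.write(target, chunk, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("upload");
        // Two chunks of the same file, as they might arrive in two separate events.
        appendChunk(dir, "photo.bin", new byte[] {(byte) 0x89, 0x50, 0x4E});
        appendChunk(dir, "photo.bin", new byte[] {0x47, 0x0D, 0x0A});
        System.out.println(Files.size(dir.resolve("photo.bin")) + " bytes written");
    }
}
```

The bytes are written exactly as received, with no conversion to text, which is what keeps binary files such as images intact.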