Commit bc1f087b by 吴凯波

readme更新

parent 13c14bc9
...@@ -23,3 +23,149 @@ ...@@ -23,3 +23,149 @@
flume-ng agent -c conf -conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console flume-ng agent -c conf -conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console
#####flume实战案例一:从ftp环境读取json文件,数据存入mysql
参考链接:https://blog.csdn.net/qq_40015759/article/details/82429117
前提条件:jdk1.8
flume1.9的安装包
mysql数据库
ftp环境
步骤:
一:解压flume1.9的压缩包,配置环境变量(linux环境需要做)
二:添加flume读取ftp环境中文件的三方包(https://github.com/keedio/flume-ftp-source)
flume本身并没有类型去读取ftp文件的功能,需要引入三方包去实现
三:commons-net-3.3.jar,jsch-0.1.54.jar ,mysql-connector-java-6.0.5.jar,
flume-ftp-source-2.2.0.jar(步骤二打成的jar包)共四个jar包加入之前解压的flume中的lib 中
四:在flume的conf中新增配置文件ftp.conf
具体配置如下
# flume监控ftp文件功能的配置如下
agent.sources = ftp1
agent.sinks = k1
agent.channels = ch1
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftp1.client.source = ftp
#ftp服务地址
agent.sources.ftp1.name.server = 61.159.214.198
#端口
agent.sources.ftp1.port = 50021
#ftp用户名
agent.sources.ftp1.user = flume
#密码
agent.sources.ftp1.password = sSt23dDmcikTNiw8
#文件所在目录 (相对于根目录)
agent.sources.ftp1.working.directory = /
#文件的名称匹配 (java正则表达式)
agent.sources.ftp1.filter.pattern = .+\.json
#ftp的根目录
agent.sources.ftp1.folder = /flume
agent.sources.ftp1.run.discover.delay=5000
agent.sources.ftp1.flushlines = true
agent.sources.ftp1.search.recursive = true
agent.sources.ftp1.processInUse = false
agent.sources.ftp1.processInUseTimeout = 30
agent.sources.ftp1.channels = ch1
agent.sources.ftp1.inputCharset=utf8
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 100000000
agent.channels.ch1.transactionCapacity = 100000000
agent.sinks.k1.type = com.ihooyah.flume.sink.MySqlSink
agent.sinks.k1.url = jdbc:mysql://127.0.0.1:3306/flume?autoReconnect=true&failOverReadOnly=false&serverTimezone=UTC&characterEncoding=UTF-8&useUnicode=true
agent.sinks.k1.user= root
agent.sinks.k1.password= root
agent.sinks.k1.driver= com.mysql.cj.jdbc.Driver
agent.sinks.k1.databaseName = flume
agent.sinks.k1.tableName = flume_test
agent.sinks.k1.iscustom = false
agent.sinks.k1.channel = ch1
#####参数解释
sources配置的数据来源
agent.sources.ftp1.type填的是三方jar中的方法去开启读取ftp文件 必填项
agent.sources.ftp1.folder填的是读取日志文件生成目录(记录那些文件已读取过 避免重复读取)
agent.sources.ftp1.flushlines true代表用行去读,false用代码块去读
agent.sources.ftp1.channels = ch1此配置必填不然无法去读取sink
sinks配置的数据接受项(此配置可以自己定义参数,后台通过context对象获取到)
agent.sinks.k1.type = com.ihooyah.flume.sink.MySqlSink 此配置填写项目中的执行类(该类继承 AbstractSink)
agent.sinks.k1.channel = ch1此配置必填 关联数据源
#####代码讲解
处理数据逻辑参考com.ihooyah.flume.sink.MySqlSink这个类
event对象里面中body读取的是文件内容 headers读取的是文件名称
需要注意body的byte数组转换成string是需要指定编码格式 防止乱码
读取的文件也需要确定编码格式utf8否则会出现乱码
####注意事项
process方法中 必须要有事务的代码不然会报错
Transaction transaction = channel.getTransaction();
transaction.begin();
transaction.commit();
transaction.rollback();
transaction.close();
否则事务报错会回退
详情请见源代码
五:将本次项目打包放入lib目录下
共五个jar
进入D:\flume\apache-flume-1.9.0-bin\bin此目录 打开控制台
启动命令flume-ng agent --conf-file ../conf/ftp.conf --name agent -property flume.root.logger=INFO,console
#####flume实战案例二:从ftp环境读取文件存到本地服务器路径
conf文件示例
# flume监控ftp文件功能的配置如下
agent.sources = ftp1
agent.sinks = k1
agent.channels = ch1
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftp1.client.source = ftp
#ftp服务地址
agent.sources.ftp1.name.server = 61.159.214.198
#端口
agent.sources.ftp1.port = 50021
#ftp用户名
agent.sources.ftp1.user = flume
#密码
agent.sources.ftp1.password = sSt23dDmcikTNiw8
#文件所在目录 (相对于根目录)
agent.sources.ftp1.working.directory = /
#文件的名称匹配 (java正则表达式)
agent.sources.ftp1.filter.pattern = .+
#ftp的根目录
agent.sources.ftp1.folder = /flume
agent.sources.ftp1.run.discover.delay=5000
agent.sources.ftp1.flushlines = false
agent.sources.ftp1.search.recursive = true
agent.sources.ftp1.processInUse = false
agent.sources.ftp1.processInUseTimeout = 30
agent.sources.ftp1.channels = ch1
agent.sources.ftp1.chunk.size = 102400
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 100000000
agent.channels.ch1.transactionCapacity = 100000000
agent.sinks.k1.type = com.ihooyah.flume.sink.LocalUploadSink
agent.sinks.k1.uploadDir = D:\\
agent.sinks.k1.batchSize = 10000
agent.sinks.k1.channel = ch1
与实战一区别的配置
agent.sources.ftp1.flushlines = false 图片需要按照块去读不能按行读取否则会出现字节丢失导致图片损坏打不开
agent.sinks.k1.type = com.ihooyah.flume.sink.LocalUploadSink 重新写的一个sink
event读取文件的时候是多次读取的也就是说不能只读到一个event就去生成文件 这是不完整的
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment