Commit 2c382558 by 吴凯波

111

parent e9cd693c
package com.ihooyah.flume.sink;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
......@@ -17,6 +18,8 @@ import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.swing.plaf.synth.SynthScrollBarUI;
/**
* 描述
*
......@@ -33,9 +36,9 @@ public class DAOClass {
public DAOClass() {
}
public void insertData(Event event, Map<String, String> params) {
public void insertData(Event event, Map<String, String> params) throws Exception {
try {
String body = new String(event.getBody());
String body = new String(event.getBody(),"UTF-8");
Map<String, String> headers = event.getHeaders();
String fileName = (String)headers.get("fileName");
String databaseName = (String)params.get("databaseName");
......@@ -50,6 +53,23 @@ public class DAOClass {
String columns = "";
Statement st = this.connection.createStatement();
String nowDate;
String[] files=fileName.split("\\.");
String isTableExist="select table_name from INFORMATION_SCHEMA.Tables where table_name='"+files[0].trim()+"'";
ResultSet tableResult=st.executeQuery(isTableExist);
if ("false".equals(tableResult.next()+"")){
if (body.contains(";")){
String[] columnArray=body.split(";");
String createTableSql="create table "+files[0].trim()+"(id int PRIMARY key AUTO_INCREMENT";
String columnSql;
for (int i = 0; i <columnArray.length ; i++) {
columnSql=","+columnArray[i].trim()+" varchar(100)";
createTableSql=createTableSql+columnSql;
}
createTableSql=createTableSql+",file_name varchar(30),create_time datetime);";
System.out.println("建表语句:"+createTableSql);
st.execute(createTableSql);
}
}
if (columnsMap.get(tableName) == null) {
nowDate = "select COLUMN_NAME from INFORMATION_SCHEMA.Columns where table_name='" + tableName + "' and table_schema='" + databaseName + "'";
......@@ -66,19 +86,21 @@ public class DAOClass {
nowDate = getNowDate();
String[] datas = body.split(partition);
List<String> dataList = new ArrayList();
dataList.add("");
String[] arr$ = datas;
int i1 = datas.length;
for(int i$ = 0; i$ < i1; ++i$) {
String data = arr$[i$];
dataList.add(data);
if (data!=null && !"".equals(data)){
dataList.add(data);
}
}
dataList.add(fileName);
dataList.add(nowDate);
String indexs = "";
i1 = 0;
while(true) {
if (i1 >= dataList.size()) {
indexs = indexs.substring(0, indexs.length() - 1);
......@@ -97,7 +119,7 @@ public class DAOClass {
++i1;
}
}
System.out.println("插入语句:"+INSERT_QUERY);
PreparedStatement insertStmnt = this.connection.prepareStatement(INSERT_QUERY);
insertStmnt.execute();
} catch (SQLException var20) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment