Commit 12324a95 by 吴凯波

flume监控ftp得json文件

parent 58a3c27f
...@@ -9,11 +9,10 @@ import java.sql.ResultSet; ...@@ -9,11 +9,10 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.*;
import java.util.Date;
import java.util.HashMap; import com.alibaba.fastjson.JSONArray;
import java.util.List; import com.alibaba.fastjson.JSONObject;
import java.util.Map;
import org.apache.flume.Event; import org.apache.flume.Event;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -37,6 +36,7 @@ public class DAOClass { ...@@ -37,6 +36,7 @@ public class DAOClass {
} }
public void insertData(Event event, Map<String, String> params) throws Exception { public void insertData(Event event, Map<String, String> params) throws Exception {
//同步.json文件的逻辑
try { try {
String body = new String(event.getBody(),"UTF-8"); String body = new String(event.getBody(),"UTF-8");
Map<String, String> headers = event.getHeaders(); Map<String, String> headers = event.getHeaders();
...@@ -45,86 +45,57 @@ public class DAOClass { ...@@ -45,86 +45,57 @@ public class DAOClass {
String tableName = (String)params.get("tableName"); String tableName = (String)params.get("tableName");
String partition = (String)params.get("partition"); String partition = (String)params.get("partition");
String iscustom = (String)params.get("iscustom"); String iscustom = (String)params.get("iscustom");
JSONObject initObject=JSONObject.parseObject(body);
JSONArray initArray=JSONArray.parseArray(initObject.get("data").toString());
if (!"false".equals(iscustom)) { if (!"false".equals(iscustom)) {
if ("true".equals(iscustom)) { if ("true".equals(iscustom)) {
; ;
} }
} else { } else {
String columns = ""; String columns = "";
List<String> columnList = new ArrayList<>();
Statement st = this.connection.createStatement(); Statement st = this.connection.createStatement();
String nowDate; tableName = initObject.get("type").toString();
String[] files=fileName.split("\\."); String isTableExist = "select table_name from INFORMATION_SCHEMA.Tables where table_name='" + tableName + "'";
String isTableExist="select table_name from INFORMATION_SCHEMA.Tables where table_name='"+files[0].trim()+"'"; ResultSet tableResult = st.executeQuery(isTableExist);
ResultSet tableResult=st.executeQuery(isTableExist); if ("false".equals(tableResult.next()+"")) {
if ("false".equals(tableResult.next()+"")){ JSONObject jsonObject = JSONObject.parseObject(initArray.getString(0));
if (body.contains(";")){ Iterator<String> keys = jsonObject.keySet().iterator();
String[] columnArray=body.split(";"); while (keys.hasNext()) {
String createTableSql="create table "+files[0].trim()+"(id int PRIMARY key AUTO_INCREMENT"; String column=keys.next();
columns = columns + column + ",";
columnList.add(column);
}
String createTableSql = "create table " + tableName + "(id int PRIMARY key AUTO_INCREMENT";
String columnSql; String columnSql;
for (int i = 0; i <columnArray.length ; i++) { for (String column : columnList) {
columnSql=","+columnArray[i].trim()+" varchar(100)"; columnSql = "," + column.trim() + " varchar(100)";
createTableSql=createTableSql+columnSql; createTableSql = createTableSql + columnSql;
} }
createTableSql=createTableSql+",file_name varchar(30),create_time datetime);"; createTableSql = createTableSql + ",file_name varchar(30),create_time datetime);";
st.execute(createTableSql); st.execute(createTableSql);
} }
} String values = "";
if (columnsMap.get(tableName) == null) { for (Object data : initArray) {
nowDate = "select COLUMN_NAME from INFORMATION_SCHEMA.Columns where table_name='" + tableName + "' and table_schema='" + databaseName + "'"; JSONObject dataJson = JSONObject.parseObject(data.toString());
Iterator<String> keys = dataJson.keySet().iterator();
for(ResultSet resultSet = st.executeQuery(nowDate); resultSet.next(); columns = columns + resultSet.getString("COLUMN_NAME") + ",") { while (keys.hasNext()) {
; String index=keys.next();
} columns = columns + index + ",";
values = values +"'"+dataJson.get(index)+"'"+ ",";
columns = columns.substring(0, columns.length() - 1); }
columnsMap.put(tableName, columns); columns=columns+"file_name,create_time";
} else { SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
columns = (String)columnsMap.get(tableName); values=values+"'"+fileName+"','"+sdf.format(new Date())+"'";
} INSERT_QUERY = "INSERT INTO " + tableName + "(" + columns + ") values (" + values + ")";
System.out.println("数据新增语句:"+INSERT_QUERY);
nowDate = getNowDate();
String[] datas = body.split(partition);
List<String> dataList = new ArrayList();
dataList.add("");
String[] arr$ = datas;
int i1 = datas.length;
for(int i$ = 0; i$ < i1; ++i$) {
String data = arr$[i$];
if (data!=null && !"".equals(data)){
dataList.add(data);
}
}
dataList.add(fileName);
dataList.add(nowDate);
String indexs = "";
i1 = 0;
while(true) {
if (i1 >= dataList.size()) {
indexs = indexs.substring(0, indexs.length() - 1);
INSERT_QUERY = "INSERT INTO " + tableName + "(" + columns + ") values (" + indexs + ")";
break;
}
String ind = "";
if (((String)dataList.get(i1)).equals("")) {
ind = "null,";
} else {
ind = "'" + (String)dataList.get(i1) + "',";
}
indexs = indexs + ind;
++i1;
}
}
System.out.println("插入语句:"+INSERT_QUERY);
PreparedStatement insertStmnt = this.connection.prepareStatement(INSERT_QUERY); PreparedStatement insertStmnt = this.connection.prepareStatement(INSERT_QUERY);
insertStmnt.execute(); insertStmnt.execute();
}
}
} catch (SQLException var20) { } catch (SQLException var20) {
var20.printStackTrace(); var20.printStackTrace();
} }
} }
public void createConnection(String driver, String db_url, String user, String password) throws IOException { public void createConnection(String driver, String db_url, String user, String password) throws IOException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment