Commit 6a20d7b5 by 吴凯波

flume监控ftp得json文件

parent 12324a95
......@@ -30,20 +30,19 @@ public class DAOClass {
private static final Logger logger = LoggerFactory.getLogger(DAOClass.class);
private static Map<String, String> columnsMap = new HashMap();
private static String INSERT_QUERY;
private static String DELETE_QUERY;
private Connection connection;
public DAOClass() {
}
public void insertData(Event event, Map<String, String> params) throws Exception {
//同步.json文件的逻辑
//同步json文件的逻辑
try {
String body = new String(event.getBody(),"UTF-8");
Map<String, String> headers = event.getHeaders();
String fileName = (String)headers.get("fileName");
String databaseName = (String)params.get("databaseName");
String tableName = (String)params.get("tableName");
String partition = (String)params.get("partition");
String iscustom = (String)params.get("iscustom");
JSONObject initObject=JSONObject.parseObject(body);
JSONArray initArray=JSONArray.parseArray(initObject.get("data").toString());
......@@ -55,39 +54,27 @@ public class DAOClass {
String columns = "";
List<String> columnList = new ArrayList<>();
Statement st = this.connection.createStatement();
tableName = initObject.get("type").toString();
String isTableExist = "select table_name from INFORMATION_SCHEMA.Tables where table_name='" + tableName + "'";
String isTableExist = "select table_name from INFORMATION_SCHEMA.Tables where table_name='flume_json'";
ResultSet tableResult = st.executeQuery(isTableExist);
if ("false".equals(tableResult.next()+"")) {
JSONObject jsonObject = JSONObject.parseObject(initArray.getString(0));
Iterator<String> keys = jsonObject.keySet().iterator();
while (keys.hasNext()) {
String column=keys.next();
columns = columns + column + ",";
columnList.add(column);
}
String createTableSql = "create table " + tableName + "(id int PRIMARY key AUTO_INCREMENT";
String columnSql;
for (String column : columnList) {
columnSql = "," + column.trim() + " varchar(100)";
createTableSql = createTableSql + columnSql;
}
createTableSql = createTableSql + ",file_name varchar(30),create_time datetime);";
String createTableSql = "create table flume_json (id int PRIMARY key AUTO_INCREMENT,type varchar(20),data text," +
"file_name varchar(40),create_time datetime);";
st.execute(createTableSql);
}
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Calendar c=Calendar.getInstance();
c.setTime(new Date());
c.add(Calendar.MONTH, -1);
Date m = c.getTime();
String mon = sdf.format(m);
DELETE_QUERY="DELETE FROM flume_json where create_time<'"+mon+"';";
st.execute(DELETE_QUERY);
String values = "";
for (Object data : initArray) {
JSONObject dataJson = JSONObject.parseObject(data.toString());
Iterator<String> keys = dataJson.keySet().iterator();
while (keys.hasNext()) {
String index=keys.next();
columns = columns + index + ",";
values = values +"'"+dataJson.get(index)+"'"+ ",";
}
columns=columns+"file_name,create_time";
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
values=values+"'"+fileName+"','"+sdf.format(new Date())+"'";
INSERT_QUERY = "INSERT INTO " + tableName + "(" + columns + ") values (" + values + ")";
columns="type,data,file_name,create_time";
values="'"+initObject.get("type").toString()+"','"+dataJson+"','"+fileName+"','"+sdf.format(new Date())+"'";
INSERT_QUERY = "INSERT INTO flume_json (" + columns + ") values (" + values + ")";
System.out.println("数据新增语句:"+INSERT_QUERY);
PreparedStatement insertStmnt = this.connection.prepareStatement(INSERT_QUERY);
insertStmnt.execute();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment