Commit 4ae90071 by 陈科朋

flume同步增加操作字段

parent 6ae89134
package com.ihooyah.flume.sink; package com.ihooyah.flume.sink;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.*;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Event; import org.apache.flume.Event;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.swing.plaf.synth.SynthScrollBarUI; import java.io.IOException;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.*;
/** /**
* 描述 * 描述
...@@ -45,7 +37,8 @@ public class DAOClass { ...@@ -45,7 +37,8 @@ public class DAOClass {
String tableName = (String)params.get("tableName"); String tableName = (String)params.get("tableName");
String iscustom = (String)params.get("iscustom"); String iscustom = (String)params.get("iscustom");
JSONObject initObject=JSONObject.parseObject(body); JSONObject initObject=JSONObject.parseObject(body);
JSONObject dataObject=initObject.getJSONObject("data"); String dataObject=initObject.getString("data");
String operation = initObject.getString("operation");
if (!"false".equals(iscustom)) { if (!"false".equals(iscustom)) {
if ("true".equals(iscustom)) { if ("true".equals(iscustom)) {
; ;
...@@ -70,16 +63,17 @@ public class DAOClass { ...@@ -70,16 +63,17 @@ public class DAOClass {
DELETE_QUERY="DELETE FROM flume_json where create_time<'"+mon+"';"; DELETE_QUERY="DELETE FROM flume_json where create_time<'"+mon+"';";
st.execute(DELETE_QUERY); st.execute(DELETE_QUERY);
String values = ""; String values = "";
columns="type,data,file_name,create_time,status"; columns="type,operation,data,file_name,create_time,status";
values="'"+initObject.get("type").toString()+"','"+dataObject+"','"+fileName+"','"+sdf.format(new Date())+"','0'"; values="'"+initObject.getString("type")+"','"+ operation + "','" + dataObject+"','"+fileName+"','"+sdf.format(new Date())+"','0'";
INSERT_QUERY = "INSERT INTO flume_json (" + columns + ") values (" + values + ")"; INSERT_QUERY = "INSERT INTO flume_json (" + columns + ") values (" + values + ")";
System.out.println("数据新增语句:"+INSERT_QUERY); System.out.println("数据新增语句:"+INSERT_QUERY);
PreparedStatement insertStmnt = this.connection.prepareStatement(INSERT_QUERY); PreparedStatement insertStmnt = this.connection.prepareStatement(INSERT_QUERY);
insertStmnt.execute(); insertStmnt.execute();
} }
} catch (SQLException var20) { } catch (SQLException var20) {
var20.printStackTrace(); var20.printStackTrace();
}finally {
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment