SDK发布原理介绍
- 增量日志解析,兼容DML(INSERT/UPDATE/DELETE操作),以及DDL操作
- 增量数据读取,所有定时SELECT出来的数据全部为INSERT操作,其他参考日志解析,增量信息数据
建立BusDataClient客户端模式:
//sdk容器在同一台主机中包含有restapi服务,获取相关信息
BusDataClient dataClient=new BusDataClient(10818);
//通过restapi访问相关API信息,获取对应服务地址对应NODEID相关信息位置
BusDataClient dataClient=new BusDataClient("http://zcbusrestapi/", 10818);
//通过TCP/IP以及用户密码验证等方式
BusDataClient dataClient = new BusDataClient("39.96.51.215", 22000, 10818, "test", "test");
测试DEMO
public void dataClientDemo() {
// TODO Auto-generated method stub
//BusDataClient dataClient=new BusDataClient(10818);
//BusDataClient dataClient=new BusDataClient("http://v2.zbomc.com/", 10818);
BusDataClient dataClient = new BusDataClient("39.96.51.215", 22000, 10818, "test", "test");
int send_count = 100000;
ZcbusLog.setLevel(2);
BusDataVector bdVector = null;
List<BusDataVectorColVal> colList = null;
List<BusDataVectorColVal> colBeforeList = null;
BusDataVectorColVal colVal = null;
BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
try {
dataClient.open();
for (int i = 0; i < send_count; i++) {
ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
dataClient.refreshTableList();
// -----------------------------------------
// 1. insert ds.test
bdVector = new BusDataVector();
bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
bdVector.setOpTime(ZcbusTime.getCurrentTime());
bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_INSERT);
bdVector.setDbName("test");
bdVector.setTableName("tb01");
// column
colList = new ArrayList<>();
colVal = new BusDataVectorColVal("c1", "number", "1");
colList.add(colVal);
colVal = new BusDataVectorColVal("c2", "string", "test");
colList.add(colVal);
colVal = new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime());
colList.add(colVal);
bdVector.setColList(colList);
// serialize
bdVector.serialize(byteBuild);
// -----------------------------------------
// 2. update ds.test1
bdVector = new BusDataVector();
bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
bdVector.setOpTime(ZcbusTime.getCurrentTime());
bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_UPDATE);
bdVector.setDbName("ds");
bdVector.setTableName("test1");
// after column
colList = new ArrayList<>();
colVal = new BusDataVectorColVal("c1", "number", "1");
colList.add(colVal);
colVal = new BusDataVectorColVal("c2", "string", "testtest");
colList.add(colVal);
colVal = new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime());
colList.add(colVal);
bdVector.setColList(colList);
// before column
colBeforeList = new ArrayList<>();
colVal = new BusDataVectorColVal("c1", "number", "1");
colBeforeList.add(colVal);
bdVector.setColBeforeList(colBeforeList);
// serialize
bdVector.serialize(byteBuild);
// -----------------------------------------
// 3. delete ds.test
bdVector = new BusDataVector();
bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
bdVector.setOpTime(ZcbusTime.getCurrentTime());
bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DELETE);
bdVector.setDbName("ds");
bdVector.setTableName("test");
// column
colList = new ArrayList<>();
colVal = new BusDataVectorColVal("c1", "number", "1");
colList.add(colVal);
bdVector.setColList(colList);
// serialize
bdVector.serialize(byteBuild);
// -----------------------------------------
// 4. ddl ds.test1
bdVector = new BusDataVector();
bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
bdVector.setOpTime(ZcbusTime.getCurrentTime());
bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DDL);
bdVector.setDbName("ds");
bdVector.setTableName("test1");
// ddl sql
bdVector.setDDlSql("truncate table ds.test1");
// serialize
bdVector.serialize(byteBuild);
// -----------------------------------------
// 5. custom ds.test1
bdVector = new BusDataVector();
bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
bdVector.setOpTime(ZcbusTime.getCurrentTime());
bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
bdVector.setDbName("ds");
bdVector.setTableName("test1");
// custom data
JSONObject jsonObject = new JSONObject();
jsonObject.put("abc", "test");
jsonObject.put("abc1", "test");
jsonObject.put("abc2", "test");
jsonObject.put("abc3", "test" + (i + 1));
bdVector.setCustomData(jsonObject.toJSONString().getBytes("UTF-8"));
// serialize
bdVector.serialize(byteBuild);
// -----------------------------------------
// send data to server
dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());
byteBuild.clear();
ZcbusLog.info("test send ok.");
ZcbusTime.sleep(1);
}
dataClient.close();
} catch (Exception e) {
ZcbusLog.error(e);
}
}
推送特定数据(二进制数据)
- 推送数据为固定数据,不需要在进行解析加工,只传输固定数据\文本\json\二进制等数据信息
public void SendCustomDemo() {
BusDataClient dataClient = new BusDataClient(ip, port, nodeid, "test", "test");
int send_count = 100000;
ZcbusLog.setLevel(2);
BusDataVector bdVector = null;
BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
try {
dataClient.open();
dataClient.refreshTableList();
JSONObject jsonObject = new JSONObject();
jsonObject.put("abc", "test");
jsonObject.put("abc1", "test");
jsonObject.put("abc2", "test");
jsonObject.put("abc3", "test");
jsonObject.put("abc4", "test");
jsonObject.put("abc5", "test");
jsonObject.put("abc6", "test");
jsonObject.put("abc7", "test");
jsonObject.put("abc8", "test");
jsonObject.put("abc9", "test");
jsonObject.put("abc10", "test");
StringBuffer stringBuffer=new StringBuffer();
for(int j=0;j<500000;j++) {
stringBuffer.append(jsonObject.toJSONString());
}
StringBuffer sbBuffer=new StringBuffer();
sbBuffer.append(stringBuffer.substring(0, limitLen));
stringBuffer=null;
for (int i = 0; i < send_count; i++) {
ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
// 5. custom ds.test1
bdVector = new BusDataVector();
bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
bdVector.setOpTime(ZcbusTime.getCurrentTime());
bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
bdVector.setDbName("test");
bdVector.setTableName("tb01");
// custom data
bdVector.setCustomData(sbBuffer.toString().getBytes());
// serialize
bdVector.serialize(byteBuild);
// -----------------------------------------
// send data to server
dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());
byteBuild.clear();
// ZcbusLog.info("test send length "+stringBuffer.length()/1024/1024+"MB,line "+jsonObject.toJSONString().length()+"B ok.");
// ZcbusTime.sleep(1);
}
dataClient.close();
} catch (Exception e) {
ZcbusLog.error(e);
}
}
文档更新时间: 2023-10-13 17:11 作者:阿力