SDK 加工服务配置流程:
- SDK数据加工原理介绍
- 加工SDK兼容DEMO
- 组件注册
- 加工服务启动配置&管理
SDK数据加工原理介绍
加工SDK兼容DEMO
加工SDK Demo
- ZcbusEtlServiceDemo.java
package com.zcbus.Service;
import com.zcbus.common.BusEtlCmd;
import java.util.ArrayList;
import java.util.List;
import com.zcbus.common.BusDataVector;
import com.zcbus.common.BusDataVectorColVal;
import com.zcbus.common.BusJsonConverter;
import com.zcbus.common.BusServer;
import com.zcbus.common.BytesBuilder;
import com.zcbus.common.ZcbusTime;
public class ZcbusEtlServiceDemo extends BusEtlCmd {
public byte[] processBusdata(byte[] busData, int dataLength) throws Exception {
System.out.println(" customerID: " + getCustomerID());
System.out.println(" controlID: " + getControlID());
System.out.println(" tableID: " + getTableID());
System.out.println(" dbName: " + getDbName());
System.out.println(" tableName: " + getTableName());
System.out.println("data length: " + dataLength);
BytesBuilder byteBuild = new BytesBuilder(dataLength + 1024 * 1024, 1024 * 1024);
BusDataVector bdVector = new BusDataVector();
List<BusDataVectorColVal> colList = null;
int size = 0;
int offset = 0;
while (offset < dataLength) {
int len = bdVector.deserialize(busData, offset, dataLength);
if (len <= 0) {
throw new Exception(String.format("bus data offset %d, length is %d", offset, len));
}
switch (bdVector.getOpType()) {
case BusDataVector.BSDATA_OPTYPE_UPDATE:
if (bdVector.hasBeforeValue()) {
colList = bdVector.getColBeforeList();
size = colList.size();
for (int i = 0; i < size; i++) {
String str = colList.get(i).getDataString();
str += "cvtb";
colList.get(i).setData(str);
}
}
case BusDataVector.BSDATA_OPTYPE_INSERT:
case BusDataVector.BSDATA_OPTYPE_DELETE:
colList = bdVector.getColList();
size = colList.size();
for (int i = 0; i < size; i++) {
String str = colList.get(i).getDataString();
str += "cvt";
colList.get(i).setData(str);
}
bdVector.serialize(byteBuild);
break;
}
offset += len;
}
return byteBuild.toArray();
}
public static void main(String[] args) {
// TODO Auto-generated method stub
for (int i = 0; i < args.length; i++) {
if (0 == args[i].compareTo("-h")) {
System.out.println(
"Usage: \n" + " -h help\n" + " -log_level 2 log_level\n"
+ " -max_thread 128 specify max threads of client service, default 512\n"
+ " -customer_id 10058 specify customer id of client service, use for port\n");
return;
} else if (0 == args[i].compareTo("-customer_id"))
customer_id = Integer.parseInt(args[++i]);
else if (0 == args[i].compareTo("-log_level"))
logLevel = Integer.parseInt(args[++i]);
else if (0 == args[i].compareTo("-max_thread"))
max_threads = Integer.parseInt(args[++i]);
else {
ZcbusLog.error("unkown options: " + args[i]);
return;
}
}
if (customer_id <= 0) {
ZcbusLog.error("cusomter_id not set");
return;
}
try {
ZcbusEnv.init();
String logFile = ZcbusEnv.getZcbusLogPath() + "/log.db_service." + customer_id;
ZcbusLog.open(logFile);
ZcbusLog.setLevel(logLevel);
String workDirectory = ZcbusEnv.getZcbusCachePath() + "/db_service" + customer_id;
ZcbusEnv.setWorkDirectory(workDirectory);
ZcbusLog.print(2, "make workDirectory: " + workDirectory);
ZcbusEnv.createDir(workDirectory);
if (max_threads < 1)
max_threads = 512;
BusServer busServer = new BusServer(customer_id,max_threads);
BusServer.registEtlCmdClass(ZcbusEtlServiceDemo.class);
busServer.run();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
组件注册
-- Adds a row to bus_service_type_model with service_subtype 'etl' and program_name 'ZcbusEtlServiceDemo.jar'.
INSERT INTO `bus_service_type_model` (`usage`, `db_type`, `use_way`, `service_subtype`, `program_name`, `updatetime`) VALUES ('2', 'client', '1', 'etl', 'ZcbusEtlServiceDemo.jar', '2022-12-11 15:41:07');
- 目前需手工将开发好的 ZcbusEtlServiceDemo.jar 包上传到容器内的 /usr/local/zcbus/jar 目录下
客户端管理
- 添加客户端
注意:客户端名称可自定义
客户端模式选择API客户端
选择适当运行容器
- 添加客户端
订阅客户端配置
文档更新时间: 2022-12-14 17:41 作者:阿力