Publish node preparation:
Add a publish node and select the "service" type.
When client software connects, it is authenticated with host, port, user, and password.
Notes:
host: during development, the address of the host running Docker; during testing, the service must be reachable from outside.
After go-live, change the host address to the Docker container name; the externally exposed service can then be closed.
port: the port on which the service is exposed externally.
During development, edit the docker-compose.yml file under the yaml directory of the installation path and add:
ports:
- <external service port>:<internal service port (port)>
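For example, to expose internal service port 21010 on the same host port (the port numbers are illustrative; each publish service needs its own mapping):

```yaml
ports:
  - 21010:21010   # <external port>:<internal port>, one mapping per publish service
```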
Create the publish node
Add a publish node
Configure and add the service publish node
Configure the publish service information
Add a publish configuration entry (which can be thought of as a publish channel)
After the entry is added successfully, the status is as follows
Modify the container port mapping to map the port to the host
[root@iZ2ze7y8byz5e2ki7gbcjpZ simple_server]# export ZCBUS_HOME=/data1/zcbus_docker/zcbusSoft/simple_server
[root@iZ2ze7y8byz5e2ki7gbcjpZ simple_server]# cd yaml/
[root@iZ2ze7y8byz5e2ki7gbcjpZ yaml]# vi docker-compose.yml
  zcbus:
    container_name: zcbus
    environment:
      LANG: en_US.UTF-8
      ZCBUS_CONTAINER: zcbus
      ZC_IPADDRESS: 172.17.104.186
      database: zcbus
      dbhost: zcbusdb
      dbport: 3306
      dbpwd: e0twWGp8aVtWfGB8dn9YdTo
      dbuser: QFlYT0k6
    image: reg.zbomc.com/zcbus_server:v2.0.1
    ports: #### add ports and the port mappings below; publish services and mapped ports are 1:1
      - 21010:21010
    volumes:
      - /data/docker02/zcbusdata/zcbus/bin:/usr/local/zcbus/bin
      - /data/docker02/zcbusdata/zcbus/lib:/usr/local/zcbus/lib
      - /data/docker02/zcbusdata/zcbus/cache:/usr/local/zcbus/cache
      - /data/docker02/zcbusdata/zcbus/mq:/usr/local/zcbus/mq
      - /data/docker02/zcbusdata/zcbus/log:/usr/local/zcbus/log
      - /data/docker02/zcbusdata/zcbus/jdk:/usr/local/jdk
      - /data/docker02/zcbusdata/zcbus/jar:/usr/local/zcbus/jar
[root@iZ2ze7y8byz5e2ki7gbcjpZ yaml]# docker-compose up -d #### recreate the container to apply the external port mapping
[root@iZ2ze7y8byz5e2ki7gbcjpZ yaml]# docker exec -it zcbus bash #### enter the container and check the publish service log
[root@a294924cee45 zcbus]# cd log
[root@a294924cee45 log]# tail -f log.zcbus_real.10832
[oracle@iZ2zeih4ukggzureoarz7pZ log]$ more log.zcbus_real.10832
Linux iZ2zeih4ukggzureoarz7pZ 3.10.0-957.21.3.el7.x86_64 #1 SMP Tue Jun 18 16:35:19 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
zcbus: Release 7.8-16 64 bit (QA) - Production on 2022-12-15 20:04:52
Copyright (c) 2022 ZCBUS. All Rights Reserved.
process id 348766
[INF] local host: name[iZ2zeih4ukggzureoarz7pZ], ip[172.17.46.245]
[LV0] 2022-12-15 22:22:28: connect to mysql zbomc/***@172.17.58.146:3306 ...
[INF] set client character set utf8mb4...
[INF] new client character set: utf8mb4
[INF] MYSQL VERSION: 50738
[INF] MYSQL INFO: 5.7.38-log
SET SESSION sql_mode='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'
influx config:
INFLUX_BUCKET_NAME=BUCKET
INFLUX_HOST=http://172.17.104.186:8086
INFLUX_ORG=ORG
INFLUX_TOKEN=mVr3RJWNhaCu235sqOnOivo59xp3V0ASwqKTH38bMMWpfNSzbp0kN3E-rm4hyGu27uUY1qKn6_6-hpYVRmMALw==
config:
db_type=server
default_schema=zcbus
host=172.17.46.245
multi_connection_mode=5
password=test
port=21010
user=test
real_send_queue_cnt adjust to 8
kafka config:
acks=all
bootstrap.servers=172.17.46.244:9092
compression.type=snappy
message.max.bytes=10485760
[INF] 2022-12-15 22:22:28: log extract thread is running...
[LV0] 2022-12-15 22:22:28: connect to mysql zbomc/***@172.17.58.146:3306 ...
[INF] set client character set utf8mb4...
[INF] new client character set: utf8mb4
[INF] MYSQL VERSION: 50738
[INF] MYSQL INFO: 5.7.38-log
SET SESSION sql_mode='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'
[INF] set real status is starting.
[INF] update real table list...
table in dict[0]:
[INF] 2022-12-15 22:22:28: check repair tables...
[INF] 2022-12-15 22:22:28: extract start pos ...
[INF] 2022-12-15 22:22:28: bind host zcbus:21010...
[LV0] try bind zcbus:21010...
[INF] 2022-12-15 22:22:28: Listener running on zcbus:21010
[INF] Listener bound to sd: 12
[LV0] epoll add accept sd: 12.
[INF] 2022-12-15 22:22:30: flush data to kafka...
[INF] 2022-12-15 22:22:30: flush data to kafka ok.
[INF] set real status is started.
[INF] 2022-12-15 22:22:30: extract end pos .
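Before wiring up the SDK, it can help to verify that the exposed port is actually reachable from your client machine. A minimal sketch (the PortProbe class name is ours; the host and port defaults are the example values from this document):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Quick TCP reachability probe for the publish service port. */
public class PortProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "172.17.46.245"; // example host from this document
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 21010;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If this prints `reachable: false`, re-check the `ports:` mapping in docker-compose.yml and any firewall rules before debugging the SDK itself.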
ZCBUS publish development mode:
// connect over TCP/IP with user/password authentication
BusDataClient dataClient = new BusDataClient("172.17.46.245", 21010, 10832, "test", "test");
A demo that can be embedded into the ZCBUS platform:
package com.zcbus.Service;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.zcbus.common.BusDataClient;
import com.zcbus.common.BusDataVector;
import com.zcbus.common.BusDataVectorColVal;
import com.zcbus.common.BytesBuilder;
import com.zcbus.common.ZcbusDbParam;
import com.zcbus.common.ZcbusLog;
import com.zcbus.common.ZcbusTime;

public class ZcbusPushSdkDemo {
    private static int customer_id = 0;
    private static int max_threads = 512;
    private static int logLevel = -1;
    private static String zcbusurl = null;
    private BusDataClient dataClient;
    private static String server_ip = null;
    private static int server_port = 0;
    private static int limitLen = 0;

    private void InitPushSdk() {
        if (server_ip != null) {
            // direct TCP/IP connection with user/password authentication
            dataClient = new BusDataClient(server_ip, server_port, customer_id, "test", "test");
        } else if (zcbusurl != null) { // fixed: was "zcbusurl == null", which made the URL mode unreachable
            dataClient = new BusDataClient(zcbusurl, customer_id);
        } else {
            dataClient = new BusDataClient(customer_id);
        }
    }
    public void PushSdkDemo() {
        InitPushSdk();
        int send_count = 100000;
        ZcbusLog.setLevel(2);
        BusDataVector bdVector = null;
        BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
        try {
            dataClient.open();
            dataClient.refreshTableList();
            // build one JSON line, then repeat it to form a large payload
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("abc", "test");
            jsonObject.put("abc1", "test");
            jsonObject.put("abc2", "test");
            jsonObject.put("abc3", "test");
            jsonObject.put("abc4", "test");
            jsonObject.put("abc5", "test");
            jsonObject.put("abc6", "test");
            jsonObject.put("abc7", "test");
            jsonObject.put("abc8", "test");
            jsonObject.put("abc9", "test");
            jsonObject.put("abc10", "test");
            StringBuffer stringBuffer = new StringBuffer();
            for (int j = 0; j < 500000; j++) {
                stringBuffer.append(jsonObject.toJSONString());
            }
            // truncate the repeated JSON to exactly limitLen characters
            StringBuffer sbBuffer = new StringBuffer();
            sbBuffer.append(stringBuffer.substring(0, limitLen));
            stringBuffer = null;
            for (int i = 0; i < send_count; i++) {
                ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                // -----------------------------------------
                // 5. custom ds.test1
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
                bdVector.setDbName("rz_user");
                bdVector.setTableName("channel_01");
                // custom data
                bdVector.setCustomData(sbBuffer.toString().getBytes());
                // serialize
                bdVector.serialize(byteBuild);
                // -----------------------------------------
                // send data to server
                dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());
                byteBuild.clear();
                // ZcbusLog.info("test send length "+stringBuffer.length()/1024/1024+"MB,line "+jsonObject.toJSONString().length()+"B ok.");
                // ZcbusTime.sleep(1);
            }
            dataClient.close();
        } catch (Exception e) {
            ZcbusLog.error(e);
        }
    }
    public void dataClientDemo() {
        InitPushSdk();
        int send_count = 100000;
        ZcbusLog.setLevel(2);
        BusDataVector bdVector = null;
        List<BusDataVectorColVal> colList = null;
        List<BusDataVectorColVal> colBeforeList = null;
        BusDataVectorColVal colVal = null;
        BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
        try {
            dataClient.open();
            for (int i = 0; i < send_count; i++) {
                ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                dataClient.refreshTableList();
                // -----------------------------------------
                // 1. insert ds.test
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_INSERT);
                bdVector.setDbName("test");
                bdVector.setTableName("tb01");
                // column
                colList = new ArrayList<>();
                colVal = new BusDataVectorColVal("c1", "number", "1");
                colList.add(colVal);
                colVal = new BusDataVectorColVal("c2", "string", "test");
                colList.add(colVal);
                colVal = new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime());
                colList.add(colVal);
                bdVector.setColList(colList);
                // serialize
                bdVector.serialize(byteBuild);
                // -----------------------------------------
                // 2. update ds.test1
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_UPDATE);
                bdVector.setDbName("ds");
                bdVector.setTableName("test1");
                // after column
                colList = new ArrayList<>();
                colVal = new BusDataVectorColVal("c1", "number", "1");
                colList.add(colVal);
                colVal = new BusDataVectorColVal("c2", "string", "testtest");
                colList.add(colVal);
                colVal = new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime());
                colList.add(colVal);
                bdVector.setColList(colList);
                // before column
                colBeforeList = new ArrayList<>();
                colVal = new BusDataVectorColVal("c1", "number", "1");
                colBeforeList.add(colVal);
                bdVector.setColBeforeList(colBeforeList);
                // serialize
                bdVector.serialize(byteBuild);
                // -----------------------------------------
                // 3. delete ds.test
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DELETE);
                bdVector.setDbName("ds");
                bdVector.setTableName("test");
                // column
                colList = new ArrayList<>();
                colVal = new BusDataVectorColVal("c1", "number", "1");
                colList.add(colVal);
                bdVector.setColList(colList);
                // serialize
                bdVector.serialize(byteBuild);
                // -----------------------------------------
                // 4. ddl ds.test1
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DDL);
                bdVector.setDbName("ds");
                bdVector.setTableName("test1");
                // ddl sql
                bdVector.setDDlSql("truncate table ds.test1");
                // serialize
                bdVector.serialize(byteBuild);
                // -----------------------------------------
                // 5. custom ds.test1
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
                bdVector.setDbName("ds");
                bdVector.setTableName("test1");
                // custom data
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("abc", "test");
                jsonObject.put("abc1", "test");
                jsonObject.put("abc2", "test");
                jsonObject.put("abc3", "test" + (i + 1));
                bdVector.setCustomData(jsonObject.toJSONString().getBytes("UTF-8"));
                // serialize
                bdVector.serialize(byteBuild);
                // -----------------------------------------
                // send data to server
                dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());
                byteBuild.clear();
                ZcbusLog.info("test send ok.");
                ZcbusTime.sleep(1);
            }
            dataClient.close();
        } catch (Exception e) {
            ZcbusLog.error(e);
        }
    }
    public static void main(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if (0 == args[i].compareTo("-h")) {
                System.out.println(
                        "Usage: \n" + "    -h help\n" + "    -log_level 2 log_level\n"
                                + "    -max_thread 128 specify max threads of client service, default 512\n"
                                + "    -customer_id 10058 specify customer id of client service, used for port\n"
                                + "    -zcbusurl http://zcbusrestapi/ zcbus push url\n"
                                + "    -server_ip 198.218.249.130 ip of the zcbus push docker\n"
                                + "    -server_port 10088 port of the zcbus push docker\n"
                                + "    -limit_len 5242880 payload length in bytes for PushSdkDemo\n");
                return;
            } else if (0 == args[i].compareTo("-customer_id")) {
                customer_id = Integer.parseInt(args[++i]);
            } else if (0 == args[i].compareTo("-log_level")) {
                logLevel = Integer.parseInt(args[++i]);
            } else if (0 == args[i].compareTo("-max_thread")) {
                max_threads = Integer.parseInt(args[++i]);
            } else if (0 == args[i].compareTo("-zcbusurl")) {
                zcbusurl = args[++i];
            } else if (0 == args[i].compareTo("-server_ip")) {
                server_ip = args[++i];
            } else if (0 == args[i].compareTo("-server_port")) {
                server_port = Integer.parseInt(args[++i]);
            } else if (0 == args[i].compareTo("-limit_len")) {
                // added option: without it limitLen stays 0 and PushSdkDemo sends an empty payload
                limitLen = Integer.parseInt(args[++i]);
            } else {
                ZcbusLog.error("unknown options: " + args[i]);
                return;
            }
        }
        if (customer_id <= 0) {
            ZcbusLog.error("customer_id not set");
            return;
        }
        ZcbusPushSdkDemo dataClientDemo = new ZcbusPushSdkDemo();
        dataClientDemo.PushSdkDemo();
    }
}
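PushSdkDemo above builds its payload by repeating one JSON line and then truncating the result to limitLen characters. The same construction can be sketched standalone with the plain JDK (PayloadBuilder is a hypothetical helper, not part of the SDK; the JSON line is hard-coded instead of built with fastjson):

```java
/** Builds a payload of exactly limitLen characters by repeating one JSON line,
 *  mirroring the truncation step in PushSdkDemo (illustrative, not SDK code). */
public class PayloadBuilder {
    public static String buildPayload(String jsonLine, int limitLen) {
        StringBuilder sb = new StringBuilder(limitLen + jsonLine.length());
        while (sb.length() < limitLen) {
            sb.append(jsonLine);          // repeat the line past the target length
        }
        return sb.substring(0, limitLen); // then cut to exactly limitLen chars
    }

    public static void main(String[] args) {
        String line = "{\"abc\":\"test\",\"abc1\":\"test\"}";
        int limitLen = 5 * 1024 * 1024;   // 5 MB of ASCII, matching the demo's BytesBuilder size
        String payload = buildPayload(line, limitLen);
        System.out.println("payload length: " + payload.length()); // prints: payload length: 5242880
    }
}
```

Note that for ASCII content, characters and bytes coincide; with multi-byte UTF-8 data the byte length of the payload would exceed limitLen.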
- Parameter notes:
Because this is an external (standalone) test, the demo connects to the port directly; once the test succeeds, this class can serve as the basis for publishing data packets.
- Demo startup test log
[oracle@iZ2zeih4ukggzureoarz7pZ jar]$ java -jar ZcbusPushSdkDemo.jar -customer_id 50000 -zcbusurl http://v2.zbomc.com
[INF] 2022-12-15 22:39:02: url :http://v2.zbomc.com/api/zbomc/databus/expose/publish/busPushDbParameter/queryBusPushDbParameterByNodeId
[INF] 2022-12-15 22:39:03: jsonobject :{"nodeId":50000}
[INF] 2022-12-15 22:39:03: connect to server 127.0.0.1:21003...
[INF] 2022-12-15 22:39:03: connect to server ok.
[INF] 2022-12-15 22:39:03: login to server ok.
[LV0] 2022-12-15 22:39:03: refresh table list...
[INF] 2022-12-15 22:39:06: test send bsdata(1/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.
[INF] 2022-12-15 22:39:06: test send bsdata(2/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.
[INF] 2022-12-15 22:39:06: test send bsdata(3/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.
[INF] 2022-12-15 22:39:06: test send bsdata(4/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.
[INF] 2022-12-15 22:39:06: test send bsdata(5/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.
- The publish service receive log is as follows:
[INF] 2022-12-15 22:39:06: check repair tables...
[INF] 2022-12-15 22:39:06: extract start pos ...
[INF] 2022-12-15 22:39:06: flush data to kafka...
[INF] 2022-12-15 22:39:06: flush data to kafka ok.
[INF] CUSTOM[1] ---> CUSTOM[1]
Send data: 5.0000 M
Use time: 0 s
[INF] 2022-12-15 22:39:06: extract end pos .
[INF] 2022-12-15 22:39:06: check repair tables...
[INF] 2022-12-15 22:39:06: extract start pos ...
[INF] 2022-12-15 22:39:06: flush data to kafka...
[INF] 2022-12-15 22:39:06: flush data to kafka ok.
[INF] CUSTOM[1] ---> CUSTOM[1]
Send data: 5.0000 M
Use time: 0 s
[INF] 2022-12-15 22:39:06: extract end pos .
[INF] 2022-12-15 22:39:06: check repair tables...
[INF] 2022-12-15 22:39:06: extract start pos ...
[INF] 2022-12-15 22:39:06: flush data to kafka...
[INF] 2022-12-15 22:39:06: flush data to kafka ok.
[INF] CUSTOM[1] ---> CUSTOM[1]
Send data: 5.0000 M
Use time: 0 s
[INF] 2022-12-15 22:39:06: extract end pos .
Kafka consumption test
When the publish service starts receiving data, the log prints the following:
[INF] 2022-12-15 15:27:16: log extract thread is running...
[LV0] 2022-12-15 15:27:16: connect to mysql zbomc/***@172.17.58.146:3306 ...
[INF] set client character set utf8mb4...
[INF] new client character set: utf8mb4
[INF] MYSQL VERSION: 50738
[INF] MYSQL INFO: 5.7.38-log
SET SESSION sql_mode='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'
[INF] set real status is starting.
[INF] update real table list...
add table test.tb01(1/1)...
[DICT] put table test.tb01
test.tb01 0/0 flags 0x0 topic[10824.790204.r]
table in dict[1]:
test.tb01 0/0 flags 0x0 topic[10824.790204.r]
[INF] 2022-12-15 15:27:16: check repair tables...
[INF] 2022-12-15 15:27:16: extract start pos ...
[INF] 2022-12-15 15:27:16: bind host 127.0.0.1:21003...
[LV0] try bind 127.0.0.1:21003...
[INF] 2022-12-15 15:27:16: Listener running on 127.0.0.1:21003
[INF] Listener bound to sd: 12
[LV0] epoll add accept sd: 12.
[INF] 2022-12-15 15:27:18: flush data to kafka...
[INF] 2022-12-15 15:27:18: flush data to kafka ok.
[INF] set real status is started.
[INF] 2022-12-15 15:27:18: extract end pos .
The topic is therefore 10824.790204.r. Consume it to test:
[oracle@iZ2zeih4ukggzureoarz7pZ bin]$ ../bin/kafka_tool -broker zcbuskafka:9092 -topic 10824.790204.r -info #### -info prints the topic offset range
Linux iZ2zeih4ukggzureoarz7pZ 3.10.0-957.21.3.el7.x86_64 #1 SMP Tue Jun 18 16:35:19 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
kafka_tool: Release 7.8-16 64 bit (QA) - Production on 2022-12-15 20:04:52
Copyright (c) 2022 ZCBUS. All Rights Reserved.
process id 407133
topic offset range: 0 -- 21926.
[INF] Consumer closed
[oracle@iZ2zeih4ukggzureoarz7pZ bin]$ ../bin/kafka_tool -broker 172.17.46.244:9092 -topic 10824.790204.r -offset 0 -s -count 5 #### consume data and print statistics; consuming 5 packets confirms the data was published
Linux iZ2zeih4ukggzureoarz7pZ 3.10.0-957.21.3.el7.x86_64 #1 SMP Tue Jun 18 16:35:19 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
kafka_tool: Release 7.8-16 64 bit (QA) - Production on 2022-12-15 20:04:52
Copyright (c) 2022 ZCBUS. All Rights Reserved.
process id 409399
[INF] 2022-12-15 22:46:03: consume: 0, offset[0] length[5242880]
OP: 9 CUSTOM db_type 1 flg 192 test.tb01 2022-12-13 16:19:05
col data offset: 38
packet head
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[0]
[INF] 2022-12-15 22:46:03: consume: 1, offset[1] length[4997165]
packet tail
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[1]
[INF] 2022-12-15 22:46:03: consume: 2, offset[2] length[5242880]
OP: 9 CUSTOM db_type 1 flg 192 test.tb01 2022-12-13 16:19:05
col data offset: 38
packet head
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[1]
[INF] 2022-12-15 22:46:03: consume: 3, offset[3] length[4997165]
packet tail
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[2]
[INF] 2022-12-15 22:46:03: consume: 4, offset[4] length[5242880]
OP: 9 CUSTOM db_type 1 flg 192 test.tb01 2022-12-13 16:19:05
col data offset: 38
packet head
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[2]
reach max count, exit.
TOTAL:
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TABLEMARK[0]
[INF] Consumer closed
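In the kafka_tool output above, each batch arrives as a 5,242,880-byte "packet head" message followed by a 4,997,165-byte "packet tail", which suggests the publish service splits each serialized batch into fixed-size Kafka messages. A minimal sketch of such chunking under that assumption (BatchChunker is illustrative, not the actual server code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Splits one serialized batch into fixed-size chunks, the way the publish
 *  service appears to split a ~10 MB batch into a 5 MB head plus a tail. */
public class BatchChunker {
    static final int CHUNK_SIZE = 5 * 1024 * 1024; // 5242880, matching the log

    public static List<byte[]> split(byte[] batch) {
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < batch.length; off += CHUNK_SIZE) {
            int end = Math.min(off + CHUNK_SIZE, batch.length); // last chunk may be shorter
            chunks.add(Arrays.copyOfRange(batch, off, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] batch = new byte[5242880 + 4997165]; // total batch size seen in the log
        for (byte[] c : split(batch)) {
            System.out.println("chunk length: " + c.length);
        }
        // prints: chunk length: 5242880
        //         chunk length: 4997165
    }
}
```

Consumers should therefore be prepared to see one logical batch spread over several consecutive offsets, as the head/tail counters in the kafka_tool output show.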
The development-mode publish test is now complete. The program can be packaged as an executable jar and submitted in component-service mode for a data push test.
Document updated: 2022-12-15 04:55  Author: 阿力