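Preparing the publish node:

Add a publish node and select the service type.
When client software connects, the host, port, user, and password are verified.
Notes:

        host: During development, use the address of the machine that hosts Docker; during testing the service must be reachable from outside.
              After go-live, change the host to the Docker container name; the external service can then be closed.
        port: The port on which the service is exposed.
              During development, edit docker-compose.yml in the yaml directory under the installation directory and add:
              ports:
              - <external service port>:<internal service port> (port)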
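
To make the host and port rules above concrete, here is a minimal Java sketch that picks the connection endpoint from the deployment phase and one ports entry. The class PublishEndpoint and its methods are hypothetical helpers, not part of the ZCBUS SDK; the sample host, container name, and mapping are illustrative values, and only the two-part HOST_PORT:CONTAINER_PORT form is handled.

```java
public class PublishEndpoint {
    final String host;
    final int port;

    PublishEndpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Parses one docker-compose "ports" entry of the form
     * "HOST_PORT:CONTAINER_PORT" and returns {hostPort, containerPort}.
     */
    static int[] parsePortMapping(String entry) {
        String[] parts = entry.trim().split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                    "expected HOST_PORT:CONTAINER_PORT, got: " + entry);
        }
        return new int[] {
            Integer.parseInt(parts[0].trim()),
            Integer.parseInt(parts[1].trim())
        };
    }

    /**
     * Development/testing: use the Docker host address and the external port.
     * Production: use the container name and the internal port.
     */
    static PublishEndpoint resolve(boolean production, String dockerHost,
                                   String containerName, String portMapping) {
        int[] ports = parsePortMapping(portMapping);
        return production
                ? new PublishEndpoint(containerName, ports[1])
                : new PublishEndpoint(dockerHost, ports[0]);
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // Illustrative values only
        System.out.println(resolve(false, "172.17.46.245", "zcbus", "21010:21010")); // 172.17.46.245:21010
        System.out.println(resolve(true, "172.17.46.245", "zcbus", "21010:21010"));  // zcbus:21010
    }
}
```

Keeping both ports in one mapping string means the same configuration value can drive the development and the production endpoint.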

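Creating the publish node

  • Add a publish node

  • Configure and add the service publish node

  • Configure the publish service information

  • Add a publish configuration table (think of it as a publish channel)

  • After the node is added successfully, its status is shown as follows

Modify the container port mapping to map the port to the host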

[root@iZ2ze7y8byz5e2ki7gbcjpZ simple_server]# export ZCBUS_HOME=/data1/zcbus_docker/zcbusSoft/simple_server
[root@iZ2ze7y8byz5e2ki7gbcjpZ simple_server]# cd yaml/
[root@iZ2ze7y8byz5e2ki7gbcjpZ yaml]# vi docker-compose.yml
  zcbus:
    container_name: zcbus
    environment:
      LANG: en_US.UTF-8
      ZCBUS_CONTAINER: zcbus
      ZC_IPADDRESS: 172.17.104.186
      database: zcbus
      dbhost: zcbusdb
      dbport: 3306
      dbpwd: e0twWGp8aVtWfGB8dn9YdTo
      dbuser: QFlYT0k6
    image: reg.zbomc.com/zcbus_server:v2.0.1
    ports:           #### add ports and the port mappings below; one port per publish service (1:1)
    - 21010:21010
    volumes:
    - /data/docker02/zcbusdata/zcbus/bin:/usr/local/zcbus/bin
    - /data/docker02/zcbusdata/zcbus/lib:/usr/local/zcbus/lib
    - /data/docker02/zcbusdata/zcbus/cache:/usr/local/zcbus/cache
    - /data/docker02/zcbusdata/zcbus/mq:/usr/local/zcbus/mq
    - /data/docker02/zcbusdata/zcbus/log:/usr/local/zcbus/log
    - /data/docker02/zcbusdata/zcbus/jdk:/usr/local/jdk
    - /data/docker02/zcbusdata/zcbus/jar:/usr/local/zcbus/jar
[root@iZ2ze7y8byz5e2ki7gbcjpZ yaml]# docker-compose up -d #### update the container so that the port is mapped to the host
[root@iZ2ze7y8byz5e2ki7gbcjpZ yaml]# docker exec -it zcbus bash   #### enter the container and check the publish service log
[root@a294924cee45 zcbus]# cd log
[root@a294924cee45 log]# tail -f log.zcbus_real.10832
[oracle@iZ2zeih4ukggzureoarz7pZ log]$ more log.zcbus_real.10832
Linux iZ2zeih4ukggzureoarz7pZ 3.10.0-957.21.3.el7.x86_64 #1 SMP Tue Jun 18 16:35:19 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
zcbus: Release 7.8-16 64 bit (QA) - Production on 2022-12-15 20:04:52
Copyright (c) 2022 ZCBUS. All Rights Reserved.
process id 348766
[INF] local host: name[iZ2zeih4ukggzureoarz7pZ], ip[172.17.46.245]
[LV0] 2022-12-15 22:22:28: connect to mysql zbomc/***@172.17.58.146:3306 ...
[INF] set client character set utf8mb4...
[INF] new client character set: utf8mb4
[INF] MYSQL VERSION: 50738
[INF] MYSQL INFO: 5.7.38-log
SET SESSION sql_mode='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'

influx config:
INFLUX_BUCKET_NAME=BUCKET
INFLUX_HOST=http://172.17.104.186:8086
INFLUX_ORG=ORG
INFLUX_TOKEN=mVr3RJWNhaCu235sqOnOivo59xp3V0ASwqKTH38bMMWpfNSzbp0kN3E-rm4hyGu27uUY1qKn6_6-hpYVRmMALw==


config:
db_type=server
default_schema=zcbus
host=172.17.46.245
multi_connection_mode=5
password=test
port=21010
user=test
real_send_queue_cnt adjust to 8

kafka config:
acks=all
bootstrap.servers=172.17.46.244:9092
compression.type=snappy
message.max.bytes=10485760

[INF] 2022-12-15 22:22:28: log extract thread is running...
[LV0] 2022-12-15 22:22:28: connect to mysql zbomc/***@172.17.58.146:3306 ...
[INF] set client character set utf8mb4...
[INF] new client character set: utf8mb4
[INF] MYSQL VERSION: 50738
[INF] MYSQL INFO: 5.7.38-log
SET SESSION sql_mode='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'
[INF] set real status is starting.
[INF] update real table list...
table in dict[0]:
[INF] 2022-12-15 22:22:28: check repair tables...
[INF] 2022-12-15 22:22:28: extract start pos ...
[INF] 2022-12-15 22:22:28: bind host zcbus:21010...
[LV0] try bind zcbus:21010...
[INF] 2022-12-15 22:22:28: Listener running on zcbus:21010
[INF] Listener bound to sd: 12
[LV0] epoll add accept sd: 12.
[INF] 2022-12-15 22:22:30: flush data to kafka...
[INF] 2022-12-15 22:22:30: flush data to kafka ok.
[INF] set real status is started.
[INF] 2022-12-15 22:22:30: extract end pos .

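ZCBUS publishing, development mode:

// Connect over TCP/IP and authenticate with user and password.
// Arguments: host, port, customer id (10832 here, the same number as the suffix of log.zcbus_real.10832 above), user, password.
BusDataClient dataClient = new BusDataClient("172.17.46.245", 21010, 10832, "test", "test");

Demo that can be embedded in the ZCBUS platform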

package com.zcbus.Service;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.zcbus.common.BusDataClient;
import com.zcbus.common.BusDataVector;
import com.zcbus.common.BusDataVectorColVal;
import com.zcbus.common.BytesBuilder;
import com.zcbus.common.ZcbusDbParam;
import com.zcbus.common.ZcbusLog;
import com.zcbus.common.ZcbusTime;

public class ZcbusPushSdkDemo{

    private static int customer_id = 0;
    private static int max_threads = 512;
    private static int logLevel = -1;
    private static String zcbusurl =null;


    private BusDataClient dataClient;

    private static String server_ip=null;
    private static int server_port=0; 
    private static int limitLen=0;

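    private void InitPushSdk() {
        if(server_ip!=null) {
            // Direct TCP connection: host, port, customer id, user, password
            dataClient = new BusDataClient(server_ip, server_port, customer_id, "test", "test");
        }else if(zcbusurl!=null) {
            // A ZCBUS URL was given: pass it together with the customer id
            dataClient=new BusDataClient(zcbusurl, customer_id);
        }else{
            // Neither an address nor a URL was given: pass only the customer id
            dataClient=new BusDataClient(customer_id);
        }
    }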
    public void PushSdkDemo() {
        InitPushSdk();
        int send_count = 100000;
        ZcbusLog.setLevel(2);
        BusDataVector bdVector = null; 
        BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
        try {
            dataClient.open();
            dataClient.refreshTableList();
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("abc", "test");
            jsonObject.put("abc1", "test");
            jsonObject.put("abc2", "test");
            jsonObject.put("abc3", "test");
            jsonObject.put("abc4", "test");
            jsonObject.put("abc5", "test");
            jsonObject.put("abc6", "test");
            jsonObject.put("abc7", "test");
            jsonObject.put("abc8", "test");
            jsonObject.put("abc9", "test");
            jsonObject.put("abc10", "test");
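            // Build the test payload: repeat the JSON text, then keep the first limitLen characters.
            // Note: limitLen is 0 by default and no command-line option sets it, so the payload
            // is empty unless limitLen is changed in the source.
            StringBuffer stringBuffer=new StringBuffer();
            for(int j=0;j<500000;j++) {
                stringBuffer.append(jsonObject.toJSONString());
            }
            StringBuffer sbBuffer=new StringBuffer();
            sbBuffer.append(stringBuffer.substring(0, limitLen));
            stringBuffer=null;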
            for (int i = 0; i < send_count; i++) {
                ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                // -----------------------------------------
                // custom packet for rz_user.channel_01
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
                bdVector.setDbName("rz_user");
                bdVector.setTableName("channel_01");
                // custom data, encoded explicitly as UTF-8 (as in dataClientDemo below)
                bdVector.setCustomData(sbBuffer.toString().getBytes("UTF-8"));

                // serialize
                bdVector.serialize(byteBuild);
                // -----------------------------------------
                // send data to server
                dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());

                byteBuild.clear();
//                ZcbusLog.info("test send length "+stringBuffer.length()/1024/1024+"MB,line "+jsonObject.toJSONString().length()+"B ok.");
//                ZcbusTime.sleep(1);
            }
            dataClient.close();
        } catch (Exception e) {
            ZcbusLog.error(e);
        }
    }
    public void dataClientDemo() {
        InitPushSdk();
        int send_count = 100000;
        ZcbusLog.setLevel(2);
        BusDataVector bdVector = null;
        List<BusDataVectorColVal> colList = null;
        List<BusDataVectorColVal> colBeforeList = null;
        BusDataVectorColVal colVal = null;
        BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
        try {
            dataClient.open();
            for (int i = 0; i < send_count; i++) {
                ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                dataClient.refreshTableList();
                // -----------------------------------------
                // 1. insert test.tb01
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_INSERT);
                bdVector.setDbName("test");
                bdVector.setTableName("tb01");

                // column
                colList = new ArrayList<>();
                colVal = new BusDataVectorColVal("c1", "number", "1");
                colList.add(colVal);
                colVal = new BusDataVectorColVal("c2", "string", "test");
                colList.add(colVal);
                colVal = new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime());
                colList.add(colVal);
                bdVector.setColList(colList);
                // serialize
                bdVector.serialize(byteBuild);

                // -----------------------------------------
                // 2. update ds.test1
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_UPDATE);
                bdVector.setDbName("ds");
                bdVector.setTableName("test1");

                // after column
                colList = new ArrayList<>();
                colVal = new BusDataVectorColVal("c1", "number", "1");
                colList.add(colVal);
                colVal = new BusDataVectorColVal("c2", "string", "testtest");
                colList.add(colVal);
                colVal = new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime());
                colList.add(colVal);
                bdVector.setColList(colList);

                // before column
                colBeforeList = new ArrayList<>();
                colVal = new BusDataVectorColVal("c1", "number", "1");
                colBeforeList.add(colVal);
                bdVector.setColBeforeList(colBeforeList);

                // serialize
                bdVector.serialize(byteBuild);

                // -----------------------------------------
                // 3. delete ds.test
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DELETE);
                bdVector.setDbName("ds");
                bdVector.setTableName("test");

                // column
                colList = new ArrayList<>();
                colVal = new BusDataVectorColVal("c1", "number", "1");
                colList.add(colVal);
                bdVector.setColList(colList);

                // serialize
                bdVector.serialize(byteBuild);

                // -----------------------------------------
                // 4. ddl ds.test1
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DDL);
                bdVector.setDbName("ds");
                bdVector.setTableName("test1");

                // ddl sql
                bdVector.setDDlSql("truncate table ds.test1");

                // serialize
                bdVector.serialize(byteBuild);

                // -----------------------------------------
                // 5. custom ds.test1
                bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
                bdVector.setDbName("ds");
                bdVector.setTableName("test1");
                // custom data
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("abc", "test");
                jsonObject.put("abc1", "test");
                jsonObject.put("abc2", "test");
                jsonObject.put("abc3", "test" + (i + 1));

                bdVector.setCustomData(jsonObject.toJSONString().getBytes("UTF-8"));

                // serialize
                bdVector.serialize(byteBuild);
                // -----------------------------------------
                // send data to server
                dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());

                byteBuild.clear();
                ZcbusLog.info("test send ok.");
                ZcbusTime.sleep(1);
            }
            dataClient.close();
        } catch (Exception e) {
            ZcbusLog.error(e);
        }
    }
    public static void main(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if (0 == args[i].compareTo("-h")) {
                System.out.println(
                        "Usage: \n" + "       -h                 help\n" + "       -log_level 2       log level\n"
                                + "       -max_thread 128    specify max threads of client service, default 512\n"
                                + "       -customer_id 10058 specify customer id of client service, used for port\n"
                                + "       -zcbusurl http://zcbusrestapi/ ZCBUS URL to push to\n"
                                + "       -server_ip 198.218.249.130 IP of the ZCBUS Docker host to push to\n"
                                + "       -server_port 10088 port of the ZCBUS Docker host to push to\n");
                return;
            } else if (0 == args[i].compareTo("-customer_id")) {
                customer_id = Integer.parseInt(args[++i]);
            }else if (0 == args[i].compareTo("-log_level")) {
                logLevel = Integer.parseInt(args[++i]);
            }else if (0 == args[i].compareTo("-max_thread")) {
                max_threads = Integer.parseInt(args[++i]);
            }else if (0 == args[i].compareTo("-zcbusurl")) {
                zcbusurl=args[++i];
            }else if (0 == args[i].compareTo("-server_ip")) {
                server_ip=args[++i];
            }else if (0 == args[i].compareTo("-server_port")) {
                server_port=Integer.parseInt(args[++i]);
            }else {
                ZcbusLog.error("unknown option: " + args[i]);
                return;
            }
        }

        if (customer_id <= 0) {
            ZcbusLog.error("customer_id not set");
            return;
        }
        ZcbusPushSdkDemo dataClientDemo=new ZcbusPushSdkDemo();
        dataClientDemo.PushSdkDemo();
    }
}
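
PushSdkDemo above builds its custom payload by repeating a JSON string 500000 times and then truncating it. As a lighter alternative, the following sketch fills a byte array of an exact size directly. PayloadBuilder and repeatToSize are hypothetical names, not part of the ZCBUS SDK, and the sketch assumes the repeated text is ASCII, since truncation could otherwise split a multi-byte UTF-8 character.

```java
import java.nio.charset.StandardCharsets;

public class PayloadBuilder {
    /**
     * Returns a byte array of exactly sizeBytes bytes, filled by repeating
     * the UTF-8 encoding of unit; the last repetition may be cut short.
     */
    static byte[] repeatToSize(String unit, int sizeBytes) {
        byte[] src = unit.getBytes(StandardCharsets.UTF_8);
        if (src.length == 0 || sizeBytes < 0) {
            throw new IllegalArgumentException("unit must be non-empty and sizeBytes >= 0");
        }
        byte[] out = new byte[sizeBytes];
        for (int pos = 0; pos < sizeBytes; pos += src.length) {
            // Copy a full repetition, or only the remaining bytes at the end
            System.arraycopy(src, 0, out, pos, Math.min(src.length, sizeBytes - pos));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] payload = repeatToSize("{\"abc\":\"test\"}", 5 * 1024 * 1024);
        System.out.println(payload.length); // 5242880
    }
}
```

The resulting array could be passed to setCustomData in place of sbBuffer.toString().getBytes(...), which avoids holding the full repeated string in memory first.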
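  • Parameter notes:
    Because this test runs from outside the platform, it connects directly to the port. Once the test succeeds, this class can be used as the basis for publishing data packets.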
  • Demo startup test log
[oracle@iZ2zeih4ukggzureoarz7pZ jar]$ java -jar ZcbusPushSdkDemo.jar -customer_id 50000 -zcbusurl http://v2.zbomc.com
[INF] 2022-12-15 22:39:02: url :http://v2.zbomc.com/api/zbomc/databus/expose/publish/busPushDbParameter/queryBusPushDbParameterByNodeId
[INF] 2022-12-15 22:39:03: jsonobject :{"nodeId":50000}
[INF] 2022-12-15 22:39:03: connect to server 127.0.0.1:21003...
[INF] 2022-12-15 22:39:03: connect to server ok.
[INF] 2022-12-15 22:39:03: login to server ok.
[LV0] 2022-12-15 22:39:03: refresh table list...
[INF] 2022-12-15 22:39:06: test send bsdata(1/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.

[INF] 2022-12-15 22:39:06: test send bsdata(2/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.

[INF] 2022-12-15 22:39:06: test send bsdata(3/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.

[INF] 2022-12-15 22:39:06: test send bsdata(4/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.

[INF] 2022-12-15 22:39:06: test send bsdata(5/100000)...
[LV1] 2022-12-15 22:39:06: sendBsdata Length 5 MB ok.
  • The publish service receive log is as follows:
[INF] 2022-12-15 22:39:06: check repair tables...
[INF] 2022-12-15 22:39:06: extract start pos ...
[INF] 2022-12-15 22:39:06: flush data to kafka...
[INF] 2022-12-15 22:39:06: flush data to kafka ok.
[INF] CUSTOM[1]  --->  CUSTOM[1] 
   Send data: 5.0000 M
    Use time: 0 s
[INF] 2022-12-15 22:39:06: extract end pos .

[INF] 2022-12-15 22:39:06: check repair tables...
[INF] 2022-12-15 22:39:06: extract start pos ...
[INF] 2022-12-15 22:39:06: flush data to kafka...
[INF] 2022-12-15 22:39:06: flush data to kafka ok.
[INF] CUSTOM[1]  --->  CUSTOM[1] 
   Send data: 5.0000 M
    Use time: 0 s
[INF] 2022-12-15 22:39:06: extract end pos .

[INF] 2022-12-15 22:39:06: check repair tables...
[INF] 2022-12-15 22:39:06: extract start pos ...
[INF] 2022-12-15 22:39:06: flush data to kafka...
[INF] 2022-12-15 22:39:06: flush data to kafka ok.
[INF] CUSTOM[1]  --->  CUSTOM[1] 
   Send data: 5.0000 M
    Use time: 0 s
[INF] 2022-12-15 22:39:06: extract end pos .

Kafka consumption test

Watch the service log: when it starts receiving, it prints the following information.

[INF] 2022-12-15 15:27:16: log extract thread is running...
[LV0] 2022-12-15 15:27:16: connect to mysql zbomc/***@172.17.58.146:3306 ...
[INF] set client character set utf8mb4...
[INF] new client character set: utf8mb4
[INF] MYSQL VERSION: 50738
[INF] MYSQL INFO: 5.7.38-log
SET SESSION sql_mode='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'
[INF] set real status is starting.
[INF] update real table list...
add table test.tb01(1/1)...
[DICT] put table test.tb01
test.tb01          0/0   flags 0x0       topic[10824.790204.r]
table in dict[1]:
test.tb01          0/0   flags 0x0       topic[10824.790204.r]
[INF] 2022-12-15 15:27:16: check repair tables...
[INF] 2022-12-15 15:27:16: extract start pos ...
[INF] 2022-12-15 15:27:16: bind host 127.0.0.1:21003...
[LV0] try bind 127.0.0.1:21003...
[INF] 2022-12-15 15:27:16: Listener running on 127.0.0.1:21003
[INF] Listener bound to sd: 12
[LV0] epoll add accept sd: 12.
[INF] 2022-12-15 15:27:18: flush data to kafka...
[INF] 2022-12-15 15:27:18: flush data to kafka ok.
[INF] set real status is started.
[INF] 2022-12-15 15:27:18: extract end pos .

The log shows that the topic is 10824.790204.r. Test consuming from this topic with kafka_tool.
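
The topic name is read by hand from the topic[...] field of the table line in the log. As a sketch, the same lookup can be done in Java with a regular expression. TopicExtractor is a hypothetical helper, and the pattern assumes the line layout seen in this particular log (table name, a counter pair such as 0/0, flags, then topic[...]); other ZCBUS versions may print it differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopicExtractor {
    // Matches lines such as:
    // test.tb01          0/0   flags 0x0       topic[10824.790204.r]
    private static final Pattern TABLE_LINE = Pattern.compile(
            "^(\\S+\\.\\S+)\\s+\\d+/\\d+\\s+flags\\s+\\S+\\s+topic\\[([^\\]]+)\\]\\s*$");

    /** Returns a map from "db.table" to topic name, in order of first appearance. */
    static Map<String, String> extractTopics(String logText) {
        Map<String, String> topics = new LinkedHashMap<>();
        for (String line : logText.split("\\R")) {
            Matcher m = TABLE_LINE.matcher(line.trim());
            if (m.matches()) {
                topics.put(m.group(1), m.group(2));
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        String log = "add table test.tb01(1/1)...\n"
                + "[DICT] put table test.tb01\n"
                + "test.tb01          0/0   flags 0x0       topic[10824.790204.r]\n"
                + "table in dict[1]:\n"
                + "test.tb01          0/0   flags 0x0       topic[10824.790204.r]\n";
        System.out.println(extractTopics(log)); // {test.tb01=10824.790204.r}
    }
}
```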

[oracle@iZ2zeih4ukggzureoarz7pZ bin]$  ../bin/kafka_tool -broker zcbuskafka:9092 -topic 10824.790204.r -info    #### -info shows the topic offset range
Linux iZ2zeih4ukggzureoarz7pZ 3.10.0-957.21.3.el7.x86_64 #1 SMP Tue Jun 18 16:35:19 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
kafka_tool: Release 7.8-16 64 bit (QA) - Production on 2022-12-15 20:04:52
Copyright (c) 2022 ZCBUS. All Rights Reserved.
process id 407133
topic offset range: 0 -- 21926.
[INF] Consumer closed

[oracle@iZ2zeih4ukggzureoarz7pZ bin]$  ../bin/kafka_tool -broker 172.17.46.244:9092 -topic 10824.790204.r -offset 0 -s -count 5   #### consume the data and print statistics; 5 packets are consumed (the data was published successfully)
Linux iZ2zeih4ukggzureoarz7pZ 3.10.0-957.21.3.el7.x86_64 #1 SMP Tue Jun 18 16:35:19 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
kafka_tool: Release 7.8-16 64 bit (QA) - Production on 2022-12-15 20:04:52
Copyright (c) 2022 ZCBUS. All Rights Reserved.
process id 409399
[INF] 2022-12-15 22:46:03: consume: 0, offset[0] length[5242880]
 OP: 9 CUSTOM db_type 1 flg 192 test.tb01       2022-12-13 16:19:05
   col data offset: 38
  packet head
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[0] 
[INF] 2022-12-15 22:46:03: consume: 1, offset[1] length[4997165]
  packet tail
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[1] 
[INF] 2022-12-15 22:46:03: consume: 2, offset[2] length[5242880]
 OP: 9 CUSTOM db_type 1 flg 192 test.tb01       2022-12-13 16:19:05
   col data offset: 38
  packet head
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[1] 
[INF] 2022-12-15 22:46:03: consume: 3, offset[3] length[4997165]
  packet tail
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[2] 
[INF] 2022-12-15 22:46:03: consume: 4, offset[4] length[5242880]
 OP: 9 CUSTOM db_type 1 flg 192 test.tb01       2022-12-13 16:19:05
   col data offset: 38
  packet head
INSERT[0] UPDATE[0] DELETE[0] DDL[0] TAG[0] CONTROL[0] INFO[0] SQL[0] DICT[0] CUSTOM[2] 
reach max count, exit.
TOTAL:
  INSERT[0] UPDATE[0] DELETE[0] DDL[0] TABLEMARK[0]
[INF] Consumer closed
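
To check how much data was read back, the "consume:" lines in the output above can be totalled. The sketch below does this in Java; ConsumeSummary is a hypothetical helper, and the pattern is taken only from the sample output shown here, so it is an assumption about the kafka_tool output format rather than a documented interface.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConsumeSummary {
    // Matches e.g.: consume: 0, offset[0] length[5242880]
    private static final Pattern CONSUME =
            Pattern.compile("consume: (\\d+), offset\\[(\\d+)\\] length\\[(\\d+)\\]");

    /** Returns {messageCount, totalBytes} over all "consume:" lines in the text. */
    static long[] summarize(String output) {
        long count = 0;
        long bytes = 0;
        Matcher m = CONSUME.matcher(output);
        while (m.find()) {
            count++;
            bytes += Long.parseLong(m.group(3));
        }
        return new long[] { count, bytes };
    }

    public static void main(String[] args) {
        String output =
                "[INF] 2022-12-15 22:46:03: consume: 0, offset[0] length[5242880]\n"
              + "[INF] 2022-12-15 22:46:03: consume: 1, offset[1] length[4997165]\n";
        long[] s = summarize(output);
        System.out.println(s[0] + " messages, " + s[1] + " bytes"); // 2 messages, 10240045 bytes
    }
}
```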

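Once the development-mode publishing test is complete, the program can be packaged as an executable jar and submitted in component service mode for data push testing.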

Document updated: 2022-12-15 04:55   Author: 阿力