SDK发布原理介绍

(架构图组件)发布任务 / SDK服务 / SDK服务 / 发布服务 / ZCBUS数据缓存区 / 订阅服务 / SDK服务
  • 增量日志解析,兼容DML(INSERT/UPDATE/DELETE操作),以及DDL操作
  • 增量数据读取:所有定时SELECT查询出来的数据全部按INSERT操作处理,其他增量信息数据的处理方式参考日志解析
建立BusDataClient客户端模式:
//sdk容器在同一台主机中包含有restapi服务,获取相关信息
BusDataClient dataClient=new BusDataClient(10818);
//通过restapi访问相关API,根据NODEID获取对应服务的地址等相关信息
BusDataClient dataClient=new BusDataClient("http://zcbusrestapi/", 10818);
//通过TCP/IP以及用户密码验证等方式
BusDataClient dataClient = new BusDataClient("39.96.51.215", 22000, 10818, "test", "test");
测试DEMO
    /**
     * Demo: publishes a mixed batch of change records through a {@code BusDataClient}.
     *
     * <p>Each loop iteration serializes five records (INSERT, UPDATE, DELETE, DDL and
     * CUSTOM) into one shared {@code BytesBuilder}, sends the accumulated bytes with a
     * single {@code sendBsdata} call, clears the buffer and then calls
     * {@code ZcbusTime.sleep(1)}.
     *
     * <p>Any exception is logged through {@code ZcbusLog.error} and ends the demo. Once
     * {@code open()} has succeeded the client is always closed, including when a later
     * call throws.
     */
    public void dataClientDemo() {
        // Alternative ways to construct the client:
        //BusDataClient dataClient=new BusDataClient(10818);
        //BusDataClient dataClient=new BusDataClient("http://v2.zbomc.com/", 10818);
        BusDataClient dataClient = new BusDataClient("39.96.51.215", 22000, 10818, "test", "test");
        final int sendCount = 100000;
        ZcbusLog.setLevel(2);
        // One buffer is reused for every batch; it is cleared after each send.
        BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
        // Tracks whether open() succeeded so close() is only attempted on an opened client.
        boolean opened = false;
        try {
            dataClient.open();
            opened = true;
            for (int i = 0; i < sendCount; i++) {
                ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, sendCount));
                dataClient.refreshTableList();

                // -----------------------------------------
                // 1. insert test.tb01
                BusDataVector insertVector = new BusDataVector();
                insertVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                insertVector.setOpTime(ZcbusTime.getCurrentTime());
                insertVector.setOpType(BusDataVector.BSDATA_OPTYPE_INSERT);
                insertVector.setDbName("test");
                insertVector.setTableName("tb01");

                // column values of the inserted row
                List<BusDataVectorColVal> insertCols = new ArrayList<>();
                insertCols.add(new BusDataVectorColVal("c1", "number", "1"));
                insertCols.add(new BusDataVectorColVal("c2", "string", "test"));
                insertCols.add(new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime()));
                insertVector.setColList(insertCols);
                insertVector.serialize(byteBuild);

                // -----------------------------------------
                // 2. update ds.test1
                BusDataVector updateVector = new BusDataVector();
                updateVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                updateVector.setOpTime(ZcbusTime.getCurrentTime());
                updateVector.setOpType(BusDataVector.BSDATA_OPTYPE_UPDATE);
                updateVector.setDbName("ds");
                updateVector.setTableName("test1");

                // after-image columns
                List<BusDataVectorColVal> afterCols = new ArrayList<>();
                afterCols.add(new BusDataVectorColVal("c1", "number", "1"));
                afterCols.add(new BusDataVectorColVal("c2", "string", "testtest"));
                afterCols.add(new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime()));
                updateVector.setColList(afterCols);

                // before-image columns
                List<BusDataVectorColVal> beforeCols = new ArrayList<>();
                beforeCols.add(new BusDataVectorColVal("c1", "number", "1"));
                updateVector.setColBeforeList(beforeCols);
                updateVector.serialize(byteBuild);

                // -----------------------------------------
                // 3. delete ds.test
                BusDataVector deleteVector = new BusDataVector();
                deleteVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                deleteVector.setOpTime(ZcbusTime.getCurrentTime());
                deleteVector.setOpType(BusDataVector.BSDATA_OPTYPE_DELETE);
                deleteVector.setDbName("ds");
                deleteVector.setTableName("test");

                // column values attached to the delete record
                List<BusDataVectorColVal> deleteCols = new ArrayList<>();
                deleteCols.add(new BusDataVectorColVal("c1", "number", "1"));
                deleteVector.setColList(deleteCols);
                deleteVector.serialize(byteBuild);

                // -----------------------------------------
                // 4. ddl ds.test1
                BusDataVector ddlVector = new BusDataVector();
                ddlVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                ddlVector.setOpTime(ZcbusTime.getCurrentTime());
                ddlVector.setOpType(BusDataVector.BSDATA_OPTYPE_DDL);
                ddlVector.setDbName("ds");
                ddlVector.setTableName("test1");
                ddlVector.setDDlSql("truncate table ds.test1");
                ddlVector.serialize(byteBuild);

                // -----------------------------------------
                // 5. custom ds.test1
                BusDataVector customVector = new BusDataVector();
                customVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                customVector.setOpTime(ZcbusTime.getCurrentTime());
                customVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
                customVector.setDbName("ds");
                customVector.setTableName("test1");

                // custom payload: a small JSON document whose last field varies per iteration
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("abc", "test");
                jsonObject.put("abc1", "test");
                jsonObject.put("abc2", "test");
                jsonObject.put("abc3", "test" + (i + 1));
                customVector.setCustomData(jsonObject.toJSONString().getBytes("UTF-8"));
                customVector.serialize(byteBuild);

                // -----------------------------------------
                // send the five serialized records to the server in one call
                dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());

                byteBuild.clear();
                ZcbusLog.info("test send ok.");
                ZcbusTime.sleep(1);
            }
        } catch (Exception e) {
            ZcbusLog.error(e);
        } finally {
            // Close on every exit path so a failed send does not leak the connection.
            if (opened) {
                try {
                    dataClient.close();
                } catch (Exception e) {
                    ZcbusLog.error(e);
                }
            }
        }
    }

推送特定数据(二进制数据)

  • 推送数据为固定数据,不需要再进行解析加工,只传输固定数据、文本、json、二进制等数据信息
    /**
     * Demo: repeatedly publishes one fixed custom (opaque) payload through a
     * {@code BusDataClient}.
     *
     * <p>The payload is the first {@code limitLen} characters of a JSON document
     * repeated back to back, encoded as UTF-8. It is built once and then sent
     * as a {@code BSDATA_OPTYPE_CUSTOM} record for table {@code test.tb01} on every
     * iteration.
     *
     * <p>Any exception is logged through {@code ZcbusLog.error} and ends the demo. Once
     * {@code open()} has succeeded the client is always closed, including when a later
     * call throws.
     */
    public void SendCustomDemo() {
        BusDataClient dataClient = new BusDataClient(ip, port, nodeid, "test", "test");
        final int sendCount = 100000;
        ZcbusLog.setLevel(2);
        // One buffer is reused for every record; it is cleared after each send.
        BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
        // Tracks whether open() succeeded so close() is only attempted on an opened client.
        boolean opened = false;
        try {
            dataClient.open();
            opened = true;
            dataClient.refreshTableList();

            JSONObject jsonObject = new JSONObject();
            jsonObject.put("abc", "test");
            jsonObject.put("abc1", "test");
            jsonObject.put("abc2", "test");
            jsonObject.put("abc3", "test");
            jsonObject.put("abc4", "test");
            jsonObject.put("abc5", "test");
            jsonObject.put("abc6", "test");
            jsonObject.put("abc7", "test");
            jsonObject.put("abc8", "test");
            jsonObject.put("abc9", "test");
            jsonObject.put("abc10", "test");

            // Repeat the JSON text (at most 500000 times) only until limitLen characters
            // are available; the remainder would be discarded by substring anyway.
            String jsonText = jsonObject.toJSONString();
            StringBuilder repeated = new StringBuilder();
            for (int j = 0; j < 500000 && repeated.length() < limitLen; j++) {
                repeated.append(jsonText);
            }
            // substring still throws (and the demo aborts via the catch below) when
            // limitLen is negative or larger than 500000 repetitions can provide.
            // The payload is loop-invariant, so it is encoded once. Explicit UTF-8 keeps
            // the bytes independent of the platform default charset.
            // NOTE(review): one array is shared by all records; assumes setCustomData and
            // serialize do not modify it - confirm.
            byte[] payload = repeated.substring(0, limitLen).getBytes("UTF-8");

            for (int i = 0; i < sendCount; i++) {
                ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, sendCount));
                // custom record for test.tb01
                BusDataVector bdVector = new BusDataVector();
                bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                bdVector.setOpTime(ZcbusTime.getCurrentTime());
                bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
                bdVector.setDbName("test");
                bdVector.setTableName("tb01");
                bdVector.setCustomData(payload);

                // serialize, send to the server, then reset the buffer for the next record
                bdVector.serialize(byteBuild);
                dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());
                byteBuild.clear();
            }
        } catch (Exception e) {
            ZcbusLog.error(e);
        } finally {
            // Close on every exit path so a failed send does not leak the connection.
            if (opened) {
                try {
                    dataClient.close();
                } catch (Exception e) {
                    ZcbusLog.error(e);
                }
            }
        }
    }
文档更新时间: 2023-10-13 17:11   作者:阿力