包含自定义类型,插入,更新,删除等类型

1. 配置ZCBUS 发布 Server服务
  • 数据主函数不变,启动后,将数据推送到zcbus对应zcbus子分类下的server类型,配置如下:
2. 修改yaml文件,将对应设置的port端口映射到容器外部
3. 编写程序:【如果使用zcbus云端服务,实现模拟测试服务,需要联系ZCBUS工程师】

使用java代码,可以参考下边DEMO。下载对应的zcbusServer.jar包(驱动包在程序运行容器对应的/usr/local/zcbus/bin目录下),拷贝出来后作为依赖库引入即可使用。

  • 注意:main主函数不需要修改,只需要根据实际情况修改主体部分,比如解析文件、读取文件,均只需生成对应的插入操作即可。针对非结构化数据、二进制数据等物联网数据可以参考custom类型,根据实际情况自定义设置即可。
package com.zcbus.Service;

import java.util.ArrayList;
import java.util.List;

import com.zcbus.common.BusDataClient;
import com.zcbus.common.BusDataVector;
import com.zcbus.common.BusDataVectorColVal;
import com.zcbus.common.BytesBuilder;
import com.zcbus.common.ZcbusDbParam;
import com.zcbus.common.ZcbusEncode;
import com.zcbus.common.ZcbusEnv;
import com.zcbus.common.ZcbusLog;
import com.zcbus.common.ZcbusTime;

public class ZcbusPushSdkDemo{

        // Customer id, set by the -customer_id option. It is passed to every BusDataClient
        // constructor and appended to the log file name and work directory in main();
        // main() refuses to run the demos while it is <= 0.
        private static int customer_id = 0;
        // Set by the -max_thread option; not read anywhere else in this file.
        private static int max_threads = 512;
        // Set by the -log_level option and handed to ZcbusLog.setLevel() in main().
        private static int logLevel = -1;
        // Set by the -zcbusurl option; InitPushSdk() uses it when no server_ip was given.
        private static String zcbusurl =null;


        // Client used by all demo methods; (re)created by InitPushSdk() at the start of each one.
        private BusDataClient dataClient;

        // Set by the -server_ip option; when non-null, InitPushSdk() connects with server_ip/server_port.
        private static String server_ip=null;
        // Set by the -server_port option; only used together with server_ip.
        private static int server_port=0; 
        // Set by the -limit_length option; PushSdkDemo() truncates its generated payload text
        // to this many characters. The default 5242880 equals 5 * 1024 * 1024.
        private static int limitLen=5242880;

        /**
         * Creates the {@link BusDataClient} for this demo instance and stores it in {@code dataClient}.
         *
         * <p>The constructor is chosen from the command-line settings, in this order of precedence:
         * explicit {@code server_ip}/{@code server_port}, then {@code zcbusurl}, then the
         * customer-id-only constructor.
         */
        private void InitPushSdk() {
                if (server_ip != null) {
                        // The two "test" literals are presumably demo credentials — confirm against the SDK.
                        dataClient = new BusDataClient(server_ip, server_port, customer_id, "test", "test");
                        return;
                }
                if (zcbusurl != null) {
                        dataClient = new BusDataClient(zcbusurl, customer_id);
                        return;
                }
                dataClient = new BusDataClient(customer_id);
        }
        /**
         * Pushes the same large custom-type record to {@code test.tb01} 100000 times.
         *
         * <p>The payload is a small JSON document repeated (at most 500000 times) and cut to
         * {@code limitLen} characters. If {@code limitLen} is larger than the repeated text the
         * whole text is sent, and a negative {@code limitLen} yields an empty payload, instead
         * of failing with an index error.
         *
         * <p>Any exception is logged through {@link ZcbusLog}; the client is closed whenever
         * it was opened successfully.
         */
        public void PushSdkDemo() {
                InitPushSdk();
                int send_count = 100000;
                ZcbusLog.setLevel(2);
                BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
                try {
                        dataClient.open();
                        try {
                                dataClient.refreshTableList();
                                // NOTE(review): JSONObject is not imported in this file; toJSONString()
                                // suggests a fastjson-style class — confirm and add the import.
                                JSONObject jsonObject = new JSONObject();
                                jsonObject.put("abc", "test");
                                for (int k = 1; k <= 10; k++) {
                                        jsonObject.put("abc" + k, "test");
                                }

                                // Build only as much text as is needed instead of 500000 copies up front.
                                String unit = jsonObject.toJSONString();
                                int maxChars = Math.max(0, limitLen);
                                StringBuilder payloadText = new StringBuilder();
                                for (int j = 0; j < 500000 && payloadText.length() < maxChars; j++) {
                                        payloadText.append(unit);
                                }
                                payloadText.setLength(Math.min(maxChars, payloadText.length()));

                                // The payload never changes, so encode it once; UTF-8 matches the
                                // other custom-data demos in this class.
                                byte[] payload = payloadText.toString().getBytes("UTF-8");

                                for (int i = 0; i < send_count; i++) {
                                        ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                                        // -----------------------------------------
                                        // custom record for test.tb01
                                        BusDataVector bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
                                        bdVector.setDbName("test");
                                        bdVector.setTableName("tb01");
                                        bdVector.setCustomData(payload);

                                        // serialize
                                        bdVector.serialize(byteBuild);
                                        // -----------------------------------------
                                        // send data to server
                                        dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());

                                        byteBuild.clear();
                                }
                        } finally {
                                // Close even when sending failed, without masking the original error.
                                try {
                                        dataClient.close();
                                } catch (Exception closeError) {
                                        ZcbusLog.error(closeError);
                                }
                        }
                } catch (Exception e) {
                        ZcbusLog.error(e);
                }
        }
        /**
         * Sends 100000 INSERT records for table {@code test.tb01}, one record per message.
         *
         * <p>Each record carries the columns {@code c1} (number), {@code c2} (string) and
         * {@code c3} (datetime, current time). Any exception is logged through
         * {@link ZcbusLog}; the client is closed whenever it was opened successfully.
         */
        public void dataInsertDemo() {
                InitPushSdk();
                int send_count = 100000;
                ZcbusLog.setLevel(2);
                BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
                try {
                        dataClient.open();
                        try {
                                dataClient.refreshTableList();
                                for (int i = 0; i < send_count; i++) {
                                        ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                                        // -----------------------------------------
                                        // insert test.tb01
                                        BusDataVector bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_INSERT);
                                        bdVector.setDbName("test");
                                        bdVector.setTableName("tb01");

                                        // column
                                        List<BusDataVectorColVal> colList = new ArrayList<>();
                                        colList.add(new BusDataVectorColVal("c1", "number", "1"));
                                        colList.add(new BusDataVectorColVal("c2", "string", "test"));
                                        colList.add(new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime()));
                                        bdVector.setColList(colList);

                                        // serialize
                                        bdVector.serialize(byteBuild);

                                        // -----------------------------------------
                                        // send data to server
                                        dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());

                                        byteBuild.clear();
                                }
                        } finally {
                                // Close even when sending failed, without masking the original error.
                                try {
                                        dataClient.close();
                                } catch (Exception closeError) {
                                        ZcbusLog.error(closeError);
                                }
                        }
                } catch (Exception e) {
                        ZcbusLog.error(e);
                }
        }
        /**
         * Sends 100000 UPDATE records for database {@code ds}, one record per message.
         *
         * <p>Each record carries the after-image columns {@code c1}, {@code c2}, {@code c3} and a
         * before-image containing only {@code c1}. Any exception is logged through
         * {@link ZcbusLog}; the client is closed whenever it was opened successfully.
         */
        public void dataUpdateDemo() {
                InitPushSdk();
                int send_count = 100000;
                ZcbusLog.setLevel(2);
                BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
                try {
                        dataClient.open();
                        try {
                                dataClient.refreshTableList();
                                for (int i = 0; i < send_count; i++) {
                                        ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                                        // -----------------------------------------
                                        BusDataVector bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_UPDATE);
                                        bdVector.setDbName("ds");
                                        // NOTE(review): send_count % 2 is constant for the whole run (0 here),
                                        // so every record targets "test0"; i % 2 may have been intended — confirm.
                                        bdVector.setTableName("test" + send_count % 2);

                                        // after column
                                        List<BusDataVectorColVal> colList = new ArrayList<>();
                                        colList.add(new BusDataVectorColVal("c1", "number", "1"));
                                        colList.add(new BusDataVectorColVal("c2", "string", "testtest"));
                                        colList.add(new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime()));
                                        bdVector.setColList(colList);

                                        // before column
                                        List<BusDataVectorColVal> colBeforeList = new ArrayList<>();
                                        colBeforeList.add(new BusDataVectorColVal("c1", "number", "1"));
                                        bdVector.setColBeforeList(colBeforeList);

                                        // serialize
                                        bdVector.serialize(byteBuild);

                                        // -----------------------------------------
                                        // send data to server
                                        dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());
                                        byteBuild.clear();
                                }
                        } finally {
                                // Close even when sending failed, without masking the original error.
                                try {
                                        dataClient.close();
                                } catch (Exception closeError) {
                                        ZcbusLog.error(closeError);
                                }
                        }
                } catch (Exception e) {
                        ZcbusLog.error(e);
                }
        }
        /**
         * Sends 100000 DELETE records for table {@code ds.test}, one record per message.
         *
         * <p>Each record carries the single column {@code c1} (number, value "1"). Any exception
         * is logged through {@link ZcbusLog}; the client is closed whenever it was opened
         * successfully.
         */
        public void dataDeleteDemo() {
                InitPushSdk();
                int send_count = 100000;
                ZcbusLog.setLevel(2);
                BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
                try {
                        dataClient.open();
                        try {
                                dataClient.refreshTableList();
                                for (int i = 0; i < send_count; i++) {
                                        ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                                        // -----------------------------------------
                                        // delete ds.test
                                        BusDataVector bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DELETE);
                                        bdVector.setDbName("ds");
                                        bdVector.setTableName("test");

                                        // column
                                        List<BusDataVectorColVal> colList = new ArrayList<>();
                                        colList.add(new BusDataVectorColVal("c1", "number", "1"));
                                        bdVector.setColList(colList);

                                        // serialize
                                        bdVector.serialize(byteBuild);

                                        // -----------------------------------------
                                        // send data to server
                                        dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());

                                        byteBuild.clear();
                                }
                        } finally {
                                // Close even when sending failed, without masking the original error.
                                try {
                                        dataClient.close();
                                } catch (Exception closeError) {
                                        ZcbusLog.error(closeError);
                                }
                        }
                } catch (Exception e) {
                        ZcbusLog.error(e);
                }
        }

        /**
         * Sends 100000 DDL records for table {@code ds.test1}, one record per message.
         *
         * <p>Each record carries the statement {@code truncate table ds.test1}. Any exception is
         * logged through {@link ZcbusLog}; the client is closed whenever it was opened
         * successfully.
         */
        public void dataDDLDataDemo() {
                InitPushSdk();
                int send_count = 100000;
                ZcbusLog.setLevel(2);
                BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
                try {
                        dataClient.open();
                        try {
                                dataClient.refreshTableList();
                                for (int i = 0; i < send_count; i++) {
                                        ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                                        // -----------------------------------------
                                        // ddl ds.test1
                                        BusDataVector bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DDL);
                                        bdVector.setDbName("ds");
                                        bdVector.setTableName("test1");

                                        // ddl sql
                                        bdVector.setDDlSql("truncate table ds.test1");

                                        // serialize
                                        bdVector.serialize(byteBuild);

                                        // -----------------------------------------
                                        // send data to server
                                        dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());

                                        byteBuild.clear();
                                }
                        } finally {
                                // Close even when sending failed, without masking the original error.
                                try {
                                        dataClient.close();
                                } catch (Exception closeError) {
                                        ZcbusLog.error(closeError);
                                }
                        }
                } catch (Exception e) {
                        ZcbusLog.error(e);
                }
        }
        /**
         * Sends 100000 custom-type records for table {@code ds.test1}, one record per message.
         *
         * <p>The custom payload is the UTF-8 encoding of a small, fixed JSON document. Any
         * exception is logged through {@link ZcbusLog}; the client is closed whenever it was
         * opened successfully.
         */
        public void dataCustomerDataDemo() {
                InitPushSdk();
                int send_count = 100000;
                ZcbusLog.setLevel(2);
                BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
                try {
                        dataClient.open();
                        try {
                                dataClient.refreshTableList();
                                // NOTE(review): JSONObject is not imported in this file; toJSONString()
                                // suggests a fastjson-style class — confirm and add the import.
                                JSONObject jsonObject = new JSONObject();
                                jsonObject.put("abc", "test");
                                for (int k = 1; k <= 10; k++) {
                                        jsonObject.put("abc" + k, "test");
                                }
                                // The document never changes, so encode it once outside the loop.
                                byte[] payload = jsonObject.toJSONString().getBytes("UTF-8");

                                for (int i = 0; i < send_count; i++) {
                                        ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));

                                        // -----------------------------------------
                                        // custom ds.test1
                                        BusDataVector bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
                                        bdVector.setDbName("ds");
                                        bdVector.setTableName("test1");
                                        bdVector.setCustomData(payload);

                                        // serialize
                                        bdVector.serialize(byteBuild);
                                        // -----------------------------------------
                                        // send data to server
                                        dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());
                                        byteBuild.clear();
                                }
                        } finally {
                                // Close even when sending failed, without masking the original error.
                                try {
                                        dataClient.close();
                                } catch (Exception closeError) {
                                        ZcbusLog.error(closeError);
                                }
                        }
                } catch (Exception e) {
                        ZcbusLog.error(e);
                }
        }
        /**
         * Sends 1000 messages, each bundling five records: insert, update, delete, DDL and custom.
         *
         * <p>All five records are serialized into the same buffer and sent with a single
         * {@code sendBsdata} call per iteration, followed by {@code ZcbusTime.sleep(1)}. Any
         * exception is logged through {@link ZcbusLog}; the client is closed whenever it was
         * opened successfully.
         */
        public void dataClientDemo() {
                InitPushSdk();
                int send_count = 1000;
                ZcbusLog.setLevel(2);
                BusDataVector bdVector = null;
                List<BusDataVectorColVal> colList = null;
                List<BusDataVectorColVal> colBeforeList = null;
                BytesBuilder byteBuild = new BytesBuilder(5 * 1024 * 1024, 1024 * 1024);
                try {
                        dataClient.open();
                        try {
                                for (int i = 0; i < send_count; i++) {
                                        ZcbusLog.info(String.format("test send bsdata(%d/%d)...", i + 1, send_count));
                                        // NOTE(review): the single-type demos refresh the table list once before
                                        // their loop; here it is refreshed on every iteration — confirm intent.
                                        dataClient.refreshTableList();
                                        // -----------------------------------------
                                        // 1. insert test.tb01
                                        bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_INSERT);
                                        bdVector.setDbName("test");
                                        bdVector.setTableName("tb01");

                                        // column
                                        colList = new ArrayList<>();
                                        colList.add(new BusDataVectorColVal("c1", "number", "1"));
                                        colList.add(new BusDataVectorColVal("c2", "string", "test"));
                                        colList.add(new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime()));
                                        bdVector.setColList(colList);
                                        // serialize
                                        bdVector.serialize(byteBuild);

                                        // -----------------------------------------
                                        // 2. update in database ds
                                        bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_UPDATE);
                                        bdVector.setDbName("ds");
                                        // NOTE(review): send_count % 2 is constant for the whole run (0 here),
                                        // so every record targets "test0"; i % 2 may have been intended — confirm.
                                        bdVector.setTableName("test" + send_count % 2);

                                        // after column
                                        colList = new ArrayList<>();
                                        colList.add(new BusDataVectorColVal("c1", "number", "1"));
                                        colList.add(new BusDataVectorColVal("c2", "string", "testtest"));
                                        colList.add(new BusDataVectorColVal("c3", "datetime", ZcbusTime.getCurrentTime()));
                                        bdVector.setColList(colList);

                                        // before column
                                        colBeforeList = new ArrayList<>();
                                        colBeforeList.add(new BusDataVectorColVal("c1", "number", "1"));
                                        bdVector.setColBeforeList(colBeforeList);

                                        // serialize
                                        bdVector.serialize(byteBuild);

                                        // -----------------------------------------
                                        // 3. delete ds.test
                                        bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DELETE);
                                        bdVector.setDbName("ds");
                                        bdVector.setTableName("test");

                                        // column
                                        colList = new ArrayList<>();
                                        colList.add(new BusDataVectorColVal("c1", "number", "1"));
                                        bdVector.setColList(colList);

                                        // serialize
                                        bdVector.serialize(byteBuild);

                                        // -----------------------------------------
                                        // 4. ddl ds.test1
                                        bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_DDL);
                                        bdVector.setDbName("ds");
                                        bdVector.setTableName("test1");

                                        // ddl sql
                                        bdVector.setDDlSql("truncate table ds.test1");

                                        // serialize
                                        bdVector.serialize(byteBuild);

                                        // -----------------------------------------
                                        // 5. custom ds.test1
                                        bdVector = new BusDataVector();
                                        bdVector.setDbType(ZcbusDbParam.DB_TYPE_MYSQL);
                                        bdVector.setOpTime(ZcbusTime.getCurrentTime());
                                        bdVector.setOpType(BusDataVector.BSDATA_OPTYPE_CUSTOM);
                                        bdVector.setDbName("ds");
                                        bdVector.setTableName("test1");
                                        // custom data; the last field changes with the iteration number
                                        // NOTE(review): JSONObject is not imported in this file; toJSONString()
                                        // suggests a fastjson-style class — confirm and add the import.
                                        JSONObject jsonObject = new JSONObject();
                                        jsonObject.put("abc", "test");
                                        jsonObject.put("abc1", "test");
                                        jsonObject.put("abc2", "test");
                                        jsonObject.put("abc3", "test" + (i + 1));

                                        bdVector.setCustomData(jsonObject.toJSONString().getBytes("UTF-8"));

                                        // serialize
                                        bdVector.serialize(byteBuild);
                                        // -----------------------------------------
                                        // send all five records to the server in one call
                                        dataClient.sendBsdata(byteBuild.getBytes(), 0, byteBuild.getLength());

                                        byteBuild.clear();
                                        ZcbusLog.info("test send ok.");
                                        ZcbusTime.sleep(1);
                                }
                        } finally {
                                // Close even when sending failed, without masking the original error.
                                try {
                                        dataClient.close();
                                } catch (Exception closeError) {
                                        ZcbusLog.error(closeError);
                                }
                        }
                } catch (Exception e) {
                        ZcbusLog.error(e);
                }
        }
        /**
         * Command-line entry point: parses the options, initialises the ZCBUS environment and
         * logging, then runs the demo selected by {@code -test_optype}.
         *
         * <p>{@code -test_optype} defaults to {@code all}, as stated in the usage text. An unknown
         * option, an option without its value, or a non-numeric value for a numeric option is
         * reported through {@link ZcbusLog} and ends the program without running a demo.
         *
         * @param args command-line arguments; run with {@code -h} for the list of options
         */
        public static void main(String[] args) {
                // Without this default, omitting -test_optype made switch (optype) throw a NullPointerException.
                String optype = "all";
                for (int i = 0; i < args.length; i++) {
                        String opt = args[i];
                        switch (opt) {
                        case "-h":
                                System.out.println(
                                                "Usage: \n" + "       -h                 help\n" + "       -log_level 2       log_level\n"
                                                                + "       -max_thread 128    specify max threads of client service, default 512\n"
                                                                + "       -customer_id 10058 specify customer id of client service, use for port\n"
                                                                + "       -zcbusurl http://zcbusrestapi/ push zcbus url \n"
                                                                + "       -server_ip 198.218.249.130 push docker for zcbus's ip \n"
                                                                + "       -server_port 10088 push docker for zcbus's port \n"
                                                                + "       -limit_length 1024000 push topic package size ,default is 5MB\n"
                                                                + "       -test_optype insert/update/delete/ddl/custom/all default is all");
                                return;
                        case "-customer_id":
                        case "-log_level":
                        case "-max_thread":
                        case "-zcbusurl":
                        case "-server_ip":
                        case "-server_port":
                        case "-limit_length":
                        case "-test_optype":
                                // Every option except -h takes exactly one value.
                                if (i + 1 >= args.length) {
                                        ZcbusLog.error("missing value for option: " + opt);
                                        return;
                                }
                                break;
                        default:
                                ZcbusLog.error("unkown options: " + opt);
                                return;
                        }

                        String value = args[++i];
                        try {
                                switch (opt) {
                                case "-customer_id":
                                        customer_id = Integer.parseInt(value);
                                        break;
                                case "-log_level":
                                        logLevel = Integer.parseInt(value);
                                        break;
                                case "-max_thread":
                                        max_threads = Integer.parseInt(value);
                                        break;
                                case "-zcbusurl":
                                        zcbusurl = value;
                                        break;
                                case "-server_ip":
                                        server_ip = value;
                                        break;
                                case "-server_port":
                                        server_port = Integer.parseInt(value);
                                        break;
                                case "-limit_length":
                                        limitLen = Integer.parseInt(value);
                                        break;
                                default: // "-test_optype"
                                        optype = value.toLowerCase();
                                        break;
                                }
                        } catch (NumberFormatException e) {
                                ZcbusLog.error("invalid numeric value for option " + opt + ": " + value);
                                return;
                        }
                }

                try {
                        ZcbusEnv.init();
                        String logFile = ZcbusEnv.getZcbusLogPath() + "/log.db_service." + customer_id;
                        ZcbusLog.open(logFile);
                        ZcbusLog.setLevel(logLevel);

                        String workDirectory = ZcbusEnv.getZcbusCachePath() + "/db_service" + customer_id;
                        ZcbusEnv.setWorkDirectory(workDirectory);
                        ZcbusLog.print(2, "make workDirectory: " + workDirectory);
                        ZcbusEnv.createDir(workDirectory);
                        if (customer_id <= 0) {
                                ZcbusLog.error("cusomter_id not set");
                                return;
                        }
                        ZcbusPushSdkDemo dataClientDemo = new ZcbusPushSdkDemo();
                        ZcbusLog.info("Test optype " + optype);
                        switch (optype) {
                        case "insert":
                                dataClientDemo.dataInsertDemo();
                                break;
                        case "update":
                                dataClientDemo.dataUpdateDemo();
                                break;
                        case "delete":
                                dataClientDemo.dataDeleteDemo();
                                break;
                        case "ddl":
                                dataClientDemo.dataDDLDataDemo();
                                break;
                        case "custom":
                                dataClientDemo.dataCustomerDataDemo();
                                break;
                        case "all":
                                // NOTE(review): PushSdkDemo() sends only large custom records, while
                                // dataClientDemo() (never called from here) sends all five record
                                // types — confirm which one "all" is meant to run.
                                dataClientDemo.PushSdkDemo();
                                break;
                        default:
                                ZcbusLog.warning("test optype must in (insert/update/delete/ddl/custom/all)");
                                break;
                        }
                } catch (Exception e) {
                        // Printed to stderr because logging may not be open yet if initialisation failed.
                        e.printStackTrace();
                }
        }
}
文档更新时间: 2024-10-08 03:44   作者:阿力