SDK 加工服务配置流程:

  1. SDK数据加工原理介绍
  2. 加工SDK兼容DEMO
  3. 组件注册
  4. 加工服务启动配置&管理

SDK发布原理介绍

数据处理1234ZCBUS数据缓存区数据加工服务SDK服务SDK服务发布服务数据订阅SDK目标

加工SDK兼容DEMO

加工SDK Demo

  • ZcbusEtlServiceDemo.java
package com.zcbus.Service;

import com.zcbus.common.BusEtlCmd;

import java.util.ArrayList;
import java.util.List;

import com.zcbus.common.BusDataVector;
import com.zcbus.common.BusDataVectorColVal;
import com.zcbus.common.BusJsonConverter;
import com.zcbus.common.BusServer;
import com.zcbus.common.BytesBuilder;
import com.zcbus.common.ZcbusTime;

public class ZcbusEtlServiceDemo extends BusEtlCmd {

    public byte[] processBusdata(byte[] busData, int dataLength) throws Exception {
        System.out.println(" customerID: " + getCustomerID());
        System.out.println("  controlID: " + getControlID());
        System.out.println("    tableID: " + getTableID());
        System.out.println("     dbName: " + getDbName());
        System.out.println("  tableName: " + getTableName());
        System.out.println("data length: " + dataLength); 

        BytesBuilder byteBuild = new BytesBuilder(dataLength + 1024 * 1024, 1024 * 1024);
        BusDataVector bdVector = new BusDataVector();
        List<BusDataVectorColVal> colList = null;
        int size = 0;
        int offset = 0;
        while (offset < dataLength) {
            int len = bdVector.deserialize(busData, offset, dataLength);
            if (len <= 0) {
                throw new Exception(String.format("bus data offset %d, length is %d", offset, len));
            } 
            switch (bdVector.getOpType()) {
            case BusDataVector.BSDATA_OPTYPE_UPDATE:
                if (bdVector.hasBeforeValue()) {
                    colList = bdVector.getColBeforeList();
                    size = colList.size();
                    for (int i = 0; i < size; i++) {
                        String str = colList.get(i).getDataString();
                        str += "cvtb";
                        colList.get(i).setData(str);
                    }
                }
            case BusDataVector.BSDATA_OPTYPE_INSERT:
            case BusDataVector.BSDATA_OPTYPE_DELETE:
                colList = bdVector.getColList();
                size = colList.size();
                for (int i = 0; i < size; i++) {
                    String str = colList.get(i).getDataString();
                    str += "cvt";
                    colList.get(i).setData(str);
                }

                bdVector.serialize(byteBuild);
                break;
            }
            offset += len;
        }

        return byteBuild.toArray();
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        for (int i = 0; i < args.length; i++) {
            if (0 == args[i].compareTo("-h")) {
                System.out.println(
                        "Usage: \n" + "       -h                 help\n" + "       -log_level 2       log_level\n"
                                + "       -max_thread 128    specify max threads of client service, default 512\n"
                                + "       -customer_id 10058 specify customer id of client service, use for port\n");
                return;
            } else if (0 == args[i].compareTo("-customer_id"))
                customer_id = Integer.parseInt(args[++i]);
            else if (0 == args[i].compareTo("-log_level"))
                logLevel = Integer.parseInt(args[++i]);
            else if (0 == args[i].compareTo("-max_thread"))
                max_threads = Integer.parseInt(args[++i]);
            else {
                ZcbusLog.error("unkown options: " + args[i]);
                return;
            }
        }

        if (customer_id <= 0) {
            ZcbusLog.error("cusomter_id not set");
            return;
        }

        try {
            ZcbusEnv.init();

            String logFile = ZcbusEnv.getZcbusLogPath() + "/log.db_service." + customer_id;
            ZcbusLog.open(logFile);
            ZcbusLog.setLevel(logLevel);

            String workDirectory = ZcbusEnv.getZcbusCachePath() + "/db_service" + customer_id;
            ZcbusEnv.setWorkDirectory(workDirectory);
            ZcbusLog.print(2, "make workDirectory: " + workDirectory);
            ZcbusEnv.createDir(workDirectory);

            if (max_threads < 1)
                max_threads = 512;
            BusServer busServer = new BusServer(customer_id,max_threads);
            BusServer.registEtlCmdClass(ZcbusEtlServiceDemo.class);
            busServer.run();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

组件注册

INSERT INTO `bus_service_type_model` (`usage`, `db_type`, `use_way`, `service_subtype`, `program_name`, `updatetime`) VALUES ('2', 'client', '1', 'etl', 'ZcbusEtlServiceDemo.jar', '2022-12-11 15:41:07');
  • 目前手工将开发的SendSdkDemo.jar包上传到容器下/usr/local/zcbus/jar目录下
  1. 客户端管理

    1. 添加客户端
      注意:客户端名称可自定义
      客户端模式选择API客户端
      选择适当运行容器
  2. 订阅客户端配置

文档更新时间: 2022-12-14 17:41   作者:阿力