主数据管理平台 主数据管理平台
产品介绍
产品安装指南
快速入门手册
用户操作手册
接口文档
  • 订阅者管理
  • 订阅者分组
  • 新增订阅者
  • 系统名称
  • 订阅者类型
  • 报文配置-HTTP/HTTPS响应报文校验
  • 报文配置-业务数据加密安全传输
  • 通知配置
  • 删除订阅者

# 订阅者管理

订阅者即下游系统订阅主数据各模型数据的业务系统,其提供如http(webhook)或其他服务来接受主数据各模型的业务数据推送。

访问路径:【主数据管理门户】-> 【数据分发】->【订阅者系统】

image-20250722175847222

# 订阅者分组

订阅者分组无实际业务意义,只是为了方便查看和归类。所有的订阅者系统必须放在订阅者分组下,所以用户添加订阅者之前先按需新建订阅者分组即可。

# 新增订阅者

点击【新增】订阅者按钮,弹出新增订阅者弹框,输入订阅者信息点击【提交】即可(部分订阅者类型支持测试连接)。

image-20250723132831359

# 系统名称

订阅者的显示名称,上限为64字符。

# 订阅者类型

已适配类型http,https,webservice等(允许用户通过SPI进行扩展,以扩展JAR文件的方式部署即可)。

  • http/https: 配置服务地址(主机和端口)、推送API路径,测试API路径(强烈建议配置:有助于避免无效调度次数——如在网络不稳定/停机维护等场景下)以及请求头。
  • webservice: 需要配置服务地址(WSDL)、命名空间(Namespace)和推送方法(Method),选择配置测试连接方法(Method)。(默认适配不一定满足用户需求,用户可以按需扩展)
  • socket: 需要配置服务地址(主机和端口)。(默认适配不一定满足用户需求,用户可以按需扩展)

# 报文配置-HTTP/HTTPS响应报文校验

响应报文校验仅使用于HTTP/HTTPS订阅者类型,属于可选配置(切换到报文配置标签页)。建议用户采用HTTP标准状态码2XX来表示业务数据推送调用成功(校验方式选择忽略),否则可以配置一个报文内容校验格式(自定义,即HTTP响应状态码2XX且响应JSON报文体内容中业务调用结果码符合预期)。e.g. 响应报文JSON结构:

// e.g. 状态码属性:code
{
    "code": "200",
    "message": "OK"
}
// e.g. 状态码属性:response.code
{
    "response": {
        "code": "200",
        "message": "OK"
    }
}

image-20250723105511957

支持以下五种配置验证办法:

序号 匹配方式 示例
1 精确匹配一个常量值,即只有业务代码为XXX才算业务调用成功 200
2 穷举多个常量值,即业务代码在这些穷举常量值内都算业务调用成功(使用英文分号;进行分割) 200;201;204
3 排除一个常量值,即不是XXX都算业务调用成功(头部加英文!符号) !500
4 排除多个常量值,即不是这些穷举常量值都算业务调用成功(头部加英文!符号) !401;403;404;500
5 正则匹配,按格式regex: {expression_write_to_here} 进行书写(前缀regex: 不可省略) regex: ^(200|201|204)$

系统默认校验方式配置(config/application-mdm.properties):

#mdm.push.http.validate-response-body.enabled=true
#mdm.push.http.validate-response-body.property=code
#mdm.push.http.validate-response-body.pattern=200
# if multi-success-codes, e.g.
#mdm.push.http.validate-response-body.pattern=200;204
# not equals, e.g.
#mdm.push.http.validate-response-body.pattern=!500
# not in, e.g.
#mdm.push.http.validate-response-body.pattern=!401;403;404;500
# use expression (suffix: `regex: `)
#mdm.push.http.validate-response-body.pattern=regex: {COPY_YOUR_REGEX_TO_HERE}

# 报文配置-业务数据加密安全传输

如果数据推送需要先把模型业务数据进行加密后通过网络传输(密文字节码)可以对报文进行配置。

image-20250723114339207

建议选择RSA加密方式(对称加密技术),从外部拷贝一份RAS公钥(BASE64字符串)或使用生成密钥对按钮立即生成(生成后注意保存你的私钥,此私钥将用于下游系统解密推送报文)。

RSA加解密工具(java):

import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.KeyFactory;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.interfaces.RSAKey;
import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import javax.crypto.Cipher;

import org.apache.commons.collections4.keyvalue.UnmodifiableMapEntry;

/**
 * 基于RSA加解密工具
 */
public class RsaEncryptUtils {

    private static final String ALGORITHM = "RSA";

    private RsaEncryptUtils() {
    }

    /**
     * 文本加密
     * @param content 字符串内容
     * @param secretKey RSA公钥或私钥
     * @return BASE64密文
     */
    public static String encrypt(String content, String secretKey) {
        byte[] keyBytes = Base64.getDecoder().decode(secretKey.getBytes());
        byte[] cipherBytes = encrypt(content.getBytes(StandardCharsets.UTF_8), keyBytes);
        return Base64.getEncoder().encodeToString(cipherBytes);
    }

    /**
     * 字节码加密
     * @param content 字节内容
     * @param secretKey RSA公钥或私钥
     * @return 密文字节码
     */
    public static byte[] encrypt(byte[] content, byte[] secretKey) {
        try {
            KeyFactory.getInstance(ALGORITHM).generatePublic(new X509EncodedKeySpec(secretKey));
            return encrypt(content, secretKey, true);
        } catch (Exception ignore) { // retry
            return encrypt(content, secretKey, false);
        }
    }

    /**
     * 字节码加密
     * @param content 字节内容
     * @param secretKey RSA公钥或私钥
     * @param publicKey 是否为公钥
     * @return 密文字节码
     */
    public static byte[] encrypt(byte[] content, byte[] secretKey, boolean publicKey) {
        if (null == content || content.length < 1) {
            return null;
        }
        // 数据推送会限制单次发送的数据量也就是能限制住`content`的大小
        try {
            Key rsaKey = publicKey ? KeyFactory.getInstance(ALGORITHM).generatePublic(new X509EncodedKeySpec(secretKey))
                    : KeyFactory.getInstance(ALGORITHM).generatePrivate(new PKCS8EncodedKeySpec(secretKey));
            Cipher cipher = Cipher.getInstance(ALGORITHM);
            cipher.init(Cipher.ENCRYPT_MODE, rsaKey);

            final int keySize = ((RSAKey) rsaKey).getModulus().bitLength();
            final int maxBlockSize = (keySize / 8) - 11;

            if (content.length <= maxBlockSize) {
                return cipher.doFinal(content);
            }
            // ByteBuffer#put vs System#arraycopy 哪个性能好 ???
            // ByteArrayOutputStream#write 也使用了System#arraycopy
            int snippetSize = content.length / maxBlockSize + (content.length % maxBlockSize == 0 ? 0 : 1);
            List<byte[]> snippetBytes = new ArrayList<>(snippetSize);
            int totalBytes = 0;
            for (int i = 0; i < snippetSize; i++) {
                int offset = maxBlockSize * i;
                int length = Math.min(maxBlockSize, content.length - offset);
                snippetBytes.add(cipher.doFinal(content, offset, length));
                totalBytes += snippetBytes.get(i).length;
            }
            // join by `System.arraycopy`
            byte[] byteBuffer = new byte[totalBytes];
            // ^_^ 一次性开辟所需空间,避免多次数组拷贝,应该是性能最佳的
            int index = 0;
            for (int i = 0; i < snippetSize; i++) {
                byte[] bytes = snippetBytes.get(i);
                if (null == bytes || bytes.length < 1) {
                    continue; // unreachable
                }
                System.arraycopy(bytes, 0, byteBuffer, index, bytes.length);
                index += bytes.length;
            }
            /**
             // join by `ByteBuffer#put`
             ByteBuffer byteBuffer = ByteBuffer.allocate(totalBytes);
             snippetBytes.forEach(byteBuffer::put);
             return byteBuffer.array();
             // join by `ByteArrayOutputStream#write`
             try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
             snippetBytes.forEach(out::write);
             return out.toByteArray();
             }
             */
            return byteBuffer;
        } catch (Exception e) {
            throw new RuntimeException("Encrypt failed.", e); //NOSONAR
        }
    }

    /**
     * BASE64密文解密
     *
     * @param content BASE64密文内容
     * @param secretKey RSA公钥或私钥
     * @return 原始文本
     */
    public static String decrypt(String content, String secretKey) {
        byte[] keyBytes = Base64.getDecoder().decode(secretKey.getBytes());
        byte[] strBytes = decrypt(Base64.getDecoder().decode(content.getBytes()), keyBytes);
        return new String(strBytes, StandardCharsets.UTF_8);
    }

    /**
     * 密文字节码解密
     *
     * @param content 字节码密文
     * @param secretKey RSA公钥或私钥
     * @return 原始字节码
     */
    public static byte[] decrypt(byte[] content, byte[] secretKey) {
        try {
            KeyFactory.getInstance(ALGORITHM).generatePublic(new X509EncodedKeySpec(secretKey));
            return decrypt(content, secretKey, false);
        } catch (Exception ignore) {
            return decrypt(content, secretKey, true);
        }
    }

    /**
     * 密文字节码解密
     *
     * @param content 字节码密文
     * @param secretKey RSA公钥或私钥
     * @param privateKey 是否是私钥
     * @return 原始字节码
     */
    public static byte[] decrypt(byte[] content, byte[] secretKey, boolean privateKey) {
        if (null == content) {
            return null;
        }
        if (content.length < 1) {
            return content;
        }
        try {
            Key rsaKey = privateKey ? KeyFactory.getInstance(ALGORITHM).generatePrivate(new PKCS8EncodedKeySpec(secretKey))
                    : KeyFactory.getInstance(ALGORITHM).generatePublic(new X509EncodedKeySpec(secretKey));
            Cipher cipher = Cipher.getInstance(ALGORITHM);
            cipher.init(Cipher.DECRYPT_MODE, rsaKey);

            final int keySize = ((RSAKey) rsaKey).getModulus().bitLength();
            final int maxBlockSize = keySize / 8;
            if (content.length < maxBlockSize) {
                return cipher.doFinal(content);
            }
            int snippetSize = content.length / maxBlockSize + (content.length % maxBlockSize == 0 ? 0 : 1);

            List<byte[]> snippetBytes = new ArrayList<>(snippetSize);
            int totalBytes = 0;
            for (int i = 0; i< snippetSize; i++) {
                int offset = maxBlockSize * i;
                int len = Math.min(maxBlockSize, content.length - offset);
                byte[] bytes = cipher.doFinal(content, offset, len);
                snippetBytes.add(bytes);
                totalBytes += bytes.length;
            }
            // join by `System.arraycopy`
            byte[] byteBuffer = new byte[totalBytes];
            int index = 0;
            for (int i = 0; i < snippetSize; i++) {
                byte[] bytes = snippetBytes.get(i);
                if (null == bytes || bytes.length < 1) {
                    continue; // unreachable
                }
                System.arraycopy(bytes, 0, byteBuffer, index, bytes.length);
                index += bytes.length;
            }
            return byteBuffer;
        } catch (Exception e) {
            throw new RuntimeException("Decrypt failed.", e); //NOSONAR
        }
    }

    /**
     * 随机生成密钥对
     * @return {publicKey}:{privateKey}
     */
    public static UnmodifiableMapEntry<String, String> genKeyPair() {
        KeyPairGenerator keyPairGen;
        try {
            keyPairGen = KeyPairGenerator.getInstance(ALGORITHM);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("NoSuchAlgorithmException: "  + ALGORITHM, e);
        }
        try {
            // 初始化密钥对生成器,密钥大小为96-1024位
            keyPairGen.initialize(1024, new SecureRandom());
            java.security.KeyPair keyPair = keyPairGen.generateKeyPair();
            RSAPrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate();
            RSAPublicKey publicKey = (RSAPublicKey) keyPair.getPublic();
            return new UnmodifiableMapEntry(Base64.getEncoder().encodeToString(publicKey.getEncoded()),
                    Base64.getEncoder().encodeToString((privateKey.getEncoded())));
        } catch (Exception e) {
            throw new RuntimeException("Failed to generate RSA key-pair.");
        }
    }

}

HTTP/HTTPS订阅者示例(java):

@RestController
@RequestMapping(value = "/api/mdm")
public class MDMWebhookController {
    
    @GetMapping("/ping")
    public String ping() {
        return "OK";
    }
    
    /**
     * 自适应明文传输和密文传输
     */
    @PostMapping("/hook")
    public Map<String, Object> hook(
            @RequestParam String model,
            @RequestBody byte[] payload,
            @RequestParam(required = false) boolean dict,
            @RequestHeader(required = false, value = "x-mdm-security") String encoder,
            @RequestHeader(required = false, value = "x-mdm-gzip") String gzip) {
        boolean gzipEnabled = "on".equalsIgnoreCase(gzip);
        byte[] jsonBytes;
        // 测试配置订阅者时,在推送 API PATH 结尾加入查询参数?encoder=RSA[BASE64] 与所配置加密方式保持一致
        if ("BASE64".equalsIgnoreCase(encoder)) {
            jsonBytes = Base64.getDecoder().decode(gzipEnabled ? GzipUtils.unzip(payload) : payload);
        } else if ("RSA".equalsIgnoreCase(encoder)) {
            byte[] data = gzipEnabled ? GzipUtils.unzip(payload) : payload;
            try { // 推送配置那边既可以采用公钥加密也可以采用私钥加密 - 取决于用户如何选择公私密钥配置(这里是Example就加一下try-another)
                jsonBytes = RsaEncryptUtils.decrypt(data, Base64.getDecoder().decode(RSA_KEY.getValue()), true);
            } catch (Exception e) {
                jsonBytes = RsaEncryptUtils.decrypt(data, Base64.getDecoder().decode(RSA_KEY.getKey()), false);
            }
        } else {
            // throw new RuntimeException("Illegal encoder = " + encoder);
            jsonBytes = payload; // 本接口也可以接收明文推送数据
        }
        // TODO 把明文JSON字节码转成你的业务数据对象
        // List<Map<String, Object>> data = Arrays.asList(JSONUtils.parseObject(jsonBytes, HashMap[].class));
        // System.out.println((dict ? "Dict" : "Model") + "[" + model + "] Received: " + data); //NOSONAR
        String data = new String(jsonBytes, StandardCharsets.UTF_8);
        LOGGER.info("{}[{}] Received: {}", dict ? "Dict" : "Model", model, data); //NOSONAR

        Map<String, Object> msg = new HashMap<>();
        msg.put("code", "200");
        msg.put("message", "OK");
        return msg;
    }

}

# 通知配置

数据推送失败时,如果需要进行邮件等渠道通知可以切换到通知配置标签页进行配置。

image-20250723113343550

# 删除订阅者

对于没有配置数据模型推送的订阅者才允许删除。

← 相似检索 订阅配置 →