# 订阅者管理
订阅者即下游系统订阅主数据各模型数据的业务系统,其提供如http(webhook)或其他服务来接受主数据各模型的业务数据推送。
访问路径:【主数据管理门户】-> 【数据分发】->【订阅者系统】
# 订阅者分组
订阅者分组无实际业务意义,只是为了方便查看和归类。所有的订阅者系统必须放在订阅者分组下,所以用户添加订阅者之前先按需新建订阅者分组即可。
# 新增订阅者
点击【新增】订阅者按钮,弹出新增订阅者弹框,输入订阅者信息点击【提交】即可(部分订阅者类型支持测试连接)。
# 系统名称
订阅者的显示名称,上限为64字符。
# 订阅者类型
已适配类型http,https,webservice
等(允许用户通过SPI进行扩展,以扩展JAR文件的方式部署即可)。
http/https
: 配置服务地址(主机和端口)、推送API路径,测试API路径(强烈建议配置:有助于避免无效调度次数——如在网络不稳定/停机维护等场景下)以及请求头。webservice
: 需要配置服务地址(WSDL)、命名空间(Namespace)和推送方法(Method),选择配置测试连接方法(Method)。(默认适配不一定满足用户需求,用户可以按需扩展)socket
: 需要配置服务地址(主机和端口)。(默认适配不一定满足用户需求,用户可以按需扩展)
# 报文配置-HTTP/HTTPS响应报文校验
响应报文校验仅使用于HTTP/HTTPS
订阅者类型,属于可选配置(切换到报文配置
标签页)。建议用户采用HTTP标准状态码2XX
来表示业务数据推送调用成功(校验方式选择忽略),否则可以配置一个报文内容校验格式(自定义,即HTTP响应状态码2XX且响应JSON报文体内容中业务调用结果码符合预期)。e.g. 响应报文JSON结构:
// e.g. 状态码属性:code
{
"code": "200",
"message": "OK"
}
// e.g. 状态码属性:response.code
{
"response": {
"code": "200",
"message": "OK"
}
}
支持以下五种配置验证办法:
序号 | 匹配方式 | 示例 |
---|---|---|
1 | 精确匹配一个常量值,即只有业务代码为XXX才算业务调用成功 | 200 |
2 | 穷举多个常量值,即业务代码在这些穷举常量值内都算业务调用成功(使用英文分号; 进行分割) | 200;201;204 |
3 | 排除一个常量值,即不是XXX都算业务调用成功(头部加英文! 符号) | !500 |
4 | 排除多个常量值,即不是这些穷举常量值都算业务调用成功(头部加英文! 符号) | !401;403;404;500 |
5 | 正则匹配,按格式regex: {expression_write_to_here} 进行书写(前缀regex: 不可省略) | regex: ^(200|201|204)$ |
系统默认校验方式配置(config/application-mdm.properties
):
#mdm.push.http.validate-response-body.enabled=true
#mdm.push.http.validate-response-body.property=code
#mdm.push.http.validate-response-body.pattern=200
# if multi-success-codes, e.g.
#mdm.push.http.validate-response-body.pattern=200;204
# not equals, e.g.
#mdm.push.http.validate-response-body.pattern=!500
# not in, e.g.
#mdm.push.http.validate-response-body.pattern=!401;403;404;500
# use expression (suffix: `regex: `)
#mdm.push.http.validate-response-body.pattern=regex: {COPY_YOUR_REGEX_TO_HERE}
# 报文配置-业务数据加密安全传输
如果数据推送需要先把模型业务数据进行加密后通过网络传输(密文字节码)可以对报文进行配置。
建议选择RSA
加密方式(对称加密技术),从外部拷贝一份RAS公钥(BASE64字符串)或使用生成密钥对
按钮立即生成(生成后注意保存你的私钥,此私钥将用于下游系统解密推送报文)。
RSA加解密工具(java):
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.KeyFactory;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.interfaces.RSAKey;
import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import javax.crypto.Cipher;
import org.apache.commons.collections4.keyvalue.UnmodifiableMapEntry;
/**
* 基于RSA加解密工具
*/
public class RsaEncryptUtils {
private static final String ALGORITHM = "RSA";
private RsaEncryptUtils() {
}
/**
* 文本加密
* @param content 字符串内容
* @param secretKey RSA公钥或私钥
* @return BASE64密文
*/
public static String encrypt(String content, String secretKey) {
byte[] keyBytes = Base64.getDecoder().decode(secretKey.getBytes());
byte[] cipherBytes = encrypt(content.getBytes(StandardCharsets.UTF_8), keyBytes);
return Base64.getEncoder().encodeToString(cipherBytes);
}
/**
* 字节码加密
* @param content 字节内容
* @param secretKey RSA公钥或私钥
* @return 密文字节码
*/
public static byte[] encrypt(byte[] content, byte[] secretKey) {
try {
KeyFactory.getInstance(ALGORITHM).generatePublic(new X509EncodedKeySpec(secretKey));
return encrypt(content, secretKey, true);
} catch (Exception ignore) { // retry
return encrypt(content, secretKey, false);
}
}
/**
* 字节码加密
* @param content 字节内容
* @param secretKey RSA公钥或私钥
* @param publicKey 是否为公钥
* @return 密文字节码
*/
public static byte[] encrypt(byte[] content, byte[] secretKey, boolean publicKey) {
if (null == content || content.length < 1) {
return null;
}
// 数据推送会限制单次发送的数据量也就是能限制住`content`的大小
try {
Key rsaKey = publicKey ? KeyFactory.getInstance(ALGORITHM).generatePublic(new X509EncodedKeySpec(secretKey))
: KeyFactory.getInstance(ALGORITHM).generatePrivate(new PKCS8EncodedKeySpec(secretKey));
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.ENCRYPT_MODE, rsaKey);
final int keySize = ((RSAKey) rsaKey).getModulus().bitLength();
final int maxBlockSize = (keySize / 8) - 11;
if (content.length <= maxBlockSize) {
return cipher.doFinal(content);
}
// ByteBuffer#put vs System#arraycopy 哪个性能好 ???
// ByteArrayOutputStream#write 也使用了System#arraycopy
int snippetSize = content.length / maxBlockSize + (content.length % maxBlockSize == 0 ? 0 : 1);
List<byte[]> snippetBytes = new ArrayList<>(snippetSize);
int totalBytes = 0;
for (int i = 0; i < snippetSize; i++) {
int offset = maxBlockSize * i;
int length = Math.min(maxBlockSize, content.length - offset);
snippetBytes.add(cipher.doFinal(content, offset, length));
totalBytes += snippetBytes.get(i).length;
}
// join by `System.arraycopy`
byte[] byteBuffer = new byte[totalBytes];
// ^_^ 一次性开辟所需空间,避免多次数组拷贝,应该是性能最佳的
int index = 0;
for (int i = 0; i < snippetSize; i++) {
byte[] bytes = snippetBytes.get(i);
if (null == bytes || bytes.length < 1) {
continue; // unreachable
}
System.arraycopy(bytes, 0, byteBuffer, index, bytes.length);
index += bytes.length;
}
/**
// join by `ByteBuffer#put`
ByteBuffer byteBuffer = ByteBuffer.allocate(totalBytes);
snippetBytes.forEach(byteBuffer::put);
return byteBuffer.array();
// join by `ByteArrayOutputStream#write`
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
snippetBytes.forEach(out::write);
return out.toByteArray();
}
*/
return byteBuffer;
} catch (Exception e) {
throw new RuntimeException("Encrypt failed.", e); //NOSONAR
}
}
/**
* BASE64密文解密
*
* @param content BASE64密文内容
* @param secretKey RSA公钥或私钥
* @return 原始文本
*/
public static String decrypt(String content, String secretKey) {
byte[] keyBytes = Base64.getDecoder().decode(secretKey.getBytes());
byte[] strBytes = decrypt(Base64.getDecoder().decode(content.getBytes()), keyBytes);
return new String(strBytes, StandardCharsets.UTF_8);
}
/**
* 密文字节码解密
*
* @param content 字节码密文
* @param secretKey RSA公钥或私钥
* @return 原始字节码
*/
public static byte[] decrypt(byte[] content, byte[] secretKey) {
try {
KeyFactory.getInstance(ALGORITHM).generatePublic(new X509EncodedKeySpec(secretKey));
return decrypt(content, secretKey, false);
} catch (Exception ignore) {
return decrypt(content, secretKey, true);
}
}
/**
* 密文字节码解密
*
* @param content 字节码密文
* @param secretKey RSA公钥或私钥
* @param privateKey 是否是私钥
* @return 原始字节码
*/
public static byte[] decrypt(byte[] content, byte[] secretKey, boolean privateKey) {
if (null == content) {
return null;
}
if (content.length < 1) {
return content;
}
try {
Key rsaKey = privateKey ? KeyFactory.getInstance(ALGORITHM).generatePrivate(new PKCS8EncodedKeySpec(secretKey))
: KeyFactory.getInstance(ALGORITHM).generatePublic(new X509EncodedKeySpec(secretKey));
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.DECRYPT_MODE, rsaKey);
final int keySize = ((RSAKey) rsaKey).getModulus().bitLength();
final int maxBlockSize = keySize / 8;
if (content.length < maxBlockSize) {
return cipher.doFinal(content);
}
int snippetSize = content.length / maxBlockSize + (content.length % maxBlockSize == 0 ? 0 : 1);
List<byte[]> snippetBytes = new ArrayList<>(snippetSize);
int totalBytes = 0;
for (int i = 0; i< snippetSize; i++) {
int offset = maxBlockSize * i;
int len = Math.min(maxBlockSize, content.length - offset);
byte[] bytes = cipher.doFinal(content, offset, len);
snippetBytes.add(bytes);
totalBytes += bytes.length;
}
// join by `System.arraycopy`
byte[] byteBuffer = new byte[totalBytes];
int index = 0;
for (int i = 0; i < snippetSize; i++) {
byte[] bytes = snippetBytes.get(i);
if (null == bytes || bytes.length < 1) {
continue; // unreachable
}
System.arraycopy(bytes, 0, byteBuffer, index, bytes.length);
index += bytes.length;
}
return byteBuffer;
} catch (Exception e) {
throw new RuntimeException("Decrypt failed.", e); //NOSONAR
}
}
/**
* 随机生成密钥对
* @return {publicKey}:{privateKey}
*/
public static UnmodifiableMapEntry<String, String> genKeyPair() {
KeyPairGenerator keyPairGen;
try {
keyPairGen = KeyPairGenerator.getInstance(ALGORITHM);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("NoSuchAlgorithmException: " + ALGORITHM, e);
}
try {
// 初始化密钥对生成器,密钥大小为96-1024位
keyPairGen.initialize(1024, new SecureRandom());
java.security.KeyPair keyPair = keyPairGen.generateKeyPair();
RSAPrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate();
RSAPublicKey publicKey = (RSAPublicKey) keyPair.getPublic();
return new UnmodifiableMapEntry(Base64.getEncoder().encodeToString(publicKey.getEncoded()),
Base64.getEncoder().encodeToString((privateKey.getEncoded())));
} catch (Exception e) {
throw new RuntimeException("Failed to generate RSA key-pair.");
}
}
}
HTTP/HTTPS订阅者示例(java):
@RestController
@RequestMapping(value = "/api/mdm")
public class MDMWebhookController {
@GetMapping("/ping")
public String ping() {
return "OK";
}
/**
* 自适应明文传输和密文传输
*/
@PostMapping("/hook")
public Map<String, Object> hook(
@RequestParam String model,
@RequestBody byte[] payload,
@RequestParam(required = false) boolean dict,
@RequestHeader(required = false, value = "x-mdm-security") String encoder,
@RequestHeader(required = false, value = "x-mdm-gzip") String gzip) {
boolean gzipEnabled = "on".equalsIgnoreCase(gzip);
byte[] jsonBytes;
// 测试配置订阅者时,在推送 API PATH 结尾加入查询参数?encoder=RSA[BASE64] 与所配置加密方式保持一致
if ("BASE64".equalsIgnoreCase(encoder)) {
jsonBytes = Base64.getDecoder().decode(gzipEnabled ? GzipUtils.unzip(payload) : payload);
} else if ("RSA".equalsIgnoreCase(encoder)) {
byte[] data = gzipEnabled ? GzipUtils.unzip(payload) : payload;
try { // 推送配置那边既可以采用公钥加密也可以采用私钥加密 - 取决于用户如何选择公私密钥配置(这里是Example就加一下try-another)
jsonBytes = RsaEncryptUtils.decrypt(data, Base64.getDecoder().decode(RSA_KEY.getValue()), true);
} catch (Exception e) {
jsonBytes = RsaEncryptUtils.decrypt(data, Base64.getDecoder().decode(RSA_KEY.getKey()), false);
}
} else {
// throw new RuntimeException("Illegal encoder = " + encoder);
jsonBytes = payload; // 本接口也可以接收明文推送数据
}
// TODO 把明文JSON字节码转成你的业务数据对象
// List<Map<String, Object>> data = Arrays.asList(JSONUtils.parseObject(jsonBytes, HashMap[].class));
// System.out.println((dict ? "Dict" : "Model") + "[" + model + "] Received: " + data); //NOSONAR
String data = new String(jsonBytes, StandardCharsets.UTF_8);
LOGGER.info("{}[{}] Received: {}", dict ? "Dict" : "Model", model, data); //NOSONAR
Map<String, Object> msg = new HashMap<>();
msg.put("code", "200");
msg.put("message", "OK");
return msg;
}
}
# 通知配置
数据推送失败时,如果需要进行邮件等渠道通知可以切换到通知配置
标签页进行配置。
# 删除订阅者
对于没有配置数据模型推送的订阅者才允许删除。