Example
For API details, see the Overview.
public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
        String netTag, String dataProxyGroup, boolean isLocalVisit, boolean isReadProxyIPFromLocal,
        String configBasePath, int msgType) {
    ProxyClientConfig dataProxyConfig = null;
    DefaultMessageSender messageSender = null;
    try {
        dataProxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
                Integer.valueOf(inLongManagerPort), dataProxyGroup, netTag);
        // Override the config storage path only when one is supplied
        if (StringUtils.isNotEmpty(configBasePath)) {
            dataProxyConfig.setConfStoreBasePath(configBasePath);
        }
        dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
        messageSender = DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
        messageSender.setMsgtype(msgType);
    } catch (Exception e) {
        // Pass the exception as the last argument so the stack trace is logged
        logger.error("getMessageSender has exception", e);
    }
    // May be null if initialization failed
    return messageSender;
}
The parameters are described below:
When isReadProxyIPFromLocal is true, the DataProxy configuration is read from a local configuration file.
The local file path is:
${configBasePath}
The file name is:
${dataProxyGroup}.local
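For example:
configBasePath = /data/inlong
dataProxyGroup = inlong_test
The full path of the local file is then:
/data/inlong/inlong_test.local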
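The file content is JSON, where host is the DataProxy server address and port is the corresponding port. At least two address entries must be configured (the two entries may be identical):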
{"isInterVisit":1,"clusterId":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]}
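The path rule above can be sketched with plain Java file APIs. This is only an illustration: the class LocalProxyConfigFile and its methods resolve and write are hypothetical helpers, not part of the InLong SDK, and the sketch assumes nothing beyond the naming rule ${configBasePath}/${dataProxyGroup}.local described above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration; not part of the InLong SDK.
final class LocalProxyConfigFile {

    // Builds ${configBasePath}/${dataProxyGroup}.local
    static Path resolve(String configBasePath, String dataProxyGroup) {
        return Paths.get(configBasePath, dataProxyGroup + ".local");
    }

    // Writes the JSON content to the resolved file, creating the
    // base directory first if it does not exist yet.
    static Path write(String configBasePath, String dataProxyGroup, String json) throws IOException {
        Path file = resolve(configBasePath, dataProxyGroup);
        Path parent = file.getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        Files.write(file, json.getBytes(StandardCharsets.UTF_8));
        return file;
    }
}
```

On a Unix-like system, resolve("/data/inlong", "inlong_test") gives the path /data/inlong/inlong_test.local, which matches the example above. The same configBasePath would then be passed to setConfStoreBasePath.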
public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId,
        String inlongStreamId, String messageBody, long dt) throws Exception {
    SendResult result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId,
            0, String.valueOf(dt), 20, TimeUnit.SECONDS);
    logger.info("messageSender {} ", result);
}
The parameters are described below:
public HttpProxySender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
        String netTag, String dataProxyGroup, boolean isLocalVisit, boolean isReadProxyIPFromLocal,
        String configBasePath) {
    ProxyClientConfig proxyConfig = null;
    HttpProxySender sender = null;
    try {
        proxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
                Integer.valueOf(inLongManagerPort),
                dataProxyGroup, netTag);
        proxyConfig.setGroupId(dataProxyGroup);
        proxyConfig.setConfStoreBasePath(configBasePath);
        proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
        sender = new HttpProxySender(proxyConfig);
    } catch (ProxysdkException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    // May be null if initialization failed
    return sender;
}
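The parameters are described below:
When isReadProxyIPFromLocal is true, the DataProxy configuration is read from a local configuration file.
The local file path is:
${configBasePath}
The file name is:
${dataProxyGroup}.local
For example: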
configBasePath = /data/inlong
dataProxyGroup = inlong_test
The full path of the local file is then:
/data/inlong/inlong_test.local
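The file content is JSON, where host is the DataProxy server address and port is the corresponding port. At least two address entries must be configured (the two entries may be identical):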
{"isInterVisit":1,"clusterId":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]}
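Because the file needs at least two address entries, a quick pre-flight check before starting the sender can catch a misconfigured file early. The sketch below is a deliberately crude textual check that counts "host" keys with a regular expression. The class AddressCountCheck is hypothetical and not part of the InLong SDK, and production code would more likely parse the file with a proper JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical pre-flight check for illustration; not part of the InLong SDK.
final class AddressCountCheck {

    // Matches a "host" key followed by a colon, e.g. "host":"127.0.0.1"
    private static final Pattern HOST_KEY = Pattern.compile("\"host\"\\s*:");

    // Counts how many "host" keys appear in the raw JSON text.
    static int countHosts(String json) {
        Matcher matcher = HOST_KEY.matcher(json);
        int count = 0;
        while (matcher.find()) {
            count++;
        }
        return count;
    }

    // True when the text contains at least two "host" entries.
    static boolean hasAtLeastTwoAddresses(String json) {
        return countHosts(json) >= 2;
    }
}
```

Duplicated entries, as in the sample above, count separately, so a file listing the same host and port twice passes the check.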
public void sendHttpMessage(HttpProxySender sender, String inlongGroupId,
        String inlongStreamId, String messageBody) throws Exception {
    List<String> bodyList = new ArrayList<>();
    bodyList.add(messageBody);
    // MyMessageCallBack is the caller's own callback class that handles the asynchronous result
    sender.asyncSendMessage(bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
            20, TimeUnit.SECONDS, new MyMessageCallBack());
}
The parameters are described below: