Examples

    For API details, see the Overview.

        public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
                String netTag, String dataProxyGroup, boolean isLocalVisit, boolean isReadProxyIPFromLocal,
                String configBasePath, int msgType) {
            ProxyClientConfig dataProxyConfig = null;
            DefaultMessageSender messageSender = null;
            try {
                // Build the client configuration from the InLong Manager address and the DataProxy group.
                dataProxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
                        Integer.valueOf(inLongManagerPort), dataProxyGroup, netTag);
                if (StringUtils.isNotEmpty(configBasePath)) {
                    dataProxyConfig.setConfStoreBasePath(configBasePath);
                }
                dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
                messageSender = DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
                messageSender.setMsgtype(msgType);
            } catch (Exception e) {
                logger.error("getMessageSender has exception e = {}", e);
            }
            // Still null here if initialization failed.
            return messageSender;
        }
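
    getMessageSender passes inLongManagerPort through Integer.valueOf inside the try block, so a malformed port string shows up only as a logged exception and a null return value. A caller may prefer to check the port before building the sender. The following is a minimal sketch of such a check; ManagerPortParser and parsePort are hypothetical names and are not part of the InLong SDK.

```java
import java.util.OptionalInt;

/** Hypothetical helper for illustration only; not part of the InLong SDK. */
final class ManagerPortParser {

    private ManagerPortParser() {
    }

    /** Returns the port if the text is a decimal integer in 1..65535, otherwise empty. */
    static OptionalInt parsePort(String text) {
        if (text == null) {
            return OptionalInt.empty();
        }
        try {
            int port = Integer.parseInt(text.trim());
            return (port >= 1 && port <= 65535) ? OptionalInt.of(port) : OptionalInt.empty();
        } catch (NumberFormatException e) {
            // Same failure that Integer.valueOf would raise inside getMessageSender.
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parsePort("8083").isPresent());  // true
        System.out.println(parsePort("80a83").isPresent()); // false
    }
}
```

    With a check like this, the caller can report a configuration error directly instead of discovering a null sender later.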

    The parameters are as follows:

    When isReadProxyIPFromLocal is true, the DataProxy configuration is read from a local configuration file.

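    The local file path is:

        ${configBasePath}

    The file name is:

        ${dataProxyGroup}.local

    For example:

        configBasePath = /data/inlong
        dataProxyGroup = inlong_test

    The full path of the local file is then:

        /data/inlong/inlong_test.local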

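    The file content is in JSON format, where host is the DataProxy server address and port is the corresponding port. At least two address entries must be configured (the two entries may be identical):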

        {"isInterVisit":1,"clusterId":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]}
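
    The local file content can also be generated rather than written by hand. The following is a minimal sketch that renders JSON in the same shape as the sample and rejects fewer than two address entries. LocalProxyConfigBuilder and build are hypothetical names; the values of isInterVisit, size, and switch are copied from the sample as-is, and hosts are assumed to need no JSON escaping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the InLong SDK. */
final class LocalProxyConfigBuilder {

    private LocalProxyConfigBuilder() {
    }

    /** Renders the config for the given "host:port" pairs, e.g. "127.0.0.1:46802". */
    static String build(String clusterId, List<String> hostPorts) {
        if (hostPorts.size() < 2) {
            throw new IllegalArgumentException("at least two address entries are required");
        }
        List<String> entries = new ArrayList<>();
        for (String hostPort : hostPorts) {
            int idx = hostPort.lastIndexOf(':');
            if (idx <= 0 || idx == hostPort.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got " + hostPort);
            }
            String host = hostPort.substring(0, idx);
            String port = hostPort.substring(idx + 1);
            // The sample stores the port as a JSON string, so it is quoted here too.
            entries.add("{\"host\":\"" + host + "\",\"port\":\"" + port + "\"}");
        }
        // isInterVisit, size and switch are fixed to the values used in the sample.
        return "{\"isInterVisit\":1,\"clusterId\":\"" + clusterId
                + "\",\"size\":1,\"switch\":1,\"address\":[" + String.join(",", entries) + "]}";
    }

    public static void main(String[] args) {
        System.out.println(build("1", Arrays.asList("127.0.0.1:46802", "127.0.0.1:46802")));
    }
}
```

    Called with cluster ID "1" and the address 127.0.0.1:46802 listed twice, build returns the same text as the sample.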

    With the sender created, a message can be sent over TCP as follows:

        public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId,
                String inlongStreamId, String messageBody, long dt) throws Exception {
            // Send synchronously, with a 20-second timeout.
            SendResult result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId,
                    0, String.valueOf(dt), 20, TimeUnit.SECONDS);
            logger.info("messageSender {} ", result);
        }
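
    sendTcpMessage encodes the body with getBytes("utf8"), which looks the charset up by name and declares a checked UnsupportedEncodingException. The JDK constant StandardCharsets.UTF_8 produces the same bytes without the checked exception. The following is a minimal sketch of that alternative; Utf8BodyDemo and encodeBody are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical demo class for illustration only; not part of the InLong SDK. */
final class Utf8BodyDemo {

    private Utf8BodyDemo() {
    }

    /** Encodes a message body as UTF-8 without a checked exception. */
    static byte[] encodeBody(String messageBody) {
        return messageBody.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        // ASCII text followed by two CJK characters, written as Unicode escapes.
        String body = "hello, \u4f60\u597d";
        byte[] byName = body.getBytes("utf8");
        byte[] byConstant = encodeBody(body);
        System.out.println(Arrays.equals(byName, byConstant)); // true
    }
}
```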

    The parameters are as follows:

        public HttpProxySender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
                String netTag, String dataProxyGroup, boolean isLocalVisit, boolean isReadProxyIPFromLocal,
                String configBasePath) {
            ProxyClientConfig proxyConfig = null;
            HttpProxySender sender = null;
            try {
                // Build the client configuration from the InLong Manager address and the DataProxy group.
                proxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
                        Integer.valueOf(inLongManagerPort),
                        dataProxyGroup, netTag);
                proxyConfig.setGroupId(dataProxyGroup);
                proxyConfig.setConfStoreBasePath(configBasePath);
                proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
                sender = new HttpProxySender(proxyConfig);
            } catch (ProxysdkException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
            // Still null here if initialization failed.
            return sender;
        }

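    The parameters are as follows:

    When isReadProxyIPFromLocal is true, the DataProxy configuration is read from a local configuration file.

    The local file path is:

        ${configBasePath}

    The file name is:

        ${dataProxyGroup}.local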

    For example:

        configBasePath = /data/inlong
        dataProxyGroup = inlong_test

    The full path of the local file is then:

        /data/inlong/inlong_test.local
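
    The naming rule above (base path, then the group name with a .local suffix) can be sketched with java.nio.file. This only illustrates the rule and is not how the SDK itself resolves the file; LocalConfigPath and resolve are hypothetical names.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper for illustration only; not part of the InLong SDK. */
final class LocalConfigPath {

    private LocalConfigPath() {
    }

    /** Joins the base path with "<dataProxyGroup>.local". */
    static Path resolve(String configBasePath, String dataProxyGroup) {
        return Paths.get(configBasePath, dataProxyGroup + ".local");
    }

    public static void main(String[] args) {
        // On a Unix-like system this prints /data/inlong/inlong_test.local
        System.out.println(resolve("/data/inlong", "inlong_test"));
    }
}
```

    Checking that this file exists before setting isReadProxyIPFromLocal to true is one way to catch a misconfigured base path early.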

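    The file content is in JSON format, where host is the DataProxy server address and port is the corresponding port. At least two address entries must be configured (the two entries may be identical):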

        {"isInterVisit":1,"clusterId":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]}

    With the sender created, a message can be sent over HTTP as follows:

        public void sendHttpMessage(HttpProxySender sender, String inlongGroupId,
                String inlongStreamId, String messageBody) throws Exception {
            List<String> bodyList = new ArrayList<>();
            bodyList.add(messageBody);
            // MyMessageCallBack is a user-defined callback that receives the asynchronous send result.
            sender.asyncSendMessage(bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
                    20, TimeUnit.SECONDS, new MyMessageCallBack());
        }

    The parameters are as follows: