Bulk API

    A BulkRequest can be used to execute multiple index, update and/or delete operations using a single request.

    It requires at least one operation to be added to the Bulk request.

    Different operation types can also be added to the same BulkRequest:

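        import org.elasticsearch.action.bulk.BulkRequest;
        import org.elasticsearch.action.delete.DeleteRequest;
        import org.elasticsearch.action.index.IndexRequest;
        import org.elasticsearch.action.update.UpdateRequest;
        import org.elasticsearch.common.xcontent.XContentType;
        BulkRequest request = new BulkRequest();
        // Adds a DeleteRequest to the BulkRequest. See Delete API for more information on how to build a DeleteRequest.
        request.add(new DeleteRequest("posts", "doc", "3"));
        // Adds an UpdateRequest to the BulkRequest. See Update API for more information on how to build an UpdateRequest.
        request.add(new UpdateRequest("posts", "doc", "2")
                .doc(XContentType.JSON, "other", "test"));
        // Adds an IndexRequest whose source is built in the JSON format
        request.add(new IndexRequest("posts", "doc", "4").source(XContentType.JSON, "field", "baz"));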

    The following arguments can optionally be provided:

        import org.elasticsearch.action.support.ActiveShardCount;
        import org.elasticsearch.action.support.WriteRequest;
        import org.elasticsearch.common.unit.TimeValue;

        // Timeout to wait for the bulk request to be performed, as a TimeValue
        request.timeout(TimeValue.timeValueMinutes(2));
        // Timeout to wait for the bulk request to be performed, as a String
        request.timeout("2m");

        // Refresh policy as a WriteRequest.RefreshPolicy instance
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        // Refresh policy as a String
        request.setRefreshPolicy("wait_for");

        // Number of shard copies that must be active before proceeding with the index/update/delete operations
        request.waitForActiveShards(2);
        // Number of shard copies as an ActiveShardCount: ALL, ONE or DEFAULT (the default)
        request.waitForActiveShards(ActiveShardCount.ALL);
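
    The String form of the timeout uses Elasticsearch's time-unit notation, so "2m" and TimeValue.timeValueMinutes(2) describe the same two-minute timeout. The sketch below is a hypothetical, simplified parser in plain Java that makes this equivalence concrete; the class TimeoutStrings is not part of Elasticsearch, and only the ms, s, m, h and d suffixes are assumed:

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified stand-in for how a timeout string such as "2m"
 * maps to a duration. Not the Elasticsearch TimeValue parser; it only
 * understands the ms, s, m, h and d suffixes.
 */
final class TimeoutStrings {
    private TimeoutStrings() {}

    static long toMillis(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        // Check "ms" first: otherwise "500ms" would be read as a minutes or seconds value.
        if (v.endsWith("ms")) {
            return Long.parseLong(v.substring(0, v.length() - 2));
        }
        long amount = Long.parseLong(v.substring(0, v.length() - 1));
        char unit = v.charAt(v.length() - 1);
        switch (unit) {
            case 's': return TimeUnit.SECONDS.toMillis(amount);
            case 'm': return TimeUnit.MINUTES.toMillis(amount);
            case 'h': return TimeUnit.HOURS.toMillis(amount);
            case 'd': return TimeUnit.DAYS.toMillis(amount);
            default: throw new IllegalArgumentException("unsupported time unit in: " + value);
        }
    }
}
```

    For example, TimeoutStrings.toMillis("2m") yields 120000, the same number of milliseconds as a two-minute TimeValue. The real TimeValue parsing supports more formats than this sketch, so it is the one to rely on in practice.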
    Executing the request synchronously returns a BulkResponse:

        BulkResponse bulkResponse = client.bulk(request);

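    The returned BulkResponse contains information about the executed operations and allows iterating over each result as follows:

        for (BulkItemResponse bulkItemResponse : bulkResponse) { // Iterate over the results of all operations
            // The per-item response: an IndexResponse, UpdateResponse or DeleteResponse,
            // all of which are DocWriteResponse instances (it is null when the item failed)
            DocWriteResponse itemResponse = bulkItemResponse.getResponse();

            if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                    || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                // Handle the response of an index operation
                IndexResponse indexResponse = (IndexResponse) itemResponse;
            } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                // Handle the response of an update operation
                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                // Handle the response of a delete operation
                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            }
        }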
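
    The loop above dispatches on the operation type, with INDEX and CREATE sharing one branch because both produce an index response. As a plain-Java sketch of that dispatch, the following counts how many item results would reach each branch. The OpType enum here only mirrors DocWriteRequest.OpType by name; BulkItemDispatch and its Handler categories are hypothetical:

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of per-item dispatch; not the Elasticsearch classes. */
final class BulkItemDispatch {
    /** Mirrors the DocWriteRequest.OpType constants used in the loop above. */
    enum OpType { INDEX, CREATE, UPDATE, DELETE }

    /** Which branch of the loop above handles an item. */
    enum Handler { INDEX_RESPONSE, UPDATE_RESPONSE, DELETE_RESPONSE }

    private BulkItemDispatch() {}

    static Handler handlerFor(OpType type) {
        switch (type) {
            case INDEX:
            case CREATE:
                // INDEX and CREATE share a branch, as in the loop above
                return Handler.INDEX_RESPONSE;
            case UPDATE:
                return Handler.UPDATE_RESPONSE;
            case DELETE:
                return Handler.DELETE_RESPONSE;
            default:
                throw new IllegalArgumentException("unknown op type: " + type);
        }
    }

    /** Counts how many items of a response would reach each branch. */
    static Map<Handler, Integer> countByHandler(List<OpType> itemTypes) {
        Map<Handler, Integer> counts = new EnumMap<>(Handler.class);
        for (OpType type : itemTypes) {
            counts.merge(handlerFor(type), 1, Integer::sum);
        }
        return counts;
    }
}
```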

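    The bulk response provides a method to quickly check whether one or more operations have failed:

        if (bulkResponse.hasFailures()) { // Returns true if at least one operation failed
        }

    In that case, iterate over all operation results to find the failed ones and retrieve the corresponding failure:

        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) { // Indicates whether the given operation failed
                // Retrieve the failure of the failed operation
                BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
            }
        }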
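
    Items in a bulk request succeed or fail independently, so a call that returns normally can still contain failed items: hasFailures() answers "did anything fail?" and the per-item loop answers "what failed?". The following hypothetical model shows both checks; BulkFailureCheck and its Item class are illustrative stand-ins, not Elasticsearch types, and a null failure message is assumed to mean success:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a bulk response: one Item per operation. */
final class BulkFailureCheck {
    static final class Item {
        final String id;
        final String failureMessage; // null when the operation succeeded (assumption of this model)

        Item(String id, String failureMessage) {
            this.id = id;
            this.failureMessage = failureMessage;
        }

        boolean isFailed() {
            return failureMessage != null;
        }
    }

    private BulkFailureCheck() {}

    /** True as soon as one item failed, mirroring the hasFailures() check. */
    static boolean hasFailures(List<Item> items) {
        for (Item item : items) {
            if (item.isFailed()) {
                return true;
            }
        }
        return false;
    }

    /** Collects "id: message" for every failed item, mirroring the per-item loop. */
    static List<String> failureSummaries(List<Item> items) {
        List<String> summaries = new ArrayList<>();
        for (Item item : items) {
            if (item.isFailed()) {
                summaries.add(item.id + ": " + item.failureMessage);
            }
        }
        return summaries;
    }
}
```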

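    The BulkProcessor simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently executed as they are added to the processor.

    In order to execute the requests, the BulkProcessor requires three components:

    • RestHighLevelClient

    This client is used to execute the BulkRequest and to retrieve the BulkResponse.

    • BulkProcessor.Listener

    This listener is called before and after every BulkRequest execution, or when a BulkRequest failed.

    • ThreadPool

    The BulkRequest executions are done using threads from this pool, which allows the BulkProcessor to work in a non-blocking manner and to accept new index/update/delete requests while bulk requests are executing.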
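
    To illustrate how the three components divide the work, here is a toy processor in plain Java. It is a sketch under stated assumptions, not the BulkProcessor implementation: a Consumer stands in for the client, a java.util.concurrent ExecutorService stands in for the thread pool, actions are plain strings, and ToyBulkProcessor and its Listener are hypothetical names. add() only buffers the action; once a batch is full it is handed to the pool, so the caller can keep adding while the batch is sent:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/**
 * Hypothetical toy processor (not the Elasticsearch BulkProcessor) showing how a
 * sender (standing in for the client), a listener and a thread pool cooperate.
 */
final class ToyBulkProcessor {
    /** Callbacks around each batch; no-op defaults so callers override only what they need. */
    interface Listener {
        default void beforeBulk(long executionId, List<String> batch) {}
        default void afterBulk(long executionId, List<String> batch) {}
    }

    private final Consumer<List<String>> sender; // stands in for the client
    private final Listener listener;
    private final ExecutorService pool;          // stands in for the thread pool
    private final int bulkActions;
    private List<String> pending = new ArrayList<>();
    private long nextExecutionId = 1;

    ToyBulkProcessor(Consumer<List<String>> sender, Listener listener,
                     ExecutorService pool, int bulkActions) {
        this.sender = sender;
        this.listener = listener;
        this.pool = pool;
        this.bulkActions = bulkActions;
    }

    /** Buffers the action; once bulkActions are buffered, the batch is handed to the pool. */
    synchronized void add(String action) {
        pending.add(action);
        if (pending.size() >= bulkActions) {
            final List<String> batch = pending;
            final long executionId = nextExecutionId++;
            pending = new ArrayList<>();
            // The caller returns right away; the batch is sent on a pool thread.
            pool.execute(() -> {
                listener.beforeBulk(executionId, batch);
                sender.accept(batch);
                listener.afterBulk(executionId, batch);
            });
        }
    }
}
```

    With bulkActions set to 2, adding five actions sends two batches on the pool thread and leaves the fifth action buffered; a real processor would also flush it on a size threshold, a flush interval, or when it is closed.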

    The BulkProcessor.Builder class can then be used to build a new BulkProcessor. It provides methods to configure how the BulkProcessor should handle request execution:

        BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool);
        // Flush a new bulk request once this many actions are added (default 1000, -1 disables it)
        builder.setBulkActions(500);
        // Flush a new bulk request once the added actions reach this size (default 5MB, -1 disables it)
        builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
        // Concurrent requests allowed to execute (default 1; 0 allows only a single request to execute)
        builder.setConcurrentRequests(0);
        // Flush any pending BulkRequest once the interval passes (not set by default)
        builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
        // Constant backoff: wait 1 second, retry up to 3 times (see also noBackoff(), exponentialBackoff())
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        BulkProcessor bulkProcessor = builder.build();
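
    The two flush thresholds combine as "whichever enabled threshold is reached first", and the backoff policy governs retries. The sketch below expresses both in plain Java. BulkFlushPolicy, shouldFlush and retryWithConstantBackoff are hypothetical names, sizes are assumed to be in bytes (1MB = 1,048,576 bytes), and the retry loop is a generic constant-backoff loop rather than Elasticsearch's BackoffPolicy implementation:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the flush thresholds and a constant backoff; not Elasticsearch code. */
final class BulkFlushPolicy {
    private BulkFlushPolicy() {}

    /** True when either enabled threshold is reached; a threshold of -1 is disabled. */
    static boolean shouldFlush(int pendingActions, long pendingBytes,
                               int bulkActions, long bulkSizeBytes) {
        boolean byCount = bulkActions != -1 && pendingActions >= bulkActions;
        boolean bySize = bulkSizeBytes != -1 && pendingBytes >= bulkSizeBytes;
        return byCount || bySize;
    }

    /**
     * Makes one attempt, then up to maxRetries more, sleeping a constant delay between them.
     * Returns the number of attempts it took to succeed, or -1 if every attempt failed.
     */
    static int retryWithConstantBackoff(BooleanSupplier attempt, long delayMillis, int maxRetries) {
        for (int tries = 1; tries <= maxRetries + 1; tries++) {
            if (attempt.getAsBoolean()) {
                return tries;
            }
            if (tries <= maxRetries) { // no sleep after the final attempt
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

    With the builder values shown above, shouldFlush(500, 0, 500, 1048576) is true because the action count is reached. In this sketch, a constant backoff with 3 retries means at most four attempts in total: the initial one plus three retries.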
    Once the BulkProcessor has been created, requests can be added to it:

        IndexRequest one = new IndexRequest("posts", "doc", "1")
                .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
        IndexRequest two = new IndexRequest("posts", "doc", "2")
                .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
        IndexRequest three = new IndexRequest("posts", "doc", "3")
                .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");

        bulkProcessor.add(one);
        bulkProcessor.add(two);
        bulkProcessor.add(three);

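    The requests will be executed by the BulkProcessor, which takes care of calling the BulkProcessor.Listener for every bulk request.
    The listener provides methods to access the BulkRequest and the BulkResponse: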

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Called before each execution of a BulkRequest; this method tells you
                // how many operations are going to be executed within the BulkRequest
                int numberOfActions = request.numberOfActions();
                logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Called after each execution of a BulkRequest; this method tells you
                // whether the BulkResponse contains errors
                if (response.hasFailures()) {
                    logger.warn("Bulk [{}] executed with failures", executionId);
                } else {
                    logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Called when the BulkRequest failed; this method tells you what the failure was
                logger.error("Failed to execute bulk", failure);
            }
        };
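
    The listener contract above is: beforeBulk first, then exactly one of the two afterBulk overloads, depending on whether the request as a whole could be executed. A response that contains item failures still goes to the BulkResponse overload. The plain-Java sketch below mirrors that ordering with hypothetical types: a List<String> plays the request, a String plays the response, and the names ListenerContract and RecordingListener are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical mirror of the three listener callbacks; not the Elasticsearch classes. */
final class ListenerContract {
    interface Listener {
        void beforeBulk(long executionId, List<String> request);
        void afterBulk(long executionId, List<String> request, String response);
        void afterBulk(long executionId, List<String> request, Throwable failure);
    }

    private ListenerContract() {}

    /** Calls beforeBulk, executes the request, then exactly one afterBulk overload. */
    static void execute(long executionId, List<String> request,
                        Function<List<String>, String> client, Listener listener) {
        listener.beforeBulk(executionId, request);
        String response;
        try {
            response = client.apply(request);
        } catch (RuntimeException e) {
            // The request as a whole could not be executed
            listener.afterBulk(executionId, request, e);
            return;
        }
        // The request executed; individual items may still have failed
        listener.afterBulk(executionId, request, response);
    }

    /** Records every callback in order, e.g. for debugging. */
    static final class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();

        @Override
        public void beforeBulk(long executionId, List<String> request) {
            events.add("before " + executionId + " (" + request.size() + " actions)");
        }

        @Override
        public void afterBulk(long executionId, List<String> request, String response) {
            events.add("after " + executionId + ": " + response);
        }

        @Override
        public void afterBulk(long executionId, List<String> request, Throwable failure) {
            events.add("failed " + executionId + ": " + failure.getMessage());
        }
    }
}
```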

    Once all requests have been added to the BulkProcessor, its instance needs to be closed using one of the two available closing methods.

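    awaitClose() can be used to wait until all requests have been processed or the specified waiting time elapses:

        boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

    The method returns true if all bulk requests completed, and false if the waiting time elapsed before all of them completed.

    The close() method can be used to immediately close the BulkProcessor:

        bulkProcessor.close();

    Both methods flush the requests added to the processor before closing it, and forbid any new request from being added to it.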
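
    These closing semantics map naturally onto java.util.concurrent.ExecutorService, whose awaitTermination(long, TimeUnit) likewise returns true on completion and false on timeout. The toy class below is a hypothetical illustration, not Elasticsearch code (ClosableBatcher is an invented name): both close paths flush pending actions and then reject further adds, awaitClose waits for the flushed batch, and close does not wait:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical toy illustrating flush-then-seal closing; not the Elasticsearch BulkProcessor. */
final class ClosableBatcher {
    private final Consumer<List<String>> sender;
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    private List<String> pending = new ArrayList<>();
    private boolean closed;

    ClosableBatcher(Consumer<List<String>> sender) {
        this.sender = sender;
    }

    synchronized void add(String action) {
        if (closed) {
            throw new IllegalStateException("processor is closed");
        }
        pending.add(action);
    }

    /** Shared by both close methods: flush what is pending, then reject further adds. */
    private synchronized void flushAndSeal() {
        if (closed) {
            return;
        }
        closed = true;
        if (!pending.isEmpty()) {
            final List<String> batch = pending;
            pending = new ArrayList<>();
            pool.execute(() -> sender.accept(batch));
        }
        pool.shutdown(); // already-submitted batches still run
    }

    /** Like awaitClose(): true if everything finished within the timeout, false otherwise. */
    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        flushAndSeal();
        return pool.awaitTermination(timeout, unit);
    }

    /** Like close(): flushes and seals, but does not wait for the flushed batch to finish. */
    void close() {
        flushAndSeal();
    }
}
```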