侧边栏壁纸
博主头像
一个九零后的萤火虫博主等级

行动起来,活在当下

  • 累计撰写 33 篇文章
  • 累计创建 7 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

elasticsearch(三)java 分别使用同步和异步方法进行删除操作

Administrator
2024-01-25 / 0 评论 / 0 点赞 / 21 阅读 / 8965 字

一、 基本步骤:

1.创建连接对象:

        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        );

2.创建请求对象:

DeleteRequest request = new DeleteRequest("posts", "doc", "20");

3.设置请求参数:

                DeleteRequest request = new DeleteRequest("posts", "doc", "20");
                // 等待主分片可用的超时时间
                request.timeout(TimeValue.timeValueMinutes(10));

4. 执行同步请求:

 DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

5.查看返回结果:

a.判断删除的文档是否存在 :

deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND

b.查看分片执行情况信息:

ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        System.out.println("未完全执行所有分片,总分片数为:" + shardInfo.getTotal() + ",执行的分片数为:"+ shardInfo.getSuccessful());
    }
     
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
             String reason = failure.reason();
             System.out.println("失败原因:" + reason);
             return;
        }
}

二、同步请求代码示例:

package com.example.elasticsearch.main;
 
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
 
/**
 * Created with IntelliJ IDEA.
 *
 * @Author: Weichang Zhong
 * @Date: 2018/11/7
 * @Time: 11:10
 * @Description:
 */
public class SynDeleteRequest {
 
    public static void main(String[] args) {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        )) {
 
 
            DeleteRequest request = new DeleteRequest("posts", "doc", "1");
//            DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
//            DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
            // 等待主分片可用的超时时间
            request.timeout(TimeValue.timeValueMinutes(10));
            //WAIT_UNTIL 一直保持请求连接中,直接当所做的更改对于搜索查询可见时的刷新发生后,再将结果返回
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
            DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
            if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
                System.out.println("未找到需要删除的文档!");
                return;
            }
            String index = deleteResponse.getIndex();
            String type = deleteResponse.getType();
            String id = deleteResponse.getId();
            long version = deleteResponse.getVersion();
            System.out.println("index:" + index + "; type:" + type + "; id:" + id + ",version:" + version);
            ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
            if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                System.out.println("未完全执行所有分片,总分片数为:" + shardInfo.getTotal() + ",执行的分片数为:"+ shardInfo.getSuccessful());
//                return;
            }
            if (shardInfo.getFailed() > 0) {
                for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                    String reason = failure.reason();
                    System.out.println("失败原因:" + reason);
                    return;
                }
            }
 
        }catch (ElasticsearchException e) {
            if(e.status() == RestStatus.CONFLICT) {
                System.out.println("需要删除的文档版本与现在文档冲突!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

三、异步删除代码示例

package com.example.elasticsearch.document;
 
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
 
/**
 * Created with IntelliJ IDEA.
 *
 * @Author: Weichang Zhong
 * @Date: 2018/11/7
 * @Time: 13:46
 * @Description:
 */
public class AsynDeleteRequest {
    public static void main(String[] args) {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        );
 
        DeleteRequest request = new DeleteRequest("posts", "doc", "2");
//            DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
//            DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(5);
        // 等待主分片可用的超时时间
        request.timeout(TimeValue.timeValueMinutes(10));
        //WAIT_UNTIL 一直保持请求连接中,直接当所做的更改对于搜索查询可见时的刷新发生后,再将结果返回
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
 
        // 异步回调对象
        ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
 
            @Override
            public void onResponse(DeleteResponse deleteResponse) {
                if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
                    System.out.println("未找到需要删除的文档!");
                    return;
                }
                String index = deleteResponse.getIndex();
                String type = deleteResponse.getType();
                String id = deleteResponse.getId();
                long version = deleteResponse.getVersion();
                System.out.println("index:" + index + "; type:" + type + "; id:" + id + ",version:" + version);
                ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                    System.out.println("未完全执行所有分片,总分片数为:" + shardInfo.getTotal() + ",执行的分片数为:"+ shardInfo.getSuccessful());
                }
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        String reason = failure.reason();
                        System.out.println("失败原因:" + reason);
                        return;
                    }
                }
            }
 
            @Override
            public void onFailure(Exception e) {
 
            }
        };
        client.deleteAsync(request, RequestOptions.DEFAULT, listener);
    }
}

四、

1,所有的异步操作,一定不要将连接client关闭太早,否则异步操作还没执行程序就被终止了

所以同步方法创作的连接对象可以放在try()条件中,执行完自动关闭。而异步方法的连接则不可以

2,注意:如果对一个文档添加版本条件(如id为2,版本为2)进行删除(删除后文档version变为3),再次删除时,

DeleteRequest request = new DeleteRequest("posts", "doc", "2").version(2);

报错如下,而不是提示找不到文档:

{
    "error":{
        "root_cause":[
            {
                "type":"version_conflict_engine_exception",
                "reason":"[doc][2]: version conflict, current version [3] is different than the one provided [2]",
                "index_uuid":"60e-U9cXSYqFGC34_gTrug",
                "shard":"2",
                "index":"posts"
            }
        ],
        "type":"version_conflict_engine_exception",
        "reason":"[doc][2]: version conflict, current version [3] is different than the one provided [2]",
        "index_uuid":"60e-U9cXSYqFGC34_gTrug",
        "shard":"2",
        "index":"posts"
    },
    "status":409
}

0

评论区