elasticsearch(一)java 分别使用同步和异步方法进行索引、更新操作

in with 0 comment

一、索引或更新基本步骤

  1. 创建与elasticsearch服务进行连接的RestHighLevelClient对象
     RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        );

2.将文档内容以一个XContentBuilder 对象的方式进行创建,elasticsearch内容助手会根据该对象自动生成json格式内容进行保存

  XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.field("user", "kimchy");
            builder.timeField("postDate", new Date());
            builder.field("message", "trying out Elasticsearch");
        }
        builder.endObject();

  1. 创建IndexRequest 索引请求对象,并将XContentBuilder 作为参数传入其source方法
   IndexRequest indexRequest = new IndexRequest("posts", "doc", "16")
                .source(builder);

posts为索引库,doc为类型,1为指定的文档id

4.其它参数设置

indexRequest.timeout(TimeValue.timeValueSeconds(5));
indexRequest.opType(DocWriteRequest.OpType.INDEX);

a,如果操作设置为DocWriteRequest.OpType.INDEX(默认值),如果文档存在,则更新文档;如果文档不存在,则创建文档

b,如果操作设置为DocWriteRequest.OpType.CREATE,则是指定为创建文档操作,如果对象的文档(根据id判断)存在,则报错如下:

ElasticsearchStatusException[Elasticsearch exception 
[type=version_conflict_engine_exception, 
reason=[doc][16]: version conflict, document already exists

索引操作只能为以上两种操作值,不能为UPDATE和DELETE

二、进行同步请求:使用client.index(indexRequest, RequestOptions.DEFAULT);方法

  try {
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            String index = indexResponse.getIndex();
            String type = indexResponse.getType();
            String id = indexResponse.getId();
            long version = indexResponse.getVersion();
            if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                System.out.println("添加成功");
                System.out.println("type:" + type);
                System.out.println("id:" + id);
                System.out.println("version:" + version);
                System.out.println("index:" + index);
            } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                System.out.println("更新成功");
                System.out.println("index:" + index);
                System.out.println("type:" + type);
                System.out.println("id:" + id);
                System.out.println("version:" + version);
            }
        }catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                System.out.println("创建的文档与已存在的发生冲突");
            }
        }

三、异步请求:

1.创建异步请求的,回调对象:ActionListener

如果执行成功,会自动调用onResponse方法,如果执行失败,会回调onFailure方法

可以从传入的IndexResponse和Exception类型参数中获取相关创建情况信息

ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("添加成功");
                    System.out.println("type:" + type);
                    System.out.println("id:" + id);
                    System.out.println("version:" + version);
                    System.out.println("index:" + index);
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("更新成功");
                    System.out.println("index:" + index);
                    System.out.println("type:" + type);
                    System.out.println("id:" + id);
                    System.out.println("version:" + version);
                }
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
 
                }
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        String reason = failure.reason();
                    }
                }
            }
 
            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
            }
        };

2.进行异步请求:将请求对象、和回调对象作为参数传入

client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);

三、完整示例代码如下:

1)同步方法代码示例:


package com.example.elasticsearch.main;
 
 
 
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
 
import java.util.Date;
 
/**
 * Created with IntelliJ IDEA.
 *
 * @Author: Weichang Zhong
 * @Date: 2018/11/6
 * @Time: 15:16
 * @Description:
 */
public class Test {
    public static void main(String[] args) throws Exception{
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        );
 
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.field("user", "kimchy");
            builder.timeField("postDate", new Date());
            builder.field("message", "trying out Elasticsearch");
        }
        builder.endObject();
        IndexRequest indexRequest = new IndexRequest("posts", "doc", "20")
                .source(builder);
        indexRequest.timeout(TimeValue.timeValueSeconds(5));
        indexRequest.opType(DocWriteRequest.OpType.INDEX);
        
        try {
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            String index = indexResponse.getIndex();
            String type = indexResponse.getType();
            String id = indexResponse.getId();
            long version = indexResponse.getVersion();
            if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                System.out.println("添加成功");
                System.out.println("type:" + type);
                System.out.println("id:" + id);
                System.out.println("version:" + version);
                System.out.println("index:" + index);
            } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                System.out.println("更新成功");
                System.out.println("index:" + index);
                System.out.println("type:" + type);
                System.out.println("id:" + id);
                System.out.println("version:" + version);
            }
        }catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                System.out.println("创建的文档与已存在的发生冲突");
            }
        }
 
        client.close();
    }
}

2.异步方法代码示例


package com.example.elasticsearch.main;
 
 
 
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
 
import java.util.Date;
 
/**
 * Created with IntelliJ IDEA.
 *
 * @Author: Weichang Zhong
 * @Date: 2018/11/6
 * @Time: 15:16
 * @Description:
 */
public class Test {
    public static void main(String[] args) throws Exception{
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        );
 
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.field("user", "kimchy");
            builder.timeField("postDate", new Date());
            builder.field("message", "trying out Elasticsearch");
        }
        builder.endObject();
        IndexRequest indexRequest = new IndexRequest("posts", "doc", "16")
                .source(builder);
        indexRequest.timeout(TimeValue.timeValueSeconds(5));
        indexRequest.opType(DocWriteRequest.OpType.CREATE);
 
        ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("添加成功");
                    System.out.println("type:" + type);
                    System.out.println("id:" + id);
                    System.out.println("version:" + version);
                    System.out.println("index:" + index);
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("更新成功");
                    System.out.println("index:" + index);
                    System.out.println("type:" + type);
                    System.out.println("id:" + id);
                    System.out.println("version:" + version);
                }
            }
 
            @Override
            public void onFailure(Exception e) {
                ElasticsearchException elasticsearchException = (ElasticsearchException)e;
                if (elasticsearchException.status() == RestStatus.CONFLICT) {
                    System.out.println("创建的文档已存在");
                }
            }
        };
     
        client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
 
 
//        client.close();
    }
}

注意:

1,// client.close(); 如果不被注释掉,可能还没有将请求发送出去,连接就会被关闭,从而创建或更新失败。 所以如上代码的异步请求中,将此行注释掉以进行实验

2,IndexRequest indexRequest = new IndexRequest("posts", "doc", null)

创建请求对象的id设置为null,则每次执行后都会自动生成一个新的id