Springboot使用ES官方推荐方式REST Client整合ES实现关键词高亮全文搜索

wylc123 3月前 ⋅ 277 阅读

1、 简介REST Client

SpringBoot整合ES的方式(TransportClient、Data-ES、Elasticsearch SQL、REST Client)。
1.1 TransportClient:即将弃用。
1.2 Data-ES(Spring Data Elasticsearch):Spring提供的封装方式,好像底层也是基于TransportClient,对Elasticsearch 7.0之后的版本支持不太好。
1.3 Elasticsearch SQL:将SQL转换为Elasticsearch的Query DSL进行查询。早期有一个第三方的插件Elasticsearch-SQL,后来随着官方也开始做这方面的支持,这个插件好像就没怎么更新了。

1.4 REST Client:官方推荐使用,所以我们采用这种方式。它分为Low Level REST Client和High Level REST Client两种:Low Level REST Client是早期推出的API,比较简陋,还需要自己去拼写Query DSL;High Level REST Client使用起来更方便,更符合面向对象的风格,所以选择使用High Level REST Client。

2、 引用依赖

<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>7.5.1</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>elasticsearch-rest-high-level-client</artifactId>
			<version>7.5.1</version>
		</dependency>
<!-- commons-pool2:对象池,用于下文的ES客户端连接池 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.5.0</version>
        </dependency>

3、 添加配置

# Elasticsearch配置
es:
  host: 127.0.0.1
  port: 9200

4、添加配置类

ESConf.java

package com.example.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Holds the Elasticsearch connection settings bound from the
 * {@code es.host} and {@code es.port} configuration properties.
 */
@Component
public class ESConf {

    /** Elasticsearch host name or address, injected from {@code es.host}. */
    @Value("${es.host}")
    private String host;

    /** Elasticsearch port, injected from {@code es.port}. */
    @Value("${es.port}")
    private int port;

    /** @return the configured host */
    public String getHost() {
        return this.host;
    }

    /** @return the configured port */
    public int getPort() {
        return this.port;
    }

    /** @param host the host to use */
    public void setHost(String host) {
        this.host = host;
    }

    /** @param port the port to use */
    public void setPort(int port) {
        this.port = port;
    }
}

5、对象池配置类

MyEsClientPool.java

package com.example.base;

import com.example.config.ESConf;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: SongBin
 * @date: 2020/4/28 9:12
 * @description:
 * @version: 1.0
 */
@Component
public class MyEsClientPool {
    //private static final String HOST = "192.168.20.138"; // 集群节点
    //private static final int PORT = 9200;
    // 对象池配置类,不写也可以,采用默认配置
    private GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();

    // 采用默认配置maxTotal是8,池中有8个client
    public MyEsClientPool() {

        poolConfig.setMaxTotal(20);

        poolConfig.setMaxIdle(5);

        poolConfig.setTestWhileIdle(true);
        poolConfig.setTestOnBorrow(false);

        poolConfig.setTimeBetweenEvictionRunsMillis(300000L);
        poolConfig.setMinIdle(5);
    }

    @Autowired
    public ESConf esConf;

    private GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<RestHighLevelClient>(new PooledObjectFactory<RestHighLevelClient>() {

        public PooledObject<RestHighLevelClient> makeObject() throws Exception {
            RestHighLevelClient client = null;
            try {
                client = new RestHighLevelClient(RestClient.builder(new HttpHost(esConf.getHost(), esConf.getPort(), "http")));
            } catch (Exception e) {
                e.printStackTrace();
            }
            return new DefaultPooledObject<RestHighLevelClient>(client);
        }

        public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
            RestHighLevelClient client = pooledObject.getObject();
            client.close();
        }

        public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) {
            return true;
        }

        public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
            System.out.println("激活客户端");
        }

        public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
            System.out.println("释放客户端");
        }
    }, poolConfig);

    /**
     * 获得对象
     *
     * @return
     * @throws Exception
     */
    public RestHighLevelClient getClient() throws Exception {

        RestHighLevelClient client = clientPool.borrowObject();
        return client;
    }

    /**
     * 归还对象
     *
     * @param client
     */
    public void returnClient(RestHighLevelClient client) {
        if (client != null) {
            clientPool.returnObject(client);
        }
    }


    /*public static void main(String[] args) throws Exception {
        RestHighLevelClient client = MyEsClientPool.getClient();
        System.out.println(client);
    }*/
}

6、ES操作工具类

MyEsUtils.java

package com.example.base;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author: SongBin
 * @date: 2020/4/27 10:11
 * @description:
 * @version: 1.0
 */
@Component
public class MyEsUtils {

    private static RestHighLevelClient client = null;

    @Autowired
    private MyEsClientPool MyEsClientPool;

    /**
     * Returns the shared client, borrowing it from the pool on first use.
     *
     * <p>A borrow failure is reported immediately with its cause. Returning {@code null}
     * here would only move the failure to the first use of the client as an
     * uninformative {@link NullPointerException}.
     *
     * <p>No synchronization is performed on the shared static field.
     *
     * @return the shared client, never {@code null}
     * @throws IllegalStateException if no client can be borrowed from the pool
     */
    public RestHighLevelClient getRestHighLevelClient() {
        if (client == null) {
            try {
                client = MyEsClientPool.getClient();
            } catch (Exception e) {
                throw new IllegalStateException("Unable to borrow an Elasticsearch client from the pool", e);
            }
        }
        return client;
    }

    /**
     * Hands the shared client back to the pool and forgets it.
     *
     * <p>The static reference is cleared before the client is returned, so a later
     * {@link #getRestHighLevelClient()} borrows a fresh client instead of reusing one the pool
     * already owns, and a second call to this method does not return the same object twice.
     */
    public void closeClient(){
        RestHighLevelClient borrowed = client;
        if (borrowed != null) {
            client = null;
            MyEsClientPool.returnClient(borrowed);
        }
    }



    /**
     * Builds a boolean query that requires every given field to contain its value.
     * Each entry becomes a {@code must} wildcard clause of the form {@code *value*}.
     *
     * @param filedsMap query conditions (key: field name, value: text to look for)
     * @return the boolean query; it has no clauses when the map is empty
     */
    public BoolQueryBuilder getQueryBuilder(Map<String,String> filedsMap){
        BoolQueryBuilder conjunction = QueryBuilders.boolQuery();
        for (Map.Entry<String, String> condition : filedsMap.entrySet()) {
            String pattern = "*" + condition.getValue() + "*";
            conjunction.must(QueryBuilders.wildcardQuery(condition.getKey(), pattern));
        }
        return conjunction;
    }

    /**
     * Runs a paged search and returns the source of every hit, with highlighted fragments
     * substituted for the requested fields.
     *
     * @param queryBuilder query to execute
     * @param esIndex      index name
     * @param pageNo       1-based page number; values below 1 are treated as the first page
     * @param pagesize     page size
     * @param glFields     fields whose value should be replaced by the highlighted text;
     *                     {@code null} means no replacement
     * @return the hit sources in result order
     * @throws java.io.UncheckedIOException if the search request fails
     */
    public List<Map<String,Object>> getPageResultList(QueryBuilder queryBuilder, String esIndex, int pageNo, int pagesize,List<String> glFields){
        // Highlight matches in every field, regardless of which field the query targeted.
        HighlightBuilder highlightBuilder = new HighlightBuilder().field("*").requireFieldMatch(false);
        highlightBuilder.preTags("<span style=\"color:red\">");
        highlightBuilder.postTags("</span>");

        int from = pageNo >= 1 ? (pageNo - 1) * pagesize : 0;
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.highlighter(highlightBuilder);
        searchSourceBuilder.query(queryBuilder).from(from).size(pagesize);

        SearchRequest searchRequest = new SearchRequest(esIndex);
        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse;
        try {
            searchResponse = getRestHighLevelClient().search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            // Continuing with a null response would only fail later with a NullPointerException.
            throw new java.io.UncheckedIOException("Search on index " + esIndex + " failed", e);
        }

        List<String> highlightNames = glFields == null ? Collections.<String>emptyList() : glFields;
        List<Map<String,Object>> list = new LinkedList<>();
        for (SearchHit hit : searchResponse.getHits()) {
            Map<String, Object> source = hit.getSourceAsMap();
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            if (source != null) {
                for (String fieldName : highlightNames) {
                    HighlightField highlight = highlightFields.get(fieldName);
                    if (highlight == null) {
                        continue;
                    }
                    // Join all fragments and put them in place of the plain field value.
                    StringBuilder joined = new StringBuilder();
                    for (Text fragment : highlight.fragments()) {
                        joined.append(fragment);
                    }
                    source.put(fieldName, joined.toString());
                }
            }
            list.add(source);
        }
        return list;
    }

    /**
     * Runs a paged full-text search and returns the hits together with paging and
     * aggregation information.
     *
     * <p>The returned map contains {@code data} (hit sources, with highlighted fragments
     * substituted for keyword fields that produced a highlight), {@code total},
     * {@code totalPage}, {@code pageSize}, {@code pageNo} and {@code aggData}.
     *
     * <p>Every hit is added to {@code data} exactly once, so no de-duplication pass is needed
     * and distinct documents with identical content are both kept.
     *
     * @param query search parameters (index, paging, keyword fields, sort, aggregations, source filter)
     * @return the result map described above
     * @throws Exception if the search request fails; I/O failures are propagated rather than swallowed
     */
    public Map<String,Object> search(SearchRequestQuery query) throws Exception {
        RestHighLevelClient esClient = getRestHighLevelClient();
        Map<String,Object> result = new HashMap<>();
        List<Map<String,Object>> list = new ArrayList<>();

        // 1. Target index
        SearchRequest searchRequest = new SearchRequest(query.getEsIndex());
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        // 2. Paging: callers use 1-based page numbers, Elasticsearch uses a 0-based offset
        int pageNo = 1;
        int pageSize = 10;
        if (query.getPageNo() != null) {
            pageNo = Math.max(1, query.getPageNo());
        }
        if (query.getPageSize() != null) {
            pageSize = query.getPageSize();
        }
        sourceBuilder.from((pageNo - 1) * pageSize);
        sourceBuilder.size(pageSize);

        // 3. Keyword query plus and/or/not filters, with a one minute timeout
        sourceBuilder.query(buildBasicQueryWithFilter(query));
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

        // 4. Highlighting of the keyword fields
        String[] keywordFields = query.getKeywordFields();
        if (keywordFields == null) {
            keywordFields = new String[0];
        }
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        for (String field : keywordFields) {
            highlightBuilder.field(field);
        }
        // true: only fields that the query actually matched on are highlighted
        highlightBuilder.requireFieldMatch(true);
        highlightBuilder.preTags("<span style='color:red'>");
        highlightBuilder.postTags("</span>");
        sourceBuilder.highlighter(highlightBuilder);

        // 5. Sorting; a missing direction flag means ascending
        String sortBy = query.getSortBy();
        Boolean desc = query.getIsDesc();
        if (StringUtils.isNotBlank(sortBy)) {
            SortOrder order = Boolean.TRUE.equals(desc) ? SortOrder.DESC : SortOrder.ASC;
            sourceBuilder.sort(new FieldSortBuilder(sortBy).order(order));
        }

        // 6. Terms aggregations (name -> field); both parts are required
        Map<String, String> aggs = query.getAggMap();
        if (aggs != null) {
            for (Map.Entry<String, String> entry : aggs.entrySet()) {
                String aggName = entry.getKey();
                String aggFiled = entry.getValue();
                if (aggName != null && aggFiled != null) {
                    sourceBuilder.aggregation(AggregationBuilders.terms(aggName).field(aggFiled+".keyword"));
                }
            }
        }

        // 7. Restrict the returned source fields (first argument: includes, second: excludes)
        sourceBuilder.fetchSource(query.getSourceFilter(),null);

        // 8. Execute
        searchRequest.source(sourceBuilder);
        SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);

        for (SearchHit doc : searchResponse.getHits().getHits()) {
            Map<String, Object> sourceAsMap = doc.getSourceAsMap();
            Map<String, HighlightField> highlightFields = doc.getHighlightFields();
            if (sourceAsMap != null) {
                for (String field : keywordFields) {
                    HighlightField highlight = highlightFields.get(field);
                    if (highlight == null) {
                        continue;
                    }
                    // Replace the plain value with the joined highlighted fragments.
                    StringBuilder joined = new StringBuilder();
                    for (Text fragment : highlight.fragments()) {
                        joined.append(fragment);
                    }
                    sourceAsMap.put(field, joined.toString());
                }
            }
            list.add(sourceAsMap);
        }

        int total = (int) searchResponse.getHits().getTotalHits().value;
        int totalPage = 0;
        if (total != 0 && pageSize > 0) {
            totalPage = total % pageSize == 0 ? total / pageSize : (total / pageSize) + 1;
        }
        result.put("data",list);
        result.put("total",total);
        result.put("totalPage",totalPage);
        result.put("pageSize",pageSize);
        result.put("pageNo",pageNo);

        // 9. Aggregation results
        Aggregations aggregations = searchResponse.getAggregations();
        List<Object> aggData = new ArrayList<>();
        if (aggregations != null) {
            aggData = getAggData(aggregations,query);
        }
        result.put("aggData",aggData);
        return result;
    }

    /**
     * Converts the terms aggregations of a response into plain maps.
     *
     * <p>For each named aggregation requested in the query, one entry is produced with the keys
     * {@code aggregationName}, {@code aggregationField} and {@code aggregationData}
     * (bucket key -> document count). Requested aggregations that are absent from the response
     * are skipped instead of causing a {@link NullPointerException}.
     *
     * @param aggregations aggregations returned by the search
     * @param query        the request whose aggregation map names the aggregations to read
     * @return one map per aggregation found, in the iteration order of the aggregation map
     */
    private static List<Object> getAggData( Aggregations aggregations ,SearchRequestQuery query) {
        List<Object> result = new ArrayList<>();
        Map<String, String> requested = query.getAggMap();
        if (aggregations == null || requested == null) {
            return result;
        }
        for (Map.Entry<String, String> entry : requested.entrySet()) {
            String aggName = entry.getKey();
            String aggFiled = entry.getValue();
            if (aggName == null) {
                continue;
            }
            Terms aggregation = aggregations.get(aggName);
            if (aggregation == null) {
                continue;
            }
            LinkedHashMap<String,Object> counts = new LinkedHashMap<>();
            for (Terms.Bucket bucket : aggregation.getBuckets()) {
                counts.put(bucket.getKey().toString(),bucket.getDocCount());
            }
            LinkedHashMap<String,Object> groupItem = new LinkedHashMap<>();
            groupItem.put("aggregationName",aggName);
            groupItem.put("aggregationField",aggFiled);
            groupItem.put("aggregationData",counts);
            result.add(groupItem);
        }
        return result;
    }

    /**
     * Builds the main query: and/or/not term filters, an optional date range and the keyword match.
     *
     * <p>The filter map is keyed by relation ({@code "and"}, {@code "or"}, {@code "not"}); each
     * value maps field names to the term they must (not) equal. Relations are compared with
     * {@code equals}, and the "or" group is applied whenever it has clauses, independent of
     * the iteration order of the filter map and of whether a keyword was supplied.
     *
     * @param query search parameters
     * @return the combined boolean query
     */
    private QueryBuilder buildBasicQueryWithFilter( SearchRequestQuery query ) {
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        BoolQueryBuilder shouldQuery = QueryBuilders.boolQuery();

        // Term filters grouped by relation
        Map<String, Map<String,String>> filter = query.getFilter();
        if (filter != null) {
            for (Map.Entry<String, Map<String,String>> entry : filter.entrySet()) {
                String relation = entry.getKey();
                Map<String, String> conditions = entry.getValue();
                if (relation == null || conditions == null) {
                    continue;
                }
                for (Map.Entry<String, String> condition : conditions.entrySet()) {
                    QueryBuilder term = QueryBuilders.termQuery(condition.getKey(), condition.getValue());
                    switch (relation) {
                        case "and":
                            queryBuilder.filter(term);
                            break;
                        case "or":
                            shouldQuery.should(term);
                            break;
                        case "not":
                            queryBuilder.mustNot(term);
                            break;
                        default:
                            // Unknown relations are ignored, as before.
                            break;
                    }
                }
            }
        }
        // Wrapping the should-group in a must clause keeps "or" effective next to other must clauses.
        if (shouldQuery.hasClauses()) {
            queryBuilder.must(shouldQuery);
        }

        // Date range, e.g. 2019-07-01 to 2019-07-17: needs the field and at least one bound
        String dateField = query.getDateField();
        String startDate = StringUtils.isBlank(query.getStartDate()) ? null : query.getStartDate();
        String endDate = StringUtils.isBlank(query.getEndDate()) ? null : query.getEndDate();
        if (StringUtils.isNotBlank(dateField) && (startDate != null || endDate != null)) {
            queryBuilder.must(QueryBuilders.rangeQuery(dateField).from(startDate).to(endDate));
        }

        // Without a keyword every document matches (subject to the filters above)
        if (query.getKeyword() == null || "".equals(query.getKeyword())) {
            queryBuilder.must(QueryBuilders.matchAllQuery());
            return queryBuilder;
        }

        // Multi-field match; the keyword fields are combined with "or"
        queryBuilder.must(QueryBuilders.multiMatchQuery(query.getKeyword(), query.getKeywordFields()));
        return queryBuilder;
    }

    /**
     * Runs a paged, optionally sorted search and returns each hit's source converted by
     * {@link #getMapValueForLinkedHashMap(Map)}.
     *
     * @param queryBuilder query to execute
     * @param esIndex      index name
     * @param pageNo       1-based page number; values below 1 are treated as the first page
     * @param pagesize     page size
     * @param sortBuilder  sort to apply, or {@code null} for the default order
     * @param includes     source fields to include, may be {@code null}
     * @param excludes     source fields to exclude, may be {@code null}
     * @return the converted hit sources in result order
     * @throws java.io.UncheckedIOException if the search request fails
     */
    public List<LinkedHashMap<String,Object>> getPageResultListLinked(QueryBuilder queryBuilder, String esIndex, int pageNo, int pagesize, SortBuilder sortBuilder, String[] includes, String[] excludes){
        int from = pageNo >= 1 ? (pageNo - 1) * pagesize : 0;
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(queryBuilder).from(from).size(pagesize);
        if (sortBuilder != null) {
            searchSourceBuilder.sort(sortBuilder);
        }
        // Apply source filtering when either list is given, so an excludes-only filter is honoured.
        boolean hasIncludes = includes != null && includes.length > 0;
        boolean hasExcludes = excludes != null && excludes.length > 0;
        if (hasIncludes || hasExcludes) {
            searchSourceBuilder.fetchSource(includes,excludes);
        }

        SearchRequest searchRequest = new SearchRequest(esIndex);
        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse;
        try {
            searchResponse = getRestHighLevelClient().search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            // Continuing with a null response would only fail later with a NullPointerException.
            throw new java.io.UncheckedIOException("Search on index " + esIndex + " failed", e);
        }

        List<LinkedHashMap<String,Object>> list = new LinkedList<>();
        for (SearchHit hit : searchResponse.getHits()) {
            @SuppressWarnings("unchecked")
            LinkedHashMap<String,Object> row = getMapValueForLinkedHashMap(hit.getSourceAsMap());
            list.add(row);
        }
        return list;
    }

    /**
     * Copies a map into a {@link LinkedHashMap}, lower-casing the first character of each key.
     *
     * <p>Values that are themselves maps are converted recursively. For such entries the key
     * is stored unchanged (only non-map entries get the lower-cased key).
     * NOTE(review): that asymmetry is kept from the original implementation — confirm it is intended.
     *
     * @param dataMap the map to convert; keys must be non-null
     * @return a new map in the iteration order of {@code dataMap}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static LinkedHashMap getMapValueForLinkedHashMap(Map dataMap) {
        LinkedHashMap returnMap = new LinkedHashMap();
        for (Object rawEntry : dataMap.entrySet()) {
            Map.Entry entry = (Map.Entry) rawEntry;
            Object objKey = entry.getKey();
            Object objValue = entry.getValue();
            if (objValue instanceof Map) {
                returnMap.put(objKey, getMapValueForLinkedHashMap((Map) objValue));
            } else {
                returnMap.put(toLowerCaseFirstOne(objKey.toString()), objValue);
            }
        }
        return returnMap;
    }

    /**
     * Returns {@code s} with its first character lower-cased.
     * An empty string is returned unchanged instead of failing on {@code charAt(0)}.
     */
    private static String toLowerCaseFirstOne(String s) {
        if (s.isEmpty() || Character.isLowerCase(s.charAt(0))) {
            return s;
        }
        return Character.toLowerCase(s.charAt(0)) + s.substring(1);
    }

    /**
     * Counts the documents of an index that match a query.
     *
     * @param queryBuilder query whose matches are counted
     * @param esIndex      index name
     * @return the number of matches, or 0 when the request fails with an I/O error
     *         (the stack trace is printed in that case)
     */
    public Long getResultCount(QueryBuilder queryBuilder, String esIndex){
        CountRequest countRequest = new CountRequest(esIndex);
        countRequest.query(queryBuilder);
        long matched = 0L;
        try {
            CountResponse answer = getRestHighLevelClient().count(countRequest, RequestOptions.DEFAULT);
            matched = answer.getCount();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return matched;
    }

    /**
     * Counts all documents of an index.
     *
     * <p>The request is scoped to {@code index}. A {@code null} index keeps the previous
     * behaviour of counting across all indices.
     *
     * @param index index name, or {@code null} for all indices
     * @return the number of documents
     * @throws java.io.UncheckedIOException if the count request fails
     */
    public long getDocCount(String index){
        CountRequest countRequest = index == null ? new CountRequest() : new CountRequest(index);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        countRequest.source(searchSourceBuilder);
        try {
            CountResponse countResponse = getRestHighLevelClient().count(countRequest, RequestOptions.DEFAULT);
            return countResponse.getCount();
        } catch (IOException e) {
            // Dereferencing a null response here would only produce a NullPointerException.
            throw new java.io.UncheckedIOException("Counting documents of index " + index + " failed", e);
        }
    }

    /**
     * Checks whether an index exists and prints the outcome.
     *
     * @param esIndex index name
     * @return the answer from Elasticsearch; {@code true} when the request fails with an
     *         I/O error (the stack trace is printed in that case)
     */
    public boolean isIndexExist(String esIndex){
        GetIndexRequest lookup = new GetIndexRequest(esIndex);
        boolean found = true;
        try {
            found = getRestHighLevelClient().indices().exists(lookup, RequestOptions.DEFAULT);
            String template = found ? "索引%s已存在" : "索引%s不存在";
            System.out.println(String.format(template, esIndex));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return found;
    }

    /**
     * Creates an index with the given settings and field mapping unless it already exists.
     * The outcome is printed; I/O errors are printed and otherwise ignored.
     *
     * @param esIndex      index name
     * @param shards       number of primary shards
     * @param replications number of replicas
     * @param fileds       field name -> field type
     */
    public void createIndex(String esIndex,int shards,int replications,Map<String,String> fileds){
        if (isIndexExist(esIndex)) {
            System.out.println(String.format("索引%s已存在",esIndex));
            return;
        }
        try {
            // Mapping body: { "properties": { <field>: { "index": "true", "type": <type> }, ... } }
            XContentBuilder mapping = XContentFactory.jsonBuilder();
            mapping.startObject();
            mapping.field("properties");
            mapping.startObject();
            for (Map.Entry<String, String> column : fileds.entrySet()) {
                mapping.field(column.getKey());
                mapping.startObject();
                mapping.field("index","true");
                mapping.field("type",column.getValue());
                mapping.endObject();
            }
            mapping.endObject();
            mapping.endObject();

            Settings.Builder indexSettings = Settings.builder()
                    .put("index.number_of_shards", shards)
                    .put("index.number_of_replicas", replications);
            CreateIndexRequest request = new CreateIndexRequest(esIndex);
            request.settings(indexSettings);
            request.mapping(mapping);

            CreateIndexResponse outcome = getRestHighLevelClient().indices().create(request, RequestOptions.DEFAULT);
            String template = outcome.isAcknowledged() ? "索引%s创建成功" : "索引%s创建失败";
            System.out.println(String.format(template, esIndex));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes an index and prints whether the deletion was acknowledged.
     * I/O errors are printed and otherwise ignored.
     *
     * @param esIndex index name
     */
    public void deleteIndex(String esIndex){
        DeleteIndexRequest removal = new DeleteIndexRequest(esIndex);
        try {
            AcknowledgedResponse outcome = getRestHighLevelClient().indices().delete(removal, RequestOptions.DEFAULT);
            String template = outcome.isAcknowledged() ? "索引%s已删除" : "索引%s删除失败";
            System.out.println(String.format(template, esIndex));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Loads a document by id.
     *
     * @param esIndex index name
     * @param id      document id
     * @return the document source (field name -> value), or {@code null} when the document
     *         does not exist or the request fails with an I/O error
     */
    public Map<String,Object> getDataById(String esIndex,String id){
        GetRequest lookup = new GetRequest(esIndex, id);
        try {
            GetResponse found = getRestHighLevelClient().get(lookup,RequestOptions.DEFAULT);
            return found.isExists() ? found.getSource() : null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Partially updates a document and prints whether the update succeeded.
     * I/O errors are printed and otherwise ignored.
     *
     * @param esIndex      index name
     * @param id           document id
     * @param updateFileds fields to update (field name -> new value)
     */
    public void updateDataById(String esIndex,String id,Map<String,Object> updateFileds){
        UpdateRequest partialUpdate = new UpdateRequest(esIndex, id);
        partialUpdate.doc(updateFileds);
        try {
            UpdateResponse outcome = getRestHighLevelClient().update(partialUpdate, RequestOptions.DEFAULT);
            String template = outcome.status() == RestStatus.OK
                    ? "更新索引为%s,id为%s的文档成功"
                    : "更新索引为%s,id为%s的文档失败";
            System.out.println(String.format(template, outcome.getIndex(), outcome.getId()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes the document with the given id and prints whether it was deleted.
     * I/O errors are printed and otherwise ignored.
     *
     * @param esIndex index name
     * @param id      document id
     */
    public void deleteDataById(String esIndex,String id){
        DeleteRequest removal = new DeleteRequest(esIndex,id);
        try {
            DeleteResponse outcome = getRestHighLevelClient().delete(removal, RequestOptions.DEFAULT);
            boolean deleted = outcome.getResult() == DocWriteResponse.Result.DELETED;
            String template = deleted ? "id为%s的文档删除成功" : "id为%s的文档删除失败";
            System.out.println(String.format(template, id));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Indexes a batch of documents with a single bulk request.
     *
     * <p>A document whose {@code id} entry is non-null is indexed under that id, and the entry
     * is left out of the stored source; otherwise Elasticsearch generates the id. The caller's
     * maps are copied and never modified. A {@code null} or empty list is a no-op, because an
     * empty bulk request is not valid.
     *
     * @param esIndex  index name
     * @param datalist documents to index (field name -> value)
     */
    public void bulkLoad(String esIndex,List<Map<String,Object>> datalist){
        if (datalist == null || datalist.isEmpty()) {
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        for (Map<String,Object> data : datalist) {
            // Work on a copy so that stripping the id does not alter the caller's map.
            Map<String,Object> document = new LinkedHashMap<>(data);
            Object id = document.get("id");
            IndexRequest indexRequest = new IndexRequest(esIndex);
            if (id != null) {
                document.remove("id");
                indexRequest.id(id.toString());
            }
            bulkRequest.add(indexRequest.source(document));
        }
        try {
            BulkResponse response = getRestHighLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
            System.out.println(response.hasFailures());
            if (!response.hasFailures()){
                System.out.println(String.format("索引%s批量插入成功,共插入%d条",esIndex,datalist.size()));
            }else{
                System.out.println(String.format("索引%s批量插入失败",esIndex));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Manual smoke test: runs an unfiltered paged query against a sample index, prints the
     * hits and the match count, then releases the client.
     *
     * <p>NOTE(review): the instance is created with {@code new}, so the {@code @Autowired}
     * pool field is not injected by Spring here — confirm how this is meant to be run.
     * The other operations of this class (createIndex, deleteIndex, deleteDataById,
     * bulkLoad) can be exercised from here in the same way.
     */
    public static void main(String[] args) {
        final String sampleIndex = "nv_excel_pickup_origi";
        MyEsUtils esUtils = new MyEsUtils();

        // No conditions: the resulting boolean query has no clauses.
        Map<String, String> conditions = new HashMap<>();
        BoolQueryBuilder everything = esUtils.getQueryBuilder(conditions);

        List<String> highlighted = new ArrayList<>();
        highlighted.add("name");

        List<Map<String,Object>> firstPage = esUtils.getPageResultList(everything, sampleIndex, 1, 5, highlighted);
        System.out.println(firstPage.size());
        firstPage.forEach(System.out::println);
        System.out.println("count:"+esUtils.getResultCount(everything, sampleIndex));

        esUtils.closeClient();
    }

    /**
     * Deletes the documents matching a query in rounds of at most {@code maxDocs} documents.
     *
     * <p>The number of rounds is derived from the match count taken before deleting.
     * Version conflicts do not abort a round.
     *
     * @param esIndex      index name
     * @param queryBuilder query selecting the documents to delete
     * @param maxDocs      maximum number of documents processed per round; must be positive
     * @return the total number of documents deleted
     * @throws IllegalArgumentException if {@code maxDocs} is not positive
     * @throws IOException              if a delete-by-query request fails
     */
    public long deleteDataByQuery(String esIndex, QueryBuilder queryBuilder, int maxDocs) throws IOException {
        if (maxDocs <= 0) {
            // A zero limit would otherwise turn the round count below into Integer.MAX_VALUE.
            throw new IllegalArgumentException("maxDocs must be positive, but was " + maxDocs);
        }
        DeleteByQueryRequest request = new DeleteByQueryRequest(esIndex);
        request.setQuery(queryBuilder);
        // Scroll batch size; 10000 is the maximum
        request.setBatchSize(10000);
        // Continue on version conflicts
        request.setConflicts("proceed");
        // Upper bound of documents handled by one request
        request.setMaxDocs(maxDocs);

        long pending = getResultCount(queryBuilder, esIndex);
        System.out.println(String.format("待删除数据%d条",pending));
        long rounds = pending / maxDocs + (pending % maxDocs == 0 ? 0 : 1);

        long deletedCount = 0L;
        for (long round = 0; round < rounds; round++) {
            BulkByScrollResponse bulkByScrollResponse = getRestHighLevelClient().deleteByQuery(request, RequestOptions.DEFAULT);
            deletedCount += bulkByScrollResponse.getDeleted();
        }
        return deletedCount;
    }

}

7、控制器

HighLevelRestController.java

package com.example.controller;

import com.alibaba.fastjson.JSON;
import com.example.common.ResponseBean;
import com.example.constant.Constant;
import com.example.model.BookDto;
import com.example.model.User;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import com.example.base.MyEsUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * REST endpoints that query Elasticsearch through {@link MyEsUtils}.
 *
 * @author SongBin
 * @date 2019/8/14 16:24
 */
@RestController
@RequestMapping("/high")
public class HighLevelRestController {

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(HighLevelRestController.class);

    /** Index that stores the users. */
    private static final String USER_INDEX = "mto_user";

    /** Field that is searched and highlighted. */
    private static final String NAME_FIELD = "name";

    @Autowired
    private MyEsUtils myEsUtils;

    /**
     * Lists users whose name starts with the keyword, one page at a time, with the matched
     * part of the name highlighted. Without a keyword all users are listed.
     *
     * @param page    1-based page number (default 1)
     * @param rows    page size (default 10)
     * @param keyword name prefix to search for; may be omitted
     * @return a response whose payload holds {@code count} (total matches) and {@code data} (the page)
     * @throws IOException declared for callers; not thrown by the visible body
     * @author wliduo[i@dolyw.com]
     * @date 2019/8/15 16:01
     */
    @GetMapping("/user")
    public ResponseBean list(@RequestParam(defaultValue = "1") Integer page,
                             @RequestParam(defaultValue = "10") Integer rows,
                             String keyword) throws IOException {

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (StringUtils.isBlank(keyword)) {
            // A missing keyword would otherwise be searched as the literal prefix "null".
            boolQueryBuilder.must(QueryBuilders.matchAllQuery());
        } else {
            boolQueryBuilder.must(QueryBuilders.wildcardQuery(NAME_FIELD, keyword + "*"));
        }

        // getPageResultList takes the fields to highlight as its fifth argument.
        List<String> highlightFields = new ArrayList<>();
        highlightFields.add(NAME_FIELD);

        List<Map<String,Object>> list = myEsUtils.getPageResultList(boolQueryBuilder, USER_INDEX, page, rows, highlightFields);
        long total = myEsUtils.getResultCount(boolQueryBuilder, USER_INDEX);
        logger.debug("user search returned {} of {} hits", list.size(), total);

        // Convert each hit into a User
        List<User> userList = new ArrayList<>();
        for (Map<String,Object> map : list) {
            userList.add(JSON.parseObject(JSON.toJSONString(map), User.class));
        }

        // Wrap the page and the total for the response
        Map<String, Object> result = new HashMap<String, Object>(16);
        result.put("count", total);
        result.put("data", userList);
        return new ResponseBean(HttpStatus.OK.value(), "查询成功", result);
    }
}

 


相关文章推荐

全部评论: 0

    我有话说: