Elasticsearch 操作(1):RestHighLevelClient 插入与更新

2022年1月2日 1920点热度 0人点赞 0条评论

1 java api代码

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;


/**
 * Write-side helper around {@link RestHighLevelClient}: single-document index,
 * bulk index, partial update (upsert) and scripted update-by-query.
 *
 * <p>The client is injected through {@link #setRestHighLevelClient(RestHighLevelClient)}
 * and must be set before any other method is called.
 */
public class ElasticSearchInsertUtils {

	private static final Logger LOGGER = Logger.getLogger(ElasticSearchInsertUtils.class.getName());

	/**
	 * Shape of a plain Painless identifier. Parameter names are spliced into the
	 * update-by-query script source, so anything else is rejected up front.
	 */
	private static final Pattern SCRIPT_FIELD_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

	/** Number of documents fetched per scroll batch by {@link #updateByQuery}. */
	private static final int UPDATE_BY_QUERY_BATCH_SIZE = 10;
	/** Scroll keep-alive, in seconds, used by {@link #updateByQuery}. */
	private static final long UPDATE_BY_QUERY_SCROLL_SECONDS = 10;
	/** Per-bulk timeout, in seconds, used by {@link #updateByQuery}. */
	private static final long UPDATE_BY_QUERY_TIMEOUT_SECONDS = 20;

	private RestHighLevelClient restHighLevelClient;

	public RestHighLevelClient getRestHighLevelClient() {
		return restHighLevelClient;
	}

	public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
		this.restHighLevelClient = restHighLevelClient;
	}

	/**
	 * Builds an index request without a target index; the caller sets the index.
	 *
	 * @param id            document id, or {@code null} to let Elasticsearch generate one
	 * @param json          document source as a JSON string
	 * @param opType        operation type, or {@code null} to keep the request default
	 * @param refreshPolicy refresh policy, or {@code null} to keep the request default
	 * @return the populated request
	 */
	private IndexRequest buildIndexRequest(String id, String json, OpType opType,
			RefreshPolicy refreshPolicy) {
		IndexRequest indexRequest = new IndexRequest();

		indexRequest.id(id);
		indexRequest.source(json, XContentType.JSON);
		if (opType != null) {
			indexRequest.opType(opType);
		}
		if (refreshPolicy != null) {
			indexRequest.setRefreshPolicy(refreshPolicy);
		}

		return indexRequest;
	}

	/**
	 * Throws unless {@code status} is a 2xx HTTP status.
	 *
	 * @param status status reported by the response; {@code null} is treated as a failure
	 * @throws IOException if the status is missing or outside {@code [200, 300)}
	 */
	private static void ensureSuccess(RestStatus status) throws IOException {
		if (status == null || status.getStatus() < 200 || status.getStatus() >= 300) {
			// Guard the dereference: the message must not NPE when status is null.
			throw new IOException("更新索引失败:" + (status == null ? "null" : String.valueOf(status.getStatus())));
		}
	}

	/**
	 * Sends a single index request and verifies the response status.
	 *
	 * @throws IOException on transport failure or a non-2xx response status
	 */
	private void commonIndex(String index, String id, String json, OpType opType,
			RefreshPolicy refreshPolicy) throws IOException {

		IndexRequest indexRequest = this.buildIndexRequest(id, json, opType, refreshPolicy);
		indexRequest.index(index);

		IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
		ensureSuccess(indexResponse.status());
	}

	/**
	 * Indexes a document, replacing any existing document with the same id.
	 *
	 * @param index target index name
	 * @param id    document id
	 * @param json  document source as a JSON string
	 * @throws IOException on transport failure or a non-2xx response status
	 */
	public void index(String index, String id, String json) throws IOException {
		// OpType.INDEX: update when the id exists, insert otherwise.
		this.commonIndex(index, id, json, OpType.INDEX, RefreshPolicy.NONE);
	}

	/**
	 * Indexes a document with an explicit operation type
	 * (for example {@link OpType#CREATE} to fail when the id already exists).
	 *
	 * @throws IOException on transport failure or a non-2xx response status
	 */
	public void index(String index, String id, String json, OpType opType) throws IOException {
		this.commonIndex(index, id, json, opType, RefreshPolicy.NONE);
	}

	/**
	 * Indexes a document with an explicit operation type and refresh policy.
	 *
	 * @throws IOException on transport failure or a non-2xx response status
	 */
	public void index(String index, String id, String json, OpType opType, RefreshPolicy refreshPolicy) throws IOException {
		this.commonIndex(index, id, json, opType, refreshPolicy);
	}

	/**
	 * Executes a bulk request and fails if any item failed.
	 *
	 * @throws IOException on transport failure or when at least one bulk item failed
	 */
	private void executeBulk(BulkRequest bulkRequest) throws IOException {
		BulkResponse bulkResponse = this.restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
		if (bulkResponse.hasFailures()) {
			throw new IOException("Bulk index failed: " + bulkResponse.buildFailureMessage());
		}
	}

	/**
	 * Bulk-indexes documents keyed by id into {@code index}.
	 *
	 * @param index         target index name applied to every item
	 * @param idAndJson     map of document id to JSON source; {@code null} or empty is a no-op
	 * @param refreshPolicy refresh policy for the whole bulk request, or {@code null} for the default
	 * @throws IOException on transport failure or when at least one bulk item failed
	 */
	public void bulkIndex(String index, Map<String, String> idAndJson, RefreshPolicy refreshPolicy) throws IOException {
		if (idAndJson == null || idAndJson.isEmpty()) {
			// An empty BulkRequest is rejected by request validation; nothing to do.
			return;
		}
		BulkRequest bulkRequest = new BulkRequest();
		for (Map.Entry<String, String> entry : idAndJson.entrySet()) {
			// Each item needs its own target index; refresh policy is set on the bulk request only.
			bulkRequest.add(this.buildIndexRequest(entry.getKey(), entry.getValue(), null, null).index(index));
		}
		if (refreshPolicy != null) {
			bulkRequest.setRefreshPolicy(refreshPolicy);
		}
		this.executeBulk(bulkRequest);
	}

	/**
	 * Bulk-indexes documents keyed by id using the default refresh policy.
	 *
	 * @throws IOException on transport failure or when at least one bulk item failed
	 */
	public void bulkIndex(String index, Map<String, String> idAndJson) throws IOException {
		this.bulkIndex(index, idAndJson, null);
	}

	/**
	 * Bulk-indexes documents without explicit ids (no id is set on the requests).
	 *
	 * @param index    target index name applied to every item
	 * @param jsonList JSON sources; {@code null} or empty is a no-op
	 * @throws IOException on transport failure or when at least one bulk item failed
	 */
	public void bulkIndex(String index, List<String> jsonList) throws IOException {
		if (jsonList == null || jsonList.isEmpty()) {
			return;
		}
		BulkRequest bulkRequest = new BulkRequest();
		for (String json : jsonList) {
			bulkRequest.add(this.buildIndexRequest(null, json, null, null).index(index));
		}
		this.executeBulk(bulkRequest);
	}

	/**
	 * Partially updates (or inserts) a document using the default refresh policy.
	 *
	 * @throws IOException on transport failure or a non-2xx response status
	 */
	public void update(String index, String id, String json) throws IOException {
		this.update(index, id, json, null);
	}

	/**
	 * Partially updates a document with the given JSON, inserting it when absent.
	 *
	 * <p>See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
	 *
	 * @param index         target index name
	 * @param id            document id
	 * @param json          partial document as a JSON string
	 * @param refreshPolicy refresh policy, or {@code null} to keep the request default
	 * @throws IOException on transport failure or a non-2xx response status
	 */
	public void update(String index, String id, String json, RefreshPolicy refreshPolicy) throws IOException {
		UpdateRequest request = new UpdateRequest();
		request.index(index);
		request.id(id);
		request.doc(json, XContentType.JSON);
		// detect_noop defaults to true: the document is only re-indexed when the merged
		// source differs from the stored one; an identical update leaves _version unchanged.
		// With detect_noop=false every update re-indexes regardless of content.
		request.detectNoop(true);
		// Without an upsert the target must already exist (otherwise "document missing");
		// doc_as_upsert makes the partial document itself the inserted document.
		request.docAsUpsert(true);
		if (refreshPolicy != null) {
			request.setRefreshPolicy(refreshPolicy);
		}
		UpdateResponse update = this.restHighLevelClient.update(request, RequestOptions.DEFAULT);
		RestStatus status = update.status();
		ensureSuccess(status);
		LOGGER.fine("Update of [" + index + "/" + id + "] returned " + status + " (" + update.getResult() + ")");
	}

	/**
	 * Asynchronously sets fields on every document matched by a query, using a Painless script.
	 *
	 * <p>For each entry in {@code paramMap} the script assigns
	 * {@code ctx._source.<key> = params.<key>}. The call returns immediately; the outcome
	 * is only logged.
	 *
	 * <p>See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
	 *
	 * <p>Painless notes carried over from the original author:
	 * fast (several times faster than alternatives including Groovy);
	 * safe (allow-list restricts accessible functions and fields);
	 * optional typing (explicit types or dynamic {@code def});
	 * easy to learn (extends Java syntax, Groovy-style features);
	 * purpose-built by Elastic for Elasticsearch scripting.
	 *
	 * @param build         supplies the target index and the bool query selecting documents
	 * @param paramMap      field name to new value; keys must be plain identifiers
	 * @param refreshPolicy whether to refresh the affected shards when the request completes
	 * @throws IllegalArgumentException if a key is not a plain identifier
	 */
	public void updateByQuery(ElasticSearchQueryBuilder build, Map<String, Object> paramMap, boolean refreshPolicy) {
		UpdateByQueryRequest request = new UpdateByQueryRequest(build.getIndex());
		request.setBatchSize(UPDATE_BY_QUERY_BATCH_SIZE);
		// Count version conflicts instead of aborting the whole operation.
		request.setAbortOnVersionConflict(false);
		request.setQuery(build.getBoolQueryBuilder());

		request.setSlices(1);
		request.setScroll(TimeValue.timeValueSeconds(UPDATE_BY_QUERY_SCROLL_SECONDS));
		request.setTimeout(TimeValue.timeValueSeconds(UPDATE_BY_QUERY_TIMEOUT_SECONDS));
		request.setRefresh(refreshPolicy);

		StringBuilder scriptSource = new StringBuilder();
		for (String key : paramMap.keySet()) {
			// Keys become script source text; reject anything that is not a bare identifier
			// so a key cannot inject statements or produce an unparsable script.
			if (key == null || !SCRIPT_FIELD_NAME.matcher(key).matches()) {
				throw new IllegalArgumentException("Invalid script field name: " + key);
			}
			scriptSource.append("ctx._source.").append(key)
					.append("=")
					.append("params.").append(key)
					.append(";");
		}

		request.setScript(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptSource.toString(), paramMap));
		this.restHighLevelClient.updateByQueryAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {

			@Override
			public void onResponse(BulkByScrollResponse response) {
				LOGGER.info(response.toString());
			}

			@Override
			public void onFailure(Exception e) {
				LOGGER.log(Level.SEVERE, "update-by-query failed", e);
			}
		});
	}
}

2 根据ID,创建文档

# 创建文档
PUT esbook/_doc/5ffadd84e864482798fcfb76
{
"authors" : "主编 马鹏",
"page_number" : 372,
"isbn" : "978-7-111-46533-1",
"name" : "2015年MBA、MPA、MPAcc管理类联考英语历年真题详解",
"description" : "对联考历年逻辑真题和写作真题给予系统分析和解答,并给出常用解题思路的总结和归纳,使考生能够举一反三。",
"price" : 55.0,
"logo" : "http://images.hzcourse.com/resource/access/L29wZW5yZXNvdXJjZXMvdGVhY2hfZWJvb2svaW1hZ2UvMjAxNy8wNS84YWFiNzg4MDcyMjUwMDg5NGViZjExYzAyYmMxMmQwNi5qcGckOTc4NzExMTQ2NTMzMS5qcGc=",
"publish_date" : "2014-04-30T16:00:00",
"type" : "商业",
"source_code" : "14742",
"source" : "hz"
}

# 返回结果
{
  "_index" : "esbook",
  "_type" : "_doc",
  "_id" : "5ffadd84e864482798fcfb76",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

对应java代码调用

// Build a client for a local node and hand it to the helper through its public setter
// (the restHighLevelClient field is private and not accessible from calling code).
RestHighLevelClient client = ElasticSearchUtils.initElasticsearchClient("localhost", 9200, "http", null, null);
		ElasticSearchInsertUtils insert = new ElasticSearchInsertUtils();
		insert.setRestHighLevelClient(client);
		String json = "{\n"
				+ "    \"id\" : \"5ffadd84e864482798fcfb76\",\n"
				+ "    \"authors\" : \"主编 马鹏\",\n"
				+ "    \"page_number\" : 372,\n"
				+ "    \"isbn\" : \"978-7-111-46533-1\",\n"
				+ "    \"name\" : \"2015年MBA、MPA、MPAcc管理类联考英语历年真题详解\",\n"
				+ "    \"description\" : \"对联考历年逻辑真题和写作真题给予系统分析和解答,并给出常用解题思路的总结和归纳,使考生能够举一反三。\",\n"
				+ "    \"price\" : 55.0,\n"
				+ "    \"logo\" : \"http://images.hzcourse.com/resource/access/L29wZW5yZXNvdXJjZXMvdGVhY2hfZWJvb2svaW1hZ2UvMjAxNy8wNS84YWFiNzg4MDcyMjUwMDg5NGViZjExYzAyYmMxMmQwNi5qcGckOTc4NzExMTQ2NTMzMS5qcGc=\",\n"
				+ "    \"publish_date\" : \"2014-04-30T16:00:00\",\n"
				+ "    \"type\" : \"商业\",\n"
				+ "    \"source_code\" : \"14742\",\n"
				+ "    \"source\" : \"hz\"\n"
				+ "}";

		System.out.println(json);
		// Index (insert or overwrite) the document under the given id.
		insert.index("esbook","5ffadd84e864482798fcfb76",json);

3 创建文档 ID冲突场景

#创建文档指定Id。如果id已经存在,报错
PUT esbook/_doc/5ffadd84e864482798fcfb76?op_type=create
或者
PUT esbook/_create/5ffadd84e864482798fcfb76
{
    "id" : "5ffadd84e864482798fcfb76",
    "authors" : "主编 马鹏",
    "page_number" : 372,
    "isbn" : "978-7-111-46533-1",
    "name" : "2015年MBA、MPA、MPAcc管理类联考英语历年真题详解",
    "description" : "对联考历年逻辑真题和写作真题给予系统分析和解答,并给出常用解题思路的总结和归纳,使考生能够举一反三。",
    "price" : 55.0,
    "logo" : "http://images.hzcourse.com/resource/access/L29wZW5yZXNvdXJjZXMvdGVhY2hfZWJvb2svaW1hZ2UvMjAxNy8wNS84YWFiNzg4MDcyMjUwMDg5NGViZjExYzAyYmMxMmQwNi5qcGckOTc4NzExMTQ2NTMzMS5qcGc=",
    "publish_date" : "2014-04-30T16:00:00",
    "type" : "商业",
    "source_code" : "14742",
    "source" : "hz"
}
#返回
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[5ffadd84e864482798fcfb76]: version conflict, document already exists (current version [1])",
        "index_uuid": "BdZ0MgBCR1ij8fRQW5-Mlg",
        "shard": "0",
        "index": "esbook"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[5ffadd84e864482798fcfb76]: version conflict, document already exists (current version [1])",
    "index_uuid": "BdZ0MgBCR1ij8fRQW5-Mlg",
    "shard": "0",
    "index": "esbook"
  },
  "status": 409
}

#调用
insert.index("esbook","5ffadd84e864482798fcfb76",json,OpType.CREATE);

14:46:18.767 [main] DEBUG org.elasticsearch.client.RestClient - request [PUT http://localhost:9200/esbook/_create/5ffadd84e864482798fcfb76?version=-4&timeout=1m] returned [HTTP/1.1 409 Conflict]
Exception in thread "main" [esbook/BdZ0MgBCR1ij8fRQW5-Mlg][[esbook][0]] ElasticsearchStatusException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[5ffadd84e864482798fcfb76]: version conflict, document already exists (current version [1])]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:187)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1892)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1869)
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1626)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1583)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1553)
	at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:970)

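调用 _create API(或 op_type=create)时,如果该 ID 的文档已经存在,Elasticsearch 会返回 409(version_conflict_engine_exception),Java 客户端则抛出 ElasticsearchStatusException。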

4 更新操作


# 更新文档 直接覆盖原来的doc
#这里去掉 authors 字段
POST esbook/_doc/5ffadd84e864482798fcfb76
{
    "id":"5ffadd84e864482798fcfb76",
    "page_number" : 372,
    "isbn" : "978-7-111-46533-1",
    "name" : "2015年MBA、MPA、MPAcc管理类联考英语历年真题详解",
    "description" : "对联考历年逻辑真题和写作真题给予系统分析和解答,并给出常用解题思路的总结和归纳,使考生能够举一反三。",
    "price" : 55.0,
    "logo" : "http://images.hzcourse.com/resource/access/L29wZW5yZXNvdXJjZXMvdGVhY2hfZWJvb2svaW1hZ2UvMjAxNy8wNS84YWFiNzg4MDcyMjUwMDg5NGViZjExYzAyYmMxMmQwNi5qcGckOTc4NzExMTQ2NTMzMS5qcGc=",
    "publish_date" : "2014-04-30T16:00:00",
    "type" : "商业",
    "source_code" : "14742",
    "source" : "hz"
}

#利用脚本更新
#如果文档存在,则执行脚本进行更新;否则直接插入 upsert 中的内容(此处为空文档,脚本不会执行)
POST esbook/_update/5ffadd84e864482798fcfb76_1
{
  "script": {
    "source": "ctx._source.authors = params.authors",
    "lang": "painless",
    "params": {
      "authors": "主编 马鹏"
    }
  },
  "upsert": {
    
  }
}

DELETE esbook/_doc/5ffab8fe8e77757307e37e8d

#scripted_upsert=true:无论文档是否存在都会执行脚本;文档不存在时 ctx.op 为 'create',并以 upsert 的内容作为初始 _source
POST esbook/_update/5ffab8fe8e77757307e37e8d
{
  "scripted_upsert": true,
  "script": {
    "lang": "painless",
    "source": """
      if ( ctx.op == 'create' ) {
        ctx._source.id = params.id;ctx._source.authors=params.authors;ctx._source.name=params.name;ctx._source.description=params.description;ctx._source.price=params.price;ctx._source.logo=params.logo;ctx._source.source=params.source;ctx._source.page_number=params.page_number;ctx._source.publish_date=params.publish_date;ctx._source.source_code=params.source_code
      } else {
        ctx._source.new_price += params.price
      }
    """,
    "params": {
      "id" : "5ffab8fe8e77757307e37e8d",
    "authors" : "笨叔",
    "isbn" : "978-7-115-54999-0",
    "name" : "奔跑吧Linux内核(第2版)卷1:基础架构",
    "description" : "本书基于Linux 5.0内核的源代码讲述Linux内核中核心模块的实现。本书共9章,主要内容包括处理器架构、ARM64在Linux内核中的实现、内存管理之预备知识、物理内存与虚拟内存、内存管理之高级主题、内存管理之实战案例、进程管理之基本概念、进程管理之调度和负载均衡、进程管理之调试与案例分析。\r\n本书适合Linux系统开发人员、嵌入式系统开发人员及Android开发人员阅读,也可供计算机相关专业的师生阅读。\r\n",
    "price" : "118.15",
    "logo" : "https://cdn.ptpress.cn/pubcloud/bookImg/null/20210108CFD67AE8.jpg?x-oss-process=image/quality,q_10",
    "source" : "epubit",
    "page_number" : 598,
    "publish_date" : "2020-12-31T16:00:00",
    "source_code" : "UB7263761464b35"
    }
  },
  "upsert": {
    "new_price":"5"
  }
}

管理员

这个人很懒,什么都没留下

文章评论