1 Java API 代码
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
/**
 * Write-side helpers (index, bulk index, update, update-by-query) built on an Elasticsearch
 * {@link RestHighLevelClient}.
 *
 * <p>The client must be supplied via {@link #setRestHighLevelClient(RestHighLevelClient)} before
 * any write method is called; this class neither creates nor closes it.
 */
public class ElasticSearchInsertUtils {

    private static final Logger LOG = Logger.getLogger(ElasticSearchInsertUtils.class.getName());

    /**
     * Painless script used by {@link #updateByQuery}: copies every entry of {@code params.fields}
     * into the document's {@code _source}. Field names travel as script parameters instead of being
     * spliced into the script text, so caller-supplied keys cannot inject Painless code and field
     * names that are not valid identifiers (e.g. {@code "foo-bar"}) still work. The script text is
     * also identical for every call, whatever keys are passed.
     */
    private static final String UPDATE_FIELDS_SCRIPT =
            "for (def entry : params.fields.entrySet()) {"
                    + " ctx._source[entry.getKey()] = entry.getValue(); }";

    private RestHighLevelClient restHighLevelClient;

    /** Returns the client used for all requests ({@code null} until one is set). */
    public RestHighLevelClient getRestHighLevelClient() {
        return restHighLevelClient;
    }

    /** Sets the client used for all requests. */
    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    /**
     * Builds an {@link IndexRequest} for {@code index} whose source is the given JSON.
     *
     * @param index target index
     * @param id document id; {@code null} leaves the id unset so Elasticsearch generates one
     * @param json document source as a JSON string
     * @param opType operation type, or {@code null} to keep the request default
     * @param refreshPolicy refresh policy, or {@code null} to keep the request default
     */
    private IndexRequest buildIndexRequest(String index, String id, String json, OpType opType,
            RefreshPolicy refreshPolicy) {
        IndexRequest indexRequest = new IndexRequest();
        indexRequest.index(index);
        indexRequest.id(id);
        indexRequest.source(json, XContentType.JSON);
        if (opType != null) {
            indexRequest.opType(opType);
        }
        if (refreshPolicy != null) {
            indexRequest.setRefreshPolicy(refreshPolicy);
        }
        return indexRequest;
    }

    /**
     * Indexes a single document synchronously and verifies the response status.
     *
     * @throws IOException on transport failure, or if the response status is missing or not 2xx
     */
    private void commonIndex(String index, String id, String json, OpType opType,
            RefreshPolicy refreshPolicy) throws IOException {
        IndexRequest indexRequest = buildIndexRequest(index, id, json, opType, refreshPolicy);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        RestStatus status = indexResponse.status();
        if (status == null) {
            throw new IOException("更新索引失败:响应状态为空");
        }
        int code = status.getStatus();
        // Success is exactly 2xx; 300 must be rejected too (the previous "> 300" test let it pass).
        // Error statuses such as 409 usually surface earlier as ElasticsearchStatusException
        // thrown by the client itself.
        if (code < 200 || code >= 300) {
            throw new IOException("更新索引失败:" + code);
        }
    }

    /**
     * Indexes a document with {@link OpType#INDEX}: inserts it, or overwrites it if the id exists.
     * No refresh is forced ({@link RefreshPolicy#NONE}).
     *
     * @throws IOException on transport failure or a non-2xx status
     */
    public void index(String index, String id, String json) throws IOException {
        commonIndex(index, id, json, OpType.INDEX, RefreshPolicy.NONE);
    }

    /**
     * Indexes a document with the given operation type (e.g. {@link OpType#CREATE} to fail when
     * the id already exists). No refresh is forced ({@link RefreshPolicy#NONE}).
     *
     * @throws IOException on transport failure or a non-2xx status
     */
    public void index(String index, String id, String json, OpType opType) throws IOException {
        commonIndex(index, id, json, opType, RefreshPolicy.NONE);
    }

    /**
     * Indexes a document with the given operation type and refresh policy.
     *
     * @throws IOException on transport failure or a non-2xx status
     */
    public void index(String index, String id, String json, OpType opType,
            RefreshPolicy refreshPolicy) throws IOException {
        commonIndex(index, id, json, opType, refreshPolicy);
    }

    /**
     * Bulk-indexes documents into {@code index}, keyed by document id. An empty map is a no-op.
     *
     * @param idAndJson document id to JSON source
     * @param refreshPolicy refresh policy for the whole bulk, or {@code null} for the default
     * @throws IOException on transport failure or if any item in the bulk failed
     */
    public void bulkIndex(String index, Map<String, String> idAndJson, RefreshPolicy refreshPolicy)
            throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (Map.Entry<String, String> entry : idAndJson.entrySet()) {
            bulkRequest.add(buildIndexRequest(index, entry.getKey(), entry.getValue(), null, null));
        }
        if (refreshPolicy != null) {
            bulkRequest.setRefreshPolicy(refreshPolicy);
        }
        executeBulk(bulkRequest);
    }

    /**
     * Bulk-indexes documents into {@code index}, keyed by document id, with the default refresh
     * policy. An empty map is a no-op.
     *
     * @throws IOException on transport failure or if any item in the bulk failed
     */
    public void bulkIndex(String index, Map<String, String> idAndJson) throws IOException {
        bulkIndex(index, idAndJson, null);
    }

    /**
     * Bulk-indexes documents into {@code index} with generated ids. An empty list is a no-op.
     *
     * @throws IOException on transport failure or if any item in the bulk failed
     */
    public void bulkIndex(String index, List<String> jsonList) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (String json : jsonList) {
            bulkRequest.add(buildIndexRequest(index, null, json, null, null));
        }
        executeBulk(bulkRequest);
    }

    /**
     * Executes a bulk request and fails if any individual item failed.
     *
     * @throws IOException on transport failure or if the response reports item failures
     */
    private void executeBulk(BulkRequest bulkRequest) throws IOException {
        // Elasticsearch rejects a bulk request with no actions ("no requests added"), so an
        // empty input is treated as nothing to do.
        if (bulkRequest.numberOfActions() == 0) {
            return;
        }
        BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        // A bulk call succeeds as a whole even when single items fail, so check the items.
        if (response.hasFailures()) {
            throw new IOException(response.buildFailureMessage());
        }
    }

    /**
     * Partially updates (or upserts) a document with the default refresh policy.
     *
     * @throws IOException on transport failure
     */
    public void update(String index, String id, String json) throws IOException {
        update(index, id, json, null);
    }

    /**
     * Partially updates a document by merging {@code json} into its source; creates the document
     * from {@code json} if it does not exist.
     *
     * <p>See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
     *
     * @param index target index
     * @param id document id
     * @param json partial document (fields to set) as a JSON string
     * @param refreshPolicy refresh policy, or {@code null} for the request default
     * @throws IOException on transport failure
     */
    public void update(String index, String id, String json, RefreshPolicy refreshPolicy)
            throws IOException {
        UpdateRequest request = new UpdateRequest();
        request.index(index);
        request.id(id);
        request.doc(json, XContentType.JSON);
        // detect_noop (true is also the Elasticsearch default): the document is only reindexed
        // when the merged source differs from the stored one. If nothing changed, no reindex
        // happens and _version stays the same. With detect_noop=false every call reindexes.
        request.detectNoop(true);
        // doc_as_upsert: a missing document is created from `json` instead of failing with a
        // "document missing" error.
        request.docAsUpsert(true);
        if (refreshPolicy != null) {
            request.setRefreshPolicy(refreshPolicy);
        }
        UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        LOG.log(Level.FINE, "update {0}/{1}: status={2}, result={3}",
                new Object[] {index, id, response.status(), response.getResult()});
    }

    /**
     * Asynchronously sets fields on every document matching a query, using a Painless script.
     * This method returns before the update finishes; the outcome is only logged.
     *
     * <p>See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
     *
     * <p>Painless characteristics: faster than the alternatives (including Groovy); access to
     * functions and fields is restricted by a whitelist; variables may use explicit types or
     * {@code def}; Java-like syntax with some Groovy-style features; designed specifically for
     * Elasticsearch.
     *
     * @param build supplies the target indices and the bool query selecting documents
     * @param paramMap field name to new value; an empty map is a no-op
     * @param refreshPolicy whether to refresh the affected shards when the update completes
     * @throws NullPointerException if {@code paramMap} is null
     */
    public void updateByQuery(ElasticSearchQueryBuilder build, Map<String, Object> paramMap,
            boolean refreshPolicy) {
        Objects.requireNonNull(paramMap, "paramMap");
        if (paramMap.isEmpty()) {
            // Nothing to set; running the script would only rewrite matched documents unchanged.
            return;
        }
        UpdateByQueryRequest request = new UpdateByQueryRequest(build.getIndex());
        // Documents processed per scroll batch.
        request.setBatchSize(10);
        // Count version conflicts instead of aborting the whole operation on the first one.
        request.setAbortOnVersionConflict(false);
        request.setQuery(build.getBoolQueryBuilder());
        request.setSlices(1);
        request.setScroll(TimeValue.timeValueSeconds(10));
        request.setTimeout(TimeValue.timeValueSeconds(20));
        request.setRefresh(refreshPolicy);

        // Copy the caller's map so later mutations cannot affect the in-flight async request.
        Map<String, Object> scriptParams = new HashMap<>();
        scriptParams.put("fields", new HashMap<>(paramMap));
        request.setScript(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                UPDATE_FIELDS_SCRIPT, scriptParams));

        restHighLevelClient.updateByQueryAsync(request, RequestOptions.DEFAULT,
                new ActionListener<BulkByScrollResponse>() {
                    @Override
                    public void onResponse(BulkByScrollResponse response) {
                        LOG.log(Level.INFO, "update_by_query finished: {0}", response);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        LOG.log(Level.WARNING, "update_by_query failed", e);
                    }
                });
    }
}
2 根据ID,创建文档
# 创建文档
PUT esbook/_doc/5ffadd84e864482798fcfb76
{
  "authors" : "主编 马鹏",
  "page_number" : 372,
  "isbn" : "978-7-111-46533-1",
  "name" : "2015年MBA、MPA、MPAcc管理类联考英语历年真题详解",
  "description" : "对联考历年逻辑真题和写作真题给予系统分析和解答,并给出常用解题思路的总结和归纳,使考生能够举一反三。",
  "price" : 55.0,
  "logo" : "http://images.hzcourse.com/resource/access/L29wZW5yZXNvdXJjZXMvdGVhY2hfZWJvb2svaW1hZ2UvMjAxNy8wNS84YWFiNzg4MDcyMjUwMDg5NGViZjExYzAyYmMxMmQwNi5qcGckOTc4NzExMTQ2NTMzMS5qcGc=",
  "publish_date" : "2014-04-30T16:00:00",
  "type" : "商业",
  "source_code" : "14742",
  "source" : "hz"
}
# 返回结果
{
  "_index" : "esbook",
  "_type" : "_doc",
  "_id" : "5ffadd84e864482798fcfb76",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}
对应的 Java 代码调用
RestHighLevelClient client = ElasticSearchUtils.initElasticsearchClient("localhost", 9200, "http", null, null);
ElasticSearchInsertUtils insert = new ElasticSearchInsertUtils();
// restHighLevelClient is a private field of ElasticSearchInsertUtils, so code outside the class
// must set it through the setter (direct field assignment does not compile here).
insert.setRestHighLevelClient(client);
String json = "{\n"
+ " \"id\" : \"5ffadd84e864482798fcfb76\",\n"
+ " \"authors\" : \"主编 马鹏\",\n"
+ " \"page_number\" : 372,\n"
+ " \"isbn\" : \"978-7-111-46533-1\",\n"
+ " \"name\" : \"2015年MBA、MPA、MPAcc管理类联考英语历年真题详解\",\n"
+ " \"description\" : \"对联考历年逻辑真题和写作真题给予系统分析和解答,并给出常用解题思路的总结和归纳,使考生能够举一反三。\",\n"
+ " \"price\" : 55.0,\n"
+ " \"logo\" : \"http://images.hzcourse.com/resource/access/L29wZW5yZXNvdXJjZXMvdGVhY2hfZWJvb2svaW1hZ2UvMjAxNy8wNS84YWFiNzg4MDcyMjUwMDg5NGViZjExYzAyYmMxMmQwNi5qcGckOTc4NzExMTQ2NTMzMS5qcGc=\",\n"
+ " \"publish_date\" : \"2014-04-30T16:00:00\",\n"
+ " \"type\" : \"商业\",\n"
+ " \"source_code\" : \"14742\",\n"
+ " \"source\" : \"hz\"\n"
+ "}";
System.out.println(json);
// Indexes with OpType.INDEX: creates the document, or overwrites it if this id already exists.
insert.index("esbook","5ffadd84e864482798fcfb76",json);
3 创建文档 ID冲突场景
#创建文档指定Id。如果id已经存在,报错
PUT esbook/_doc/5ffadd84e864482798fcfb76?op_type=create
或者
PUT esbook/_create/5ffadd84e864482798fcfb76
{
"id" : "5ffadd84e864482798fcfb76",
"authors" : "主编 马鹏",
"page_number" : 372,
"isbn" : "978-7-111-46533-1",
"name" : "2015年MBA、MPA、MPAcc管理类联考英语历年真题详解",
"description" : "对联考历年逻辑真题和写作真题给予系统分析和解答,并给出常用解题思路的总结和归纳,使考生能够举一反三。",
"price" : 55.0,
"logo" : "http://images.hzcourse.com/resource/access/L29wZW5yZXNvdXJjZXMvdGVhY2hfZWJvb2svaW1hZ2UvMjAxNy8wNS84YWFiNzg4MDcyMjUwMDg5NGViZjExYzAyYmMxMmQwNi5qcGckOTc4NzExMTQ2NTMzMS5qcGc=",
"publish_date" : "2014-04-30T16:00:00",
"type" : "商业",
"source_code" : "14742",
"source" : "hz"
}
#返回
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[5ffadd84e864482798fcfb76]: version conflict, document already exists (current version [1])",
"index_uuid": "BdZ0MgBCR1ij8fRQW5-Mlg",
"shard": "0",
"index": "esbook"
}
],
"type": "version_conflict_engine_exception",
"reason": "[5ffadd84e864482798fcfb76]: version conflict, document already exists (current version [1])",
"index_uuid": "BdZ0MgBCR1ij8fRQW5-Mlg",
"shard": "0",
"index": "esbook"
},
"status": 409
}
#调用
// Uses OpType.CREATE (sent as PUT .../_create/{id}); because a document with this id already
// exists, the server answers 409 Conflict (version_conflict_engine_exception) and the client
// throws ElasticsearchStatusException, as shown in the log below.
insert.index("esbook","5ffadd84e864482798fcfb76",json,OpType.CREATE);
14:46:18.767 [main] DEBUG org.elasticsearch.client.RestClient - request [PUT http://localhost:9200/esbook/_create/5ffadd84e864482798fcfb76?version=-4&timeout=1m] returned [HTTP/1.1 409 Conflict]
Exception in thread "main" [esbook/BdZ0MgBCR1ij8fRQW5-Mlg][[esbook][0]] ElasticsearchStatusException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[5ffadd84e864482798fcfb76]: version conflict, document already exists (current version [1])]]
at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:187)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1892)
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1869)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1626)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1583)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1553)
at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:970)
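调用 _create API(或使用 op_type=create)时,如果该 ID 的文档已存在,Elasticsearch 会返回 409 冲突(version_conflict_engine_exception);Java 客户端则会直接抛出 ElasticsearchStatusException。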
4 更新操作
# 更新文档:对 _doc/{id} 执行 POST 会用新文档整体覆盖原来的 doc,而不是合并字段
#这里去掉了 authors 字段,覆盖后该文档将不再包含 authors
POST esbook/_doc/5ffadd84e864482798fcfb76
{
"id":"5ffadd84e864482798fcfb76",
"page_number" : 372,
"isbn" : "978-7-111-46533-1",
"name" : "2015年MBA、MPA、MPAcc管理类联考英语历年真题详解",
"description" : "对联考历年逻辑真题和写作真题给予系统分析和解答,并给出常用解题思路的总结和归纳,使考生能够举一反三。",
"price" : 55.0,
"logo" : "http://images.hzcourse.com/resource/access/L29wZW5yZXNvdXJjZXMvdGVhY2hfZWJvb2svaW1hZ2UvMjAxNy8wNS84YWFiNzg4MDcyMjUwMDg5NGViZjExYzAyYmMxMmQwNi5qcGckOTc4NzExMTQ2NTMzMS5qcGc=",
"publish_date" : "2014-04-30T16:00:00",
"type" : "商业",
"source_code" : "14742",
"source" : "hz"
}
#利用脚本更新
#文档存在时执行脚本更新 authors;文档不存在时直接插入 upsert 中的内容(这里是空文档),此时脚本不会执行
POST esbook/_update/5ffadd84e864482798fcfb76_1
{
"script": {
"source": "ctx._source.authors = params.authors",
"lang": "painless",
"params": {
"authors": "主编 马鹏"
}
},
"upsert": {
}
}
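#先删除该文档,使下面的请求第一次执行时走"文档不存在"(ctx.op == 'create')的分支
DELETE esbook/_doc/5ffab8fe8e77757307e37e8d
#scripted_upsert 为 true:文档不存在时以 upsert 的内容作为初始 _source 执行脚本(ctx.op == 'create');文档已存在时执行 else 分支,把 price 累加到 new_price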
POST esbook/_update/5ffab8fe8e77757307e37e8d
{
"scripted_upsert": true,
"script": {
"lang": "painless",
"source": """
if ( ctx.op == 'create' ) {
ctx._source.id = params.id;ctx._source.authors=params.authors;ctx._source.isbn=params.isbn;ctx._source.name=params.name;ctx._source.description=params.description;ctx._source.price=params.price;ctx._source.logo=params.logo;ctx._source.source=params.source;ctx._source.page_number=params.page_number;ctx._source.publish_date=params.publish_date;ctx._source.source_code=params.source_code
} else {
ctx._source.new_price += params.price
}
""",
"params": {
"id" : "5ffab8fe8e77757307e37e8d",
"authors" : "笨叔",
"isbn" : "978-7-115-54999-0",
"name" : "奔跑吧Linux内核(第2版)卷1:基础架构",
"description" : "本书基于Linux 5.0内核的源代码讲述Linux内核中核心模块的实现。本书共9章,主要内容包括处理器架构、ARM64在Linux内核中的实现、内存管理之预备知识、物理内存与虚拟内存、内存管理之高级主题、内存管理之实战案例、进程管理之基本概念、进程管理之调度和负载均衡、进程管理之调试与案例分析。\r\n本书适合Linux系统开发人员、嵌入式系统开发人员及Android开发人员阅读,也可供计算机相关专业的师生阅读。\r\n",
"price" : 118.15,
"logo" : "https://cdn.ptpress.cn/pubcloud/bookImg/null/20210108CFD67AE8.jpg?x-oss-process=image/quality,q_10",
"source" : "epubit",
"page_number" : 598,
"publish_date" : "2020-12-31T16:00:00",
"source_code" : "UB7263761464b35"
}
},
"upsert": {
"new_price": 5
}
}
文章评论