一架梯子,一头程序猿,仰望星空!

Java Elasticsearch Update By Query API 批量更新


ES update by query api主要用于批量更新文档内容,支持设置查询条件限制更新文档的范围。

创建UpdateByQueryRequest对象

// 创建UpdateByQueryRequest对象,设置索引名,支持一次更新多个索引
// 同时更新source1和source2索引内容
UpdateByQueryRequest request =
        new UpdateByQueryRequest("source1", "source2"); 

版本冲突

批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。

版本冲突解决方案如下:

  • proceed - 忽略版本冲突,继续执行
  • abort - 遇到版本冲突,中断执行
request.setConflicts("proceed"); 

设置查询条件

// 设置term查询条件,查询user字段=kimchy的文档内容
request.setQuery(new TermQueryBuilder("user", "kimchy")); 

ES的查询语法是非常丰富的,这里仅给出一种写法,JAVA ES查询用法请参考后续的章节。

限制更新文档数量

可以限制批量更新文档的数量

request.setMaxDocs(10); 

设置更新内容

Update by query api更新文档内容,仅支持通过脚本的方式修改文档内容。

request.setScript(
    new Script( // 创建inline脚本,使用painless语言。
        ScriptType.INLINE, "painless",
        "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
        Collections.emptyMap()));

执行请求

BulkByScrollResponse bulkResponse =
        client.updateByQuery(request, RequestOptions.DEFAULT);

处理结果

TimeValue timeTaken = bulkResponse.getTook(); // 更新花费时间
boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
long totalDocs = bulkResponse.getTotal(); // 更新文档总数
long updatedDocs = bulkResponse.getUpdated(); // 成功更新了多少文档
long deletedDocs = bulkResponse.getDeleted(); // 删除了多少文档
long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数