Elasticsearch 高级操作

文档局部更新

Elasticsearch 文档操作 中我们说了一种通过检索,修改,然后重建整文档的索引方法来更新文档。这是对的。然而,使用update API,我们可以使用一个请求来实现局部更新,例如增加数量的操作。我们也说过文档是不可变的——它们不能被更改,只能被替换。update API必须 遵循相同的规则。表面看来,我们似乎是局部更新了文档的位置,内部却是像我们之前说的一样简单的使用update API处理相同的检索-修改-重建索引 流程,我们也减少了其他进程可能导致冲突的修改。最简单的update请求表单接受一个局部文档参数doc,它会合并到现有文档中——对象合并在一起,存在的标量字段被覆盖,新字段被添加。

  • 举个例子,我们可以使用以下请求为博客添加一个tags字段和一个views字段:
    POST /website/blog/1/_update
    {
       "doc" : {
          "tags" : [ "testing" ],
          "views": 0
       }
    }
    
  • 如果请求成功,我们将看到类似index请求的响应结果:
    {
       "_index" :   "website",
       "_id" :      "1",
       "_type" :    "blog",
       "_version" : 3
    }
    
  • 检索文档文档显示被更新的_source字段:
    {
       "_index":    "website",
       "_type":     "blog",
       "_id":       "1",
       "_version":  3,
       "found":     true,
       "_source": {
          "title":  "My first blog entry",
          "text":   "Starting to get the hang of this...",
          "tags": [ "testing" ], <1>
          "views":  0 <1>
       }
    }
    
  • <1> 我们新添加的字段已经被添加到_source字段中。

使用脚本局部更新

使用Groovy脚本

这时候当API不能满足要求时,Elasticsearch允许你使用脚本实现自己的逻辑。脚本支持非常多的API,例如搜索、排序、聚合和文档更新。脚本可以通过请求的一部分、检索特殊的.scripts索引或者从磁盘加载方式执行。

默认的脚本语言是Groovy,一个快速且功能丰富的脚本语言,语法类似于JavaScript。它在一个沙盒(sandbox) 中运行,以防止恶意用户毁坏Elasticsearch或攻击服务器。

你可以在《脚本参考文档》中获得更多信息。

  • 脚本能够使用update API改变_source字段的内容,它在脚本内部以ctx._source表示。例如,我们可以使用脚本增加博客的views数量:
    POST /website/blog/1/_update
    {
       "script" : "ctx._source.views+=1"
    }
    
  • 我们还可以使用脚本增加一个新标签到tags数组中。在这个例子中,我们定义了一个新标签做为参数而不是硬编码在脚本里。这允许Elasticsearch未来可以重复利用脚本,而不是在想要增加新标签时必须每次编译新脚本:
    POST /website/blog/1/_update
    {
       "script" : "ctx._source.tags+=new_tag",
       "params" : {
          "new_tag" : "search"
       }
    }
    
  • 获取最后两个有效请求的文档:
    {
       "_index":    "website",
       "_type":     "blog",
       "_id":       "1",
       "_version":  5,
       "found":     true,
       "_source": {
          "title":  "My first blog entry",
          "text":   "Starting to get the hang of this...",
          "tags":  ["testing", "search"], <1>
          "views":  1 <2>
       }
    }
    
  • <1> search标签已经被添加到tags数组。
  • <2> views字段已经被增加。
  • 通过设置ctx.opdelete我们可以根据内容删除文档:
    POST /website/blog/1/_update
    {
       "script" : "ctx.op = ctx._source.views == count ? 'delete' : 'none'",
        "params" : {
            "count": 1
        }
    }
    

更新可能不存在的文档

  • 想象我们要在Elasticsearch中存储浏览量计数器。每当有用户访问页面,我们增加这个页面的浏览量。但如果这是个新页面,我们并不确定这个计数器存在与否。当我们试图更新一个不存在的文档,更新将失败。在这种情况下,我们可以使用upsert参数定义文档来使其不存在时被创建。
    POST /website/pageviews/1/_update
    {
       "script" : "ctx._source.views+=1",
       "upsert": {
           "views": 1
       }
    }
    
  • 第一次执行这个请求,upsert值被索引为一个新文档,初始化views字段为1.接下来文档已经存在,所以script被更新代替,增加views数量。

更新和冲突

  • 这一节的介绍中,我们介绍了如何在检索(retrieve)重建索引(reindex) 中保持更小的窗口,如何减少冲突性变更发生的概率,不过这些无法被完全避免,像一个其他进程在update进行重建索引时修改了文档这种情况依旧可能发生。
  • 为了避免丢失数据,update API在检索(retrieve) 阶段检索文档的当前_version,然后在重建索引(reindex) 阶段通过index请求提交。如果其他进程在检索(retrieve)重建索引(reindex) 阶段修改了文档,_version将不能被匹配,然后更新失败。
  • 对于多用户的局部更新,文档被修改了并不要紧。例如,两个进程都要增加页面浏览量,增加的顺序我们并不关心——如果冲突发生,我们唯一要做的仅仅是重新尝试更新既可。
  • 这些可以通过retry_on_conflict参数设置重试次数来自动完成,这样update操作将会在发生错误前重试——这个值默认为0。
    POST /website/pageviews/1/_update?retry_on_conflict=5 <1>
    {
       "script" : "ctx._source.views+=1",
       "upsert": {
           "views": 0
       }
    }
    
  • <1> 在错误发生前重试更新5次
  • 这适用于像增加计数这种顺序无关的操作,但是还有一种顺序非常重要的情况。例如index API,使用**“保留最后更新(last-write-wins)”** 的update API,但它依旧接受一个version参数以允许你使用乐观并发控制(optimistic concurrency control) 来指定你要更细文档的版本。

检索多个文档

像Elasticsearch一样,检索多个文档依旧非常快。合并多个请求可以避免每个请求单独的网络开销。如果你需要从Elasticsearch中检索多个文档,相对于一个一个的检索,更快的方式是在一个请求中使用multi-get 或者mget API。

  • mget API参数是一个docs数组,数组的每个节点定义一个文档的_index_type_id元数据。如果你只想检索一个或几个确定的字段,也可以定义一个_source参数:
    POST /_mget
    {
       "docs" : [
          {
             "_index" : "website",
             "_type" :  "blog",
             "_id" :    2
          },
          {
             "_index" : "website",
             "_type" :  "pageviews",
             "_id" :    1,
             "_source": "views"
          }
       ]
    }
    
  • 响应体也包含一个docs数组,每个文档还包含一个响应,它们按照请求定义的顺序排列。每个这样的响应与单独使用 get request 响应体相同:
    {
       "docs" : [
          {
             "_index" :   "website",
             "_id" :      "2",
             "_type" :    "blog",
             "found" :    true,
             "_source" : {
                "text" :  "This is a piece of cake...",
                "title" : "My first external blog entry"
             },
             "_version" : 10
          },
          {
             "_index" :   "website",
             "_id" :      "1",
             "_type" :    "pageviews",
             "found" :    true,
             "_version" : 2,
             "_source" : {
                "views" : 2
             }
          }
       ]
    }
    
  • 如果你想检索的文档在同一个_index中(甚至在同一个_type中),你就可以在URL中定义一个默认的/_index或者/_index/_type
  • 你依旧可以在单独的请求中使用这些值:
    POST /website/blog/_mget
    {
       "docs" : [
          { "_id" : 2 },
          { "_type" : "pageviews", "_id" :   1 }
       ]
    }
    
  • 事实上,如果所有文档具有相同_index_type,你可以通过简单的ids数组来代替完整的docs数组:
    POST /website/blog/_mget
    {
       "ids" : [ "2", "1" ]
    }
    
  • 注意到我们请求的第二个文档并不存在。我们定义了类型为blog,但是ID为1的文档类型为pageviews。这个不存在的文档会在响应体中被告知。
    {
      "docs" : [
        {
          "_index" :   "website",
          "_type" :    "blog",
          "_id" :      "2",
          "_version" : 10,
          "found" :    true,
          "_source" : {
            "title":   "My first external blog entry",
            "text":    "This is a piece of cake..."
          }
        },
        {
          "_index" :   "website",
          "_type" :    "blog",
          "_id" :      "1",
          "found" :    false  <1>
        }
      ]
    }
    
  • <1> 这个文档不存在
  • 事实上第二个文档不存在并不影响第一个文档的检索。每个文档的检索和报告都是独立的。

注意:

尽管前面提到有一个文档没有被找到,但HTTP请求状态码还是200。事实上,就算所有文档都找不到,请求也还是返回200,原因是mget请求本身成功了。如果想知道每个文档是否都成功了,你需要检查found标志。

更新时的批量操作

就像mget允许我们一次性检索多个文档一样,bulk API允许我们使用单一请求来实现多个文档的createindexupdatedelete。这对索引类似于日志活动这样的数据流非常有用,它们可以以成百上千的数据为一个批次按序进行索引。

  • bulk请求体如下,它有一点不同寻常:
    { action: { metadata }}\n
    { request body        }\n
    { action: { metadata }}\n
    { request body        }\n
    ...
    

这种格式类似于用"\n"符号连接起来的一行一行的JSON文档流(stream) 。两个重要的点需要注意:

  • 每行必须以"\n"符号结尾,包括最后一行 。这些都是作为每行有效的分离而做的标记。
  • 每一行的数据不能包含未被转义的换行符,它们会干扰分析——这意味着JSON不能被美化打印。

提示:

在《批量格式》一章我们介绍了为什么bulk API使用这种格式。

action/metadata 这一行定义了文档行为(what action) 发生在哪个文档(which document) 之上。

行为(action) 必须是以下几种:

行为 解释
create 当文档不存在时创建之。详见《创建文档》
index 创建新文档或替换已有文档。见《索引文档》和《更新文档》
update 局部更新文档。见《局部更新》
delete 删除一个文档。见《删除文档》

在索引、创建、更新或删除时必须指定文档的_index_type_id这些元数据(metadata)

  • 例如删除请求看起来像这样:
    { "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
    
  • 请求体(request body) 由文档的_source组成——文档所包含的一些字段以及其值。它被indexcreate操作所必须,这是有道理的:你必须提供文档用来索引。
  • 这些还被update操作所必需,而且请求体的组成应该与update API(doc, upsert, script等等)一致。删除操作不需要请求体(request body)
    { "create":  { "_index": "website", "_type": "blog", "_id": "123" }}
    { "title":    "My first blog post" }
    
  • 如果定义_id,ID将会被自动创建:
    { "index": { "_index": "website", "_type": "blog" }}
    { "title":    "My second blog post" }
    
  • 为了将这些放在一起,bulk请求表单是这样的:
    POST /_bulk
    { "delete": { "_index": "website", "_type": "blog", "_id": "123" }} <1>
    { "create": { "_index": "website", "_type": "blog", "_id": "123" }}
    { "title":    "My first blog post" }
    { "index":  { "_index": "website", "_type": "blog" }}
    { "title":    "My second blog post" }
    { "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
    { "doc" : {"title" : "My updated blog post"} } <2>
    
  • <1> 注意delete行为(action) 没有请求体,它紧接着另一个行为(action)
  • <2> 记得最后一个换行符
  • Elasticsearch响应包含一个items数组,它罗列了每一个请求的结果,结果的顺序与我们请求的顺序相同:
    {
       "took": 4,
       "errors": false, <1>
       "items": [
          {  "delete": {
                "_index":   "website",
                "_type":    "blog",
                "_id":      "123",
                "_version": 2,
                "status":   200,
                "found":    true
          }},
          {  "create": {
                "_index":   "website",
                "_type":    "blog",
                "_id":      "123",
                "_version": 3,
                "status":   201
          }},
          {  "create": {
                "_index":   "website",
                "_type":    "blog",
                "_id":      "EiwfApScQiiy7TIKFxRCTw",
                "_version": 1,
                "status":   201
          }},
          {  "update": {
                "_index":   "website",
                "_type":    "blog",
                "_id":      "123",
                "_version": 4,
                "status":   200
          }}
       ]
    }}
    
  • <1> 所有子请求都成功完成。
  • 每个子请求都被独立的执行,所以一个子请求的错误并不影响其它请求。如果任何一个请求失败,顶层的error标记将被设置为true,然后错误的细节将在相应的请求中被报告:
    POST /_bulk
    { "create": { "_index": "website", "_type": "blog", "_id": "123" }}
    { "title":    "Cannot create - it already exists" }
    { "index":  { "_index": "website", "_type": "blog", "_id": "123" }}
    { "title":    "But we can update it" }
    
  • 响应中我们将看到create文档123失败了,因为文档已经存在,但是后来的在123上执行的index请求成功了:
    {
       "took": 3,
       "errors": true, <1>
       "items": [
          {  "create": {
                "_index":   "website",
                "_type":    "blog",
                "_id":      "123",
                "status":   409, <2>
                "error":    "DocumentAlreadyExistsException <3>
                            [[website][4] [blog][123]:
                            document already exists]"
          }},
          {  "index": {
                "_index":   "website",
                "_type":    "blog",
                "_id":      "123",
                "_version": 5,
                "status":   200 <4>
          }}
       ]
    }
    
  • <1> 一个或多个请求失败。
  • <2> 这个请求的HTTP状态码被报告为409 CONFLICT
  • <3> 错误消息说明了什么请求错误。
  • <4> 第二个请求成功了,状态码是200 OK

这些说明bulk请求不是原子操作——它们不能实现事务。每个请求操作时分开的,所以每个请求的成功与否不干扰其它操作。

不要重复

  • 你可能在同一个index下的同一个type里批量索引日志数据。为每个文档指定相同的元数据是多余的。就像mget API,bulk请求也可以在URL中使用/_index/_index/_type
    POST /website/_bulk
    { "index": { "_type": "log" }}
    { "event": "User logged in" }
    
  • 你依旧可以覆盖元数据行的_index_type,在没有覆盖时它会使用URL中的值作为默认值:
    POST /website/log/_bulk
    { "index": {}}
    { "event": "User logged in" }
    { "index": { "_type": "blog" }}
    { "title": "Overriding the default type" }
    

多大才算太大?

  • 整个批量请求需要被加载到接受我们请求节点的内存里,所以请求越大,给其它请求可用的内存就越小。有一个最佳的bulk请求大小。超过这个大小,性能不再提升而且可能降低。
  • 最佳大小,当然并不是一个固定的数字。它完全取决于你的硬件、你文档的大小和复杂度以及索引和搜索的负载。幸运的是,这个最佳点(sweetspot) 还是容易找到的:
  • 试着批量索引标准的文档,随着大小的增长,当性能开始降低,说明你每个批次的大小太大了。开始的数量可以在1000~5000个文档之间,如果你的文档非常大,可以使用较小的批次。
  • 通常着眼于你请求批次的物理大小是非常有用的。一千个1kB的文档和一千个1MB的文档大不相同。一个好的批次最好保持在5-15MB大小间。

结语

现在你知道如何把Elasticsearch当作一个分布式的文件存储了。你可以存储、更新、检索和删除它们,而且你知道如何安全的进行这一切。这确实非常非常有用,尽管我们还没有看到更多令人激动的特性,例如如何在文档内搜索。但让我们首先讨论下如何在分布式环境中安全的管理你的文档相关的内部流程。