您的位置:首页 > 其它

elasticsearch之modeling your data(not flat)--handing relationships

2014-11-11 14:51 405 查看
跟sql相比,es是另一个不同的世界。它带来了很多优势:性能,可扩展性,面向海量数据的准实时查询和分析,并且非常容易使用。但是它毕竟不是魔法。获得这些优势的同时,你需要去理解它的工作机制,并使之为我们服务。处理实体之间的关系并不像关系性数据存储一样明显。关系型数据的黄金规则---normalize your data--不再使用于es。以下将会分为处理关系数据、嵌套实体、父子关系三个方面介绍es对非扁平化数据的处理方式。之后会介绍design for scale,来确保你的数据模型能很好的扩展。最后我们讨论一种无法扩展的情形。1:handling relationships现实世界中存在很多关系数据:博客有评论数据,银行账户有交易数据,目录有文件数据等等。关系型数据库应运而生了:每一个实体可以通过主键唯一存储;实体只存储一次,修改只在一处,关联实体只需要主键即可;查询时可以进行实体关联,支持跨实体的查询;支持ACID等。但是关系型数据库也有自身的缺陷:对全文索引很难支持;实体之间的关联操作代价很大,而且关联实体越多,代价越大;如果是在不同机器上的实体关联就显得不太实际了,这就导致了单击存储的限制。Elasticsearch就像大多数NoSql数据库一样,把数据看作扁平的。一个索引就是一系列相互独立的docunent的集合。一个document应该包含能满足一个查询请求的所有数据。对单独的document的数据改变是ACID的,但是面向多个document的数据改变并不是ACID的。如果部分事务失败了,是没有办法将整个事务回滚的。数据扁平化也有它的优势:索引快速并且无锁;查询快速并且无锁;由于没一个document都是独立的,因此大量数据可以分布到不同的节点上。但是ralationship呢?我们需要将扁平化数据跟真实世界的数据之间的距离缩短。以下有四种方式来处理真实世界中的关系型数据。1.1 Application-side joins仿真一个关系型数据库来实现应用层的join。比如:我们index一份数据,用户和他的博客。
PUT /my_index/user/1 
{
  "name":     "John Smith",
  "email":    "john@smith.com",
  "dob":      "1970/10/24"
}

PUT /my_index/blogpost/2 
{
  "title":    "Relationships",
  "body":     "It's complicated...",
  "user":     1 
}
两个索引描述了类似关系型数据库中的两张表,用user做了关联。现在要查询某个用户的博客信息,需要执行两次查询:先获取用户id,然后获取博客信息。这一方案的好处显而易见:数据归一化,对一份数据的修改无需影响其他数据。缺陷也同样明显:需要执行多次查询。以上的例子如果某一个term能match到的user非常多,这个查询代价就比较高了。1.2 denormalizing your data要想提升查询的效率,当然就要使用elasticsearch的优势,将数据尽量扁平话,在一个doc中满足查询所需信息。
PUT /my_index/user/1
{
  "name":     "John Smith",
  "email":    "john@smith.com",
  "dob":      "1970/10/24"
}

PUT /my_index/blogpost/2
{
  "title":    "Relationships",
  "body":     "It's complicated...",
  "user":     {
    "id":       1,
    "name":     "John Smith" 
  }
}
以上将用户的名字放在了blogpost中去了,因此一次查询就可以完成任务。
优势就是提升了查询效率,缺陷就是修改数据的时候需要修改两处。
1.3 field collapsing
一个日常的应用场景是:对符合条件的查询结果针对某一个字段进行分组,每组选出相关性最强的信息。假设我们想要按照用户名进行分组,获取没一个用户相关性最强的博客信息。如何处理呢?按name进行分组用terms aggregation支持,该字段需要是not_analyzed的。因此:
PUT /my_index/_mapping/blogpost
{
  "properties": {
    "user": {
      "properties": {
        "name": { 
          "type": "string",
          "fields": {
            "raw": { 
              "type":  "string",
              "index": "not_analyzed"
            }
          }
        }
      }
    }
  }
}
user.name.raw将作为分组字段。
我们运行一个查询:user=john
GET /my_index/blogpost/_search?search_type=count 


{
  "query": { 


    "bool": {
      "must": [
        { "match": { "title":     "relationships" }},
        { "match": { "user.name": "John"          }}
      ]
    }
  },
  "aggs": {
    "users": {
      "terms": {
        "field":   "user.name.raw",      


        "order": { "top_score": "desc" } 


      },
      "aggs": {
        "top_score": { "max":      { "script":  "_score"           }}, 


        "blogposts": { "top_hits": { "_source": "title", "size": 5 }}  


      }
    }
  }
}
解释1:我们感兴趣的博客信息已经在aggregation中返回,所以此处的查询类型设置为count
解释2:满足条件的查询
解释3:分组字段,创建bucket
解释4,5:排序规则,按照sub aggr中的top_score进行排序,top_score是max类型的,因此是按照每一个bucket中_score的最大值对bucket进行排序。
解释6:top-hit aggr,限定了只返回source中的title字段,每一个bucket只返回5条记录。
top-hit在此处相当于执行了一个mini query,性能比较高。
1.4 denormalization and concurrency数据进行denormalization的缺陷也是显而易见的:索引会变大。但是当下这并不是一个大问题。写入磁盘的数据都是压缩过的,并且磁盘是相对便宜的。另外就是数据修改的问题,要修改多处地方。这个要根据具体的应用场景来确定数据如何denormalization,哪些字段做denormalization。当前的例子中,name是很少改变的,即使是改变,也可以借助于buck api完成。速度也会很快。然后,让我们看一个change data 非常频繁的例子。我们模拟一个文件系统,类型linux的文件目录。我们想要搜索某一个指定目录下符合某一些规则的文件。类似:grep “some text” /root/opt/*这需要我们对file的path进行索引:
PUT /fs/file/1
{
  "name":     "README.txt", 
  "path":     "/clinton/projects/elasticsearch", 
  "contents": "Starting a new Elasticsearch project is easy..."
}
我们还想实现类似的搜索: grep -r "some text" /opt
因此,我们还需要将path的层层目录做索引。借助path_hierarchy tokenizer。
PUT /fs
{
  "settings": {
    "analysis": {
      "analyzer": {
        "paths": { 
          "tokenizer": "path_hierarchy"
        }
      }
    }
  }
}
mapping:
PUT /fs/_mapping/file
{
  "properties": {
    "name": { 
      "type":  "string",
      "index": "not_analyzed"
    },
    "path": { 
      "type":  "string",
      "index": "not_analyzed",
      "fields": {
        "tree": { 
          "type":     "string",
          "analyzer": "paths"
        }
      }
    }
  }
}
name存储了file的名称,path存储了目录信息,path.tree存储了目录的层级关系。
因此我们执行以下查询:
GET /fs/file/_search
{
  "query": {
    "filtered": {
      "query": {
        "match": {
          "contents": "elasticsearch"
        }
      },
      "filter": {
        "term": { 
          "path.tree": "/clinton"
        }
      }
    }
  }
}
结果为/clinton及其子目录下文件内容可以匹配到"elasticsearch"的文件。
目前来看一切都是好的,rename一个文件也很easy,直接用update api或者index api即可。我们也可以rename一个目录,这就需要将目录下所有子目录及文件都要rename配合scan-and-scroll和bulk api可以完成,速度要看该目录下到底有多少文件了。这个过程并不是原子的,但是所有文件也会很快的移动到新的目录下。
1.5 solving concurrency issues
现在问题来了:如果同时有多个人在rename操作呢?
es并不支持ACID操作。如果主要的存储是关系型数据库,而es只是作为索引服务器使用的话,那ACID就在数据库完成,然后批量更新到es中即可。
如果不是这样,这些并行事务需要在es层面上解决了。还是老思路,用lock,es提供了三种lock:global lock, document locking, tree lock。
global lock:
大多数操作只是涉及到一小部分file而且会很快完成。如果一个top level 的目录做rename操作将会堵塞所有其他的change操作较长时间,但是这种操作发生的斌率很低。
因为es对单个document的操作是ACID的,因此我们可以创建一个global lock。
PUT /fs/lock/global/_create
{}
如果创建操作失败,说明已经有其他进程锁定了,只有等待。如果创建成功,则可以执行我们的操作了。
完成操作后,释放lock即可:
DELETE /fs/lock/global
document locking:
全局锁粒度太大,document locking 只是锁定我们将要处理的document。通常是通过scan and scroll来获取要操作的文档id,然后批量建立lock:
PUT /fs/lock/_bulk
{ "create": { "_id": 1}} 
{ "process_id": 123    } 
{ "create": { "_id": 2}}
{ "process_id": 123    }
...
lock的id跟doc的id保持一致,process_id是进程标识。
因为是批量建立lock,因此如果有些doc已经被锁,则bulk操作对应的操作项就会失败,整个操作就失败了,我们应该重试。在重试过程中,可以采用以下逻辑(因为有些lock你已经得到了,但是还是要校验一下,是不是你自己锁定的,借助与process_id即可):
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}
如果没有锁则新建,借助upsert,如果有锁但不是自己的,则失败,继续重拾,如果是自己的,则op=noop,返回成功。
获取锁之后,完成操作,然后释放锁:
POST /fs/_refresh 

DELETE /fs/lock/_query
{
  "query": {
    "term": {
      "process_id": 123
    }
  }
}
refresh操作确保lock document对delete操作是可见的。
tree lock:
如果操作涉及到比较多的document,则会创建很多锁。tree lock在这种情形下只需要按照目录结构创建部分锁即可,exclusive lock对应具体的file,shared lock对应父亲目录的级别上:
{
  "lock_type":  "shared",
  "lock_count": 1 
}
获取锁的逻辑如下:
POST /fs/lock/clinton/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": "if (ctx._source.lock_type == 'exclusive') { assert false }; ctx._source.lock_count++"
}
这是在父目录级别获取锁的方式:没有就创建,有的话就根据类型确定是否共享,增加计数。
一旦我们成功获取了父目录的shareed lock,我们就可以接着获取我们实际要操作的file的lock:
PUT /fs/lock/clinton/projects/elasticsearch/README.txt/_create
{ "lock_type": "exclusive" }
获取成功后就可以操作数据了。
在这种情况下,如果有其他进程要对父目录进行rename,就需要创建exclusive lock:
PUT /fs/lock/clinton/_create
{ "lock_type": "exclusive" }
显然这个操作是会失败的,需要堵塞,等待其他进程释放锁。
释放锁逻辑如下:
先释放exclusive lock: DELETE /fs/lock/clinton/projects/elasticsearch/README.txt
再释放shared lock:
POST /fs/lock/clinton/projects/elasticsearch/_update
{
  "script": "if (--ctx._source.lock_count == 0) { ctx.op = 'delete' } "
}
逐层父目录释放。
以上策略只是针对数据模型可以抽象为"文件系统"的数据,并不适用于所有情况。
注意:如果持有锁的进程挂了,锁怎么释放?超出范畴了!这个需要应用进程在异常推出的时候去清理资源吧。

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐