Elasticsearch在数据可视化中的思考

数据可视化如果做到以下几点就属于比较好的可视化平台了
1)过程简单、生产效率要高(无论是研发、运营、产品都快可以快速的创建想要的图表)。
2)自定义程度高:展现形式多样可选、交互自定义程度高(各种筛选过滤)。
3)便于分享:数据的分享是数据价值体现的一个环节。
4)数据的评论和备注:备注能够记录异常和原因,评论利于保存讨论和分析过程(一个相对完善的系统而言这个功能方便实际使用)。
5)权限控制:自由总是相对的,相对分享就是控制,张弛有度必是这个系统长存的基石
实现上述的可视化系统并非易事,里面涉及到很多的环节。
但可视化做的比较好的tableau却给我们创建了一个比较好的典范。它在数据的分享、效率方面做得无可挑剔。
实际环境中却不一定能直接使用一些成熟的解决方案,还需要自己搭建。

如何快速的搭建一个数据可视化工具,桥接数据仓库、或底层数据。比较灵活多变的支持上层复杂多变的需求。
但如果只考虑一个数据存储方案,满足以下几个点会对后续数据展现
1)易于和大数据(hadoop/hive)的生态环境桥接
2)结构化和非结构化的中间状态。
3)分布式扩展性
es 的确这些都能满足
但相对传统的结构化存储其主要缺点是:不同于mongodb 和 mysql 关联查询并不支持 低版本的sql支持也比较差。
适用场景:适用于最终结果的展现(简单过滤)

实践:

1. hadoop集成
2. hive集成
      2.1 hive集成的时候各种配置(生产环境使用过程中这个还是需要具体了解一下的)
      2.2 hive 集成依赖jar

常见的问题:

1. 数据是否会丢,怎么保证数据完整性。
2. 并发如何控制,写入es
       写入数据的api使用了es的http bulk接口,以下几个参数是控制这个bulk请求的这个地方用好了 数据就不会丢。
       org.elasticsearch.hadoop.rest.bulk/BulkProcessor
       elasticsearch-hadoop/mr/src/main/java/org/elasticsearch/hadoop/rest/bulk/BulkProcessor.java
       http层可选配置
                 es.http.timeout(默认1m)   与Elasticsearch的HTTP / REST连接超时。
                 es.http.retries(默认3)建立(损坏的)http连接的重试次数。 使用Elasticsearch节点对每个会话应用重试。 重试耗尽后,连接将自动重新连接到下一个可用的Elasticsearch节点(基于es.nodes的声明,然后是已发现的节点 – 如果已启用)。
       bulk接口可选配置
                 es.batch.size.bytes(默认1mb)Elasticsearch _bulk API进行批量写入的大小(以字节为单位)。
这个参数是具体任务实例的批量写入大小,一个job的整体并行写入量 应该是 es.batch.size.bytes*任务实例数目。
                 es.batch.size.entries(默认1000)使用Elasticsearch _bulk API进行批量写入的大小(控制条目) -(0禁用)。和es.batch.size.bytes一样,一旦匹配,就会执行批量更新。此设置是每个任务实例;在运行时乘以运行的Hadoop任务总数才是整体的并发。
                 es.batch.write.refresh(默认为true)是否在_bulk更新完成后调用索引刷新。请注意,只有在执行了整个写入(意味着多个批量更新)之后才会调用此方法。
                es.batch.write.retry.count(默认3) Elasticsearch过载且数据被拒绝的情况下给定批次的重试次数。请注意,仅重试被拒绝的数据。如果在执行重试后仍有数据被拒绝,则Hadoop作业将被取消(并失败)。负值表示无限重试;设置此值时要小心,因为它可能会产生不必要的副作用。
                es.batch.write.retry.limit 最大重试次数 默认50 count 和 limit 配合使用: 如果 count 设置-1 就是无限, count 大于0 两者取最大值。
                es.batch.write.retry.wait(默认10秒)批量拒绝导致批量写入重试之间的等待时间。
数据完整性
       按照默认设置,reject的的数据会被重试,任务失败数据会有部分写入,一次写入无法保证数据完整。
如果设置es.mapping.id 唯一id 和 es.write.operation = update 重新运行后能保证数据完整(没有数据重复)。
总之事务性是保证不了的,只能有条件的保证完整性
并发控制
      单任务:
      1) 要在任务层面进行控制
              有reduce的任务 mapred.reduce.tasks 控制reduce任务的数目
              只有map任务 mapred.max.split.size 控制map任务的数目
      2) 结合_bulk的参数控制整体任务的并发。
              多个任务:
              1)例如 有100个任务,不知道会什么时候执行,如何控制并发?
              多个任务实际上是没有好的控制方案的,单个任务的控制并不能影响到整体的并发。 如果是Mapreduce类的任务控制reduce数目控制并发。
               但是如果利用前面的失败机制做es的反向代理仅对 _bulk请求进行流控,就能控制整体的并发 ,保障es的稳定。bulk reject后,客户端会默认等待10秒(默认配置 es.batch.write.retry.wait),利用这个机制。 反向代理服务根据 _bulk进行流控,返回bulk reject ,控制 bulk的集群任务并发。reject的处理会比较复杂,改写 elasticsearch-hadoop 也会比较复杂,原因是reject 是具体的某个item而不是整批数据。
                以下 es-hadoop客户端如果对特殊httpcode(429)做处理,实现了retry和流控。
                1) 改写的eshadoop 配合hive 和 mr 任务使用 https://github.com/whomm/elasticsearch-hadoop
                2)  服务器反向代理 https://github.com/whomm/hrproxy 这两个配合即可实现bulk qps流控。
hive 表创建语句参考:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE external TABLE if not exists app.app_mytest_di (
date bigint comment "日期"
,concat_pk string comment "拼接主键"
,load_date string comment "数据加载时间"
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'app_mytest_di/app_mytest_di',
'es.nodes'='10.0.10.12:8999,10.0.10.13:8999',
'es.mapping.date.rich' = 'false',    --读取es数据是否返回 时间对象
'es.nodes.discovery' = 'false',  --不自动发现数据节点
'es.nodes.wan.only' = 'true',     --仅使用配置的节点 好使用代理
'es.batch.write.retry.count' = '16',  --重试次数 负数无限重试 默认值3
'es.batch.write.retry.limit' = '50',  --最大重试次数 默认50     如果 count 设置-1 就是无限, count 大于0 count 和 limit 相比取最大值 作为请求重试上限
'es.batch.write.retry.wait' = '10',   --重试等待时间
'es.batch.size.bytes' = '5mb',  --每个批次上限大小
'es.batch.size.entries' = '1000', --每个批次上限数目
'es.mapping.id' = 'concat_pk',
'es.write.operation'='upsert'
);
2. 分片如何设置。
         对于少量指标类的数据,分片默认即可。
         其他要单独配置
3. 副本如何设置。
          双副本可完全适用大多数场景。
4. 数据类型的常见问题:
      timestamp
      text 和 keyword
     1 timestamp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE external TABLE IF NOT EXISTS app.wmwtest (
DATE BIGINT comment "date"
,mydate string comment "拼接主键"
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'wmwtest/wmwtest',
'es.nodes'='10.0.12.108,10.0.11.110,10.0.12.110',
'es.port'='6335',
'http.port'='6335',
'es.mapping.date.rich' = 'false', ---这个配置挺实用 从es 返回数据的时候是否 返回日期时间对象 false 会返回 int 或 text类型的具体数据防止hive 查询错误 。
'es.mapping.id' = 'date',
'es.write.operation'='upsert'
);
     2. text 和 keyword
           有些数据需要进行过滤 ,默认string类型的数据,es会进行分词, 需要创建预先创建index 设置 对应字段的类型防止其自动分词。
其他es存储优化
1)禁用_all字段

相关资料

发表评论

邮箱地址不会被公开。 必填项已用*标注