数据可视化如果做到以下几点就属于比较好的可视化平台了
1)过程简单、生产效率要高(无论是研发、运营、产品都快可以快速的创建想要的图表)。
2)自定义程度高:展现形式多样可选、交互自定义程度高(各种筛选过滤)。
3)便于分享:数据的分享是数据价值体现的一个环节。
4)数据的评论和备注:备注能够记录异常和原因,评论利于保存讨论和分析过程(一个相对完善的系统而言这个功能方便实际使用)。
5)权限控制:自由总是相对的,相对分享就是控制,张弛有度必是这个系统长存的基石
实现上述的可视化系统并非易事,里面涉及到很多的环节。
但可视化做的比较好的tableau却给我们创建了一个比较好的典范。它在数据的分享、效率方面做得无可挑剔。
实际环境中却不一定能直接使用一些成熟的解决方案,还需要自己搭建。
如何快速的搭建一个数据可视化工具,桥接数据仓库、或底层数据。比较灵活多变的支持上层复杂多变的需求。
但如果只考虑一个数据存储方案,满足以下几个点会对后续数据展现
1)易于和大数据(hadoop/hive)的生态环境桥接
2)结构化和非结构化的中间状态。
3)分布式扩展性
es 的确这些都能满足
但相对传统的结构化存储其主要缺点是:不同于mongodb 和 mysql 关联查询并不支持 低版本的sql支持也比较差。
适用场景:适用于最终结果的展现(简单过滤)
实践:
1. hadoop集成
2. hive集成
2.1 hive集成的时候各种配置(生产环境使用过程中这个还是需要具体了解一下的)
2.2 hive 集成依赖jar
常见的问题:
1. 数据是否会丢,怎么保证数据完整性。
2. 并发如何控制,写入es
写入数据的api使用了es的http bulk接口,以下几个参数是控制这个bulk请求的这个地方用好了 数据就不会丢。
org.elasticsearch.hadoop.rest.bulk/BulkProcessor
elasticsearch-hadoop/mr/src/main/java/org/elasticsearch/hadoop/rest/bulk/BulkProcessor.java
http层可选配置
es.http.timeout(默认1m) 与Elasticsearch的HTTP / REST连接超时。
es.http.retries(默认3)建立(损坏的)http连接的重试次数。 使用Elasticsearch节点对每个会话应用重试。 重试耗尽后,连接将自动重新连接到下一个可用的Elasticsearch节点(基于es.nodes的声明,然后是已发现的节点 – 如果已启用)。
bulk接口可选配置
es.batch.size.bytes(默认1mb)Elasticsearch _bulk API进行批量写入的大小(以字节为单位)。
这个参数是具体任务实例的批量写入大小,一个job的整体并行写入量 应该是 es.batch.size.bytes*任务实例数目。
es.batch.size.entries(默认1000)使用Elasticsearch _bulk API进行批量写入的大小(控制条目) -(0禁用)。和es.batch.size.bytes一样,一旦匹配,就会执行批量更新。此设置是每个任务实例;在运行时乘以运行的Hadoop任务总数才是整体的并发。
es.batch.write.refresh(默认为true)是否在_bulk更新完成后调用索引刷新。请注意,只有在执行了整个写入(意味着多个批量更新)之后才会调用此方法。
es.batch.write.retry.count(默认3) Elasticsearch过载且数据被拒绝的情况下给定批次的重试次数。请注意,仅重试被拒绝的数据。如果在执行重试后仍有数据被拒绝,则Hadoop作业将被取消(并失败)。负值表示无限重试;设置此值时要小心,因为它可能会产生不必要的副作用。
es.batch.write.retry.limit 最大重试次数 默认50 count 和 limit 配合使用: 如果 count 设置-1 就是无限, count 大于0 两者取最大值。
es.batch.write.retry.wait(默认10秒)批量拒绝导致批量写入重试之间的等待时间。
数据完整性
按照默认设置,reject的的数据会被重试,任务失败数据会有部分写入,一次写入无法保证数据完整。
如果设置es.mapping.id 唯一id 和 es.write.operation = update 重新运行后能保证数据完整(没有数据重复)。
总之事务性是保证不了的,只能有条件的保证完整性
并发控制
单任务:
1) 要在任务层面进行控制
有reduce的任务 mapred.reduce.tasks 控制reduce任务的数目
只有map任务 mapred.max.split.size 控制map任务的数目
2) 结合_bulk的参数控制整体任务的并发。
多个任务:
1)例如 有100个任务,不知道会什么时候执行,如何控制并发?
多个任务实际上是没有好的控制方案的,单个任务的控制并不能影响到整体的并发。 如果是Mapreduce类的任务控制reduce数目控制并发。
但是如果利用前面的失败机制做es的反向代理仅对 _bulk请求进行流控,就能控制整体的并发 ,保障es的稳定。bulk reject后,客户端会默认等待10秒(默认配置 es.batch.write.retry.wait),利用这个机制。 反向代理服务根据 _bulk进行流控,返回bulk reject ,控制 bulk的集群任务并发。reject的处理会比较复杂,改写 elasticsearch-hadoop 也会比较复杂,原因是reject 是具体的某个item而不是整批数据。
以下 es-hadoop客户端如果对特殊httpcode(429)做处理,实现了retry和流控。
1) 改写的eshadoop 配合hive 和 mr 任务使用 https://github.com/whomm/elasticsearch-hadoop
2) 服务器反向代理 https://github.com/whomm/hrproxy 这两个配合即可实现bulk qps流控。
hive 表创建语句参考:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | CREATE external TABLE if not exists app.app_mytest_di ( date bigint comment "日期" ,concat_pk string comment "拼接主键" ,load_date string comment "数据加载时间" ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES( 'es.resource' = 'app_mytest_di/app_mytest_di', 'es.nodes'='10.0.10.12:8999,10.0.10.13:8999', 'es.mapping.date.rich' = 'false', --读取es数据是否返回 时间对象 'es.nodes.discovery' = 'false', --不自动发现数据节点 'es.nodes.wan.only' = 'true', --仅使用配置的节点 好使用代理 'es.batch.write.retry.count' = '16', --重试次数 负数无限重试 默认值3 'es.batch.write.retry.limit' = '50', --最大重试次数 默认50 如果 count 设置-1 就是无限, count 大于0 count 和 limit 相比取最大值 作为请求重试上限 'es.batch.write.retry.wait' = '10', --重试等待时间 'es.batch.size.bytes' = '5mb', --每个批次上限大小 'es.batch.size.entries' = '1000', --每个批次上限数目 'es.mapping.id' = 'concat_pk', 'es.write.operation'='upsert' ); |
2. 分片如何设置。
对于少量指标类的数据,分片默认即可。
其他要单独配置
3. 副本如何设置。
双副本可完全适用大多数场景。
4. 数据类型的常见问题:
timestamp
text 和 keyword
1 timestamp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | CREATE external TABLE IF NOT EXISTS app.wmwtest ( DATE BIGINT comment "date" ,mydate string comment "拼接主键" ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES( 'es.resource' = 'wmwtest/wmwtest', 'es.nodes'='10.0.12.108,10.0.11.110,10.0.12.110', 'es.port'='6335', 'http.port'='6335', 'es.mapping.date.rich' = 'false', ---这个配置挺实用 从es 返回数据的时候是否 返回日期时间对象 false 会返回 int 或 text类型的具体数据防止hive 查询错误 。 'es.mapping.id' = 'date', 'es.write.operation'='upsert' ); |
2. text 和 keyword
有些数据需要进行过滤 ,默认string类型的数据,es会进行分词, 需要创建预先创建index 设置 对应字段的类型防止其自动分词。
其他es存储优化
1)禁用_all字段
相关资料
- es中文社区
- es权威指南
- 搜索类型
- es在查询时,可以指定搜索类型为QUERY_THEN_FETCH,QUERY_AND_FEATCH,DFS_QUERY_THEN_FEATCH和DFS_QUERY_AND_FEATCH
- http://www.cnblogs.com/donlianli/p/3857500.html
- https://www.elastic.co/blog/understanding-query-then-fetch-vs-dfs-query-then-fetch