携程是全球领先的一站式旅行平台,现有员工约30000人,公司旗下的平台可面向全球用户提供一套完整的旅行产品、服务及差异化的旅行内容。携程大住宿部是国内最大的酒店分销电子商务平台,在全球拥有约63万家国内酒店和70万家国际酒店。携程大住宿数据智能平台中70%的实时数据场景已经接入StarRocks,查询响应速度平均在200ms左右,超过500ms的慢查询数大幅度减少,同时人力和硬件成本大大降低。后续会将剩余的实时场景和离线场景全部迁入StarRocks。
(作者:史文俊 携程大住宿数据智能部资深开发工程师,负责携程大住宿数据智能平台的研发)
平台现状
大住宿数据智能平台(简称HData)是一个为携程大住宿业务提供数据可视化的平台,简而言之,就是用图表的形式更为直观地展示与解读数据,帮助业务获得知识和洞察,形成正确的决策,做出快速决策,少犯错误。在大住宿内部,每个部门关心的指标侧重点会不同,权限控制也都会不一样,所以数据展示的方式也是多样化。
HData每天有将近2200左右的UV,10w左右的PV来访问我们的系统,而节假日期间的访问量基本都会翻2到3倍。
从2018年开始使用ClickHouse以来,我们90%的业务线都强依赖于ClickHouse,95%左右的接口响应时长都在1s以内,ClickHouse强悍的查询性能得到了充分体现。
现在总数据行数大概700亿左右,每天有超过2000个左右的流程,需要更新的数据行数大概有150亿左右。
未压缩前的数据总容量:8T,压缩后的数据总容量:1.75T。
但是ClickHouse无法支持高并发查询的缺陷也很明显,现在CPU大部分情况下消耗是在30%以内,不过当有用户大量查询时CPU使用率可能就会被拉的很高。并且如果出现一个复杂的高消耗查询,只靠人工手刷,可能在很短的时间内就可以把40C的CPU使用率打满:
工作日的早上9点一般会有一波访问高峰,为了保持系统稳定,我们采用主动建立缓存+用户被动触发缓存的机制来降低ClickHouse服务器的压力。
一方面我们会将一些高频访问的页面查询结果进行缓存。另一方面,在离线数据更新完成后,我们通过分析用户行为数据,主动给最近5天来访问过相关数据的用户缓存默认条件的数据,降低波峰。
现在的主动缓存+被动缓存取代了原本需要从ClickHouse上很大一部分的查询量,这样可以避免我们无限的扩容服务器。同时也可以把因为集中并发的查询拉起来的峰刺打平。
现阶段痛点
在节假日期间,实时数据是关注的重点,以今年劳动节为例,实时看板的访问量要比平时高10倍左右。
工作日期间,CPU使用率一般不会超过30%。
节假日期间,CPU使用率一度超过70%,这对服务器的稳定性造成了很大隐患。
面对这种情况,一方面我们在前端做了节流来防止用户高频查询,同时在后端也做了缓存,但是实时数据的缓存时间不能太久,一般1~2分钟已经是用户可接受的极限。通过下图可以发现,离线数据的缓存命中率一般都会比较高,基本能达到50%以上甚至更高,但对于实时数据,命中率则只有10%左右:
另一方面,我们在服务端启用了分流机制:实际应用场景中有一些业务的权限比较小,对应需要查询的数据量也会比较小,我们通过分析定义出了一个阈值,比如权限数小于5000的用户从MySQL请求数据,这部分用户即使通过MySQL查询速度也很快。让权限大的用户通过ClickHouse请求数据,这样可以引流很大一部分用户。
这样做虽然暂时解决了眼下的问题,不过新的问题又接踵而至:
·数据需要双写到ClickHouse和MySQL,无法保证两边数据的一致性
·同时存在两套数据,导致硬件成本增加
·ClickHouse不支持标准SQL语法,所以代码也需要维护两套,开发成本增加
针对上述问题的挑战,我们的目标是寻求一个新的OLAP引擎来减少开发和运维成本,同时还要兼顾查询性能,并在高并发和高吞吐的场景下有较好的适用性。
为此我们尝试了一些市面上其他引擎,如Ingite、CrateDB、Kylin等,每种引擎从硬件成本或性能上都有自己特有的优势,不过综合到使用场景,最终我们选择了StarRocks。
StarRocks介绍
·StarRocks是一个高性能分布式关系型列式数据库,通过MPP执行框架,单节点每秒可处理多达100亿行数据,同时支持星型模型和雪花模型。
·StarRocks集群由FE和BE构成,可以使用MySQL客户端访问StarRocks集群。
·FE接收MySQL客户端的连接,解析并执行SQL语句,管理元数据,执行SQL DDL命令,用Catalog记录库、表、分区,tablet副本等信息。
·BE管理tablet副本,tablet是table经过分区分桶形成的子表,采用列式存储。BE受FE指导,创建或删除子表。
·BE接收FE分发的物理执行计划并指定BE coordinator节点,在BE coordinator的调度下,与其他BE worker共同协作完成执行。
·BE读本地的列存储引擎,获取数据,通过索引和谓词下沉快速过滤数据。
我们选择StarRocks主要基于以下几方面的考虑:
1.亚秒级查询延时
2.在高并发查询、多表关联等复杂多维分析场景有良好的性能表现
3.支持弹性扩展,扩容不影响线上业务,后台自动完成数据rebalance
4.集群中服务有热备,多实例部署,节点的宕机、下线、异常都不会影响集群服务的整体稳定性。
5.支持物化视图和Online Schema Change
6.兼容MySQL协议,支持标准的SQL语法
性能测试
HData上的数据以多表关联为主,在这种场景下,ClickHouse单机性能相比集群性能要好,因而在这里选取ClickHouse单机做对比。下面用3个测试用例分别对StarRocks和ClickHouse进行对比,我们用6台虚拟机构建成了一个集群,3台FE、BE混部,3台BE,机器配置如下:
软件版本:StarRocks标准版1.16.2
ClickHouse配置如下:
软件版本:ClickHouse20.8
测试用例1
·StarRocks用时:547ms
·ClickHouse用时:1814ms
测试用例2
·StarRocks用时:126ms
·ClickHouse用时:142ms
测试用例3
·StarRocks用时:387ms
·ClickHouse用时:884ms
可以看到,StarRocks的查询性能完全不逊色于ClickHouse,甚至更快。
数据更新机制
StarRocks根据摄入数据和实际存储数据之间的映射关系,将数据表的明细表,聚合表和更新表,分别对应有明细模型,聚合模型和更新模型。
·明细模型:表中存在主键重复的数据行,和摄入数据行一一对应,用户可以召回所摄入的全部历史数据。
·聚合模型:表中不存在主键重复的数据行,摄入的主键重复的数据行合并为一行,这些数据行的指标列通过聚合函数合并,用户可以召回所摄入的全部历史数据的累积结果,但无法召回全部历史数据。
·更新模型:聚合模型的特殊情形,主键满足唯一性约束,最近摄入的数据行,替换掉其他主键重复的数据行。相当于在聚合模型中,为数据表的指标列指定的聚合函数为REPLACE,REPLACE函数返回一组数据中的最新数据。
·StarRocks系统提供了5种不同的导入方式,以支持不同的数据源(如HDFS、Kafka、本地文件等),或者按不同的方式(异步或同步)导入数据。
·Broker Load:Broker Load通过Broker进程访问并读取外部数据源,然后采用MySQL协议向StarRocks创建导入作业。适用于源数据在Broker进程可访问的存储系统(如HDFS)中。
·Spark Load:Spark Load通过Spark资源实现对导入数据的预处理,提高StarRocks大数据量的导入性能并且节省StarRocks集群的计算资源。
·Stream Load:Stream Load是一种同步执行的导入方式,通过HTTP协议发送请求将本地文件或数据流导入到StarRocks中,并等待系统返回导入的结果状态,从而判断导入是否成功。
·Routine Load:Routine Load提供了一种自动从指定数据源进行数据导入的功能。用户通过MySQL协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(如Kafka)中读取数据并导入到StarRocks中。
·Insert Into:类似MySQL中的Insert语句,可以通过INSERT INTO tbl SELEC...或INSERT INTO tbl VALUES(...)等语句插入数据。
·HData中的数据主要分为实时数据和离线T+1数据。
实时数据主要通过Routine load的方式导入,以使用更新模型为主
离线T+1数据主要使用Zeus平台,通过Stream load的方式导入,以使用明细模型为主
实时数据通过携程自研的消息队列系统QMQ实现,下图是原先的实时数据导入流程:
接入StarRocks后的实时数据导入流程:
很快我们就遇到了一个难题:有一个场景是订阅订单状态变化的消息,下游我们以订单号作为主键,使用更新模型来将数据落地。对外我们提供订单状态为非取消的数据进行展示。
在收到消息后,我们还需要调用外部接口来补全一些其他字段,最后再把数据落地。但如果收到一条消息就调用一次接口,这么做会对接口造成压力,所以我们采取了批处理的方式。
不过这样做产生了一个问题:Kafka本身无法保证全局消息是有序的,只能保证partition内的有序性。同一个批次同一个订单,但订单状态不同的2条数据如果分别落在了不同的partition,routine load时无法保证哪条数据会先被消费。如果订单状态为取消的消息先被消费,而其他订单状态的消息后被消费,这样会造成原本应该取消的订单重新变成了非取消订单,从而影响统计的准确性。
我们也考虑过不通过QMQ而改用原生的Kafka,将订单号作为key来指定发送到哪个partition中,不过这样做需要二次开发,而且改动的成本也不低。
为了解决这个问题,我们选择了一个折中的办法:在消息落地同时,又用明细模型落地了一个日志表,表里只需要存订单号、订单状态以及消息发送时间。同时,有一个定时任务每隔一段时间会对该表内相同订单号的数据进行排序,取消息发送时间最新的一条数据,用订单号与正式表中订单状态不一致的数据进行匹配然后进行更新,以这样的形式对数据进行一个补偿。
T+1数据我们通过携程自研的数据同步平台Zeus进行ETL和导入:
DR和高可用
携程对DR有着很高的要求,每隔一段时间都会有公司级的DR演练。StarRocks本身已经具备了十分优秀的DR机制,在此基础之上,我们构建了一套适合自己的高可用体系:
·服务器分别部署在2个机房,以5:5的流量对外提供服务。对外提供服务的FE节点的负载均衡以配置项的形式实现,可以动态修改,实时生效(主要是考虑有服务器打补丁、版本升级等需要手动拉出的情况)。
·每个FE和BE进程全部都用supervisor进行进程守护,保证进程出现意外退出时可以被自动拉起。
·当FE节点出现故障时,存活的follower会立即选举出一个新的leader节点提供服务,但是应用端却无法立即感知,为了应对这种情况,我们起了一个定时任务,每隔一段时间对FE服务器进行health check,一旦发现FE节点故障,则立即将故障节点拉出集群,同时以短信方式通知开发人员。
·当BE节点出现故障时,StarRocks内部会自动进行副本均衡,对外仍可继续提供服务,同时我们也会有一个定时任务对其进行health check,每当发现有BE节点故障,则会以邮件形式通知开发人员。
·同时,我们针对每台服务器的硬件指标也配置了告警,通过携程自研的智能告警中台,一旦服务器的CPU、Mem、磁盘空间等指标发生异常,开发人员可以立即感知并介入。
总结和后期规划
现在HData中70%的实时数据场景已经接入StarRocks,查询响应速度平均在200ms左右,耗时500ms以上的查询只占总查询量的1%;并且数据和代码也只需要维护一套,人力和硬件成本大大降低。
后期规划
·将剩余的实时场景全部迁入StarRocks。
·离线场景也逐渐迁入StarRocks,逐步用StarRocks来统一OLAP分析全场景。
·进一步完善对StarRocks的监控机制,使其更健壮。
·通过读取Hive外表的形式做数据冷热分离,减少硬件成本。
|