2019年12月21日 • 作者:DataPipeline
引言:2018年7月25日,DataPipeline CTO陈肃在第一期公开课上作了题为《从ETL到ELT,AI时代数据集成的问题与解决方案》的分享,本文根据陈肃分享内容整理而成。
大家好!很高兴今天有机会和大家分享一些数据集成方面的看法和应用经验。先自我介绍一下。我叫陈肃,博士毕业于中国科学院大学,数据挖掘研究方向。现在北京数见科技(DataPipeline)任 CTO。之前在中国移动研究院任职算法工程师和用户行为实验室技术经理,之后作为合伙人加入过一家互联网教育公司,从事智能学习方面的研发工作。
在毕业后工作的这多年以来,我大部分时候在做大数据和机器学习相关的应用系统研发工作,数据的整合是其中一个非常重要的环节。加入 DataPipeline 后,公司研发的是一款企业级的数据集成产品,旨在帮助企业一站式解决数据集成和元数据管理问题。
ELT 和 ETL 是数据集成的两种基本方式。前者专注于大数据的实时抽取和可靠传输,后者则包含了更丰富的数据转换功能。 由于今天是和 AI 前线的朋友们一起探讨数据集成,我主要结合 AI 应用的场景谈谈:为什么 ELT 是更适合 AI 应用场景的数据集成方案、采用 Kafka 技术栈来构建 ELT 平台所具备的优势和问题以及我们所做的一些优化工作。希望能够对大家的工作和学习有所帮助。
今天我的分享主要内容如上图:
首先,我会介绍一下 AI 应用中数据集成的典型场景,ETL 和 ELT 两种数据集成模式的异同点,以及为什么 AI 应用下更适合采用 ELT 模式。然后,我会花一些篇幅介绍数据集成中需要重点考虑的基本问题,以及我们所采用的底层平台——Kafka Connect 在解决这些问题上的优势和局限。
接下来,我会介绍 DataPipeline 对于 Kafka Connect 一些优化。有的是从底层做的优化,例如线程池的优化。有的则是从产品特性上的优化,例如错误数据队列。
最后,我们谈一谈 Kafka Connect 和 Kafka Stream 的结合,以及我们用 Kafka Stream 做数据质量预警方面的一个应用 Case。
一、AI 应用场景下的数据集成
数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,为企业提供全面的数据共享。AI 是典型的数据驱动应用,数据集成在其中起着关键的基础性作用。
以一个大家所熟悉的在线推荐服务为例,通常需要依赖三类数据:用户的属性 (年龄、性别、地域、注册时间等)、商品的属性(分类、价格、描述等)、用户产生的各类行为(登录、点击、搜索、加购物车、购买、评论、点赞、收藏、加好友、发私信等)事件数据。
随着微服务框架的流行,这三类数据通常会存在于不同的微服务中:“用户管理服务”储存着用户的属性、好友关系、登录等数据;“商品管理服务”存储的商品信息;“订单服务”存储着用户的订单数据;“支付服务”存储用户的支付数据;“评论服务”记录着用户的评论和点赞数据。为了实现一个推荐服务,我们首先需要让服务能够访问到这些数据。这种数据访问应该是非侵入式的,也就是说不能对原有系统的性能、稳定性、安全性造成额外的负担。因此,推荐服务不应当直接访问这些分散的数据源,而是应该通过某种方式将这些数据从各个业务子系统中提取出来,汇集到一个逻辑上集中的数据库 / 仓库,然后才能方便地使用机器学习框架(例如 Spark MLlib)来读取数据、训练和更新模型。
ETL 和 ELT 的区别与联系
数据集成包含三个基本的环节:Extract(抽取)、Transform(转换)、Load(加载)。
抽取是将数据从已有的数据源中提取出来,例如通过 JDBC/Binlog 方式获取 MySQL 数据库的增量数据;转换是对原始数据进行处理,例如将用户属性中的手机号替换为匿名的唯一 ID、计算每个用户对商品的平均打分、计算每个商品的购买数量、将 B 表的数据填充到 A 表中形成新的宽表等;加载是将数据写入目的地。
根据转换转换发生的顺序和位置,数据集成可以分为 ETL 和 ELT 两种模式。ETL 在数据源抽取后首先进行转换,然后将转换的结果写入目的地。ELT 则是在抽取后将结果先写入目的地,然后由下游应用利用数据库的聚合分析能力或者外部计算框架,例如 Spark 来完成转换的步骤。
为什么 ELT 更适合 AI 应用场景
首先这是由 AI 应用对数据转换的高度灵活性需求决定的。 绝大多数 AI 应用使用的算法模型都包括一个特征提取和变换的过程。根据算法的不同,这个特征提取可能是特征矩阵的简单的归一化或平滑处理,也可以是用 Aggregation 函数或 One-Hot 编码进行维度特征的扩充,甚至特征提取本身也需要用到其它模型的输出结果。这使得 AI 模型很难直接利用 ETL 工具内建的转换功能,来完成特征提取步骤。此外,企业现在很少会从零构建 AI 应用。当应用包括 Spark/Flink MLlib 在内的机器学习框架时,内建的模型库本身往往包含了特征提取和变换的逻辑,这使得在数据提取阶段就做复杂变换的必要性进一步降低。
其次,企业经常会基于同样的数据构建不同应用。 以我之前所在的一家在线教育公司为例,我们构建了两个 AI 的应用:其中一个是针对各类课程的推荐应用,主要用于增加用户的购买转化率。另外一个是自适应学习系统,用于评估用户的知识掌握程度和题目的难度和区分度,从而为用户动态地规划学习路径。两个应用都需要用户属性、做题记录、点击行为以及学习资料文本,但采用的具体模型的特征提取和处理方式完全不同。如果用 ETL 模式,我们需要从源端抽取两遍数据。而采用 ELT 模式,所有数据存储在 HBase 中,不同的应用根据模型需要过滤提取出所需的数据子集,在 Spark 集群完成相应的特征提取和模型计算,降低了对源端的依赖和访问频次。
最后,主流的机器学习框架,例如 Spark MLlib 和 Flink MLlib,对于分布式、并行化和容错都有良好的支持,并且易于进行节点扩容。 采用 ELT 模式,我们可以避免构建一个专有数据转换集群(可能还伴随着昂贵的 ETL 产品 License 费用),而是用一个通用的、易于创建和维护的分布式计算集群来完成所有的工作,有利于降低总体拥有成本,同时提升系统的可维护性和扩展性。
二、从 ETL 和 ELT 面临的主要问题
采用 ELT 模式,意味着可以较少的关注数据集成过程中的复杂转换,而将重点放在让数据尽快地传输上。然而,一些共性的问题依然需要得到解决:
数据源的异构性: 传统 ETL 方案中,企业要通过 ETL 工具或者编写脚本的方式来完成数据源到目的地同步工作。当数据源异构的时候,需要特别考虑 Schema(可以简单理解为数据字段类型)兼容性带来的影响。无论是 ETL 还是 ELT,都需要解决这一问题。
数据源的动态性: 动态性有两方面含义。一是如何获取数据源的增量;二是如何应对数据源端的 Schema 变化,例如增加列和删除列。
任务的可伸缩性: 当面对少量几个数据源,数据增量不过每日几百 MB 的时候,ELT 平台的可伸缩性不是什么大问题。当 ELT 面对的是成百上千个数据源,或者数据源数据增速很快时,ELT 平台的任务水平切分和多任务并行处理就成为一个必备的要求。平台不仅要支持单节点的多任务并行,还需要支持节点的水平扩展。此外,ELT 的上游通常会遇到一些吞吐能力较差的数据源,需要能够对读取进行限速,避免对现有业务产生影响。
任务的容错性:ELT 平台某些节点出现故障的时候,失败的作业必须能够迁移到健康的节点上继续工作。同时,作业的恢复需要实现断点重传,至少不能出现丢失数据,最好能够做到不产生重复的数据。
三、Kafka Connect 的架构
Kafka Connect:基于 Kafka 的 ELT 框架
可用于构建 ELT 的开源数据集成平台方案不止一种,较广泛采用的包括 Kafka Connect、DataX 等,也有公司直接采用 Flink 等流式计算框架。DataPipeline 作为一家提供企业数据集成产品的公司,我们在 Kafka Connect 之上踩了许多坑并且也做了许多优化。
四、踩过的坑与优化的点
Kafka Connect 应用于ELT的关键问题1
下面我们聊一聊 Kafka Connect 应用过程中的几个关键问题。
首先是 任务的限速和数据缓存问题。从 Kafka Connect 设计之初,就遵从从源端到目的地解耦性。当 Source 的写入速度长时间大于 Sink 端的消费速度时,就会产生 Kafka 队列中消息的堆积。如果 Kafka 的 Topic Retention 参数设置不当,有可能会造成数据在消费前被回收,造成数据丢失。Kafka Connect 框架本身并没有提供 Connector 级别的限速措施,需要进行二次开发。
Kafka Connect 应用于ELT的关键问题2
用户有多个数据源,或者单一数据源中有大量的表需要进行并行同步时,任务的并行化问题 就产生了。Kafka Connect 的 rebalance 是牵一发动全身,一个新任务的开始和停止都会导致所有任务的 reload。当任务数很多的时候,整个 Kafka Connect 集群可能陷入长达数分钟的 rebalance 过程。
解决的方法,一是用 CDC(Change Data Capture)来捕获全局的数据增量;二是 在任务内部引入多线程轮询机制,减少任务数量并提高资源利用率。
Kafka Connect 应用于ELT的关键问题3
异构数据源同步会遇到 Schema 不匹配 的问题。在需要精确同步的场景下(例如金融机构的异构数据库同步),通常需要 Case by Case 的去定义映射规则。而在 AI 应用场景下,这个问题并不是很突出,模型训练对于损失一点精度通常是可容忍的,一些数据库独有的类型也不常用。
Kafka Connect 应用于ELT的关键问题4
Source 端需要能够检测到 Schema 的变化,从而生成具有正确 Schema 格式的 Source Record。CDC 模式下,通过解析 DDL 语句可以获取到。非 CDC 模式下,需要保存一个快照才能够获取到这种变化。
下面我用一些时间对 DataPipeline 所做的优化和产品特性方面的工作。
DataPipeline 是一个底层使用 Kafka Connect 框架的 ELT 产品。首先,我们在底层上引入了 Manager 来进行全局化的任务管理。Manager 负责管理 Source Connector 和 Sink Connector 的生命周期,与 Kafka Connect 的管理 API 通过 REST 进行交互。
系统的任何运行异常,都会进行统一的处理,并由通知中心发送给任务的负责人和运维工程师。我们还提供了一个 Dashboard,用于图形化方式对任务进行生命周期管理、检索和状态监控。用户可以告别 Kafka Connect 的命令行。
DataPipeline的任务并行模型
DataPipeline 在任务并行方面做了一些加强。在 DataPipeline Connector 中,我们在每个 Task 内部定义和维护一个线程池,从而能够用较少的 Task 数量达到比较高的并行度,降低了 rebalance 的开销。 而对于 JDBC 类型的 Connector,我们额外允许配置连接池的大小,减少上游和下游资源的开销。此外,每个 Connector 还可以定义自己限速策略,以适应不同的应用环境需求。
DataPipeline 的错误队列机制
通过产品中错误队列预警功能,用户可以指定面对错误数据暂存和处理逻辑,比如错误队列达到某个百分比的时候任务会暂停,这样的设置可以保证任务不会因少量异常数据而中断,被完整记录下来的异常数据可以被管理员非常方便地进行追踪、排查和处理。
相比以前通过日志来筛查异常数据,这种错误队列可视化功能能够大大提升管理员的工作效率。
DataPipeline 的数据转换
DataPipeline 实现了自己的 动态加载机制。提供了两种 可视化的转换器:基本转换器和高级转换器。前者提供包括字段过滤、字段替换和字段忽略等功能;后者基于 Java,可以更加灵活地对数据处理,并且校验处理结果的 Schema 一致性。DataPipeline 还提供了数据采样和动态调试能力,方便用户进行表级别的转换规则开发。
值得注意的是,Kafka 不仅仅是一个消息队列系统,本身也提供了持久化能力。一个很自然的问题就是:能否不额外引入 Sink 端的外部存储,直接从 Kafka 中获取训练数据?
如果模型本身要用到某个 Topic 的全量数据或者最近一段时间的数据,那么通过设置合适的 retention 参数,可以直接将 Kafka 作为训练数据的来源。Kafka 的顺序读模式可以提供非常高的读取速度;如果模型要根据消息的内容做数据筛选,那么由于 Kafka 本身并不提供检索能力,需要遍历所有消息,这样就显得比较低效了。
当模型用于线上时,可能还需要引入流式计算来完成实时特征的提取工作。Kafka 本身就提供了这种流式计算能力。
流式计算在 ELT 中的作用 - 数据质量预警
DataPipeline 也将流式计算引入到平台的质量预警功能中。在我们的未来版本中,用户可以定义 Topic 级别的质量预警规则模型,例如“在 5 分钟时间内,数据记录的字段 1 均值超过历史均值记录的比率超过 70%”为异常,采取策略为“告警并暂停同步”。通过这种方式,可以在 ELT 的过程中,尽早发现数据中的异常现象,避免大量异常数据进入数据目的地。
五、总结与展望
最后总结一下。数据集成并不是什么新的概念,在过去二十多年间已经广泛应用于各个行业的信息系统。ELT 和 ETL 相比,最大的区别是“重抽取和加载,轻转换”,从而可以用更简单的技术栈、更轻量的方案搭建起一个满足现代企业应用的数据集成平台。AI 应用内在的特点也使得 ELT 特别适合这个场景。
Kafka Connect 本身是一个业界被广泛采用的 ELT 框架,针对容错、分布式、Schema 一致性等方面都提供了良好的支持,同时有大量的社区和商业资源可供参考和选择。DataPipeline 基于 Kafka Connect 做了大量数据集成场景下的优化,与 Kafka Stream 相结合,能够为包括 AI 在内的各种应用场景构建起一个完整的数据层支撑方案。
有其它关于数据集成的技术问题,也欢迎一起探讨、共同提高。
参考资料
· How to Build and Deploy Scalable Machine Learning in Production with Apache Kafka
https://www.confluent.io/blog/
· Kafka Connect 官方文档
https://docs.confluent.io/current/connect/index.html
· Machine Learning + Kafka Streams Examples
https://github.com/kaiwaehner
· PredictionIO- 基于 Spark 的机器学习框架
http://predictionio.apache.org
Q & A
Q1:DataPipeline 避开了数据处理这个过程,并以此提高性能,这个思路很认可。但是有个问题:从数据生产到数据利用的环节中,总要有一步数据处理的步骤的,这个步骤,从产品角度,DataPipeline 是如何考虑的?
A1:ELT 的核心思想就是要利用下游数据存储性能大幅提升和机器学习应用的灵活性的优势,在数据流转的过程中不做过于复杂的计算。如果真的需要做处理,也可以基于我们的产品可以去写转换的代码。但这种处理都是无状态的。有状态处理,建议放到下游去做。这样才更符合 ELT 的理念。
Q2:请问数据的落地是自动的吗?
A2: 基于原生 Kafka Connector,需要命令行启动目标端类型的 Sink Connector,指定消费的 topic 列表,通过代码完成数据落地。基于 DataPipeline 产品,通过界面配置源和目的地后,落地是完全自动的。
Q3:多线程读,对源端的数据表或用户权限有没有特定的要求?
A3:JDBC 模式的 Source Connector 使用的 RDBMS 用户,需要具有选择同步表的 select 权限。CDC 模式的各不相同,参照产品内详尽的权限配置说明。
Q4:如何保证生产和消费的 EOS 刚好一次语义?
A4: Kafka Connect 下的 Exactly Once Semantic 依赖于具体 Connector 实现,Kafka Connect 框架本身对此只提供了必要非充分的支持。我们先来看 Source 端:假定 Source Connector 是从 MySQL 的 Binlog 中抽取数据到 Kafka,为了实现 EOS,首先 Source Connector 在每次提交记录到 Kafka 的时候,需要原子化的记录下来对应的 binlog position,这样才能保证任务异常中断、重启后能够从这个 position 继续读取。Kafka Connect 框架在 Source 端封装了 offset storage 的存储更新逻辑。offset storage 本质上是一个 Kafka 的 topic,利用 Kafka 的事务机制,理论上可以保证 offset 的修改和消息发送的原子性。再来看 Sink 端:如果 Sink Connector 可以将数据的输出和 Offset 的记录进行原子化操作,那么同理也能够做到 EOS。但这个原子化操作需要 Sink 端自己用某种机制实现,例如 Confluent 的 HDFS Connector 就用 WAL 日志保证了写入的 EOS。