一、前言
随着业务的发展,以往的离线批量计算方式,因为延迟太长已经不能满足需求,随着flink这种实时计算工具的出现,实时采集也成为大数据工作中非常重要的一环。
现今企业的数据来源大体分为两种:存储在各种关系数据库中的业务数据、网站或APP产生的用户行为日志数据
日志数据通过flume、kafka等工具已经可以实现实时采集,但关系数据库的同步仍然以批量为主。
当关系数据库的表数据达到一定程度,批量同步耗时太久,增量同步不能解决实时性的要求
mysql可以通过binlog进行实时同步,技术也比较成熟,但无法解决SQLserver、Oracle、postgresql等数据库的问题。
即便有kafka这种流数据分发订阅平台、flink这种实时计算平台,redis这种高效读写数据库,如果不能解决实时采集问题,那么一个整体的实时链路就无法实现。
幸好,国外有一款开源工具实现了市面上各种常见数据库的数据更新日志的获取,它就是debezium
插件模式
二、简介
Debezium是一组分布式服务,用于捕获数据库中的更改,以便您的应用程序可以查看这些更改并对它们做出响应。Debezium在更改事件流中记录每个数据库表中的所有行级更改,应用程序只需读取这些流,即可按更改事件发生的顺序查看更改事件。
debezium有两种运行方式,一种是以插件的方式继承在kafka connect中,另外一种是以独立服务的方式运行(孵化中)
服务器模式
今天我们要介绍的就是插件模式。
三、部署
插件模式首先要求集群上已经安装好zookeeper和kafka,kafka可以连通上游数据库,我这里用flink消费kafka里的日志实时写入mysql
因此还需要部署好flink集群和mysql数据库
以上都具备之后,就可以开始部署debezium
1.下载安装包
#以mysql为例,下载debezium-connector-mysql-1.4.2.Final-plugin.tar.gz wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.4.2.Final/debezium-connector-mysql-1.4.2.Final-plugin.tar.gz
在kafka安装文件夹创建一个connectors文件夹,将下载的debezium插件解压到connectors
2.创建topic
创建kafka connect所需要的三个topic: connect-offsets,connect-configs,connect-status
kafka-topics --zookeeper ip1:2181,ip2:2181,ip3:2181 --create --topic connect-status --replication-factor 2 --partitions 3
3.编写kafka connect配置文件
创建connect-distributed.properties,分发到所有节点
#kafka-connect配置文件 # kafka集群地址 bootstrap.servers=ip1:9092,ip2:9092,ip3:9092 # Connector集群的名称,同一集群内的Connector需要保持此group.id一致 group.id=connect-cluster # 存储到kafka的数据格式 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false # 内部转换器的格式,针对offsets、config和status,一般不需要修改 internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false # 用于保存offsets的topic,应该有多个partitions,并且拥有副本(replication) # Kafka Connect会自动创建这个topic,但是你可以根据需要自行创建 offset.storage.topic=connect-offsets offset.storage.replication.factor=2 offset.storage.partitions=3 # 保存connector和task的配置,应该只有1个partition,并且有多个副本 config.storage.topic=connect-configs config.storage.replication.factor=2 # 用于保存状态,可以拥有多个partition和replication status.storage.topic=connect-status status.storage.replication.factor=2 status.storage.partitions=3 # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 # RESET主机名,默认为本机 #rest.host.name= # REST端口号 rest.port=18083 # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers. #rest.advertised.host.name= #rest.advertised.port= # 保存connectors的路径 #plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/opt/cloudera/parcels/CDH/lib/kafka/connectors
4.启动kafka-connect
备注:全部节点都要执行
cd /opt/cloudera/parcels/CDH/lib/kafka bin/connect-distributed.sh -daemon config/connect-distributed.properties ###jps 可看到 ConnectDistributed 进程
5.以POST URL方式提交连接请求
多个表名以逗号分隔,格式为db.table,参数中指定的topic是元数据topic,真正的topic名以server_name.db_name.table_name构成
POST:http://ip:18083/connectors Headers:Content-Type: application/json Body:{ "name" : "debezium-mysql", "config":{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "host", "database.port": "3306", "database.user": "username", "database.password": "password", "database.server.id" :"1739", "database.server.name": "mysql", "database.history.kafka.bootstrap.servers": "ip1:9092,ip2:9092,ip3:9092", "database.history.kafka.topic": "mysql.test", "database.whitelist": "test", "table.whitelist":"test.test_table2", "include.schema.changes" : "true" , "mode" : "incrementing", "incrementing.column.name" : "id", "database.history.skip.unparseable.ddl" : "true" } }
提交完成后以GET http://ip:18083/connectors获取连接器信息
由于debezium没有建topic的逻辑,因此kafka需要开放自动产生topic的配置
查看kafka是否产生相应的topic,更高源表的内容,如果topic中有对应的更改日志记录,任务就配置成功了
剩下的从kafka消费数据就有很多方式可以实现了
最新评论