Canal: Study Notes and Deployment on a Kubernetes Cluster

Monday, March 7, 2022

1. Overview

Canal is an open-source MySQL binlog parsing tool from Alibaba. It provides incremental data subscription and consumption, which enables use cases such as:

  • Database mirroring
  • Real-time database backup
  • Index building and real-time maintenance (split/heterogeneous indexes, inverted indexes, etc.)
  • Business cache invalidation
  • Incremental data processing with business logic

How Canal works is described in detail in the project Wiki. In short, it emulates MySQL primary/replica replication and the workflow is:

  1. Canal pretends to be a MySQL replica and sends the dump protocol to the primary server
  2. The MySQL primary receives the dump request and starts pushing binlogs to Canal
  3. Canal parses the binlog stream

(figure: Canal replication workflow)

Canal consists of the following components:

  1. canal-server: the Canal server, where all the core work happens
  2. canal-admin: the Canal admin console, which provides configuration management, node operations and other ops-facing features; it requires Canal 1.1.4 or later

Canal already supports Prometheus, Kafka and more, and can be deployed in either standalone or cluster mode. Here we will try it out together with MySQL, ZooKeeper, Kafka, Elasticsearch and Kibana.

2. Preparation

2.1 Deployment Plan

Before installing, let's review the environment and plan the deployment. As with etcd and the other services installed earlier, everything here is deployed on node1 (usefulness=devops).

Node    Role          IP             Spec            Label
master  master, etcd  192.168.1.100  4C / 4G / 50G   usefulness=schedule
node1   worker        192.168.1.101  8C / 32G / 100G usefulness=devops
node2   worker        192.168.1.102  8C / 12G / 100G usefulness=business
node3   worker        192.168.1.103  8C / 12G / 100G usefulness=business

2.2 Domain Names

The following domains all need to point to node1, because every service here, as well as traefik, is deployed on node1 (192.168.1.101), and traefik listens on node1's port 80.

  1. kafka.local.com
  2. canal.local.com
  3. canal-admin.local.com
  4. kibana.local.com

2.3 Dependent Services

Deployment of the dependent services has all been covered in earlier posts; follow those documents to deploy them.

I have already deployed them, so here are the internal DNS names or external addresses of the deployed services:

  • MySQL: mysql-primary.devops.svc.cluster.local:3306
  • Elasticsearch
    • inside k8s: elasticsearch-client.devops.svc.cluster.local:9200
    • outside k8s: es.local.com:4511
  • Kibana: kibana.local.com
  • ZooKeeper: zookeeper.devops.svc.cluster.local:2181
  • Kafka
    • inside k8s: kafka.devops.svc.cluster.local:9092
    • outside k8s: kafka.local.com:19090, kafka.local.com:19091, kafka.local.com:19092

2.4 Canal

Here is a summary of the pitfalls I hit while deploying Canal; it may be useful as a reference for your own deployment.

2.4.1 Database
  1. MySQL must have binlog enabled; versions 8.0 and above have it enabled by default, already in row format (a small Go check of these settings follows this list)

    [mysqld]
    # enable binlog
    log-bin=mysql-bin
    # use ROW format
    binlog-format=ROW
    # required for MySQL replication; must not collide with canal's slaveId
    server_id=1
    
  2. Create an account named canal for Canal, with the privileges needed to replicate data

    # canal
    CREATE USER canal IDENTIFIED BY 'si1n39zuybQ=';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    
  3. Create an account named canal_admin for Canal Admin to access its database

    # canal_admin
    CREATE USER canal_admin IDENTIFIED BY '2HcZUUTVyNo=';
    GRANT ALL PRIVILEGES ON canal_admin.* TO 'canal_admin'@'%';
    FLUSH PRIVILEGES;
    
  4. canal-admin uses canal_manager as its database by default; I renamed it to canal_admin, so the database has to be created and the data imported before deploying

    # create the database
    create schema canal_admin;
    
  5. The SQL for the canal-admin database can be downloaded here. Note that it inserts an admin account into the canal_user table by default, with password 123456; if you want to change it, edit the password in the part of the SQL shown below before importing. The way this password is generated is described in the next item.

(screenshot: image-20220310151741475)

  6. How MySQL generates the password hash; strip the leading * from the result

    # MySQL below 5.7.5:
    select password('xxx');
    # MySQL 5.7.5 and above:
    SELECT CONCAT('*', UPPER(SHA1(UNHEX(SHA1('xxx')))));
    
  7. For this test we also need a test database and a test table

    # create the database
    create schema canal_test;
    # switch to it
    use canal_test;
    # create the table
    create table if not exists user
    (
        id         int(11) unsigned auto_increment comment 'ID',
        name       varchar(100)               not null comment '姓名',
        gender     tinyint(1)                 not null comment '性别,0:男,1:女',
        mobile     varchar(100)               not null comment '手机号',
        email      varchar(100)               not null comment '邮箱',
        created_at int(10) unsigned default 0 not null comment '创建时间',
        updated_at int(10) unsigned default 0 not null comment '更新时间',
        deleted_at int(10) unsigned default 0 not null comment '删除时间',
        primary key (`id`),
        index index_mobile (`mobile`),
        index index_email (`email`)
    ) engine = innodb
      default charset = utf8mb4
      collate = utf8mb4_general_ci comment '用户表';
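
As mentioned in item 1, binlog must be on and in row format. A minimal Go sketch to double-check those settings using the canal account created above; the DSN points at the in-cluster MySQL address from section 2.3, adjust it to your own environment:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// canal account and in-cluster MySQL address used elsewhere in this post;
	// change the DSN if your environment differs
	dsn := "canal:si1n39zuybQ=@tcp(mysql-primary.devops.svc.cluster.local:3306)/"
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// expect log_bin = ON and binlog_format = ROW
	for _, v := range []string{"log_bin", "binlog_format", "server_id"} {
		var name, value string
		if err := db.QueryRow("SHOW VARIABLES LIKE '" + v + "'").Scan(&name, &value); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s = %s\n", name, value)
	}
}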
    
2.4.2 Accounts and Passwords

Canal's passwords are easy to mix up, mainly because there are several accounts and a mixture of plaintext and hashed values. A quick summary:

  1. All passwords are hashed with the MySQL password method described above (a small Go sketch of that hashing follows the table below)
  2. Some passwords are sometimes configured in plaintext and sometimes as the hash
#  Type      Account      Password      Purpose
1  database  canal        si1n39zuybQ=  lets Canal access MySQL as a replica
2  database  canal_admin  2HcZUUTVyNo=  database account used by canal-admin
3  web       admin        O8avksPO95s=  login for the canal-admin web console
4  tcp       admin        QLyPwzWazjg=  credentials canal server uses to connect to admin
5  tcp       canal        dAsQZ2otzK4=  credentials clients use to connect to the server, set in canal.properties

Note that 3 and 4 are two different sets of credentials: one is for logging into the web console, the other is used by canal server to connect to admin.
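
The hashing itself is just MySQL's double SHA-1. A small Go sketch, assuming canal follows that scheme exactly (which is what the password method above implies):

package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"strings"
)

// canalPasswd returns the value MySQL's PASSWORD() would produce, without the
// leading '*': UPPER(HEX(SHA1(SHA1(plain)))), where the inner digest is kept
// as raw bytes (the UNHEX(SHA1(...)) step from the SQL above).
func canalPasswd(plain string) string {
	inner := sha1.Sum([]byte(plain)) // SHA1(plain) as raw bytes
	outer := sha1.Sum(inner[:])      // SHA1 over the raw digest
	return strings.ToUpper(hex.EncodeToString(outer[:]))
}

func main() {
	// e.g. the client password from row 5; compare the output against the
	// hash configured in canal.properties in section 3.2
	fmt.Println(canalPasswd("dAsQZ2otzK4="))
}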

3. Deploying Canal

An official Docker image is available for Canal, so here we use it to write our own Deployment (admin) and StatefulSet (server) and deploy them to Kubernetes.

3.1 Deploying Canal Admin

Canal Admin is configured as follows:

  1. admin_user and admin_password

    The credentials Canal Server uses when connecting to admin: configured in plaintext on the admin side, but as the hash on the server side.

  2. Database connection settings

    All of these are plaintext. Note that the password must match the account in the database, even though the database itself stores it hashed.

The configuration:

apiVersion: v1
kind: ConfigMap
metadata:
  name: canal-admin
  namespace: devops
data:
  admin_user: "admin"
  admin_password: "QLyPwzWazjg="
  datasource_address: "mysql-primary.devops.svc.cluster.local:3306"
  datasource_database: "canal_admin"
  datasource_username: "canal_admin"
  datasource_password: "2HcZUUTVyNo="

The complete manifest is below. Note that the service listens on port 8089, and an IngressRoute routes traffic for canal-admin.local.com to it:

apiVersion: v1
kind: ConfigMap
metadata:
  name: canal-admin
  namespace: devops
data:
  admin_user: "admin"
  admin_password: "QLyPwzWazjg="
  datasource_address: "mysql-primary.devops.svc.cluster.local:3306"
  datasource_database: "canal_admin"
  datasource_username: "canal_admin"
  datasource_password: "2HcZUUTVyNo="

---

kind: Deployment
apiVersion: apps/v1
metadata:
  name: canal-admin
  namespace: devops
  labels:
    app.kubernetes.io/name: canal-admin
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: canal-admin
  template:
    metadata:
      name: canal-admin
      creationTimestamp: null
      labels:
        app.kubernetes.io/name: canal-admin
    spec:
      containers:
        - name: canal-admin
          image: 'canal/canal-admin:latest'
          imagePullPolicy: IfNotPresent
          ports:
            - name: web
              containerPort: 8089
              protocol: TCP
          env:
            - name: server.port
              value: '8089'
            - name: canal.adminUser
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: admin_user
            - name: canal.adminPasswd
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: admin_password
            - name: spring.datasource.address
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: datasource_address
            - name: spring.datasource.database
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: datasource_database
            - name: spring.datasource.username
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: datasource_username
            - name: spring.datasource.password
              valueFrom:
                configMapKeyRef:
                  name: canal-admin
                  key: datasource_password
          resources:
            requests:
              cpu: 250m
              memory: 256Mi
          livenessProbe:
            httpGet:
              path: /
              port: 8089
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /
              port: 8089
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
      restartPolicy: Always
      nodeSelector:
        usefulness: devops
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app.kubernetes.io/name: canal-admin
                namespaces:
                  - devops
                topologyKey: kubernetes.io/hostname
  revisionHistoryLimit: 10

---

kind: Service
apiVersion: v1
metadata:
  name: canal-admin
  namespace: devops
spec:
  ports:
    - protocol: TCP
      port: 8089
      targetPort: 8089
  selector:
    app.kubernetes.io/name: canal-admin

---

apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: canal-admin
  namespace: devops
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`canal-admin.local.com`)
      kind: Rule
      services:
        - name: canal-admin
          port: 8089

After kubectl apply, a configmap, a deployment, a service and an ingressroute have been created:

root@master:/data/yaml/canal# kubectl apply -f admin.yml
configmap/canal-admin unchanged
deployment.apps/canal-admin created
service/canal-admin created
ingressroute.traefik.containo.us/canal-admin created
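
Once the IngressRoute is in place, the console should answer on the domain from section 2.2. A tiny Go sketch to verify that, assuming canal-admin.local.com resolves to node1 where traefik listens on port 80:

package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	// same path the liveness/readiness probes use
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get("http://canal-admin.local.com/")
	if err != nil {
		fmt.Println("canal-admin unreachable:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("canal-admin responded with", resp.Status)
}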

3.2 Cluster Configuration

Once Canal Admin is up, open canal-admin.local.com to reach the console. After logging in, click Cluster Management in the left menu, then New Cluster, and enter a cluster name and the ZooKeeper address. I named the cluster local; after clicking OK you can see the new cluster.

(screenshot: image-20220310182653062)

Now click the main configuration action behind the cluster to edit canal.properties. In standalone mode this file is maintained on the server itself; in cluster mode it is maintained in admin, and the servers pull the unified configuration once they connect to admin:

(screenshot: image-20220310182522024)

The configuration is empty the first time; click Load Template and then edit it. The following items need to be changed:

  1. canal.user and canal.passwd

    The credentials clients use to access Canal Server; the password must be configured as the hash. I use canal / 64D406545EB954F76A16939C5CA9A7F49C10DDF2, whose plaintext is dAsQZ2otzK4=

  2. canal.admin.passwd

    The admin password, used when the server connects to admin; the default account and password are both admin. The password is configured as the hash here, and the matching canal server setting is also the hash. I use 15C038E1A3F6286C301FC986FE8E28CFFBD73C5B, whose plaintext is QLyPwzWazjg=

  3. canal.zkServers

    The ZooKeeper address; I use zookeeper.devops.svc.cluster.local:2181

  4. kafka.bootstrap.servers

    The Kafka address; I use kafka.devops.svc.cluster.local:9092

  5. canal.serverMode = kafka

    Tells canal to write the data to Kafka

  6. canal.instance.global.spring.xml

    The global spring component file for instances; it needs to be set to classpath:spring/default-instance.xml

After changing these items, click Save. Saving restarts the Canal Servers. The full configuration:

#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = 64D406545EB954F76A16939C5CA9A7F49C10DDF2

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 15C038E1A3F6286C301FC986FE8E28CFFBD73C5B
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers = zookeeper.devops.svc.cluster.local:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
canal.destinations = 
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = kafka.devops.svc.cluster.local:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = 

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

3.3 Deploying Canal Server

I deploy the servers as a StatefulSet with replicas set to 3, and create three matching Services bound to node ports, so the servers can also be reached from outside the cluster at:

  • canal.local.com:11111
  • canal.local.com:11112
  • canal.local.com:11113

Canal Server is configured as follows:

  1. admin_user and admin_password

    The credentials Canal Server uses to connect to admin: plaintext on the admin side but the hash on the server side, and they must match canal.admin.user / canal.admin.passwd from the cluster configuration in the previous step.

  2. admin_register_cluster

    The name of the cluster created in the previous step; I use local

  3. admin_register_auto

    Whether to register automatically. If true, the server registers itself with the cluster; otherwise you have to add the server manually in the console.

The configuration:

apiVersion: v1
kind: ConfigMap
metadata:
  name: canal-server
  namespace: devops
data:
  admin_manager: "canal-admin.devops.svc.cluster.local:8089"
  admin_port: "11110"
  admin_user: "admin"
  admin_password: "15C038E1A3F6286C301FC986FE8E28CFFBD73C5B"
  admin_register_cluster: "local"
  admin_register_auto: "true"

The complete manifest:

apiVersion: v1
kind: ConfigMap
metadata:
  name: canal-server
  namespace: devops
data:
  admin_manager: "canal-admin.devops.svc.cluster.local:8089"
  admin_port: "11110"
  admin_user: "admin"
  admin_password: "15C038E1A3F6286C301FC986FE8E28CFFBD73C5B"
  admin_register_cluster: "local"
  admin_register_auto: "true"

---

kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: canal-server
  namespace: devops
  labels:
    app.kubernetes.io/name: canal-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app.kubernetes.io/name: canal-server
  template:
    metadata:
      name: canal-server
      creationTimestamp: null
      labels:
        app.kubernetes.io/name: canal-server
    spec:
      containers:
        - name: canal-server
          image: 'canal/canal-server:latest'
          imagePullPolicy: IfNotPresent
          ports:
            - name: tcp
              containerPort: 11111
              protocol: TCP
          env:
            - name: canal.admin.manager
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_manager
            - name: canal.admin.port
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_port
            - name: canal.admin.user
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_user
            - name: canal.admin.passwd
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_password
            - name: canal.admin.register.cluster
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_register_cluster
            - name: canal.admin.register.auto
              valueFrom:
                configMapKeyRef:
                  name: canal-server
                  key: admin_register_auto
          resources:
            requests:
              cpu: 250m
              memory: 256Mi
          livenessProbe:
            tcpSocket:
              port: 11112
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
          readinessProbe:
            tcpSocket:
              port: 11112
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 30
      restartPolicy: Always
      nodeSelector:
        usefulness: devops
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app.kubernetes.io/name: canal-server
                namespaces:
                  - devops
                topologyKey: kubernetes.io/hostname
  serviceName: canal-server
  revisionHistoryLimit: 10

If canal.serverMode is set to tcp, the container listens on port 11111 and you can create a few Services to reach it; in mq mode this port is not opened.

kind: Service
apiVersion: v1
metadata:
  name: canal-server-0-external
  namespace: devops
  labels:
    app.kubernetes.io/name: canal-server
    pod: canal-server-0
spec:
  ports:
    - name: tcp-canal-server
      protocol: TCP
      port: 11111
      targetPort: 11111
      nodePort: 11111
  selector:
    app.kubernetes.io/name: canal-server
    statefulset.kubernetes.io/pod-name: canal-server-0
  type: NodePort
  sessionAffinity: None
  externalTrafficPolicy: Cluster
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack

---

kind: Service
apiVersion: v1
metadata:
  name: canal-server-1-external
  namespace: devops
  labels:
    app.kubernetes.io/name: canal-server
    pod: canal-server-1
spec:
  ports:
    - name: tcp-canal-server
      protocol: TCP
      port: 11111
      targetPort: 11111
      nodePort: 11112
  selector:
    app.kubernetes.io/name: canal-server
    statefulset.kubernetes.io/pod-name: canal-server-1
  type: NodePort
  sessionAffinity: None
  externalTrafficPolicy: Cluster
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack

---

kind: Service
apiVersion: v1
metadata:
  name: canal-server-2-external
  namespace: devops
  labels:
    app.kubernetes.io/name: canal-server
    pod: canal-server-2
spec:
  ports:
    - name: tcp-canal-server
      protocol: TCP
      port: 11111
      targetPort: 11111
      nodePort: 11113
  selector:
    app.kubernetes.io/name: canal-server
    statefulset.kubernetes.io/pod-name: canal-server-2
  type: NodePort
  sessionAffinity: None
  externalTrafficPolicy: Cluster
  ipFamilies:
    - IPv4
  ipFamilyPolicy: SingleStack

After kubectl apply (a quick connectivity check in Go follows the output):

root@master:/data/yaml/canal# kubectl apply -f server.yml
configmap/canal-server created
statefulset.apps/canal-server created
root@master:/data/yaml/canal# kubectl apply -f service.yml
service/canal-server-0-external created
service/canal-server-1-external created
service/canal-server-2-external created
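
A minimal Go sketch to check the three node ports. This only makes sense when canal.serverMode is tcp (in kafka mode port 11111 is not opened), and it assumes canal.local.com resolves to node1 as described in section 2.2:

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	for _, addr := range []string{
		"canal.local.com:11111",
		"canal.local.com:11112",
		"canal.local.com:11113",
	} {
		conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
		if err != nil {
			fmt.Printf("%-22s unreachable: %v\n", addr, err)
			continue
		}
		conn.Close()
		fmt.Printf("%-22s ok\n", addr)
	}
}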

Looking at the console now, you can see the three servers have joined the cluster:

(screenshot: image-20220310194734139)

3.4 Creating an Instance

In the console, click Instance Management in the left menu, then New Instance. In the form, enter a name, choose the cluster/host, click Load Template, then change the following items in the configuration and save.

  1. canal.instance.master.address

    The MySQL connection address; I use mysql-primary.devops.svc.cluster.local:3306

  2. canal.instance.dbUsername and canal.instance.dbPassword

    The MySQL credentials; I use canal / si1n39zuybQ=

  3. canal.mq.topic

    The topic the messages are written to; I use canal_test

  4. canal.instance.filter.regex

    The regex describing which databases and tables to capture; I use canal_test\..* (a short Go check of the pattern follows this list)
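
A minimal Go sketch to sanity-check the pattern itself; canal applies the filter to schema.table names, and this is only a check of the regex, not canal's actual filter implementation:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// canal.instance.filter.regex is matched against "schema.table"
	filter := regexp.MustCompile(`^canal_test\..*$`)
	for _, name := range []string{"canal_test.user", "canal_test.orders", "other_db.user"} {
		fmt.Printf("%-18s matched=%v\n", name, filter.MatchString(name))
	}
}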

The full instance configuration:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=mysql-primary.devops.svc.cluster.local:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=si1n39zuybQ=
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=canal_test\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canal_test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

Because a cluster was selected, no host is assigned while the instance is stopped; click Start in the action menu and you can see a host gets assigned:

(screenshot: image-20220310195429398)

3.5 Testing

Here we write some Go code that consumes the Kafka topic canal_test, run SQL statements by hand, and watch the data shown in Kibana after each statement.

On startup the Go code checks whether the user index exists in Elasticsearch and creates it if not. We start it once to create the index, then create an index pattern in Kibana; from then on we can run SQL and watch the results, as shown below:

(screenshot: image-20220311113252890)

Next we run Insert, Update and Delete statements in turn.

3.5.1 Insert

The SQL:

insert into user (`name`, `gender`, `mobile`, `email`, `created_at`, `updated_at`, `deleted_at`) values ('周杰伦', '0', '13565654521', '[email protected]', 1646453618, 1646453618, 0), ('张学友', '0', '13565654522', '[email protected]', 1646453618, 1646453618, 0), ('林俊杰', '0', '13565654523', '[email protected]', 1646453618, 1646453618, 0), ('王力宏', '0', '13565654524', '[email protected]', 1646453618, 1646453618, 0), ('孙燕姿', '1', '13565654525', '[email protected]', 1646453618, 1646453618, 0), ('梁静茹', '1', '13565654526', '[email protected]', 1646453618, 1646453618, 0), ('谭维维', '1', '13565654527', '[email protected]', 1646453618, 1646453618, 0), ('五月天', '0', '13565654528', '[email protected]', 1646453618, 1646453618, 0), ('腾格尔', '0', '13565654529', '[email protected]', 1646453618, 1646453618, 0), ('陈奕迅', '0', '13565654530', '[email protected]', 1646453618, 1646453618, 0);

执行后观察 Kibana 结果,可以看到已经写入了10个用户:

image-20220311114820262

3.5.2 Update

The SQL:

update user set email = 'jay@gmail.com', updated_at = '1646453628'  where id = 1;

After running it, Kibana shows that 周杰伦's email and updated time have both changed:

(screenshot: image-20220311115032186)

3.5.3 Delete

A delete removes the row from the database, but the document in ES is not deleted; only its is_deleted and deleted_at fields are updated.

The SQL:

delete from user where id = 1;

After running it, Kibana shows that is_deleted and deleted_at have been updated:

(screenshot: image-20220311120033064)

3.6 Code

The complete Go code:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/Shopify/sarama"
	"github.com/olivere/elastic/v7"
	"github.com/olivere/elastic/v7/config"
)

// esServer ES connection address
var esServer = "http://es.local.com:19200"

// esIndex ES index name
var esIndex = "user"

// esUsername ES username
var esUsername = "elastic"

// esPassword ES password
var esPassword = "eYTmelHfmA4="

// esClient ES client
var esClient *elastic.Client

// kafkaServers Kafka broker addresses
var kafkaServers = []string{
	"kafka.local.com:19090",
	"kafka.local.com:19091",
	"kafka.local.com:19092",
}

// kafkaTopic Kafka topic
var kafkaTopic = "canal_test"

// kafkaConsumer Kafka consumer
var kafkaConsumer sarama.Consumer

// ctx background context
var ctx = context.Background()

func main() {
	var err error
	// create the ES client
	esClient, err = getEsClient()
	if err != nil {
		panic(err)
	}
	// create the Kafka consumer
	sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)
	kafkaConsumer, _ = sarama.NewConsumer(kafkaServers, nil)
	partitions, _ := kafkaConsumer.Partitions(kafkaTopic)
	for _, partition := range partitions {
		go func(partition int32) {
			pc, _ := kafkaConsumer.ConsumePartition(kafkaTopic, partition, sarama.OffsetOldest)
			for v := range pc.Messages() {
				var kafkaMsg KafkaMsg
				_ = json.Unmarshal(v.Value, &kafkaMsg)
				for _, user := range kafkaMsg.Data {
					data, _ := json.Marshal(kafkaMsg.Data)
					fmt.Println(kafkaMsg.Type, kafkaMsg.Database, kafkaMsg.Table, string(data), kafkaMsg.Ts)
					if kafkaMsg.Type == "DELETE" {
						user.DeletedAt = fmt.Sprintf("%d", kafkaMsg.Ts/1000)
					}
					err = saveUserToEs(user)
					if err != nil {
						fmt.Printf("failed to save user, error: %v\n", err)
					}
				}
			}

		}(partition)
	}
	select {}
}

// KafkaMsg is the flat message canal writes to Kafka
type KafkaMsg struct {
	Data      []*User `json:"data"`
	Database  string  `json:"database"`
	Es        int64   `json:"es"`
	ID        int     `json:"id"`
	IsDdl     bool    `json:"isDdl"`
	MysqlType struct {
		ID        string `json:"id"`
		Name      string `json:"name"`
		Gender    string `json:"gender"`
		Mobile    string `json:"mobile"`
		Email     string `json:"email"`
		CreatedAt string `json:"created_at"`
		UpdatedAt string `json:"updated_at"`
		DeletedAt string `json:"deleted_at"`
	} `json:"mysqlType"`
	Old     interface{} `json:"old"`
	PkNames []string    `json:"pkNames"`
	SQL     string      `json:"sql"`
	SQLType struct {
		ID        int `json:"id"`
		Name      int `json:"name"`
		Gender    int `json:"gender"`
		Mobile    int `json:"mobile"`
		Email     int `json:"email"`
		CreatedAt int `json:"created_at"`
		UpdatedAt int `json:"updated_at"`
		DeletedAt int `json:"deleted_at"`
	} `json:"sqlType"`
	Table string `json:"table"`
	Ts    int64  `json:"ts"`
	Type  string `json:"type"`
}

// User a user row
type User struct {
	ID        string `json:"id"`
	Name      string `json:"name"`
	Gender    string `json:"gender"`
	Mobile    string `json:"mobile"`
	Email     string `json:"email"`
	CreatedAt string `json:"created_at"`
	UpdatedAt string `json:"updated_at"`
	DeletedAt string `json:"deleted_at"`
}

// esMapping ES index mapping
const esMapping = `
{
	"settings": {
		"number_of_shards": 1,
		"number_of_replicas": 0
	},
	"mappings": {
		"properties": {
			"id": {
				"type": "keyword"
			},
			"name": {
				"type": "keyword"
			},
			"gender": {
				"type": "keyword"
			},
			"mobile": {
				"type": "keyword"
			},
			"email": {
				"type": "keyword"
			},
			"is_deleted": {
				"type": "keyword"
			},
			"created_at": {
				"type": "date"
			},
			"updated_at": {
				"type": "date"
			},
			"deleted_at": {
				"type": "date"
			}
		}
	}
}`

// getEsClient creates the ES client and makes sure the index exists
func getEsClient() (*elastic.Client, error) {
	// disable sniffing and health checks
	off := false
	client, err := elastic.NewClientFromConfig(
		&config.Config{
			URL:         esServer,
			Index:       esIndex,
			Username:    esUsername,
			Password:    esPassword,
			Sniff:       &off,
			Healthcheck: &off,
		},
	)
	if err != nil {
		return nil, err
	}
	// create the index if it does not exist
	res, err := client.IndexExists(esIndex).Do(ctx)
	if err != nil {
		return nil, err
	}
	if !res {
		_, err = client.CreateIndex(esIndex).BodyString(esMapping).Do(ctx)
		if err != nil {
			return nil, err
		}
	}
	return client, nil
}

// saveUserToEs writes a user document to ES
func saveUserToEs(user *User) error {
	m := make(map[string]interface{})
	b, _ := json.Marshal(user)
	_ = json.Unmarshal(b, &m)
	m["deleted_at"] = "0"
	m["is_deleted"] = "否"
	m["gender"] = "男"
	if user.DeletedAt != "0" {
		m["is_deleted"] = "是"
		m["deleted_at"] = parseTime(user.DeletedAt)
	}
	if user.Gender != "0" {
		m["gender"] = "女"
	}
	m["created_at"] = parseTime(m["created_at"].(string))
	m["updated_at"] = parseTime(m["updated_at"].(string))
	b, _ = json.Marshal(m)
	_, err := esClient.Index().Index(esIndex).Id(user.ID).BodyString(fmt.Sprintf("%s", b)).Do(ctx)
	return err
}

// parseTime converts a unix timestamp string into the ES date format
func parseTime(t string) string {
	tsp, _ := strconv.ParseInt(t, 10, 64)
	return time.Unix(tsp, 0).Format("2006-01-02T15:04:05")
}

The test log:

[Sarama] 2022/03/11 12:01:45 Initializing new client
[Sarama] 2022/03/11 12:01:45 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2022/03/11 12:01:45 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2022/03/11 12:01:45 client/metadata fetching metadata for all topics from broker kafka.local.com:19091
[Sarama] 2022/03/11 12:01:45 Connected to broker at kafka.local.com:19091 (unregistered)
[Sarama] 2022/03/11 12:01:45 client/brokers registered new broker #0 at 192.168.1.101:19090
[Sarama] 2022/03/11 12:01:45 client/brokers registered new broker #2 at 192.168.1.101:19092
[Sarama] 2022/03/11 12:01:45 client/brokers registered new broker #1 at 192.168.1.101:19091
[Sarama] 2022/03/11 12:01:45 Successfully initialized new client
[Sarama] 2022/03/11 12:01:45 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2022/03/11 12:01:45 Connected to broker at 192.168.1.101:19090 (registered as #0)
[Sarama] 2022/03/11 12:01:45 consumer/broker/0 added subscription to canal_test/0
INSERT canal_test user [{"id":"1","name":"周杰伦","gender":"0","mobile":"13565654521","email":"[email protected]","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"2","name":"张学友"er":"0","mobile":"13565654522","email":"zhangxueyou@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"3","name":"林俊杰","gender":"0","mobile":"13565654523","email":"linjunjimail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"4","name":"王力宏","gender":"0","mobile":"13565654524","email":"wanglihong@gmail.com","created_at":"1646453618","updated_at":46453618","deleted_at":"0"},{"id":"5","name":"孙燕姿","gender":"1","mobile":"13565654525","email":"[email protected]","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"6","name":"梁静茹nder":"1","mobile":"13565654526","email":"[email protected]","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"7","name":"谭维维","gender":"1","mobile":"13565654527","email":"[email protected]","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"8","name":"五月天","gender":"0","mobile":"13565654528","email":"[email protected]","created_at":"1646453618","updated_at"646453618","deleted_at":"0"},{"id":"9","name":"腾格尔","gender":"0","mobile":"13565654529","email":"tenggeer@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"},{"id":"10","name":"陈奕gender":"0","mobile":"13565654530","email":"chenyixun@gmail.com","created_at":"1646453618","updated_at":"1646453618","deleted_at":"0"}] 1646969682003
UPDATE canal_test user [{"id":"1","name":"周杰伦","gender":"0","mobile":"13565654521","email":"jay@gmail.com","created_at":"1646453618","updated_at":"1646453628","deleted_at":"0"}] 1646970591004
DELETE canal_test user [{"id":"1","name":"周杰伦","gender":"0","mobile":"13565654521","email":"jay@gmail.com","created_at":"1646453618","updated_at":"1646453628","deleted_at":"0"}] 1646970723986

4. Summary

This post is a record of my study of Canal: a brief introduction to how it works, followed by a complete end-to-end example. Canal has a lot of configuration options and this article only covers the important ones; for the full picture please refer to the official documentation.
