當前位置:
首頁 > 知識 > 增量同步mysql資料庫信息到ElasticSearch

增量同步mysql資料庫信息到ElasticSearch

1.簡介

1.1 canal介紹

Canal是一個基於MySQL二進位日誌的高性能數據同步系統。Canal廣泛用於阿里巴巴集團(包括https://www.taobao.com),以提供可靠的低延遲增量數據管道,github地址:https://github.com/alibaba/canal

Canal Server能夠解析MySQL binlog並訂閱數據更改,而Canal Client可以實現將更改廣播到任何地方,例如資料庫和Apache Kafka。

它具有以下功能:

支持所有平台。

支持由Prometheus提供支持的細粒度系統監控。

支持通過不同方式解析和訂閱MySQL binlog,例如通過GTID。

支持高性能,實時數據同步。(詳見Performance)

Canal Server和Canal Client都支持HA / Scalability,由Apache ZooKeeper提供支持

Docker支持。缺點:

不支持全量更新,只支持增量更新。

完整wiki地址:https://github.com/alibaba/canal/wiki

1.2 運作原理

原理很簡單:

Canal模擬MySQL的slave的交互協議,偽裝成mysql slave,並將轉發協議發送到MySQL Master伺服器。

MySQL Master接收到轉儲請求並開始將二進位日誌推送到slave(即canal)。

Canal將二進位日誌對象解析為自己的數據類型(原始位元組流)如圖所示:

1.3 同步es

在同步數據到es的時候需要使用適配器:canal adapter。目前最新版本1.1.3,下載地址:https://github.com/alibaba/canal/releases。

目前es貌似支持6.x版本,不支持7.x版本!!!

2.準備工作

2.1 es和jdk

安裝es可以參考:https://www.dalaoyang.cn/article/78

安裝jdk可以參考:https://www.dalaoyang.cn/article/16

2.2 安裝canal server

下載canal.deployer-1.1.3.tar.gz

解壓文件

進入解壓後的文件夾

修改conf/example/instance.properties文件,主要注意以下幾處:

canal.instance.master.address:資料庫地址,例如127.0.0.1:3306

canal.instance.dbUsername:資料庫用戶

canal.instance.dbPassword:資料庫密碼完整內容如下:

回到canal.deployer-1.1.3目錄下,啟動canal:

sh bin/startup.sh

查看日誌:

vi logs/canal/canal.log

查看具體instance日誌:

vi logs/example/example.log

關閉命令

sh bin/stop.sh

2.3 安裝canal-adapter

下載canal.adapter-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz

解壓

tar -zxvf canal.adapter-1.1.3.tar.gz

進入解壓後的文件夾

cd canal.adapter-1.1.3

修改conf/application.yml文件,主要注意如下內容,由於是yml文件,注意我這裡說明的屬性名稱:

server.port:canal-adapter埠號

canal.conf.canalServerHost:canal-server地址和ip

canal.conf.srcDataSources.defaultDS.url:資料庫地址

canal.conf.srcDataSources.defaultDS.username:資料庫用戶名

canal.conf.srcDataSources.defaultDS.password:資料庫密碼

canal.conf.canalAdapters.groups.outerAdapters.hosts:es主機地址,tcp埠

完整內容如下:

server:

port: 8081

spring:

jackson:

date-format: yyyy-MM-dd HH:mm:ss

time-zone: GMT 8

default-property-inclusion: non_null

canal.conf:

mode: tcp

canalServerHost: 127.0.0.1:11111

batchSize: 500

syncBatchSize: 1000

retries: 0

timeout:

accessKey:

secretKey:

srcDataSources:

defaultDS:

url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true

username: root

password: 12345678

canalAdapters:

- instance: example

groups:

- groupId: g1

outerAdapters:

- name: es

hosts: 127.0.0.1:9300

properties:

cluster.name: elasticsearch

另外需要配置conf/es/*.yml文件,adapter將會自動載入conf / es下的所有.yml結尾的配置文件。在介紹配置前,需要先介紹一下本案例使用的表結構,如下:

CREATE TABLE `test` (

`id` int(11) NOT NULL,

`name` varchar(200) NOT NULL,

`address` varchar(1000) DEFAULT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

需要手動去es中創建索引,比如這裡使用es-head創建,如下圖:

test索引結構如下:

{

mappings:{

_doc:{

properties:{

name:{

type:text

},

address:{

type:text

}

}

}

}

}

接下來創建test.yml(文件名隨意),內容很好理解_index為索引名稱,sql為對應語句,內容如下:

dataSourceKey: defaultDS

destination: example

groupId:

esMapping:

_index: test

_type: _doc

_id: _id

upsert: true

sql: select a.id as _id,a.name,a.address from test a

commitBatch: 3000

配置完成後,回到canal-adapter根目錄,執行命令啟動

bin/startup.sh

查看日誌

vi logs/adapter/adapter.log

關閉canal-adapter命令

bin/stop.sh

3.測試

都啟動成功後,先查看一下es-head,如圖,現在是沒有任何數據的。

接下來,我們在資料庫中插入一條數據進行測試,語句如下:

INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, "北京", "北京市朝陽區");

然後在看一下es-head,如下

接下來看一下日誌,如下:

2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {data:[{id:7,name:北京,address:北京市朝陽區}],database:test,destination:example,es:1561197255000,groupId:null,isDdl:false,old:null,pkNames:[id],sql:,table:test,ts:1561197255384,type:INSERT}

Affected indexes: test

小知識點:上面介紹的查看日誌的方法可能不是很好用,推薦使用如下語法,比如查看日誌最後200行:

tail -200f logs/adapter/adapter.log

4.總結

1.全量更新不能實現,但是增刪改都是可以的。

2.一定要提前創建好索引。

3.es配置的是tcp埠,比如默認的9300

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 千鋒JAVA開發學院 的精彩文章:

深入淺析一致性模型之Linearizability
JStorm系列:概念與編程模型

TAG:千鋒JAVA開發學院 |