在加入 PingCAP 之前,很长一段时间,我都跟 MySQL 打交道。MySQL 性能强悍,但是在一些全文检索,复杂查询上面并不快,效率堪忧。为了解决快速查的问题,我们之前尝试考虑过 Sphinx,但总觉得使用起来不方便。恰好那时候碰到了 Elasticsearch(ES),立刻就觉得这特么就是我们要的东西。
ES 底层基于 Lucene ,支持分布式,同时还提供了强大的 web 页面,点点鼠标就很容易进行数据查询。但我们的数据是存放在 MySQL 里面,如何将数据实时的导入给 Elasticsearch?
最开始我想到的是用 MySQL trigger 或者 UDF,不过立刻觉得这条路非常不靠谱,所以就把目标放在了 MySQL binlog 上面。
MySQL Binlog
同步 MySQL binlog 就很简单了,按照 MySQL replication 的协议,自己写一个客户端,模拟成 MySQL slave,注册给 MySQL master 就可以了。MySQL master 会实时的将数据的更新通过 binlog event 发送给 slave,然后我们自己解析 event 之后就能得到实际的数据了。
MySQL dump
如果是一个新建 MySQL,我们当然可以通过 binlog 的方式方便的同步数据。但如果我们想同步一个已经运行一段时间的 MySQL ,就可能会有问题了。因为这时候早期的 binlog 文件已经被删除,如果直接开始同步,我们就可能会缺失一部分早期更新的数据。
go-mysql-elasticsearch
我们通过 go-mysql 的 canal 组件,能非常方便的同步 MySQL 的数据,那么剩下的事情就简单了,将同步的 MySQL 数据直接发送给 ES 就可以了。
我们唯一需要注意的就是设置同步规则,也就是说,当我们同步了一行数据之后,这一行数据到底是如何组装发送给 ES 的。
同步规则在配置文件里面详细说明。首先,我们要定义需要同步的 table,这个我们在 source 里面指定,例如:
[[source]]
schema = "test"
# Only below tables will be synced into Elasticsearch.
# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
在上面的例子中,我们需要同步 test 这个 database 里面的几张表。对于一些项目如果使用了分表机制,我们可以用通配符来匹配,譬如上面的 t_[0-9]{4}
,就可以匹配 table t_0000
到 t_9999
。
对一个 table,我们需要指定将它的数据同步到 ES 的哪一个 index 的 type 里面。如果不指定,我们默认会用起 schema name 作为 ES 的 index 和 type。例如:
[[rule]]
schema = "test"
table = "t"
index = "test"
type = "t"
在上面的例子中,我们将 table t
的数据同步到 ES 的 index 为 test
, type 为 t
的下面。
自定义 Field mapping
默认情况下面,我们会使用 table 的 column name 作为 ES 里面的 field name,但有时候我们也可以换成另外的名字,例如:
[[rule]]
schema = "test"
table = "tfield"
index = "test"
type = "tfield"
[rule.field]
# Map column `id` to ES field `es_id`
id="es_id"
# Map column `tags` to ES field `es_tags` with array type
tags="es_tags,list"
# Map column `keywords` to ES with array type
keywords=",list"
在上面的例子中,table tfield
的 column id
,我们映射成了 es_id
,而 tags
则映射成了 es_tags
。
上面我们可以注意到 list
这个字段,他显示的告知需要将对应的 column 数据转成 ES 的 array type。这个现在通常用于 MySQL 的 varchar 等类型,我们可能会存放类似 “a,b,c” 这样的数据,然后希望同步给 ES 的时候变成 [a, b, c]
这样的列表形式。
Filter Field
我们也可以只 sync 一个 table 里面特定的 column,譬如:
[[rule]]
schema = "test"
table = "tfilter"
index = "test"
type = "tfilter"
# Only sync following columns
filter = ["id", "name"]
上面对于 table tfilter
,我们只会同步 id
和 name
这两列,其他的都不会同步了。
聚合多张表
上面我们说了支持多张表聚合,譬如:
[[rule]]
schema = "test"
table = "t_[0-9]{4}"
index = "test"
type = "t"
在上面的例子中,我们会将所有满足格式 t_[0-9]{4}
的 table 同步到 ES 的 index 为 test
,type 为 t
的下面。当然,这些表需要保证 schema 是一致的。
小结
可以看到,使用 go-mysql-elasticsearch,我们仅需要在配置文件里面写规则,就能非常方便的将数据从 MySQL 同步给 ES。上面仅仅举了一些简单的例子,go-mysql-elasticserch 现在还支持 parent-child relationship 的同步等。
当然,go-mysql-elasticsearch 还不完善,譬如还不能很好的处理 DDL 的情况,还需要支持更多的同步规则等。如果大家有兴趣,非常欢迎参与进来。
最后在列下相关的两个项目: