logstash MySQL Data Input

前言

往elasticsearch中导入数据,可以采用curl -XPUT 'http://localhost:9200/megacorp/employee/1的方式往里面导入数据,这样的插入可以用来测试,但是数据量比较大的时候,这样一条条的插入是不实际的。

当然elasticsearch可以采用_bulk的方式批量导入数据

1
2
3
4
5
curl -XPOST 'http://localhost:9200/_bulk' -d '
{ "create": { "_index": "website", "_type": "blog", "_id": "3" }}
{ "title": "My third blog post" }
{ "create": { "_index": "website", "_type": "blog", "_id": "4" }}
{ "title": "My fourth blog post" }

或者把我们的数据导成json的格式,然后导入

1
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\testdata\test.json

在之前solr使用过程中,我们是把数据库的数据导入到solr中,elasticsearch也可以,这篇主要是来记录一下logstash的安装以及使用。

logstash的安装

logstash依赖jdk1.8,所以先确保你的环境已经装了jdk的环境。下载与你elasticsearch对应的logstash版本。因为我的elasticsearch是6.x,所以logstash也需要6.x。输入下面命令安装,并解压

1
2
3
wget http://yellowcong.qiniudn.com/logstash-6.0.1.tar.gz

tar -zxvf logstash-6.0.1.tar.gz

使用下面命令测试你的logstash能够正常运行,出现下面结果表示成功。

1
./logstash-6.0.1/bin/logstash -e "input {stdin{}} output {stdout{}}"

你想输入的替代文字

logstash-input-jdbc插件安装

logstash需要安装logstash-input-jdbc插件,才可以对数据库进行操作。下载插件过程中最大的坑是下载插件相关的依赖的时候下不动,因为国内网络的原因,访问不到亚马逊的服务器。解决办法,改成国内的ruby仓库镜像。此镜像托管于淘宝的阿里云服务器上.。

如果没有安装 gem 的话 安装gem,本测试环境是ubuntu,使用以下的命令安装gem

1
sudo apt-get install gem

替换成淘宝的镜像,查看镜像是否切换成功,确保只有 https://ruby.taobao.org 不然会因为墙的原因,导致下载失败。

1
gem sources --add https://ruby.taobao.org/ --remove https://rubygems.org/

1
gem sources -l

你想输入的替代文字

进入logstash的安装目录,修改Gemfile的数据源地址。修改source的值为”https://ruby.taobao.org"

你想输入的替代文字

修改Gemfile.jruby-2.3.lock,找到 remote 修改它的值为: https://ruby.taobao.org

你想输入的替代文字

进入到logstash/bin的目录下,执行下面的命令安装logstash-input-jdbc ,这个过程要比较久,耐心等待,如果安装不成功,看这里换一种方法安装logstash-input-jdbc其他方法

1
./logstash-plugin install logstash-input-jdbc

logstash的应用

上面的安装都已经完成了,现在只需要配置一些文件,就可以往elasticsearch中导入MySQL的数据了。首先需要两个文件jdbc.conf、jdbc.sql 名字随便起,以及mysql的java驱动包mysql-connector-java-5.1.36-bin.jar,驱动包网上很好找。

看一下jdbc.conf的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
input {
stdin {
}
jdbc {
# 数据库的地址,以及要操作的数据库名
jdbc_connection_string => "jdbc:mysql://10.1.13.29:3306/test"
# 链接数据库的用户密码
jdbc_user => "root"
jdbc_password => "tdlabDatabase"
# 存放MySQL驱动的位置
jdbc_driver_library => "/usr/local/logstash-6.0.0/mysql-pro/mysql-connector-java-5.1.18-bin.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 存放你的sql操作位置
statement_filepath => "/usr/local/logstash-6.0.0/mysql-pro/jdbc.sql"
schedule => "* * * * *"
type => "jdbc"
}
}

output {
elasticsearch {
# 你elasticsearch服务的IP
hosts => "10.1.13.32"
# elasticsearch索引
index => "book"
# elasticsearch类型
document_type => "mybook"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}

jdbc.sql的内容,简单的表查询

1
select id,keyNo,word FROM bbb

上面的文件配置完成后,就启动logstash,进入到logstash的bin目录下,执行下面命令,得到大约1分钟,你就可以看到数据已经导入到elasticsearch中。

1
./logstash -f jdbc.conf

现在 logstash 已经开始监听mysql 的表了,里面数据会被导入到elasticsearch中,并且如果数据发生变动,elasticsearch也会相应变动。