Windows Logstash同步 Sqlserver 到Elasticsearch
时间:2021-07-26 17:01:09
收藏:0
阅读:0
1下载与Elasticsearch对应版本Logstash7.13.2 与数据库驱动JDBC
下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.13.2-windows-x86_64.zip JDBC https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver15
2.Logstash下载完成,解压,在bin文件下面创建jdbcconfig文件夹
3.Logstash配置
在jdbcconfig文件夹下面创建jdbc.conf,如图:
配置如下:
input { stdin { } jdbc { #数据库驱动所在位置,可以是绝对路径或者相对路径 jdbc_driver_library => "D:\software\Elasticsearch\logstash-7.13.2\bin\jdbcconfig\mssql-jdbc-9.2.1.jre8.jar" #驱动类名 jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" #数据库连接 jdbc_connection_string => "jdbc:sqlserver://服务器名称;DatabaseName=CstCRMTest;" #用户 jdbc_user => "sa" #密码 jdbc_password => "const-123456" #设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务 schedule => "* * * * *" #sql语句 statement => "SELECT [Id] ,[QuoteCode] ,[QuoteName] ,[CustomerId] ,[QuotePerson] ,[UserId] ,[QuotePhone] ,[PayType] ,[QuoteVailDate] ,[LeadTime] ,[QuoteDate] ,[CompanyPhone] ,[CompanyAddress] ,[CompanyUrl] ,[Remark] ,[CreatedTime] ,CONVERT (VARCHAR (30),UpdatedTime,25) AS updatedTime ,[CreatedUser] ,[UpdatedUser] FROM [CstCRMTEST].[dbo].[T_Quote] where updatedTime>:sql_last_value" #sql可执行文件 #statement_filepath => "路径" #是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件 use_column_value => true # 对应字段的类型 tracking_column_type => "timestamp" #如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间 tracking_column => "updatedTime" #是否记录上次执行结果, 如果record_last_run为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中 record_last_run => true # 记录上一次追踪的结果值,保存文件到对应路径中 last_run_metadata_path => "D:\software\Elasticsearch\logstash-7.13.2\bin\jdbcconfig\updatedTime.txt" # 索引类型 #type => "_doc" # 数据库字段名称大写转小写 lowercase_column_names => false #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录 #clean_run : } } output { elasticsearch { # ES的IP地址及端口 hosts => ["localhost:9200"] # 索引名称 可自定义(只可以小写) index => "quote" document_type => "out" # 需要关联的数据库中有有一个id字段,对应类型中的id document_id => "%{Id}" } stdout { # codec => json_lines #设置输出的格式 codec => line { format => "updatedTime: %{[updatedTime]}" } } }
配置完成 bin 目录下,执行.\logstash -f .\jdbcconfig\jdbc.conf --path.data=/jdbcconfig/
评论(0)