Windows Logstash同步 Sqlserver 到Elasticsearch

时间:2021-07-26 17:01:09   收藏:0   阅读:0

1下载与Elasticsearch对应版本Logstash7.13.2 与数据库驱动JDBC

下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.13.2-windows-x86_64.zip
JDBC  https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver15

2.Logstash下载完成,解压,在bin文件下面创建jdbcconfig文件夹

技术图片

 

 

 3.Logstash配置

在jdbcconfig文件夹下面创建jdbc.conf,如图:

技术图片

配置如下:

input {
stdin {
    }
  jdbc {
     #数据库驱动所在位置,可以是绝对路径或者相对路径
    jdbc_driver_library => "D:\software\Elasticsearch\logstash-7.13.2\bin\jdbcconfig\mssql-jdbc-9.2.1.jre8.jar"
    #驱动类名
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
     #数据库连接
    jdbc_connection_string => "jdbc:sqlserver://服务器名称;DatabaseName=CstCRMTest;"
    #用户
    jdbc_user => "sa"
    #密码
    jdbc_password => "const-123456"
    #设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
    schedule => "* * * * *"
    #sql语句
    statement => "SELECT  [Id]
      ,[QuoteCode]
      ,[QuoteName]
      ,[CustomerId]
      ,[QuotePerson]
      ,[UserId]
      ,[QuotePhone]
      ,[PayType]
      ,[QuoteVailDate]
      ,[LeadTime]
      ,[QuoteDate]
      ,[CompanyPhone]
      ,[CompanyAddress]
      ,[CompanyUrl]
      ,[Remark]
      ,[CreatedTime]
      ,CONVERT (VARCHAR (30),UpdatedTime,25) AS updatedTime
      ,[CreatedUser]
      ,[UpdatedUser]
    FROM [CstCRMTEST].[dbo].[T_Quote]  where  updatedTime>:sql_last_value"
    #sql可执行文件
    #statement_filepath => "路径"
    #是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    # 对应字段的类型
    tracking_column_type => "timestamp"
    #如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间
    tracking_column => "updatedTime"
    #是否记录上次执行结果, 如果record_last_run为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    # 记录上一次追踪的结果值,保存文件到对应路径中
     last_run_metadata_path => "D:\software\Elasticsearch\logstash-7.13.2\bin\jdbcconfig\updatedTime.txt"
      # 索引类型
      #type => "_doc"
      # 数据库字段名称大写转小写
      lowercase_column_names => false
     #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
     #clean_run : 
  }
}
 
 
output {
    elasticsearch {
        # ES的IP地址及端口
        hosts => ["localhost:9200"]
        # 索引名称 可自定义(只可以小写)
        index => "quote"
        document_type => "out"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{Id}"
    
    }
 stdout {
       # codec => json_lines
#设置输出的格式
   codec => line {
  format => "updatedTime: %{[updatedTime]}"
   }
    }

}

配置完成   bin 目录下,执行.\logstash -f .\jdbcconfig\jdbc.conf --path.data=/jdbcconfig/

 

评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2
迷上了代码!