logstash时区

发布时间:2020-05-27 11:50:43 阅读:1240次

https://www.cnblogs.com/wang-yaz/p/10232417.html

input {
      stdin {
       }
      jdbc {
        # 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/microstorage_backend?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
        jdbc_user => "root"
        jdbc_password => "123456"
        # 下载连接数据库的驱动包,建议使用绝对地址
       jdbc_driver_library => "/usr/local/Cellar/logstash/6.5.4/libexec/logstash-core/lib/jars/mysql-connector-java-5.1.42.jar"

       jdbc_driver_class => "com.mysql.jdbc.Driver"
       jdbc_paging_enabled => "true"
       jdbc_page_size => "50000"
       codec => plain { charset => "UTF-8"}

        #使用其它字段追踪,而不是用时间
      #use_column_value => true   //这里如果是用时间追踪比如:数据的更新时间或创建时间等和时间有关的这里一定不能是true, 切记切记切记,我是用update_time来追踪的
        #追踪的字段
     tracking_column => update_time
     record_last_run => true
     #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值  这里说是必须指定初始值,我没指定默认是1970-01-01 08:00:00
     last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run"  //这里的lastrun文件夹和.logstash_jdbc_last_run是自己创建的

      jdbc_default_timezone => "Asia/Shanghai"   //设置时区
      #statement => SELECT * FROM goods  WHERE update_time > :last_sql_value  //这里要说明一下如果直接写sql语句,前面这种写法肯定不对的
,加上引号也试过也不对,所以我直接写在jdbc.sql文件中
      statement_filepath => "/usr/local/Cellar/logstash/6.5.4/bin/jdbc.sql"


     #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
     clean_run => false

       # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次
       schedule => "* * * * *"
       type => "std"
     }
 }

 filter {

    json {

        source => "message"

        remove_field => ["message"]

    }

}

output {

    elasticsearch {

        # 要导入到的Elasticsearch所在的主机

        hosts => "127.0.0.1:9200"

        # 要导入到的Elasticsearch的索引的名称

        index => "goods"

        # 类型名称(类似数据库表名)

        document_type => "spu"

        # 主键名称(类似数据库主键)

        document_id => "%{id}"
    }

    stdout {

        # JSON格式输出

        codec => json_lines

    }
}

filter {
  json {
      source => "message"
      remove_field => ["message"]
  }
 // date类型不能省略,不然会报错,       就是把当前字段+8小时后赋值给新的字段,然后再取新字段的值赋值给老的字段,再把新的字段删除  date {
    match => ["message","UNIX_MS"]
    target => "@timestamp"
     }
       ruby {
              code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
       }      
      ruby{   
              code => "event.set('@timestamp',event.get('timestamp'))"
      }       
      mutate{ 
             remove_field => ["timestamp"]
      }      
      
 date {
  match => ["message","UNIX_MS"]
  target => "create_time"
       } 
       ruby {
               code => "event.set('@create_time', event.get('create_time').time.localtime + 8*60*60)"
       }       
      ruby {   
               code => "event.set('create_time',event.get('@create_time'))"
       }       
      mutate { 
       remove_field => ["@create_time"]
      }
      
      date {
      match => ["message","UNIX_MS"]
      target => "update_time"
       }
       ruby {
               code => "event.set('@update_time', event.get('update_time').time.localtime + 8*60*60)"
       }       
      ruby {   
               code => "event.set('update_time',event.get('@update_time'))"
       }       
      mutate {
       remove_field => ["@update_time"]
      }
}

如有问题,可以QQ搜索群1028468525加入群聊,欢迎一起研究技术

支付宝 微信

有疑问联系站长,请联系QQ:QQ咨询

转载请注明:logstash时区 出自老鄢博客 | 欢迎分享