Adding nanosecond support to the Logstash date plugin


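A recent project produced timestamps with nanosecond precision, which Logstash cannot parse out of the box. One workaround is to patch the Logstash date plugin yourself so that it accepts them. The plugin involved is logstash-filter-date.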

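Preparation

  • Download the source code
  • Download Logstash
  • Install the required build tools
  • Modify the source code
  • Build and package
  • Replace the default logstash-filter-date-${version}.jar
  • Test nanosecond parsing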

Download the source code

cd ~/share/tmp

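# If GitHub is too slow, mirror the repository to Gitee first and clone from the Gitee address instead
git clone https://github.com/logstash-plugins/logstash-filter-date.git

# Gitee address (alternative to the GitHub clone above)
git clone https://gitee.com/iogogogo/logstash-filter-date.git

# Enter the repository and create a local branch from the latest tag, currently v3.1.9
cd logstash-filter-date
git checkout -b v3.1.9 v3.1.9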

Download Logstash

I run Logstash 7.9.3, so that is the version downloaded here. In theory the compiled logstash-filter-date jar works with other versions as well.

cd ~/share/tmp

wget https://artifacts.elastic.co/downloads/logstash/logstash-7.9.3.tar.gz

Install the build tools

  • Install JRuby

    https://www.ruby-lang.org/zh_cn/downloads/
    
  • Install rvm

    https://rvm.io/
    
  • Install rbenv

    https://ruby-china.org/wiki/rbenv-guide
    

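Modify the source code

Import the source into IDEA

Importing the project into IDEA downloads Gradle and the project's dependencies, so a working network connection is required.

Add the nanosecond parser

Locate the package src/main/java/org/logstash/filters/parser and create a new nanosecond parser class in it: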

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.logstash.filters.parser;

import org.joda.time.Instant;

import java.math.BigDecimal;

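/**
 * Parser behind the custom UNIX_NS pattern.
 * Created by tao.zeng on 2020/12/12.
 *
 * Every conversion below divides by 1000 to get the epoch milliseconds that
 * Joda-Time's Instant expects, so the input is effectively read as microseconds
 * since the epoch (16 digits, as in the test data later in this post).
 * True nanosecond input (19 digits) would need a divisor of 1_000_000 instead.
 */
public class UnixNanosecondParser implements TimestampParser {

    // Largest accepted value: Integer.MAX_VALUE seconds scaled by 1000 * 1000.
    private static final long MAX_EPOCH_NANOSECOND = (long) Integer.MAX_VALUE * 1000 * 1000;

    @Override
    public Instant parse(String value) {
        return parse(Long.parseLong(value));
    }

    @Override
    public Instant parse(Long value) {
        // Joda's Instant constructor takes epoch milliseconds.
        return new Instant(value / 1000);
    }

    @Override
    public Instant parse(Double value) {
        // XXX: Should we accept a double?
        return parse(value.longValue());
    }

    @Override
    public Instant parseWithTimeZone(String value, String timezone) {
        // An epoch value is absolute, so the time zone argument is ignored.
        return parse(value);
    }

    @Override
    public Instant parse(BigDecimal value) {
        long lv = value.longValue();
        if (lv > MAX_EPOCH_NANOSECOND) {
            throw new IllegalArgumentException("Cannot parse date for value larger than UNIX NS maximum seconds");
        }
        return new Instant(lv / 1000);
    }
}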
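The divisor in the class above decides which unit the input is read in. The following is a minimal sketch of that relationship, not code from the plugin: it uses java.time from the JDK instead of Joda-Time so that it runs on its own, and the class name EpochUnits and its methods are made up for illustration.

```java
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of logstash-filter-date. */
final class EpochUnits {

    /** Microseconds since the epoch, truncated to millisecond precision. */
    static Instant fromMicros(long micros) {
        return Instant.ofEpochMilli(Math.floorDiv(micros, 1_000L));
    }

    /** Nanoseconds since the epoch, truncated to millisecond precision. */
    static Instant fromNanos(long nanos) {
        return Instant.ofEpochMilli(Math.floorDiv(nanos, 1_000_000L));
    }

    public static void main(String[] args) {
        long micros = 1607206093000000L;   // 16 digits, the test value used in this post
        long nanos = micros * 1_000L;      // the same moment written as 19-digit nanoseconds
        System.out.println(fromMicros(micros));
        System.out.println(fromNanos(nanos));
    }
}
```

Both calls print 2020-12-05T22:08:13Z, the same moment that shows up as date_ns in the test run later in this post. If your source really emits 19-digit nanosecond values, the division in UnixNanosecondParser would have to follow the fromNanos variant.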

Register the nanosecond parser in the parser factory

org.logstash.filters.parser.TimestampParserFactory

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.logstash.filters.parser;

import org.joda.time.DateTimeZone;

import java.util.Locale;

public class TimestampParserFactory {
    private DateTimeZone timezone;

    private static final String ISO8601 = "ISO8601";
    private static final String UNIX = "UNIX";
    private static final String UNIX_MS = "UNIX_MS";
    /**
     * Newly added pattern keyword for nanosecond timestamps
     */
    private static final String UNIX_NS = "UNIX_NS";
    private static final String TAI64N = "TAI64N";

    /*
     * zone is a String because it can be dynamic and come from the event while we parse it.
     */
    public static TimestampParser makeParser(String pattern, Locale locale, String zone) {
        if (locale == null) {
            locale = Locale.getDefault();
        }

        String tz = zone;

        if (tz == null) {
            tz = DateTimeZone.getDefault().getID();
        } else if (zone.contains("%{")) {
            tz = null;
        }

        switch (pattern) {
            case ISO8601: // Short-hand for a few ISO8601-ish formats
                return new CasualISO8601Parser(tz);
            case UNIX: // Unix epoch in seconds
                return new UnixEpochParser();
            case TAI64N: // TAI64N format
                return new TAI64NParser();
            case UNIX_MS: // Unix epoch in milliseconds
                return new UnixMillisEpochParser();
            case UNIX_NS: // Unix epoch in nanoseconds
                // The new nanosecond parser class
                return new UnixNanosecondParser();

            default:
                return new JodaParser(pattern, locale, tz);
        }
    }

    public static TimestampParser makeParser(String pattern) {
        return makeParser(pattern, (Locale) null, null);
    }

    public static TimestampParser makeParser(String pattern, String locale, String zone) {
        return makeParser(pattern, locale == null ? null : Locale.forLanguageTag(locale), zone);
    }
}
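To see the dispatch idea in isolation, here is a rough, self-contained sketch of a keyword-to-parser switch. It is only an analogy for the factory above: the class EpochParserSketch is hypothetical, it returns java.time instants rather than Joda-Time ones, and unlike the real factory it rejects unknown patterns instead of falling back to JodaParser.

```java
import java.time.Instant;
import java.util.function.LongFunction;

/** Hypothetical stand-in for the factory; handles numeric epoch patterns only. */
final class EpochParserSketch {

    static LongFunction<Instant> makeParser(String pattern) {
        switch (pattern) {
            case "UNIX":     // Unix epoch in seconds
                return Instant::ofEpochSecond;
            case "UNIX_MS":  // Unix epoch in milliseconds
                return Instant::ofEpochMilli;
            case "UNIX_NS":  // the new keyword; divides by 1000 like UnixNanosecondParser
                return v -> Instant.ofEpochMilli(v / 1000);
            default:
                throw new IllegalArgumentException("Unsupported pattern in this sketch: " + pattern);
        }
    }

    public static void main(String[] args) {
        System.out.println(makeParser("UNIX").apply(1607206093L));
        System.out.println(makeParser("UNIX_NS").apply(1607206093000000L));
    }
}
```

The point of the design is that the date filter's match setting only ever names a keyword, so adding a new format comes down to one constant, one case label and one parser class.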

Comment out a unit test

This test class fails to compile because a method it calls cannot be found, so comment out the body of org.logstash.filters.DateFilterTest#commonAssertions:

private void commonAssertions(Event event, ParseExecutionResult code, String expected) {
    /*
    Assert.assertSame(ParseExecutionResult.SUCCESS, code);
    String actual = ((Timestamp) event.getField("[result_ts]")).toIso8601();
    Assert.assertTrue(String.format("Unequal - expected: %s, actual: %s", expected, actual), expected.equals(actual));
    */
}

Modify the build configuration

build.gradle

In build.gradle, change the logstash-core entries on lines 63 and 66 so that they point at the logstash-core directory of the downloaded Logstash distribution:

// Default configuration
testCompile fileTree(dir: logstashCoreGemPath, include: '**/*.jar')
compileOnly fileTree(dir: logstashCoreGemPath, include: '**/*.jar')

// Changed to the absolute path of logstash-core inside the downloaded Logstash
testCompile fileTree(dir: '/Users/tao.zeng/share/software/logstash-7.9.3/logstash-core', include: '**/*.jar')
compileOnly fileTree(dir: '/Users/tao.zeng/share/software/logstash-7.9.3/logstash-core', include: '**/*.jar')

Rakefile

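In the Rakefile, point lsc_path at the same logstash/logstash-core directory:

# Default configuration
lsc_path = `bundle show logstash-core`

# Changed to the absolute path of logstash-core inside the downloaded Logstash.
# Use a quoted string here: backticks would make Ruby run the path as a shell command.
lsc_path = '/Users/tao.zeng/share/software/logstash-7.9.3/logstash-core'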

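Build and package

Run Gradle's build task to produce the new logstash-filter-date-{version}.jar. The generated file ends up here:

# Oddly, although the checkout is the v3.1.9 tag, the version in the source is 3.1.6, so the jar is built as 3.1.6. The jar inside the extracted Logstash carries the same version number, so the mismatch can be ignored.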
build/libs/logstash-filter-date-3.1.6.jar
19:38:39: Executing task 'build'...

:buildSrc:compileJava UP-TO-DATE
:buildSrc:compileGroovy UP-TO-DATE
:buildSrc:processResources UP-TO-DATE
:buildSrc:classes UP-TO-DATE
:buildSrc:jar UP-TO-DATE
:buildSrc:assemble UP-TO-DATE
:buildSrc:compileTestJava UP-TO-DATE
:buildSrc:compileTestGroovy UP-TO-DATE
:buildSrc:processTestResources UP-TO-DATE
:buildSrc:testClasses UP-TO-DATE
:buildSrc:test UP-TO-DATE
:buildSrc:check UP-TO-DATE
:buildSrc:build UP-TO-DATE
:distTar UP-TO-DATE
:distZip UP-TO-DATE
:compileJava
:processResources UP-TO-DATE
:classes
:jar
:assemble
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test
:check
:build

BUILD SUCCESSFUL

Total time: 7.276 secs
19:38:47: Task execution finished 'build'.

Replace the default jar in Logstash

Extract logstash-7.9.3.tar.gz

cd ~/share/tmp

# Extract
tar -zxvf logstash-7.9.3.tar.gz

Locate logstash-filter-date-3.1.6.jar

cd logstash-7.9.3 && find . -name 'logstash-filter-date*'

# Output
➜  logstash-7.9.3 find . -name 'logstash-filter-date*'
./vendor/bundle/jruby/2.5.0/specifications/logstash-filter-date-3.1.9.gemspec
./vendor/bundle/jruby/2.5.0/gems/logstash-filter-date-3.1.9
./vendor/bundle/jruby/2.5.0/gems/logstash-filter-date-3.1.9/logstash-filter-date.gemspec
./vendor/bundle/jruby/2.5.0/gems/logstash-filter-date-3.1.9/lib/logstash-filter-date_jars.rb
./vendor/bundle/jruby/2.5.0/gems/logstash-filter-date-3.1.9/vendor/jar-dependencies/org/logstash/filters/logstash-filter-date
./vendor/bundle/jruby/2.5.0/gems/logstash-filter-date-3.1.9/vendor/jar-dependencies/org/logstash/filters/logstash-filter-date/3.1.6/logstash-filter-date-3.1.6.jar

The original jar lives in ./vendor/bundle/jruby/2.5.0/gems/logstash-filter-date-3.1.9/vendor/jar-dependencies/org/logstash/filters/logstash-filter-date/3.1.6/, so all that is left is to overwrite it with the modified jar.
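Overwriting the jar by hand works fine, but it is worth keeping the original around in case the patched build misbehaves. One possible way to script the swap is sketched below; JarSwap is a hypothetical helper written for this post, and the two paths are whatever your own build output and Logstash installation use.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper for illustration; not part of Logstash or the plugin. */
final class JarSwap {

    /** Saves the bundled jar as <name>.bak next to it, then copies the rebuilt jar over it. */
    static Path replaceJar(Path rebuiltJar, Path bundledJar) throws IOException {
        Path backup = bundledJar.resolveSibling(bundledJar.getFileName() + ".bak");
        Files.copy(bundledJar, backup, StandardCopyOption.REPLACE_EXISTING);
        Files.copy(rebuiltJar, bundledJar, StandardCopyOption.REPLACE_EXISTING);
        return backup;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarSwap <rebuilt-jar> <bundled-jar>");
            return;
        }
        Path backup = replaceJar(Paths.get(args[0]), Paths.get(args[1]));
        System.out.println("backup written to " + backup);
    }
}
```

Here the first argument would be build/libs/logstash-filter-date-3.1.6.jar from the plugin checkout and the second the jar found by the find command above. Restoring the original is then a matter of copying the .bak file back.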

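Test nanosecond parsing

To make testing easier, copy over the startup script from the previous article (lsboot in the listing below) and create a new configuration file, conf/test.conf.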

➜  logstash-7.9.3 ll
total 1280
-rw-r--r--   1 tao.zeng  staff   2.2K 10 16 20:23 CONTRIBUTORS
-rw-r--r--   1 tao.zeng  staff   3.9K 10 16 20:24 Gemfile
-rw-r--r--   1 tao.zeng  staff    22K 10 16 20:25 Gemfile.lock
-rw-r--r--   1 tao.zeng  staff    13K 10 16 20:23 LICENSE.txt
-rw-r--r--   1 tao.zeng  staff   587K 10 16 20:23 NOTICE.TXT
drwxr-xr-x  22 tao.zeng  staff   704B 10 16 21:35 bin
drwxr-xr-x   3 tao.zeng  staff    96B 12 18 20:00 conf
drwxr-xr-x   8 tao.zeng  staff   256B 10 16 21:35 config
drwxr-xr-x   2 tao.zeng  staff    64B 10 16 20:23 data
drwxr-xr-x   6 tao.zeng  staff   192B 10 16 21:35 lib
drwxr-xr-x   6 tao.zeng  staff   192B 10 16 21:35 logstash-core
drwxr-xr-x   5 tao.zeng  staff   160B 10 16 21:35 logstash-core-plugin-api
-rwxr-xr-x@  1 tao.zeng  staff   1.3K 12 18 20:00 lsboot
drwxr-xr-x   5 tao.zeng  staff   160B 10 16 21:35 modules
drwxr-xr-x   3 tao.zeng  staff    96B 10 16 21:35 tools
drwxr-xr-x   4 tao.zeng  staff   128B 10 16 21:35 vendor
drwxr-xr-x  14 tao.zeng  staff   448B 10 16 21:35 x-pack

Write the test configuration conf/test.conf

input {
     stdin { 
         codec => "json"
     } 
}

filter {
    mutate {
        add_field => { "name" => "哈哈哈" }
    }

    # Store @timestamp as epoch milliseconds in unix_ts, see https://zerlong.com/886.html
    ruby { code => 'event.set("unix_ts",(event.get("@timestamp").to_f.round(3)*1000).to_i)' }

    date {
        # UNIX_NS is the parser keyword we added in TimestampParserFactory; the official plugin does not support it
        match => ["unix_ns", "UNIX_NS"]
        target => "date_ns"
    }
}

output { 
    stdout {} 
}

Test data

{ "unix_ns": 1607206093000000, "tags": "test" }
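The value in the test line has 16 digits, which is 1607206093 seconds scaled by 1000 * 1000, the unit that the divide-by-1000 parser expects. If you want fresh test lines instead of this fixed one, something like the sketch below could generate them. TestEvent is a hypothetical helper, and the JSON layout is simply copied from the line above.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Hypothetical generator for test input; not part of Logstash. */
final class TestEvent {

    /** Microseconds elapsed between the Unix epoch and the given instant. */
    static long epochMicros(Instant t) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, t);
    }

    /** One JSON line in the same shape as the test data used in this post. */
    static String jsonLine(Instant t) {
        return "{ \"unix_ns\": " + epochMicros(t) + ", \"tags\": \"test\" }";
    }

    public static void main(String[] args) {
        // Paste the printed line into the running stdin input.
        System.out.println(jsonLine(Instant.now()));
    }
}
```

Calling jsonLine(Instant.parse("2020-12-05T22:08:13Z")) reproduces the test line above exactly.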

Start the test

➜  logstash-7.9.3 ./lsboot conf/test.conf test
bin/logstash -f conf/test.conf -l logs/test --path.data data/test -n test
Sending Logstash logs to logs/test which is now configured via log4j2.properties
[2020-12-18T20:13:19,801][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.9.3", "jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc Java HotSpot(TM) 64-Bit Server VM 25.201-b09 on 1.8.0_201-b09 +indy +jit [darwin-x86_64]"}
[2020-12-18T20:13:19,932][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"data/test/queue"}
[2020-12-18T20:13:19,935][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"data/test/dead_letter_queue"}
[2020-12-18T20:13:20,029][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-12-18T20:13:20,061][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"e4188c10-0621-4967-a957-8aa14f1a2945", :path=>"data/test/uuid"}
[2020-12-18T20:13:21,802][INFO ][org.reflections.Reflections] Reflections took 33 ms to scan 1 urls, producing 22 keys and 45 values
[2020-12-18T20:13:23,160][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["/Users/tao.zeng/share/tmp/logstash-7.9.3/conf/test.conf"], :thread=>"#"}
[2020-12-18T20:13:23,871][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.7}
[2020-12-18T20:13:23,922][INFO ][logstash.inputs.stdin    ][main] Automatically switching from json to json_lines codec {:plugin=>"stdin"}
[2020-12-18T20:13:23,962][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2020-12-18T20:13:24,041][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-12-18T20:13:24,244][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
{ "unix_ns": 1607206093000000, "tags": "test" }
{
       "unix_ts" => 1608293606313,
       "date_ns" => 2020-12-05T22:08:13.000Z,
    "@timestamp" => 2020-12-18T12:13:26.313Z,
          "tags" => "test",
       "unix_ns" => 1607206093000000,
          "host" => "TaoZeng.MBP",
      "@version" => "1",
          "name" => "哈哈哈"
}

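As shown above, the value 1607206093000000 in unix_ns is now converted to a date value (date_ns). The converted result still has a time zone display issue, though: it is rendered in UTC. There are two options:

  • The first is to specify the time zone when setting the target;
  • The second is to modify the Logstash source so that the default time zone is UTC+8, which will be covered in a later post.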
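It helps to keep in mind that the parsed value itself is correct and only its rendering differs by zone. The following sketch shows the same instant printed for UTC and for UTC+8. It uses plain java.time and a hypothetical ZoneView class, so it says nothing about how Logstash itself should be configured.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for illustration only. */
final class ZoneView {

    /** Renders one instant as an ISO-8601 date-time with the offset of the given zone. */
    static String render(Instant t, String zoneId) {
        return DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(t.atZone(ZoneId.of(zoneId)));
    }

    public static void main(String[] args) {
        Instant parsed = Instant.parse("2020-12-05T22:08:13Z"); // date_ns from the test run
        System.out.println(render(parsed, "UTC"));
        System.out.println(render(parsed, "Asia/Shanghai"));
    }
}
```

This prints 2020-12-05T22:08:13Z and 2020-12-06T06:08:13+08:00: one moment, two wall-clock representations.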

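Logstash timestamp time zone issue

That is the whole process of adding nanosecond support to the logstash-filter-date plugin and swapping in the rebuilt jar. Prebuilt jar: logstash-filter-date-3.1.6.jar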


Author: 阿牛
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit 阿牛 when reposting!