flume在windows环境下的使用

时间:2016-01-06 11:39:03   收藏:0   阅读:8147

     技术分享 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。目前属于apache的一个子项目。

       一般来说,部署到服务器上的flume是安装在unix/linux环境下使用的,但是有时为了测试和调试方便,我们也会有在windows系统上安装的需求。对当前flume最新版本1.6来说,在windows上使用相对比较方便,因为其自带了一套执行环境shell。

       这是一套使用powershell编制的执行环境,启动程序在apache-flume-1.6.0-bin\bin目录下,flume-ng.cmd。

       打开命令行,输入flume-ng.cmd help可以查看该程序使用方法。  如图:

     技术分享

以一个采集脱机目录日志源的flume agent为例,可以以如下命令运行这个agent :

flume-ng.cmd agent --conf ..\conf --conf-file ..\conf\t1.conf --name a1

 

t1.conf:

a1.sources = r1
a1.channels = memoryChannel
a1.sinks = spoolSink


a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = spool_dir
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = memoryChannel

#interceptors
a1.sources.r1.interceptors = e1
a1.sources.r1.interceptors.e1.type = regex_extractor
a1.sources.r1.interceptors.e1.regex = ^([\\D]+[\\d]+[\\s]+[\\d\\:]+)\\s+([\\d\\-]+[\\s]+[\\d\\:]+[\\s]+[\\d\\:]+)\\s([\\S\\-]+)\\s([\\S\\/]+)\\s.[\\w]+\\S([\\d\\.]+)\\S\\s[\\w]+\\S([\\w]+)\\S\\s([\\d\\.]+\\S[\\d]+)\\S\\s([\\d\\.]+\\S[\\d]+)\\S\\s[\\W]+([\\d\\.]+\\S[\\d]+)\\S\\s\\S([\\d\\/]+[\\s]+[\\d\\:]+)\\s\\S\\s([\\d\\/]+[\\s][\\d\\:]+)\\S\\s[\\w\\s]+\\S([\\d]+)\\s[\\w\\s]+\\S([\\d]+)\\S\\s[\\w]+\\S([\\d\\#]+)[\\w\\s]+\\S([\\d\\.]+);$
a1.sources.r1.interceptors.e1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13 s14 s15
a1.sources.r1.interceptors.e1.serializers.s1.name = time1
a1.sources.r1.interceptors.e1.serializers.s2.name = time2
a1.sources.r1.interceptors.e1.serializers.s3.name = xy
a1.sources.r1.interceptors.e1.serializers.s4.name = session
a1.sources.r1.interceptors.e1.serializers.s5.name = devip
a1.sources.r1.interceptors.e1.serializers.s6.name = protocol
a1.sources.r1.interceptors.e1.serializers.s7.name = ip1
a1.sources.r1.interceptors.e1.serializers.s8.name = ip2
a1.sources.r1.interceptors.e1.serializers.s9.name = ip3
a1.sources.r1.interceptors.e1.serializers.s10.name = starttime
a1.sources.r1.interceptors.e1.serializers.s11.name = endtime
a1.sources.r1.interceptors.e1.serializers.s12.name = srcvpn
a1.sources.r1.interceptors.e1.serializers.s13.name = desvpn
a1.sources.r1.interceptors.e1.serializers.s14.name = status
a1.sources.r1.interceptors.e1.serializers.s15.name = username

#channels
a1.channels.memoryChannel.type = memory
a1.channels.memoryChannel.capacity = 300
a1.channels.memoryChannel.transactionCapacity= 300

#sink
a1.sinks.spoolSink.type = com.hzfi.flume.PatternTestSink
a1.sinks.spoolSink.channel = memoryChannel

上例中使用了一个interceptor regex_extractor来对脱机目录下的日志中的记录进行正则表达式模式识别,将记录切分为15个子模式,分别加入到flume event的header里边。

sink为一个自定义的PatternTestSink,代码如下:

package com.hzfi.flume;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class PatternTestSink extends AbstractSink implements Configurable {

    @Override
    public Status process() throws EventDeliveryException {
        
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        try {
            transaction.begin();
            event = channel.take();
            if (event != null) {
                String body = new String(event.getBody(), "utf-8");
                System.out.println("----->event headers...");
                System.out.println("header content:[" + event.getHeaders().toString() + "]");
                System.out.println("----->event body...");
                System.out.println("body content:[" + body + "]");
            } else {
                result = Status.BACKOFF;
            }
            transaction.commit();
        } catch (Exception ex) {
            transaction.rollback();
            throw new EventDeliveryException("Failed to got pattern event: " + event, ex);
        } finally {
            transaction.close();
        }
        return result;
    }

    @Override
    public void configure(Context arg0) {
        // TODO Auto-generated method stub

    }
}

 

评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!