Flume sources collect logs. Their common parent class is AbstractSource; all of the other classes in the figure below extend AbstractSource.
AvroSource
The LifecycleAware machinery calls the start method to start the AvroSource. AvroSource mainly starts a NettyServer that receives data and then hands it to AvroSource for processing.
@Override
public void start() {
  logger.info("Starting {}...", this);

  // When data is received, AvroSourceProtocol parses it and then calls
  // AvroSource to process the received data.
  Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);

  // Creates a NioServerSocketChannelFactory depending on maxThreads.
  NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();

  // Creates an SSLCompressionChannelPipelineFactory or a ChannelPipelineFactory.
  ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();

  server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
      socketChannelFactory, pipelineFactory, null);

  connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
  server.start();
  sourceCounter.start();
  super.start();
  final NettyServer srv = (NettyServer) server;
  connectionCountUpdater.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
      // Used to monitor the connection count.
      sourceCounter.setOpenConnectionCount(
          Long.valueOf(srv.getNumActiveConnections()));
    }
  }, 0, 60, TimeUnit.SECONDS);

  logger.info("Avro source {} started.", getName());
}
When AvroSource receives data, the append function is called; append in turn calls getChannelProcessor().processEvent to handle the received event.
@Override
public Status append(AvroFlumeEvent avroEvent) {
  logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
  sourceCounter.incrementAppendReceivedCount();
  sourceCounter.incrementEventReceivedCount();

  Event event = EventBuilder.withBody(avroEvent.getBody().array(),
      toStringMap(avroEvent.getHeaders()));

  try {
    // Call getChannelProcessor().processEvent to handle the received event.
    getChannelProcessor().processEvent(event);
  } catch (ChannelException ex) {
    logger.warn("Avro source " + getName() + ": Unable to process event. " +
        "Exception follows.", ex);
    return Status.FAILED;
  }

  sourceCounter.incrementAppendAcceptedCount();
  sourceCounter.incrementEventAcceptedCount();

  return Status.OK;
}
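The control flow of append (try to hand the event to the channel, report FAILED if the channel rejects it, OK otherwise) can be tried out without Flume on the classpath. The following is a minimal sketch under stated assumptions: BoundedChannel, ChannelFullException and this Status enum are hypothetical stand-ins invented for illustration, not Flume or Avro classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only: none of these types are Flume classes.
class AppendSketch {
    enum Status { OK, FAILED }

    /** Thrown by the stand-in channel when it cannot take another event. */
    static class ChannelFullException extends RuntimeException {
        ChannelFullException(String message) {
            super(message);
        }
    }

    /** A bounded in-memory "channel" that rejects events once it is full. */
    static class BoundedChannel {
        private final int capacity;
        private final List<byte[]> events = new ArrayList<>();

        BoundedChannel(int capacity) {
            this.capacity = capacity;
        }

        void put(byte[] body) {
            if (events.size() >= capacity) {
                throw new ChannelFullException("channel full");
            }
            events.add(body);
        }

        int size() {
            return events.size();
        }
    }

    /** Mirrors the shape of append: a channel failure becomes a FAILED status. */
    static Status append(BoundedChannel channel, byte[] body) {
        try {
            channel.put(body);
        } catch (ChannelFullException ex) {
            return Status.FAILED;
        }
        return Status.OK;
    }
}
```

Returning a status instead of rethrowing lets the remote sender decide whether to retry, which is the point of the FAILED branch in the listing above.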
AvroLegacySource
AvroLegacySource is a source implementation that receives Avro events from the Avro sink of Flume OG.
@Override
public void start() {
  // setup http server to receive OG events
  res = new SpecificResponder(FlumeOGEventAvroServer.class, this);
  try {
    http = new HttpServer(res, host, port);
  } catch (IOException eI) {
    LOG.warn("Failed to start server", eI);
    return;
  }
  http.start();
  super.start();
}

@Override
public Void append(AvroFlumeOGEvent evt) throws AvroRemoteException {
  counterGroup.incrementAndGet("rpc.received");
  Map<String, String> headers = new HashMap<String, String>();

  // extract Flume OG event headers
  headers.put(HOST, evt.getHost().toString());
  headers.put(TIMESTAMP, evt.getTimestamp().toString());
  headers.put(PRIORITY, evt.getPriority().toString());
  headers.put(NANOS, evt.getNanos().toString());
  for (Entry<CharSequence, ByteBuffer> entry : evt.getFields().entrySet()) {
    headers.put(entry.getKey().toString(), entry.getValue().toString());
  }
  headers.put(OG_EVENT, "yes");

  Event event = EventBuilder.withBody(evt.getBody().array(), headers);
  try {
    getChannelProcessor().processEvent(event);
    counterGroup.incrementAndGet("rpc.events");
  } catch (ChannelException ex) {
    return null;
  }

  counterGroup.incrementAndGet("rpc.successful");
  return null;
}
EmbeddedSource
EmbeddedSource is a simple source that gives the Embedded Agent direct access to the channel. When put is called on the EmbeddedAgent class, EmbeddedAgent calls the put method of EmbeddedSource, which directly calls processEvent.
public class EmbeddedSource extends AbstractSource
    implements EventDrivenSource, Configurable {

  @Override
  public void configure(Context context) {
  }

  public void put(Event event) throws ChannelException {
    getChannelProcessor().processEvent(event);
  }

  public void putAll(List<Event> events) throws ChannelException {
    getChannelProcessor().processEventBatch(events);
  }
}
ExecSource
ExecSource starts an ExecRunnable that executes the command.
public void start() {
  logger.info("Exec source starting with command:{}", command);

  executor = Executors.newSingleThreadExecutor();
  runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
      restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);

  // FIXME: Use a callback-like executor / future to signal us upon failure.
  runnerFuture = executor.submit(runner);

  /*
   * NB: This comes at the end rather than the beginning of the method because
   * it sets our state to running. We want to make sure the executor is alive
   * and well first.
   */
  sourceCounter.start();
  super.start();

  logger.debug("Exec source started");
}
Below is the run function of ExecRunnable. It first starts a scheduled task that periodically flushes data to the channel, then reads data from the process's input stream and submits it to the channel. When the process ends, it starts another process if restart is required.
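Before the full run listing, the flush rule it applies can be looked at in isolation: a batch is pushed when it reaches bufferCount lines, or when a periodic check finds that batchTimeout has elapsed since the last push. This is a minimal sketch, not the Flume implementation. The class BatchBufferSketch and its methods are hypothetical, the clock is passed in explicitly so the behaviour is deterministic, and the exact timeout comparison is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "flush on size or on timeout"; not a Flume class.
class BatchBufferSketch {
    private final int bufferCount;
    private final long batchTimeoutMs;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long lastFlushMs;

    BatchBufferSketch(int bufferCount, long batchTimeoutMs, long nowMs) {
        this.bufferCount = bufferCount;
        this.batchTimeoutMs = batchTimeoutMs;
        this.lastFlushMs = nowMs;
    }

    /** Reader path: buffer one line, flush if the batch is full or overdue. */
    synchronized void add(String line, long nowMs) {
        pending.add(line);
        if (pending.size() >= bufferCount || timedOut(nowMs)) {
            flush(nowMs);
        }
    }

    /** Timer path: flush a non-empty batch only once the timeout has elapsed. */
    synchronized void tick(long nowMs) {
        if (!pending.isEmpty() && timedOut(nowMs)) {
            flush(nowMs);
        }
    }

    // Assumption: "timed out" means at least batchTimeoutMs since the last flush.
    private boolean timedOut(long nowMs) {
        return nowMs - lastFlushMs >= batchTimeoutMs;
    }

    private void flush(long nowMs) {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        lastFlushMs = nowMs;
    }

    /** Batches pushed so far, in order. */
    synchronized List<List<String>> flushed() {
        return flushed;
    }
}
```

Both paths synchronize on the same object, which plays the role of the synchronized (eventList) blocks in the real listing: the reader thread and the timer thread must not flush the same batch twice.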
public void run() {
  do {
    String exitCode = "unknown";
    BufferedReader reader = null;
    String line = null;
    final List<Event> eventList = new ArrayList<Event>();

    timedFlushService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setNameFormat(
            "timedFlushExecService" +
            Thread.currentThread().getId() + "-%d").build());
    try {
      if (shell != null) {
        String[] commandArgs = formulateShellCommand(shell, command);
        process = Runtime.getRuntime().exec(commandArgs);
      } else {
        String[] commandArgs = command.split("\\s+");
        process = new ProcessBuilder(commandArgs).start();
      }
      reader = new BufferedReader(
          new InputStreamReader(process.getInputStream(), charset));

      // StderrLogger dies as soon as the input stream is invalid
      StderrReader stderrReader = new StderrReader(new BufferedReader(
          new InputStreamReader(process.getErrorStream(), charset)), logStderr);
      stderrReader.setName("StderrReader-[" + command + "]");
      stderrReader.setDaemon(true);
      stderrReader.start();

      future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
          try {
            synchronized (eventList) {
              if (!eventList.isEmpty() && timeout()) {
                flushEventBatch(eventList);
              }
            }
          } catch (Exception e) {
            logger.error("Exception occured when processing event batch", e);
            if (e instanceof InterruptedException) {
              Thread.currentThread().interrupt();
            }
          }
        }
      }, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

      while ((line = reader.readLine()) != null) {
        synchronized (eventList) {
          sourceCounter.incrementEventReceivedCount();
          eventList.add(EventBuilder.withBody(line.getBytes(charset)));
          if (eventList.size() >= bufferCount || timeout()) {
            flushEventBatch(eventList);
          }
        }
      }

      synchronized (eventList) {
        if (!eventList.isEmpty()) {
          flushEventBatch(eventList);
        }
      }
    } catch (Exception e) {
      logger.error("Failed while running command: " + command, e);
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException ex) {
          logger.error("Failed to close reader for exec source", ex);
        }
      }
      exitCode = String.valueOf(kill());
    }
    if (restart) {
      logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
      try {
        Thread.sleep(restartThrottle);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      logger.info("Command [" + command + "] exited with " + exitCode);
    }
  } while (restart);
}
HTTPSource
HTTPSource is a source that accepts Flume events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into Flume events by a pluggable "handler", which must implement the HTTPSourceHandler interface. The start function starts the server, and the doPost method of FlumeHTTPServlet handles each received request.
@Override
public void start() {
  Preconditions.checkState(srv == null,
      "Running HTTP Server found in source: " + getName()
      + " before I started one."
      + "Will not attempt to start.");
  srv = new Server();
  SocketConnector connector = new SocketConnector();
  connector.setPort(port);
  connector.setHost(host);
  srv.setConnectors(new Connector[] { connector });
  try {
    org.mortbay.jetty.servlet.Context root =
        new org.mortbay.jetty.servlet.Context(
            srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS);
    root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
    srv.start();
    Preconditions.checkArgument(srv.getHandler().equals(root));
  } catch (Exception ex) {
    LOG.error("Error while starting HTTPSource. Exception follows.", ex);
    Throwables.propagate(ex);
  }
  Preconditions.checkArgument(srv.isRunning());
  sourceCounter.start();
  super.start();
}

// doPost belongs to FlumeHTTPServlet.
@Override
public void doPost(HttpServletRequest request, HttpServletResponse response)
    throws IOException {
  List<Event> events = Collections.emptyList(); // create empty list
  try {
    events = handler.getEvents(request);
  } catch (HTTPBadRequestException ex) {
    LOG.warn("Received bad request from client. ", ex);
    response.sendError(HttpServletResponse.SC_BAD_REQUEST,
        "Bad request from client. " + ex.getMessage());
    return;
  } catch (Exception ex) {
    LOG.warn("Deserializer threw unexpected exception. ", ex);
    response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
        "Deserializer threw unexpected exception. " + ex.getMessage());
    return;
  }
  sourceCounter.incrementAppendBatchReceivedCount();
  sourceCounter.addToEventReceivedCount(events.size());
  try {
    getChannelProcessor().processEventBatch(events);
  } catch (ChannelException ex) {
    LOG.warn("Error appending event to channel. "
        + "Channel might be full. Consider increasing the channel "
        + "capacity or make sure the sinks perform faster.", ex);
    response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
        "Error appending event to channel. Channel might be full."
        + ex.getMessage());
    return;
  } catch (Exception ex) {
    LOG.warn("Unexpected error appending event to channel. ", ex);
    response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
        "Unexpected error while appending event to channel. "
        + ex.getMessage());
    return;
  }
  response.setCharacterEncoding(request.getCharacterEncoding());
  response.setStatus(HttpServletResponse.SC_OK);
  response.flushBuffer();
  sourceCounter.incrementAppendBatchAcceptedCount();
  sourceCounter.addToEventAcceptedCount(events.size());
}
MultiportSyslogTCPSource
MultiportSyslogTCPSource is a multi-port SyslogTCPSource that receives TCP messages on several ports. Its start method starts an acceptor and binds it to each port. When a message arrives, the messageReceived method of MultiportSyslogHandler is called; it converts each line string into an event and submits it to the channel. (The transport is implemented with MINA.)
@Override
public void start() {
  logger.info("Starting {}...", this);

  // allow user to specify number of processors to use for thread pool
  if (numProcessors != null) {
    acceptor = new NioSocketAcceptor(numProcessors);
  } else {
    acceptor = new NioSocketAcceptor();
  }
  acceptor.setReuseAddress(true);
  acceptor.getSessionConfig().setReadBufferSize(readBufferSize);
  acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

  acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize,
      getChannelProcessor(), sourceCounter, portHeader, defaultDecoder,
      portCharsets));

  for (int port : ports) {
    InetSocketAddress addr;
    if (host != null) {
      addr = new InetSocketAddress(host, port);
    } else {
      addr = new InetSocketAddress(port);
    }
    try {
      // Not using the one that takes an array because we won't want one bind
      // error affecting the next.
      acceptor.bind(addr);
    } catch (IOException ex) {
      logger.error("Could not bind to address: " + String.valueOf(addr), ex);
    }
  }

  sourceCounter.start();
  super.start();

  logger.info("{} started.", this);
}

// messageReceived belongs to MultiportSyslogHandler.
public void messageReceived(IoSession session, Object message) {
  IoBuffer buf = (IoBuffer) message;
  IoBuffer savedBuf = (IoBuffer) session.getAttribute(SAVED_BUF);

  ParsedBuffer parsedLine = new ParsedBuffer();
  List<Event> events = Lists.newArrayList();

  // the character set can be specified per-port
  CharsetDecoder decoder = defaultDecoder.get();
  int port = ((InetSocketAddress) session.getLocalAddress()).getPort();
  if (portCharsets.containsKey(port)) {
    decoder = portCharsets.get(port).get();
  }

  // while the buffer is not empty
  while (buf.hasRemaining()) {
    events.clear();

    // take number of events no greater than batchSize
    for (int num = 0; num < batchSize && buf.hasRemaining(); num++) {
      if (lineSplitter.parseLine(buf, savedBuf, parsedLine)) {
        Event event = parseEvent(parsedLine, decoder);
        if (portHeader != null) {
          event.getHeaders().put(portHeader, String.valueOf(port));
        }
        events.add(event);
      } else {
        logger.trace("Parsed null event");
      }
    }

    // don't try to write anything if we didn't get any events somehow
    if (events.isEmpty()) {
      logger.trace("Empty set!");
      return;
    }

    int numEvents = events.size();
    sourceCounter.addToEventReceivedCount(numEvents);

    // write the events to the downstream channel
    try {
      channelProcessor.processEventBatch(events);
      sourceCounter.addToEventAcceptedCount(numEvents);
    } catch (Throwable t) {
      logger.error("Error writing to channel, event dropped", t);
      if (t instanceof Error) {
        Throwables.propagate(t);
      }
    }
  }
}
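The handler keeps a per-session saved buffer because a TCP read can end in the middle of a line: the unfinished tail has to be carried over and completed by the next read. The idea can be sketched as follows. This is a simplified illustration on strings rather than byte buffers; LineCarryOverSketch and feed are hypothetical names, not part of Flume or MINA.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: carries an unfinished line over to the next read.
class LineCarryOverSketch {
    // Plays the role of the per-session saved buffer.
    private final StringBuilder saved = new StringBuilder();

    /** Feeds one chunk as read from the socket; returns the lines it completes. */
    List<String> feed(String chunk) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < chunk.length(); i++) {
            char c = chunk.charAt(i);
            if (c == '\n') {
                // A newline closes the line, including any saved prefix.
                lines.add(saved.toString());
                saved.setLength(0);
            } else {
                saved.append(c);
            }
        }
        // Anything left in 'saved' is a partial line awaiting more data.
        return lines;
    }
}
```

Feeding "<13>one\n<13>tw" and then "o\n" yields "<13>one" from the first call and "<13>two" from the second, so no event is emitted for a half-received line.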
NetcatSource
NetcatSource opens a ServerSocketChannel to accept client connections. When data arrives, the run function of NetcatSocketHandler parses it into line strings. (It uses blocking I/O.)
@Override
public void start() {
  logger.info("Source starting");

  counterGroup.incrementAndGet("open.attempts");

  handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
      .setNameFormat("netcat-handler-%d").build());

  try {
    SocketAddress bindPoint = new InetSocketAddress(hostName, port);

    serverSocket = ServerSocketChannel.open();
    serverSocket.socket().setReuseAddress(true);
    serverSocket.socket().bind(bindPoint);

    logger.info("Created serverSocket:{}", serverSocket);
  } catch (IOException e) {
    counterGroup.incrementAndGet("open.errors");
    logger.error("Unable to bind to socket. Exception follows.", e);
    throw new FlumeException(e);
  }

  AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
  acceptThreadShouldStop.set(false);
  acceptRunnable.counterGroup = counterGroup;
  acceptRunnable.handlerService = handlerService;
  acceptRunnable.shouldStop = acceptThreadShouldStop;
  acceptRunnable.ackEveryEvent = ackEveryEvent;
  acceptRunnable.source = this;
  acceptRunnable.serverSocket = serverSocket;

  acceptThread = new Thread(acceptRunnable);
  acceptThread.start();

  logger.debug("Source started");
  super.start();
}

// run belongs to NetcatSocketHandler.
@Override
public void run() {
  logger.debug("Starting connection handler");
  Event event = null;

  try {
    Reader reader = Channels.newReader(socketChannel, "utf-8");
    Writer writer = Channels.newWriter(socketChannel, "utf-8");
    CharBuffer buffer = CharBuffer.allocate(maxLineLength);
    buffer.flip(); // flip() so fill() sees buffer as initially empty

    while (true) {
      // this method blocks until new data is available in the socket
      int charsRead = fill(buffer, reader);
      logger.debug("Chars read = {}", charsRead);

      // attempt to process all the events in the buffer
      int eventsProcessed = processEvents(buffer, writer);
      logger.debug("Events processed = {}", eventsProcessed);

      if (charsRead == -1) {
        // if we received EOF before last event processing attempt, then we
        // have done everything we can
        break;
      } else if (charsRead == 0 && eventsProcessed == 0) {
        if (buffer.remaining() == buffer.capacity()) {
          // If we get here it means:
          // 1. Last time we called fill(), no new chars were buffered
          // 2. After that, we failed to process any events => no newlines
          // 3. The unread data in the buffer == the size of the buffer
          // Therefore, we are stuck because the client sent a line longer
          // than the size of the buffer. Response: Drop the connection.
          logger.warn("Client sent event exceeding the maximum length");
          counterGroup.incrementAndGet("events.failed");
          writer.write("FAILED: Event exceeds the maximum length (" +
              buffer.capacity() + " chars, including newline)\n");
          writer.flush();
          break;
        }
      }
    }

    socketChannel.close();

    counterGroup.incrementAndGet("sessions.completed");
  } catch (IOException e) {
    counterGroup.incrementAndGet("sessions.broken");
  }

  logger.debug("Connection handler exiting");
}
ScribeSource
ScribeSource lets Flume accept the Scribe entry type LogEntry from an existing Scribe system. Typically, messages arrive from a local Scribe and Flume takes over the role of the central Scribe. Scribe is a distributed log collection system that Facebook uses widely; a common setup collects data with Scribe, stores it in HDFS, and processes it with MapReduce. The start method launches a Startup thread, which starts a THsHaServer; when the THsHaServer receives a message, it calls the Log method of Receiver.
private class Startup extends Thread {

  public void run() {
    try {
      Scribe.Processor processor = new Scribe.Processor(new Receiver());
      TNonblockingServerTransport transport = new TNonblockingServerSocket(port);
      THsHaServer.Args args = new THsHaServer.Args(transport);

      args.workerThreads(workers);
      args.processor(processor);
      args.transportFactory(new TFramedTransport.Factory());
      args.protocolFactory(new TBinaryProtocol.Factory(false, false));

      server = new THsHaServer(args);

      LOG.info("Starting Scribe Source on port " + port);

      server.serve();
    } catch (Exception e) {
      LOG.warn("Scribe failed", e);
    }
  }
}

@Override
public void start() {
  Startup startupThread = new Startup();
  startupThread.start();

  try {
    Thread.sleep(3000);
  } catch (InterruptedException e) {}

  if (!server.isServing()) {
    throw new IllegalStateException("Failed initialization of ScribeSource");
  }

  sourceCounter.start();
  super.start();
}

class Receiver implements Iface {

  public ResultCode Log(List<LogEntry> list) throws TException {
    if (list != null) {
      sourceCounter.addToEventReceivedCount(list.size());

      try {
        List<Event> events = new ArrayList<Event>(list.size());

        for (LogEntry entry : list) {
          Map<String, String> headers = new HashMap<String, String>(1, 1);
          headers.put(SCRIBE_CATEGORY, entry.getCategory());

          Event event = EventBuilder.withBody(entry.getMessage().getBytes(), headers);
          events.add(event);
        }

        if (events.size() > 0) {
          getChannelProcessor().processEventBatch(events);
        }

        sourceCounter.addToEventAcceptedCount(list.size());
        return ResultCode.OK;
      } catch (Exception e) {
        LOG.warn("Scribe source handling failure", e);
      }
    }

    return ResultCode.TRY_LATER;
  }
}
SequenceGeneratorSource
SequenceGeneratorSource is a source whose messages are a number that is automatically incremented by 1 for each event.
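No listing is given for this source, so the following is only a minimal sketch of the idea: each event body is the current value of a counter, which is then incremented. SequenceSketch and nextBody are hypothetical names, and starting the counter at 0 is an assumption made for the example.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a sequence-generating source; not the Flume class.
class SequenceSketch {
    // Assumption: the sequence starts at 0.
    private long sequence = 0;

    /** Returns the current counter value as a text body, then increments it. */
    byte[] nextBody() {
        return String.valueOf(sequence++).getBytes(StandardCharsets.UTF_8);
    }
}
```

A source like this is mainly useful for testing a pipeline, because gaps or duplicates in the received numbers are easy to spot downstream.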
SpoolDirectorySource
SpoolDirectorySource monitors the files in a directory. It requires that monitored files are not modified, so files can only be moved into this directory. The start method creates a reader object and schedules a task that, every 500 milliseconds, calls the run function of SpoolDirectoryRunnable; run in turn calls the reader's readEvents function.
public void start() {
  logger.info("SpoolDirectorySource source starting with directory: {}",
      spoolDirectory);

  ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();

  File directory = new File(spoolDirectory);
  try {
    reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(directory)
        .completedSuffix(completedSuffix)
        .ignorePattern(ignorePattern)
        .trackerDirPath(trackerDirPath)
        .annotateFileName(fileHeader)
        .fileNameHeader(fileHeaderKey)
        .deserializerType(deserializerType)
        .deserializerContext(deserializerContext)
        .deletePolicy(deletePolicy)
        .inputCharset(inputCharset)
        .build();
  } catch (IOException ioe) {
    throw new FlumeException("Error instantiating spooling event parser",
        ioe);
  }

  Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
  executor.scheduleWithFixedDelay(
      runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

  super.start();
  logger.debug("SpoolDirectorySource source started");
  sourceCounter.start();
}

// readEvents belongs to ReliableSpoolingFileEventReader.
public List<Event> readEvents(int numEvents) throws IOException {
  currentFile = getNextFile();

  EventDeserializer des = currentFile.get().getDeserializer();
  List<Event> events = des.readEvents(numEvents);

  /* It's possible that the last read took us just up to a file boundary.
   * If so, try to roll to the next file, if there is one. */
  if (events.isEmpty()) {
    retireCurrentFile();
    currentFile = getNextFile();
    if (!currentFile.isPresent()) {
      return Collections.emptyList();
    }
    events = currentFile.get().getDeserializer().readEvents(numEvents);
  }

  if (annotateFileName) {
    String filename = currentFile.get().getFile().getAbsolutePath();
    for (Event event : events) {
      event.getHeaders().put(fileNameHeader, filename);
    }
  }

  committed = false;
  lastFileRead = currentFile;
  return events;
}
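The spooling approach (read a file that will no longer change, then mark it as done so it is not read again) can be sketched with the JDK alone. This is a simplified illustration, not the ReliableSpoolingFileEventReader logic: SpoolSketch and drainOnce are hypothetical names, the ".COMPLETED" suffix is an assumed value for completedSuffix, and whole files are read at once instead of in batches of numEvents.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical sketch of one polling pass over a spool directory.
class SpoolSketch {
    // Assumed value for the completed-file suffix.
    static final String COMPLETED_SUFFIX = ".COMPLETED";

    /** Reads every pending file line by line, then renames it as completed. */
    static List<String> drainOnce(Path spoolDir) throws IOException {
        List<Path> pending = new ArrayList<>();
        try (Stream<Path> files = Files.list(spoolDir)) {
            files.filter(Files::isRegularFile)
                 .filter(p -> !p.getFileName().toString().endsWith(COMPLETED_SUFFIX))
                 .sorted()
                 .forEach(pending::add);
        }

        List<String> events = new ArrayList<>();
        for (Path file : pending) {
            // One line becomes one event body in this sketch.
            events.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
            // Renaming marks the file as consumed so the next pass skips it.
            Files.move(file, file.resolveSibling(file.getFileName() + COMPLETED_SUFFIX));
        }
        return events;
    }
}
```

Calling drainOnce from a scheduled task corresponds to the periodic SpoolDirectoryRunnable run; a second pass over the same directory returns nothing until a new file is moved in.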
SyslogUDPSource and SyslogTcpSource handle the collection of syslog messages. The start method starts a server, and the messageReceived method calls syslogUtils.extractEvent to obtain each Event.
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
      ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
      while (buff.readable()) {
        Event e = syslogUtils.extractEvent(buff);
        if (e == null) {
          logger.debug("Parsed partial event, event will be generated when " +
              "rest of the event is received.");
          continue;
        }
        try {
          getChannelProcessor().processEvent(e);
          counterGroup.incrementAndGet("events.success");
        } catch (ChannelException ex) {
          counterGroup.incrementAndGet("events.dropped");
          logger.error("Error writting to channel, event dropped", ex);
        }
      }
    }
  } // end of the inner handler class syslogTcpHandler

  @Override
  public void start() {
    ChannelFactory factory = new NioServerSocketChannelFactory(
        Executors.newCachedThreadPool(), Executors.newCachedThreadPool());

    ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
    serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
      @Override
      public ChannelPipeline getPipeline() {
        syslogTcpHandler handler = new syslogTcpHandler();
        handler.setEventSize(eventSize);
        handler.setFormater(formaterProp);
        return Channels.pipeline(handler);
      }
    });

    logger.info("Syslog TCP Source starting...");

    if (host == null) {
      nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
    } else {
      nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
    }

    super.start();
  }
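To give a feel for what extracting an event from a syslog message involves, here is a minimal sketch of one step: decoding the leading "<PRI>" field into facility and severity, using the standard syslog rule that the priority equals facility * 8 + severity. This is not the syslogUtils.extractEvent implementation; SyslogPrioritySketch and parsePriority are hypothetical names.

```java
// Hypothetical sketch: decodes the "<PRI>" prefix of a syslog message.
class SyslogPrioritySketch {
    /** Returns {facility, severity}, or null if there is no valid prefix. */
    static int[] parsePriority(String message) {
        if (message.length() < 3 || message.charAt(0) != '<') {
            return null;
        }
        int end = message.indexOf('>');
        if (end < 2 || end > 4) {
            return null; // PRI must be 1 to 3 digits
        }
        int pri = 0;
        for (int i = 1; i < end; i++) {
            char c = message.charAt(i);
            if (c < '0' || c > '9') {
                return null;
            }
            pri = pri * 10 + (c - '0');
        }
        if (pri > 191) {
            return null; // highest valid value: facility 23 * 8 + severity 7
        }
        return new int[] { pri / 8, pri % 8 };
    }
}
```

For example, a message starting with "<13>" decodes to facility 1 and severity 5, and one starting with "<34>" decodes to facility 4 and severity 2.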
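ThriftLegacySource and ThriftSource mainly use Thrift to transfer logs.
Reference: http://blog.csdn.net/amuseme_lu/article/details/6262572. Thrift is an open-source project from Facebook; it is a cross-language service development framework. The server exposes an interface, and the client calls that interface remotely.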