
Using Flume to Report Logs to LTS

Flume is a reliable, highly available, distributed system for collecting, aggregating, and transporting massive volumes of log data. It lets you customize data senders in a log system to improve data collection, and it can also perform simple processing on the data before writing it to various data receivers.

You can collect logs with Flume and report them to LTS over the Kafka protocol that LTS supports. The following are some common data collection scenarios:

  1. Using Flume to Collect Text Logs to LTS
  2. Using Flume to Collect Database Table Data to LTS
  3. Using Flume to Collect Logs Transmitted with the Syslog Protocol to LTS
  4. Using Flume to Collect Logs Transmitted with the TCP/UDP Protocol to LTS
  5. Using Flume to Collect Device Management Data Reported with the SNMP Protocol to LTS
  6. Using Default Interceptors to Process Logs
  7. Using a Custom Interceptor to Process Logs
  8. Enriching Logs with External Data Sources and Reporting the Logs to LTS

Prerequisites

  • The Java Development Kit (JDK) has been installed on the host.
  • Flume has been installed, and the JDK path has been configured in the Flume configuration file (see the sketch after this list).
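
A minimal sketch of these checks, assuming Flume is installed under /tools/flume (the directory used later in this practice) and the JDK under /usr/lib/jvm/jdk8 (both paths are placeholders to replace with your own):

# Verify that the JDK is available on the host.
java -version
# Configure the JDK path for Flume in conf/flume-env.sh (create it from the template on a fresh installation).
cd /tools/flume
cp conf/flume-env.sh.template conf/flume-env.sh
echo 'export JAVA_HOME=/usr/lib/jvm/jdk8' >> conf/flume-env.sh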

Using Flume to Collect Text Logs to LTS

To use Flume to collect text logs and report them to LTS, create a .conf file based on the following example, then start a Flume agent with it (see the startup sketch after the example). For details about the following parameters, see Using Kafka to Report Logs to LTS.

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/test.txt
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
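
After saving the file, start a Flume agent with it. The following is a minimal startup sketch, assuming the configuration above is saved as conf/flume-lts-file.conf (the file name is only a placeholder) and the command is run from the Flume installation directory:

bin/flume-ng agent --name a1 --conf conf --conf-file conf/flume-lts-file.conf -Dflume.root.logger=INFO,console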

Using Flume to Collect Database Table Data to LTS

You can use Flume to collect database table data and report it to LTS to monitor table data changes. For details about the following parameters, see Using Kafka to Report Logs to LTS.

  1. Download the flume-ng-sql-source plug-in and build it into a JAR package named flume-ng-sql-source.jar (see the build sketch after this procedure). Before building, ensure that the flume-ng-core version in the POM file matches the version of Flume you installed. Then place the JAR package in the lib directory of the Flume installation path, for example, FLUME_HOME/lib. Replace FLUME_HOME with the actual installation path.
  2. Add the MySQL driver to the FLUME_HOME/lib directory.

    1. Download the MySQL driver package.
      wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
    2. Decompress the driver package to obtain the JAR file.
      tar xzf mysql-connector-java-5.1.35.tar.gz
    3. Place the JAR package in the FLUME_HOME/lib/ directory.
      cp mysql-connector-java-5.1.35-bin.jar  FLUME_HOME/lib/

  3. Create the .conf file for collecting MySQL data.

    # a1 indicates the agent name.
    # source indicates the input source of a1.
    # channels indicates a buffer.
    # sinks indicates the output destination of a1. In this example, Kafka is used.
    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    
    #source
    a1.sources.r1.type = org.keedio.flume.source.SQLSource
    # Connect to MySQL: Replace {mysql_host} with the IP address of your VM and {database_name} with the database name. You can run the ifconfig or ip addr command to obtain the IP address.
    # Add ?useUnicode=true&characterEncoding=utf-8&useSSL=false to the URL. Otherwise, the connection may fail.
    a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false
    # Hibernate Database connection properties
    # MySQL account, which generally is root.
    a1.sources.r1.hibernate.connection.user = root
    # Enter your MySQL password.
    a1.sources.r1.hibernate.connection.password = xxxxxxxx
    a1.sources.r1.hibernate.connection.autocommit = true
    # MySQL driver.
    a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    # Path for storing the status file (replace FLUME_HOME with the actual installation path).
    a1.sources.r1.status.file.path = FLUME_HOME/bin
    a1.sources.r1.status.file.name = sqlSource.status
    # Custom query
    # Replace {table_name} with the name of the data table to be collected.
    a1.sources.r1.custom.query = select * from {table_name}
    
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000

  4. After Flume is started, it collects table data from the database and reports it to LTS.
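
As noted in step 1, the flume-ng-sql-source plug-in must be built against the same flume-ng-core version as your Flume installation. The following is a rough build sketch; it assumes the plug-in source is fetched from the keedio repository on GitHub and built with Maven, so adjust the URL, version, and paths to your environment:

# Fetch the plug-in source (the repository URL is an assumption; use your actual source).
git clone https://github.com/keedio/flume-ng-sql-source.git
cd flume-ng-sql-source
# Align the flume-ng-core version in pom.xml with your installed Flume, then build.
mvn clean package -DskipTests
# Copy the resulting JAR to the Flume lib directory (replace FLUME_HOME with the actual path).
cp target/flume-ng-sql-source-*.jar FLUME_HOME/lib/flume-ng-sql-source.jar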

Using Flume to Collect Logs Transmitted with the Syslog Protocol to LTS

Syslog is a protocol used to transmit log messages on an IP network. Flume collects logs transmitted using syslog and reports them to LTS. For details about the following parameters, see Using Kafka to Report Logs to LTS.

  • To receive syslog logs over UDP, create a .conf file as in the following example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type=syslogudp
    # host_port indicates the port number of the syslog server.
    a1.sources.r1.port = {host_port}
    # host_ip indicates the IP address of the syslog server.
    a1.sources.r1.host = {host_ip}
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = memory
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    a1.sinks.k1.channel = c1
  • To receive syslog logs over TCP, create a .conf file as in the following example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type=syslogtcp
    # host_port indicates the port number of the syslog server.
    a1.sources.r1.port = {host_port}
    # host_ip indicates the IP address of the syslog server.
    a1.sources.r1.host = {host_ip}
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = memory
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    a1.sinks.k1.channel = c1

Using Flume to Collect Logs Transmitted with the TCP/UDP Protocol to LTS

You can use Flume to collect logs transmitted over TCP or UDP and report them to LTS. For details about the following parameters, see Using Kafka to Report Logs to LTS. A quick way to send test messages to the configured port with netcat is shown after the examples.

  • To collect logs from a TCP port, create a .conf file as in the following example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = {host_port}
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • To collect logs from a UDP port, create a .conf file as in the following example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = {host_port}
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
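
To verify either configuration, you can send a test message to the listening port, for example with netcat ({host_port} is the port configured above):

# Send a test log line to the TCP port that the netcat source listens on.
echo "hello LTS" | nc 127.0.0.1 {host_port}
# For the UDP variant, add the -u flag.
echo "hello LTS" | nc -u 127.0.0.1 {host_port}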

Using Flume to Collect Device Management Data Reported with the SNMP Protocol to LTS

You can use Flume to collect device management data reported with SNMP and send the data to LTS. For details about the following parameters, see Using Kafka to Report Logs to LTS.

  • To listen on port 161 for SNMP communication, create a .conf file for receiving SNMP logs as in the following example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 161
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • To listen on port 162 for SNMP trap communication, create a .conf file for receiving SNMP logs as in the following example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 162
    
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Using Default Interceptors to Process Logs

In Flume, an interceptor is a simple pluggable component that sits between a source and its channels. Before the source writes received events to the channels, the interceptor can transform or drop them. Each interceptor processes only the events received by the source it is attached to, and multiple interceptors can be chained on one source, as shown in the sketch after the following list.

  • Timestamp interceptor

    This interceptor inserts a timestamp into the Flume event headers; without it, Flume forwards messages as received. To attach a timestamp interceptor to a source, set type to timestamp (or to the full class name of the interceptor). The preserveExisting parameter controls how an existing timestamp header is handled: false overwrites it, and true keeps the existing value. Configuration example:

    a1.sources.r1.interceptors = timestamp 
    a1.sources.r1.interceptors.timestamp.type=timestamp 
    a1.sources.r1.interceptors.timestamp.preserveExisting=false
  • Regex filtering interceptor

    This interceptor filters out unwanted logs during collection, or selectively collects only logs that match a specified regular expression. Set type to REGEX_FILTER. If excludeEvents is set to false, only events matching the regular expression are collected; if it is set to true, matching events are dropped and all other events are collected. Example configuration for attaching a regex filtering interceptor to a source:

    a1.sources.r1.interceptors = regex 
    a1.sources.r1.interceptors.regex.type=REGEX_FILTER 
    a1.sources.r1.interceptors.regex.regex=(today)|(Monday) 
    a1.sources.r1.interceptors.regex.excludeEvents=false

    With this configuration, the interceptor collects only the log messages that contain today or Monday.

  • Search and replace interceptor

    This interceptor provides simple string search and replacement based on Java regular expressions. Configuration example:

    # Interceptor alias
    a1.sources.r1.interceptors = search-replace
    # Interceptor type. The value must be search_replace.
    a1.sources.r1.interceptors.search-replace.type = search_replace
    
    # Regular expression used to match content in the event body.
    a1.sources.r1.interceptors.search-replace.searchPattern = today
    # Replacement string for the matched content.
    a1.sources.r1.interceptors.search-replace.replaceString = yesterday
    # Set a character set. The default value is utf8.
    a1.sources.r1.interceptors.search-replace.charset = utf8
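
These interceptors can also be chained on a single source; Flume applies them in the order they are listed. The following is a minimal sketch that combines the three examples above (the parameter values are illustrative):

a1.sources.r1.interceptors = timestamp regex search-replace
a1.sources.r1.interceptors.timestamp.type = timestamp
a1.sources.r1.interceptors.regex.type = REGEX_FILTER
a1.sources.r1.interceptors.regex.regex = (today)|(Monday)
a1.sources.r1.interceptors.regex.excludeEvents = false
a1.sources.r1.interceptors.search-replace.type = search_replace
a1.sources.r1.interceptors.search-replace.searchPattern = today
a1.sources.r1.interceptors.search-replace.replaceString = yesterday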

Using a Custom Interceptor to Process Logs

The following uses Java as an example to describe how to write a custom interceptor for Flume. FLUME_HOME refers to the Flume installation directory, /tools/flume in this example; replace it with your actual installation directory.

  1. Create a Maven project and introduce the Flume dependency.

    Ensure that the dependency matches the Flume version in the target cluster.

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    The dependency does not need to be packaged into the final JAR package. Therefore, set the scope to provided.

  2. Create a class to implement the Interceptor interface and related methods.

    • initialize() method: initializes the interceptor, for example, by reading configuration information and establishing connections.
    • intercept(Event event) method: processes a single event; it receives an event object and returns the modified event, or null to drop it.
    • intercept(List<Event> list) method: processes events in a batch by applying the single-event logic to each event in the list.
    • close() method: shuts down the interceptor, releases resources, and closes connections.
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
        @Override
        public Event intercept(Event event) {
    
            // Obtain event data.
            String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
            // Check whether the event data contains a specified string.
            if (eventData.contains("hello")) {
                // If an event contains the specified string, the event is excluded and null is returned.
                return null;
            }
    
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            // Create a new list to store the processed events.
            List<Event> interceptedEvents = new ArrayList<>();
            for (Event event : events) {
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null) {
                    interceptedEvents.add(interceptedEvent);
                }
            }
            return interceptedEvents;
        }
    
        @Override
        public void close() {
    
        }
    
    }

  3. Add a builder for the interceptor. The creation and configuration of an interceptor are usually implemented through the Builder pattern. The complete code is as follows:

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestInterceptor implements Interceptor {
        @Override
        public void initialize() {
        }
        @Override
        public Event intercept(Event event) {
            // Obtain event data.
            String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
            // Check whether the event data contains a specified string.
            if (eventData.contains("hello")) {
                // If an event contains the specified string, the event is excluded and null is returned.
                return null;
            }
            return event;
        }
        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> interceptedEvents = new ArrayList<>();
            for (Event event : events) {
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null) {
                    interceptedEvents.add(interceptedEvent);
                }
            }
            return interceptedEvents;
        }
    
        @Override
        public void close() {
    
        }
    
    // Build an interceptor.
        public static class Builder implements Interceptor.Builder {
    
            @Override
            public void configure(Context context) {
    
            }
    
            @Override
            public Interceptor build() {
                return new TestInterceptor();
            }
    
        }
    
    }

  4. Package the interceptor into a JAR file and upload it to the lib folder in your Flume installation directory.
  5. Write the configuration file and configure the custom interceptor in it.

    When configuring the interceptor type, note that the format is the full class name of the interceptor followed by $Builder.

    # Configuration of the interceptor.
    #Definition of the interceptor.
    a1.sources.r1.interceptors = i1
    # Full class name of the interceptor.
    a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder

  6. Run Flume. A combined packaging and startup sketch follows this procedure.
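
The following is a combined sketch of steps 4 to 6, assuming the interceptor is built with Maven and the agent configuration is saved as conf/flume-interceptor.conf (the JAR name and the configuration file name are placeholders; replace FLUME_HOME with the actual installation path):

# Package the interceptor JAR (run in the Maven project directory).
mvn clean package
# Upload the JAR to the lib folder in the Flume installation directory.
cp target/flume-interceptor-1.0.jar FLUME_HOME/lib/
# Start the agent with the configuration that references the custom interceptor.
cd FLUME_HOME
bin/flume-ng agent --name a1 --conf conf --conf-file conf/flume-interceptor.conf -Dflume.root.logger=INFO,console
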
The following is another custom interceptor example that performs key-value (KV) parsing: the log string is split on spaces and the resulting fields are written back into the event body as a Map-style string.
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Obtain event data.
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        Map<String, Object> splitMap = new HashMap<>();
        String[] splitList = eventData.split(" ");
        for (int i = 0; i < splitList.length; i++) {
            splitMap.put("field" + i, splitList[i].trim());
        }
        // Write the parsed key-value result back into the event body.
        event.setBody(splitMap.toString().getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}

Enriching Logs with External Data Sources and Reporting the Logs to LTS

An event is the basic unit in which Flume transmits data from a source to a destination. An event consists of a header and a body: the header stores attributes of the event as key-value pairs, and the body stores the data as a byte array.

If you have an external data source and want to enrich the log content, for example, by modifying or deleting content or adding fields, write the changes into the event body so that Flume reports the enriched logs to LTS. The following example shows how to use Java to extend the log content. For details about the following parameters, see Using Kafka to Report Logs to LTS.

import com.alibaba.fastjson.JSONObject;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Obtain the event data, convert the original data into JSON strings, and add extra fields.
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject object = new JSONObject();
        object.put("content", eventData);
        object.put("workLoadType", "RelipcaSet");
        eventData = object.toJSONString();
        eventData.setBody(eventData.getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
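
The enrichment example relies on fastjson for JSON handling. If the interceptor is built with Maven, a dependency along the following lines would be needed (the version shown is only an example; use the one that fits your project):

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>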