Using Flume to Report Logs to LTS
Flume is a reliable, highly available, distributed system for collecting, aggregating, and transporting large volumes of log data. It lets you customize data senders in a log system to collect data, and it can also perform simple processing on the data and write it to various data receivers.
You can collect logs using Flume and report logs using the Kafka protocol supported by LTS. The following are some common data collection scenarios:
- Using Flume to Collect Text Logs to LTS
- Using Flume to Collect Database Table Data to LTS
- Using Flume to Collect Logs Transmitted with the Syslog Protocol to LTS
- Using Flume to Collect Logs Transmitted with the TCP/UDP Protocol to LTS
- Using Flume to Collect Device Management Data Reported with the SNMP Protocol to LTS
- Using Default Interceptors to Process Logs
- Using a Custom Interceptor to Process Logs
- Enriching Logs with External Data Sources and Reporting the Logs to LTS
Prerequisites
- The Java Development Kit (JDK) has been installed on the host.
- Flume has been installed and the JDK path has been configured in the Flume configuration file.
Using Flume to Collect Text Logs to LTS
To use Flume to collect text logs and report them to LTS, create a configuration (conf) file based on the following example. For details about the parameters below, see Using Kafka to Report Logs to LTS.
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/test.txt
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
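After saving the configuration, for example as FLUME_HOME/conf/kafka-sink.conf (a file name used here only as an example), you can start the Flume agent with the standard flume-ng command; adjust the paths to your installation:

# Start the agent named a1 with the example configuration file.
FLUME_HOME/bin/flume-ng agent --conf FLUME_HOME/conf --conf-file FLUME_HOME/conf/kafka-sink.conf --name a1 -Dflume.root.logger=INFO,console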
Using Flume to Collect Database Table Data to LTS
You can use Flume to collect database table data and report it to LTS to monitor table data changes. For details about the following parameters, see Using Kafka to Report Logs to LTS.
- Download the flume-ng-sql-source plug-in and package it into a JAR file named flume-ng-sql-source.jar (a packaging sketch follows this list). Before packaging, ensure that the flume-ng-core version in the POM file is the same as the version of Flume to be installed. Then, place the JAR file in the lib directory of the Flume installation path, for example, FLUME_HOME/lib. Replace FLUME_HOME with the actual installation path.
- Add the MySQL driver to the FLUME_HOME/lib directory.
- Download the MySQL driver package.
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
- Decompress the driver package to obtain the driver JAR file.
tar xzf mysql-connector-java-5.1.35.tar.gz
- Place the JAR package in the FLUME_HOME/lib/ directory.
cp mysql-connector-java-5.1.35-bin.jar FLUME_HOME/lib/
- Add the conf file for collecting MySQL data.
# a1 indicates the agent name.
# source indicates the input source of a1.
# channels indicates a buffer.
# sinks indicates the output destination of a1. In this example, Kafka is used.
a1.channels = c1
a1.sources = r1
a1.sinks = k1
#Source
a1.sources.r1.type = org.keedio.flume.source.SQLSource
# Connect to MySQL: Replace {mysql_host} with the IP address of your VM and {database_name} with the database name. You can run the ifconfig or ip addr command to obtain the IP address.
# Add ?useUnicode=true&characterEncoding=utf-8&useSSL=false to the URL. Otherwise, the connection may fail.
a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false
# Hibernate database connection properties
# MySQL account, which generally is root.
a1.sources.r1.hibernate.connection.user = root
# Enter your MySQL password.
a1.sources.r1.hibernate.connection.password = xxxxxxxx
a1.sources.r1.hibernate.connection.autocommit = true
# MySQL dialect and driver.
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
# Location and name of the status file.
a1.sources.r1.status.file.path = FLUME_HOME/bin
a1.sources.r1.status.file.name = sqlSource.status
# Custom query. Replace {table_name} with the name of the data table to be collected.
a1.sources.r1.custom.query = select * from {table_name}
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
- After Flume is started, it collects the table data from the database and reports it to LTS.
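The plug-in packaging step mentioned above can look roughly as follows when done with Maven; the source directory name is an example and depends on where you obtained the plug-in:

# Build the plug-in (run in the plug-in's source directory; verify that flume-ng-core in its POM matches your Flume version).
cd flume-ng-sql-source
mvn clean package -DskipTests
# Copy the resulting JAR into the Flume lib directory.
cp target/flume-ng-sql-source-*.jar FLUME_HOME/lib/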
Using Flume to Collect Logs Transmitted with the Syslog Protocol to LTS
Syslog is a protocol used to transmit log messages on an IP network. Flume collects logs transmitted using syslog and reports them to LTS. For details about the following parameters, see Using Kafka to Report Logs to LTS.
- To receive logs over UDP, add a conf file for collecting logs transmitted using syslog. Example:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = syslogudp
# host_port indicates the port number of the syslog server.
a1.sources.r1.port = {host_port}
# host_ip indicates the IP address of the syslog server.
a1.sources.r1.host = {host_ip}
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
a1.sinks.k1.channel = c1
- To receive logs over TCP, add a conf file for collecting logs transmitted using syslog. Example:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = syslogtcp
# host_port indicates the port number of the syslog server.
a1.sources.r1.port = {host_port}
# host_ip indicates the IP address of the syslog server.
a1.sources.r1.host = {host_ip}
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
a1.sinks.k1.channel = c1
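To verify either setup, you can send a test message to the syslog source with the Linux logger utility; the option names below are those of the util-linux logger and may differ on other systems:

# Send a test message over UDP (use --tcp instead of --udp for the TCP source).
logger --server {host_ip} --port {host_port} --udp "syslog test message for LTS"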
Using Flume to Collect Logs Transmitted with the TCP/UDP Protocol to LTS
You can use Flume to collect logs transmitted with the TCP/UDP protocol and report them to LTS. For details about the following parameters, see Using Kafka to Report Logs to LTS.
- To collect logs from a TCP port, add a conf file for the port to be collected. Example:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = {host_port}
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- To collect logs from a UDP port, add a conf file for the port to be collected. Example:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = {host_port}
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
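You can verify that the two sources are receiving data by sending a test line with netcat (assuming the nc utility is available on the host):

# Send a test line to the TCP source.
echo "tcp test message" | nc 127.0.0.1 {host_port}
# Send a test line to the UDP source.
echo "udp test message" | nc -u 127.0.0.1 {host_port}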
Using Flume to Collect Device Management Data Reported with the SNMP Protocol to LTS
You can use Flume to collect device management data reported with SNMP and send the data to LTS. For details about the following parameters, see Using Kafka to Report Logs to LTS.
- To listen on SNMP communication port 161, add a conf file for receiving SNMP logs. Example:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 161
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- To listen on SNMP trap communication port 162, add a conf file for receiving SNMP logs. Example:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 162
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
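If the net-snmp command-line tools are installed, you can send a test trap to verify the port 162 listener; the community string and trap OID below are placeholders used only for this test:

# Send a test SNMPv2c trap (coldStart) to the local agent; replace the host and community as needed.
snmptrap -v 2c -c public 127.0.0.1:162 '' 1.3.6.1.6.3.1.1.5.1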
Using Default Interceptors to Process Logs
In Flume, an interceptor is a simple pluggable component between a source and its channels. Before a source writes received events to the channels, the interceptor can modify or discard the events. Each interceptor processes only the events received by a given source.
- Timestamp interceptor
This interceptor inserts a timestamp into the Flume event headers. Without an interceptor, Flume receives only the message content. To connect a source to a timestamp interceptor, set type to timestamp (or to the full class name of the interceptor) and set preserveExisting to false. You can also set preserveExisting to true, meaning that an existing timestamp header in an event will not be replaced. Configuration example:
a1.sources.r1.interceptors = timestamp
a1.sources.r1.interceptors.timestamp.type = timestamp
a1.sources.r1.interceptors.timestamp.preserveExisting = false
- Regex filtering interceptor
This interceptor filters out unnecessary logs during collection, or selectively collects logs that match a specified regular expression. Set type to REGEX_FILTER. If you set excludeEvents to false, events that match the specified regular expression are collected. If you set excludeEvents to true, matched events are discarded and unmatched events are collected. Example configuration for connecting a source to a regex filtering interceptor:
a1.sources.r1.interceptors = regex
a1.sources.r1.interceptors.regex.type = REGEX_FILTER
a1.sources.r1.interceptors.regex.regex = (today)|(Monday)
a1.sources.r1.interceptors.regex.excludeEvents = false
With this configuration, the interceptor collects only log messages that contain today or Monday.
- Search and replace interceptor
This interceptor provides simple string-based search and replace based on Java regular expressions. Configuration example:
# Interceptor alias
a1.sources.r1.interceptors = search-replace
# Interceptor type. The value must be search_replace.
a1.sources.r1.interceptors.search-replace.type = search_replace
# Regular expression to match against the event body.
a1.sources.r1.interceptors.search-replace.searchPattern = today
# String that replaces the matched content.
a1.sources.r1.interceptors.search-replace.replaceString = yesterday
# Character set. The default value is utf8.
a1.sources.r1.interceptors.search-replace.charset = utf8
Using a Custom Interceptor to Process Logs
The following uses Java as an example to describe how to customize an interceptor in Flume. FLUME_HOME indicates the Flume installation directory. It is /tools/flume in this example; replace it with your actual installation directory.
- Create a Maven project and introduce the Flume dependency.
Ensure that the dependency matches the Flume version in the target cluster.
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
The dependency does not need to be packaged into the final JAR package. Therefore, set the scope to provided.
- Create a class to implement the Interceptor interface and related methods.
- initialize() method: initializes the interceptor, for example, by reading configuration information and establishing connections.
- intercept(Event event) method: intercepts and processes a single event. It receives an event object as input and returns the modified event, or null to discard the event.
- intercept(List<Event> list) method: processes events in a batch by intercepting and processing each event in the list.
- close() method: closes the interceptor, releases resources, and closes connections.
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Obtain the event data.
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        // Check whether the event data contains the specified string.
        if (eventData.contains("hello")) {
            // If an event contains the specified string, the event is excluded and null is returned.
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Create a new list to store the processed events.
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
- Build the interceptor. Interceptor creation and configuration are usually implemented with the Builder pattern. The complete code is as follows:
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Obtain the event data.
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        // Check whether the event data contains the specified string.
        if (eventData.contains("hello")) {
            // If an event contains the specified string, the event is excluded and null is returned.
            return null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }

    // Build the interceptor.
    public static class Builder implements Interceptor.Builder {
        @Override
        public void configure(Context context) {
        }

        @Override
        public Interceptor build() {
            return new TestInterceptor();
        }
    }
}
- Package the interceptor into a JAR file and upload it to the lib directory of your Flume installation (see the packaging sketch at the end of this section).
- Create the configuration file and configure the custom interceptor in it.
When configuring the full class name of the interceptor, note that the format is Full class name of the interceptor$Builder.
# Configuration of the interceptor.
# Definition of the interceptor.
a1.sources.r1.interceptors = i1
# Full class name of the interceptor.
a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder
- Run Flume.
The following variant of the interceptor shows how the intercept(Event) method can split the log content into fields and write the result back to the event body before the event is reported:

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Obtain the event data.
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        // Split the data by spaces and store the parts as numbered fields.
        Map<String, Object> splitMap = new HashMap<>();
        String[] splitList = eventData.split(" ");
        for (int i = 0; i < splitList.length; i++) {
            splitMap.put("field" + i, splitList[i].trim());
        }
        // Write the split result back to the event body.
        event.setBody(splitMap.toString().getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
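As a rough sketch of the packaging step referenced above, the interceptor project can be built with Maven and the resulting JAR copied into the Flume lib directory; the project directory and JAR file names are examples that depend on your project:

# Build the interceptor project (run in the project root).
mvn clean package
# Copy the resulting JAR (name depends on your project) into the Flume lib directory.
cp target/test-interceptor-1.0.jar FLUME_HOME/lib/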
Enriching Logs with External Data Sources and Reporting the Logs to LTS
An event is the basic unit in which Flume transmits data from a source to a destination. An event consists of a header and a body: the header stores attributes of the event as key-value pairs, and the body stores the data as a byte array.
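As a minimal illustration of this structure (assuming the org.apache.flume.event.EventBuilder helper), the following sketch builds an event with a header map and a byte-array body:

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventStructureExample {
    public static void main(String[] args) {
        // Header: key-value attributes of the event.
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "192.168.0.1");
        // Body: the log data as a byte array.
        Event event = EventBuilder.withBody("original log line".getBytes(StandardCharsets.UTF_8), headers);
        System.out.println(event.getHeaders());
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
    }
}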
If an external data source is available and you want to enrich the log content, for example, by modifying or deleting content or adding fields, write the changed content back to the event body so that Flume reports it to LTS. The following example shows how to use Java to extend the log content. For details about the parameters, see Using Kafka to Report Logs to LTS.
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Obtain the event data, convert the original data into a JSON string, and add extra fields.
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject object = new JSONObject();
        object.put("content", eventData);
        object.put("workLoadType", "ReplicaSet");
        eventData = object.toJSONString();
        // Write the enriched content back to the event body.
        event.setBody(eventData.getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
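The example above uses com.alibaba.fastjson.JSONObject. If you follow it, you will likely need to add a fastjson dependency to the interceptor project (the version below is only an example) and make sure the corresponding JAR is also available in FLUME_HOME/lib at run time:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>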