
Using Flume to Report Logs to LTS

Updated on 2025-01-21 GMT+08:00

Flume is a reliable, highly available, distributed system for collecting, aggregating, and transporting massive volumes of log data. It lets you customize various data senders in a log system for better data collection, and it can also perform simple processing on the data before writing it to various data receivers.

You can use Flume to collect logs and report them to LTS over the Kafka protocol supported by LTS. The following are some common data collection scenarios:

  1. Using Flume to Collect Text Logs to LTS
  2. Using Flume to Collect Database Table Data to LTS
  3. Using Flume to Collect Logs Transmitted with the Syslog Protocol to LTS
  4. Using Flume to Collect Logs Transmitted with the TCP/UDP Protocol to LTS
  5. Using Flume to Collect Device Management Data Reported with the SNMP Protocol to LTS
  6. Using Default Interceptors to Process Logs
  7. Using a Custom Interceptor to Process Logs
  8. Enriching Logs with External Data Sources and Reporting the Logs to LTS

Prerequisites

  • The Java Development Kit (JDK) has been installed on the host.
  • Flume has been installed and the JDK path has been configured in the Flume configuration file (see the sketch below).
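
The JDK path is typically configured in conf/flume-env.sh in the Flume installation directory. The following is a minimal sketch; the JAVA_HOME path is an example and must be replaced with the actual JDK installation path on your host.

# FLUME_HOME/conf/flume-env.sh
# Point Flume to the JDK installed on the host (path is an example).
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64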

Using Flume to Collect Text Logs to LTS

To use Flume to collect text logs and report them to LTS, add a conf file by referring to the following example. For details about the parameters, see Using Kafka to Report Logs to LTS.

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/test.txt
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.compression.type = gzip
a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
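
Once the conf file is in place, you can start the Flume agent and append a test line to the monitored file to verify that logs arrive in LTS. This is a minimal sketch; the conf file name lts-taildir.conf is an example, and the commands are run in FLUME_HOME.

# Start the agent with the configuration above.
bin/flume-ng agent --conf conf --conf-file conf/lts-taildir.conf --name a1 -Dflume.root.logger=INFO,console

# In another terminal, append a test log line to the monitored file.
echo "hello LTS $(date)" >> /tmp/test.txt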

Using Flume to Collect Database Table Data to LTS

You can use Flume to collect database table data and report it to LTS to monitor table data changes. For details about the following parameters, see Using Kafka to Report Logs to LTS.

  1. Download the flume-ng-sql-source plug-in and package it into a JAR file named flume-ng-sql-source.jar. Before packaging, ensure that the flume-ng-core version in the POM file matches the version of Flume to be installed. Then, place the JAR file in the lib directory of the Flume installation path, for example, FLUME_HOME/lib. Replace FLUME_HOME with the actual installation path.
  2. Add the MySQL driver to the FLUME_HOME/lib directory.

    1. Download the MySQL driver package.
      wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
    2. Decompress the driver package to obtain the JAR file.
      tar xzf mysql-connector-java-5.1.35.tar.gz
    3. Place the JAR package in the FLUME_HOME/lib/ directory.
      cp mysql-connector-java-5.1.35-bin.jar  FLUME_HOME/lib/

  3. Add the conf file for collecting MySQL data.

    # a1 indicates the agent name.
    # source indicates the input source of a1.
    # channels indicates a buffer.
    # sinks indicates the output destination of a1. In this example, Kafka is used.
    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    
    #source
    a1.sources.r1.type = org.keedio.flume.source.SQLSource
    # Connect to MySQL: Replace {mysql_host} with the IP address of your VM and {database_name} with the database name. You can run the ifconfig or ip addr command to obtain the IP address.
    # Add ?useUnicode=true&characterEncoding=utf-8&useSSL=false to the URL. Otherwise, the connection may fail.
    a1.sources.r1.hibernate.connection.url = jdbc:mysql://{mysql_host}:3306/{database_name}?useUnicode=true&characterEncoding=utf-8&useSSL=false
    # Hibernate Database connection properties
    # MySQL account, which generally is root.
    a1.sources.r1.hibernate.connection.user = root
    # Enter your MySQL password.
    a1.sources.r1.hibernate.connection.password = xxxxxxxx
    a1.sources.r1.hibernate.connection.autocommit = true
    # MySQL driver.
    a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    # Store the status file.
    a1.sources.r1.status.file.path = FLUME_HOME/bin
    a1.sources.r1.status.file.name = sqlSource.status
    # Custom query
    # Replace {table_name} with the name of the data table to be collected. You can also customize the query, for example, to collect only new rows incrementally (see the sketch after this procedure).
    a1.sources.r1.custom.query = select * from {table_name}
    
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000

  4. Start Flume. After it starts, Flume collects the table data from the database and reports it to LTS.
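
If you want to collect only newly inserted rows rather than the full table on every poll, the flume-ng-sql-source plug-in supports the $@$ placeholder in a custom query: it is replaced with the last value recorded in the status file, and the incremental field should be returned as the first column of the result. The following is a minimal sketch under these assumptions; the id and name columns are examples.

# Incremental collection: $@$ is replaced with the last value saved in the status file.
a1.sources.r1.custom.query = SELECT id, name FROM {table_name} WHERE id > $@$ ORDER BY id ASC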

Using Flume to Collect Logs Transmitted with the Syslog Protocol to LTS

Syslog is a protocol used to transmit log messages on an IP network. Flume collects logs transmitted using syslog and reports them to LTS. For details about the following parameters, see Using Kafka to Report Logs to LTS.

  • To receive UDP logs, add the conf file for collecting logs transmitted using syslog. Example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type=syslogudp
    # host_port indicates the port number of the syslog server.
    a1.sources.r1.port = {host_port}
    # host_ip indicates the IP address of the syslog server.
    a1.sources.r1.host = {host_ip}
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = memory
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    a1.sinks.k1.channel = c1
  • To receive TCP logs, add the conf file for collecting logs transmitted using syslog. Example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type=syslogtcp
    # host_port indicates the port number of the syslog server.
    a1.sources.r1.port = {host_port}
    # host_ip indicates the IP address of the syslog server.
    a1.sources.r1.host = {host_ip}
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = memory
    
    #Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    a1.sinks.k1.channel = c1
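
After the agent is started, you can send a test message to the syslog source to verify the pipeline. This is a minimal sketch using the logger tool from util-linux; {host_ip} and {host_port} must match the values configured for the source, and the message text is arbitrary.

# Send a test message to the UDP syslog source (datagram mode).
logger -n {host_ip} -P {host_port} -d "syslog udp test message"

# Send a test message to the TCP syslog source.
logger -n {host_ip} -P {host_port} -T "syslog tcp test message"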

Using Flume to Collect Logs Transmitted with the TCP/UDP Protocol to LTS

You can use Flume to collect logs transmitted with the TCP/UDP protocol and report them to LTS. For details about the following parameters, see Using Kafka to Report Logs to LTS.

  • To collect TCP port logs, add the conf file of the collection port. Example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = {host_port}
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • To collect UDP port logs, add the conf file of the collection port. Example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = {host_port}
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
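
You can verify the netcat sources by sending test data to the listening port, for example with the nc tool. This is a minimal sketch; 127.0.0.1 assumes the agent runs on the same host, and {host_port} must match the port configured for the source.

# Send a test line to the TCP netcat source.
echo "tcp test log" | nc 127.0.0.1 {host_port}

# Send a test line to the UDP netcat source.
echo "udp test log" | nc -u -w1 127.0.0.1 {host_port}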

Using Flume to Collect Device Management Data Reported with the SNMP Protocol to LTS

You can use Flume to collect device management data reported with SNMP and send the data to LTS. For details about the following parameters, see Using Kafka to Report Logs to LTS.

  • Listen on port 161 for SNMP communication by adding the conf file for receiving SNMP logs. Example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 161
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • Listen on port 162 for SNMP trap communication by adding the conf file for receiving SNMP logs. Example:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    
    a1.sources.r1.type = netcatudp
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 162
    
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ${logGroupId}_${logStreamId}
    a1.sinks.k1.kafka.bootstrap.servers = ${ip}:${port}
    a1.sinks.k1.kafka.producer.acks = 0
    a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    a1.sinks.k1.kafka.producer.compression.type = gzip
    a1.sinks.k1.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${projectId}" password="${accessKey}#${accessSecret}";
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
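
To verify that the agent receives SNMP trap data on port 162, you can send a test trap with the snmptrap tool. This is a minimal sketch assuming the net-snmp tools are installed and the agent runs locally; the community string public and the coldStart trap OID are example values.

# Send an SNMPv2c coldStart test trap to UDP port 162 on the local host.
snmptrap -v 2c -c public 127.0.0.1:162 '' 1.3.6.1.6.3.1.1.5.1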

Using Default Interceptors to Process Logs

In Flume, an interceptor is a simple pluggable component between a source and channels. Before a source writes received events to channels, the interceptor can convert or delete the events. Each interceptor processes only the events received by a given source.

  • Timestamp interceptor

    This interceptor inserts a timestamp into Flume event headers. If no interceptor is used, Flume receives only the messages themselves. To connect a source to a timestamp interceptor, set type to timestamp (or to the full class name of the interceptor) and set preserveExisting to false. You can also set preserveExisting to true, indicating that the timestamp header will not be replaced if it already exists in the event. Configuration example:

    a1.sources.r1.interceptors = timestamp 
    a1.sources.r1.interceptors.timestamp.type=timestamp 
    a1.sources.r1.interceptors.timestamp.preserveExisting=false
  • Regex filtering interceptor

    This interceptor filters out unnecessary logs during collection or selectively collects logs that match a specified regular expression. Set type to REGEX_FILTER. If you set excludeEvents to false, events that match the specified regular expression are collected. If you set excludeEvents to true, matched events are deleted and unmatched events are collected. Example configuration for connecting a source to a regex filtering interceptor:

    a1.sources.r1.interceptors = regex 
    a1.sources.r1.interceptors.regex.type=REGEX_FILTER 
    a1.sources.r1.interceptors.regex.regex=(today)|(Monday) 
    a1.sources.r1.interceptors.regex.excludeEvents=false

    With this configuration, the interceptor collects only the log messages that contain today or Monday.

  • Search and replace interceptor

    This interceptor provides a simple string-based search and replacement function based on Java regular expressions. Configuration example:

    # Interceptor alias
    a1.sources.r1.interceptors = search-replace
    # Interceptor type. The value must be search_replace.
    a1.sources.r1.interceptors.search-replace.type = search_replace
    
    # Regular expression to search for in the event body; matched content will be replaced.
    a1.sources.r1.interceptors.search-replace.searchPattern = today
    # Replace the matched event content.
    a1.sources.r1.interceptors.search-replace.replaceString = yesterday
    # Set a character set. The default value is utf8.
    a1.sources.r1.interceptors.search-replace.charset = utf8
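
Multiple interceptors can also be chained on one source; they are applied in the order in which they are declared. The following sketch combines the timestamp and search-and-replace interceptors from the examples above.

# Interceptors run in the listed order: timestamp first, then search-and-replace.
a1.sources.r1.interceptors = timestamp search-replace
a1.sources.r1.interceptors.timestamp.type = timestamp
a1.sources.r1.interceptors.search-replace.type = search_replace
a1.sources.r1.interceptors.search-replace.searchPattern = today
a1.sources.r1.interceptors.search-replace.replaceString = yesterday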

Using a Custom Interceptor to Process Logs

The following uses Java as an example to describe how to develop a custom interceptor for Flume. FLUME_HOME indicates the Flume installation directory. It is /tools/flume in this example and must be replaced with your actual installation directory.

  1. Create a Maven project and introduce the Flume dependency.

    Ensure that the dependency matches the Flume version in the target cluster.

    <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.10.1</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>

    The dependency does not need to be packaged into the final JAR package. Therefore, set the scope to provided.

  2. Create a class to implement the Interceptor interface and related methods.

    • initialize() method: initializes the interceptor, for example, by reading configuration information and establishing connections.
    • intercept(Event event) method: intercepts and processes a single event. It receives an event object as input and returns the modified event object, or null to drop the event.
    • intercept(List<Event> list) method: intercepts and processes a batch of events in the event list.
    • close() method: closes the interceptor, releases resources, and closes connections.
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
        @Override
        public Event intercept(Event event) {
    
            // Obtain event data.
            String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
            // Check whether the event data contains a specified string.
            if (eventData.contains("hello")) {
                // If an event contains the specified string, the event is excluded and null is returned.
                return null;
            }
    
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            // Create a new list to store the processed events.
            List<Event> interceptedEvents = new ArrayList<>();
            for (Event event : events) {
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null) {
                    interceptedEvents.add(interceptedEvent);
                }
            }
            return interceptedEvents;
        }
    
        @Override
        public void close() {
    
        }
    
    }

  3. Build the interceptor. The creation and configuration of an interceptor are usually implemented using the Builder pattern. The complete code is as follows:

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestInterceptor implements Interceptor {
        @Override
        public void initialize() {
        }
        @Override
        public Event intercept(Event event) {
            // Obtain event data.
            String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
            // Check whether the event data contains a specified string.
            if (eventData.contains("hello")) {
                // If an event contains the specified string, the event is excluded and null is returned.
                return null;
            }
            return event;
        }
        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> interceptedEvents = new ArrayList<>();
            for (Event event : events) {
                Event interceptedEvent = intercept(event);
                if (interceptedEvent != null) {
                    interceptedEvents.add(interceptedEvent);
                }
            }
            return interceptedEvents;
        }
    
        @Override
        public void close() {
    
        }
    
    // Build an interceptor.
        public static class Builder implements Interceptor.Builder {
    
            @Override
            public void configure(Context context) {
    
            }
    
            @Override
            public Interceptor build() {
                return new TestInterceptor();
            }
    
        }
    
    }

  4. Package the interceptor into a JAR file and upload it to the lib folder in your Flume installation directory (a packaging sketch is provided after this procedure).
  5. Write the configuration file and configure the custom interceptor in it.

    When configuring the full class name of the interceptor, note that the format is Full class name of the interceptor$Builder.

    # Configuration of the interceptor.
    #Definition of the interceptor.
    a1.sources.r1.interceptors = i1
    # Full class name of the interceptor.
    a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder

  6. Run Flume.
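
The following is a minimal sketch of steps 4 and 6, assuming the interceptor project is built with Maven and Flume is installed in /tools/flume; the JAR and conf file names (test-interceptor-1.0.jar, interceptor-demo.conf) are examples.

# Package the interceptor into a JAR file (run in the Maven project directory).
mvn clean package

# Copy the JAR to the Flume lib directory (step 4).
cp target/test-interceptor-1.0.jar /tools/flume/lib/

# Run Flume with the configuration that references the interceptor (step 6).
/tools/flume/bin/flume-ng agent --conf /tools/flume/conf --conf-file /tools/flume/conf/interceptor-demo.conf --name a1 -Dflume.root.logger=INFO,console
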
The following interceptor example parses KV logs: strings separated by spaces are converted into a Map-type string.

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Obtain event data.
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        Map<String, Object> splitMap = new HashMap<>();
        String[] splitList = eventData.split(" ");
        for (int i = 0; i < splitList.length; i++) {
            splitMap.put("field" + i, splitList[i].trim());
        }
        event.setBody(splitMap.toString().getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}

Enriching Logs with External Data Sources and Reporting the Logs to LTS

An event is the basic unit in which Flume transmits data from a source to a destination. An event consists of a header and a body. The header stores attributes of the event in a key-value structure, and the body stores the data as a byte array.

If you have an external data source and want to enrich the log content, for example, by modifying or deleting content or adding fields, write the changes into the event body so that Flume reports the enriched logs to LTS. The following example shows how to use Java to extend the log content. For details about the parameters, see Using Kafka to Report Logs to LTS.

import com.alibaba.fastjson.JSONObject;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Obtain the event data, convert the original data into JSON strings, and add extra fields.
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject object = new JSONObject();
        object.put("content", eventData);
        object.put("workLoadType", "RelipcaSet");
        eventData = object.toJSONString();
        event.setBody(eventData.getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }

    @Override
    public void close() {
    }
}
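
The example above uses fastjson to build the JSON string. If your interceptor project does not already include it, add a Maven dependency similar to the following; the version shown is only an example and should be replaced with the version you actually use.

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <!-- Example version; use the version required by your project. -->
    <version>1.2.83</version>
</dependency>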
