Updated on 2024-04-02 GMT+08:00

SASL Kerberos Authentication

In a cluster with the security mode enabled, the components must be mutually authenticated before communicating with each other to ensure communication security. The Kafka, ZooKeeper, and Kerberos security authentications are required for Kafka application development. However, you only need to generate one JAAS file and configure related environment variables accordingly. LoginUtil related APIs can be used to complete these configurations.

Sample Code

The code snippets are contained in the LoginUtil class of the com.huawei.bigdata.kafka.example.security package.

    /**
     * keytab file name of the machine-machine account that the user applies for 
     */
    private static final String USER_KEYTAB_FILE = "Keytab file name of the machine-machine account that the user applies for, for example, user.keytab";
    
   /**
    * Machine-machine account that the user applies for
    */
    private static final String USER_PRINCIPAL = "Machine-machine account that the user applies for";

public static void securityPrepare() throws IOException
    {
        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;       
        String krbFile = filePath + "krb5.conf";
        String userKeyTableFile = filePath + USER_KEYTAB_FILE;
        
        //Replace separators in the Windows path.
        userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
        krbFile = krbFile.replace("\\", "\\\\");
        
        LoginUtil.setKrb5Config(krbFile);
        LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.<System domain name>");
        LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile);
    }

Log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain, which is the current system domain name.

Function Description

The following code snippets are used in the com.huawei.bigdata.kafka.example.WordCountProcessorDemo class to implement the following function:

Collects statistics on input records. Same words are divided into a group, which is used as a key value. The occurrence times of each word are calculated as a value and are output in the form of a key-value pair.

Code Sample

private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            // ProcessorContext instance, which provides the access of the metadata of the records being processed
            private ProcessorContext context;
            private KeyValueStore<String, Integer> kvStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                // Save processor context in the local host, because it will be used for punctuate() and commit().
                this.context = context;
                // Execute punctuate() once every second.
                this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
                    try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                        System.out.println("----------- " + timestamp + " ----------- ");
                        while (iter.hasNext()) {
                            final KeyValue<String, Integer> entry = iter.next();
                            System.out.println("[" + entry.key + ", " + entry.value + "]");
                            // Send the new records to the downstream processor as key-value pairs.
                            context.forward(entry.key, entry.value.toString());
                        }
                    }
                });
                // Search for the key-value states storage area named KEY_VALUE_STATE_STORE_NAME to memorize the recently received input records.
                this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore(KEY_VALUE_STATE_STORE_NAME);
            }

            // Process the receiving records of input topic. Split the records into words, and count the words.
            @Override
            public void process(String dummy, String line) {
                String[] words = line.toLowerCase(Locale.getDefault()).split(REGEX_STRING);

                for (String word : words) {
                    Integer oldValue = this.kvStore.get(word);

                    if (oldValue == null) {
                        this.kvStore.put(word, 1);
                    } else {
                        this.kvStore.put(word, oldValue + 1);
                    }
                }
            }

            @Override
            public void close() {
            }
        };
    }
}