Implementing 2-way SSL and ACL authentication for Kafka with Spring boot client configuration
This is a guide to implementing Apache Kafka's ACL (Access Control List) authorization on top of 2-way SSL authentication. In this guide the Kafka cluster is a single ZooKeeper instance and 2 brokers running on the same server ("Kafka server"), and my client is a Java Spring Boot application running in a Tomcat environment on a different server ("Spring Boot server").
Disclaimer: This is not a guide to the basics of any of the above technologies, and it does not cover setting up Kafka with Spring Boot from scratch.
Versions: Kafka 2.13–2.8.1, CentOS 8, Tomcat 8
Tip: reload the server daemon (systemctl daemon-reload) after config or service files are edited or a new certificate is added to a key/trust store.
BRIEF ON ACL (Access Control Lists) FOR KAFKA: ACL is Kafka's authorization layer, allowing different access configurations for different users. With ACL we can specify, for example, that user 'A' has only Read and Write access to a specific topic 'B' in our Kafka cluster.
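As a concrete sketch of the rule just described (assuming the same kafka-acls.sh tool and ZooKeeper address used later in this guide, with a running cluster):

```shell
# Hypothetical rule: grant user A read/write access to topic B only.
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:A --operation Read --operation Write --topic B
```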
PART A: SSL CONFIGURATION
As we will be authenticating between our “Kafka Server” and “Spring boot server” using SSL, each system will require a central authority certificate, a keystore, a truststore and a signed certificate. For organizational purposes you can create a directory /kafka-ssl and execute from there.
KAFKA SERVER
1) Generate a central authority key.
2) Generate a central authority certificate specifying the number of days you want it to last.
1) openssl genrsa -aes128 -out ca.key 2048
2) openssl req -new -x509 -days 999 -key ca.key -out ca.crt
3) Generate a keystore. If your server IP has a domain, use this as your -dname argument. This Kafka server does not have a domain so we will use the hostname (kafkatestbed.com). Specify the password for your keystore (I will use 123456)
//To retrieve server hostname: cat /proc/sys/kernel/hostname
3) keytool -genkey -keystore kafka.server.keystore -alias kafkaserver -dname CN=kafkatestbed.com -keyalg RSA -validity 999 -storepass 123456
4) Generate a certificate request (an unsigned certificate). The naming convention is your choice.
5) Sign this certificate using your Central Authority.
4) keytool -certreq -keystore kafka.server.keystore -alias kafkaserver -file kafka.server.unsigned.crt -storepass 123456
5) openssl x509 -req -CA ca.crt -CAkey ca.key -in kafka.server.unsigned.crt -out kafka.server.crt -days 999 -CAcreateserial
6) Import your central authority certificate and signed certificate to keystore
keytool -importcert -file ca.crt -alias CARoot -keystore kafka.server.keystore -storepass 123456 -noprompt //For central authority certificate
keytool -importcert -file kafka.server.crt -alias kafkaserver -keystore kafka.server.keystore -storepass 123456 -noprompt //For signed certificate
7) Import your central authority certificate and signed certificate to a truststore (This will create the truststore).
keytool -import -file kafka.server.crt -alias kafkaserver -keystore kafka.server.truststore
keytool -import -file ca.crt -alias CARoot -keystore kafka.server.truststore
At this point if we list all content of the /kafka-ssl directory we should see the following.
ca.crt ca.key ca.srl kafka.server.crt kafka.server.keystore
kafka.server.truststore kafka.server.unsigned.crt
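If you want to sanity-check that this signing flow yields a valid chain, the whole loop can be rehearsed with throwaway keys. This is only a sketch: it uses an unencrypted demo CA key (no -aes128) and illustrative names so it runs non-interactively; the real flow above is the same idea.

```shell
# Rehearse the CA flow in a temp dir: make a CA, sign a server CSR, verify.
dir=$(mktemp -d)
cd "$dir"
openssl genrsa -out ca.key 2048            # demo CA key (unencrypted)
openssl req -new -x509 -days 999 -key ca.key -subj "/CN=demo-ca" -out ca.crt
openssl genrsa -out server.key 2048        # demo server key
openssl req -new -key server.key -subj "/CN=kafkatestbed.com" -out server.csr
openssl x509 -req -CA ca.crt -CAkey ca.key -in server.csr -out server.crt -days 999 -CAcreateserial
openssl verify -CAfile ca.crt server.crt   # prints "server.crt: OK"
```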
SPRING BOOT SERVER
We will repeat the same process 1–7 above in a similar /kafka-ssl directory. This server has a domain name (we will use a dummy staging.com)
1) openssl genrsa -aes128 -out ca.key 2048 //Central Authority Key
2) openssl req -new -x509 -days 999 -key ca.key -out ca.crt //CA certificate
3) keytool -genkey -keystore staging.com.keystore -alias staging -dname CN=staging.com -keyalg RSA -validity 999 -storepass 123456 //Keystore
4) keytool -certreq -keystore staging.com.keystore -alias staging -file staging.com.unsigned.crt -storepass 123456 //cert request
5) openssl x509 -req -CA ca.crt -CAkey ca.key -in staging.com.unsigned.crt -out staging.com.crt -days 999 -CAcreateserial //sign cert request
6a) keytool -importcert -file ca.crt -alias CARoot -keystore staging.com.keystore -storepass 123456 -noprompt //import CA into keystore
6b) keytool -importcert -file staging.com.crt -alias staging -keystore staging.com.keystore -storepass 123456 -noprompt //import signed cert into keystore
7a) keytool -import -file staging.com.crt -alias staging -keystore staging.com.truststore //import signed cert into truststore
7b) keytool -import -file ca.crt -alias CARoot -keystore staging.com.truststore //import CA cert into truststore
SERVER HANDSHAKE
We will trust each server's central authority and signed certificates in the other's truststore. If this is not done completely, your SSL handshake will fail, possibly with 'Empty certificate chain' on the Kafka server or 'Unexpected message - Server hello' on the Spring Boot server. You can transfer the certificates by simply copying and pasting their contents.
ON KAFKA SERVER (in the same /kafka-ssl)
1) nano staging.com.crt //creates cert file, paste staging.com.crt contents from spring boot server here and save.
2) nano staging.ca.crt //creates cert file, paste ca.crt contents from spring boot server here and save.
3) keytool -import -file staging.ca.crt -alias stagingCARoot -keystore kafka.server.truststore //Trust CA of spring boot server
4) keytool -import -file staging.com.crt -alias staging -keystore kafka.server.truststore //Trust cert of spring boot server
ON SPRING BOOT SERVER (reverse of above)
1) nano kafka.server.crt //paste from kafka server and save
2) nano kafka.ca.crt //paste from kafka server and save
3) keytool -import -file kafka.ca.crt -alias kafkaCARoot -keystore staging.com.truststore //Trust CA of kafka server
4) keytool -import -file kafka.server.crt -alias kafkaserver -keystore staging.com.truststore //Trust cert of kafka server
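To confirm the exchange worked, you can list the truststore on either side; it should now hold four entries (its own CA and cert plus the other server's CA and cert). keytool will prompt for the truststore password you chose when creating it:

```shell
# Optional sanity check of the Spring Boot server's truststore (needs a JDK).
keytool -list -keystore staging.com.truststore
```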
KAFKA CONFIGURATION
Here we will point Kafka to our keystore and truststore, configure SSL settings and set up ACL authorization. Once this is complete, 2-way SSL authentication will work between both servers, but ACL will restrict the Spring Boot server from issuing any commands to the Kafka cluster. Errors from the ACL authorizer are logged at /logs/kafka-authorizer.log in your Kafka directory. A helpful tip is to set the logging level to DEBUG in order to view authorized-access logs as well. It is also helpful to do the same for /logs/server.log.
//FOR AUTHORIZER LOGS ie /logs/kafka-authorizer.log
//from kafka directory
nano config/log4j.properties
//at the last lines set
log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender

//FOR BROKER LOGS ie /logs/server.log
//from kafka directory
nano config/log4j.properties
//raise the root logger to DEBUG, keeping its existing appenders
log4j.rootLogger=DEBUG, stdout, kafkaAppender
In the server.properties of each broker we add the following configuration. Both brokers on the same server use the same configuration except for the listener / advertised listener port (the second broker uses 9093).
listeners=SSL://:9092
#I have set the default 9092 port to use SSL authentication. If a
#client uses plaintext on 9092 it will throw an SSL handshake error
#with possible error - data transfer limit exceeded, using
#plaintext?
advertised.listeners=SSL://:9092
inter.broker.listener.name=SSL

#Add the following sections
####ACL####
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#This is what tells kafka to use ACL to further authorize SSL
#users
#allow.everyone.if.no.acl.found=true
#Enable the above, if you want all or ANONYMOUS users to have access to topics or consumer groups that have no ACL rules.
super.users=User:CN=localhost
#Super users skip ACL verification. I will allow local users to
#be a super user. Full config shown below.

####SSL####
security.protocol=SSL
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.client.auth=required
ssl.truststore.location=/kafka-ssl/kafka.server.truststore
ssl.truststore.password=123456
ssl.keystore.location=/kafka-ssl/kafka.server.keystore
ssl.keystore.password=123456
ssl.endpoint.identification.algorithm=
#The above disables verification of the server hostname against
#the certificate. Default is https
Reload server and restart kafka brokers.
We will digress to complete setup for a local user CN=localhost
Create a cert signing request for CN=localhost, sign the certificate, and import it into the keystore and truststore. This will allow SSL handshake with 127.0.0.1.
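The commands for this digression are not shown above, so here is a sketch mirroring steps 3-7 of PART A (same CA, same 123456 password, run from /kafka-ssl; the aliases are illustrative):

```shell
# Key pair and signing request for the local superuser CN=localhost.
keytool -genkey -keystore kafka.server.keystore -alias localhost -dname CN=localhost -keyalg RSA -validity 999 -storepass 123456
keytool -certreq -keystore kafka.server.keystore -alias localhost -file localhost.unsigned.crt -storepass 123456
# Sign with the central authority (prompts for the ca.key passphrase).
openssl x509 -req -CA ca.crt -CAkey ca.key -in localhost.unsigned.crt -out localhost.crt -days 999 -CAcreateserial
# Import the signed cert back into the keystore and into the truststore.
keytool -importcert -file localhost.crt -alias localhost -keystore kafka.server.keystore -storepass 123456 -noprompt
keytool -import -file localhost.crt -alias localhostcert -keystore kafka.server.truststore
```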
From the authorizer logs you will read "User:CN=localhost is a superuser, skipping ACL configuration".
Kafka also allows command-specific configs pointing to different truststores and keystores, eg I can create a config file /config/root.properties with the path to a different keystore, truststore, security protocol and password, and pass it as an argument in a kafka command, eg to list topics:
/usr/bin/sh kafka-topics.sh --bootstrap-server=localhost:9092 --list --command-config ../config/root.properties
The above is executed from the bin folder in kafka directory
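As a sketch, a hypothetical root.properties could look like this; the paths and password must match your own keystore and truststore from PART A (a temp dir is used here so the sketch is self-contained, in practice the file lives in the kafka config directory):

```shell
# Write an illustrative root.properties for use with --command-config.
dir=$(mktemp -d)
cat > "$dir/root.properties" <<'EOF'
security.protocol=SSL
ssl.keystore.location=/kafka-ssl/kafka.server.keystore
ssl.keystore.password=123456
ssl.truststore.location=/kafka-ssl/kafka.server.truststore
ssl.truststore.password=123456
EOF
cat "$dir/root.properties"
```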
End of digression
SET UP KAFKA ACL FOR SPRING BOOT SERVER
I will configure ACL to allow my Spring Boot server to carry out all operations on all consumer groups, all topics and all transactions.
//From bin folder in kafka directory
/usr/bin/sh kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:CN=staging.com --operation All --group '*' --topic '*' --transactional-id '*'
//For a list of acl commands run kafka-acls.sh --help
Operation All grants access to all operations, including the important 'ClusterAction' operation, which is necessary for the KafkaAdminClient in the Spring Boot application.
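To verify the rule was stored, you can list the ACLs for all topics (same ZooKeeper address as above, requires the running cluster):

```shell
/usr/bin/sh kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic '*'
```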
CONFIGURING SPRING BOOT APPLICATION
I will add the following to my application.properties
These variables will be provided by my tomcat container.
//application.properties
spring.kafka.bootstrap-servers=${KAFKA_SERVER}
spring.kafka.security.protocol=${KAFKA_SECURITY_PROTOCOL}
spring.kafka.ssl.trust-store-location=${CONSUMER_SSL_TRUSTSTORE_LOCATION}
spring.kafka.ssl.trust-store-password=${CONSUMER_SSL_TRUSTSTORE_PASSWORD}
spring.kafka.ssl.key-store-location=${CONSUMER_SSL_KEYSTORE_LOCATION}
spring.kafka.ssl.key-store-password=${CONSUMER_SSL_KEYSTORE_PASSWORD}
spring.kafka.ssl.key-password=${CONSUMER_SSL_KEY_PASSWORD}
spring.kafka.properties.ssl.endpoint.identification.algorithm=
An important thing to note is how Tomcat reads paths from application.properties. Tomcat (v8) by default resolves the provided paths against the classpath; we kept our keystores in /kafka-ssl, so this does not work. A file:// prefix on the Tomcat environment variables marks the path as an absolute file path. We will add the following variables to our Tomcat service file.
//tomcat service file on spring boot server
#KAFKA SSL CONFIG
Environment=CONSUMER_SSL_TRUSTSTORE_LOCATION=file:///kafka-ssl/staging.com.truststore
Environment=CONSUMER_SSL_TRUSTSTORE_PASSWORD=123456
Environment=CONSUMER_SSL_KEYSTORE_LOCATION=file:///kafka-ssl/staging.com.keystore
Environment=CONSUMER_SSL_KEYSTORE_PASSWORD=123456
Environment=CONSUMER_SSL_KEY_PASSWORD=123456
Environment=KAFKA_SECURITY_PROTOCOL=SSL
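After editing the service file, reload the systemd units and restart Tomcat so the new environment variables are picked up (the unit name tomcat is an assumption; use your own service name):

```shell
systemctl daemon-reload
systemctl restart tomcat
```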
As with any Kafka client, we have to consider three things: KafkaAdminClient, KafkaConsumer and KafkaProducer. For our Spring Boot app we will need to update the consumer factory and producer factory to use our SSL configuration, reusing the config in our application.properties. Remember the "file://" prefix we added for Tomcat? That is what the admin client uses to locate the keystores, but since we are configuring the consumer and producer in our Java code, the JVM will read the path literally and throw an IO error, 'NoSuchFileException: file:/kafka-ssl/staging.com.truststore', so we strip the prefix in the Java code below.
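The prefix handling described above can be sketched in shell: the value from the Tomcat environment keeps its file:// prefix for the admin client, while the consumer/producer config needs the bare path.

```shell
# Strip the "file://" prefix so the JVM sees a plain absolute path.
path="file:///kafka-ssl/staging.com.truststore"
stripped="${path#file://}"
echo "$stripped"   # prints /kafka-ssl/staging.com.truststore
```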
KAFKA CONSUMER CONFIG
//class in spring boot application
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.ssl.trust-store-password}")
private String trustStorePassword;
@Value("${spring.kafka.ssl.trust-store-location}")
private String trustStoreLocation;
@Value("${CONSUMER_SSL_KEYSTORE_LOCATION}")
private String keyStoreLocation;
@Value("${CONSUMER_SSL_KEYSTORE_PASSWORD}")
private String keyStorePassword;
@Autowired
private ConsumerFactory<Integer, String> consumerFactory;
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>
(consumerFactory.getConfigurationProperties());
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
checkPath(keyStoreLocation));
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
keyStorePassword);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
checkPath(trustStoreLocation));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
trustStorePassword);
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
securityProtocol);
return props;
}
String checkPath(String keystorePath) {
if (keystorePath.startsWith("file://"))
return keystorePath.replaceFirst("file://", "");
else if (keystorePath.startsWith("file:"))
return keystorePath.replaceFirst("file:", "");
return keystorePath;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
return factory;
}
}
KAFKA PRODUCER CONFIG
We will do something similar here
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.ssl.trust-store-password}")
private String trustStorePassword;
@Value("${spring.kafka.ssl.trust-store-location}")
private String trustStoreLocation;
@Value("${CONSUMER_SSL_KEYSTORE_LOCATION}")
private String keyStoreLocation;
@Value("${CONSUMER_SSL_KEYSTORE_PASSWORD}")
private String keyStorePassword;
@Autowired
private ProducerFactory<Integer, String> producerFactory;
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>(producerFactory.getConfigurationProperties());
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, checkPath(keyStoreLocation));
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, checkPath(trustStoreLocation));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
return props;
}
String checkPath(String keystorePath) {
if (keystorePath.startsWith("file://")) return keystorePath.replaceFirst("file://", "");
else if (keystorePath.startsWith("file:")) return keystorePath.replaceFirst("file:", "");
return keystorePath;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
}
}
With these configurations, we have ACL security on top of SSL authentication between our Kafka cluster and Spring Boot client. It may be a lengthy process, but it only needs to be configured once; minor updates to ACL rules can be made as needed. If you are having SSL handshake errors at this point, verify that you have exchanged and trusted the certificates on both servers. Enabling the DEBUG log level is pivotal in troubleshooting. Good luck!