Kafka SSL集群搭建及测试方法
Published on 2019-12-26
Kafka SSL集群搭建及测试方法
注:本文档kafka版本为 kafka_2.12-2.3.1 下载地址为这里
搭建集群
Broker的server.properties配置文件如下
# Unique ID of this broker; every node in the cluster must use a different value.
broker.id=0
# SSL
# SSL listener the clients connect to.
listeners=SSL://172.18.14.215:9092
# Broker keystore (its private key + signed certificate).
ssl.keystore.location=/data/ssl/kafka.server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
# Broker truststore (the CA used to verify client certificates).
ssl.truststore.location=/data/ssl/kafka.server.truststore.jks
ssl.truststore.password=123456
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
# Empty value disables hostname verification (needed for self-signed certs
# whose CN does not match the broker address).
ssl.endpoint.identification.algorithm=
# Inter-broker traffic also goes over SSL.
security.inter.broker.protocol=SSL
# NOTE(review): InsecureSkipVerify is a Go client (crypto/tls) option, not a
# Kafka broker property — the broker ignores this line.
InsecureSkipVerify=true
# SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
其余节点的broker 只需要注意修改
broker.id
即可
生成证书
#!/bin/sh
# Generate a self-signed CA, a broker keystore/truststore for Kafka SSL,
# and PEM exports for the Go (sarama) clients.

# 1. Create the broker key pair inside the server keystore.
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkey
# 2. Create a self-signed CA (private key + certificate).
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
# 3. Import the CA certificate into the client and server truststores.
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
# 4. Produce a certificate signing request for the broker certificate.
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
# 5. Sign the broker certificate with the CA.
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456
# 6. Import the CA cert and the signed broker cert back into the keystore.
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
# 7. Export PEM copies for the Go clients: the server/CA certificate ...
keytool -importkeystore -srckeystore kafka.server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
# ... and the client certificate + private key.
keytool -importkeystore -srckeystore kafka.server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem
生产者代码
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/Shopify/sarama"
"io/ioutil"
"log"
"time"
)
//var AddressSSL1 = []string{"172.18.236.78:9092", "172.18.236.79:9092", "172.18.236.80:9092"}
// AddressSSL1 lists the SSL broker endpoints the producer connects to.
var AddressSSL1 = []string{"172.18.14.215:9092", "172.18.14.215:9093"}
// main sends ten messages to the "testSSl" topic over the SSL listeners.
func main() {
	const sendCount = 10
	for sent := 0; sent < sendCount; sent++ {
		KafkaProducer(AddressSSL1, "testSSl", "a")
	}
}
// KafkaProducer creates an SSL-authenticated async producer and sends a
// single message to the given topic via loopProducer.
//
// address   - broker addresses of the SSL listeners
// topic     - destination topic
// sendValue - payload passed through to loopProducer (which currently
//             overwrites it with a timestamp)
func KafkaProducer(address []string, topic string, sendValue string) {
	fmt.Println("producer 启动")
	config := sarama.NewConfig()
	// Load the client certificate/key pair and the CA certificate.
	tlsConfig, err := NewTLSConfigProducer("./ssl/client.cer.pem",
		"./ssl/client.key.pem", "./ssl/server.cer.pem")
	if err != nil {
		log.Fatal(err)
	}
	// Skip server-certificate verification: the broker cert is self-signed,
	// not issued by a trusted CA (see the notes at the end of this post).
	tlsConfig.InsecureSkipVerify = true
	// Enable TLS on the network layer.
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = tlsConfig
	// Deliver acks on the Successes() channel; loopProducer must drain it,
	// otherwise the async producer's channels back up.
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	client, err := sarama.NewClient(address, config)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}
	// BUG FIX: NewAsyncProducerFromClient does not take ownership of the
	// client, so it must be closed separately.  Deferred before p.Close so
	// the producer is shut down first (defers run LIFO).
	defer client.Close()
	// Build an async producer on top of the shared client.
	p, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()
	loopProducer(p, topic, sendValue)
}
// NewTLSConfigProducer loads the client certificate/key pair and the CA
// certificate and assembles a *tls.Config for mutual TLS.
//
// clientCertFile / clientKeyFile - PEM files holding the client identity
// caCertFile                     - PEM file with the CA that signed the broker cert
//
// On failure a non-nil (possibly partially filled) config is still returned
// together with the error, matching the original behavior.
func NewTLSConfigProducer(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
	tlsConfig := tls.Config{}
	// Load the client key pair used to authenticate this producer.
	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		return &tlsConfig, err
	}
	tlsConfig.Certificates = []tls.Certificate{cert}
	// Load the CA certificate used to verify the broker.
	caCert, err := ioutil.ReadFile(caCertFile)
	if err != nil {
		return &tlsConfig, err
	}
	caCertPool := x509.NewCertPool()
	// BUG FIX: the original discarded this return value, silently accepting
	// an empty or malformed CA file.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return &tlsConfig, fmt.Errorf("no CA certificates found in %s", caCertFile)
	}
	tlsConfig.RootCAs = caCertPool
	// Note: the deprecated no-op BuildNameToCertificate call was removed.
	return &tlsConfig, nil
}
// loopProducer publishes one timestamped message and waits for the broker's
// ack.  Draining Successes()/Errors() is required because the producer is
// configured with Producer.Return.Successes = true; without a reader those
// channels fill up and Close can block.
//
// text is overwritten with the wall-clock time (HH:MM:SS) — behavior kept
// from the original code, which used the timestamp as the payload.
func loopProducer(producer sarama.AsyncProducer, topic string, text string) {
	text = string(time.Now().Format("15:04:05"))
	msg := &sarama.ProducerMessage{
		Topic:     topic,
		Key:       nil,
		Value:     sarama.StringEncoder(text),
		Headers:   nil,
		Metadata:  nil,
		Offset:    0,
		Partition: 0,
		Timestamp: time.Now(),
	}
	producer.Input() <- msg
	// BUG FIX: consume the ack so the async producer's result channels do
	// not back up (the original never read Successes/Errors).
	select {
	case acked := <-producer.Successes():
		log.Printf("Produced message: [%s] - %s\n", text, acked.Timestamp.Format("15:04:05"))
	case perr := <-producer.Errors():
		log.Printf("Produce failed: %v\n", perr.Err)
	}
}
消费者代码
package main
import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"io/ioutil"
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
)
//var AddressSSL2 = []string{"172.18.236.78:9092", "172.18.236.79:9092", "172.18.236.80:9092"}
// AddressSSL2 lists the SSL broker endpoints the consumer connects to.
var AddressSSL2 = []string{"172.18.14.215:9092","172.18.14.215:9093"}
// main starts a blocking consumer for topic "testSSl".
func main() {
	topic := "testSSl"
	KafkaConsumer(AddressSSL2, topic)
}
// KafkaConsumer creates an SSL-authenticated consumer and blocks consuming
// every partition of the given topic (see consumerLoop).
//
// address - broker addresses of the SSL listeners
// topics  - topic to consume
func KafkaConsumer(address []string, topics string) {
	// Load the client certificate/key pair and the CA certificate.
	tlsConfig, err := NewTLSConfigConsumer("./ssl/client.cer.pem",
		"./ssl/client.key.pem", "./ssl/server.cer.pem")
	if err != nil {
		log.Fatal(err)
	}
	// Skip server-certificate verification: the broker cert is self-signed.
	tlsConfig.InsecureSkipVerify = true
	consumerConfig := sarama.NewConfig()
	// Enable TLS on the network layer.
	consumerConfig.Net.TLS.Enable = true
	consumerConfig.Net.TLS.Config = tlsConfig
	consumerConfig.Version = sarama.V2_3_0_0
	client, err := sarama.NewClient(address, consumerConfig)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}
	// BUG FIX: NewConsumerFromClient does not take ownership of the client,
	// so it must be closed separately.  Deferred before consumer.Close so
	// the consumer is shut down first (defers run LIFO).
	defer client.Close()
	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
	consumerLoop(consumer, topics)
}
// consumerLoop starts one goroutine per partition of the topic and blocks
// until all of them return (each goroutine stops on os.Interrupt).
func consumerLoop(consumer sarama.Consumer, topic string) {
	partitions, err := consumer.Partitions(topic)
	if err != nil {
		log.Println("unable to fetch partition IDs for the topic", topic, err)
		return
	}
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	var wg sync.WaitGroup
	// BUG FIX: the original wrote `for partition := range partitions`, which
	// yields slice INDICES (not the int32 partition IDs), and let every
	// goroutine share the same loop variable.  Iterate the actual IDs and
	// pass each one as an argument so the closure captures it by value.
	for _, partition := range partitions {
		wg.Add(1)
		go func(p int32) {
			defer wg.Done()
			consumePartition(consumer, p, signals, topic)
		}(partition)
	}
	wg.Wait()
}
// NewTLSConfigConsumer loads the client certificate/key pair and the CA
// certificate and assembles a *tls.Config for mutual TLS.
//
// clientCertFile / clientKeyFile - PEM files holding the client identity
// caCertFile                     - PEM file with the CA that signed the broker cert
//
// On failure a non-nil (possibly partially filled) config is still returned
// together with the error, matching the original behavior.
func NewTLSConfigConsumer(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
	tlsConfig := tls.Config{}
	// Load the client key pair used to authenticate this consumer.
	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		return &tlsConfig, err
	}
	tlsConfig.Certificates = []tls.Certificate{cert}
	// Load the CA certificate used to verify the broker.
	caCert, err := ioutil.ReadFile(caCertFile)
	if err != nil {
		return &tlsConfig, err
	}
	caCertPool := x509.NewCertPool()
	// BUG FIX: the original discarded this return value, silently accepting
	// an empty or malformed CA file.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return &tlsConfig, errors.New("no CA certificates found in " + caCertFile)
	}
	tlsConfig.RootCAs = caCertPool
	// Note: the deprecated no-op BuildNameToCertificate call was removed.
	return &tlsConfig, nil
}
// consumePartition consumes one partition from the oldest offset until an
// os.Interrupt arrives (or the message channel is closed), then logs how
// many messages were read.
func consumePartition(consumer sarama.Consumer, partition int32, signals chan os.Signal, topic string) {
	// BUG FIX: corrected "Receving" typo in the log message.
	log.Println("Receiving on partition", partition)
	partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
	if err != nil {
		log.Println(err)
		return
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Println(err)
		}
	}()
	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg, ok := <-partitionConsumer.Messages():
			// BUG FIX: a closed channel would otherwise deliver nil
			// messages forever; stop when the channel is closed.
			if !ok {
				break ConsumerLoop
			}
			t := msg.Timestamp
			formatTime := t.Format("15:04:05")
			log.Printf("%s - %s\n", msg.Value, formatTime)
			consumed++
		case <-signals:
			break ConsumerLoop
		}
	}
	log.Printf("Consumed: %d\n", consumed)
}
双向认证/单向认证 区别
单向认证
可以实现client对broker的认证,如果是任意的一个broker,client是会拒绝连接的。但是,在kafka应用里面,好像并不是很实用,除了数据的加密。更多情况下,broker要防止任意的client进行连接,不能让client随意连接,那么就需要增加broker对client的认证。
双向认证
顾名思义,是双向的认证,不被信任的broker或生产/消费者 是不会互相连接的,除非证书泄露
踩坑笔记
-
证书生成好后,需要修改 client.cer.pem 删除上下部分,只保留中间部分 不然会报错
"private key does not match public key"
见下面例子:
-----BEGIN CERTIFICATE----- MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJj bjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UE CwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0y ODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UE BxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2 OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....省略n个字符 -----END CERTIFICATE-----
- 程序在消费和生产的时候出现错误:
2019/12/17 20:02:22 unable to create kafka client: "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
kafka 日志中出现的错误是:
[2019-12-17 17:27:24,378] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer) java.io.IOException: 断开的管道 at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:212) at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:175) at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703) at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61) at org.apache.kafka.common.network.Selector.doClose(Selector.java:739) at org.apache.kafka.common.network.Selector.close(Selector.java:727) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:520) at org.apache.kafka.common.network.Selector.poll(Selector.java:412) at kafka.network.Processor.poll(SocketServer.scala:551) at kafka.network.Processor.run(SocketServer.scala:468) at java.lang.Thread.run(Thread.java:748)
出现这个的原因是我的证书是自己签名的,不是受信任的权威签名,验证的时候通过不了,需要修改配置文件为
不验证服务器的证书
参数讲解:
设置验证证书InsecureSkipVerify: false, 不验证证书InsecureSkipVerify: true
验证内容: 1.证书用的CN的名称是否一致
2.证书是否是权威的签发 不是权威的话就出现certificate signed by unknown authority
- 请熟读 单向认证/双向认证的区别,搞清楚 server.pem 和 client.pem 的位置