Kafka SSL集群搭建及测试方法

Published on 2019 - 12 - 26

Kafka SSL集群搭建及测试方法

注:本文档kafka版本为 kafka_2.12-2.3.1 下载地址为这里

搭建集群

Broker的server.properties配置文件如下

broker.id=0

# SSL
listeners=SSL://172.18.14.215:9092
ssl.keystore.location=/data/ssl/kafka.server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/data/ssl/kafka.server.truststore.jks
ssl.truststore.password=123456
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=
security.inter.broker.protocol=SSL
InsecureSkipVerify=true
# SSL

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

其余节点的broker 只需要注意修改 broker.id 即可

生成证书

#!/bin/sh
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
 
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
 
keytool -importkeystore -srckeystore kafka.server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
keytool -importkeystore -srckeystore kafka.server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem

生产者代码

package main

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "github.com/Shopify/sarama"
    "io/ioutil"
    "log"
    "time"
)

//var AddressSSL1 = []string{"172.18.236.78:9092", "172.18.236.79:9092", "172.18.236.80:9092"}
var AddressSSL1 = []string{"172.18.14.215:9092", "172.18.14.215:9093"}

func main() {

    for i := 0; i < 10; i++ {
        KafkaProducer(AddressSSL1, "testSSl", "a")

    }
}

func KafkaProducer(address []string, topic string, sendValue string) {
    fmt.Println("producer 启动")
    //设置配置
    config := sarama.NewConfig()
    //加载证书
    tlsConfig, err := NewTLSConfigProducer("./ssl/client.cer.pem",
        "./ssl/client.key.pem", "./ssl/server.cer.pem")
    if err != nil {
        log.Fatal(err)
    }
    //客户端不对服务端验证
    tlsConfig.InsecureSkipVerify = true
    //启动SSL认证
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = tlsConfig
    //是否等待成功和失败后的响应,只有上面的RequireAcks(等待服务器所有副本都保存成功后的响应)设置不是NoReponse才有用
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    client, err := sarama.NewClient(address, config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }
    //使用配置,新建一个异步生产者
    p, err := sarama.NewAsyncProducerFromClient(client)
    if err != nil {
        log.Fatal(err)
    }
    defer p.Close()
    loopProducer(p, topic, sendValue)
}
func NewTLSConfigProducer(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
    tlsConfig := tls.Config{}

    // 加载客户端证书
    cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
    if err != nil {
        return &tlsConfig, err
    }
    tlsConfig.Certificates = []tls.Certificate{cert}

    // 加载CA证书
    caCert, err := ioutil.ReadFile(caCertFile)
    if err != nil {
        return &tlsConfig, err
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)
    tlsConfig.RootCAs = caCertPool

    tlsConfig.BuildNameToCertificate()
    return &tlsConfig, err
}

func loopProducer(producer sarama.AsyncProducer, topic string, text string) {

    text = string(time.Now().Format("15:04:05"))

    msg := &sarama.ProducerMessage{
        Topic:     topic,
        Key:       nil,
        Value:     sarama.StringEncoder(text),
        Headers:   nil,
        Metadata:  nil,
        Offset:    0,
        Partition: 0,
        Timestamp: time.Now(),
    }

    producer.Input() <- msg
    log.Printf("Produced message: [%s] - %s\n", text,msg.Timestamp.Format("15:04:05"))
}

消费者代码

package main

import (
    "crypto/tls"
    "crypto/x509"
    "github.com/Shopify/sarama"
    "io/ioutil"
    "log"
    "os"
    "os/signal"
    "sync"
)

//var AddressSSL2 = []string{"172.18.236.78:9092", "172.18.236.79:9092", "172.18.236.80:9092"}
var AddressSSL2 = []string{"172.18.14.215:9092","172.18.14.215:9093"}

func main() {
    KafkaConsumer(AddressSSL2, "testSSl")
}

func KafkaConsumer(address []string, topics string) {
    tlsConfig, err := NewTLSConfigConsumer("./ssl/client.cer.pem",
        "./ssl/client.key.pem", "./ssl/server.cer.pem")
    if err != nil {
        log.Fatal(err)
    }

    //客户端不对服务端验证
    tlsConfig.InsecureSkipVerify = true

    //创建一个配置对象
    consumerConfig := sarama.NewConfig()
    //启动TLS通讯
    consumerConfig.Net.TLS.Enable = true
    consumerConfig.Net.TLS.Config = tlsConfig
    consumerConfig.Version = sarama.V2_3_0_0

    //创建一个客户消费者
    client, err := sarama.NewClient(address, consumerConfig)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    consumer, err := sarama.NewConsumerFromClient(client)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    consumerLoop(consumer, topics)
}

func consumerLoop(consumer sarama.Consumer, topic string) {
    partitions, err := consumer.Partitions(topic)
    if err != nil {
        log.Println("unable to fetch partition IDs for the topic", topic, err)
        return
    }

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    var wg sync.WaitGroup
    for partition := range partitions {
        wg.Add(1)
        go func() {
            consumePartition(consumer, int32(partition), signals, topic)
            wg.Done()
        }()
    }
    wg.Wait()
}

func NewTLSConfigConsumer(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
    tlsConfig := tls.Config{}

    // 加载客户端证书
    cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
    if err != nil {
        return &tlsConfig, err
    }
    tlsConfig.Certificates = []tls.Certificate{cert}

    // 加载CA证书
    caCert, err := ioutil.ReadFile(caCertFile)
    if err != nil {
        return &tlsConfig, err
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)
    tlsConfig.RootCAs = caCertPool

    tlsConfig.BuildNameToCertificate()
    return &tlsConfig, err
}

func consumePartition(consumer sarama.Consumer, partition int32, signals chan os.Signal, topic string) {
    log.Println("Receving on partition", partition)
    partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
    if err != nil {
        log.Println(err)
        return
    }
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Println(err)
        }
    }()

    consumed := 0
ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            //log.Printf("Consumed message offset %d\nData: %s\n", msg.Offset, msg.Value)
            t := msg.Timestamp
            formatTime := t.Format("15:04:05")
            log.Printf("%s - %s\n", msg.Value,formatTime)
            consumed++
        case <-signals:
            break ConsumerLoop
        }
    }
    log.Printf("Consumed: %d\n", consumed)
}

双向认证/单项认证 区别

单项认证

可以实现client对broke的认证,如果是任意的一个broke,client是会拒绝连接的。但是,在kafka应用里面,好像并不是很实用,除了数据的加密。更多情况下,broke要防止任意的client进行连接,不能让client随意连接,那么就需要增加broke对client的认证。

双向认证

顾名思义,是双向的认证,不被信任的broker或生产/消费者 是不会互相连接的,除非证书泄露

踩坑笔记

  1. 证书生成好后,需要修改 client.cer.pem 删除上下部分,只保留中间部分 不然会报错"private key does not match public key"

    见下面例子:

    -----BEGIN CERTIFICATE-----
    MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJj
    bjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UE
    CwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0y
    ODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UE
    BxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2
    OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....省略n个字符
    -----END CERTIFICATE-----
    
  2. 程序在消费和生产的时候出现错误:
    2019/12/17 20:02:22 unable to create kafka client: "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"

    kafka 日志中出现的错误是:

    [2019-12-17 17:27:24,378] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
    java.io.IOException: 断开的管道
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:212)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:175)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:739)
    at org.apache.kafka.common.network.Selector.close(Selector.java:727)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:520)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
    at kafka.network.Processor.poll(SocketServer.scala:551)
    at kafka.network.Processor.run(SocketServer.scala:468)
    at java.lang.Thread.run(Thread.java:748)
    

    出现这个的原因是我的证书是自己签名的,不是受信任的权威签名,验证的时候通过不了,需要修改配置文件为 不验证服务器的证书

    InsecureSkipVerify: true

    参数讲解:
    设置验证证书InsecureSkipVerify: false, 不验证证书InsecureSkipVerify: true
    验证内容:

    ​ 1.证书用的CN的名称是否一致

    ​ 2.证书是否是权威的签发 不是权威的话就出现certificate signed by unknown authority

  3. 请熟读 单向认证/双向认证的区别,整清楚server.pem和clien.pem的位置