首页  编辑  

Kafka集成及SASL认证

Tags: /Java/   Date Created:
添加依赖(pom.xml)
<!-- Spring for Apache Kafka: provides KafkaTemplate, the listener container factories and @EnableKafka. -->
<!-- NOTE(review): no <version> is declared on either dependency; presumably versions come from a
     parent POM / BOM (e.g. Spring Boot dependency management) - confirm for your build. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- Plain Kafka client library: provides KafkaProducer, ProducerConfig, ConsumerConfig and SaslConfigs. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
配置application.yml。Kafka的key按字符串处理;value在生产端用JsonSerializer序列化为JSON,在消费端用StringDeserializer读取,消费者拿到的是JSON字符串:
spring:
  kafka:
    # Custom key (not the standard "client-id"); read via @Value("${spring.kafka.client_id}").
    client_id: xxx_main_xxxx_command_svc_client
    bootstrap-servers: 127.0.0.1:9092
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          # The login module has to match sasl.mechanism: PlainLoginModule goes with PLAIN,
          # ScramLoginModule goes with SCRAM-SHA-256 / SCRAM-SHA-512.
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
    producer:
      bootstrap-servers: 127.0.0.1:9092
      # Custom key holding the target topic; read via @Value("${spring.kafka.producer.topic}").
      topic: xxx-xxxx-xxxx-topic
      # NOTE(review): retries, batch-size, buffer-memory and acks are not read by the
      # KafkaProducerConfig class in this article (it hard-codes retries to 3) - confirm
      # which value is intended.
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: 1
      properties:
        enable:
          idempotence: false
    consumer:
      # Custom key holding the topic to consume.
      topic: xxx-xxxx-xxxx-topic
      group-id: xxx-main-xxxx-composite-svc-consumer-groupId
      auto-commit-interval: 1S
      auto-offset-reset: earliest
      # Offsets are acknowledged by the listener instead of being auto-committed.
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Values arrive as the raw JSON text produced by JsonSerializer.
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # NOTE(review): KafkaProducerConfig in this article builds its own listener container
      # factory with AckMode.MANUAL and does not read these three values - confirm they
      # take effect in your setup.
      concurrency: 5
      ack-mode: manual_immediate
      missing-topics-fatal: false
Kafka配置:
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaProducerConfig {

  @Value("${spring.kafka.producer.bootstrap-servers}")
  private String server;
  @Value("${spring.kafka.producer.key-serializer}")
  private String keySerializer;
  @Value("${spring.kafka.producer.value-serializer}")
  private String valueSerializer;
  @Value("${spring.kafka.consumer.key-deserializer}")
  private String keyDeserializer;
  @Value("${spring.kafka.consumer.value-deserializer}")
  private String valueDeserializer;
  @Value("${spring.kafka.properties.security.protocol}")
  private String securityProtocol;
  @Value("${spring.kafka.properties.sasl.mechanism}")
  private String saslMechanism;
  @Value("${spring.kafka.consumer.group-id}")
  private String groupId;
  @Value("${spring.kafka.producer.properties.enable.idempotence}")
  private String enableIdempotence;
  @Value("${spring.kafka.client_id}")
  private String clientId;
  @Value("${spring.kafka.properties.sasl.jaas.config}")
  private String saslConfig;
  @Value("${shsop.kafka.sasl:true}")
  private Boolean kafkaSasl;


  @Bean
  public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    if (Boolean.TRUE.equals(kafkaSasl)) {
      config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
      config.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
      config.put(SaslConfigs.SASL_JAAS_CONFIG, saslConfig);
    }
    return new DefaultKafkaConsumerFactory<>(config);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
  }

  @Bean
  public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    config.put(ProducerConfig.RETRIES_CONFIG, 3);
    if (Boolean.TRUE.equals(kafkaSasl)) {
      config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
      config.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
      config.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
      config.put(SaslConfigs.SASL_JAAS_CONFIG, saslConfig);
    }
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
    config.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, -1);
    return new DefaultKafkaProducerFactory<>(config);
  }

  @Bean
  public KafkaProducer<Object, Object> kafkaProducer() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    if (Boolean.TRUE.equals(kafkaSasl)) {
      config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
      config.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
      config.put(SaslConfigs.SASL_JAAS_CONFIG, saslConfig);
    }
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
    config.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, -1);
    return new KafkaProducer<>(config);
  }

  @Bean
  public KafkaTemplate<Object, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
}
使用:
// Template bean published by the Kafka configuration class.
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
// Target topic, taken from the custom spring.kafka.producer.topic property.
@Value("${spring.kafka.producer.topic}")
private String topic;

// The outcome of the send is delivered to the callback below.
ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(topic, object);
future.addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
    // The parameter type must match the callback's type argument, otherwise this does not override.
    @Override
    public void onSuccess(SendResult<Object, Object> result) {
        // Handling for a successfully sent message
        System.out.println("Message sent successfully: " + result.getRecordMetadata().toString());
    }

    @Override
    public void onFailure(Throwable ex) {
        // Handling for a failed send
        System.err.println("Message sent failed: " + ex.getMessage());
    }
});
Kafka消息发送是异步的,上面的addCallback是异步处理发送结果的方式。如果需要同步等待发送结果,可以用future.get()替代addCallback:
// Synchronous variant: get() blocks the calling thread until the send completes or fails.
ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(topic, object);
try {
    SendResult<Object, Object> result = future.get();
    System.out.println("Message sent successfully: " + result.getRecordMetadata());
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up can still observe the interruption.
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting for the send result");
} catch (java.util.concurrent.ExecutionException e) {
    // The actual send failure is carried as the cause of the ExecutionException.
    System.err.println("Message sent failed: " + e.getCause());
}