본문 바로가기
infra/Kafka

[kafka] 스프링부트와 kafka 연동

by hjhello423 2019. 12. 11.

스프링 부트와 카프카를 연동해보자.

 

 

일단 기본 스프링 부트 환경을 구성한 뒤 spring-kafka dependency를 추가하자.

<!-- Spring for Apache Kafka. No <version> is given here; presumably it is
     supplied by Spring Boot's dependency management (confirm against the
     project's parent POM / BOM). -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

 

카프카 설정 파일

package com.hongjun423.kafka.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration for the Kafka producer side of the application.
 *
 * <p>Loads {@code kafka.properties} from the classpath and exposes a
 * {@link KafkaTemplate} bean that sends {@code String} keys and
 * {@code String} values.
 */
@Configuration
@EnableKafka
@PropertySource("classpath:kafka.properties")
public class KafkaConfiguration {

    /** Key looked up in the environment for the Kafka broker address list. */
    private static final String BOOTSTRAP_SERVERS_PROPERTY = "bootstrap.servers";

    @Autowired
    private Environment env;

    /**
     * Builds the producer settings: broker addresses plus String serializers
     * for both key and value.
     *
     * @return a new mutable map of Kafka producer properties
     * @throws IllegalStateException if {@code bootstrap.servers} cannot be resolved
     */
    private Map<String, Object> producerConfig() {
        Map<String, Object> config = new HashMap<>();

        // Fail fast with a clear error when the property is missing instead of
        // handing a null broker list to the Kafka client.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                env.getRequiredProperty(BOOTSTRAP_SERVERS_PROPERTY));
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return config;
    }

    /**
     * Creates the template used to publish messages to Kafka.
     *
     * @return a {@link KafkaTemplate} backed by a {@link DefaultKafkaProducerFactory}
     *         configured from {@link #producerConfig()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
    }
}

 

 

컨트롤러

package com.hongjun423.kafka.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that forwards an incoming message to a Kafka topic.
 */
@Slf4j
@RestController
public class KafkaController {

    /** Topic every accepted message is published to. */
    private static final String TOPIC = "topic-test1";

    // Parameterized (not raw) so that send() is type-checked against the
    // String/String template this controller is given.
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate template used to publish messages
     */
    public KafkaController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Handles {@code POST /send}: publishes {@code "Message is " + message} to
     * the topic when {@code message} is neither {@code null} nor empty, logs
     * the raw message, and always answers 200 with an empty body.
     *
     * <p>The result of {@code send} is not inspected, so a failed publish is
     * not reflected in the HTTP response.
     *
     * @param message text to publish; carries no binding annotation, so it is
     *                expected to arrive via Spring MVC's default handling of
     *                simple-typed handler arguments (request parameter)
     * @return an OK response with an empty string body
     */
    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(String message) {
        // hasLength(x) is the non-deprecated equivalent of !isEmpty(x) for a String.
        if (StringUtils.hasLength(message)) {
            kafkaTemplate.send(TOPIC, "Message is " + message);
        }
        log.info(message);
        return ResponseEntity.ok("");
    }
}

 

 

카프카 리스너

package com.hongjun423.kafka.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer component: listens on a single topic and logs each payload
 * it is handed.
 */
@Slf4j
@Component
public class ReceiveConfiguration {

    /** Topic this listener subscribes to. */
    private static final String TOPIC = "topic-test1";

    /** Consumer group id the listener is registered with. */
    private static final String GROUP_ID = "console-consumer-1970";

    /**
     * Invoked for each record delivered to the listener; writes the payload
     * to the log at INFO level.
     *
     * @param payload the record value as a string
     */
    @KafkaListener(topics = TOPIC, groupId = GROUP_ID)
    public void receive(String payload) {
        log.info("received payload='{}'", payload);
    }
}

 


 

이제 스프링 부트 애플리케이션을 실행하고 'http://{ip}:8080/send'로 message 파라미터에 메시지를 담아 POST 요청을 보내 보자.

Postman을 이용하면 쉽게 테스트가 가능하다.

 

아래 명령어를 참고하여 consumer를 실행시키면 스프링 부트로 request 한 메시지가 출력되는 것을 확인할 수 있다.

[itsme@localhost kafka_2.12-2.3.1]$ ./bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic topic-test1 --from-beginning
Message is hello
Message is hello

 


참고

 

반응형

댓글