스프링 부트와 카프카를 연동해보자.
일단 기본 스프링 부트 환경을 구성한 뒤 spring-kafka dependency를 추가 하자.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
카프카 설정 파일
package com.hongjun423.kafka.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
@PropertySource("classpath:kafka.properties")
public class KafkaConfiguration {
@Autowired
private Environment env;
private Map<String, Object> producerConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return config;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
}
}
컨트롤러
package com.hongjun423.kafka.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class KafkaController {
private final KafkaTemplate kafkaTemplate;
public KafkaController(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@PostMapping("/send")
public ResponseEntity<String> sendMessage(String message) {
if (!StringUtils.isEmpty(message)) kafkaTemplate.send("topic-test1", "Message is " + message);
log.info(message);
return ResponseEntity.ok("");
}
}
카프카 리스너
package com.hongjun423.kafka.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ReceiveConfiguration {
@KafkaListener(topics = "topic-test1", groupId = "console-consumer-1970")
public void receive(String payload) {
log.info("received payload='{}'", payload);
}
}
이제 스프링 부트 애플리케이션을 실행하고 'ip:8080/send'로 post 형식으로 메시지를 요청해 보자.
post맨을 이용하면 쉽게 테스트가 가능하다.
아래 명령어를 참고하여 consumer를 실행시키면 스프링 부트로 request 한 메시지가 출력되는 것을 확인할 수 있다.
[itsme@localhost kafka_2.12-2.3.1]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-test1 --from-beginning
Message is hello
Message is hello
참고
- git : https://github.com/hongjun423/kafkaStudy.git
- https://wedul.site/574
- https://seungwoo0429.tistory.com/29
반응형
'infra > Kafka' 카테고리의 다른 글
[Kafka] 아주 기초적인 Kafka 톺아보기 (0) | 2022.04.10 |
---|---|
[kafka] 아파치 카프카 설치하기 with zookeeper (0) | 2019.12.09 |
댓글