Spring Cloud Stream 기반으로 처리되는 구독(Subscribe) 예제 5종을 제공하며 메시지 브로커의 메시지를 전달 받아 다양한 방법으로 처리한다.
| 순번 | 유형 | Messsage Broker | 기반 | 구독 데이터 | UI URL | 비고 |
|---|---|---|---|---|---|---|
| 1 | IOT | RabbitMQ | Spring Cloud Stream | Websocket(Google Chart) 출력 | /sensor | |
| 2 | Log | Apache Kafka | OpenSearch | OpenDashboard로 출력 | OpenLogstash로 자료 처리 및 연동 | |
| 3 | File | Apache Kafka | Spring Cloud Stream | Websocket이용하여 출력 | /file_line | |
| 4 | DB | Apache Kafka | Spring Cloud Stream | Websocket(Google Chart) 출력 | /db | |
| 5 | Open API | Apache Kafka | Spring Cloud Stream | MongoDB 저장 |
메시지 브로커에서 온도 및 습도 정보를 구독하여 관련 데이타를 가져온 다음 Websocket을 이용하여 화면에 구글차트로 결과를 보여준다.
| 유형 | 대상소스명 | 설명 |
|---|---|---|
| Consumer Bean | /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java | 온/습도 데이타 구독 |
| Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | 온/습도 컨트롤러 클래스 |
| Thymeleaf | /src/main/resources/templates/thymeleaf/egovSensor.html | 온/습도 수신 데이타 화면 표시 |
| Websocket | /src/main/java/egovframework/webflux/websocket/SensorWebSocketHandler.java | 온/습도 데이타 웹소켓 설정 |
spring:
cloud:
stream:
bindings:
sensorThermo-in-0:
destination: sensor
binder: rabbit
function:
definition: sensorThermo
메시지 브로커에서 라인기반 파일 정보를 구독하여 관련 데이타를 가져온 Websocket을 이용하여 화면에 결과를 보여준다.
| 유형 | 대상소스명 | 설명 |
|---|---|---|
| Consumer Bean | /src/main/java/egovframework/webflux/stream/sensor/EgovConsumerSensorThermohygrometer.java | 라인기반 파일 데이타 구독 |
| Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | 라인기반 파일 컨트롤러 클래스 |
| Thymeleaf | /src/main/resources/templates/thymeleaf/egovConsumerFileLine.html | 라인기반 파일 구독 데이타 결과 표시 |
| Websocket | /src/main/java/egovframework/webflux/websocket/FileWebSocketHandler.java | 파일 데이타 웹소켓 설정 |
spring:
cloud:
stream:
bindings:
fileLine-in-0:
destination: line-topic
binder: kafka
function:
definition: fileLine
Elastic Search가 상용화 되면서 해당 OSS(Open Source Software)에서 분기한 OpenSearch를 이용하면 편리하게 데이터를 수신하여 시각화 할 수 있다. 이때 메시지 서버와 연계는 Logstash를 이용하여 처리한다.
Logstash는 로그 형태의 데이타를 OpenSearch로 쉽게 전달할 수 있다.
* input > group_id : consumer 그룹 지정
* input > topics : egov-logs (Kafka 토픽 명)
* input > decorate_events ⇒ 이걸 설정해야 @metadata 같이 옵션 사용가능
* filter > grok 플러그인 : 메시지 패턴 적용
* output > index : YYYY.MM.dd로 일별 구분
input {
kafka {
bootstrap_servers => "192.168.100.50:9092"
group_id => "logstash"
topics => ["egov-logs"]
consumer_threads => 1
decorate_events => true
}
}
filter {
grok {
match => {
"message" => "\[%{TIMESTAMP_ISO8601:timestamp}] \[%{DATA:thread}] %{LOGLEVEL:logLevel} %{JAVACLASS:packageClass} - %{GREEDYDATA:msg}"
}
}
}
output {
stdout {}
opensearch {
hosts => ["https://192.168.100.50:9200"]
index => "egov-logs-%{+YYYY.MM.dd}"
user => "admin"
password => "admin"
ssl => true
ssl_certificate_verification => false
}
}
메시지 브로커에서 DB CRUD 정보를 구독하여 Websocket을 이용하여 화면에 결과를 보여준다.
| 유형 | 대상소스명 | 설명 |
|---|---|---|
| Consumer Bean | /src/main/java/egovframework/webflux/stream/db/EgovConsumerDb.java | DB 데이터 구독 |
| Controller | /src/main/java/egovframework/webflux/stream/controller/EgovSocketController.java | DB 데이터 컨트롤러 클래스 |
| Thymeleaf | /src/main/resources/templates/thymeleaf/egovConsumerDb.html | DB 데이터 구독 결과 표시 |
| Websocket | /src/main/java/egovframework/webflux/websocket/DbWebSocketHandler.java | DB 데이터 웹소켓 설정 |
spring:
cloud:
stream:
bindings:
historyDb-in-0:
destination: db-topic
binder: kafka
function:
definition: historyDb
메시지 브로커에서 Open API 정보를 구독하여 MongoDB에 저장한다.
| 유형 | 대상소스명 | 설명 |
|---|---|---|
| Consumer Bean | /src/main/java/egovframework/webflux/stream/api/EgovConsumerTrainApi.java | Open API 데이터 구독 |
| DTO | /src/main/java/egovframework/webflux/stream/api/dto/RealtimePositionDTO.java | Open API 데이터 DTO |
| Entity | /src/main/java/egovframework/webflux/stream/api/entity/RealtimePosition.java | Open API 데이터 Entity |
spring:
cloud:
stream:
bindings:
trainApi-in-0:
destination: api-topic
binder: kafka
function:
definition: trainApi