Taaewoo
Data Engineering Blog
Taaewoo
전체 방문자
오늘
어제
  • 분류 전체보기 (67)
    • Computer Science (16)
      • Algorithm (6)
      • OS (1)
      • Java (2)
      • C++ (6)
      • Python (1)
    • Hadoop Ecosystem (27)
      • Hadoop (6)
      • Spark (5)
      • NiFi (6)
      • Hive (9)
      • Kafka (1)
    • BigData Engineering (14)
      • Jupyter (1)
      • Docker (3)
      • CDH (3)
      • Riot Data Pipeline (7)
    • Back-end 개발 (0)
      • Spring (0)
    • Algorithm 문제 풀이 (9)
      • 백준 (5)
      • LeetCode (4)
    • Conference (1)
      • LINE DEVELOPER DAY 2021 (1)
      • if(kakao) 2021 (0)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

  • hadoop
  • 프로그래밍
  • Hive
  • java
  • NiFi
  • 빅데이터
  • Coding
  • 알고리즘
  • hdfs
  • 코딩
  • algorithm
  • spark
  • docker
  • 정렬
  • kafka
  • sort
  • CS
  • metastore
  • C++
  • BigData

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
Taaewoo

Data Engineering Blog

[Spark] Spark structured streaming으로 Kafka topic 받기 #1 - Kafka 클러스터 구성하기
Hadoop Ecosystem/Spark

[Spark] Spark structured streaming으로 Kafka topic 받기 #1 - Kafka 클러스터 구성하기

2021. 8. 8. 17:02

Docker

 내가 원하는 환경의 서버를 container라는 개념으로 쉽게 생성 및 삭제할 수 있는 플랫폼.

 

Kafka

 Publish, Subscribe 모델 구조로 이루어진 분산 메세징 시스템

 

Spark Streaming

 Spark API 중 batch와 실시간 streaming이 가능한 Spark API


 

 이번 글부터는  Kafka 와 Spark를 docker로 구성 및 이용해보겠습니다. Docker를 이용한 hadoop 구성하기와 CDH 배포판 설치하기는 docker container를 hadoop 클러스터 중 하나의 서버로 사용해왔습니다. 하지만 docker에 대해서 공부하고 알다 보니 이것은 올바른 사용법이 아니라고 생각했습니다. 최근 docker와 k8s는 monolithic 모델보다는 microservice 모델을 구현하기 위해 사용되고 있습니다. 즉, 저는 지금까지 docker를 monolithic 모델을 위해 사용해왔고 하나의 container에 여러 서비스들을 동시에 실행시켜왔습니다. 이제부터는 각 서비스별로 container를 생성하는 microservice 모델로 docker를 사용할 예정이고  Kafka 와 Spark를 이와 같이 구성하겠습니다.

 

 

Kafka 클러스터 생성

 

 이번 글의 핵심 주제 중 하나인  Kafka  클러스터를 docker로 구성하는 방법은 어렵지 않습니다. docker-compose를 이용하면 서비스별로 분리되어 있는 모델을 구성가능하고 이미 기존에 오픈 소스로써 많이 존재합니다. 가장 먼저 docker-compose.yml 파일을 생성합니다.  Kafka 는 기본적으로 Zookeeper와 연동되어 작동되기 때문에 필수적으로 필요합니다. 그래서 docker-compose.yml에 zookeeper 서비스 container도 포함시킨 것입니다. 그리고 지금 현재 글에서는  Kafka  클러스터를 구성하기 위한 서비스들만 포함시켰기 때문에 spark와 hadoop 서비스들이 포함되어있지 않지만 나중에 spark streaming을 이용해 topic을 받아오는 글에서는 모두 포함시킬 예정인 점 알아주시기 바랍니다.

docker-compose.yml
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-1
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-2
    ports:
      - "22181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-3
    ports:
      - "32181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  kafka-1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-1
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092

  kafka-2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-2
    ports:
      - "29092:29092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092

  kafka-3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-3
    ports:
      - "39092:39092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092

 

 이 yml 파일은 원하는 폴더를 생성해 넣어주시면 됩니다. 참고로 이 때 만든 폴더 이름이 docker compose의 이름으로 지정됩니다. docker compose 이름이 되는 폴더를 생성 후 yml 파일까지 위치해 있다면 우리는 아래의 명령어를 통해  Kafka  클러스터에 대한 microservice를 실행시킬 수 있습니다.

$ docker compose up -d

 

 docker-compose up 명령어를 사용했다면 아마 6개의 container가 실행되고 있을 겁니다. 실행되고 있는 docker compose container 확인은 아래의 명령어로 가능합니다. container의 이름은 "폴더 이름_서비스 이름"입니다. 

$ docker compose ps

 

 

Kafka Topic pub & sub 

 

 docker compose를 통해 우리는 쉽게  Kafka  클러스터를 구성했습니다. 그렇다면 이제  Kafka  topic의 pub & sub이 정상적으로 되는지 테스트해보겠습니다. 터미널 창 2개로 각각 kafka-1, kafka-3 container에 접속합니다. 

/* Terminal 1 */
$ docker exec -it kafka-spark_kafka-1_1 /bin/bash

/* Terminal 2 */
$ docker exec -it kafka-spark_kafka-3_1 /bin/bash

 

 Kafka  container에 접속했다면 /bin 디렉토리에  Kafka  관련 실행파일들이 존재하는 것을 확인할 수 있습니다.

 

 이중에서 우리는 "kafka-topics", "kafka-console-producer", "kafka-console-consumer"를 이용해서 topic produce, subscribe 기능을 테스트할 수 있습니다. 지금은 CLI 환경에서 실행 파일로 테스트를 진행하지만 이후에 Spark streaming과 테스트를 진행할 때는 Spark 프로그래밍으로 할 계획입니다.

 

 가장 먼저 topic을 생성해줍니다. 이때 사용되는 옵션 중 --bootstrap-server은 기존에 --zookeeper라는 옵션에서 대체되었습니다. 기기존에는  Kafka  명령어를 사용하는 Client도 metadata를 저장하고 있는 zookeeper를 명시했습니다. 하지만 최근  Kafka 의 경우 이러한 metadata 정보들은  Kafka  broker가 저장하고 있기 때문에 Client는 직접  Kafka  broker와 통신하면 됩니다.

/* kafka-1 Terminal */
/bin/kafka-topics --create --bootstrap-server kafka-1:19092,kafka-2:29092,kafka-3:39092 --replication-factor 1 --partitions 1 --topic test1

 

 생성이 완료되었다면 topic list와 topic 정보를 확인해봅니다.

$ /bin/kafka-topics --bootstrap-server kafka-1:19092,kafka-2:29092,kafka-3:39092 --list

$ /bin/kafka-topics --bootstrap-server kafka-1:19092,kafka-2:29092,kafka-3:39092 --topic test1 --describe

 

 topic의 확인이 끝났다면 이제 producer와 consumer를 이용해 topic을 주고 받겠습니다.  Kafka  broker에게 topic의 내용을 pub & sub 하는 방법은 여러가지가 있지만 위에서 언급한 "kafka-console-producer"를 이용하면 console 창에서 입력하는 값들이 produce됩니다.

/* kafka-1 Terminal */
$ /bin/kafka-console-producer --bootstrap-server kafka-1:19092,kafka-2:29092,kafka-3:39092 --topic test1
> It
> is
> test1
> topic

 

 마지막으로 kafka-3 container에서 consume 해보겠습니다. "kafka-console-consumer"를 이용하고 --from-beginning 옵션을 준다면 해당 topic의 처음 data부터 받아올 수 있습니다.

/* kafka-3 Terminal */
$ /bin/kafka-console-consumer --bootstrap-server kafka-1:19092,kafka-2:29092,kafka-3:39092 --topic test1 --from-beginning

 

 위의 그림과 같이 Producer가 보낸 data들이 Consumer console 창에 나타난다면 정상적으로 테스트가 완료된겁니다.

 

 다음 글부터는 Hadoop과 Spark 서비스를  Kafka 와 같이 microservice화 시켜 container를 올리고 Spark streaming으로  Kafka  topic을 pub & sub하는 과정을 소개하겠습니다.

저작자표시 비영리 (새창열림)

'Hadoop Ecosystem > Spark' 카테고리의 다른 글

[Spark] 스파크 완벽 가이드 Chapter 2 - Spark의 동작 방식  (0) 2021.09.22
[Spark] Spark structured streaming으로 Kafka topic 받기 #3 - pyspark로 HDFS에 topic data 저장하기  (0) 2021.08.21
[Spark] Spark structured streaming으로 Kafka topic 받기 #2 - Spark 및 Hadoop 서비스 실행하기  (0) 2021.08.11
[Spark] Spark란? - Cluster Computing with Working Sets  (0) 2021.04.02
    'Hadoop Ecosystem/Spark' 카테고리의 다른 글
    • [Spark] 스파크 완벽 가이드 Chapter 2 - Spark의 동작 방식
    • [Spark] Spark structured streaming으로 Kafka topic 받기 #3 - pyspark로 HDFS에 topic data 저장하기
    • [Spark] Spark structured streaming으로 Kafka topic 받기 #2 - Spark 및 Hadoop 서비스 실행하기
    • [Spark] Spark란? - Cluster Computing with Working Sets
    Taaewoo
    Taaewoo

    티스토리툴바