Taaewoo
Data Engineering Blog
Taaewoo
전체 방문자
오늘
어제
  • 분류 전체보기 (67)
    • Computer Science (16)
      • Algorithm (6)
      • OS (1)
      • Java (2)
      • C++ (6)
      • Python (1)
    • Hadoop Ecosystem (27)
      • Hadoop (6)
      • Spark (5)
      • NiFi (6)
      • Hive (9)
      • Kafka (1)
    • BigData Engineering (14)
      • Jupyter (1)
      • Docker (3)
      • CDH (3)
      • Riot Data Pipeline (7)
    • Back-end 개발 (0)
      • Spring (0)
    • Algorithm 문제 풀이 (9)
      • 백준 (5)
      • LeetCode (4)
    • Conference (1)
      • LINE DEVELOPER DAY 2021 (1)
      • if(kakao) 2021 (0)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

  • java
  • spark
  • sort
  • hadoop
  • C++
  • kafka
  • algorithm
  • BigData
  • docker
  • NiFi
  • 정렬
  • 알고리즘
  • 코딩
  • CS
  • Hive
  • 빅데이터
  • Coding
  • metastore
  • 프로그래밍
  • hdfs

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
Taaewoo

Data Engineering Blog

[Spark] Spark structured streaming으로 Kafka topic 받기 #2 - Spark 및 Hadoop 서비스 실행하기
Hadoop Ecosystem/Spark

[Spark] Spark structured streaming으로 Kafka topic 받기 #2 - Spark 및 Hadoop 서비스 실행하기

2021. 8. 11. 02:00

Docker

 내가 원하는 환경의 서버를 container라는 개념으로 쉽게 생성 및 삭제할 수 있는 플랫폼.

 

Kafka

 Publish, Subscribe 모델 구조로 이루어진 분산 메세징 시스템

 

Spark Streaming

 Spark API 중 batch와 실시간 streaming이 가능한 Spark API

 


Hadoop & Spark 서비스 실행

 

 이번 글에서는 Docker Compose를 이용하여 Hadoop과  Spark  서비스들을 microservice화 시키도록 하겠습니다. 이전 글에서 구성했던 Kafka와 Zookeeper를 모두 포함하여 yml 파일을 작성하고 "docker compose up -d" 명령어를 실행하도록 하겠습니다. 이때 새로운 폴더에서 진행합니다.

docker-compose.yml
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-1
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-2
    ports:
      - "22181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-3
    ports:
      - "32181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:12888:13888;zookeeper-2:22888:23888;zookeeper-3:32888:33888

  kafka-1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-1
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092
 
  kafka-2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-2
    ports:
      - "29092:29092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092
 
  kafka-3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-3
    ports:
      - "39092:39092"
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092


  namenode:
    image: bde2020/hadoop-namenode:1.1.0-hadoop2.8-java8
    container_name: namenode
    volumes:
      - ./data/namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop.env
    ports:
      - 50070:50070

  datanode:
    image: bde2020/hadoop-datanode:1.1.0-hadoop2.8-java8
    depends_on:
      - namenode
    volumes:
      - ./data/datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop.env
    ports:
      - 50075:50075

  spark-master:
    image: bde2020/spark-master:2.1.0-hadoop2.8-hive-java8
    container_name: spark-master
    ports:
      - 8080:8080
      - 7077:7077
    env_file:
      - ./hadoop.env

  spark-worker:
    image: bde2020/spark-worker:2.1.0-hadoop2.8-hive-java8
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
    ports:
      - 8081:8081
    env_file:
      - ./hadoop.env

  spark-notebook:
    image: bde2020/spark-notebook:2.1.0-hadoop2.8-hive
    container_name: spark-notebook
    env_file:
      - ./hadoop.env
    ports:
      - 9001:9001

  hue:
    image: bde2020/hdfs-filebrowser:3.11
    ports:
      - 8088:8088
    environment:
      - NAMENODE_HOST=namenode

 

 yml 파일을 모두 작성했다면 Hadoop 서비스에 필요한 volume mount를 위해 현재 폴더에 "data/namenode", "data/datanode" 폴더를 만듭니다. yml 파일에서 namenode와 datanode 서비스를 보면 이 폴더들을 대상으로 mount를 진행하기 때문입니다.

/* New Docker Compose folder */
$ mkdir -p data/namenode
$ mkdir -p data/datanode

 

 그리고 마지막으로 hadoop.env 파일을 생성해줍니다. 파일의 내용은 아래와 같습니다.

CORE_CONF_fs_defaultFS=hdfs://namenode:8020
CORE_CONF_hadoop_http_staticuser_user=root
CORE_CONF_hadoop_proxyuser_hue_hosts=*
CORE_CONF_hadoop_proxyuser_hue_groups=*

HDFS_CONF_dfs_webhdfs_enabled=true
HDFS_CONF_dfs_permissions_enabled=false

YARN_CONF_yarn_log___aggregation___enable=true
YARN_CONF_yarn_resourcemanager_recovery_enabled=true
YARN_CONF_yarn_resourcemanager_store_class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
YARN_CONF_yarn_resourcemanager_fs_state___store_uri=/rmstate
YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs
YARN_CONF_yarn_log_server_url=http://historyserver:8188/applicationhistory/logs/
YARN_CONF_yarn_timeline___service_enabled=true
YARN_CONF_yarn_timeline___service_generic___application___history_enabled=true
YARN_CONF_yarn_resourcemanager_system___metrics___publisher_enabled=true
YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
YARN_CONF_yarn_timeline___service_hostname=historyserver
YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032
YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030
YARN_CONF_yarn_resourcemanager_resource__tracker_address=resourcemanager:8031

 현재 폴더의 구조가 위와 같으면 준비가 완료된 상태입니다. 완료가 되었다면 Docker Compose 명령어를 실행해줍니다.

$ docker compose up -d

 정상적으로 실행되었다면 "docker compose ps" 명령어로 확인해줍니다.

 

 

Spark 실행하기

 

 서비스들이 정상적으로 동작 중이라면 이제 우리들은  Spark 가 정상적으로 실행이 되고있는지 확인해야 합니다. 이때  Spark  container에 접속해야 하는데 spark-master 또는 spark-worker 중 하나를 정하면 됩니다. 

$ docker exec -it spark-master /bin/bash

 접속 후 "/spark/bin" 경로에 들어가면 여러가지 실행 파일들이 존재합니다. 저는  Spark 를 python으로 작성할 예정이기 때문에 pyspark를 실행해보겠습니다.

/* spark-master Terminal */
$ /spark/bin/pyspark

 정상적으로 실행되었다면 아래처럼 pyspark shell에 접속할 수 있습니다.

 

 간단한 pyspark 예제 코드를 실행해보고 이번 글은 마무리하겠습니다. WordCount 예제를 실행하기 위해 test 파일을 생성합니다. 파일 내용은 원하는 것을 저장하시면 됩니다. ( vim을 쓴다면 설치해야 합니다 -> $ apt install -y vim )

$ pwd
/root

$ vim test

/* put "test" file to HDFS path */
$ hdfs dfs -put /root/test /

 아래의 코드를 pyspark shell에서 실행하면 결과를 확인할 수 있습니다.

# run this command in pyspark shell
text_file = sc.textFile("hdfs:///test")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///result")
$ hdfs dfs -cat /result/*

pyspark code
결과 값

 

 이제 Kafka,  Spark , Hadoop 서비스가 모두 Docker Compose로 준비되었으니 다음 글에서는 Spark streaming을 이용하여 Kafka topic을 받아오는 방법에 대해 설명하도록 하겠습니다.

 

 

References

 

https://github.com/big-data-europe/docker-hadoop-spark-workbench

https://spark.apache.org/examples.html

저작자표시 비영리 (새창열림)

'Hadoop Ecosystem > Spark' 카테고리의 다른 글

[Spark] 스파크 완벽 가이드 Chapter 2 - Spark의 동작 방식  (0) 2021.09.22
[Spark] Spark structured streaming으로 Kafka topic 받기 #3 - pyspark로 HDFS에 topic data 저장하기  (0) 2021.08.21
[Spark] Spark structured streaming으로 Kafka topic 받기 #1 - Kafka 클러스터 구성하기  (0) 2021.08.08
[Spark] Spark란? - Cluster Computing with Working Sets  (0) 2021.04.02
    'Hadoop Ecosystem/Spark' 카테고리의 다른 글
    • [Spark] 스파크 완벽 가이드 Chapter 2 - Spark의 동작 방식
    • [Spark] Spark structured streaming으로 Kafka topic 받기 #3 - pyspark로 HDFS에 topic data 저장하기
    • [Spark] Spark structured streaming으로 Kafka topic 받기 #1 - Kafka 클러스터 구성하기
    • [Spark] Spark란? - Cluster Computing with Working Sets
    Taaewoo
    Taaewoo

    티스토리툴바