Docker
내가 원하는 환경의 서버를 container라는 개념으로 쉽게 생성 및 삭제할 수 있는 플랫폼.
Kafka
Publish, Subscribe 모델 구조로 이루어진 분산 메세징 시스템
Spark Streaming
Spark API 중 batch와 실시간 streaming이 가능한 Spark API
이전 글에서는 console 창에서 입력하는 값을 topic에 produce 했었습니다. 이번에는 csv 파일을 이용하여 실시간으로 데이터를 전송하는 것처럼 producer를 구현하도록 하겠습니다. 글에서 실습할 전체적인 과정은 아래 이미지와 같습니다. Kafka-1 container가 Producer의 역할로 test1이라는 Kafka topic에 데이터를 보내고 test1 topic에 담겨있는 내용을 Spark_worker-1 container가 Consumer의 역할로 Spark structured streaming을 이용해 데이터를 가져와서 처리를 해줍니다. 참고로 Kafka-1 container는 Kafka Broker에 속한 container입니다.
실시간 Producer 구현하기
가장 먼저 Kafka topic에 지속적으로 보낼 데이터를 간단하게 준비합니다.
그리고 우리는 이 csv 파일의 내용을 한 줄씩 읽어 console에 input으로 사용하는 shell script를 local PC에서 작성합니다. shell script의 내용에서 알 수 있듯이 이전 글에서 사용한 kafka-console-producer를 그대로 사용합니다.
( 참고 : https://it-sunny-333.tistory.com/110 )
## produce_gt.sh
#!/bin/bash
echo "## Kafka Producer ##"
cat gt.csv | while read line; do
echo "$line"
sleep 0.1
done | /bin/kafka-console-producer --topic test1 --bootstrap-server kafka-1:19092,kafka-2:29092,kafka-3:39092
이렇게 작성한 csv 파일과 shell script를 kafka-1 container에 옮겨주면 producer를 위한 준비는 완료됩니다. kafka-1 container에 접속하여 producer를 실행합니다.
$ docker cp gt.csv ks_kafka-1_1:/home/appuser
$ docker cp produce_gt.sh ks_kafka-1_1:/home/appuser
$ cd
$ sh produce_gt.sh
Spark structured streaming 코드 작성하기
Consumer를 위해서는 python 코드를 spark-worker container에 작성하여 Spark structured streaming을 이용할 예정입니다. 우선 python 코드를 작성합니다. 코드의 내용으로는 Kafka 의 topic을 read stream을 통해 받아옵니다. 그리고 해당 데이터를 console에 보여주는 write stream, HDFS에 parquet 형식으로 저장하는 write stream을 각각 만들어줍니다.
$ docker exec -it ks_spark-worker_1 /bin/bash
$ cd
$ vim consume_topic.py
# consume_topic.py
from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate()
sc.sparkContext.setLogLevel('ERROR')
# Read stream
log = sc.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-1:19092") \
.option("subscribe", "test1") \
.option("startingOffsets", "earliest") \
.load()
# Write stream - console
query = log.selectExpr("CAST(value AS STRING)") \
.writeStream \
.format("console") \
.option("truncate", "false") \
.start()
# Write stream - HDFS
query2 = log.selectExpr("CAST(value AS STRING)") \
.writeStream \
.format("parquet") \
.outputMode("append") \
.option("checkpointLocation", "/check") \
.option("path", "/test") \
.start()
query.awaitTermination()
query2.awaitTermination()
python 코드를 작성했다면 Spark 를 실행시키는데 사용해야 합니다. "/spark/bin" 디렉토리에 pyspark라는 파일이 존재해 CLI 환경에서도 위의 코드를 테스트할 수 있습니다. 하지만 보통 작성한 코드를 파일로써 Spark 실행에 사용하려면 spark-submit을 이용해야 합니다. 아래의 명령어를 통해 실행합니다. 참고로 spark-submit을 이용할 때 여러 가지 옵션 값을 argument로 넣지만 저는 기능만을 테스트하기 위해서 다른 옵션 값들은 따로 설정하지 않았습니다.
$ /spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 consume_topic.py
위의 명령어를 실행하면 바로 위에서 작성한 python 코드가 실행될 것입니다. 현재 spark-worker_1 터미널에서 아래 이미지와 같이 지속적으로 streaming 데이터의 값들이 나타난다면 consumer의 역할을 제대로 한 것입니다.
마지막으로 HDFS에 접근하여 stream 데이터가 parquet 포맷으로 저장이 잘되는지 확인합니다. 이 때는 hdfs 명령어를 이용해줍니다. 아래의 이미지처럼 해당 HDFS 경로에 parquet 포맷의 파일들이 존재한다면 정상적으로 저장된 것입니다.
$ hdfs dfs -ls /test
이것으로 docker를 통해 Kafka , Spark , HDFS 서비스를 실행하였고 간단한 Kafka topic Pub & Sub 과정을 진행했습니다. 간단한 테스트만 진행하는데도 모르는 것이 너무 많았던 과정이었습니다. 좀 더 레벨이 높은 실습을 진행하려면 공부를 더 열심히 해야겠네요 ㅎㅎ
'Hadoop Ecosystem > Spark' 카테고리의 다른 글
[Spark] 스파크 완벽 가이드 Chapter 2 - Spark의 동작 방식 (0) | 2021.09.22 |
---|---|
[Spark] Spark structured streaming으로 Kafka topic 받기 #2 - Spark 및 Hadoop 서비스 실행하기 (0) | 2021.08.11 |
[Spark] Spark structured streaming으로 Kafka topic 받기 #1 - Kafka 클러스터 구성하기 (0) | 2021.08.08 |
[Spark] Spark란? - Cluster Computing with Working Sets (0) | 2021.04.02 |