이번 글에서는 NiFi에서 Json 파일을 전처리하는 방법 중 하나에 대해 소개하겠습니다. NiFi에서 Json 데이터를 처리하기 위한 다양한 Procssor들이 존재합니다. 하지만 Json 데이터가 단순히 key / value 값들만 존재한다면 처리가 쉽겠지만, 경우에 따라서는 Json 데이터에 Array 타입이 존재할 수 있습니다. 그리고 Array의 크기가 고정이라면 다행이지만 데이터마다 동적인 크기를 가질 때는 상당히 골치 아파집니다. 아래 내용에서 이와 같은 동적인 크기의 Array 타입을 처리하는 방법을 제시하겠습니다.
Json 데이터 예시
이번 글에서 다룰 Json 데이터의 구조와 예시를 보여드리겠습니다.
Json의 가장 최상위에 A, B라는 key 값들이 존재합니다. A는 Array 타입으로써 index 당 AA, BB, CC라는 key값들로 String 타입의 Value들을 가집니다. 그리고 A는 동적인 크기를 가지기 때문에 위의 그림에서는 index가 3까지밖에 없지만, 데이터에 따라 10개, 100개도 존재할 수 있습니다. B는 단순히 String 타입의 Value를 가집니다. 아래는 위에서 제시한 구조의 Json 데이터 예시입니다.
{
"A" :
[
{
"AA" : "Array index 1 and property 1 value",
"BB" : "Array index 1 and property 2 value",
"CC" : "Array index 1 and property 3 value"
},
{
"AA" : "Array index 2 and property 1 value",
"BB" : "Array index 2 and property 2 value",
"CC" : "Array index 2 and property 3 value"
},
{
"AA" : "Array index 3 and property 1 value",
"BB" : "Array index 3 and property 2 value",
"CC" : "Array index 3 and property 3 value"
}
],
"B" : "String value"
}
그리고 NiFi에서는 BB를 key로 가지는 value들을 모두 base64 encoding한 데이터를 사용할 예정입니다. 아래 링크를 이용하면 쉽게 base64 encoding, decoding을 할 수 있습니다.
{
"A" :
[
{
"AA" : "Array index 1 and property 1 value",
"BB" : "QXJyYXkgaW5kZXggMSBhbmQgcHJvcGVydHkgMiB2YWx1ZQ==",
"CC" : "Array index 1 and property 3 value"
},
{
"AA" : "Array index 2 and property 1 value",
"BB" : "QXJyYXkgaW5kZXggMiBhbmQgcHJvcGVydHkgMiB2YWx1ZQ==",
"CC" : "Array index 2 and property 3 value"
},
{
"AA" : "Array index 3 and property 1 value",
"BB" : "QXJyYXkgaW5kZXggMyBhbmQgcHJvcGVydHkgMiB2YWx1ZQ==",
"CC" : "Array index 3 and property 3 value"
}
],
"B" : "String value"
}
FlowFile 생성
NiFi에서는 FlowFile을 생성하는 GenerateFlowFile Procssor가 존재하기 때문에 직접 Json 데이터를 생성 후 NiFi에서 불러 올 필요가 없습니다. 아래와 같이 Processor 생성 후 "Custom Text"에 우리가 원하는 Json 데이터를 넣어줍니다.
ExecuteGroovyScript Processor 사용
동적인 크기의 Array를 가진 Json 데이터를 처리하기위해 NiFi의 다양한 Procssor를 사용하며 시도해봤습니다. 하지만 대부분의 Procssor들이 동적인 크기의 Array를 처리할 수 없었고, 몇몇 존재하는 JoltTransformJson과 같은 Procssor들은 동적으로 처리를 할 수 있지만, 값들을 base64 decoding 하는데 어려움이 있었습니다. 그래서 결국 직접 코딩을 한 Script를 사용하여 Json 데이터를 처리하는 것이 가장 적합하다고 생각했습니다. 그리고 대량의 데이터를 처리하기 위해서는 속도가 중요한데 Python도 가능했지만 이는 Groovy 언어보다 속도에서 느리다는 정보가 있었고, NiFi에서 Groovy script를 많이 사용한다는 점을 고려하여 ExecuteGroovyScript를 사용하기로 결정했습니다.
( Script를 사용하지 않고 NiFi의 Procssor만을 사용하여 처리하는 방법에 대해서는 추후에 글을 게시하겠습니다. )
ExecuteGroovyScript의 데이터 결과를 확인하기 위한 사용하지 않을 UpdateAttribute Procssor 추가 후 연결해 Success, Failure Queue를 만들어줍니다. ExecuteGroovyScript의 설정 값은 딱히 건드릴 게 없습니다. "Script Body"에 Groovy script 내용을 넣어줍니다.
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
def flowFile = session.get()
if( flowFile == null ){
return;
}
def data
def beforeData
try{
flowFile = session.write(flowFile, {inputStream, outputStream ->
def content = inputStream.getText("UTF-8")
beforeData = new JsonSlurper().parseText(content)
data = new JsonSlurper().parseText(content)
data.A.each{ obj ->
if( !obj["BB"].isEmpty() ) {
def decodedString = new String(obj["BB"].decodeBase64())
obj["BB"] = decodedString
}
}
outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
} catch (Exception e){
flowFile = session.putAttribute(flowFile, "errorMessage", e.getMessage().toString())
session.transfer(flowFile, REL_FAILURE)
return;
}
flowFile = session.putAttribute(flowFile, "afterData", data.toString())
flowFile = session.putAttribute(flowFile, "beforeData", beforeData.toString())
session.transfer(flowFile, REL_SUCCESS)
Groovy script 내용
우선 NiFi는 실시간 데이터 처리를 위한 소프트웨어이기 때문에 기본적으로 streaming 데이터 처리로 진행됩니다. 그래서 일반적으로 하는 코딩과는 느낌이 조금 다르지만 주어진 코드 내에서 원하는 내용만 작성해주면 됩니다.
import org.apache.nifi.processor.io.StreamCallback
def flowFile = session.get()
if( flowFile == null ){
return;
}
try{
flowFile = session.write(flowFile, {inputStream, outputStream ->
// 데이터 처리 내용 작성
def output
outputStream.write(output)
} as StreamCallback)
} catch (Exception e){
session.transfer(flowFile, REL_FAILURE)
return;
}
session.transfer(flowFile, REL_SUCCESS)
위와 같이 기본적인 템플릿 작성 후에 주석으로 표시된 부분에 데이터를 처리할 내용을 작성해줍니다. FlowFile마다 데이터 처리 내용이 적용이 되고 FlowFile의 content를 사용하기 위해서는 위의 코드 기준으로 "inputStream" 변수를 사용해줍니다. 그럼 이제 Json 데이터를 처리하는 내용을 설명하겠습니다. 아래 코드는 위 코드 주석에 해당되는 내용입니다.
try{
flowFile = session.write(flowFile, {inputStream, outputStream ->
// 데이터 처리 내용 작성
def content = inputStream.getText("UTF-8")
beforeData = new JsonSlurper().parseText(content)
data = new JsonSlurper().parseText(content)
data.A.each{ obj ->
if( !obj["BB"].isEmpty() ) {
def decodedString = new String(obj["BB"].decodeBase64())
obj["BB"] = decodedString
}
}
outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
}
우선 Json 데이터 내용을 inputStream으로부터 불러와 content 변수에 넣어줍니다. String 형태로 받아온 Json 데이터를 파싱하기 위해 JsonSluper의 parseText 함수를 이용해 "data" 변수에 넣어줍니다. 이때 선언한 "beforeData"는 데이터 처리 전,후를 비교하기 위해 만든 변수입니다.
"data" 변수에 Json 데이터를 파싱한 결과를 넣어줬기 때문에, "."을 이용하여 nested Json 데이터에 접근할 수 있고 Array 타입은 each를 사용하여 loop 형태로 접근하여 각 loop 단계마다 "obj" 변수에 매핑됩니다. "."과 each문을 사용하여 원하는 하위 데이터까지 접근을 했다면 key / value 자료구조처럼 사용하면 우리가 원하는 데이터를 처리할 수 있습니다.
우리가 접근해서 처리하고 싶은 항목은 "BB"이기 때문에 "data.A"에 접근 후 each로 Array 타입 loop를 실행합니다. loop 문 안에서는 "obj"에 접근하여 "obj["BB"]"를 이용하여 "BB" key의 value에 접근합니다. Groovy에서는 "decodeBase64()" 함수를 이용하여 Base64 decoding을 할 수 있습니다. Decoding된 데이터를 String 형태로 변환하여 다시 "BB" key의 value에 넣어줍니다.
catch (Exception e){
flowFile = session.putAttribute(flowFile, "errorMessage", e.getMessage().toString())
session.transfer(flowFile, REL_FAILURE)
return;
}
flowFile = session.putAttribute(flowFile, "afterData", data.toString())
flowFile = session.putAttribute(flowFile, "beforeData", beforeData.toString())
session.transfer(flowFile, REL_SUCCESS)
이어서 나머지 코드도 작성해줍니다. try / catch에 걸려 exception이 발생했다면 에러 메세지를 FlowFile의 attribute에 넣어준다면 원인 파악을 좀 더 쉽게 할 수 있습니다. 그리고 Processor의 결과는 failure로 처리해줍니다. 데이터 처리가 정상적으로 되었다면 처리 전,후를 비교하기 위해 "data"와 "beforedata" 변수를 attribute에 넣어준다면 쉽게 확인할 수 있습니다. 마찬가지로 Processor의 결과는 success로 처리해줍니다.
결과 확인
GenerateFlowFile 실행
ExecuteGroovyScript 실행
FlowFile 결과
'Hadoop Ecosystem > NiFi' 카테고리의 다른 글
[NiFi] Untrusted proxy 및 UninheritableFlowException 에러 해결 방법 (0) | 2022.05.22 |
---|---|
[NiFi] Docker & NiFi 클러스터 환경에서 HDFS 데이터 분산 수집하기 (0) | 2022.02.12 |
[NiFi] Docker로 NiFi 클러스터 구성하기 (0) | 2022.01.31 |
[NiFi] Docker & NiFi로 HDFS 데이터 수집하기 (0) | 2021.11.17 |
[NiFi] NiFi의 핵심 개념 및 아키텍쳐 (0) | 2021.11.17 |