이번 글에서는 Riot 데이터를 불러와 Kafka 클러스터에 Produce하는 과정 및 이슈 사항들을 정리하겠습니다.
Producer Client 구현
특정 Summoner의 데이터를 불러와 Kafka에 produce 하는 과정을 "/produceSummonerMatchInfo" 주소의 API로 구현했습니다. 지난 글에서 언급했듯이, Summoner 이름을 input으로, Match 정보를 output으로 하는 Riot API는 존재하지 않습니다. 그래서 필요로 하는 API를 여러개 사용하여 최종적으로 Match 정보를 Produce하도록 해야했습니다. 세부 과정으로는 아래와 같습니다.
1. Summoner 이름으로 puuid 가져오기.
2. puuid로 match id list 가져오기.
3. Match id를 통한 Match 정보 가져오기.
4. Match 정보에 (1.)에서 지정한 Summoner 이름 추가 후 Kafka 클러스터에 Produce.
이 때 (4.) 에서 데이터 전처리를 위해서 Json Object를 사용해야 했습니다. 그래서 기존에 Riot API를 String으로 가져오는 것은 변함 없었지만, 이후 Service 내 추가 작업으로 Json Object를 반환시키도록 했습니다. 결국 Riot API 데이터는 Json Object 또는 Json Array로 가져옴으로써 이후 데이터 처리가 편리하도록 했습니다.
이미 처리한 Match 기록하기
위의 과정을 데이터 중복 없이 처리하기 위해서는 이미 처리된 Match를 따로 기록해야 했습니다. 처리된 Match를 기록하기 위해서는 두 가지 방법을 생각했습니다. 첫 번째는 파일을 Read / Write 하면서 기록하는 방법이고, 두 번째는 DB를 이용하여 기록하는 방법입니다. DB를 사용하는 것이 가장 좋은 방법이지만, 현재는 기능 구현을 우선시 했기 때문에 간편하게 파일을 이용한 방법을 먼저 선택했습니다. (DB를 이용하는 방법은 추후 언젠가는 구현 예정)
파일 기반으로 Match를 기록하기 위해서는 아래 두가지 조건이 필요했습니다.
- 파일에 쓰여진 정해진 포맷을 파싱하여 가져올 수 있어야함.
- 신규 내용을 다시 파일에 포맷에 맞게 쓰기가 가능해야함.
여러가지 방법들을 생각하며 구현하려고 했지만, 결과적으로 json 포맷을 사용하기로 결정했습니다. 그 이유는 json 파일을 사용하면 json 포맷을 쉽게 파싱할 수 있는 메소드들이 존재했고, 신규 내용을 json 포맷에 추가해 파일에 저장하는 것이 상당히 간편했기 때문입니다.
json 파일 Read
matchRecordJsonObj = (JSONObject) new JSONParser().parse(new InputStreamReader(resource.getInputStream(), "UTF-8"));
matchList = objectMapper.readValue(matchRecordJsonObj.toString(), new TypeReference<Map<String,LinkedList<String>>>() {});
이 때 내용을 저장하는 자료구조는 LinkedList<String>으로 지정했는데, 그 이유는 Riot API는 Match ID 리스트를 최신순으로 제공하기 때문입니다. 그래서 신규 내용을 추가하기 위해서는 리스트의 가장 앞에 넣어야 했고 그에 맞게 LinkedList를 사용했습니다.
처리된 Match ID 체크하기
이미 처리된 Match ID를 중복으로 처리하지 않기 위해서는 파일에 저장된 기록에서 가장 최근 Match ID를 이용했습니다. Riot API를 통해 가져온 최근 100개 Match ID 리스트를 처음부터 체크해 Match Record의 가장 최근 Match와 일치하면 중단합니다. 아래 그림을 보면 좀 더 이해가 쉽습니다.
public String getLastMatchID(String summonerName) {
if(!matchList.containsKey(summonerName)){
return "";
}
return matchList.get(summonerName).get(0);
}
Docker Kafka에 Match 정보 Produce 하기
Kafka 클러스터를 만들어놨고 Kafka topic 메세지를 보내기 위한 API도 구축해놨습니다. 하지만 Kafka topic 메세지를 보내기 시작할 때 문제가 발생했습니다. Docker Kafka에 메세지를 보냈지만 아래와 같은 에러가 발생했습니다.
Name or service not known : kafka-1 : Could not resolve host name. |
해당 이슈는 저의 Local host에 구현해놓은 Producer client가 Kafka broker들의 hostname을 인식하지 못하는 문제였습니다. 해당 이슈의 문제 원인과 해결 방법은 아래 글에 자세하기 정리해놨으니 참고바랍니다.
[Kafka] Docker로 Kafka 구성 시 Host machine과 통신하기 위한 listener 설정 방법
요약해서 설명하자면 Kafka 클러스터는 Producer 또는 Consumer의 메세지 요청이 왔을 때 실제 해당 데이터가 있는 서버의 주소를 다시 반환해줍니다. 문제가 발생할 때의 Kafka 클러스터는 각 서버의 hostname인 "kafka-1", "kafka-2", "kafka-3" 이름을 반환해줬습니다. 그래서 API가 구현되어있는 제 PC에서는 해당 hostname의 정체를 알 수 없었습니다. 해당 문제는 Kafka 클러스터의 서버 옵션 값 설정 중, "listeners"와 "advertised.listeners"의 값 설정을 통해 해결할 수 있었고 정상적으로 Kafka topic에 데이터가 쌓이기 시작했습니다.
Match 정보 전처리하기
정상적으로 Kafka topic 메세지를 보냈지만, 데이터 내용에 대한 문제가 있었습니다. 제가 구현한 내용은 Summoner의 Match 정보를 하나씩 보냅니다. 하지만 해당 데이터는 어떤 Summoner 검색으로 수집된 데이터인지 알 수 없었습니다. 왜냐하면 해당 데이터에는 10명의 Summoner 이름이 모두 기록되어있기 때문입니다. 즉, Kafka topic의 내용을 봤을 때 해당 Match 정보가 누구의 것인지 알 수 없었습니다.
그래서 Kafka에 Produce 하기 전, 데이터 전처리를 통해 누구의 Match인지 Summoner 이름을 Json Object에 기입했습니다. 그래서 Consumer 입장에서는 Kafka topic의 내용을 볼 때 "whoseMatch" Key 값을 통해 Match들을 Summoner 별로 분류 시킬 수 있습니다.
public String sendRiotDataMessage(String summonerName, JSONObject riotData) {
riotData.put("whoseMatch", summonerName);
...
}
위의 내용들에 대한 자세한 코드 및 docker 파일들을 확인하고 싶으면 아래 Github 주소들을 참고해주시기 바랍니다.https://github.com/Taaewoo/Riot_Data_Pipeline