[아이북조아] 10만 회원의 성향 변경 배치 처리: 문제와 해결
아이북조아에서는 '좋아요&싫어요'와 같은 피드백을 하면 회원의 성향(MBTI)이 바뀐다.
해당 변화는 피드백을 한 도서의 성향(MBTI)에 영향을 받는다.
이를 위한 배치 Job은 다음과 같이 진행된다.
Job: syncFeedbackAndUpdateTraitsJob
Step:
Step1 : syncFeedbackStep - redis에 임시 저장되어 있던 좋아요/싫어요를 mysql로 이관한다
Step2 : updateTraitsChange - mysql에서 step1의 결과를 읽어와 점수 변화를 계산하고 TraitsChange 테이블을 업데이트한다
Step3 : updateChildTraits - 갱신된 TraitsChange 테이블을 읽어 점수가 5 이상이면 ChildTraits에 새로운 레코드를 생성한다
Step4 : updateMbtiHistory - 갱신된 ChildTraits 테이블을 읽어 Mbti가 달라졌다면 MbtiHistory에 새로운 레코드를 생성한다
@Bean
public Job syncFeedbackAndUpdateTraitsJob(JobRepository jobRepository,
Step syncFeedbackStep,
Step updateTraitsChange,
Step updateChildTraits,
Step updateMbtiHistory
) {
return new JobBuilder("syncFeedbackAndUpdateTraitsJob", jobRepository)
.start(syncFeedbackStep)
.next(updateTraitsChange)
.next(updateChildTraits)
.next(updateMbtiHistory)
.build();
}
해당 배치 Job을 구성하면서 만난 문제들은 다음과 같이 해결하였다.
[문제 1]
Redis에서 MySql로 데이터를 이관하는 과정에서 Redis 데이터를 읽기 위해 HashOperations.entries()를 사용했지만, 이 방식은 모든 데이터를 한 번에 가져오기 때문에 메모리 사용량이 크게 증가했다. 또한, Redis 데이터를 '좋아요'와 '싫어요'로 구분하여 처리하는 로직이 너무 복잡해졌다. 메모리로 모두 들고온 후에 데이터를 하나씩 돌며 '좋아요'와 '싫어요'를 구분해야 했기 때문이다. 또한, 배치 작업이 끝난 후 Redis 데이터가 삭제되지 않아 중복 처리된다는 문제점이 있었다.
[해결 1]
1. Iterator를 사용한 데이터 처리:
Redis 데이터를 한 번에 모두 가져오는 대신, 두 개의 Iterator를 사용하여 순차적으로 데이터를 읽도록 하였다.
currentIterator: 전체 redis를 순회하기 위한 iterator.
currentBookIterator: 자녀별 좋아요/싫어요 bookId를 순회하기 위한 iterator.
2. 좋아요/싫어요 데이터 구분:
isProcessingLikes: 현재 처리하는 데이터가 좋아요인지 싫어요인지 구분하기 위해 사용하는 flag.
3. 처리 완료 후 Redis 데이터 삭제:
데이터 중복 처리를 방지하기 위해 syncFeedbackStep 완료 시 clearRedisData()를 수행하였다.
private void clearRedisData() {
hashOperations.getOperations().delete(LIKE_HASH_KEY);
hashOperations.getOperations().delete(HATE_HASH_KEY);
}
[문제 2]
JPA를 사용해 MySql에 데이터를 insert하려 했으나, JPA는 데이터를 영속성 컨텍스트에서 관리하기 때문에, EntityManager.flush 및 clear 호출로 이를 관리해야 했다. 이후 로직이 복잡해지면 lazy loading 등으로 문제가 발생할 확률이 높았다. 또한, bulk insert가 안 된다는 치명적인 단점이 있었다.
[해결 2]
전체 batch job에 jdbc를 적용하여 읽기와 쓰기를 bulk로 진행하도록 하였다.
- NamedParameterJdbcTemplate를 사용하여 sql에 '?'를 사용하는 대신 ':childId'와 같이 명명하여 집어넣었다.
- 쿼리 실수를 줄이고 가독성을 높일 수 있었다.
- batchUpdate()를 사용하여 bulk로 데이터를 삽입하여 성능을 향상시켰다.
아래는 실제 로직 예시다.
@Component
@RequiredArgsConstructor
@Slf4j
public class MySqlFeedbackWriter implements ItemWriter<FeedbackDto> {
/*
FeedbackDto 객체들을 MySQL의 feedback 테이블에 삽입한다
NamedParameterJdbcTemplate는 SQL 쿼리를 실행하기 위한 템플릿이다
*/
private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
@Override
public void write(Chunk<? extends FeedbackDto> chunk) throws Exception {
String insertQuery = """
INSERT INTO feedback (child_id, book_id, like_status, hate_status, created_at)
VALUES (:childId, :bookId, :likeStatus, :hateStatus, current_timestamp)
ON DUPLICATE KEY UPDATE
like_status = VALUES(like_status),
hate_status = VALUES(hate_status)
""";
List<? extends FeedbackDto> items = chunk.getItems();
List<FeedbackDto> updates = new ArrayList<>(items);
// 각 FeedbackDto 객체는 MapSqlParameterSource를 통해 sql문에 매핑된다
// 이후 FeedbackDto 객체들이 배열 형태의 SqlParameterSource[]로 전달되어 chunk 크기만큼 한꺼번에 쿼리가 날아간다
SqlParameterSource[] batchParams = updates.stream()
.map(dto -> new MapSqlParameterSource()
.addValue("childId", dto.getChildId())
.addValue("bookId", dto.getBookId())
.addValue("likeStatus", dto.isLikeStatus())
.addValue("hateStatus", dto.isHateStatus()))
.toArray(SqlParameterSource[]::new);
namedParameterJdbcTemplate.batchUpdate(insertQuery, batchParams);
}
}
[문제 3]
MySql에서 읽은 데이터를 바탕으로 '성향 변화량' '자녀 성향' '자녀 MBTI', 총 3개의 테이블을 업데이트하려 하였다. 이를 모두 하나의 step에서 처리하려 하자, 다음과 같은 문제가 발생하였다.
1. 지나치게 복잡한 로직
childTraitsArray, traitChangeArray, updateChildTraitDtos 등 여러 데이터를 동시에 처리하면서 로직이 지나치게 복잡해짐.
변화량 +- 5 확인, MBTI 변경 여부 확인 등 if/else 가 지나치게 많아 가독성이 매우 떨어짐.
2. 성능 저하
한꺼번에 모든 데이터를 불러와 처리하면서 CPU와 메모리 사용량이 증가.
변경 여부를 확인할 때 중복 작업이 많아 효율성이 떨어짐.
3. 유지보수성 떨어짐
새로운 요구사항 추가 반영이 불가할 정도로 조건과 분기 처리가 많음.
디버깅이 어려워 오류 원인을 찾을 수 없음.
아래는 원래의 로직.
List<UpdateChildTraitDto> updateChildTraitDtos = new ArrayList<>();
List<UpdateTraitChangeDto> updateTraitChangeDtos = new ArrayList<>();
boolean mbtiChangedFlag = false;
for (int i = 0; i < 4; i++) {
// 먼저 totalChangeAmount를 childTrait에 반영
int childTraitCurrentValue = childTraitsArray[i]; // 현재 childTrait 값
int totalChangeAmount = changeAmountArray[i]; // 현재 trait의 변화량
childTraitsArray[i] += totalChangeAmount; // 총합 변화량을 먼저 반영
// traitChangeArray[i]가 5를 넘는지 확인
if (traitChangeArray[i] >= 5) {
// ChildTrait에 traitChange 반영
childTraitsArray[i] += traitChangeArray[i]; // traitChange를 childTrait에 반영
UpdateChildTraitDto dto = new UpdateChildTraitDto(historyId, i + 1, traitChangeArray[i]);
updateChildTraitDtos.add(dto);
// childTrait가 50을 넘는지 확인하고 MBTI 변경 플래그 설정
mbtiChangedFlag = mbtiChangedFlag || (childTraitsArray[i] >= 50) || (childTraitsArray[i] < 50);
} else {
// TraitChange만 업데이트
UpdateTraitChangeDto dto = new UpdateTraitChangeDto(childId, i + 1, traitChangeArray[i]);
updateTraitChangeDtos.add(dto);
}
}
// MBTI 변경 여부 확인 및 처리
if (mbtiChangedFlag) {
for (UpdateChildTraitDto dto : updateChildTraitDtos) {
int traitIndex = dto.getTraitId() - 1;
// 업데이트된 childTrait 값
int updatedTraitValue = childTraitsArray[traitIndex];
// 50을 기준으로 MBTI가 변경되는지 확인
boolean wasOver50Before = (updatedTraitValue - totalChangeAmount) >= 50;
boolean isOver50Now = updatedTraitValue >= 50;
if (wasOver50Before != isOver50Now) {
// MBTI 레코드 생성
String newMbti = calculateNewMbti(childTraitsArray);
createNewMbtiRecord(childId, newMbti, historyId, dto.getTraitId());
} else {
// MBTI가 변경되지 않으면 ChildTrait만 업데이트
updateChildTrait(dto);
}
}
}
[해결 3]
하나의 Step에서 처리하던 작업을 네 개의 독립적인 Step으로 분리. 각 Step이 단일 책임 원칙을 따르도록 설계하여 복잡도를 낮추고 성능과 유지보수성을 개선하고자 하였다.
이렇게 수정 후 테스트를 진행해 보았다.
테스트 데이터
- Child 10만 명
- ChildTraits 10만 명 * 4개의 성향 : 40만 건
- Book 20만 권
- BookTraits 20만 권 * 4개의 성향 : 80만 건
- Redis 좋아요 10만 & 싫어요 10만 : 20만 건
스텝별 테스트 결과
- step1 : 1분 1초
- step2 : 6분 6초
- step3 : 22초
- step4 : 36초
전체 Job 테스트 결과 : 7분 22초
[문제 4]
배치 예외 발생 시 문제를 건너뛰고 나머지 작업을 지속하는 것이 중요하다. 하지만 어떤 예외가 발생하였는지는 알아야 하기 때문에, 이를 기록할 필요성이 있었다.
[해결 4]
faultTolerant()를 활용해 예외가 발생하더라도 작업을 중단하지 않고 건너뛰도록 설정했다. 또한 특정 예외(RuntimeException)에 대해 최대 3번까지 건너뛰도록 skipLimit을 설정했고, 예외가 발생한 경우, CustomSkipListener를 통해 Slack으로 알림을 전송하도록 구현했다.
[문제 5]
배치 작업이 '특정 상황'들에 대해 배치가 잘 동작하는지 확인이 더 필요했다. 예를 들면 아래의 것들을 확인하고 싶었다.
성향이 100이 넘어가지 않도록 작동하는지
예외 상황 발생 시, skip이 올바르게 작동하는지
SkipListeneter가 제대로 호출되는지
또한 배치 테스트를 위한 데이터 작업의 공수가 커서, 좀 더 간단한 방법으로 로직을 테스트하는 방법이 필요했다.
[해결 5]
따라서 단위 테스트 코드를 작성하였다. 아래는 step4의 단위 테스트를 작성한 코드다. SpringBatchTest 어노테이션과 JobLauncherTestUtils를 활용해 배치 작업을 단위 테스트할 수 있는 환경을 구성했다.
@ExtendWith(SpringExtension.class)
@SpringBootTest
@SpringBatchTest
@Sql({"/clean-up.sql", "/batch.sql"})
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
public class UpdateTraitsChangeStepTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private Job syncFeedbackAndUpdateTraitsJob;
@Autowired
private JdbcTemplate jdbcTemplate;
@BeforeEach
void setUp() {
jobLauncherTestUtils.setJob(syncFeedbackAndUpdateTraitsJob);
}
@DisplayName(value = "성향 변화량 누적치 업데이트 성공")
@Test
void success_update_traits_change() throws Exception {
// given
insertSqlForTestUpdateTraits();
String stepName = "updateTraitsChange";
// when
JobExecution stepExecution = jobLauncherTestUtils.launchStep(stepName);
// then
assertThat(stepExecution.getExitStatus().getExitCode()).isEqualTo("COMPLETED");
// TraitsChange 테이블에 데이터 반영값을 검증한다
verifyTraitsChangeInDatabase(1L, 1L, calculateExpectedChangeAmount(50, 70));
verifyTraitsChangeInDatabase(1L, 2L, calculateExpectedChangeAmount(50, 35));
verifyTraitsChangeInDatabase(1L, 3L, calculateExpectedChangeAmount(50, 20));
verifyTraitsChangeInDatabase(1L, 4L, calculateExpectedChangeAmount(50, 60));
}
@DisplayName(value = "성향이 100이거나 0일 경우 데이터 처리 확인")
@Test
void success_when_number_on_boundary() throws Exception {
// given
String insertFeedbackSql = """
INSERT INTO feedback (child_id, book_id, like_status, hate_status, created_at)
VALUES (1, 1, true, false, CURRENT_TIMESTAMP)
""";
jdbcTemplate.execute(insertFeedbackSql);
insertChildTraitsForEdgeCase(0, 100);
insertBookTraitsForEdgeCase(0, 100);
// when
JobExecution stepExecution = jobLauncherTestUtils.launchStep("updateTraitsChange");
// then
assertThat(stepExecution.getExitStatus().getExitCode()).isEqualTo("COMPLETED");
// 0이나 100같은 경계값에서도 배치가 잘 동작하는지 검증한다
verifyTraitsChangeInDatabase(1L, 1L, calculateExpectedChangeAmount(0, 0));
verifyTraitsChangeInDatabase(1L, 2L, calculateExpectedChangeAmount(100, 0));
verifyTraitsChangeInDatabase(1L, 3L, calculateExpectedChangeAmount(0, 100));
verifyTraitsChangeInDatabase(1L, 4L, calculateExpectedChangeAmount(100, 100));
}
@DisplayName(value = "성향 개수가 4개가 아닌 경우 skip 발생")
@Test
void when_traits_size_is_wrong() {
// given
String insertFeedbackSql = """
INSERT INTO feedback (child_id, book_id, like_status, hate_status, created_at)
VALUES (1, 1, true, false, CURRENT_TIMESTAMP)
""";
jdbcTemplate.execute(insertFeedbackSql);
insertChildTraits(50, 50, 50, 50);
insertNotEnoughBookTraits(60, 70, 80);
// when
JobExecution stepExecution = jobLauncherTestUtils.launchStep("updateTraitsChange");
// then
assertThat(stepExecution.getExitStatus().getExitCode()).isEqualTo("COMPLETED");
// skiplistener가 실행되어 job은 성공적으로 마치지만, skip count가 1 증가하는지 확인한다
Long skipCount = stepExecution.getStepExecutions().stream()
.filter(step -> step.getStepName().equals("updateTraitsChange"))
.findFirst()
.map(StepExecution::getSkipCount)
.orElse(0L);
assertThat(skipCount).isEqualTo(1L);
}
@DisplayName(value = "좋아요/싫어요 데이터가 존재하지 않는 경우")
@Test
void when_no_data_returns_from_reader() {
// given
Long childId = 1L;
Long traitId1 = 1L;
// when
JobExecution stepExecution = jobLauncherTestUtils.launchStep("updateTraitsChange");
// then
assertThat(stepExecution.getExitStatus().getExitCode()).isEqualTo("COMPLETED");
// Reader로부터 넘어온 FeedbackWithTraitsDto가 없을 경우, TraitsChange 테이블에 변화가 없는지 확인한다
String sql = "SELECT change_amount FROM traits_change WHERE child_id = ? AND trait_id = ?";
Integer changeAmount = jdbcTemplate.queryForObject(sql, Integer.class, childId, traitId1);
assertThat(changeAmount).isEqualTo(0);
}
private void insertSqlForTestUpdateTraits() {
insertFeedbackSql();
String insertChildTraitsSql = """
INSERT INTO child_traits (history_id, trait_id, trait_score, created_at)
VALUES (1, 1, 50, CURRENT_TIMESTAMP),
(1, 2, 50, CURRENT_TIMESTAMP),
(1, 3, 50, CURRENT_TIMESTAMP),
(1, 4, 50, CURRENT_TIMESTAMP)
""";
jdbcTemplate.execute(insertChildTraitsSql);
String insertBookTraitsSql = """
INSERT INTO book_traits (book_id, trait_id, trait_score)
VALUES (1, 1, 70),
(1, 2, 35),
(1, 3, 20),
(1, 4, 60)
""";
jdbcTemplate.execute(insertBookTraitsSql);
}
private void insertFeedbackSql() {
String insertFeedbackSql = """
INSERT INTO feedback (child_id, book_id, like_status, hate_status, created_at)
VALUES (1, 1, true, false, CURRENT_TIMESTAMP),
(1, 2, false, true, CURRENT_TIMESTAMP)
""";
jdbcTemplate.execute(insertFeedbackSql);
}
private void insertChildTraitsForEdgeCase(int zero, int hundred) {
String insertChildTraitsSql = """
INSERT INTO child_traits (history_id, trait_id, trait_score, created_at)
VALUES (1, 1, ?, CURRENT_TIMESTAMP),
(1, 2, ?, CURRENT_TIMESTAMP),
(1, 3, ?, CURRENT_TIMESTAMP),
(1, 4, ?, CURRENT_TIMESTAMP)
""";
jdbcTemplate.update(insertChildTraitsSql, zero, hundred, zero, hundred);
}
private void insertBookTraitsForEdgeCase(int zero, int hundred) {
String insertBookTraitsSql = """
INSERT INTO book_traits (book_id, trait_id, trait_score)
VALUES (1, 1, ?),
(1, 2, ?),
(1, 3, ?),
(1, 4, ?)
""";
jdbcTemplate.update(insertBookTraitsSql, zero, zero, hundred, hundred);
}
private void insertChildTraits(int trait1, int trait2, int trait3, int trait4) {
String insertChildTraitsSql = """
INSERT INTO child_traits (history_id, trait_id, trait_score, created_at)
VALUES (1, 1, ?, CURRENT_TIMESTAMP),
(1, 2, ?, CURRENT_TIMESTAMP),
(1, 3, ?, CURRENT_TIMESTAMP),
(1, 4, ?, CURRENT_TIMESTAMP)
""";
jdbcTemplate.update(insertChildTraitsSql, trait1, trait2, trait3, trait4);
}
private void insertWrongBookTraits() {
String insertBookTraitsSql = """
INSERT INTO book_traits (book_id, trait_id, trait_score)
VALUES (1, 1, null),
(1, 2, null),
(1, 3, null),
(1, 4, null)
""";
jdbcTemplate.update(insertBookTraitsSql);
}
private void insertNotEnoughBookTraits(int trait1, int trait2, int trait3) {
String insertBookTraitsSql = """
INSERT INTO book_traits (book_id, trait_id, trait_score)
VALUES (1, 1, ?),
(1, 2, ?),
(1, 3, ?)
""";
jdbcTemplate.update(insertBookTraitsSql, trait1, trait2, trait3);
}
private void verifyTraitsChangeInDatabase(Long childId, Long traitId, int expectedChangeAmount) {
String sql = "SELECT change_amount FROM traits_change WHERE child_id = ? AND trait_id = ?";
Integer changeAmount = jdbcTemplate.queryForObject(sql, Integer.class, childId, traitId);
assertThat(changeAmount).isEqualTo(expectedChangeAmount);
}
private int calculateExpectedChangeAmount(int childTrait, int bookTrait) {
return MbtiCalculator.calculateTraitChange(childTrait, bookTrait);
}
}