Spring Batch + Redis Pipeline 으로 구현한 성능 개선
코드 참고는 https://github.com/bkjeon1614/java-example-code/tree/develop/spring-batch-mybatis-codebase 에서 참고 부탁드립니다.
Redis Pipeline 이란
Redis의 pipeline은 여러 개의 명령어를 한 번에 보내고, 그 결과를 한 번에 받아올 수 있는 메커니즘입니다. 이를 통해 네트워크 오버헤드를 줄이고 Redis 서버의 처리 성능을 최적화할 수 있다. 또한 주의해야할 점은, Redis 서버의 처리량(capacity)을 고려하여 pipeline의 chunk size를 결정해야 한다.
주의사항
- Request Chunk Size: 먼저 요청하는 chunk size 에 대해 설명하자면 너무 작은 chunk size는 네트워크 오버헤드를 줄이지만 Redis 서버에 부하를 증가시킬 수 있고, 너무 큰 chunk size는 한 번에 처리해야 할 명령어 개수가 많아져서 오히려 성능을 저하시킬 수 있다. 공식문서 상에서는 명시적으로 정해진 값은 없지만, 일반적으로 100에서 1000개 사이의 명령어를 포함하는 것이 효율적인 경우가 많다고 한다.
- Command Timeout: 네트워크 레이턴시를 고려하자면 Pipeline을 사용할 때는 네트워크 지연에 의한 영향을 줄일 수 있지만, 여전히 Redis 서버의 응답 시간은 중요하다. 일반적으로 5초에서 10초 사이의 commandTimeout 값을 고려할 수 있습니다. (너무 짧게 설정하면 Pipeline 이 타임아웃이 될 수 있다.)
- Error Example: org.springframework.data.redis.connection.RedisPipelineException: Pipeline contained one or more invalid commands…
실습
RedisTemplate 의 executePipelined 메소드와 RedisCallback을 사용하여 파이프라이닝을 사용할 수 있다.
실습사양
- Java 17
- Gradle 8.8
- Spring Batch 5.1.2
- table schema 정의
create table sample ( id bigint not null auto_increment, amount bigint, tx_name varchar(255), tx_date_time datetime, primary key (id) ) engine = InnoDB; create table sample_out ( id bigint not null auto_increment, amount bigint, tx_name varchar(255), tx_date_time datetime, primary key (id) ) engine = InnoDB;
- RedisConfig 에 redisTemplate bean 추가
... @Bean public RedisTemplate<String, Object> redisTemplate() { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory()); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new StringRedisSerializer()); return redisTemplate; }
- Job 생성
[MybatisSampleRedisJobConfig.java]package com.bkjeon.job; import com.bkjeon.feature.entity.sample.Sample; import com.bkjeon.feature.entity.sample.SampleOut; import com.bkjeon.feature.mapper.sample.SampleMapper; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.batch.MyBatisBatchItemWriter; import org.mybatis.spring.batch.MyBatisPagingItemReader; import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder; import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobScope; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.StringRedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.transaction.PlatformTransactionManager; /** * --job.name=MYBATIS_SAMPLE_REDIS_JOB requestDate=20240701 */ @Slf4j @Configuration @RequiredArgsConstructor public class MybatisSampleRedisJobConfig { private static final String JOB_NAME_PREFIX = "MYBATIS_SAMPLE_REDIS"; private static final String REDIS_KEY_PREFIX = "SAMPLE_"; private static final int CHUNK_SIZE = 1000; private final StringRedisTemplate redisTemplate; private final SqlSessionFactory sqlSessionFactory; @Bean public Job mybatisSampleRedisJob(JobRepository jobRepository, Step mybatisSampleRedisJobStep1, Step mybatisSampleRedisJobStep2) { return new JobBuilder(JOB_NAME_PREFIX + "_JOB", jobRepository) .start(mybatisSampleRedisJobStep1) .next(mybatisSampleRedisJobStep2) .build(); } @Bean @JobScope public Step mybatisSampleRedisJobStep1(JobRepository jobRepository, Tasklet mybatisSampleDataToRedisTasklet, PlatformTransactionManager platformTransactionManager) { log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mybatisSampleRedisJobStep1"); return new StepBuilder(JOB_NAME_PREFIX + "_JOB_STEP2", jobRepository) .tasklet(mybatisSampleDataToRedisTasklet, platformTransactionManager).build(); } /** * Redis Pipeline Test (pipeline 1000 단위로 처리) * [사양] * - redis version: 7.0.5 * 1. 10만건 * - asis: 117초 * - tobe: 7초(1000) * 2. 100만건 * - asis: 1225초 * - tobe: 64초(1000) - Redis 사용량에 따라 성능이 달라질 수 있다. * * 약 19배 증가 * 또한 레디스를 사용하여 DB 에서의 집계가 아닌 코드에서 Aggregation 결과를 Redis 에 넣는 방식으로 성능 개선한 사례들이 있다. */ @Bean public Tasklet mybatisSampleDataToRedisTasklet() { return ((contribution, chunkContext) -> { log.info(">>>>> This is mybatisSampleDataToRedisTasklet"); // Mock Data List<Sample> sampleList = new ArrayList<>(); for (long c=1; c <= 1000000; c++) { sampleList.add( Sample.builder().id(c).txName("TEST" + c).amount(c * 1000).txDateTime(LocalDateTime.now()).build()); } long beforeTime = System.currentTimeMillis(); int size = sampleList.size(); for (int i = 0; i < size; i += CHUNK_SIZE) { List<Sample> chunk = sampleList.subList(i, Math.min(size, i + CHUNK_SIZE)); redisTemplate.executePipelined( (RedisCallback<Object>) connection -> { StringRedisConnection stringRedisConn = (StringRedisConnection) connection; for (Sample sample: chunk) { stringRedisConn.set(REDIS_KEY_PREFIX + sample.getId(), "value" + sample.getTxName()); } return null; }); } long afterTime = System.currentTimeMillis(); long secDiffTime = (afterTime - beforeTime) / 1000; log.info(">>>>>>>>>>>>>>>>>>>>>>>>>> Redis 실행시간(s): {}", secDiffTime); return RepeatStatus.FINISHED; }); } @Bean @JobScope public Step mybatisSampleRedisJobStep2(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager, @Value("#{jobParameters[requestDate]}") String requestDate) { log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mybatisSampleRedisJobStep2 requestDate:{} ", requestDate); return new StepBuilder(JOB_NAME_PREFIX + "_JOB_STEP2", jobRepository) .<Sample, SampleOut>chunk(CHUNK_SIZE, platformTransactionManager) .reader(mybatisSampleRedisToRdbPagingItemReader()) .processor(mybatisSampleToSampleOutItemProcessor()) .writer(mybatisSampleRedisToRdbItemWriter()) .build(); } @Bean public MyBatisPagingItemReader<Sample> mybatisSampleRedisToRdbPagingItemReader() { return new MyBatisPagingItemReaderBuilder<Sample>() .pageSize(CHUNK_SIZE) .sqlSessionFactory(sqlSessionFactory) .queryId("com.bkjeon.feature.mapper.sample.SampleMapper.selectZeroOffsetSampleList") .build(); } @Bean public ItemProcessor<Sample, SampleOut> mybatisSampleToSampleOutItemProcessor() { return item -> SampleOut.builder() .id(item.getId()) .amount(item.getAmount()) .txName(redisTemplate.opsForValue().get(REDIS_KEY_PREFIX + item.getId())) .txDateTime(LocalDateTime.now()) .build(); } @Bean public MyBatisBatchItemWriter<SampleOut> mybatisSampleRedisToRdbItemWriter() { return new MyBatisBatchItemWriterBuilder<SampleOut>() .sqlSessionFactory(sqlSessionFactory) .statementId("com.bkjeon.feature.mapper.sample.SampleMapper.insertSample") .build(); } }
- Mapper
[SampleMapper.java]import com.bkjeon.feature.entity.sample.Sample; import com.bkjeon.feature.entity.sample.SampleOut; import java.util.List; import org.apache.ibatis.annotations.Mapper; @Mapper public interface SampleMapper { ... List<Sample> selectZeroOffsetSampleList(); void insertSample(SampleOut sample); }
- Mapper xml
... <select id="selectZeroOffsetSampleList" resultMap="selectSampleListMap"> SELECT id, amount, tx_name, tx_date_time FROM sample WHERE 1=1 AND id > #{_skiprows} LIMIT 0, #{_pagesize} </select> <insert id="insertSample" parameterType="com.bkjeon.feature.entity.sample.SampleOut"> INSERT INTO sample_out ( amount, tx_name, tx_date_time ) VALUES ( #{amount}, #{txName}, #{txDateTime} ) </insert>
- Entity 생성
[Sample.java]
package com.bkjeon.feature.entity.sample; import java.time.LocalDateTime; import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.ToString; @ToString @Getter @NoArgsConstructor public class Sample { private Long id; private Long amount; private String txName; private LocalDateTime txDateTime; @Builder public Sample(Long id, Long amount, String txName, LocalDateTime txDateTime) { this.id = id; this.amount = amount; this.txName = txName; this.txDateTime = txDateTime; } }
[SampleOut.java]
package com.bkjeon.feature.entity.sample; import java.time.LocalDateTime; import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.ToString; @ToString @Getter @NoArgsConstructor public class SampleOut { private Long id; private Long amount; private String txName; private LocalDateTime txDateTime; @Builder public SampleOut(Long id, Long amount, String txName, LocalDateTime txDateTime) { this.id = id; this.amount = amount; this.txName = txName; this.txDateTime = txDateTime; } }
완료 후 batch 를 동작시켜서 pipeline 을 사용한것과 안한것의 차이를 비교해보면 chunk size 1000 개 기준 약 19배의 성능이 증가한걸 확인할 수 있다.