====== 대용량 처리를 위한 멀티스레드 기반의 병행처리 예제 ====== ===== 개요 ===== 배치 수행 시, 대용량 처리를 위해 Job을 멀티스레드 기반으로 병행처리하는 과정을 보여준다. 실행 유형별로 멀티쓰레드 방식, Parallel 방식, 파티셔닝 방식 등이 있다. ===== 설명 ===== ==== 멀티쓰레드(Multi-threaded Step) 예제 ==== 병행처리멀티스레드(Multi-threaded Step)은 하나의 step을 멀티쓰레드로 처리하는 방식이다. chunk단위로 각 쓰레드에서 병행으로 처리한다. ===Job설정=== == 병행처리멀티스레드 예제의 Job 설정파일인 parallelJob.xml을 확인한다.== 멀티스레드 처리를 원하는 step의 tasklet에 비동기 설정을 한다. ===JunitTest 구성 및 수행=== == JunitTest 구성 == 병행처리멀티스레드 설정과 관련 클래스들로 Junit Test를 수행한다. 이 때 배치가 수행되고, 관련된 내용을 확인할 수 있다. \\ ✔ JunitTest 클래스의 구조는 [[egovframework:rte2:brte:batch_example:run_junit_Test|배치실행환경 예제 Junit Test 설명]]을 참고한다. \\ ✔ assertEquals(BatchStatus.COMPLETED, execution.getStatus()) : 배치수행결과가 COMPLETED 인지 확인한다. \\ ✔ assertEquals(after - before, execution.getStepExecutions().iterator().next().getReadCount()) : BATCH_STAGING의 data와 stepexcution의 결과를 비교하여 staging의 step의 결과를 확인한다. @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "/egovframework/batch/simple-job-launcher-context.xml", "/egovframework/batch/jobs/parallelJob.xml", "/egovframework/batch/job-runner-context.xml" }) public class EgovParallelJobFunctionalTests { //배치작업을 test하기 위한 JobLauncherTestUtils @Autowired private JobLauncherTestUtils jobLauncherTestUtils; //DB 사용을 위한 SimpleJdbcTemplate private SimpleJdbcTemplate jdbcTemplate; ... @Test public void testLaunchJob() throws Exception { int before = SimpleJdbcTestUtils.countRowsInTable(jdbcTemplate, "BATCH_STAGING"); JobExecution execution = jobLauncherTestUtils.launchJob(); int after = SimpleJdbcTestUtils.countRowsInTable(jdbcTemplate, "BATCH_STAGING"); assertEquals(BatchStatus.COMPLETED, execution.getStatus()); assertEquals(after - before, execution.getStepExecutions().iterator().next().getReadCount()); } } == JunitTest 수행== 수행방법은 [[egovframework:dev2:tst:test_case#Test Case 실행| JunitTest 실행]]을 참고한다. === 결과확인 === == 멀티쓰레드로 병행처리한 setp은 쓰레드마다의 처리 속도차이에 의해 순차적인 처리가 되지 않는다. loading의 step 결과인 TRADE의 data를 보면 확인할 수 있다. == {{:egovframework:rte2:brte:batch_example:multithread_data.png|}} ==== Parallel 예제 ==== parallelStep이란 분리된 flow을 각 thread에서 병행으로 처리하는 방식이다. 두 개의 flow1, flow2 는 각각 thread에서 병행으로 처리된다. ===Job설정=== ==ParallelStep 예제의 Job 설정파일인 parallelStep.xml을 확인한다.== ✔ split 태그에 비동기 설정이 있어야 병행처리가 가능하다. === JunitTest 구성 및 수행 === == JunitTest 구성 == parallelstep 설정과 관련 클래스들로 Junit Test를 수행한다. 이 때 배치가 수행되고, 관련된 내용을 확인할 수 있다. \\ ✔ JunitTest 클래스의 구조는 [[egovframework:rte2:brte:batch_example:run_junit_Test|배치실행환경 예제 Junit Test 설명]]을 참고한다. \\ ✔ assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus()): 배치수행결과가 COMPLETED 인지 확인한다. \\ ✔ getUniqueJobParameters에서 JobParameter에 배치에 필요한 입력 리소스, 출력 리소스 위치정보를 넘긴다. @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations ={"/egovframework/batch/simple-job-launcher-context.xml", "/egovframework/batch/jobs/parallelStep.xml","/egovframework/batch/job-runner-context.xml"}) public class EgovParallelStepFunctionalTests{ //배치작업을 test하기 위한 JobLauncherTestUtils @Autowired private JobLauncherTestUtils jobLauncherTestUtils; /** * 배치작업 테스트 */ @Test public void testLaunchJob() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(this.getUniqueJobParameters()); assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus()); // /target/test-outputs/parallelStep폴더의 output파일 확인 } /** * 잡파라미터를 설정하기 위한 메소드 * @return jobParameters */ protected JobParameters getUniqueJobParameters() { return new JobParametersBuilder().addString("inputFile","/egovframework/data/input/delimited.csv") .addString("outputFile1","file:./target/test-outputs/parallelStep/delimitedOutput1.csv") .addString("outputFile2","file:./target/test-outputs/parallelStep/delimitedOutput2.csv") .addString("outputFile3","file:./target/test-outputs/parallelStep/delimitedOutput3.csv") .addString("outputFile4","file:./target/test-outputs/parallelStep/delimitedOutput4.csv") .addParameter("timestamp", new JobParameter(new Date().getTime())).toJobParameters(); } } ==JunitTest 수행== 수행방법은 [[egovframework:dev2:tst:test_case#Test Case 실행| JunitTest 실행]]을 참고한다. === 결과확인 === split된 두 개의 flow가 멀티쓰레드(SimpleAsyncTaskExecutor-1,SimpleAsyncTaskExecutor-2)로 실행한 결과는 콘솔창의 로그내용으로 확인할 수 있다. step1과 step3은 서로 다른 쓰레드에서 실행한 사실을 확인할 수 있다. {{:egovframework:rte2:brte:batch_example:parallelstep_step1.png|}} ** ...** {{:egovframework:rte2:brte:batch_example:parallelstep_step3.png|}} ==== 파티셔닝(Partitioning) 예제==== === === 파티션 정의 File, DB의 데이터를 처리하는 병행처리 방법의 하나로 File 데이터, DB 데이터를 Partition 하여 멀티쓰레드 방식으로 처리한다. 입력리소스에 따라 DB 파티셔닝예제와 과 File 파티셔닝예제를 보여주고, 여러리소스를 읽어 하나의 타켓파일에 쓰는 SingleFile 파티셔닝 예제를 보여준다. ^입력 리소스 타입^관계^예제^ |DB| |[[egovframework:rte2:brte:batch_example:db_partition|DB Partition 예제]]| |File|N:N|[[egovframework:rte2:brte:batch_example:file_partition|N:N Partition 예제]]| |File|N:1|[[egovframework:rte2:brte:batch_example:single_file_partition|N:1 Partition 예제]]| ===== 참고자료 ===== * [[egovframework:rte2:brte:batch_core:parallel_process|병행처리]]