Different combination of Spring batch steps
By : Chrislyn Joy Equinan
Date : March 29 2020, 07:55 AM
I hope this helps you. Question 1: Using Spring Integration and Spring Batch together is a good idea. Question 2: With Spring Batch, you have a lot of options to scale and therefore to improve performance. You can launch whole jobs in parallel, you can execute steps in parallel, you can execute chunks in parallel. The basic questions are: code :
Flow 1:
Step A (System A)
Step B (System B)
Step C (System C)
Flow 2:
Step A (System A)
Step C (System C)
Step B (System B)
JobFlow1
Step1 {JobStep call jobSystemA}
Step2 {JobStep call jobSystemB}
Step3 {JobStep call jobSystemC}
JobFlow2
Step1 {JobStep call jobSystemA}
Step2 {JobStep call jobSystemC}
Step3 {JobStep call jobSystemB}
|
Spring Batch, variable through the steps of a job
By : Alexey Klyotzin
Date : March 29 2020, 07:55 AM
I hope this helps you. You can put the value in the ExecutionContext of one tasklet and then later retrieve it in another tasklet. See the code below. Inside the 1st tasklet — code :
// First tasklet: stores idList in the JOB-level ExecutionContext under
// Constants.DATA_LIST so that a later step in the same job can read it.
// NOTE(review): this snippet is elided - the method is missing its return
// statement and closing brace in the excerpt.
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
//Putting value in Execution Context
chunkContext.getStepContext().getStepExecution().getJobExecution()
.getExecutionContext()
.put(Constants.DATA_LIST, idList);
// Second tasklet: reads the shared value back from the same JOB-level
// ExecutionContext (step -> job -> execution context navigation).
// NOTE(review): the retrieved value is cast but never assigned here -
// presumably the full answer stored the result in a local; verify against
// the original source.
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
//Retrieving value from Execution context
(ArrayList) chunkContext.getStepContext()
.getStepExecution().getJobExecution().getExecutionContext()
.get(Constants.DATA_LIST);
}
<bean id="mySecondTasklet" class="mySecondTasklet" scope="step" >
|
spring batch rollback all steps in case of exception in one the the steps
By : manxisuo
Date : March 29 2020, 07:55 AM
That's not possible: there are no inter-step transactions in Spring Batch.
|
how to partition steps in spring-batch?
By : rd888
Date : December 17 2020, 07:28 AM
I am learning Spring Batch and wrote a simple application to play with it. According to my requirements, I read from a single CSV file, do some transformation, and insert into a database. You can use the code below to implement batch partitioning. code :
// Spring Batch configuration demonstrating a partitioned step: a master step
// ("demoPartitionStep") fans out copies of "demoSlaveStep" across a thread
// pool, one copy per partition produced by DemoPartitioner.
@Configuration
public class DemoJobBatchConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoJobBatchConfiguration.class);
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
// NOTE(review): qualifier "applicaionDS" looks misspelled, but it must match
// the DataSource bean declared elsewhere - confirm before renaming.
@Autowired
@Qualifier("applicaionDS")
public DataSource dataSource;
// NOTE(review): "UserWritter" (sic) is an ItemWriter component defined
// elsewhere in the project; the name must match its declaration.
@Autowired
UserWritter userWriter;
// Job entry point: a single partitioned step. RunIdIncrementer allows the
// job to be re-launched with fresh JobParameters.
@Bean("demoJob")
public Job partitionJob(JobNotificationListener listener, JobBuilderFactory jobBuilderFactory,
@Qualifier("demoPartitionStep") Step demoPartitionStep) {
return jobBuilderFactory.get("demoJob").incrementer(new RunIdIncrementer()).listener(listener)
.start(demoPartitionStep).build();
}
// Master step: asks demoPartitioner() to split the work (gridSize 21) and
// runs the slave step for each partition on jobTaskExecutor().
@Bean(name = "demoPartitionStep")
public Step demoPartitionStep(Step demoSlaveStep, StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("demoPartitionStep").partitioner("demoPartitionStep", demoPartitioner())
.gridSize(21).step(demoSlaveStep).taskExecutor(jobTaskExecutor()).build();
}
// destroyMethod = "" suppresses Spring's inferred destroy-method lookup for
// this bean.
@Bean(name = "demoPartitioner", destroyMethod = "")
public Partitioner demoPartitioner() {
DemoPartitioner partitioner = new DemoPartitioner();
// partitioner.partition(20);
return partitioner;
}
// Slave step: chunk-oriented, 3 items per chunk/transaction. Reader and
// processor are step-scoped, so each partition gets its own instances.
@Bean
public Step demoSlaveStep(ItemReader<User> demoReader, ItemProcessor<User, User> demoJobProcessor) {
return stepBuilderFactory.get("demoSlaveStep").<User, User>chunk(3).reader(demoReader)
.processor(demoJobProcessor).writer(userWriter).build();
}
// Step-scoped reader: the per-partition SQL is injected from the step
// ExecutionContext (the "SQL" key put there by DemoPartitioner). jobParm and
// jobExecutionParameter are only logged here.
@Bean(name = "demoReader")
@StepScope
public JdbcCursorItemReader<User> demoReader(@Value("#{stepExecutionContext[SQL]}") String SQL,
@Value("#{jobParameters[JOB_PARM]}") String jobParm,
@Value("#{jobExecutionContext[jobExecutionParameter]}") String jobExecutionParameter) {
LOGGER.info("---------------------- demoReader ------------------------------- " + SQL);
LOGGER.info(" jobParm : " + jobParm);
LOGGER.info(" jobExecutionParameter : " + jobExecutionParameter);
JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(200);
reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
reader.setSql(SQL);
return reader;
}
// Step-scoped processor; UserProcessor is declared elsewhere in the project.
@Bean(name = "demoJobProcessor")
@StepScope
public ItemProcessor<User, User> demoJobProcessor() throws Exception {
LOGGER.info(" DemoJobBatchConfiguration: demoJobProcessor ");
return new UserProcessor();
}
/*
* @Bean public ItemWriter<User> demoWriter() { return users -> { for (User user
* : users) { if (LOGGER.isInfoEnabled()) { LOGGER.info("user read is :: " +
* user.toString()); } } if (LOGGER.isInfoEnabled()) {
* LOGGER.info("%%%%%%%%%%%%%%%%%%%%% demoWriter %%%%%%%%%%%%%%%%%%%%% "); } };
* }
*/
// Thread pool that executes the partitioned slave steps in parallel.
// NOTE(review): the inline comment says 21 sites/threads, but the pool is
// configured with core=25 / max=30 - confirm the intended sizing.
@Bean
public TaskExecutor jobTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// there are 21 sites currently hence we have 21 threads
taskExecutor.setMaxPoolSize(30);
taskExecutor.setCorePoolSize(25);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
public class DemoPartitioner implements Partitioner {

    /**
     * Splits the id space [1, gridSize] into consecutive ranges of three ids
     * each. Every partition's ExecutionContext carries the range bounds
     * ("fromId"/"toId"), a ready-made SQL statement ("SQL") and a thread name
     * ("name"); map keys are "partition" + the range's starting id.
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        final int range = 3;
        for (int startId = 1; startId <= gridSize; startId += range) {
            int endId = startId + range - 1;
            String sql = "SELECT * FROM CUSTOMER WHERE ID BETWEEN " + startId + " AND " + endId;
            System.out.println("SQL : " + sql);
            ExecutionContext context = new ExecutionContext();
            context.putInt("fromId", startId);
            context.putInt("toId", endId);
            context.putString("SQL", sql);
            context.putString("name", "Thread" + startId);
            partitions.put("partition" + startId, context);
        }
        return partitions;
    }
}
|
Passing Data to Future Steps - Spring Batch
By : viewPoint
Date : March 29 2020, 07:55 AM
I think the issue was the following: you are confusing the keys to be promoted from the step execution context to the job execution context with the data itself. This confusion comes from two places: someValues should be someKeys, and calling executionContext.put("keys", someValues); in the @BeforeStep is incorrect. code :
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// Demonstrates passing data to a future step: step1's writer accumulates an
// item count in its STEP ExecutionContext; an ExecutionContextPromotionListener
// promotes the "count" key to the JOB ExecutionContext when step1 ends, and
// step2 reads it from there.
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
// Fixed source of four items for step1.
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
}
// Writer prints each item and accumulates the total number of items written
// under "count" in the step ExecutionContext (captured via @BeforeStep).
@Bean
public ItemWriter<Integer> itemWriter() {
return new ItemWriter<Integer>() {
private StepExecution stepExecution;
@Override
public void write(List<? extends Integer> items) {
for (Integer item : items) {
System.out.println("item = " + item);
}
// Accumulate across chunks: read the current count (0 if absent),
// then add this chunk's size.
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
int count = stepContext.containsKey("count") ? stepContext.getInt("count") : 0;
stepContext.put("count", count + items.size());
}
// Called by Spring Batch before the step runs; gives the writer
// access to the step's ExecutionContext.
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
};
}
// Chunk step (size 2 => two chunks for four items). The promotion listener
// copies "count" from the step context to the job context at step end.
@Bean
public Step step1() {
return steps.get("step1")
.<Integer, Integer>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.listener(promotionListener())
.build();
}
// Tasklet step that reads the promoted key from the JOB execution context.
@Bean
public Step step2() {
return steps.get("step2")
.tasklet((contribution, chunkContext) -> {
// retrieve the key from the job execution context
Integer count = (Integer) chunkContext.getStepContext().getJobExecutionContext().get("count");
System.out.println("In step 2: step 1 wrote " + count + " items");
return RepeatStatus.FINISHED;
})
.build();
}
// Promotes only the listed keys ("count") from step to job context.
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"count"});
return listener;
}
// step1 -> step2, sequential.
@Bean
public Job job() {
return jobs.get("job")
.start(step1())
.next(step2())
.build();
}
// Standalone launcher for the example.
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
item = 1
item = 2
item = 3
item = 4
In step 2: step 1 wrote 4 items
|