知乎專欄 | 多維度架構 | | | 微信號 netkiller-ebook | | | QQ群:128659835 請註明“讀者” |
Spring boot 1.5.1
以下安裝方式僅適合開發環境,生產環境請使用這個腳本安裝 https://github.com/oscm/shell/tree/master/mq/kafka
# Install Kafka 0.10.2.0 (Scala 2.12 build) under /srv and expose it as /srv/kafka.
KAFKA_DIST=kafka_2.12-0.10.2.0
cd /usr/local/src
wget "http://apache.communilink.net/kafka/0.10.2.0/${KAFKA_DIST}.tgz"
tar zxvf "${KAFKA_DIST}.tgz"
mv "${KAFKA_DIST}" /srv/
# Keep an untouched copy of the broker configuration before appending to it.
cp "/srv/${KAFKA_DIST}/config/server.properties" "/srv/${KAFKA_DIST}/config/server.properties.original"
echo "advertised.host.name=localhost" >> "/srv/${KAFKA_DIST}/config/server.properties"
# Version-independent path for the installation.
ln -s "/srv/${KAFKA_DIST}" /srv/kafka
啟動 Kafka 服務
# Start ZooKeeper first, then the Kafka broker.
# Without -daemon each command stays attached to its terminal, so run them in
# separate terminals. Absolute paths are used for both configuration files so
# the commands work from any working directory.
/srv/kafka/bin/zookeeper-server-start.sh /srv/kafka/config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties
-daemon 表示守護進程方式在後台啟動
# Start ZooKeeper and then the Kafka broker as background daemons.
# Absolute paths are used for both configuration files so the commands work
# from any working directory.
/srv/kafka/bin/zookeeper-server-start.sh -daemon /srv/kafka/config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties
停止 Kafka 服務
# Shut down in the reverse order of start-up: the broker first, ZooKeeper last.
/srv/kafka/bin/kafka-server-stop.sh
/srv/kafka/bin/zookeeper-server-stop.sh
<!-- Spring for Apache Kafka. No <version> is declared in this snippet. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>cn.netkiller</groupId>
    <artifactId>deploy</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <name>deploy.netkiller.cn</name>
    <description>Deploy project for Spring Boot</description>

    <!-- Dependencies below that omit <version> rely on the dependency management inherited from this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.1.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Disabled starters, kept for reference:
             spring-boot-starter-actuator, spring-boot-starter-data-jpa, spring-boot-starter-data-mongodb -->

        <!-- Redis, session storage and caching -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.session</groupId>
            <artifactId>spring-session-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>

        <!-- Security, web and WebSocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- Front-end assets packaged as WebJars -->
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>webjars-locator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>sockjs-client</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>stomp-websocket</artifactId>
            <version>2.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>bootstrap</artifactId>
            <version>3.3.7</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>jquery</artifactId>
            <version>3.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <!-- JSP support -->
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <!-- <version>2.7</version> -->
        </dependency>
        <dependency>
            <groupId>com.caucho</groupId>
            <artifactId>hessian</artifactId>
            <version>4.0.38</version>
        </dependency>

        <!-- Spring for Apache Kafka: the dependency this chapter is about -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>cn.netkiller.Application</mainClass>
                </configuration>
            </plugin>
            <!-- NOTE: tests are skipped during the build. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
package cn.netkiller;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Entry point of the Spring Boot application.
 *
 * <p>{@code @SpringBootApplication} already combines {@code @Configuration},
 * {@code @EnableAutoConfiguration} and {@code @ComponentScan}, so those
 * annotations are not repeated here. {@code @EnableScheduling} turns on
 * processing of {@code @Scheduled} methods.
 */
@SpringBootApplication
@EnableScheduling
public class Application {

    /**
     * Boots the Spring context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
package cn.netkiller.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { public KafkaConsumerConfig() { // TODO Auto-generated constructor stub } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); factory.setConsumerFactory(consumerFactory()); // factory.setConcurrency(1); // factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<String, Object>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092"); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return propsMap; } @Bean public Listener listener() { return new Listener(); } }
package cn.netkiller.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import java.util.concurrent.CountDownLatch; import java.util.logging.Logger; public class Listener { public Listener() { // TODO Auto-generated constructor stub } protected Logger logger = Logger.getLogger(Listener.class.getName()); public CountDownLatch getCountDownLatch1() { return countDownLatch1; } private CountDownLatch countDownLatch1 = new CountDownLatch(1); @KafkaListener(topics = "test") public void listen(ConsumerRecord<?, ?> record) { logger.info("Received message: " + record.toString()); System.out.println("Received message: " + record); countDownLatch1.countDown(); } }
$ cd /srv/kafka
$ bin/kafka-console-producer.sh --broker-list 47.89.35.55:9092 --topic test
This is test message.
每輸入一行並按下回車,該行訊息就會發送到你的 Spring Boot Kafka 程序
上面的例子僅僅是一個熱身,現在我們將實現一個完整的例子。
例 2.5. Spring boot with Apache kafka.
SpringApplication
package cn.netkiller; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; //import org.springframework.data.jpa.repository.config.EnableJpaRepositories; //import org.springframework.data.mongodb.repository.config.EnableMongoRepositories; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableAutoConfiguration @ComponentScan // @EnableMongoRepositories // @EnableJpaRepositories @EnableScheduling public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
Consumer configuration
package cn.netkiller.kafka.config; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import cn.netkiller.kafka.consumer.Consumer; @Configuration @EnableKafka public class ConsumerConfiguration { public ConsumerConfiguration() { // TODO Auto-generated constructor stub } @Bean public Map<String, Object> consumerConfigs() { HashMap<String, Object> props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections // to the Kakfa cluster props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // consumer groups allow a pool of processes to divide the work of // consuming and processing records props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld"); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public Consumer receiver() { return new 
Consumer(); } }
Consumer
package cn.netkiller.kafka.consumer; import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; public class Consumer { public Consumer() { // TODO Auto-generated constructor stub } private static final Logger logger = LoggerFactory .getLogger(Consumer.class); private CountDownLatch latch = new CountDownLatch(1); @KafkaListener(topics = "helloworld.t") public void receiveMessage(String message) { logger.info("received message='{}'", message); latch.countDown(); } public CountDownLatch getLatch() { return latch; } }
例 2.6. Spring boot with Apache kafka.
Producer configuration
package cn.netkiller.kafka.config; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import cn.netkiller.kafka.producer.Producer; @Configuration public class ProducerConfiguration { public ProducerConfiguration() { // TODO Auto-generated constructor stub } @Bean public Map<String, Object> producerConfigs() { HashMap<String, Object> props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections // to the Kakfa cluster props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value to block, after which it will throw a TimeoutException props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<String, String>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } @Bean public Producer sender() { return new Producer(); } }
Producer
package cn.netkiller.kafka.producer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; public class Producer { private static final Logger logger = LoggerFactory.getLogger(Producer.class); /* * public Sender() { // TODO Auto-generated constructor stub } */ @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String message) { // the KafkaTemplate provides asynchronous send methods returning a // Future ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); // you can register a callback with the listener to receive the result // of the send asynchronously future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { logger.info("sent message='{}' with offset={}", message, result.getRecordMetadata().offset()); } @Override public void onFailure(Throwable ex) { logger.error("unable to send message='{}'", message, ex); } }); // alternatively, to block the sending thread, to await the result, // invoke the future’s get() method } }
Controller
package cn.netkiller.web; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import cn.netkiller.kafka.consumer.Consumer; import cn.netkiller.kafka.producer.Producer; @Controller @RequestMapping("/test") public class KafkaTestController { private static final Logger logger = LoggerFactory.getLogger(IndexController.class); public KafkaTestController() { // TODO Auto-generated constructor stub } @Autowired private Producer sender; @Autowired private Consumer receiver; @RequestMapping("/ping") @ResponseBody public String ping() { String message = "PONG"; return message; } @RequestMapping("/kafka/send") @ResponseBody public String testReceiver() throws Exception { sender.sendMessage("helloworld.t", "Hello Spring Kafka!"); receiver.getLatch().await(10000, TimeUnit.MILLISECONDS); logger.info(receiver.getLatch().getCount() + ""); return "OK"; } }
例 2.7. Test Spring Kafka
SpringBootTest
package cn.netkiller; import static org.assertj.core.api.Assertions.assertThat; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import cn.netkiller.kafka.consumer.Consumer; import cn.netkiller.kafka.producer.Producer; @RunWith(SpringRunner.class) @SpringBootTest public class SpringKafkaApplicationTests { public SpringKafkaApplicationTests() { // TODO Auto-generated constructor stub } @Autowired private Producer sender; @Autowired private Consumer receiver; @Test public void testReceiver() throws Exception { sender.sendMessage("helloworld.t", "Hello Spring Kafka!"); receiver.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(receiver.getLatch().getCount()).isEqualTo(0); } }
package schedule;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Entry point of the schedule service: a Spring Boot application with
 * scheduling enabled, registered as a Eureka client, scanning entities in
 * {@code common.domain}.
 */
@SpringBootApplication
@EnableScheduling
@EnableEurekaClient
@EntityScan("common.domain")
public class Application {

    /**
     * Prints a start-up banner line, then boots the Spring context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        final String banner = "Service Schedule Starting...";
        System.out.println(banner);
        SpringApplication.run(Application.class, args);
    }
}
只需要兩行,其餘所有配置均放在配置中心。
# ------------------------------------------------------------
# Service identity and Eureka registry location.
# NOTE(review): the registry URL embeds credentials in plain text.
# ------------------------------------------------------------
spring.application.name=schedule
eureka.client.serviceUrl.defaultZone=http://eureka:s3cr3t@172.16.0.10:8761/eureka/
配置中心伺服器相關配置
# Spring Cloud Config client settings: which profile and label to fetch,
# where the config server lives, and the credentials used to reach it.
#spring.application.name=schedule
spring.cloud.config.profile=development
spring.cloud.config.label=master
spring.cloud.config.uri=http://172.16.0.10:8888
management.security.enabled=false
# NOTE(review): credentials are stored in plain text.
spring.cloud.config.username=cfg
spring.cloud.config.password=s3cr3t
使用 @EnableKafka 即可啟用 Kafka,不需要再定義其他 @Bean。這個配置類可以省略,也可以將 @EnableKafka 放到 Application.java 中;我還是喜歡獨立配置。
package schedule.config; @Configuration @EnableKafka public class KafkaConfiguration { }
package schedule.task; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.client.RestTemplate; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import schedule.repository.CmsTrashRepository; import schedule.repository.ArticleRepository; import common.domain.Article; import common.domain.CmsTrash; import common.pojo.ResponseRestful; @Component public class CFPushTasks { private static final Logger logger = LoggerFactory.getLogger(CFPushTasks.class); private static final String TOPIC = "test"; private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private static final ObjectMapper mapper = new ObjectMapper(); @Autowired private ArticleRepository articleRepository; @Autowired private CmsTrashRepository cmsTrashRepository; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private RedisTemplate<String, String> redisTemplate; @Value("${cf.cms.site_id}") private int siteId; public CFPushTasks() { } private Date getDate() { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, -1); Date date = calendar.getTime(); return date; } private boolean setPostionDate(String key, Date value) { String cacheKey = 
String.format("schedule:CFPushTasks:%s", key); String date = simpleDateFormat.format(value); logger.info("setPostion({},{})", cacheKey, date); redisTemplate.opsForValue().set(cacheKey, date); if (value == this.getPostionDate(cacheKey)) { return true; } return false; } private Date getPostionDate(String key) { String cacheKey = String.format("schedule:CFPushTasks:%s", key); Date date = null; if (redisTemplate.hasKey(cacheKey)) { try { date = simpleDateFormat.parse(redisTemplate.opsForValue().get(cacheKey)); } catch (ParseException e) { // TODO Auto-generated catch block // e.printStackTrace(); logger.warn(e.getMessage()); } } logger.debug("getPostion({}) => {}", cacheKey, date); return date; } private boolean setPostionId(String key, int id) { String cacheKey = String.format("schedule:CFPushTasks:PostionId:%s", key); logger.info("setPostionId({},{})", cacheKey, id); redisTemplate.opsForValue().set(cacheKey, String.valueOf(id)); if (id == this.getPostionId(cacheKey)) { return true; } return false; } private int getPostionId(String key) { String cacheKey = String.format("schedule:CFPushTasks:PostionId:%s", key); int id = 0; if (redisTemplate.hasKey(cacheKey)) { id = Integer.valueOf(redisTemplate.opsForValue().get(cacheKey)); } logger.debug("getPostion({}) => {}", cacheKey, id); return id; } @Scheduled(fixedRate = 1000 * 50) public void insert() { Iterable<Article> articles = null; int id = this.getPostionId("insert"); if (id == 0) { articles = articleRepository.findBySiteId(this.siteId); } else { articles = articleRepository.findBySiteIdAndIdGreaterThan(this.siteId, id); } if (articles != null) { for (Article article : articles) { ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("insert"), "INSERT", article); String jsonString; try { jsonString = mapper.writeValueAsString(responseRestful); this.send(TOPIC, jsonString); if (!this.setPostionId("insert", article.getId())) { return; } } catch (JsonProcessingException e) { // TODO 
Auto-generated catch block e.printStackTrace(); } } } } @Scheduled(fixedRate = 1000 * 50) public void update() { String message = "Hello"; this.send(TOPIC, message); } @Scheduled(fixedRate = 1000 * 50) public void delete() { Date date = this.getPostionDate("delete"); Iterable<CmsTrash> cmsTrashs; if (date == null) { cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeOrderByCtime(this.siteId, "delete"); } else { cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeAndCtimeGreaterThanOrderByCtime(this.siteId, "delete", date); } if (cmsTrashs != null) { for (CmsTrash cmsTrash : cmsTrashs) { ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("delete"), "DELETE", cmsTrash); String jsonString; try { jsonString = mapper.writeValueAsString(responseRestful); this.send(TOPIC, jsonString); this.setPostionId("delete", cmsTrash.getId()); if (!this.setPostionDate("delete", cmsTrash.getCtime())) { return; } else { } } catch (JsonProcessingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } private void send(String topic, String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { logger.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset()); } @Override public void onFailure(Throwable ex) { logger.error("unable to send message='{}'", message, ex); } }); } private void post(ResponseRestful responseRestful) { RestTemplate restTemplate = new RestTemplate(); String response = restTemplate.postForObject("http://localhost:8440/test/cf/post.json", responseRestful, String.class); // logger.info(article.toString()); if (response != null) { logger.info(response); } } }