| 知乎專欄 | 多維度架構 | | | 微信號 netkiller-ebook | | | QQ群:128659835 請註明“讀者” |
Spring boot 1.5.1
以下安裝方式僅適合開發環境,生產環境請使用這個腳本安裝 https://github.com/oscm/shell/tree/master/mq/kafka
# Development-only install: download the Kafka 0.10.2.0 (Scala 2.12) tarball and unpack it under /srv.
KAFKA_RELEASE=kafka_2.12-0.10.2.0
KAFKA_DIR=/srv/${KAFKA_RELEASE}

cd /usr/local/src
wget "http://apache.communilink.net/kafka/0.10.2.0/${KAFKA_RELEASE}.tgz"
tar zxvf "${KAFKA_RELEASE}.tgz"
mv "${KAFKA_RELEASE}" /srv/

# Keep an untouched copy of the broker configuration before appending to it.
cp "${KAFKA_DIR}/config/server.properties" "${KAFKA_DIR}/config/server.properties.original"
echo "advertised.host.name=localhost" >> "${KAFKA_DIR}/config/server.properties"

# Version-independent path; the start/stop commands refer to /srv/kafka.
ln -s "${KAFKA_DIR}" /srv/kafka
啟動 Kafka 服務
# Start ZooKeeper first, then the Kafka broker.
# Without -daemon each script stays in the foreground, so run them in separate terminals.
# Absolute config paths are used so the commands work from any working directory.
/srv/kafka/bin/zookeeper-server-start.sh /srv/kafka/config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties
-daemon 表示以守護進程方式在後台啟動
# Same start order as the foreground variant, but -daemon detaches each process into the background.
# Absolute config paths are used so the commands work from any working directory.
/srv/kafka/bin/zookeeper-server-start.sh -daemon /srv/kafka/config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties
停止 Kafka 服務
# Stop in the reverse of the start order: the Kafka broker first, then ZooKeeper.
/srv/kafka/bin/kafka-server-stop.sh
/srv/kafka/bin/zookeeper-server-stop.sh
<!-- Spring for Apache Kafka. No <version> is declared here, so it has to be supplied by the project's dependency management (presumably the Spring Boot parent POM; confirm). -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates; the artifact is packaged as a WAR. -->
    <groupId>cn.netkiller</groupId>
    <artifactId>deploy</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <name>deploy.netkiller.cn</name>
    <description>Deploy project for Spring Boot</description>

    <!-- Spring Boot parent: dependencies below that omit <version> rely on its dependency management. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.1.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Starters that are currently switched off. -->
        <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> -->
        <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> -->
        <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> -->
        <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> -->

        <!-- Redis, session storage and caching. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.session</groupId>
            <artifactId>spring-session-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>

        <!-- Web tier: security, MVC and WebSocket. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- Front-end assets delivered as WebJars. -->
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>webjars-locator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>sockjs-client</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>stomp-websocket</artifactId>
            <version>2.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>bootstrap</artifactId>
            <version>3.3.7</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>jquery</artifactId>
            <version>3.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <!-- JSP support. -->
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <!-- <version>2.7</version> -->
        </dependency>
        <dependency>
            <groupId>com.caucho</groupId>
            <artifactId>hessian</artifactId>
            <version>4.0.38</version>
        </dependency>

        <!-- Spring for Apache Kafka: the dependency this chapter is about. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>cn.netkiller.Application</mainClass>
                </configuration>
            </plugin>
            <!-- Tests are skipped during the build. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <!-- Spring snapshot and milestone repositories for dependencies. -->
    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!-- The same two repositories, for build plugins. -->
    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
package cn.netkiller;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point of the Kafka warm-up example.
 *
 * <p>{@code @SpringBootApplication} is itself meta-annotated with
 * {@code @EnableAutoConfiguration} and {@code @ComponentScan}, so those two are not
 * repeated here. {@code @EnableScheduling} switches on {@code @Scheduled} processing.
 */
@SpringBootApplication
@EnableScheduling
public class Application {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
package cn.netkiller.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
 * Consumer-side Kafka wiring for the warm-up example: consumer properties, the consumer
 * factory, the listener container factory and the {@link Listener} bean.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /**
     * Container factory used for {@code @KafkaListener} methods.
     *
     * @return a concurrent container factory backed by {@link #consumerFactory()}
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Optional tuning, left at the library defaults:
        // factory.setConcurrency(1);
        // factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /**
     * @return a consumer factory for String keys and String values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw Kafka consumer properties.
     *
     * @return a mutable map of consumer settings
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // The factories above are typed <String, String>, so keys must be read as strings.
        // An IntegerDeserializer rejects any non-null key that is not exactly 4 bytes long.
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return propsMap;
    }

    /**
     * @return the bean whose {@code @KafkaListener} method receives the messages
     */
    @Bean
    public Listener listener() {
        return new Listener();
    }
}
package cn.netkiller.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;
/**
 * Receives records from the {@code test} topic, reports each one through the JDK logger
 * and on standard output, and counts down a one-shot latch.
 */
public class Listener {

    protected Logger logger = Logger.getLogger(Listener.class.getName());

    /** Starts at 1; decremented once per received record. */
    private CountDownLatch countDownLatch1 = new CountDownLatch(1);

    public Listener() {
        // nothing to set up beyond the field initialisers
    }

    /**
     * Handles one record from the {@code test} topic.
     *
     * @param record the consumed record; key and value types are left open
     */
    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        final String line = "Received message: " + record.toString();
        logger.info(line);
        System.out.println(line);
        countDownLatch1.countDown();
    }

    /**
     * @return the latch that is counted down by {@link #listen(ConsumerRecord)}
     */
    public CountDownLatch getCountDownLatch1() {
        return countDownLatch1;
    }
}
# Start the console producer for topic "test"; every line typed afterwards is sent as one message.
$ cd /srv/kafka
$ bin/kafka-console-producer.sh --broker-list 47.89.35.55:9092 --topic test
This is test message.
每輸入一行並按下回車,該行內容就會作為一條消息發送到你的 Spring Boot Kafka 程序
上面的例子僅僅是做了一個熱身,現在我們將實現 一個完整的例子。
例 5.5. Spring boot with Apache kafka.
SpringApplication
package cn.netkiller;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
//import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
//import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point of the complete Kafka example.
 *
 * <p>{@code @SpringBootApplication} is itself meta-annotated with
 * {@code @EnableAutoConfiguration} and {@code @ComponentScan}, so those two are not
 * repeated here. {@code @EnableScheduling} switches on {@code @Scheduled} processing.
 */
@SpringBootApplication
// @EnableMongoRepositories
// @EnableJpaRepositories
@EnableScheduling
public class Application {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
Consumer configuration
package cn.netkiller.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import cn.netkiller.kafka.consumer.Consumer;
/**
 * Consumer-side Kafka wiring: consumer properties, the consumer factory, the listener
 * container factory and the {@link Consumer} bean.
 */
@Configuration
@EnableKafka
public class ConsumerConfiguration {

    /**
     * Raw Kafka consumer properties.
     *
     * @return a mutable map of consumer settings
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        // The factories below are typed <String, String>, so keys must be read as strings.
        // An IntegerDeserializer rejects any non-null key that is not exactly 4 bytes long.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
        return props;
    }

    /**
     * @return a consumer factory for String keys and String values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Container factory used for {@code @KafkaListener} methods.
     *
     * @return a concurrent container factory backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * @return the bean whose {@code @KafkaListener} method receives the messages
     */
    @Bean
    public Consumer receiver() {
        return new Consumer();
    }
}
Consumer
package cn.netkiller.kafka.consumer;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
/**
 * Listens on the {@code helloworld.t} topic, logs every payload and counts down a
 * one-shot latch that callers can wait on.
 */
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    /** Starts at 1; decremented once per received message. */
    private CountDownLatch latch = new CountDownLatch(1);

    public Consumer() {
        // nothing to set up beyond the field initialisers
    }

    /**
     * @return the latch that is counted down by {@link #receiveMessage(String)}
     */
    public CountDownLatch getLatch() {
        return latch;
    }

    /**
     * Handles one message from {@code helloworld.t}.
     *
     * @param message the message payload
     */
    @KafkaListener(topics = "helloworld.t")
    public void receiveMessage(String message) {
        log.info("received message='{}'", message);
        latch.countDown();
    }
}
例 5.6. Spring boot with Apache kafka.
Producer configuration
package cn.netkiller.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import cn.netkiller.kafka.producer.Producer;
/**
 * Producer-side Kafka wiring: producer properties, the producer factory, the
 * {@link KafkaTemplate} and the {@link Producer} bean.
 */
@Configuration
public class ProducerConfiguration {

    /**
     * Raw Kafka producer properties.
     *
     * @return a mutable map of producer settings
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        // The factory and template below are typed <String, String>. With an
        // IntegerSerializer, sending any String key would fail with a ClassCastException.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // value to block, after which it will throw a TimeoutException
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        return props;
    }

    /**
     * @return a producer factory for String keys and String values
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * @return the template through which messages are sent
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * @return the application's message sender
     */
    @Bean
    public Producer sender() {
        return new Producer();
    }
}
Producer
package cn.netkiller.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
 * Sends String messages through the injected {@link KafkaTemplate} and logs the outcome
 * of each send once it is known.
 */
public class Producer {

    private static final Logger log = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code message} to {@code topic} without waiting for the result.
     *
     * @param topic   destination topic
     * @param message payload to send
     */
    public void sendMessage(String topic, String message) {
        // send() hands back a future; the outcome is reported through the callbacks below.
        ListenableFuture<SendResult<String, String>> pending = kafkaTemplate.send(topic, message);
        pending.addCallback(
                sent -> log.info("sent message='{}' with offset={}", message, sent.getRecordMetadata().offset()),
                failure -> log.error("unable to send message='{}'", message, failure));
        // To wait for the result on the calling thread instead, call get() on the future.
    }
}
Controller
package cn.netkiller.web;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;
/**
 * Test endpoints under {@code /test}: a liveness ping and a Kafka send/receive round trip.
 */
@Controller
@RequestMapping("/test")
public class KafkaTestController {

    // Fixed: the logger used to be created for IndexController, a class this file does
    // not import, so log lines were filed under the wrong category.
    private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);

    @Autowired
    private Producer sender;

    @Autowired
    private Consumer receiver;

    /**
     * Liveness check.
     *
     * @return the literal string {@code PONG}
     */
    @RequestMapping("/ping")
    @ResponseBody
    public String ping() {
        return "PONG";
    }

    /**
     * Sends one message to {@code helloworld.t}, waits up to 10 seconds on the consumer's
     * latch, and logs the remaining latch count (0 means a message has been received).
     *
     * @return {@code "OK"} whether or not the message arrived in time
     * @throws Exception if the wait is interrupted
     */
    @RequestMapping("/kafka/send")
    @ResponseBody
    public String testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        logger.info("{}", receiver.getLatch().getCount());
        return "OK";
    }
}
例 5.7. Test Spring Kafka
SpringBootTest
package cn.netkiller;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;
/**
 * Integration test: sends one message through {@link Producer} and expects
 * {@link Consumer} to have counted its latch down within ten seconds.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTests {

    @Autowired
    private Producer sender;

    @Autowired
    private Consumer receiver;

    public SpringKafkaApplicationTests() {
        // instantiated by the JUnit runner; nothing to set up
    }

    /**
     * Round trip over the {@code helloworld.t} topic.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    public void testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");

        // Wait at most 10 seconds for the listener to fire.
        receiver.getLatch().await(10, TimeUnit.SECONDS);

        assertThat(receiver.getLatch().getCount()).isEqualTo(0L);
    }
}
package schedule;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Entry point of the {@code schedule} service: a Spring Boot application with scheduling,
 * Eureka client registration and entity scanning of {@code common.domain}.
 */
@SpringBootApplication
@EnableEurekaClient
@EnableScheduling
@EntityScan("common.domain")
public class Application {

    /**
     * Prints a start-up banner line and boots the application context.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        System.out.println("Service Schedule Starting...");
        new SpringApplication(Application.class).run(args);
    }
}
只需要兩行,其餘所有配置均放在配置中心。
# ==============================
# Local settings: the application name and the Eureka registry URL.
spring.application.name=schedule
eureka.client.serviceUrl.defaultZone=http://eureka:s3cr3t@172.16.0.10:8761/eureka/
# ==============================
配置中心伺服器相關配置
# Connection settings for the Spring Cloud Config server.
#spring.application.name=schedule
spring.cloud.config.profile=development
spring.cloud.config.label=master
spring.cloud.config.uri=http://172.16.0.10:8888
management.security.enabled=false
spring.cloud.config.username=cfg
spring.cloud.config.password=s3cr3t
使用 @EnableKafka 啟用 Kafka,不需要其他 @Bean 等配置。這個配置檔案可以省略,也可以將 @EnableKafka 放到 Application.java 中;我還是喜歡獨立配置。
package schedule.config;
@Configuration
@EnableKafka
public class KafkaConfiguration {
}
package schedule.task;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.client.RestTemplate;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import schedule.repository.CmsTrashRepository;
import schedule.repository.ArticleRepository;
import common.domain.Article;
import common.domain.CmsTrash;
import common.pojo.ResponseRestful;
/**
 * Scheduled tasks that publish article inserts and trash (delete) records of one site to
 * Kafka as JSON, remembering in Redis how far each task has progressed.
 *
 * <p>Progress is stored as strings under {@code schedule:CFPushTasks:<key>} (a timestamp)
 * and {@code schedule:CFPushTasks:PostionId:<key>} (an id). The spelling "Postion" is kept
 * in method names and Redis keys because existing data uses those keys.
 */
@Component
public class CFPushTasks {

    private static final Logger logger = LoggerFactory.getLogger(CFPushTasks.class);
    private static final String TOPIC = "test";
    /** Pattern of the timestamps stored in Redis (second precision). */
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
    private static final ObjectMapper mapper = new ObjectMapper();

    @Autowired
    private ArticleRepository articleRepository;
    @Autowired
    private CmsTrashRepository cmsTrashRepository;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Value("${cf.cms.site_id}")
    private int siteId;

    public CFPushTasks() {
    }

    /** Returns a fresh formatter per call, because SimpleDateFormat is not thread-safe. */
    private static SimpleDateFormat newDateFormat() {
        return new SimpleDateFormat(DATE_PATTERN);
    }

    /** Builds the Redis key that holds the timestamp position of {@code key}. */
    private static String dateCacheKey(String key) {
        return String.format("schedule:CFPushTasks:%s", key);
    }

    /** Builds the Redis key that holds the id position of {@code key}. */
    private static String idCacheKey(String key) {
        return String.format("schedule:CFPushTasks:PostionId:%s", key);
    }

    /** Returns the current time minus one minute. */
    private Date getDate() {
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, -1);
        return calendar.getTime();
    }

    /**
     * Stores {@code value} as the timestamp position of {@code key} and verifies the write.
     *
     * @param key   logical position name, e.g. {@code "delete"}
     * @param value timestamp to store; formatted with second precision
     * @return {@code true} if reading the key back yields the string just written
     */
    private boolean setPostionDate(String key, Date value) {
        String cacheKey = dateCacheKey(key);
        String date = newDateFormat().format(value);
        logger.info("setPostion({},{})", cacheKey, date);
        redisTemplate.opsForValue().set(cacheKey, date);
        // Compare the stored string, not Date identity. The old check used == on Date
        // objects and looked up a doubly-prefixed key, so it could never succeed.
        return date.equals(redisTemplate.opsForValue().get(cacheKey));
    }

    /**
     * Reads the timestamp position of {@code key}.
     *
     * @param key logical position name, e.g. {@code "delete"}
     * @return the stored timestamp, or {@code null} if none is stored or it cannot be parsed
     */
    private Date getPostionDate(String key) {
        String cacheKey = dateCacheKey(key);
        Date date = null;
        // A single GET replaces hasKey() followed by get(): one round trip, and no window
        // in which the key can vanish between the two calls.
        String stored = redisTemplate.opsForValue().get(cacheKey);
        if (stored != null) {
            try {
                date = newDateFormat().parse(stored);
            } catch (ParseException e) {
                logger.warn("ignoring unparseable position '{}' stored under {}", stored, cacheKey, e);
            }
        }
        logger.debug("getPostion({}) => {}", cacheKey, date);
        return date;
    }

    /**
     * Stores {@code id} as the id position of {@code key} and verifies the write.
     *
     * @param key logical position name, e.g. {@code "insert"}
     * @param id  id to store
     * @return {@code true} if reading the position back yields {@code id}
     */
    private boolean setPostionId(String key, int id) {
        String cacheKey = idCacheKey(key);
        logger.info("setPostionId({},{})", cacheKey, id);
        redisTemplate.opsForValue().set(cacheKey, String.valueOf(id));
        // Pass the logical key. Passing cacheKey made the getter prefix it a second time
        // and always read 0.
        return id == this.getPostionId(key);
    }

    /**
     * Reads the id position of {@code key}.
     *
     * @param key logical position name, e.g. {@code "insert"}
     * @return the stored id, or 0 if nothing is stored
     * @throws NumberFormatException if the stored value is not an integer
     */
    private int getPostionId(String key) {
        String cacheKey = idCacheKey(key);
        String stored = redisTemplate.opsForValue().get(cacheKey);
        int id = (stored == null) ? 0 : Integer.parseInt(stored);
        logger.debug("getPostion({}) => {}", cacheKey, id);
        return id;
    }

    /**
     * Every 50 seconds, publishes the site's articles whose id is above the stored
     * "insert" position (all of them if no position is stored) and advances the position
     * after each one. Stops early if the position cannot be written back.
     */
    @Scheduled(fixedRate = 1000 * 50)
    public void insert() {
        Iterable<Article> articles;
        int id = this.getPostionId("insert");
        if (id == 0) {
            articles = articleRepository.findBySiteId(this.siteId);
        } else {
            articles = articleRepository.findBySiteIdAndIdGreaterThan(this.siteId, id);
        }
        if (articles == null) {
            return;
        }
        // NOTE(review): this assumes the repository returns articles in ascending id
        // order; confirm, because otherwise the position can move backwards.
        for (Article article : articles) {
            ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("insert"), "INSERT", article);
            try {
                String jsonString = mapper.writeValueAsString(responseRestful);
                this.send(TOPIC, jsonString);
                if (!this.setPostionId("insert", article.getId())) {
                    // The position was not persisted; stop rather than risk re-sending.
                    return;
                }
            } catch (JsonProcessingException e) {
                logger.error("unable to serialize INSERT message", e);
            }
        }
    }

    /** Every 50 seconds, publishes the fixed string {@code Hello}. */
    @Scheduled(fixedRate = 1000 * 50)
    public void update() {
        String message = "Hello";
        this.send(TOPIC, message);
    }

    /**
     * Every 50 seconds, publishes the site's "delete" trash records newer than the stored
     * "delete" timestamp (all of them if none is stored), advancing both the id and the
     * timestamp position after each one. Stops early if the timestamp cannot be written back.
     */
    @Scheduled(fixedRate = 1000 * 50)
    public void delete() {
        Date date = this.getPostionDate("delete");
        Iterable<CmsTrash> cmsTrashs;
        if (date == null) {
            cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeOrderByCtime(this.siteId, "delete");
        } else {
            cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeAndCtimeGreaterThanOrderByCtime(this.siteId, "delete", date);
        }
        if (cmsTrashs == null) {
            return;
        }
        for (CmsTrash cmsTrash : cmsTrashs) {
            ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("delete"), "DELETE", cmsTrash);
            try {
                String jsonString = mapper.writeValueAsString(responseRestful);
                this.send(TOPIC, jsonString);
                this.setPostionId("delete", cmsTrash.getId());
                if (!this.setPostionDate("delete", cmsTrash.getCtime())) {
                    // The position was not persisted; stop rather than risk re-sending.
                    return;
                }
            } catch (JsonProcessingException e) {
                logger.error("unable to serialize DELETE message", e);
            }
        }
    }

    /**
     * Sends {@code message} to {@code topic} asynchronously and logs the outcome.
     * The caller does not wait, so positions are advanced before the send is confirmed.
     */
    private void send(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("unable to send message='{}'", message, ex);
            }
        });
    }

    /** Posts {@code responseRestful} to a local HTTP test endpoint and logs any response body. */
    private void post(ResponseRestful responseRestful) {
        RestTemplate restTemplate = new RestTemplate();
        String response = restTemplate.postForObject("http://localhost:8440/test/cf/post.json", responseRestful, String.class);
        if (response != null) {
            logger.info(response);
        }
    }
}