Home | 簡體中文 | 繁體中文 | 雜文 | 知乎專欄 | Github | OSChina 博客 | 雲社區 | 雲棲社區 | Facebook | Linkedin | 視頻教程 | 打賞(Donations) | About
知乎專欄多維度架構 | 微信號 netkiller-ebook | QQ群:128659835 請註明“讀者”

2.32. Spring boot with Apache Kafka

Spring boot 1.5.1

2.32.1. 安裝 kafka

一下安裝僅僅適合開發環境,生產環境請使用這個腳本安裝 https://github.com/oscm/shell/tree/master/mq/kafka

		
cd /usr/local/src
wget http://apache.communilink.net/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar zxvf kafka_2.12-0.10.2.0.tgz
mv kafka_2.12-0.10.2.0 /srv/
cp /srv/kafka_2.12-0.10.2.0/config/server.properties{,.original}
echo "advertised.host.name=localhost" >> /srv/kafka_2.12-0.10.2.0/config/server.properties
ln -s /srv/kafka_2.12-0.10.2.0 /srv/kafka		
		
		

啟動 Kafka 服務

		
/srv/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties		
		

		

-daemon 表示守護進程方式在後台啟動

		
/srv/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties
		

		

停止 Kafka 服務

		
/srv/kafka/bin/kafka-server-stop.sh
/srv/kafka/bin/zookeeper-server-stop.sh
		
		

2.32.2. maven

		
		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>			
		
		
			
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>cn.netkiller</groupId>
	<artifactId>deploy</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>war</packaging>

	<name>deploy.netkiller.cn</name>
	<description>Deploy project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.1.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> -->
		<!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> -->
		<!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> -->
		<!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.session</groupId>
			<artifactId>spring-session-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-cache</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-security</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>
		<dependency>
			<groupId>org.webjars</groupId>
			<artifactId>webjars-locator</artifactId>
		</dependency>
		<dependency>
			<groupId>org.webjars</groupId>
			<artifactId>sockjs-client</artifactId>
			<version>1.0.2</version>
		</dependency>
		<dependency>
			<groupId>org.webjars</groupId>
			<artifactId>stomp-websocket</artifactId>
			<version>2.3.3</version>
		</dependency>
		<dependency>
			<groupId>org.webjars</groupId>
			<artifactId>bootstrap</artifactId>
			<version>3.3.7</version>
		</dependency>
		<dependency>
			<groupId>org.webjars</groupId>
			<artifactId>jquery</artifactId>
			<version>3.1.0</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-mail</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.tomcat.embed</groupId>
			<artifactId>tomcat-embed-jasper</artifactId>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>javax.servlet</groupId>
			<artifactId>jstl</artifactId>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency>
		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<!-- <version>2.7</version> -->
		</dependency>
		<dependency>
			<groupId>com.caucho</groupId>
			<artifactId>hessian</artifactId>
			<version>4.0.38</version>
		</dependency>
		
		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
		
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<mainClass>cn.netkiller.Application</mainClass>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-surefire-plugin</artifactId>
				<configuration>
					<skip>true</skip>
				</configuration>
			</plugin>
		</plugins>
	</build>

	<repositories>
		<repository>
			<id>spring-snapshots</id>
			<name>Spring Snapshots</name>
			<url>https://repo.spring.io/snapshot</url>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
		<repository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>
	<pluginRepositories>
		<pluginRepository>
			<id>spring-snapshots</id>
			<name>Spring Snapshots</name>
			<url>https://repo.spring.io/snapshot</url>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</pluginRepository>
		<pluginRepository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</pluginRepository>
	</pluginRepositories>

</project>
			
			
		

2.32.3. Spring boot Application

			
package cn.netkiller;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan
@EnableScheduling
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);

	}
}
			
		

2.32.4. EnableKafka

			
package cn.netkiller.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

	public KafkaConsumerConfig() {
		// TODO Auto-generated constructor stub
	}

	@Bean
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
		factory.setConsumerFactory(consumerFactory());
		// factory.setConcurrency(1);
		// factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> propsMap = new HashMap<String, Object>();
		propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
		propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
		propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
		propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
		propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
		propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
		propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return propsMap;
	}

	@Bean
	public Listener listener() {
		return new Listener();
	}

}
			
		

2.32.5. KafkaListener

			
package cn.netkiller.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

public class Listener {

	public Listener() {
		// TODO Auto-generated constructor stub
	}

	protected Logger logger = Logger.getLogger(Listener.class.getName());

	public CountDownLatch getCountDownLatch1() {
		return countDownLatch1;
	}

	private CountDownLatch countDownLatch1 = new CountDownLatch(1);

	@KafkaListener(topics = "test")
	public void listen(ConsumerRecord<?, ?> record) {
		logger.info("Received message: " + record.toString());
		System.out.println("Received message: " + record);
		countDownLatch1.countDown();
	}
}			
			
		

2.32.6. 測試

			
$ cd /srv/kafka
$ bin/kafka-console-producer.sh --broker-list 47.89.35.55:9092 --topic test
This is test message.
			
		

每輸入一行回車後發送到你的Spring boot kafka 程序

2.32.7. 完整的發佈訂閲實例

上面的例子僅僅是做了一個熱身,現在我們將實現 一個完整的例子。

2.32.7.1. Consumer

例 2.5. Spring boot with Apache kafka.

SpringApplication

				
package cn.netkiller;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
//import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
//import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan
// @EnableMongoRepositories
// @EnableJpaRepositories
@EnableScheduling
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);

	}
}				
				
				

Consumer configuration

				
package cn.netkiller.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import cn.netkiller.kafka.consumer.Consumer;

@Configuration
@EnableKafka
public class ConsumerConfiguration {

	public ConsumerConfiguration() {
		// TODO Auto-generated constructor stub
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		HashMap<String, Object> props = new HashMap<>();
		// list of host:port pairs used for establishing the initial connections
		// to the Kakfa cluster
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		// consumer groups allow a pool of processes to divide the work of
		// consuming and processing records
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");

		return props;
	}

	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}

	@Bean
	public Consumer receiver() {
		return new Consumer();
	}
}
				
				
				

Consumer

				
package cn.netkiller.kafka.consumer;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Consumer {

	public Consumer() {
		// TODO Auto-generated constructor stub
	}
	private static final Logger logger = LoggerFactory
            .getLogger(Consumer.class);

    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "helloworld.t")
    public void receiveMessage(String message) {
    	logger.info("received message='{}'", message);
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}
				
				
				

2.32.7.2. Producer

例 2.6. Spring boot with Apache kafka.

Producer configuration

				
package cn.netkiller.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import cn.netkiller.kafka.producer.Producer;

@Configuration
public class ProducerConfiguration {

	public ProducerConfiguration() {
		// TODO Auto-generated constructor stub
	}

	@Bean
	public Map<String, Object> producerConfigs() {
		HashMap<String, Object> props = new HashMap<>();
		// list of host:port pairs used for establishing the initial connections
		// to the Kakfa cluster
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		// value to block, after which it will throw a TimeoutException
		props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

		return props;
	}

	@Bean
	public ProducerFactory<String, String> producerFactory() {
		return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<String, String>(producerFactory());
	}

	@Bean
	public Producer sender() {
		return new Producer();
	}
}
				
				
				

Producer

				
package cn.netkiller.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class Producer {

	private static final Logger logger = LoggerFactory.getLogger(Producer.class);

	/*
	 * public Sender() { // TODO Auto-generated constructor stub }
	 */

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	public void sendMessage(String topic, String message) {
		// the KafkaTemplate provides asynchronous send methods returning a
		// Future
		ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);

		// you can register a callback with the listener to receive the result
		// of the send asynchronously
		future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

			@Override
			public void onSuccess(SendResult<String, String> result) {
				logger.info("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
			}

			@Override
			public void onFailure(Throwable ex) {
				logger.error("unable to send message='{}'", message, ex);
			}
		});

		// alternatively, to block the sending thread, to await the result,
		// invoke the future’s get() method
	}
}				
				
				

Controller

				
package cn.netkiller.web;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;

@Controller
@RequestMapping("/test")
public class KafkaTestController {

	private static final Logger logger = LoggerFactory.getLogger(IndexController.class);

	public KafkaTestController() {
		// TODO Auto-generated constructor stub
	}

	@Autowired
	private Producer sender;

	@Autowired
	private Consumer receiver;

	@RequestMapping("/ping")
	@ResponseBody
	public String ping() {
		String message = "PONG";
		return message;
	}

	@RequestMapping("/kafka/send")
	@ResponseBody
	public String testReceiver() throws Exception {
		sender.sendMessage("helloworld.t", "Hello Spring Kafka!");

		receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
		logger.info(receiver.getLatch().getCount() + "");
		return "OK";
	}

}				
				
				

2.32.7.3. Test

例 2.7. Test Spring Kafka

SpringBootTest

				
package cn.netkiller;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;


@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTests {

	public SpringKafkaApplicationTests() {
		// TODO Auto-generated constructor stub
	}
	@Autowired
    private Producer sender;

    @Autowired
    private Consumer receiver;

    @Test
    public void testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");

        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        assertThat(receiver.getLatch().getCount()).isEqualTo(0);
    }
}
				
				

				
			

2.32.8. Spring cloud with Kafka

2.32.8.1. Application 主檔案

			
package schedule;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
@EnableEurekaClient
@EntityScan("common.domain")
public class Application {

	public static void main(String[] args) {
		System.out.println("Service Schedule Starting...");
		SpringApplication.run(Application.class, args);
	}
}			
			
			

2.32.8.2. 資源配置檔案

2.32.8.2.1. application.properties

只需要兩行,其餘所有配置均放在配置中心。

				
# ==============================
spring.application.name=schedule
eureka.client.serviceUrl.defaultZone=http://eureka:s3cr3t@172.16.0.10:8761/eureka/
# ==============================
				
				
2.32.8.2.2. bootstrap.properties

配置中心伺服器相關配置

				
#spring.application.name=schedule
spring.cloud.config.profile=development
spring.cloud.config.label=master
spring.cloud.config.uri=http://172.16.0.10:8888
management.security.enabled=false
spring.cloud.config.username=cfg
spring.cloud.config.password=s3cr3t
				
				
2.32.8.2.3. Git 倉庫

在 git 倉庫中加入 spring.kafka.bootstrap_servers 配置項

				
spring.kafka.bootstrap_servers=172.16.0.10:9092
				
				

2.32.8.3. 啟用 kafka

使用 @EnableKafka 啟用 Kafka 不需要其他@Bean等。這個配置檔案可以省略,可以將 @EnableKafka 放到 Application.java 中。我還是喜歡獨立配置。

			
package schedule.config;
@Configuration
@EnableKafka
public class KafkaConfiguration {
}
			
			
			

2.32.8.4. 消息發佈主程序

			
package schedule.task;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.client.RestTemplate;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import schedule.repository.CmsTrashRepository;
import schedule.repository.ArticleRepository;
import common.domain.Article;
import common.domain.CmsTrash;
import common.pojo.ResponseRestful;

@Component
public class CFPushTasks {
	private static final Logger logger = LoggerFactory.getLogger(CFPushTasks.class);

	private static final String TOPIC = "test";
	private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	private static final ObjectMapper mapper = new ObjectMapper();

	@Autowired
	private ArticleRepository articleRepository;

	@Autowired
	private CmsTrashRepository cmsTrashRepository;

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	@Autowired
	private RedisTemplate<String, String> redisTemplate;

	@Value("${cf.cms.site_id}")
	private int siteId;

	public CFPushTasks() {
	}

	private Date getDate() {

		Calendar calendar = Calendar.getInstance();
		calendar.add(Calendar.MINUTE, -1);
		Date date = calendar.getTime();
		return date;
	}

	private boolean setPostionDate(String key, Date value) {
		String cacheKey = String.format("schedule:CFPushTasks:%s", key);
		String date = simpleDateFormat.format(value);
		logger.info("setPostion({},{})", cacheKey, date);
		redisTemplate.opsForValue().set(cacheKey, date);

		if (value == this.getPostionDate(cacheKey)) {
			return true;
		}
		return false;
	}

	private Date getPostionDate(String key) {
		String cacheKey = String.format("schedule:CFPushTasks:%s", key);
		Date date = null;
		if (redisTemplate.hasKey(cacheKey)) {
			try {
				date = simpleDateFormat.parse(redisTemplate.opsForValue().get(cacheKey));
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				// e.printStackTrace();
				logger.warn(e.getMessage());
			}
		}
		logger.debug("getPostion({}) => {}", cacheKey, date);
		return date;
	}

	private boolean setPostionId(String key, int id) {
		String cacheKey = String.format("schedule:CFPushTasks:PostionId:%s", key);
		logger.info("setPostionId({},{})", cacheKey, id);
		redisTemplate.opsForValue().set(cacheKey, String.valueOf(id));
		if (id == this.getPostionId(cacheKey)) {
			return true;
		}
		return false;
	}

	private int getPostionId(String key) {
		String cacheKey = String.format("schedule:CFPushTasks:PostionId:%s", key);
		int id = 0;
		if (redisTemplate.hasKey(cacheKey)) {
			id = Integer.valueOf(redisTemplate.opsForValue().get(cacheKey));
		}
		logger.debug("getPostion({}) => {}", cacheKey, id);
		return id;
	}

	@Scheduled(fixedRate = 1000 * 50)
	public void insert() {
		Iterable<Article> articles = null;
		int id = this.getPostionId("insert");

		if (id == 0) {
			articles = articleRepository.findBySiteId(this.siteId);

		} else {
			articles = articleRepository.findBySiteIdAndIdGreaterThan(this.siteId, id);
		}
		if (articles != null) {
			for (Article article : articles) {
				ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("insert"), "INSERT", article);
				String jsonString;
				try {
					jsonString = mapper.writeValueAsString(responseRestful);
					this.send(TOPIC, jsonString);
					if (!this.setPostionId("insert", article.getId())) {
						return;
					}

				} catch (JsonProcessingException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}

	@Scheduled(fixedRate = 1000 * 50)
	public void update() {
		String message = "Hello";
		this.send(TOPIC, message);

	}

	@Scheduled(fixedRate = 1000 * 50)
	public void delete() {
		Date date = this.getPostionDate("delete");
		Iterable<CmsTrash> cmsTrashs;
		if (date == null) {
			cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeOrderByCtime(this.siteId, "delete");
		} else {
			cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeAndCtimeGreaterThanOrderByCtime(this.siteId, "delete", date);
		}
		if (cmsTrashs != null) {

			for (CmsTrash cmsTrash : cmsTrashs) {
				ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("delete"), "DELETE", cmsTrash);
				String jsonString;
				try {
					jsonString = mapper.writeValueAsString(responseRestful);
					this.send(TOPIC, jsonString);
					this.setPostionId("delete", cmsTrash.getId());
					if (!this.setPostionDate("delete", cmsTrash.getCtime())) {
						return;
					} else {

					}
				} catch (JsonProcessingException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}

			}
		}

	}

	private void send(String topic, String message) {

		ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);

		future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

			@Override
			public void onSuccess(SendResult<String, String> result) {
				logger.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
			}

			@Override
			public void onFailure(Throwable ex) {
				logger.error("unable to send message='{}'", message, ex);
			}
		});
	}

	private void post(ResponseRestful responseRestful) {
		RestTemplate restTemplate = new RestTemplate();
		String response = restTemplate.postForObject("http://localhost:8440/test/cf/post.json", responseRestful, String.class);

		// logger.info(article.toString());
		if (response != null) {
			logger.info(response);
		}
	}
}