| 知乎專欄 | 多維度架構 | | | 微信號 netkiller-ebook | | | QQ群:128659835 請註明“讀者” |
package cn.netkiller.webflux.component;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
/**
 * Functional-style WebFlux handler that answers with a fixed plain-text greeting.
 */
@Component
public class HelloWorldHandler {

    public HelloWorldHandler() {
    }

    /**
     * Builds a {@code 200 OK} response with content type {@code text/plain}
     * whose body is the literal greeting.
     *
     * @param request the incoming request; it is not inspected
     * @return a {@link Mono} emitting the greeting response
     */
    public Mono<ServerResponse> helloWorld(ServerRequest request) {
        final String greeting = "Hello World!!!";
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromObject(greeting));
    }
}
package cn.netkiller.webflux.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import cn.netkiller.webflux.component.HelloWorldHandler;
/**
 * Router configuration that exposes {@link HelloWorldHandler} through a functional route.
 */
@Configuration
public class WebFluxRouter {

    public WebFluxRouter() {
    }

    /**
     * Registers a route matching {@code GET /hello} requests that accept
     * {@code text/plain} and delegates them to {@link HelloWorldHandler#helloWorld}.
     *
     * @param helloWorldHandler the handler bean that produces the response
     * @return the router function for the hello endpoint
     */
    @Bean
    public RouterFunction<ServerResponse> routeHelloWorld(HelloWorldHandler helloWorldHandler) {
        return RouterFunctions.route(
                RequestPredicates.GET("/hello")
                        .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
                helloWorldHandler::helloWorld);
    }
}
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
# Enable template caching.
spring.thymeleaf.cache=true
# Check that the template exists before rendering it.
spring.thymeleaf.check-template=true
# Check that the templates location exists.
spring.thymeleaf.check-template-location=true
# Enable Thymeleaf view resolution for Web frameworks.
spring.thymeleaf.enabled=true
# Template files encoding.
spring.thymeleaf.encoding=UTF-8
# Comma-separated list of view names that should be excluded from resolution.
spring.thymeleaf.excluded-view-names=
# Template mode to be applied to templates. See also StandardTemplateModeHandlers.
spring.thymeleaf.mode=HTML5
# Prefix that gets prepended to view names when building a URL.
spring.thymeleaf.prefix=classpath:/templates/
# Maximum size of data buffers used for writing to the response, in bytes.
spring.thymeleaf.reactive.max-chunk-size=
# Media types supported by the view technology.
spring.thymeleaf.reactive.media-types=
# Content-Type value written to HTTP responses.
spring.thymeleaf.servlet.content-type=text/html
# Suffix that gets appended to view names when building a URL.
spring.thymeleaf.suffix=.html
# Order of the template resolver in the chain.
spring.thymeleaf.template-resolver-order=
# Comma-separated list of view names that can be resolved.
spring.thymeleaf.view-names=
/**
 * Populates the model with the {@code name} and {@code city} attributes and
 * returns the view name asynchronously.
 *
 * @param model the model to which the template attributes are added
 * @return a {@link Mono} emitting the view name {@code "hello"}
 */
@GetMapping("/welcome")
public Mono<String> hello(final Model model) {
    model.addAttribute("name", "Neo");
    model.addAttribute("city", "深圳");
    // NOTE(review): the accompanying template is listed as welcome.html, while the
    // view name returned here is "hello" — confirm which one is intended.
    // Mono.just is the direct way to wrap an already-available value.
    return Mono.just("hello");
}
/**
 * Places the stream returned by {@code cityService.findAllCity()} into the model
 * under {@code cityLists} and selects the {@code cityList} view.
 *
 * @param model the model to which the city stream is added
 * @return the view name {@code "cityList"}
 */
@GetMapping("/list")
public String listPage(final Model model) {
    model.addAttribute("cityLists", cityService.findAllCity());
    return "cityList";
}
welcome.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>歡迎頁面</title>
</head>
<body>
<!-- Inline <span> elements are used for the values: <p> is not permitted inside <h1>. -->
<h1>你好,歡迎來自<span th:text="${city}"></span>的<span th:text="${name}"></span></h1>
</body>
</html>
cityList.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>城市列表</title>
</head>
<body>
<div>
    <table>
        <!-- <caption> is the table title element; <legend> is only valid inside <fieldset>. -->
        <caption>
            <strong>城市列表</strong>
        </caption>
        <thead>
        <tr>
            <th>城市編號</th>
            <th>省份編號</th>
            <th>名稱</th>
            <th>描述</th>
        </tr>
        </thead>
        <tbody>
        <!-- One row per element of the cityLists model attribute. -->
        <tr th:each="city : ${cityLists}">
            <td th:text="${city.id}"></td>
            <td th:text="${city.provinceId}"></td>
            <td th:text="${city.name}"></td>
            <td th:text="${city.description}"></td>
        </tr>
        </tbody>
    </table>
</div>
</body>
</html>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
server:
  port: 8080
spring:
  application:
    name: webflux
  redis:
    host: 127.0.0.1
    port: 6379
    password: pwd2020
    timeout: 5000
    lettuce:
      pool:
        max-active: 200
        max-idle: 20
        min-idle: 5
        max-wait: 1000
/**
 * Creates a {@link ReactiveRedisTemplate} that uses the string serialization
 * context for both keys and values.
 *
 * @param factory the reactive connection factory backing the template
 * @return the configured template
 */
@Bean
public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
    return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string());
}
/**
 * {@link RedisService} implementation backed by a string-to-string
 * {@link ReactiveRedisTemplate}. Users are stored as JSON strings.
 */
@Service
public class RedisServiceImpl implements RedisService {

    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;

    /** Reads the string value stored under {@code key}. */
    @Override
    public Mono<String> get(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    /**
     * Stores {@code user} as JSON under {@code key} via {@code getAndSet};
     * the returned Mono is whatever {@code getAndSet} yields.
     */
    @Override
    public Mono<String> set(String key, User user) {
        return swapJson(key, user);
    }

    /** Deletes {@code key} through the value operations. */
    @Override
    public Mono<Boolean> delete(String key) {
        return redisTemplate.opsForValue().delete(key);
    }

    /** Same operation as {@link #set(String, User)}. */
    @Override
    public Mono<String> update(String key, User user) {
        return swapJson(key, user);
    }

    /** Streams the whole list stored under {@code key} (range {@code 0} to {@code -1}). */
    @Override
    public Flux<String> all(String key) {
        return redisTemplate.opsForList().range(key, 0, -1);
    }

    /** Left-pushes every element of {@code list} onto the list stored under {@code key}. */
    @Override
    public Mono<Long> push(String key, List<String> list) {
        return redisTemplate.opsForList().leftPushAll(key, list);
    }

    /**
     * Passes {@code key} to {@code keys(...)} and reads the string value of
     * every key that call emits.
     */
    @Override
    public Flux<String> find(String key) {
        ReactiveValueOperations<String, String> valueOps = redisTemplate.opsForValue();
        return redisTemplate.keys(key).flatMap(valueOps::get);
    }

    /** Serializes {@code user} to JSON and swaps it in under {@code key}. */
    private Mono<String> swapJson(String key, User user) {
        return redisTemplate.opsForValue().getAndSet(key, JSON.toJSONString(user));
    }
}
/**
 * Demo REST endpoints that exercise {@link RedisService} with a hard-coded sample user.
 */
@RestController
@RequestMapping("/user")
public class UserController {

    public final static String USER_KEY = "user";

    @Autowired
    private RedisService redisService;

    /**
     * Returns the string stored under the given Redis key.
     *
     * @param key the key taken from the {@code {key}} path segment
     * @return the stored value
     */
    @GetMapping("/get/{key}")
    public Mono<String> getUserByKey(@PathVariable("key") String key) {
        // The variable name must match the {key} placeholder in the mapping;
        // the previous name "id" had no matching placeholder.
        return redisService.get(key);
    }

    /**
     * Stores the sample user under the key {@code "neo"}.
     *
     * @param user request-bound user; it is replaced by the sample user and not read
     * @return the result of {@code redisService.set}
     */
    @GetMapping("/add")
    public Mono<String> add(User user) {
        user = sampleUser();
        System.out.println(JSON.toJSONString(user));
        return redisService.set("neo", user);
    }

    /**
     * Pushes three JSON copies of the sample user onto the Redis list {@code "list"}.
     *
     * @return the result of {@code redisService.push}
     */
    @GetMapping("/addlist")
    public Mono<Long> addlist() {
        List<String> list = new ArrayList<String>();
        String json = JSON.toJSONString(sampleUser());
        // Add the first record.
        list.add(json);
        // Add the second record.
        list.add(json);
        // Add the third record.
        list.add(json);
        // The service method visible in RedisServiceImpl is push(...); it declares no addlist(...).
        return redisService.push("list", list);
    }

    /**
     * Streams every element of the Redis list {@code "list"}, one every two seconds.
     */
    @GetMapping(value = "/findAll", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> findAll() {
        return redisService.all("list").delayElements(Duration.ofSeconds(2));
    }

    /**
     * Streams the values found by {@code redisService.find("*")}, one every two seconds.
     */
    @GetMapping("/getUsers")
    public Flux<String> findUsers() {
        return redisService.find("*").delayElements(Duration.ofSeconds(2));
    }

    /** Builds the hard-coded demo user shared by the add endpoints. */
    private static User sampleUser() {
        User user = new User();
        user.setAccount("neo");
        user.setPassword("123456");
        user.setNickname("netkiller");
        user.setEmail("netkiller@msn.com");
        user.setPhone("");
        user.setGender(true);
        user.setBirthday("1980-01-30");
        user.setProvince("廣東省");
        user.setCity("深圳市");
        user.setCounty("南山區");
        user.setAddress("");
        user.setState("Enabled");
        return user;
    }
}
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency>
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import cn.netkiller.entity.User;
/**
 * Reactive MongoDB repository for {@link User} documents with {@code Long} ids.
 * Declares no methods of its own; everything comes from {@link ReactiveMongoRepository}.
 */
public interface UserRepository extends ReactiveMongoRepository<User, Long> {
}
/**
 * {@link MongoService} implementation that delegates to {@link UserRepository}.
 */
@Service
public class MongoServiceImpl implements MongoService {

    @Autowired
    private UserRepository userRepository;

    /** Looks up a user by id. */
    @Override
    public Mono<User> getById(Long id) {
        return userRepository.findById(id);
    }

    /** Saves the given user and emits the saved instance. */
    @Override
    public Mono<User> addUser(User user) {
        return userRepository.save(user);
    }

    /**
     * Deletes the user with the given id.
     *
     * @param id the id of the user to delete
     * @return a {@link Mono} that emits {@code true} once the delete has completed
     */
    @Override
    public Mono<Boolean> deleteById(Long id) {
        // The repository Mono must be part of the returned chain: a reactive
        // operation that nobody subscribes to is never executed.
        return userRepository.deleteById(id).thenReturn(Boolean.TRUE);
    }

    /** Saves the given user; same repository call as {@link #addUser(User)}. */
    @Override
    public Mono<User> updateById(User user) {
        return userRepository.save(user);
    }

    /** Streams all users. */
    @Override
    public Flux<User> findAllUser() {
        return userRepository.findAll();
    }
}
/**
 * Demo REST endpoints that exercise {@link MongoService} with a hard-coded sample user.
 */
@RestController
@RequestMapping("/usermg")
public class UserMongoController {

    @Autowired
    private MongoService mongoService;

    /**
     * Saves the hard-coded sample user.
     *
     * @param user request-bound user; it is replaced by the sample user and not read
     * @return the result of {@code mongoService.addUser}
     */
    @GetMapping("/add")
    public Mono<User> add(User user) {
        // Reassign the parameter only; declaring a second local named "user"
        // would clash with the parameter and fail to compile.
        user = new User();
        user.setAccount("neo");
        user.setPassword("123456");
        user.setNickname("netkiller");
        user.setEmail("netkiller@msn.com");
        user.setPhone("");
        user.setGender(true);
        user.setBirthday("1980-01-30");
        user.setProvince("廣東省");
        user.setCity("深圳市");
        user.setCounty("南山區");
        user.setAddress("");
        user.setState("Enabled");
        System.out.println(JSON.toJSONString(user));
        return mongoService.addUser(user);
    }

    /**
     * Streams all users, one per second.
     *
     * <p>Note: {@code produces = MediaType.APPLICATION_STREAM_JSON_VALUE} must be set
     * this way here.
     */
    @GetMapping(value = "/findAll", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<User> findAll() {
        return mongoService.findAllUser().delayElements(Duration.ofSeconds(1));
    }
}
produces 如果不是application/stream+json則調用端無法滾動得到結果,將一直阻塞等待數據流結束或超時。
package cn.netkiller.webflux.controller;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
/**
 * Server-sent-event demo endpoints: a launch countdown and a random-number stream.
 *
 * <p>The countdown state is created per subscription, so each client sees the
 * countdown from its start and concurrent clients do not share a counter.
 */
@RestController
@RequestMapping("/sse")
public class SseController {

    /** Counter start value; the first message shows this value minus one. */
    private static final int COUNT_DOWN_START = 10;

    public SseController() {
    }

    /**
     * Emits one {@code launch} event per second: the countdown messages first,
     * then the launch message on every later tick.
     *
     * @return an unbounded stream of events whose id is the tick number
     */
    @GetMapping(value = "/launch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Object>> countDown() {
        // defer gives every subscriber a fresh counter.
        return Flux.defer(() -> {
            AtomicInteger remaining = new AtomicInteger(COUNT_DOWN_START);
            return Flux.interval(Duration.ofSeconds(1))
                    .map(seq -> launchEvent(seq, nextCountDownMessage(remaining)));
        });
    }

    /**
     * Emits one {@code random} event per second carrying a random int.
     *
     * @return an unbounded stream of events whose id is the tick number
     */
    @GetMapping("/random")
    public Flux<ServerSentEvent<Integer>> randomNumbers() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(seq -> ServerSentEvent.<Integer>builder()
                        .event("random")
                        .id(Long.toString(seq))
                        .data(ThreadLocalRandom.current().nextInt())
                        .build());
    }

    /**
     * Emits {@code launch} events for the values produced by {@code Flux.range(10, 1)}.
     *
     * @return the resulting events
     */
    @GetMapping("/range")
    public Flux<Object> range() {
        // NOTE(review): Flux.range takes (start, count), so range(10, 1) yields the
        // single value 10 — confirm whether a ten-step countdown was intended.
        return Flux.defer(() -> {
            AtomicInteger remaining = new AtomicInteger(COUNT_DOWN_START);
            return Flux.range(10, 1)
                    .<Object>map(seq -> launchEvent(seq, nextCountDownMessage(remaining)));
        });
    }

    /** Decrements the counter while it is positive and formats the next message. */
    private static String nextCountDownMessage(AtomicInteger remaining) {
        if (remaining.get() > 0) {
            return "倒計時:" + remaining.decrementAndGet();
        }
        return "發射";
    }

    /** Builds a {@code launch} event with the given id and text payload. */
    private static ServerSentEvent<Object> launchEvent(long id, String message) {
        return ServerSentEvent.<Object>builder()
                .event("launch")
                .id(Long.toString(id))
                .data(message)
                .build();
    }
}
運行結果
id:0
event:launch
data:倒計時:9

id:1
event:launch
data:倒計時:8

id:2
event:launch
data:倒計時:7

id:3
event:launch
data:倒計時:6

id:4
event:launch
data:倒計時:5

id:5
event:launch
data:倒計時:4

id:6
event:launch
data:倒計時:3

id:7
event:launch
data:倒計時:2

id:8
event:launch
data:倒計時:1

id:9
event:launch
data:倒計時:0

id:10
event:launch
data:發射