知乎專欄 | 多維度架構 | | | 微信號 netkiller-ebook | | | QQ群:128659835 請註明“讀者” |
package cn.netkiller.webflux.component; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Mono; @Component public class HelloWorldHandler { public HelloWorldHandler() { } public Mono<ServerResponse> helloWorld(ServerRequest request) { return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(BodyInserters.fromObject("Hello World!!!")); } }
package cn.netkiller.webflux.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.RequestPredicates; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse; import cn.netkiller.webflux.component.HelloWorldHandler; @Configuration public class WebFluxRouter { public WebFluxRouter() { } @Bean public RouterFunction<ServerResponse> routeHelloWorld(HelloWorldHandler helloWorldHandler) { return RouterFunctions.route(RequestPredicates.GET("/hello").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), helloWorldHandler::helloWorld); } }
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency>
spring.thymeleaf.cache=true # Enable template caching. spring.thymeleaf.check-template=true # Check that the template exists before rendering it. spring.thymeleaf.check-template-location=true # Check that the templates location exists. spring.thymeleaf.enabled=true # Enable Thymeleaf view resolution for Web frameworks. spring.thymeleaf.encoding=UTF-8 # Template files encoding. spring.thymeleaf.excluded-view-names= # Comma-separated list of view names that should be excluded from resolution. spring.thymeleaf.mode=HTML5 # Template mode to be applied to templates. See also StandardTemplateModeHandlers. spring.thymeleaf.prefix=classpath:/templates/ # Prefix that gets prepended to view names when building a URL. spring.thymeleaf.reactive.max-chunk-size= # Maximum size of data buffers used for writing to the response, in bytes. spring.thymeleaf.reactive.media-types= # Media types supported by the view technology. spring.thymeleaf.servlet.content-type=text/html # Content-Type value written to HTTP responses. spring.thymeleaf.suffix=.html # Suffix that gets appended to view names when building a URL. spring.thymeleaf.template-resolver-order= # Order of the template resolver in the chain. spring.thymeleaf.view-names= # Comma-separated list of view names that can be resolved.
/**
 * Renders the welcome page with a fixed name and city in the model.
 *
 * @param model the view model; receives the {@code name} and {@code city} attributes
 * @return a Mono emitting the view name to render
 */
@GetMapping("/welcome")
public Mono<String> hello(final Model model) {
    model.addAttribute("name", "Neo");
    model.addAttribute("city", "深圳");
    // The view name has to match the template file name (welcome.html); the
    // previous value "hello" pointed at a template this example does not define.
    return Mono.just("welcome");
}

/**
 * Renders the city list page.
 *
 * @param model the view model; receives the city stream under {@code cityLists}
 * @return the view name {@code cityList}
 */
@GetMapping("/list")
public String listPage(final Model model) {
    // The Flux itself is placed in the model; it is not subscribed to here.
    final Flux<City> cities = cityService.findAllCity();
    model.addAttribute("cityLists", cities);
    return "cityList";
}
welcome.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>歡迎頁面</title>
</head>
<body>
<!-- Inline <span> elements are used for the substituted values: a <p> is not
     allowed inside <h1> and would break the sentence across several lines. -->
<h1>你好,歡迎來自<span th:text="${city}"></span>的<span th:text="${name}"></span></h1>
</body>
</html>
cityList.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8"/>
    <title>城市列表</title>
</head>
<body>
<div>
    <table>
        <!-- <caption> is the table title element; <legend> is only valid inside <fieldset>. -->
        <caption>
            <strong>城市列表</strong>
        </caption>
        <thead>
        <tr>
            <th>城市編號</th>
            <th>省份編號</th>
            <th>名稱</th>
            <th>描述</th>
        </tr>
        </thead>
        <tbody>
        <!-- One row per element of the cityLists model attribute. -->
        <tr th:each="city : ${cityLists}">
            <td th:text="${city.id}"></td>
            <td th:text="${city.provinceId}"></td>
            <td th:text="${city.name}"></td>
            <td th:text="${city.description}"></td>
        </tr>
        </tbody>
    </table>
</div>
</body>
</html>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
server: port: 8080 spring: application: name: webflux redis: host: 127.0.0.1 port: 6379 password: pwd2020 timeout: 5000 lettuce: pool: max-active: 200 max-idle: 20 min-idle: 5 max-wait: 1000
/**
 * Exposes a reactive Redis template whose keys and values are both strings.
 *
 * @param factory the reactive connection factory the template is built on
 * @return a template using the string serialization context
 */
@Bean
public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
    return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string());
}
@Service public class RedisServiceImpl implements RedisService { @Autowired private ReactiveRedisTemplate<String, String> redisTemplate; @Override public Mono<String> get(String key) { ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return operations.get(key); } @Override public Mono<String> set(String key,User user) { ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return operations.getAndSet(key, JSON.toJSONString(user)); } @Override public Mono<Boolean> delete(String key) { ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return operations.delete(key); } @Override public Mono<String> update(String key,User user) { ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return operations.getAndSet(key, JSON.toJSONString(user)); } @Override public Flux<String> all(String key) { ReactiveListOperations<String, String> operations = redisTemplate.opsForList(); return operations.range(key, 0, -1); } @Override public Mono<Long> push(String key,List<String> list) { ReactiveListOperations<String, String> operations = redisTemplate.opsForList(); return operations.leftPushAll(key, list); } @Override public Flux<String> find(String key) { ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue(); return redisTemplate.keys(key).flatMap(keyId ->operations.get(keyId)); } }
@RestController @RequestMapping("/user") public class UserController { public final static String USER_KEY="user"; @Autowired private RedisService redisService; @GetMapping("/get/{key}") public Mono<String> getUserByKey(@PathVariable("id")String key){ return redisService.get(key); } @GetMapping("/add") public Mono<String> add(User user){ user = new User(); user.setAccount("neo"); user.setPassword("123456"); user.setNickname("netkiller"); user.setEmail("netkiller@msn.com"); user.setPhone(""); user.setGender(true); user.setBirthday("1980-01-30"); user.setProvince("廣東省"); user.setCity("深圳市"); user.setCounty("南山區"); user.setAddress(""); user.setState("Enabled"); System.out.println(JSON.toJSONString(user)); return redisService.set("neo",user); } @GetMapping("/addlist") public Mono<Long> addlist(){ List<String> list=new ArrayList<String>(); User user = new User(); user.setAccount("neo"); user.setPassword("123456"); user.setNickname("netkiller"); user.setEmail("netkiller@msn.com"); user.setPhone(""); user.setGender(true); user.setBirthday("1980-01-30"); user.setProvince("廣東省"); user.setCity("深圳市"); user.setCounty("南山區"); user.setAddress(""); user.setState("Enabled"); //添加第一條數據 list.add(JSON.toJSONString(user)); //添加第二條數據 list.add(JSON.toJSONString(user)); //添加第三條數據 list.add(JSON.toJSONString(user)); return redisService.addlist("list", list); } @GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<String> findAll(){ return redisService.all("list").delayElements(Duration.ofSeconds(2)); } @GetMapping("/getUsers") public Flux<String> findUsers() { return redisService.find("*").delayElements(Duration.ofSeconds(2)); } }
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency>
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

import cn.netkiller.entity.User;

/**
 * Reactive MongoDB repository for {@link User} documents keyed by {@code Long}.
 * It declares no methods of its own; everything comes from
 * {@link ReactiveMongoRepository}.
 */
public interface UserRepository extends ReactiveMongoRepository<User, Long> {
}
@Service public class MongoServiceImpl implements MongoService { @Autowired private UserRepository userRepository; @Override public Mono<User> getById(Long id) { return userRepository.findById(id); } @Override public Mono<User> addUser(User user) { return userRepository.save(user); } @Override public Mono<Boolean> deleteById(Long id) { userRepository.deleteById(id); return Mono.create(userMonoSink -> userMonoSink.success()); } @Override public Mono<User> updateById(User user) { return userRepository.save(user); } @Override public Flux<User> findAllUser() { return userRepository.findAll(); } }
/**
 * Demo REST endpoints that write and stream users through {@link MongoService}.
 */
@RestController
@RequestMapping("/usermg")
public class UserMongoController {

    @Autowired
    private MongoService mongoService;

    /**
     * Saves a fixed sample user.
     *
     * @param user the bound request object; it is replaced by the sample user
     * @return a Mono emitting what {@link MongoService#addUser} returns
     */
    @GetMapping("/add")
    public Mono<User> add(User user) {
        // Reassign the parameter only; declaring a second local named "user"
        // here clashes with the parameter and does not compile.
        user = new User();
        user.setAccount("neo");
        user.setPassword("123456");
        user.setNickname("netkiller");
        user.setEmail("netkiller@msn.com");
        user.setPhone("");
        user.setGender(true);
        user.setBirthday("1980-01-30");
        user.setProvince("廣東省");
        user.setCity("深圳市");
        user.setCounty("南山區");
        user.setAddress("");
        user.setState("Enabled");
        System.out.println(JSON.toJSONString(user));
        return mongoService.addUser(user);
    }

    /**
     * Streams all users, one per second.
     *
     * Note: produces = MediaType.APPLICATION_STREAM_JSON_VALUE must be set this way.
     */
    @GetMapping(value = "/findAll", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<User> findAll() {
        return mongoService.findAllUser().delayElements(Duration.ofSeconds(1));
    }
}
produces 如果不是application/stream+json則調用端無法滾動得到結果,將一直阻塞等待數據流結束或超時。
package cn.netkiller.webflux.controller;

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

/**
 * Server-Sent Events demo endpoints: a launch countdown and a random-number feed.
 */
@RestController
@RequestMapping("/sse")
public class SseController {

    /** Number of countdown steps before the launch message. */
    private static final int COUNT_DOWN_START = 10;

    public SseController() {
    }

    /**
     * Emits one "launch" event per second. Event ids start at 0; the data
     * counts down from 9 to 0 and is the launch message from id 10 onwards.
     *
     * @return an unbounded stream of countdown events
     */
    @GetMapping(value = "/launch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Object>> countDown() {
        // The text is derived from the per-subscription sequence number, so
        // every client gets its own countdown. A counter field on the
        // controller was shared by all clients and was never reset.
        return Flux.interval(Duration.ofSeconds(1)).map(seq -> launchEvent(seq));
    }

    /**
     * Emits one "random" event per second carrying a random int.
     *
     * @return an unbounded stream of random-number events
     */
    @GetMapping("/random")
    public Flux<ServerSentEvent<Integer>> randomNumbers() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(seq -> Tuples.of(seq, ThreadLocalRandom.current().nextInt()))
                .map(data -> ServerSentEvent.<Integer>builder()
                        .event("random")
                        .id(Long.toString(data.getT1()))
                        .data(data.getT2())
                        .build());
    }

    /**
     * Emits the whole countdown at once, without delays: ids 0 through 10,
     * ending with the launch message.
     *
     * @return a finite stream of countdown events
     */
    @GetMapping(value = "/range", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Object> range() {
        // Flux.range takes (start, count); the former range(10, 1) emitted a
        // single element. The event-stream media type is declared explicitly
        // because the declared element type is Object.
        return Flux.range(0, COUNT_DOWN_START + 1).map(seq -> launchEvent(seq));
    }

    /** Builds the "launch" event for the given sequence number. */
    private static ServerSentEvent<Object> launchEvent(long seq) {
        return ServerSentEvent.<Object>builder()
                .event("launch")
                .id(Long.toString(seq))
                .data(countDownText(seq))
                .build();
    }

    /** Returns the countdown text for a sequence number, or the launch message once it reaches zero. */
    private static String countDownText(long seq) {
        if (seq < COUNT_DOWN_START) {
            return "倒計時:" + (COUNT_DOWN_START - 1 - seq);
        }
        return "發射";
    }
}
運行結果
id:0 event:launch data:倒計時:9 id:1 event:launch data:倒計時:8 id:2 event:launch data:倒計時:7 id:3 event:launch data:倒計時:6 id:4 event:launch data:倒計時:5 id:5 event:launch data:倒計時:4 id:6 event:launch data:倒計時:3 id:7 event:launch data:倒計時:2 id:8 event:launch data:倒計時:1 id:9 event:launch data:倒計時:0 id:10 event:launch data:發射