tower/util/mod.rs
1//! Various utility types and functions that are generally used with Tower.
2
3mod and_then;
4mod boxed;
5mod boxed_clone;
6mod boxed_clone_sync;
7mod call_all;
8mod either;
9
10mod future_service;
11mod map_err;
12mod map_request;
13mod map_response;
14mod map_result;
15
16mod map_future;
17mod oneshot;
18mod optional;
19mod ready;
20mod service_fn;
21mod then;
22
23pub mod rng;
24
25pub use self::{
26 and_then::{AndThen, AndThenLayer},
27 boxed::{
28 BoxCloneServiceLayer, BoxCloneSyncServiceLayer, BoxLayer, BoxService, UnsyncBoxService,
29 },
30 boxed_clone::BoxCloneService,
31 boxed_clone_sync::BoxCloneSyncService,
32 either::Either,
33 future_service::{future_service, FutureService},
34 map_err::{MapErr, MapErrLayer},
35 map_future::{MapFuture, MapFutureLayer},
36 map_request::{MapRequest, MapRequestLayer},
37 map_response::{MapResponse, MapResponseLayer},
38 map_result::{MapResult, MapResultLayer},
39 oneshot::Oneshot,
40 optional::Optional,
41 ready::{Ready, ReadyOneshot},
42 service_fn::{service_fn, ServiceFn},
43 then::{Then, ThenLayer},
44};
45
46pub use self::call_all::{CallAll, CallAllUnordered};
47use std::future::Future;
48
49use crate::layer::util::Identity;
50
51#[cfg(feature = "buffer")]
52use crate::buffer::Buffer;
53
54#[cfg(feature = "retry")]
55use crate::retry::Retry;
56
57pub mod error {
58 //! Error types
59
60 pub use super::optional::error as optional;
61}
62
63pub mod future {
64 //! Future types
65
66 pub use super::and_then::AndThenFuture;
67 pub use super::either::EitherResponseFuture;
68 pub use super::map_err::MapErrFuture;
69 pub use super::map_response::MapResponseFuture;
70 pub use super::map_result::MapResultFuture;
71 pub use super::optional::future as optional;
72 pub use super::then::ThenFuture;
73}
74
75/// An extension trait for `Service`s that provides a variety of convenient
76/// adapters
77pub trait ServiceExt<Request>: tower_service::Service<Request> {
78 /// Yields a mutable reference to the service when it is ready to accept a request.
79 fn ready(&mut self) -> Ready<'_, Self, Request>
80 where
81 Self: Sized,
82 {
83 Ready::new(self)
84 }
85
86 /// Yields the service when it is ready to accept a request.
87 fn ready_oneshot(self) -> ReadyOneshot<Self, Request>
88 where
89 Self: Sized,
90 {
91 ReadyOneshot::new(self)
92 }
93
94 /// Consume this `Service`, calling it with the provided request once it is ready.
95 fn oneshot(self, req: Request) -> Oneshot<Self, Request>
96 where
97 Self: Sized,
98 {
99 Oneshot::new(self, req)
100 }
101
102 /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses.
103 ///
104 /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item =
105 /// Response>`][stream]. See the documentation for [`CallAll`] for
106 /// details.
107 ///
108 /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
109 /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
110 fn call_all<S>(self, reqs: S) -> CallAll<Self, S>
111 where
112 Self: Sized,
113 S: futures_core::Stream<Item = Request>,
114 {
115 CallAll::new(self, reqs)
116 }
117
118 /// Executes a new future after this service's future resolves. This does
119 /// not alter the behaviour of the [`poll_ready`] method.
120 ///
121 /// This method can be used to change the [`Response`] type of the service
122 /// into a different type. You can use this method to chain along a computation once the
123 /// service's response has been resolved.
124 ///
125 /// [`Response`]: crate::Service::Response
126 /// [`poll_ready`]: crate::Service::poll_ready
127 ///
128 /// # Example
129 /// ```
130 /// # use std::task::{Poll, Context};
131 /// # use tower::{Service, ServiceExt};
132 /// #
133 /// # struct DatabaseService;
134 /// # impl DatabaseService {
135 /// # fn new(address: &str) -> Self {
136 /// # DatabaseService
137 /// # }
138 /// # }
139 /// #
140 /// # struct Record {
141 /// # pub name: String,
142 /// # pub age: u16
143 /// # }
144 /// #
145 /// # impl Service<u32> for DatabaseService {
146 /// # type Response = Record;
147 /// # type Error = u8;
148 /// # type Future = std::future::Ready<Result<Record, u8>>;
149 /// #
150 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
151 /// # Poll::Ready(Ok(()))
152 /// # }
153 /// #
154 /// # fn call(&mut self, request: u32) -> Self::Future {
155 /// # std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
156 /// # }
157 /// # }
158 /// #
159 /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
160 /// #
161 /// # fn main() {
162 /// # async {
163 /// // A service returning Result<Record, _>
164 /// let service = DatabaseService::new("127.0.0.1:8080");
165 ///
166 /// // Map the response into a new response
167 /// let mut new_service = service.and_then(|record: Record| async move {
168 /// let name = record.name;
169 /// avatar_lookup(name).await
170 /// });
171 ///
172 /// // Call the new service
173 /// let id = 13;
174 /// let avatar = new_service.call(id).await.unwrap();
175 /// # };
176 /// # }
177 /// ```
178 fn and_then<F>(self, f: F) -> AndThen<Self, F>
179 where
180 Self: Sized,
181 F: Clone,
182 {
183 AndThen::new(self, f)
184 }
185
186 /// Maps this service's response value to a different value. This does not
187 /// alter the behaviour of the [`poll_ready`] method.
188 ///
189 /// This method can be used to change the [`Response`] type of the service
190 /// into a different type. It is similar to the [`Result::map`]
191 /// method. You can use this method to chain along a computation once the
192 /// service's response has been resolved.
193 ///
194 /// [`Response`]: crate::Service::Response
195 /// [`poll_ready`]: crate::Service::poll_ready
196 ///
197 /// # Example
198 /// ```
199 /// # use std::task::{Poll, Context};
200 /// # use tower::{Service, ServiceExt};
201 /// #
202 /// # struct DatabaseService;
203 /// # impl DatabaseService {
204 /// # fn new(address: &str) -> Self {
205 /// # DatabaseService
206 /// # }
207 /// # }
208 /// #
209 /// # struct Record {
210 /// # pub name: String,
211 /// # pub age: u16
212 /// # }
213 /// #
214 /// # impl Service<u32> for DatabaseService {
215 /// # type Response = Record;
216 /// # type Error = u8;
217 /// # type Future = std::future::Ready<Result<Record, u8>>;
218 /// #
219 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
220 /// # Poll::Ready(Ok(()))
221 /// # }
222 /// #
223 /// # fn call(&mut self, request: u32) -> Self::Future {
224 /// # std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
225 /// # }
226 /// # }
227 /// #
228 /// # fn main() {
229 /// # async {
230 /// // A service returning Result<Record, _>
231 /// let service = DatabaseService::new("127.0.0.1:8080");
232 ///
233 /// // Map the response into a new response
234 /// let mut new_service = service.map_response(|record| record.name);
235 ///
236 /// // Call the new service
237 /// let id = 13;
238 /// let name = new_service
239 /// .ready()
240 /// .await?
241 /// .call(id)
242 /// .await?;
243 /// # Ok::<(), u8>(())
244 /// # };
245 /// # }
246 /// ```
247 fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
248 where
249 Self: Sized,
250 F: FnOnce(Self::Response) -> Response + Clone,
251 {
252 MapResponse::new(self, f)
253 }
254
255 /// Maps this service's error value to a different value. This does not
256 /// alter the behaviour of the [`poll_ready`] method.
257 ///
258 /// This method can be used to change the [`Error`] type of the service
259 /// into a different type. It is similar to the [`Result::map_err`] method.
260 ///
261 /// [`Error`]: crate::Service::Error
262 /// [`poll_ready`]: crate::Service::poll_ready
263 ///
264 /// # Example
265 /// ```
266 /// # use std::task::{Poll, Context};
267 /// # use tower::{Service, ServiceExt};
268 /// #
269 /// # struct DatabaseService;
270 /// # impl DatabaseService {
271 /// # fn new(address: &str) -> Self {
272 /// # DatabaseService
273 /// # }
274 /// # }
275 /// #
276 /// # struct Error {
277 /// # pub code: u32,
278 /// # pub message: String
279 /// # }
280 /// #
281 /// # impl Service<u32> for DatabaseService {
282 /// # type Response = String;
283 /// # type Error = Error;
284 /// # type Future = std::future::Ready<Result<String, Error>>;
285 /// #
286 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
287 /// # Poll::Ready(Ok(()))
288 /// # }
289 /// #
290 /// # fn call(&mut self, request: u32) -> Self::Future {
291 /// # std::future::ready(Ok(String::new()))
292 /// # }
293 /// # }
294 /// #
295 /// # fn main() {
296 /// # async {
297 /// // A service returning Result<_, Error>
298 /// let service = DatabaseService::new("127.0.0.1:8080");
299 ///
300 /// // Map the error to a new error
301 /// let mut new_service = service.map_err(|err| err.code);
302 ///
303 /// // Call the new service
304 /// let id = 13;
305 /// let code = new_service
306 /// .ready()
307 /// .await?
308 /// .call(id)
309 /// .await
310 /// .unwrap_err();
311 /// # Ok::<(), u32>(())
312 /// # };
313 /// # }
314 /// ```
315 fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
316 where
317 Self: Sized,
318 F: FnOnce(Self::Error) -> Error + Clone,
319 {
320 MapErr::new(self, f)
321 }
322
323 /// Maps this service's result type (`Result<Self::Response, Self::Error>`)
324 /// to a different value, regardless of whether the future succeeds or
325 /// fails.
326 ///
327 /// This is similar to the [`map_response`] and [`map_err`] combinators,
328 /// except that the *same* function is invoked when the service's future
329 /// completes, whether it completes successfully or fails. This function
330 /// takes the [`Result`] returned by the service's future, and returns a
331 /// [`Result`].
332 ///
333 /// Like the standard library's [`Result::and_then`], this method can be
334 /// used to implement control flow based on `Result` values. For example, it
335 /// may be used to implement error recovery, by turning some [`Err`]
336 /// responses from the service into [`Ok`] responses. Similarly, some
337 /// successful responses from the service could be rejected, by returning an
338 /// [`Err`] conditionally, depending on the value inside the [`Ok`]. Finally,
339 /// this method can also be used to implement behaviors that must run when a
340 /// service's future completes, regardless of whether it succeeded or failed.
341 ///
342 /// This method can be used to change the [`Response`] type of the service
343 /// into a different type. It can also be used to change the [`Error`] type
344 /// of the service. However, because the [`map_result`] function is not applied
345 /// to the errors returned by the service's [`poll_ready`] method, it must
346 /// be possible to convert the service's [`Error`] type into the error type
347 /// returned by the [`map_result`] function. This is trivial when the function
348 /// returns the same error type as the service, but in other cases, it can
349 /// be useful to use [`BoxError`] to erase differing error types.
350 ///
351 /// # Examples
352 ///
353 /// Recovering from certain errors:
354 ///
355 /// ```
356 /// # use std::task::{Poll, Context};
357 /// # use tower::{Service, ServiceExt};
358 /// #
359 /// # struct DatabaseService;
360 /// # impl DatabaseService {
361 /// # fn new(address: &str) -> Self {
362 /// # DatabaseService
363 /// # }
364 /// # }
365 /// #
366 /// # struct Record {
367 /// # pub name: String,
368 /// # pub age: u16
369 /// # }
370 /// # #[derive(Debug)]
371 /// # enum DbError {
372 /// # Parse(std::num::ParseIntError),
373 /// # NoRecordsFound,
374 /// # }
375 /// #
376 /// # impl Service<u32> for DatabaseService {
377 /// # type Response = Vec<Record>;
378 /// # type Error = DbError;
379 /// # type Future = std::future::Ready<Result<Vec<Record>, DbError>>;
380 /// #
381 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
382 /// # Poll::Ready(Ok(()))
383 /// # }
384 /// #
385 /// # fn call(&mut self, request: u32) -> Self::Future {
386 /// # std::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
387 /// # }
388 /// # }
389 /// #
390 /// # fn main() {
391 /// # async {
392 /// // A service returning Result<Vec<Record>, DbError>
393 /// let service = DatabaseService::new("127.0.0.1:8080");
394 ///
395 /// // If the database returns no records for the query, we just want an empty `Vec`.
396 /// let mut new_service = service.map_result(|result| match result {
397 /// // If the error indicates that no records matched the query, return an empty
398 /// // `Vec` instead.
399 /// Err(DbError::NoRecordsFound) => Ok(Vec::new()),
400 /// // Propagate all other responses (`Ok` and `Err`) unchanged
401 /// x => x,
402 /// });
403 ///
404 /// // Call the new service
405 /// let id = 13;
406 /// let name = new_service
407 /// .ready()
408 /// .await?
409 /// .call(id)
410 /// .await?;
411 /// # Ok::<(), DbError>(())
412 /// # };
413 /// # }
414 /// ```
415 ///
416 /// Rejecting some `Ok` responses:
417 ///
418 /// ```
419 /// # use std::task::{Poll, Context};
420 /// # use tower::{Service, ServiceExt};
421 /// #
422 /// # struct DatabaseService;
423 /// # impl DatabaseService {
424 /// # fn new(address: &str) -> Self {
425 /// # DatabaseService
426 /// # }
427 /// # }
428 /// #
429 /// # struct Record {
430 /// # pub name: String,
431 /// # pub age: u16
432 /// # }
433 /// # type DbError = String;
434 /// # type AppError = String;
435 /// #
436 /// # impl Service<u32> for DatabaseService {
437 /// # type Response = Record;
438 /// # type Error = DbError;
439 /// # type Future = std::future::Ready<Result<Record, DbError>>;
440 /// #
441 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
442 /// # Poll::Ready(Ok(()))
443 /// # }
444 /// #
445 /// # fn call(&mut self, request: u32) -> Self::Future {
446 /// # std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
447 /// # }
448 /// # }
449 /// #
450 /// # fn main() {
451 /// # async {
452 /// use tower::BoxError;
453 ///
454 /// // A service returning Result<Record, DbError>
455 /// let service = DatabaseService::new("127.0.0.1:8080");
456 ///
457 /// // If the user is zero years old, return an error.
458 /// let mut new_service = service.map_result(|result| {
459 /// let record = result?;
460 ///
461 /// if record.age == 0 {
462 /// // Users must have been born to use our app!
463 /// let app_error = AppError::from("users cannot be 0 years old!");
464 ///
465 /// // Box the error to erase its type (as it can be an `AppError`
466 /// // *or* the inner service's `DbError`).
467 /// return Err(BoxError::from(app_error));
468 /// }
469 ///
470 /// // Otherwise, return the record.
471 /// Ok(record)
472 /// });
473 ///
474 /// // Call the new service
475 /// let id = 13;
476 /// let record = new_service
477 /// .ready()
478 /// .await?
479 /// .call(id)
480 /// .await?;
481 /// # Ok::<(), BoxError>(())
482 /// # };
483 /// # }
484 /// ```
485 ///
486 /// Performing an action that must be run for both successes and failures:
487 ///
488 /// ```
489 /// # use std::convert::TryFrom;
490 /// # use std::task::{Poll, Context};
491 /// # use tower::{Service, ServiceExt};
492 /// #
493 /// # struct DatabaseService;
494 /// # impl DatabaseService {
495 /// # fn new(address: &str) -> Self {
496 /// # DatabaseService
497 /// # }
498 /// # }
499 /// #
500 /// # impl Service<u32> for DatabaseService {
501 /// # type Response = String;
502 /// # type Error = u8;
503 /// # type Future = std::future::Ready<Result<String, u8>>;
504 /// #
505 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
506 /// # Poll::Ready(Ok(()))
507 /// # }
508 /// #
509 /// # fn call(&mut self, request: u32) -> Self::Future {
510 /// # std::future::ready(Ok(String::new()))
511 /// # }
512 /// # }
513 /// #
514 /// # fn main() {
515 /// # async {
516 /// // A service returning Result<Record, DbError>
517 /// let service = DatabaseService::new("127.0.0.1:8080");
518 ///
519 /// // Print a message whenever a query completes.
520 /// let mut new_service = service.map_result(|result| {
521 /// println!("query completed; success={}", result.is_ok());
522 /// result
523 /// });
524 ///
525 /// // Call the new service
526 /// let id = 13;
527 /// let response = new_service
528 /// .ready()
529 /// .await?
530 /// .call(id)
531 /// .await;
532 /// # response
533 /// # };
534 /// # }
535 /// ```
536 ///
537 /// [`map_response`]: ServiceExt::map_response
538 /// [`map_err`]: ServiceExt::map_err
539 /// [`map_result`]: ServiceExt::map_result
540 /// [`Error`]: crate::Service::Error
541 /// [`Response`]: crate::Service::Response
542 /// [`poll_ready`]: crate::Service::poll_ready
543 /// [`BoxError`]: crate::BoxError
544 fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
545 where
546 Self: Sized,
547 Error: From<Self::Error>,
548 F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone,
549 {
550 MapResult::new(self, f)
551 }
552
553 /// Composes a function *in front of* the service.
554 ///
555 /// This adapter produces a new service that passes each value through the
556 /// given function `f` before sending it to `self`.
557 ///
558 /// # Example
559 /// ```
560 /// # use std::convert::TryFrom;
561 /// # use std::task::{Poll, Context};
562 /// # use tower::{Service, ServiceExt};
563 /// #
564 /// # struct DatabaseService;
565 /// # impl DatabaseService {
566 /// # fn new(address: &str) -> Self {
567 /// # DatabaseService
568 /// # }
569 /// # }
570 /// #
571 /// # impl Service<String> for DatabaseService {
572 /// # type Response = String;
573 /// # type Error = u8;
574 /// # type Future = std::future::Ready<Result<String, u8>>;
575 /// #
576 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
577 /// # Poll::Ready(Ok(()))
578 /// # }
579 /// #
580 /// # fn call(&mut self, request: String) -> Self::Future {
581 /// # std::future::ready(Ok(String::new()))
582 /// # }
583 /// # }
584 /// #
585 /// # fn main() {
586 /// # async {
587 /// // A service taking a String as a request
588 /// let service = DatabaseService::new("127.0.0.1:8080");
589 ///
590 /// // Map the request to a new request
591 /// let mut new_service = service.map_request(|id: u32| id.to_string());
592 ///
593 /// // Call the new service
594 /// let id = 13;
595 /// let response = new_service
596 /// .ready()
597 /// .await?
598 /// .call(id)
599 /// .await;
600 /// # response
601 /// # };
602 /// # }
603 /// ```
604 fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
605 where
606 Self: Sized,
607 F: FnMut(NewRequest) -> Request,
608 {
609 MapRequest::new(self, f)
610 }
611
612 /// Composes this service with a [`Filter`] that conditionally accepts or
613 /// rejects requests based on a [predicate].
614 ///
615 /// This adapter produces a new service that passes each value through the
616 /// given function `predicate` before sending it to `self`.
617 ///
618 /// # Example
619 /// ```
620 /// # use std::convert::TryFrom;
621 /// # use std::task::{Poll, Context};
622 /// # use tower::{Service, ServiceExt};
623 /// #
624 /// # struct DatabaseService;
625 /// # impl DatabaseService {
626 /// # fn new(address: &str) -> Self {
627 /// # DatabaseService
628 /// # }
629 /// # }
630 /// #
631 /// # #[derive(Debug)] enum DbError {
632 /// # Parse(std::num::ParseIntError)
633 /// # }
634 /// #
635 /// # impl std::fmt::Display for DbError {
636 /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
637 /// # }
638 /// # impl std::error::Error for DbError {}
639 /// # impl Service<u32> for DatabaseService {
640 /// # type Response = String;
641 /// # type Error = DbError;
642 /// # type Future = std::future::Ready<Result<String, DbError>>;
643 /// #
644 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
645 /// # Poll::Ready(Ok(()))
646 /// # }
647 /// #
648 /// # fn call(&mut self, request: u32) -> Self::Future {
649 /// # std::future::ready(Ok(String::new()))
650 /// # }
651 /// # }
652 /// #
653 /// # fn main() {
654 /// # async {
655 /// // A service taking a u32 as a request and returning Result<_, DbError>
656 /// let service = DatabaseService::new("127.0.0.1:8080");
657 ///
658 /// // Fallibly map the request to a new request
659 /// let mut new_service = service
660 /// .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
661 ///
662 /// // Call the new service
663 /// let id = "13";
664 /// let response = new_service
665 /// .ready()
666 /// .await?
667 /// .call(id)
668 /// .await;
669 /// # response
670 /// # };
671 /// # }
672 /// ```
673 ///
674 /// [`Filter`]: crate::filter::Filter
675 /// [predicate]: crate::filter::Predicate
676 #[cfg(feature = "filter")]
677 fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
678 where
679 Self: Sized,
680 F: crate::filter::Predicate<NewRequest>,
681 {
682 crate::filter::Filter::new(self, filter)
683 }
684
685 /// Composes this service with an [`AsyncFilter`] that conditionally accepts or
686 /// rejects requests based on an [async predicate].
687 ///
688 /// This adapter produces a new service that passes each value through the
689 /// given function `predicate` before sending it to `self`.
690 ///
691 /// # Example
692 /// ```
693 /// # use std::convert::TryFrom;
694 /// # use std::task::{Poll, Context};
695 /// # use tower::{Service, ServiceExt};
696 /// #
697 /// # #[derive(Clone)] struct DatabaseService;
698 /// # impl DatabaseService {
699 /// # fn new(address: &str) -> Self {
700 /// # DatabaseService
701 /// # }
702 /// # }
703 /// # #[derive(Debug)]
704 /// # enum DbError {
705 /// # Rejected
706 /// # }
707 /// # impl std::fmt::Display for DbError {
708 /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
709 /// # }
710 /// # impl std::error::Error for DbError {}
711 /// #
712 /// # impl Service<u32> for DatabaseService {
713 /// # type Response = String;
714 /// # type Error = DbError;
715 /// # type Future = std::future::Ready<Result<String, DbError>>;
716 /// #
717 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
718 /// # Poll::Ready(Ok(()))
719 /// # }
720 /// #
721 /// # fn call(&mut self, request: u32) -> Self::Future {
722 /// # std::future::ready(Ok(String::new()))
723 /// # }
724 /// # }
725 /// #
726 /// # fn main() {
727 /// # async {
728 /// // A service taking a u32 as a request and returning Result<_, DbError>
729 /// let service = DatabaseService::new("127.0.0.1:8080");
730 ///
731 /// /// Returns `true` if we should query the database for an ID.
732 /// async fn should_query(id: u32) -> bool {
733 /// // ...
734 /// # true
735 /// }
736 ///
737 /// // Filter requests based on `should_query`.
738 /// let mut new_service = service
739 /// .filter_async(|id: u32| async move {
740 /// if should_query(id).await {
741 /// return Ok(id);
742 /// }
743 ///
744 /// Err(DbError::Rejected)
745 /// });
746 ///
747 /// // Call the new service
748 /// let id = 13;
749 /// # let id: u32 = id;
750 /// let response = new_service
751 /// .ready()
752 /// .await?
753 /// .call(id)
754 /// .await;
755 /// # response
756 /// # };
757 /// # }
758 /// ```
759 ///
760 /// [`AsyncFilter`]: crate::filter::AsyncFilter
761 /// [asynchronous predicate]: crate::filter::AsyncPredicate
762 #[cfg(feature = "filter")]
763 fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
764 where
765 Self: Sized,
766 F: crate::filter::AsyncPredicate<NewRequest>,
767 {
768 crate::filter::AsyncFilter::new(self, filter)
769 }
770
771 /// Composes an asynchronous function *after* this service.
772 ///
773 /// This takes a function or closure returning a future, and returns a new
774 /// `Service` that chains that function after this service's [`Future`]. The
775 /// new `Service`'s future will consist of this service's future, followed
776 /// by the future returned by calling the chained function with the future's
777 /// [`Output`] type. The chained function is called regardless of whether
778 /// this service's future completes with a successful response or with an
779 /// error.
780 ///
781 /// This method can be thought of as an equivalent to the [`futures`
782 /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that
783 /// _return_ futures, rather than on an individual future. Similarly to that
784 /// combinator, [`ServiceExt::then`] can be used to implement asynchronous
785 /// error recovery, by calling some asynchronous function with errors
786 /// returned by this service. Alternatively, it may also be used to call a
787 /// fallible async function with the successful response of this service.
788 ///
789 /// This method can be used to change the [`Response`] type of the service
790 /// into a different type. It can also be used to change the [`Error`] type
791 /// of the service. However, because the `then` function is not applied
792 /// to the errors returned by the service's [`poll_ready`] method, it must
793 /// be possible to convert the service's [`Error`] type into the error type
794 /// returned by the `then` future. This is trivial when the function
795 /// returns the same error type as the service, but in other cases, it can
796 /// be useful to use [`BoxError`] to erase differing error types.
797 ///
798 /// # Examples
799 ///
800 /// ```
801 /// # use std::task::{Poll, Context};
802 /// # use tower::{Service, ServiceExt};
803 /// #
804 /// # struct DatabaseService;
805 /// # impl DatabaseService {
806 /// # fn new(address: &str) -> Self {
807 /// # DatabaseService
808 /// # }
809 /// # }
810 /// #
811 /// # type Record = ();
812 /// # type DbError = ();
813 /// #
814 /// # impl Service<u32> for DatabaseService {
815 /// # type Response = Record;
816 /// # type Error = DbError;
817 /// # type Future = std::future::Ready<Result<Record, DbError>>;
818 /// #
819 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
820 /// # Poll::Ready(Ok(()))
821 /// # }
822 /// #
823 /// # fn call(&mut self, request: u32) -> Self::Future {
824 /// # std::future::ready(Ok(()))
825 /// # }
826 /// # }
827 /// #
828 /// # fn main() {
829 /// // A service returning Result<Record, DbError>
830 /// let service = DatabaseService::new("127.0.0.1:8080");
831 ///
832 /// // An async function that attempts to recover from errors returned by the
833 /// // database.
834 /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> {
835 /// // ...
836 /// # Ok(())
837 /// }
838 /// # async {
839 ///
840 /// // If the database service returns an error, attempt to recover by
841 /// // calling `recover_from_error`. Otherwise, return the successful response.
842 /// let mut new_service = service.then(|result| async move {
843 /// match result {
844 /// Ok(record) => Ok(record),
845 /// Err(e) => recover_from_error(e).await,
846 /// }
847 /// });
848 ///
849 /// // Call the new service
850 /// let id = 13;
851 /// let record = new_service
852 /// .ready()
853 /// .await?
854 /// .call(id)
855 /// .await?;
856 /// # Ok::<(), DbError>(())
857 /// # };
858 /// # }
859 /// ```
860 ///
861 /// [`Future`]: crate::Service::Future
862 /// [`Output`]: std::future::Future::Output
863 /// [`futures` crate]: https://docs.rs/futures
864 /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
865 /// [`Error`]: crate::Service::Error
866 /// [`Response`]: crate::Service::Response
867 /// [`poll_ready`]: crate::Service::poll_ready
868 /// [`BoxError`]: crate::BoxError
869 fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
870 where
871 Self: Sized,
872 Error: From<Self::Error>,
873 F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone,
874 Fut: Future<Output = Result<Response, Error>>,
875 {
876 Then::new(self, f)
877 }
878
879 /// Composes a function that transforms futures produced by the service.
880 ///
881 /// This takes a function or closure returning a future computed from the future returned by
882 /// the service's [`call`] method, as opposed to the responses produced by the future.
883 ///
884 /// # Examples
885 ///
886 /// ```
887 /// # use std::task::{Poll, Context};
888 /// # use tower::{Service, ServiceExt, BoxError};
889 /// #
890 /// # struct DatabaseService;
891 /// # impl DatabaseService {
892 /// # fn new(address: &str) -> Self {
893 /// # DatabaseService
894 /// # }
895 /// # }
896 /// #
897 /// # type Record = ();
898 /// # type DbError = crate::BoxError;
899 /// #
900 /// # impl Service<u32> for DatabaseService {
901 /// # type Response = Record;
902 /// # type Error = DbError;
903 /// # type Future = std::future::Ready<Result<Record, DbError>>;
904 /// #
905 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
906 /// # Poll::Ready(Ok(()))
907 /// # }
908 /// #
909 /// # fn call(&mut self, request: u32) -> Self::Future {
910 /// # std::future::ready(Ok(()))
911 /// # }
912 /// # }
913 /// #
914 /// # fn main() {
915 /// use std::time::Duration;
916 /// use tokio::time::timeout;
917 ///
918 /// // A service returning Result<Record, DbError>
919 /// let service = DatabaseService::new("127.0.0.1:8080");
920 /// # async {
921 ///
922 /// let mut new_service = service.map_future(|future| async move {
923 /// let res = timeout(Duration::from_secs(1), future).await?;
924 /// Ok::<_, BoxError>(res)
925 /// });
926 ///
927 /// // Call the new service
928 /// let id = 13;
929 /// let record = new_service
930 /// .ready()
931 /// .await?
932 /// .call(id)
933 /// .await?;
934 /// # Ok::<(), BoxError>(())
935 /// # };
936 /// # }
937 /// ```
938 ///
939 /// Note that normally you wouldn't implement timeouts like this and instead use [`Timeout`].
940 ///
941 /// [`call`]: crate::Service::call
942 /// [`Timeout`]: crate::timeout::Timeout
943 fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
944 where
945 Self: Sized,
946 F: FnMut(Self::Future) -> Fut,
947 Error: From<Self::Error>,
948 Fut: Future<Output = Result<Response, Error>>,
949 {
950 MapFuture::new(self, f)
951 }
952
953 /// Returns a buffered version of this service.
954 ///
955 /// See [`Buffer::new()`] for the details.
956 #[cfg(feature = "buffer")]
957 fn buffered(self, bound: usize) -> Buffer<Request, Self::Future>
958 where
959 Self: Send + Sized + 'static,
960 Self::Future: Send,
961 Self::Error: Into<crate::BoxError> + Send + Sync,
962 Request: Send + Sized + 'static,
963 {
964 Buffer::new(self, bound)
965 }
966
967 /// Returns a retry version of this service.
968 ///
969 /// See [`Retry::new()`] for the details.
970 #[cfg(feature = "retry")]
971 fn retry<P>(self, policy: P) -> Retry<P, Self>
972 where
973 Self: Sized,
974 {
975 Retry::new(policy, self)
976 }
977
978 /// Convert the service into a [`Service`] + [`Send`] trait object.
979 ///
980 /// See [`BoxService`] for more details.
981 ///
982 /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method
983 /// can be used instead, to produce a boxed service which will also
984 /// implement [`Clone`].
985 ///
986 /// # Example
987 ///
988 /// ```
989 /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService};
990 /// #
991 /// # struct Request;
992 /// # struct Response;
993 /// # impl Response {
994 /// # fn new() -> Self { Self }
995 /// # }
996 ///
997 /// let service = service_fn(|req: Request| async {
998 /// Ok::<_, BoxError>(Response::new())
999 /// });
1000 ///
1001 /// let service: BoxService<Request, Response, BoxError> = service
1002 /// .map_request(|req| {
1003 /// println!("received request");
1004 /// req
1005 /// })
1006 /// .map_response(|res| {
1007 /// println!("response produced");
1008 /// res
1009 /// })
1010 /// .boxed();
1011 /// # let service = assert_service(service);
1012 /// # fn assert_service<S, R>(svc: S) -> S
1013 /// # where S: Service<R> { svc }
1014 /// ```
1015 ///
1016 /// [`Service`]: crate::Service
1017 /// [`boxed_clone`]: Self::boxed_clone
1018 fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
1019 where
1020 Self: Sized + Send + 'static,
1021 Self::Future: Send + 'static,
1022 {
1023 BoxService::new(self)
1024 }
1025
1026 /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object.
1027 ///
1028 /// This is similar to the [`boxed`] method, but it requires that `Self` implement
1029 /// [`Clone`], and the returned boxed service implements [`Clone`].
1030 /// See [`BoxCloneService`] for more details.
1031 ///
1032 /// # Example
1033 ///
1034 /// ```
1035 /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService};
1036 /// #
1037 /// # struct Request;
1038 /// # struct Response;
1039 /// # impl Response {
1040 /// # fn new() -> Self { Self }
1041 /// # }
1042 ///
1043 /// let service = service_fn(|req: Request| async {
1044 /// Ok::<_, BoxError>(Response::new())
1045 /// });
1046 ///
1047 /// let service: BoxCloneService<Request, Response, BoxError> = service
1048 /// .map_request(|req| {
1049 /// println!("received request");
1050 /// req
1051 /// })
1052 /// .map_response(|res| {
1053 /// println!("response produced");
1054 /// res
1055 /// })
1056 /// .boxed_clone();
1057 ///
1058 /// // The boxed service can still be cloned.
1059 /// service.clone();
1060 /// # let service = assert_service(service);
1061 /// # fn assert_service<S, R>(svc: S) -> S
1062 /// # where S: Service<R> { svc }
1063 /// ```
1064 ///
1065 /// [`Service`]: crate::Service
1066 /// [`boxed`]: Self::boxed
1067 fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error>
1068 where
1069 Self: Clone + Sized + Send + 'static,
1070 Self::Future: Send + 'static,
1071 {
1072 BoxCloneService::new(self)
1073 }
1074}
1075
1076impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
1077
1078/// Convert an `Option<Layer>` into a [`Layer`].
1079///
1080/// ```
1081/// # use std::time::Duration;
1082/// # use tower::Service;
1083/// # use tower::builder::ServiceBuilder;
1084/// use tower::util::option_layer;
1085/// # use tower::timeout::TimeoutLayer;
1086/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
1087/// # let timeout = Some(Duration::new(10, 0));
1088/// // Layer to apply a timeout if configured
1089/// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new));
1090///
1091/// ServiceBuilder::new()
1092/// .layer(maybe_timeout)
1093/// .service(svc);
1094/// # }
1095/// ```
1096///
1097/// [`Layer`]: crate::layer::Layer
1098pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> {
1099 if let Some(layer) = layer {
1100 Either::Left(layer)
1101 } else {
1102 Either::Right(Identity::new())
1103 }
1104}