Skip to main content

tower/util/
mod.rs

1//! Various utility types and functions that are generally used with Tower.
2
3mod and_then;
4mod boxed;
5mod boxed_clone;
6mod boxed_clone_sync;
7mod call_all;
8mod either;
9
10mod future_service;
11mod map_err;
12mod map_request;
13mod map_response;
14mod map_result;
15
16mod map_future;
17mod oneshot;
18mod optional;
19mod ready;
20mod service_fn;
21mod then;
22
23pub mod rng;
24
25pub use self::{
26    and_then::{AndThen, AndThenLayer},
27    boxed::{
28        BoxCloneServiceLayer, BoxCloneSyncServiceLayer, BoxLayer, BoxService, UnsyncBoxService,
29    },
30    boxed_clone::BoxCloneService,
31    boxed_clone_sync::BoxCloneSyncService,
32    either::Either,
33    future_service::{future_service, FutureService},
34    map_err::{MapErr, MapErrLayer},
35    map_future::{MapFuture, MapFutureLayer},
36    map_request::{MapRequest, MapRequestLayer},
37    map_response::{MapResponse, MapResponseLayer},
38    map_result::{MapResult, MapResultLayer},
39    oneshot::Oneshot,
40    optional::Optional,
41    ready::{Ready, ReadyOneshot},
42    service_fn::{service_fn, ServiceFn},
43    then::{Then, ThenLayer},
44};
45
46pub use self::call_all::{CallAll, CallAllUnordered};
47use std::future::Future;
48
49use crate::layer::util::Identity;
50
51#[cfg(feature = "buffer")]
52use crate::buffer::Buffer;
53
54#[cfg(feature = "retry")]
55use crate::retry::Retry;
56
57pub mod error {
58    //! Error types re-exported from the utility submodules.
59
60    pub use super::optional::error as optional; // reachable from outside as `error::optional`
61}
62
63pub mod future {
64    //! Future types re-exported from the utility submodules.
65
66    pub use super::and_then::AndThenFuture;
67    pub use super::either::EitherResponseFuture;
68    pub use super::map_err::MapErrFuture;
69    pub use super::map_response::MapResponseFuture;
70    pub use super::map_result::MapResultFuture;
71    pub use super::optional::future as optional; // renamed on re-export: reachable as `future::optional`
72    pub use super::then::ThenFuture;
73}
74
75/// An extension trait for `Service`s that provides a variety of convenient
76/// adapters.
77pub trait ServiceExt<Request>: tower_service::Service<Request> {
78    /// Yields a mutable reference to the service when it is ready to accept a request.
79    fn ready(&mut self) -> Ready<'_, Self, Request>
80    where
81        Self: Sized,
82    {
83        Ready::new(self) // `self` is `&mut Self`: the returned `Ready<'_, ..>` borrows the service, it does not consume it
84    }
85
86    /// Yields the service when it is ready to accept a request.
87    fn ready_oneshot(self) -> ReadyOneshot<Self, Request>
88    where
89        Self: Sized,
90    {
91        ReadyOneshot::new(self) // `self` is taken by value: the service is moved into the returned `ReadyOneshot`
92    }
93
94    /// Consume this `Service`, calling it with the provided request once it is ready.
95    fn oneshot(self, req: Request) -> Oneshot<Self, Request>
96    where
97        Self: Sized,
98    {
99        Oneshot::new(self, req) // both the service and its single request are moved into the returned `Oneshot`
100    }
101
102    /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses.
103    ///
104    /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item =
105    /// Response>`][stream]. See the documentation for [`CallAll`] for
106    /// details.
107    ///
108    /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
109    /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
110    fn call_all<S>(self, reqs: S) -> CallAll<Self, S>
111    where
112        Self: Sized,
113        S: futures_core::Stream<Item = Request>, // the stream's items must be exactly this service's request type
114    {
115        CallAll::new(self, reqs) // the service and the request stream are both moved into `CallAll`
116    }
117
118    /// Executes a new future after this service's future resolves. This does
119    /// not alter the behaviour of the [`poll_ready`] method.
120    ///
121    /// This method can be used to change the [`Response`] type of the service
122    /// into a different type. You can use this method to chain along a computation once the
123    /// service's response has been resolved.
124    ///
125    /// [`Response`]: crate::Service::Response
126    /// [`poll_ready`]: crate::Service::poll_ready
127    ///
128    /// # Example
129    /// ```
130    /// # use std::task::{Poll, Context};
131    /// # use tower::{Service, ServiceExt};
132    /// #
133    /// # struct DatabaseService;
134    /// # impl DatabaseService {
135    /// #   fn new(address: &str) -> Self {
136    /// #       DatabaseService
137    /// #   }
138    /// # }
139    /// #
140    /// # struct Record {
141    /// #   pub name: String,
142    /// #   pub age: u16
143    /// # }
144    /// #
145    /// # impl Service<u32> for DatabaseService {
146    /// #   type Response = Record;
147    /// #   type Error = u8;
148    /// #   type Future = std::future::Ready<Result<Record, u8>>;
149    /// #
150    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
151    /// #       Poll::Ready(Ok(()))
152    /// #   }
153    /// #
154    /// #   fn call(&mut self, request: u32) -> Self::Future {
155    /// #       std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
156    /// #   }
157    /// # }
158    /// #
159    /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
160    /// #
161    /// # fn main() {
162    /// #    async {
163    /// // A service returning Result<Record, _>
164    /// let service = DatabaseService::new("127.0.0.1:8080");
165    ///
166    /// // Map the response into a new response
167    /// let mut new_service = service.and_then(|record: Record| async move {
168    ///     let name = record.name;
169    ///     avatar_lookup(name).await
170    /// });
171    ///
172    /// // Call the new service
173    /// let id = 13;
174    /// let avatar = new_service.call(id).await.unwrap();
175    /// #    };
176    /// # }
177    /// ```
178    fn and_then<F>(self, f: F) -> AndThen<Self, F>
179    where
180        Self: Sized,
181        F: Clone, // only `Clone` is required here; this signature places no callable bound on `f`
182    {
183        AndThen::new(self, f) // NOTE(review): the callable requirements on `f` are presumably enforced by `AndThen` itself — confirm
184    }
185
186    /// Maps this service's response value to a different value. This does not
187    /// alter the behaviour of the [`poll_ready`] method.
188    ///
189    /// This method can be used to change the [`Response`] type of the service
190    /// into a different type. It is similar to the [`Result::map`]
191    /// method. You can use this method to chain along a computation once the
192    /// service's response has been resolved.
193    ///
194    /// [`Response`]: crate::Service::Response
195    /// [`poll_ready`]: crate::Service::poll_ready
196    ///
197    /// # Example
198    /// ```
199    /// # use std::task::{Poll, Context};
200    /// # use tower::{Service, ServiceExt};
201    /// #
202    /// # struct DatabaseService;
203    /// # impl DatabaseService {
204    /// #   fn new(address: &str) -> Self {
205    /// #       DatabaseService
206    /// #   }
207    /// # }
208    /// #
209    /// # struct Record {
210    /// #   pub name: String,
211    /// #   pub age: u16
212    /// # }
213    /// #
214    /// # impl Service<u32> for DatabaseService {
215    /// #   type Response = Record;
216    /// #   type Error = u8;
217    /// #   type Future = std::future::Ready<Result<Record, u8>>;
218    /// #
219    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
220    /// #       Poll::Ready(Ok(()))
221    /// #   }
222    /// #
223    /// #   fn call(&mut self, request: u32) -> Self::Future {
224    /// #       std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
225    /// #   }
226    /// # }
227    /// #
228    /// # fn main() {
229    /// #    async {
230    /// // A service returning Result<Record, _>
231    /// let service = DatabaseService::new("127.0.0.1:8080");
232    ///
233    /// // Map the response into a new response
234    /// let mut new_service = service.map_response(|record| record.name);
235    ///
236    /// // Call the new service
237    /// let id = 13;
238    /// let name = new_service
239    ///     .ready()
240    ///     .await?
241    ///     .call(id)
242    ///     .await?;
243    /// # Ok::<(), u8>(())
244    /// #    };
245    /// # }
246    /// ```
247    fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
248    where
249        Self: Sized,
250        F: FnOnce(Self::Response) -> Response + Clone, // `f` consumes the inner response and yields the new one; it must also be `Clone`
251    {
252        MapResponse::new(self, f) // `Response` only appears in the bound on `F`; the wrapper type is `MapResponse<Self, F>`
253    }
254
255    /// Maps this service's error value to a different value. This does not
256    /// alter the behaviour of the [`poll_ready`] method.
257    ///
258    /// This method can be used to change the [`Error`] type of the service
259    /// into a different type. It is similar to the [`Result::map_err`] method.
260    ///
261    /// [`Error`]: crate::Service::Error
262    /// [`poll_ready`]: crate::Service::poll_ready
263    ///
264    /// # Example
265    /// ```
266    /// # use std::task::{Poll, Context};
267    /// # use tower::{Service, ServiceExt};
268    /// #
269    /// # struct DatabaseService;
270    /// # impl DatabaseService {
271    /// #   fn new(address: &str) -> Self {
272    /// #       DatabaseService
273    /// #   }
274    /// # }
275    /// #
276    /// # struct Error {
277    /// #   pub code: u32,
278    /// #   pub message: String
279    /// # }
280    /// #
281    /// # impl Service<u32> for DatabaseService {
282    /// #   type Response = String;
283    /// #   type Error = Error;
284    /// #   type Future = std::future::Ready<Result<String, Error>>;
285    /// #
286    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
287    /// #       Poll::Ready(Ok(()))
288    /// #   }
289    /// #
290    /// #   fn call(&mut self, request: u32) -> Self::Future {
291    /// #       std::future::ready(Ok(String::new()))
292    /// #   }
293    /// # }
294    /// #
295    /// # fn main() {
296    /// #   async {
297    /// // A service returning Result<_, Error>
298    /// let service = DatabaseService::new("127.0.0.1:8080");
299    ///
300    /// // Map the error to a new error
301    /// let mut new_service = service.map_err(|err| err.code);
302    ///
303    /// // Call the new service
304    /// let id = 13;
305    /// let code = new_service
306    ///     .ready()
307    ///     .await?
308    ///     .call(id)
309    ///     .await
310    ///     .unwrap_err();
311    /// # Ok::<(), u32>(())
312    /// #   };
313    /// # }
314    /// ```
315    fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
316    where
317        Self: Sized,
318        F: FnOnce(Self::Error) -> Error + Clone, // `f` consumes the inner error and yields the new one; it must also be `Clone`
319    {
320        MapErr::new(self, f) // `Error` only appears in the bound on `F`; the wrapper type is `MapErr<Self, F>`
321    }
322
323    /// Maps this service's result type (`Result<Self::Response, Self::Error>`)
324    /// to a different value, regardless of whether the future succeeds or
325    /// fails.
326    ///
327    /// This is similar to the [`map_response`] and [`map_err`] combinators,
328    /// except that the *same* function is invoked when the service's future
329    /// completes, whether it completes successfully or fails. This function
330    /// takes the [`Result`] returned by the service's future, and returns a
331    /// [`Result`].
332    ///
333    /// Like the standard library's [`Result::and_then`], this method can be
334    /// used to implement control flow based on `Result` values. For example, it
335    /// may be used to implement error recovery, by turning some [`Err`]
336    /// responses from the service into [`Ok`] responses. Similarly, some
337    /// successful responses from the service could be rejected, by returning an
338    /// [`Err`] conditionally, depending on the value inside the [`Ok`]. Finally,
339    /// this method can also be used to implement behaviors that must run when a
340    /// service's future completes, regardless of whether it succeeded or failed.
341    ///
342    /// This method can be used to change the [`Response`] type of the service
343    /// into a different type. It can also be used to change the [`Error`] type
344    /// of the service. However, because the [`map_result`] function is not applied
345    /// to the errors returned by the service's [`poll_ready`] method, it must
346    /// be possible to convert the service's [`Error`] type into the error type
347    /// returned by the [`map_result`] function. This is trivial when the function
348    /// returns the same error type as the service, but in other cases, it can
349    /// be useful to use [`BoxError`] to erase differing error types.
350    ///
351    /// # Examples
352    ///
353    /// Recovering from certain errors:
354    ///
355    /// ```
356    /// # use std::task::{Poll, Context};
357    /// # use tower::{Service, ServiceExt};
358    /// #
359    /// # struct DatabaseService;
360    /// # impl DatabaseService {
361    /// #   fn new(address: &str) -> Self {
362    /// #       DatabaseService
363    /// #   }
364    /// # }
365    /// #
366    /// # struct Record {
367    /// #   pub name: String,
368    /// #   pub age: u16
369    /// # }
370    /// # #[derive(Debug)]
371    /// # enum DbError {
372    /// #   Parse(std::num::ParseIntError),
373    /// #   NoRecordsFound,
374    /// # }
375    /// #
376    /// # impl Service<u32> for DatabaseService {
377    /// #   type Response = Vec<Record>;
378    /// #   type Error = DbError;
379    /// #   type Future = std::future::Ready<Result<Vec<Record>, DbError>>;
380    /// #
381    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
382    /// #       Poll::Ready(Ok(()))
383    /// #   }
384    /// #
385    /// #   fn call(&mut self, request: u32) -> Self::Future {
386    /// #       std::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
387    /// #   }
388    /// # }
389    /// #
390    /// # fn main() {
391    /// #    async {
392    /// // A service returning Result<Vec<Record>, DbError>
393    /// let service = DatabaseService::new("127.0.0.1:8080");
394    ///
395    /// // If the database returns no records for the query, we just want an empty `Vec`.
396    /// let mut new_service = service.map_result(|result| match result {
397    ///     // If the error indicates that no records matched the query, return an empty
398    ///     // `Vec` instead.
399    ///     Err(DbError::NoRecordsFound) => Ok(Vec::new()),
400    ///     // Propagate all other responses (`Ok` and `Err`) unchanged
401    ///     x => x,
402    /// });
403    ///
404    /// // Call the new service
405    /// let id = 13;
406    /// let name = new_service
407    ///     .ready()
408    ///     .await?
409    ///     .call(id)
410    ///     .await?;
411    /// # Ok::<(), DbError>(())
412    /// #    };
413    /// # }
414    /// ```
415    ///
416    /// Rejecting some `Ok` responses:
417    ///
418    /// ```
419    /// # use std::task::{Poll, Context};
420    /// # use tower::{Service, ServiceExt};
421    /// #
422    /// # struct DatabaseService;
423    /// # impl DatabaseService {
424    /// #   fn new(address: &str) -> Self {
425    /// #       DatabaseService
426    /// #   }
427    /// # }
428    /// #
429    /// # struct Record {
430    /// #   pub name: String,
431    /// #   pub age: u16
432    /// # }
433    /// # type DbError = String;
434    /// # type AppError = String;
435    /// #
436    /// # impl Service<u32> for DatabaseService {
437    /// #   type Response = Record;
438    /// #   type Error = DbError;
439    /// #   type Future = std::future::Ready<Result<Record, DbError>>;
440    /// #
441    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
442    /// #       Poll::Ready(Ok(()))
443    /// #   }
444    /// #
445    /// #   fn call(&mut self, request: u32) -> Self::Future {
446    /// #       std::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
447    /// #   }
448    /// # }
449    /// #
450    /// # fn main() {
451    /// #    async {
452    /// use tower::BoxError;
453    ///
454    /// // A service returning Result<Record, DbError>
455    /// let service = DatabaseService::new("127.0.0.1:8080");
456    ///
457    /// // If the user is zero years old, return an error.
458    /// let mut new_service = service.map_result(|result| {
459    ///    let record = result?;
460    ///
461    ///    if record.age == 0 {
462    ///         // Users must have been born to use our app!
463    ///         let app_error = AppError::from("users cannot be 0 years old!");
464    ///
465    ///         // Box the error to erase its type (as it can be an `AppError`
466    ///         // *or* the inner service's `DbError`).
467    ///         return Err(BoxError::from(app_error));
468    ///     }
469    ///
470    ///     // Otherwise, return the record.
471    ///     Ok(record)
472    /// });
473    ///
474    /// // Call the new service
475    /// let id = 13;
476    /// let record = new_service
477    ///     .ready()
478    ///     .await?
479    ///     .call(id)
480    ///     .await?;
481    /// # Ok::<(), BoxError>(())
482    /// #    };
483    /// # }
484    /// ```
485    ///
486    /// Performing an action that must be run for both successes and failures:
487    ///
488    /// ```
489    /// # use std::convert::TryFrom;
490    /// # use std::task::{Poll, Context};
491    /// # use tower::{Service, ServiceExt};
492    /// #
493    /// # struct DatabaseService;
494    /// # impl DatabaseService {
495    /// #   fn new(address: &str) -> Self {
496    /// #       DatabaseService
497    /// #   }
498    /// # }
499    /// #
500    /// # impl Service<u32> for DatabaseService {
501    /// #   type Response = String;
502    /// #   type Error = u8;
503    /// #   type Future = std::future::Ready<Result<String, u8>>;
504    /// #
505    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
506    /// #       Poll::Ready(Ok(()))
507    /// #   }
508    /// #
509    /// #   fn call(&mut self, request: u32) -> Self::Future {
510    /// #       std::future::ready(Ok(String::new()))
511    /// #   }
512    /// # }
513    /// #
514    /// # fn main() {
515    /// #   async {
516    /// // A service returning Result<String, u8>
517    /// let service = DatabaseService::new("127.0.0.1:8080");
518    ///
519    /// // Print a message whenever a query completes.
520    /// let mut new_service = service.map_result(|result| {
521    ///     println!("query completed; success={}", result.is_ok());
522    ///     result
523    /// });
524    ///
525    /// // Call the new service
526    /// let id = 13;
527    /// let response = new_service
528    ///     .ready()
529    ///     .await?
530    ///     .call(id)
531    ///     .await;
532    /// # response
533    /// #    };
534    /// # }
535    /// ```
536    ///
537    /// [`map_response`]: ServiceExt::map_response
538    /// [`map_err`]: ServiceExt::map_err
539    /// [`map_result`]: ServiceExt::map_result
540    /// [`Error`]: crate::Service::Error
541    /// [`Response`]: crate::Service::Response
542    /// [`poll_ready`]: crate::Service::poll_ready
543    /// [`BoxError`]: crate::BoxError
544    fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
545    where
546        Self: Sized,
547        Error: From<Self::Error>, // the service's own error must convert into the new error type (per the docs, `poll_ready` errors bypass `f`)
548        F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone, // `f` sees the whole `Result`, success or failure
549    {
550        MapResult::new(self, f) // the service and `f` are moved into the returned `MapResult`
551    }
552
553    /// Composes a function *in front of* the service.
554    ///
555    /// This adapter produces a new service that passes each value through the
556    /// given function `f` before sending it to `self`.
557    ///
558    /// # Example
559    /// ```
560    /// # use std::convert::TryFrom;
561    /// # use std::task::{Poll, Context};
562    /// # use tower::{Service, ServiceExt};
563    /// #
564    /// # struct DatabaseService;
565    /// # impl DatabaseService {
566    /// #   fn new(address: &str) -> Self {
567    /// #       DatabaseService
568    /// #   }
569    /// # }
570    /// #
571    /// # impl Service<String> for DatabaseService {
572    /// #   type Response = String;
573    /// #   type Error = u8;
574    /// #   type Future = std::future::Ready<Result<String, u8>>;
575    /// #
576    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
577    /// #       Poll::Ready(Ok(()))
578    /// #   }
579    /// #
580    /// #   fn call(&mut self, request: String) -> Self::Future {
581    /// #       std::future::ready(Ok(String::new()))
582    /// #   }
583    /// # }
584    /// #
585    /// # fn main() {
586    /// #   async {
587    /// // A service taking a String as a request
588    /// let service = DatabaseService::new("127.0.0.1:8080");
589    ///
590    /// // Map the request to a new request
591    /// let mut new_service = service.map_request(|id: u32| id.to_string());
592    ///
593    /// // Call the new service
594    /// let id = 13;
595    /// let response = new_service
596    ///     .ready()
597    ///     .await?
598    ///     .call(id)
599    ///     .await;
600    /// # response
601    /// #    };
602    /// # }
603    /// ```
604    fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
605    where
606        Self: Sized,
607        F: FnMut(NewRequest) -> Request, // `f` converts the outward-facing request into the `Request` this service accepts; `FnMut`, no `Clone` bound
608    {
609        MapRequest::new(self, f) // `NewRequest` only appears in the bound on `F`; the wrapper type is `MapRequest<Self, F>`
610    }
611
612    /// Composes this service with a [`Filter`] that conditionally accepts or
613    /// rejects requests based on a [predicate].
614    ///
615    /// This adapter produces a new service that passes each value through the
616    /// given predicate `filter` before sending it to `self`.
617    ///
618    /// # Example
619    /// ```
620    /// # use std::convert::TryFrom;
621    /// # use std::task::{Poll, Context};
622    /// # use tower::{Service, ServiceExt};
623    /// #
624    /// # struct DatabaseService;
625    /// # impl DatabaseService {
626    /// #   fn new(address: &str) -> Self {
627    /// #       DatabaseService
628    /// #   }
629    /// # }
630    /// #
631    /// # #[derive(Debug)] enum DbError {
632    /// #   Parse(std::num::ParseIntError)
633    /// # }
634    /// #
635    /// # impl std::fmt::Display for DbError {
636    /// #    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
637    /// # }
638    /// # impl std::error::Error for DbError {}
639    /// # impl Service<u32> for DatabaseService {
640    /// #   type Response = String;
641    /// #   type Error = DbError;
642    /// #   type Future = std::future::Ready<Result<String, DbError>>;
643    /// #
644    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
645    /// #       Poll::Ready(Ok(()))
646    /// #   }
647    /// #
648    /// #   fn call(&mut self, request: u32) -> Self::Future {
649    /// #       std::future::ready(Ok(String::new()))
650    /// #   }
651    /// # }
652    /// #
653    /// # fn main() {
654    /// #    async {
655    /// // A service taking a u32 as a request and returning Result<_, DbError>
656    /// let service = DatabaseService::new("127.0.0.1:8080");
657    ///
658    /// // Fallibly map the request to a new request
659    /// let mut new_service = service
660    ///     .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
661    ///
662    /// // Call the new service
663    /// let id = "13";
664    /// let response = new_service
665    ///     .ready()
666    ///     .await?
667    ///     .call(id)
668    ///     .await;
669    /// # response
670    /// #    };
671    /// # }
672    /// ```
673    ///
674    /// [`Filter`]: crate::filter::Filter
675    /// [predicate]: crate::filter::Predicate
676    #[cfg(feature = "filter")] // this method only exists when the `filter` feature is enabled
677    fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
678    where
679        Self: Sized,
680        F: crate::filter::Predicate<NewRequest>, // the predicate is applied to the outward-facing `NewRequest` type
681    {
682        crate::filter::Filter::new(self, filter) // the service and the predicate are moved into the returned `Filter`
683    }
684
685    /// Composes this service with an [`AsyncFilter`] that conditionally accepts or
686    /// rejects requests based on an [async predicate].
687    ///
688    /// This adapter produces a new service that passes each value through the
689    /// given asynchronous predicate `filter` before sending it to `self`.
690    ///
691    /// # Example
692    /// ```
693    /// # use std::convert::TryFrom;
694    /// # use std::task::{Poll, Context};
695    /// # use tower::{Service, ServiceExt};
696    /// #
697    /// # #[derive(Clone)] struct DatabaseService;
698    /// # impl DatabaseService {
699    /// #   fn new(address: &str) -> Self {
700    /// #       DatabaseService
701    /// #   }
702    /// # }
703    /// # #[derive(Debug)]
704    /// # enum DbError {
705    /// #   Rejected
706    /// # }
707    /// # impl std::fmt::Display for DbError {
708    /// #    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
709    /// # }
710    /// # impl std::error::Error for DbError {}
711    /// #
712    /// # impl Service<u32> for DatabaseService {
713    /// #   type Response = String;
714    /// #   type Error = DbError;
715    /// #   type Future = std::future::Ready<Result<String, DbError>>;
716    /// #
717    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
718    /// #       Poll::Ready(Ok(()))
719    /// #   }
720    /// #
721    /// #   fn call(&mut self, request: u32) -> Self::Future {
722    /// #       std::future::ready(Ok(String::new()))
723    /// #   }
724    /// # }
725    /// #
726    /// # fn main() {
727    /// #    async {
728    /// // A service taking a u32 as a request and returning Result<_, DbError>
729    /// let service = DatabaseService::new("127.0.0.1:8080");
730    ///
731    /// /// Returns `true` if we should query the database for an ID.
732    /// async fn should_query(id: u32) -> bool {
733    ///     // ...
734    ///     # true
735    /// }
736    ///
737    /// // Filter requests based on `should_query`.
738    /// let mut new_service = service
739    ///     .filter_async(|id: u32| async move {
740    ///         if should_query(id).await {
741    ///             return Ok(id);
742    ///         }
743    ///
744    ///         Err(DbError::Rejected)
745    ///     });
746    ///
747    /// // Call the new service
748    /// let id = 13;
749    /// # let id: u32 = id;
750    /// let response = new_service
751    ///     .ready()
752    ///     .await?
753    ///     .call(id)
754    ///     .await;
755    /// # response
756    /// #    };
757    /// # }
758    /// ```
759    ///
760    /// [`AsyncFilter`]: crate::filter::AsyncFilter
761    /// [async predicate]: crate::filter::AsyncPredicate
762    #[cfg(feature = "filter")] // this method only exists when the `filter` feature is enabled
763    fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
764    where
765        Self: Sized,
766        F: crate::filter::AsyncPredicate<NewRequest>, // the async predicate is applied to the outward-facing `NewRequest` type
767    {
768        crate::filter::AsyncFilter::new(self, filter) // the service and the predicate are moved into the returned `AsyncFilter`
769    }
770
771    /// Composes an asynchronous function *after* this service.
772    ///
773    /// This takes a function or closure returning a future, and returns a new
774    /// `Service` that chains that function after this service's [`Future`]. The
775    /// new `Service`'s future will consist of this service's future, followed
776    /// by the future returned by calling the chained function with the future's
777    /// [`Output`] type. The chained function is called regardless of whether
778    /// this service's future completes with a successful response or with an
779    /// error.
780    ///
781    /// This method can be thought of as an equivalent to the [`futures`
782    /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that
783    /// _return_ futures, rather than on an individual future. Similarly to that
784    /// combinator, [`ServiceExt::then`] can be used to implement asynchronous
785    /// error recovery, by calling some asynchronous function with errors
786    /// returned by this service. Alternatively, it may also be used to call a
787    /// fallible async function with the successful response of this service.
788    ///
789    /// This method can be used to change the [`Response`] type of the service
790    /// into a different type. It can also be used to change the [`Error`] type
791    /// of the service. However, because the `then` function is not applied
792    /// to the errors returned by the service's [`poll_ready`] method, it must
793    /// be possible to convert the service's [`Error`] type into the error type
794    /// returned by the `then` future. This is trivial when the function
795    /// returns the same error type as the service, but in other cases, it can
796    /// be useful to use [`BoxError`] to erase differing error types.
797    ///
798    /// # Examples
799    ///
800    /// ```
801    /// # use std::task::{Poll, Context};
802    /// # use tower::{Service, ServiceExt};
803    /// #
804    /// # struct DatabaseService;
805    /// # impl DatabaseService {
806    /// #   fn new(address: &str) -> Self {
807    /// #       DatabaseService
808    /// #   }
809    /// # }
810    /// #
811    /// # type Record = ();
812    /// # type DbError = ();
813    /// #
814    /// # impl Service<u32> for DatabaseService {
815    /// #   type Response = Record;
816    /// #   type Error = DbError;
817    /// #   type Future = std::future::Ready<Result<Record, DbError>>;
818    /// #
819    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
820    /// #       Poll::Ready(Ok(()))
821    /// #   }
822    /// #
823    /// #   fn call(&mut self, request: u32) -> Self::Future {
824    /// #       std::future::ready(Ok(()))
825    /// #   }
826    /// # }
827    /// #
828    /// # fn main() {
829    /// // A service returning Result<Record, DbError>
830    /// let service = DatabaseService::new("127.0.0.1:8080");
831    ///
832    /// // An async function that attempts to recover from errors returned by the
833    /// // database.
834    /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> {
835    ///     // ...
836    ///     # Ok(())
837    /// }
838    /// #    async {
839    ///
840    /// // If the database service returns an error, attempt to recover by
841    /// // calling `recover_from_error`. Otherwise, return the successful response.
842    /// let mut new_service = service.then(|result| async move {
843    ///     match result {
844    ///         Ok(record) => Ok(record),
845    ///         Err(e) => recover_from_error(e).await,
846    ///     }
847    /// });
848    ///
849    /// // Call the new service
850    /// let id = 13;
851    /// let record = new_service
852    ///     .ready()
853    ///     .await?
854    ///     .call(id)
855    ///     .await?;
856    /// # Ok::<(), DbError>(())
857    /// #    };
858    /// # }
859    /// ```
860    ///
861    /// [`Future`]: crate::Service::Future
862    /// [`Output`]: std::future::Future::Output
863    /// [`futures` crate]: https://docs.rs/futures
864    /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
865    /// [`Error`]: crate::Service::Error
866    /// [`Response`]: crate::Service::Response
867    /// [`poll_ready`]: crate::Service::poll_ready
868    /// [`BoxError`]: crate::BoxError
869    fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
870    where
871        Self: Sized,
872        Error: From<Self::Error>, // the service's own error must convert into the new error type (per the docs, `poll_ready` errors bypass `f`)
873        F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone, // `f` receives the whole `Result` and returns a future
874        Fut: Future<Output = Result<Response, Error>>, // that future resolves to the new response/error pair
875    {
876        Then::new(self, f) // the service and `f` are moved into the returned `Then`
877    }
878
879    /// Composes a function that transforms futures produced by the service.
880    ///
881    /// This takes a function or closure returning a future computed from the future returned by
882    /// the service's [`call`] method, as opposed to the responses produced by the future.
883    ///
884    /// # Examples
885    ///
886    /// ```
887    /// # use std::task::{Poll, Context};
888    /// # use tower::{Service, ServiceExt, BoxError};
889    /// #
890    /// # struct DatabaseService;
891    /// # impl DatabaseService {
892    /// #   fn new(address: &str) -> Self {
893    /// #       DatabaseService
894    /// #   }
895    /// # }
896    /// #
897    /// # type Record = ();
898    /// # type DbError = crate::BoxError;
899    /// #
900    /// # impl Service<u32> for DatabaseService {
901    /// #   type Response = Record;
902    /// #   type Error = DbError;
903    /// #   type Future = std::future::Ready<Result<Record, DbError>>;
904    /// #
905    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
906    /// #       Poll::Ready(Ok(()))
907    /// #   }
908    /// #
909    /// #   fn call(&mut self, request: u32) -> Self::Future {
910    /// #       std::future::ready(Ok(()))
911    /// #   }
912    /// # }
913    /// #
914    /// # fn main() {
915    /// use std::time::Duration;
916    /// use tokio::time::timeout;
917    ///
918    /// // A service returning Result<Record, DbError>
919    /// let service = DatabaseService::new("127.0.0.1:8080");
920    /// #    async {
921    ///
922    /// let mut new_service = service.map_future(|future| async move {
923    ///     let res = timeout(Duration::from_secs(1), future).await?;
924    ///     Ok::<_, BoxError>(res)
925    /// });
926    ///
927    /// // Call the new service
928    /// let id = 13;
929    /// let record = new_service
930    ///     .ready()
931    ///     .await?
932    ///     .call(id)
933    ///     .await?;
934    /// # Ok::<(), BoxError>(())
935    /// #    };
936    /// # }
937    /// ```
938    ///
939    /// Note that normally you wouldn't implement timeouts like this and instead use [`Timeout`].
940    ///
941    /// [`call`]: crate::Service::call
942    /// [`Timeout`]: crate::timeout::Timeout
943    fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
944    where
945        Self: Sized,
946        F: FnMut(Self::Future) -> Fut,
947        Error: From<Self::Error>,
948        Fut: Future<Output = Result<Response, Error>>,
949    {
950        MapFuture::new(self, f)
951    }
952
953    /// Returns a buffered version of this service.
954    ///
955    /// See [`Buffer::new()`] for the details.
956    #[cfg(feature = "buffer")]
957    fn buffered(self, bound: usize) -> Buffer<Request, Self::Future>
958    where
959        Self: Send + Sized + 'static,
960        Self::Future: Send,
961        Self::Error: Into<crate::BoxError> + Send + Sync,
962        Request: Send + Sized + 'static,
963    {
964        Buffer::new(self, bound)
965    }
966
967    /// Returns a retry version of this service.
968    ///
969    /// See [`Retry::new()`] for the details.
970    #[cfg(feature = "retry")]
971    fn retry<P>(self, policy: P) -> Retry<P, Self>
972    where
973        Self: Sized,
974    {
975        Retry::new(policy, self)
976    }
977
978    /// Convert the service into a [`Service`] + [`Send`] trait object.
979    ///
980    /// See [`BoxService`] for more details.
981    ///
982    /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method
983    /// can be used instead, to produce a boxed service which will also
984    /// implement [`Clone`].
985    ///
986    /// # Example
987    ///
988    /// ```
989    /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService};
990    /// #
991    /// # struct Request;
992    /// # struct Response;
993    /// # impl Response {
994    /// #     fn new() -> Self { Self }
995    /// # }
996    ///
997    /// let service = service_fn(|req: Request| async {
998    ///     Ok::<_, BoxError>(Response::new())
999    /// });
1000    ///
1001    /// let service: BoxService<Request, Response, BoxError> = service
1002    ///     .map_request(|req| {
1003    ///         println!("received request");
1004    ///         req
1005    ///     })
1006    ///     .map_response(|res| {
1007    ///         println!("response produced");
1008    ///         res
1009    ///     })
1010    ///     .boxed();
1011    /// # let service = assert_service(service);
1012    /// # fn assert_service<S, R>(svc: S) -> S
1013    /// # where S: Service<R> { svc }
1014    /// ```
1015    ///
1016    /// [`Service`]: crate::Service
1017    /// [`boxed_clone`]: Self::boxed_clone
1018    fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
1019    where
1020        Self: Sized + Send + 'static,
1021        Self::Future: Send + 'static,
1022    {
1023        BoxService::new(self)
1024    }
1025
1026    /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object.
1027    ///
1028    /// This is similar to the [`boxed`] method, but it requires that `Self` implement
1029    /// [`Clone`], and the returned boxed service implements [`Clone`].
1030    /// See [`BoxCloneService`] for more details.
1031    ///
1032    /// # Example
1033    ///
1034    /// ```
1035    /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService};
1036    /// #
1037    /// # struct Request;
1038    /// # struct Response;
1039    /// # impl Response {
1040    /// #     fn new() -> Self { Self }
1041    /// # }
1042    ///
1043    /// let service = service_fn(|req: Request| async {
1044    ///     Ok::<_, BoxError>(Response::new())
1045    /// });
1046    ///
1047    /// let service: BoxCloneService<Request, Response, BoxError> = service
1048    ///     .map_request(|req| {
1049    ///         println!("received request");
1050    ///         req
1051    ///     })
1052    ///     .map_response(|res| {
1053    ///         println!("response produced");
1054    ///         res
1055    ///     })
1056    ///     .boxed_clone();
1057    ///
1058    /// // The boxed service can still be cloned.
1059    /// service.clone();
1060    /// # let service = assert_service(service);
1061    /// # fn assert_service<S, R>(svc: S) -> S
1062    /// # where S: Service<R> { svc }
1063    /// ```
1064    ///
1065    /// [`Service`]: crate::Service
1066    /// [`boxed`]: Self::boxed
1067    fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error>
1068    where
1069        Self: Clone + Sized + Send + 'static,
1070        Self::Future: Send + 'static,
1071    {
1072        BoxCloneService::new(self)
1073    }
1074}
1075
1076impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
1077
1078/// Convert an `Option<Layer>` into a [`Layer`].
1079///
1080/// ```
1081/// # use std::time::Duration;
1082/// # use tower::Service;
1083/// # use tower::builder::ServiceBuilder;
1084/// use tower::util::option_layer;
1085/// # use tower::timeout::TimeoutLayer;
1086/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
1087/// # let timeout = Some(Duration::new(10, 0));
1088/// // Layer to apply a timeout if configured
1089/// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new));
1090///
1091/// ServiceBuilder::new()
1092///     .layer(maybe_timeout)
1093///     .service(svc);
1094/// # }
1095/// ```
1096///
1097/// [`Layer`]: crate::layer::Layer
1098pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> {
1099    if let Some(layer) = layer {
1100        Either::Left(layer)
1101    } else {
1102        Either::Right(Identity::new())
1103    }
1104}