//! Traits and implementations of arbitrary data streams.
//!
//! Streams are similar to the `Iterator` trait in that they represent some sequential set of items
//! which can be retrieved one by one. Where `Stream`s differ is that they are allowed to return
//! errors instead of just `None` and if they implement the `RangeStreamOnce` trait they are also
//! capable of returning multiple items at the same time, usually in the form of a slice.
//!
//! In addition to the functionality above, a proper `Stream` usable by a `Parser` must also have a
//! position (marked by the `Positioned` trait) and must also be resettable (marked by the
//! `ResetStream` trait). The former is used to ensure that errors at different points in the stream
//! aren't combined and the latter is used in parsers such as `or` to try multiple alternative
//! parses.

use crate::lib::{cmp::Ordering, fmt, marker::PhantomData, str::Chars};

use crate::{
    error::{
        ParseError,
        ParseResult::{self, *},
        StreamError, StringStreamError, Tracked, UnexpectedParse,
    },
    Parser,
};

#[cfg(feature = "std")]
pub use self::decoder::Decoder;

// Implements `ResetStream` for `$ty` by using a clone of the stream itself as the checkpoint.
//
// Invocation shape: `clone_resetable! { (<generic params>) <type> }`. The generated impl only
// applies when `$ty` is also a `StreamOnce` (see the `where` clause), and `checkpoint` relies on
// `$ty` being cloneable.
#[doc(hidden)]
#[macro_export]
macro_rules! clone_resetable {
    (( $($params: tt)* ) $ty: ty) => {
        impl<$($params)*> ResetStream for $ty
            where Self: StreamOnce
        {
            // The checkpoint is a full copy of the stream.
            type Checkpoint = Self;

            fn checkpoint(&self) -> Self {
                self.clone()
            }
            #[inline]
            fn reset(&mut self, checkpoint: Self) -> Result<(), Self::Error> {
                // Restoring is a plain assignment, so this implementation always returns `Ok`.
                *self = checkpoint;
                Ok(())
            }
        }
    }
}

// Submodules of `stream`. Those gated on the `std` or `alloc` feature are only compiled when
// that feature is enabled; `docsrs` builds additionally annotate them with the required feature.
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub mod buf_reader;
/// Stream wrapper which provides a `ResetStream` impl for `StreamOnce` impls which do not have
/// one.
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
pub mod buffered;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub mod easy;
/// Stream wrapper which provides more detailed position information.
pub mod position;
/// Stream wrapper allowing `std::io::Read` to be used
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub mod read;
pub mod span;
/// Stream wrapper allowing custom state to be used.
pub mod state;

#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub mod decoder;

/// A type which has a position.
///
/// Builds on [`StreamOnce`]: the value returned is that trait's `Position` associated type.
pub trait Positioned: StreamOnce {
    /// Returns the current position of the stream.
    fn position(&self) -> Self::Position;
}

/// Convenience alias over the `StreamError` for the input stream `Input`.
///
/// It names the `StreamError` associated type of `Input::Error`, where `Input::Error` is viewed
/// as a `ParseError` over `Input`'s own `Token`, `Range` and `Position` types.
///
/// ```
/// #[macro_use]
/// extern crate combine;
/// use combine::{easy, Parser, Stream, many1};
/// use combine::parser::char::letter;
/// use combine::stream::StreamErrorFor;
/// use combine::error::{ParseError, StreamError};
///
/// parser!{
///    fn parser[Input]()(Input) -> String
///     where [ Input: Stream<Token = char>, ]
///     {
///         many1(letter()).and_then(|word: String| {
///             if word == "combine" {
///                 Ok(word)
///             } else {
///                 // The alias makes it easy to refer to the `StreamError` type of `Input`
///                 Err(StreamErrorFor::<Input>::expected_static_message("combine"))
///             }
///         })
///     }
/// }
///
/// fn main() {
/// }
/// ```
pub type StreamErrorFor<Input> = <<Input as StreamOnce>::Error as ParseError<
    <Input as StreamOnce>::Token,
    <Input as StreamOnce>::Range,
    <Input as StreamOnce>::Position,
>>::StreamError;

/// `StreamOnce` represents a sequence of items that can be extracted one by one.
pub trait StreamOnce {
    /// The type of items which is yielded from this stream.
    type Token: Clone;

    /// The type of a range of items yielded from this stream.
    /// Types which do not have a way of yielding ranges of items should just use the
    /// `Self::Token` for this type.
    type Range: Clone;

    /// Type which represents the position in a stream.
    /// `Ord` is required to allow parsers to determine which of two positions are further ahead.
    type Position: Clone + Ord;

    /// The error type produced when parsing this stream. It must be a `ParseError` over this
    /// stream's `Token`, `Range` and `Position` types.
    type Error: ParseError<Self::Token, Self::Range, Self::Position>;
    /// Takes a stream and removes its first token, yielding the token and the rest of the elements.
    /// Returns `Err` if no element could be retrieved.
    fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>>;

    /// Returns `true` if this stream only contains partial input.
    ///
    /// The default implementation returns `false`.
    ///
    /// See `PartialStream`.
    fn is_partial(&self) -> bool {
        false
    }
}

/// A `StreamOnce` which can create checkpoints which the stream can be reset to
pub trait ResetStream: StreamOnce {
    /// Token produced by `checkpoint` and consumed by `reset`; it identifies a position of the
    /// stream that can be returned to later.
    type Checkpoint: Clone;

    /// Creates a `Checkpoint` at the current position which can be used to reset the stream
    /// later to the current position
    fn checkpoint(&self) -> Self::Checkpoint;
    /// Attempts to reset the stream to an earlier position.
    ///
    /// Returns `Err` if the stream could not be reset to `checkpoint`.
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error>;
}

// These stream types use a clone of themselves as the checkpoint (see `clone_resetable!`).
clone_resetable! {('a) &'a str}
clone_resetable! {('a, T) &'a [T]}
clone_resetable! {('a, T) SliceStream<'a, T> }
clone_resetable! {(T: Clone) IteratorStream<T>}

/// A stream of tokens which can be duplicated
///
/// `Stream` is shorthand for the combination of `StreamOnce`, `ResetStream` and `Positioned`.
/// It is never implemented by hand: implement those three traits for a custom stream type and
/// the blanket impl below supplies `Stream`.
pub trait Stream: StreamOnce + ResetStream + Positioned {}

// Blanket impl: anything providing all three building-block traits is a `Stream`.
impl<Input: StreamOnce + ResetStream + Positioned> Stream for Input {}

/// Removes a single token from `input`, expressed as a `ParseResult`.
///
/// A successfully removed token is reported as `CommitOk`. A stream error is handed to
/// `wrap_stream_error`, which attaches the current position and decides between a committed and
/// a peek error.
#[inline]
pub fn uncons<Input>(input: &mut Input) -> ParseResult<Input::Token, Input::Error>
where
    Input: ?Sized + Stream,
{
    let token = match input.uncons() {
        Ok(token) => token,
        Err(err) => return wrap_stream_error(input, err),
    };
    CommitOk(token)
}

/// A `RangeStreamOnce` is an extension of `StreamOnce` which allows for zero copy parsing.
pub trait RangeStreamOnce: StreamOnce + ResetStream {
    /// Takes `size` elements from the stream.
    /// Fails if the length of the stream is less than `size`.
    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>;

    /// Takes items from stream, testing each one with the predicate `f`.
    /// Returns the range of items which passed `f`.
    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool;

    #[inline]
    /// Takes items from stream, testing each one with the predicate `f`.
    /// Returns a range of at least one item which passed `f`.
    ///
    /// The default implementation is built on `uncons_while`: it yields `PeekErr` with an
    /// end-of-input error if `f` was never called, and `PeekErr` with an "unexpected" error if
    /// `f` was called but never returned `true`.
    ///
    /// # Note
    ///
    /// This may not return `PeekOk` as it should uncons at least one token.
    fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        // `committed` becomes true once any token is accepted; `started_at_eoi` stays true only
        // if the predicate was never invoked at all.
        let mut committed = false;
        let mut started_at_eoi = true;
        let result = self.uncons_while(|c| {
            let ok = f(c);
            committed |= ok;
            started_at_eoi = false;
            ok
        });
        if committed {
            match result {
                Ok(x) => CommitOk(x),
                Err(x) => CommitErr(x),
            }
        } else if started_at_eoi {
            PeekErr(Tracked::from(StreamErrorFor::<Self>::end_of_input()))
        } else {
            PeekErr(Tracked::from(
                StreamErrorFor::<Self>::unexpected_static_message(""),
            ))
        }
    }

    /// Returns the distance between `self` and `end`. The returned `usize` must be so that
    ///
    /// ```ignore
    /// let start = stream.checkpoint();
    /// stream.uncons_range(distance);
    /// stream.distance(&start) == distance
    /// ```
    fn distance(&self, end: &Self::Checkpoint) -> usize;

    /// Returns the entire range of `self`
    fn range(&self) -> Self::Range;
}

/// A `RangeStream` is an extension of `Stream` which allows for zero copy parsing.
///
/// It is implemented automatically for every type that is both a `Stream` and a
/// `RangeStreamOnce`.
pub trait RangeStream: Stream + RangeStreamOnce {}

impl<Input: RangeStreamOnce + Stream> RangeStream for Input {}

#[doc(hidden)]
pub fn wrap_stream_error<T, Input>(
    input: &Input,
    err: <Input::Error as ParseError<Input::Token, Input::Range, Input::Position>>::StreamError,
) -> ParseResult<T, <Input as StreamOnce>::Error>
where
    Input: ?Sized + StreamOnce + Positioned,
{
    let err = Input::Error::from_error(input.position(), err);
    if input.is_partial() {
        CommitErr(err)
    } else {
        PeekErr(err.into())
    }
}

/// Removes a range of `size` items from `input`, expressed as a `ParseResult`.
///
/// A request for zero items yields `PeekOk`, any other successful request yields `CommitOk`.
/// Stream errors are converted by `wrap_stream_error`.
#[inline]
pub fn uncons_range<Input>(
    input: &mut Input,
    size: usize,
) -> ParseResult<Input::Range, <Input as StreamOnce>::Error>
where
    Input: ?Sized + RangeStream,
{
    let range = match input.uncons_range(size) {
        Ok(range) => range,
        Err(err) => return wrap_stream_error(input, err),
    };
    if size == 0 {
        PeekOk(range)
    } else {
        CommitOk(range)
    }
}

// Reports whether `input` is at its end, without consuming anything.
//
// A token is speculatively removed and the stream is then reset to the checkpoint taken first.
// The result is `true` only if the removal failed with an unexpected-end-of-input error AND the
// reset succeeded.
#[doc(hidden)]
pub fn input_at_eof<Input>(input: &mut Input) -> bool
where
    Input: ?Sized + Stream,
{
    let checkpoint = input.checkpoint();
    let hit_end = match input.uncons() {
        Err(err) => err.is_unexpected_end_of_input(),
        Ok(_) => false,
    };
    // The reset is always attempted, whatever the probe found.
    let restored = input.reset(checkpoint).is_ok();
    restored && hit_end
}

/// Removes items from the input while `predicate` returns `true`.
#[inline]
pub fn uncons_while<Input, F>(
    input: &mut Input,
    predicate: F,
) -> ParseResult<Input::Range, Input::Error>
where
    F: FnMut(Input::Token) -> bool,
    Input: ?Sized + RangeStream,
    Input::Range: Range,
{
    match input.uncons_while(predicate) {
        Err(err) => wrap_stream_error(input, err),
        Ok(x) => {
            if input.is_partial() && input_at_eof(input) {
                // Partial inputs which encounter end of file must fail to let more input be
                // retrieved
                CommitErr(Input::Error::from_error(
                    input.position(),
                    StreamError::end_of_input(),
                ))
            } else if x.len() == 0 {
                PeekOk(x)
            } else {
                CommitOk(x)
            }
        }
    }
}

#[inline]
/// Takes items from stream, testing each one with `predicate`
/// returns a range of at least one items which passed `predicate`.
///
/// # Note
///
/// This may not return `PeekOk` as it should uncons at least one token.
pub fn uncons_while1<Input, F>(
    input: &mut Input,
    predicate: F,
) -> ParseResult<Input::Range, Input::Error>
where
    F: FnMut(Input::Token) -> bool,
    Input: ?Sized + RangeStream,
{
    match input.uncons_while1(predicate) {
        CommitOk(x) => {
            if input.is_partial() && input_at_eof(input) {
                // Partial inputs which encounter end of file must fail to let more input be
                // retrieved
                CommitErr(Input::Error::from_error(
                    input.position(),
                    StreamError::end_of_input(),
                ))
            } else {
                CommitOk(x)
            }
        }
        PeekErr(_) => {
            if input.is_partial() && input_at_eof(input) {
                // Partial inputs which encounter end of file must fail to let more input be
                // retrieved
                CommitErr(Input::Error::from_error(
                    input.position(),
                    StreamError::end_of_input(),
                ))
            } else {
                PeekErr(Input::Error::empty(input.position()).into())
            }
        }
        CommitErr(err) => {
            if input.is_partial() && input_at_eof(input) {
                // Partial inputs which encounter end of file must fail to let more input be
                // retrieved
                CommitErr(Input::Error::from_error(
                    input.position(),
                    StreamError::end_of_input(),
                ))
            } else {
                wrap_stream_error(input, err)
            }
        }
        PeekOk(_) => unreachable!(),
    }
}

/// Trait representing a range of elements.
pub trait Range {
    /// Returns the remaining length of `self`.
    /// The returned length need not be the same as the number of items left in the stream.
    fn len(&self) -> usize;

    /// Returns `true` if the range does not contain any elements (`Range::len() == 0`)
    fn is_empty(&self) -> bool {
        let remaining = self.len();
        remaining == 0
    }
}

/// A mutable reference to a stream is itself a stream; every operation is forwarded to the
/// referenced value.
impl<'a, I> StreamOnce for &'a mut I
where
    I: StreamOnce + ?Sized,
{
    type Token = I::Token;

    type Range = I::Range;

    type Position = I::Position;

    type Error = I::Error;
    fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>> {
        I::uncons(&mut **self)
    }

    fn is_partial(&self) -> bool {
        I::is_partial(&**self)
    }
}

/// Forwards `position` to the referenced stream.
impl<'a, I> Positioned for &'a mut I
where
    I: Positioned + ?Sized,
{
    #[inline]
    fn position(&self) -> Self::Position {
        I::position(&**self)
    }
}

/// Forwards checkpointing and resetting to the referenced stream, sharing its checkpoint type.
impl<'a, I> ResetStream for &'a mut I
where
    I: ResetStream + ?Sized,
{
    type Checkpoint = I::Checkpoint;

    fn checkpoint(&self) -> Self::Checkpoint {
        I::checkpoint(&**self)
    }

    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
        I::reset(&mut **self, checkpoint)
    }
}

/// Forwards every range operation to the referenced stream.
impl<'a, I> RangeStreamOnce for &'a mut I
where
    I: RangeStreamOnce + ?Sized,
{
    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        I::uncons_while(&mut **self, f)
    }

    // Forwarded explicitly so that an override on `I` is used rather than the trait default.
    #[inline]
    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        I::uncons_while1(&mut **self, f)
    }

    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
        I::uncons_range(&mut **self, size)
    }

    #[inline]
    fn distance(&self, end: &Self::Checkpoint) -> usize {
        I::distance(&**self, end)
    }

    fn range(&self) -> Self::Range {
        I::range(&**self)
    }
}

/// A mutable reference to a range reports the length of the range it points to.
impl<'a, I> Range for &'a mut I
where
    I: Range + ?Sized,
{
    fn len(&self) -> usize {
        I::len(&**self)
    }
}

/// String slices are streams of `char`s whose ranges are sub-slices of the original string.
impl<'a> StreamOnce for &'a str {
    type Token = char;
    type Range = &'a str;
    type Position = PointerOffset<str>;
    type Error = StringStreamError;

    /// Pops the first character, leaving `self` pointing at the remainder of the string.
    /// Returns `StringStreamError::Eoi` when the string is empty.
    #[inline]
    fn uncons(&mut self) -> Result<char, StreamErrorFor<Self>> {
        let mut remaining = self.chars();
        if let Some(first) = remaining.next() {
            *self = remaining.as_str();
            Ok(first)
        } else {
            Err(StringStreamError::Eoi)
        }
    }
}

/// The position of a string slice is the position of its underlying byte slice.
impl<'a> Positioned for &'a str {
    #[inline]
    fn position(&self) -> Self::Position {
        let bytes = self.as_bytes();
        let byte_position = bytes.position();
        PointerOffset::new(byte_position.0)
    }
}

// Splits off the longest prefix of `slice` accepted by `f` and returns it, leaving `slice`
// pointing at the remainder.
//
// `chars` must be an iterator over (a suffix of) `slice`: characters already taken from it by
// the caller count as part of the returned prefix without being passed to `f`.
//
// The loop is written as a macro-expanded `loop` on purpose, hence the lint allowance.
#[allow(clippy::while_let_loop)]
fn str_uncons_while<'a, F>(slice: &mut &'a str, mut chars: Chars<'a>, mut f: F) -> &'a str
where
    F: FnMut(char) -> bool,
{
    // UTF-8 width of the character that `f` rejected, or 0 if the iterator ran out first. The
    // rejected character has already been taken from `chars`, so it must be given back below.
    let mut last_char_size = 0;

    macro_rules! test_next {
        () => {
            match chars.next() {
                Some(c) => {
                    if !f(c) {
                        last_char_size = c.len_utf8();
                        break;
                    }
                }
                None => break,
            }
        };
    }
    // Manually unrolled: eight characters are tested per loop iteration.
    loop {
        test_next!();
        test_next!();
        test_next!();
        test_next!();
        test_next!();
        test_next!();
        test_next!();
        test_next!();
    }

    // Bytes consumed from `chars`, minus the rejected character (if any), is the prefix length.
    let len = slice.len() - chars.as_str().len() - last_char_size;
    let (result, rest) = slice.split_at(len);
    *slice = rest;
    result
}

/// Zero-copy range operations on string slices; every returned range borrows from the original
/// string.
impl<'a> RangeStreamOnce for &'a str {
    /// Splits off and returns the longest prefix whose characters all satisfy `f`.
    /// This implementation always returns `Ok`.
    fn uncons_while<F>(&mut self, f: F) -> Result<&'a str, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        Ok(str_uncons_while(self, self.chars(), f))
    }

    /// Like `uncons_while`, but the first character must satisfy `f`.
    ///
    /// Returns `PeekErr` with `Eoi` for an empty string and `PeekErr` with `UnexpectedParse` if
    /// the first character is rejected; in both cases the stream is left untouched.
    #[inline]
    fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        let mut chars = self.chars();
        match chars.next() {
            Some(c) => {
                if !f(c) {
                    return PeekErr(Tracked::from(StringStreamError::UnexpectedParse));
                }
            }
            None => return PeekErr(Tracked::from(StringStreamError::Eoi)),
        }

        // The first character is already accepted; `chars` continues from the second one.
        CommitOk(str_uncons_while(self, chars, f))
    }

    /// Takes the first `size` bytes of the string.
    ///
    /// # Errors
    ///
    /// * `StringStreamError::Eoi` if fewer than `size` bytes remain.
    /// * `StringStreamError::CharacterBoundary` if `size` falls inside a multi-byte character.
    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<&'a str, StreamErrorFor<Self>> {
        if size > self.len() {
            return Err(StringStreamError::Eoi);
        }
        // Use the standard library check rather than a hand-rolled byte-range test.
        if !self.is_char_boundary(size) {
            return Err(StringStreamError::CharacterBoundary);
        }
        let (result, remaining) = self.split_at(size);
        *self = remaining;
        Ok(result)
    }

    /// Number of bytes between the checkpoint `end` and the current position; `end` is
    /// expected to be a checkpoint taken at or before the current position.
    #[inline]
    fn distance(&self, end: &Self) -> usize {
        self.position().0 - end.position().0
    }

    /// Returns the whole remaining string.
    fn range(&self) -> Self::Range {
        self
    }
}

/// The length of a string range is its length in bytes.
impl<'a> Range for &'a str {
    #[inline]
    fn len(&self) -> usize {
        // Explicit path to the inherent `str::len`, so this cannot resolve back to `Range::len`.
        str::len(self)
    }
}

/// The length of a slice range is its number of elements.
impl<'a, T> Range for &'a [T] {
    #[inline]
    fn len(&self) -> usize {
        // Explicit path to the inherent slice `len`, so this cannot resolve back to `Range::len`.
        <[T]>::len(self)
    }
}

// Index at which the slice `uncons_while` helpers start testing elements. The discriminant is
// used directly as that index (`start as usize`).
#[repr(usize)]
enum UnconsStart {
    // Test every element, starting with the first.
    Zero = 0,
    // Skip the first element; callers use this after having tested it themselves.
    One = 1,
}

// Splits off the longest prefix of `slice` whose elements (from index `start` onwards) are all
// accepted by `f`, returns it, and leaves `slice` pointing at the remainder.
//
// Elements are cloned before being passed to `f`. With `UnconsStart::One` the first element is
// included in the result without being tested, so the slice must then be non-empty.
fn slice_uncons_while<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T]
where
    F: FnMut(T) -> bool,
    T: Clone,
{
    let mut i = start as usize;
    let len = slice.len();
    // SAFETY: We only call this function with `One` if the slice has length >= 1
    debug_assert!(len >= i, "");
    // Set once `f` rejects an element inside the unrolled loop, so that the tail loop below
    // does not test (and call `f` on) that same element a second time.
    let mut found = false;

    macro_rules! check {
        () => {
            if !f(unsafe { slice.get_unchecked(i).clone() }) {
                found = true;
                break;
            }
            i += 1;
        };
    }

    // SAFETY: ensures we can access at least 8 elements starting at i, making get_unchecked sound.
    while len - i >= 8 {
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
    }

    // Fewer than 8 elements remain: finish with bounds-checked accesses.
    if !found {
        while let Some(c) = slice.get(i) {
            if !f(c.clone()) {
                break;
            }
            i += 1;
        }
    }

    // `i` is the index of the first rejected element, or `len` if none was rejected.
    let (result, remaining) = slice.split_at(i);
    *slice = remaining;
    result
}

/// Zero-copy range operations on slices; every returned range borrows from the original slice.
impl<'a, T> RangeStreamOnce for &'a [T]
where
    T: Clone + PartialEq,
{
    /// Takes the first `size` elements, or fails with `Eoi` if fewer remain.
    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>> {
        if size > self.len() {
            return Err(UnexpectedParse::Eoi);
        }
        let (taken, rest) = self.split_at(size);
        *self = rest;
        Ok(taken)
    }

    /// Takes the longest prefix whose elements all satisfy `f`. This implementation always
    /// returns `Ok`.
    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        let taken = slice_uncons_while(self, UnconsStart::Zero, f);
        Ok(taken)
    }

    /// Like `uncons_while`, but the first element must satisfy `f`.
    ///
    /// Returns `PeekErr` with `Eoi` for an empty slice and `PeekErr` with `Unexpected` if the
    /// first element is rejected; the stream is left untouched in both cases.
    #[inline]
    fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        let first_accepted = match self.first() {
            Some(first) => f(first.clone()),
            None => return PeekErr(Tracked::from(UnexpectedParse::Eoi)),
        };
        if !first_accepted {
            return PeekErr(Tracked::from(UnexpectedParse::Unexpected));
        }

        // The first element has been checked above, so scanning resumes at index one.
        CommitOk(slice_uncons_while(self, UnconsStart::One, f))
    }

    /// Number of elements between the checkpoint `end` and the current position.
    #[inline]
    fn distance(&self, end: &Self) -> usize {
        let remaining_at_checkpoint = end.len();
        remaining_at_checkpoint - self.len()
    }

    /// Returns the whole remaining slice.
    fn range(&self) -> Self::Range {
        self
    }
}

/// The position of a slice stream is the address of its first remaining element.
impl<'a, T> Positioned for &'a [T]
where
    T: Clone + PartialEq,
{
    #[inline]
    fn position(&self) -> Self::Position {
        let start = self.as_ptr();
        PointerOffset::new(start as usize)
    }
}

/// Slices are streams which yield clones of their elements and sub-slices as ranges.
impl<'a, T> StreamOnce for &'a [T]
where
    T: Clone + PartialEq,
{
    type Token = T;
    type Range = &'a [T];
    type Position = PointerOffset<[T]>;
    type Error = UnexpectedParse;

    /// Pops the first element (cloned), leaving `self` pointing at the remaining elements.
    /// Returns `UnexpectedParse::Eoi` when the slice is empty.
    #[inline]
    fn uncons(&mut self) -> Result<T, StreamErrorFor<Self>> {
        if let Some((head, tail)) = self.split_first() {
            *self = tail;
            Ok(head.clone())
        } else {
            Err(UnexpectedParse::Eoi)
        }
    }
}

/// Stream type which indicates that the stream is partial if end of input is reached
///
/// The wrapped stream is stored in the public field `0`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct PartialStream<S>(pub S);

/// Wraps any value in a `PartialStream`.
impl<S> From<S> for PartialStream<S> {
    fn from(stream: S) -> Self {
        Self(stream)
    }
}

/// The position is that of the wrapped stream.
impl<S> Positioned for PartialStream<S>
where
    S: Positioned,
{
    #[inline]
    fn position(&self) -> Self::Position {
        S::position(&self.0)
    }
}

/// Checkpoints are those of the wrapped stream.
impl<S> ResetStream for PartialStream<S>
where
    S: ResetStream,
{
    type Checkpoint = S::Checkpoint;

    #[inline]
    fn checkpoint(&self) -> Self::Checkpoint {
        S::checkpoint(&self.0)
    }

    #[inline]
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
        S::reset(&mut self.0, checkpoint)
    }
}

/// Tokens come from the wrapped stream; the only difference is that the stream always reports
/// itself as partial.
impl<S> StreamOnce for PartialStream<S>
where
    S: StreamOnce,
{
    type Token = S::Token;
    type Range = S::Range;
    type Position = S::Position;
    type Error = S::Error;

    #[inline]
    fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
        S::uncons(&mut self.0)
    }

    /// Always `true`, regardless of what the wrapped stream reports.
    fn is_partial(&self) -> bool {
        true
    }
}

/// Range operations are forwarded unchanged to the wrapped stream.
impl<S> RangeStreamOnce for PartialStream<S>
where
    S: RangeStreamOnce,
{
    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
        S::uncons_range(&mut self.0, size)
    }

    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        S::uncons_while(&mut self.0, f)
    }

    // Forwarded explicitly so that an override on `S` is used rather than the trait default.
    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        S::uncons_while1(&mut self.0, f)
    }

    #[inline]
    fn distance(&self, end: &Self::Checkpoint) -> usize {
        S::distance(&self.0, end)
    }

    #[inline]
    fn range(&self) -> Self::Range {
        S::range(&self.0)
    }
}

/// Stream type which indicates that the stream is complete if end of input is reached
///
/// For most streams this is already the default but this wrapper can be used to override a nested
/// `PartialStream`
///
/// The type is `repr(transparent)` over the wrapped stream, which is stored in the public
/// field `0`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
#[repr(transparent)]
pub struct CompleteStream<S>(pub S);

/// Wraps any value in a `CompleteStream`.
impl<S> From<S> for CompleteStream<S> {
    fn from(stream: S) -> Self {
        Self(stream)
    }
}

/// Reinterprets a mutable reference to a stream as a mutable reference to the same stream
/// wrapped in `CompleteStream`, without moving or copying it.
impl<'s, S> From<&'s mut S> for &'s mut CompleteStream<S> {
    fn from(t: &'s mut S) -> Self {
        // SAFETY repr(transparent) is specified on CompleteStream
        // (so `CompleteStream<S>` has the same layout as `S` and the pointer cast is valid; the
        // resulting reference keeps the lifetime `'s` of the input borrow).
        unsafe { &mut *(t as *mut S as *mut CompleteStream<S>) }
    }
}

impl<S> Positioned for CompleteStream<S>
where
    S: Positioned,
{
    #[inline]
    fn position(&self) -> Self::Position {
        self.0.position()
    }
}

impl<S> ResetStream for CompleteStream<S>
where
    S: ResetStream,
{
    type Checkpoint = S::Checkpoint;

    #[inline]
    fn checkpoint(&self) -> Self::Checkpoint {
        self.0.checkpoint()
    }

    #[inline]
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
        self.0.reset(checkpoint)
    }
}

impl<S> StreamOnce for CompleteStream<S>
where
    S: StreamOnce,
{
    type Token = S::Token;
    type Range = S::Range;
    type Position = S::Position;
    type Error = S::Error;

    #[inline]
    fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
        self.0.uncons()
    }

    fn is_partial(&self) -> bool {
        false
    }
}

impl<S> RangeStreamOnce for CompleteStream<S>
where
    S: RangeStreamOnce,
{
    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
        self.0.uncons_range(size)
    }

    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while(f)
    }

    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while1(f)
    }

    #[inline]
    fn distance(&self, end: &Self::Checkpoint) -> usize {
        self.0.distance(end)
    }

    #[inline]
    fn range(&self) -> Self::Range {
        self.0.range()
    }
}

/// Stream wrapper whose "partial" status is chosen at runtime.
///
/// Field `0` is the wrapped stream and field `1` is the value reported by
/// `StreamOnce::is_partial` for this wrapper.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct MaybePartialStream<S>(pub S, pub bool);

impl<S> Positioned for MaybePartialStream<S>
where
    S: Positioned,
{
    #[inline]
    fn position(&self) -> Self::Position {
        self.0.position()
    }
}

impl<S> ResetStream for MaybePartialStream<S>
where
    S: ResetStream,
{
    type Checkpoint = S::Checkpoint;

    #[inline]
    fn checkpoint(&self) -> Self::Checkpoint {
        self.0.checkpoint()
    }

    #[inline]
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
        self.0.reset(checkpoint)
    }
}

impl<S> StreamOnce for MaybePartialStream<S>
where
    S: StreamOnce,
{
    type Token = S::Token;
    type Range = S::Range;
    type Position = S::Position;
    type Error = S::Error;

    #[inline]
    fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
        self.0.uncons()
    }

    fn is_partial(&self) -> bool {
        self.1
    }
}

impl<S> RangeStreamOnce for MaybePartialStream<S>
where
    S: RangeStreamOnce,
{
    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
        self.0.uncons_range(size)
    }

    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while(f)
    }

    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        self.0.uncons_while1(f)
    }

    #[inline]
    fn distance(&self, end: &Self::Checkpoint) -> usize {
        self.0.distance(end)
    }

    #[inline]
    fn range(&self) -> Self::Range {
        self.0.range()
    }
}

/// Newtype for constructing a stream from a slice where the items in the slice are not copyable.
///
/// The stream yields references to the items rather than clones of them.
#[derive(Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct SliceStream<'a, T>(pub &'a [T]);

// `Clone` is written by hand so that it is available for every `T`: only the slice reference is
// copied, never the items themselves.
impl<'a, T> Clone for SliceStream<'a, T> {
    fn clone(&self) -> SliceStream<'a, T> {
        let items: &'a [T] = self.0;
        SliceStream(items)
    }
}

/// The position of a `SliceStream` is the address of its first remaining item.
impl<'a, T> Positioned for SliceStream<'a, T>
where
    T: PartialEq + 'a,
{
    #[inline]
    fn position(&self) -> Self::Position {
        let start = self.0.as_ptr();
        PointerOffset::new(start as usize)
    }
}

/// `SliceStream` yields references into the underlying slice, so the items need not be `Clone`.
impl<'a, T> StreamOnce for SliceStream<'a, T>
where
    T: PartialEq + 'a,
{
    type Token = &'a T;
    type Range = &'a [T];
    type Position = PointerOffset<[T]>;
    type Error = UnexpectedParse;

    /// Pops a reference to the first item, leaving the stream at the remaining items.
    /// Returns `UnexpectedParse::Eoi` when no items are left.
    #[inline]
    fn uncons(&mut self) -> Result<&'a T, StreamErrorFor<Self>> {
        let (head, tail) = match self.0.split_first() {
            Some(parts) => parts,
            None => return Err(UnexpectedParse::Eoi),
        };
        self.0 = tail;
        Ok(head)
    }
}

// By-reference variant of `slice_uncons_while`: splits off the longest prefix of `slice` whose
// elements (from index `start` onwards) are all accepted by `f`, returns it, and leaves `slice`
// pointing at the remainder.
//
// `f` receives references into the slice, so no `Clone` bound is needed. With
// `UnconsStart::One` the first element is included in the result without being tested, so the
// slice must then be non-empty.
fn slice_uncons_while_ref<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T]
where
    F: FnMut(&'a T) -> bool,
{
    let mut i = start as usize;
    let len = slice.len();
    // SAFETY: We only call this function with `One` if the slice has length >= 1
    debug_assert!(len >= i, "");
    // Set once `f` rejects an element inside the unrolled loop, so that the tail loop below
    // does not test (and call `f` on) that same element a second time.
    let mut found = false;

    macro_rules! check {
        () => {
            if !f(unsafe { slice.get_unchecked(i) }) {
                found = true;
                break;
            }
            i += 1;
        };
    }

    // SAFETY: ensures we can access at least 8 elements starting at i, making get_unchecked sound.
    while len - i >= 8 {
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
    }

    // Fewer than 8 elements remain: finish with bounds-checked accesses.
    if !found {
        while let Some(c) = slice.get(i) {
            if !f(c) {
                break;
            }
            i += 1;
        }
    }

    // `i` is the index of the first rejected element, or `len` if none was rejected.
    let (result, remaining) = slice.split_at(i);
    *slice = remaining;
    result
}

impl<'a, T> RangeStreamOnce for SliceStream<'a, T>
where
    T: PartialEq + 'a,
{
    #[inline]
    fn uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>> {
        if size <= self.0.len() {
            let (range, rest) = self.0.split_at(size);
            self.0 = rest;
            Ok(range)
        } else {
            Err(UnexpectedParse::Eoi)
        }
    }

    #[inline]
    fn uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        Ok(slice_uncons_while_ref(&mut self.0, UnconsStart::Zero, f))
    }

    #[inline]
    fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
    where
        F: FnMut(Self::Token) -> bool,
    {
        match self.0.first() {
            Some(c) => {
                if !f(c) {
                    return PeekErr(Tracked::from(UnexpectedParse::Unexpected));
                }
            }
            None => return PeekErr(Tracked::from(UnexpectedParse::Eoi)),
        }

        CommitOk(slice_uncons_while_ref(&mut self.0, UnconsStart::One, f))
    }

    #[inline]
    fn distance(&self, end: &Self) -> usize {
        end.0.len() - self.0.len()
    }

    fn range(&self) -> Self::Range {
        self.0
    }
}

/// Wrapper around iterators which allows them to be treated as a stream.
/// Returned by [`IteratorStream::new`].
#[derive(Copy, Clone, Debug)]
pub struct IteratorStream<Input>(Input);

impl<Input> IteratorStream<Input>
where
    Input: Iterator,
{
    /// Converts an `Iterator` into a stream.
    ///
    /// NOTE: This type has a position type of `()` which carries no information, so it is
    ///     normally wrapped with types such as `BufferedStreamRef` and `State` to become a
    ///     `Stream` which can be parsed. `Clone` is only available when the wrapped iterator
    ///     is itself `Clone`.
    pub fn new<T>(iter: T) -> IteratorStream<Input>
    where
        T: IntoIterator<IntoIter = Input, Item = Input::Item>,
    {
        let inner = IntoIterator::into_iter(iter);
        IteratorStream(inner)
    }
}

/// The wrapper is itself an iterator which yields exactly what the wrapped iterator yields.
impl<Input> Iterator for IteratorStream<Input>
where
    Input: Iterator,
{
    type Item = Input::Item;
    fn next(&mut self) -> Option<Input::Item> {
        let IteratorStream(inner) = self;
        inner.next()
    }
}

/// An iterator stream yields the iterator's items as tokens. It has no real range type (the item
/// type is used instead) and its position type is `()`.
impl<Input: Iterator> StreamOnce for IteratorStream<Input>
where
    Input::Item: Clone + PartialEq,
{
    type Token = Input::Item;
    type Range = Input::Item;
    type Position = ();
    type Error = UnexpectedParse;

    /// Pulls the next item from the iterator, reporting `UnexpectedParse::Eoi` once the
    /// iterator returns `None`.
    #[inline]
    fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>> {
        if let Some(item) = self.next() {
            Ok(item)
        } else {
            Err(UnexpectedParse::Eoi)
        }
    }
}

/// Newtype around a pointer offset into a slice stream (`&[T]`/`&str`).
///
/// The wrapped `usize` is public. The `PhantomData<T>` field only ties the offset to the
/// stream type it belongs to; the trait impls below are written by hand because a `derive`
/// would require `T` itself to implement each trait.
pub struct PointerOffset<T: ?Sized>(pub usize, PhantomData<T>);

impl<T: ?Sized> Clone for PointerOffset<T> {
    fn clone(&self) -> Self {
        // Canonical `Clone` for a `Copy` type.
        *self
    }
}

impl<T: ?Sized> Copy for PointerOffset<T> {}

impl<T: ?Sized> Default for PointerOffset<T> {
    /// The default offset is `0`.
    fn default() -> Self {
        PointerOffset::new(0)
    }
}

impl<T: ?Sized> PartialEq for PointerOffset<T> {
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0
    }
}

impl<T: ?Sized> Eq for PointerOffset<T> {}

impl<T: ?Sized> PartialOrd for PointerOffset<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to `Ord` so the two orderings can never disagree.
        Some(self.cmp(other))
    }
}

impl<T: ?Sized> Ord for PointerOffset<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.cmp(&other.0)
    }
}

impl<T> fmt::Debug for PointerOffset<T>
where
    T: ?Sized,
{
    /// Uses the same representation as `Display`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self)
    }
}

impl<T> fmt::Display for PointerOffset<T>
where
    T: ?Sized,
{
    /// Formats the offset as `PointerOffset(<address>)`, printing the value as a raw pointer.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "PointerOffset({:?})", self.0 as *const ())
    }
}

impl<T> PointerOffset<T>
where
    T: ?Sized,
{
    /// Creates a `PointerOffset` wrapping `offset`.
    pub fn new(offset: usize) -> Self {
        PointerOffset(offset, PhantomData)
    }

    /// Converts the pointer-based position into an indexed position.
    ///
    /// The result is the wrapped value minus the address of `initial_slice`.
    ///
    /// # Panics
    ///
    /// The subtraction is a plain `-`, so in builds with overflow checks enabled this panics
    /// if the wrapped value is smaller than the address of `initial_slice`.
    ///
    /// ```rust
    /// # extern crate combine;
    /// # use combine::*;
    /// # fn main() {
    /// let text = "b";
    /// let err = token('a').easy_parse(text).unwrap_err();
    /// assert_eq!(err.position.0, text.as_ptr() as usize);
    /// assert_eq!(err.map_position(|p| p.translate_position(text)).position, 0);
    /// # }
    /// ```
    pub fn translate_position(self, initial_slice: &T) -> usize {
        // Drop any fat-pointer metadata before turning the pointer into an address.
        let base = initial_slice as *const T as *const () as usize;
        self.0 - base
    }
}

/// Decodes `input` using `parser`.
///
/// Returns `Ok((Some(token), committed_data))` if there was enough data to finish parsing using
/// `parser`.
/// Returns `Ok((None, committed_data))` if `input` did not contain enough data to finish parsing
/// using `parser` and `input` is partial.
/// Every other error is returned as `Err`.
///
/// In both `Ok` cases `committed_data` is `input.distance(&start)`, where `start` is a
/// checkpoint taken before parsing.
///
/// See `examples/async.rs` for example usage in a `tokio_io::codec::Decoder`
pub fn decode<Input, P>(
    mut parser: P,
    input: &mut Input,
    partial_state: &mut P::PartialState,
) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error>
where
    P: Parser<Input>,
    Input: RangeStream,
{
    let start = input.checkpoint();
    let outcome = parser.parse_with_state(input, partial_state);
    match outcome {
        Ok(message) => Ok((Some(message), input.distance(&start))),
        Err(err) => {
            // The parser expected more input to parse and input is partial: report `None`
            // as we did not finish, together with how much may be removed from the stream.
            let needs_more_input = err.is_unexpected_end_of_input() && input.is_partial();
            if needs_more_input {
                Ok((None, input.distance(&start)))
            } else {
                Err(err)
            }
        }
    }
}

/// Decodes `input` using `parser`. Like `decode` but works directly in both
/// `tokio_util::Decoder::decode` and `tokio_util::Decoder::decode_eof`
///
/// Returns `Ok((Some(token), committed_data))` if there was enough data to finish parsing using
/// `parser`.
/// Returns `Ok((None, committed_data))` if `input` did not contain enough data to finish parsing
/// using `parser` and `input` is partial.
/// Returns `Ok((None, 0))` if `input` is not partial, `input_at_eof(input)` holds and nothing
/// was consumed, which indicates that decoding is done.
/// Every other error is returned as `Err`.
///
/// See `examples/async.rs` for example usage in a `tokio_io::codec::Decoder`
pub fn decode_tokio<Input, P>(
    mut parser: P,
    input: &mut Input,
    partial_state: &mut P::PartialState,
) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error>
where
    P: Parser<Input>,
    Input: RangeStream,
{
    let start = input.checkpoint();
    match parser.parse_with_state(input, partial_state) {
        Ok(message) => Ok((Some(message), input.distance(&start))),
        Err(err) => {
            // Only "unexpected end of input" can be turned into a non-error result.
            if !err.is_unexpected_end_of_input() {
                return Err(err);
            }

            if input.is_partial() {
                // The parser expected more input to parse and input is partial, return `None`
                // as we did not finish and also return how much may be removed from the stream
                Ok((None, input.distance(&start)))
            } else if input_at_eof(input) && input.distance(&start) == 0 {
                // We are at eof and the input is empty, return None to indicate that we are
                // done
                Ok((None, 0))
            } else {
                Err(err)
            }
        }
    }
}

/// Parses an instance of `std::io::Read` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode!` call is unlikely to work).
///
/// The macro takes the decoder, the reader and the parser, optionally followed by an
/// `$input_stream` closure (called with the raw stream and the current position; defaults to
/// `|input, _position| input`) and a `$post_decode` closure (called with the stream after each
/// decode attempt; defaults to `|x| x`). It evaluates to `Ok(value)` once the parser produced a
/// value, or to `Err` with a `decoder::Error::Parse` / `decoder::Error::Io`.
///
/// ```
/// use std::{
///     fs::File,
/// };
/// use combine::{decode, satisfy, skip_many1, many1, sep_end_by, Parser, stream::Decoder};
///
/// let mut read = File::open("README.md").unwrap();
/// let mut decoder = Decoder::new();
/// let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
/// assert_eq!(
///     decode!(
///         decoder,
///         read,
///         {
///             let word = many1(satisfy(|b| !is_whitespace(b)));
///             sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///         },
///         |input, _position| combine::easy::Stream::from(input),
///     ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///     Ok(773),
/// );
/// ```
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[macro_export]
macro_rules! decode {
    // Three arguments: use the stream as-is and do nothing after decoding.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode!($decoder, $read, $parser, |input, _position| input, |x| x)
    };

    // Four arguments: custom stream construction, no post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$decoder` and `$read` are matched with `ref mut` so each argument expression
    // is evaluated once and then used through a mutable reference.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One decode attempt over the currently buffered data. `opt` is the parsed
                    // value (if any) and `removed` the amount reported by `stream::decode`.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, read);

                        // `!end_of_input` is presumably the "more data may follow" flag of
                        // `MaybePartialStream` (confirm against its definition).
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember the position and run the post-decode hook before the result
                        // is inspected, so both happen on success and on failure.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance(&mut *read, removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: prepare for another attempt (presumably by reading more
                    // data from `read`; see the decoder) and report I/O failures.
                    match decoder.__before_parse(&mut *read) {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}

/// Parses an instance of `futures::io::AsyncRead` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_futures_03!` call is unlikely to work).
///
/// The macro takes the decoder, the reader and the parser, optionally followed by an
/// `$input_stream` closure (called with the raw stream and the current position; defaults to
/// `|input, _position| input`) and a `$post_decode` closure (called with the stream after each
/// decode attempt; defaults to `|x| x`). It contains an `.await` and must therefore be used
/// inside an `async` context.
///
/// ```
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use async_std::{
///     fs::File,
///     task,
/// };
///
/// use combine::{decode_futures_03, satisfy, skip_many1, many1, sep_end_by, Parser, stream::Decoder};
///
/// fn main() {
///     task::block_on(main_());
/// }
///
/// async fn main_() {
///     let mut read = File::open("README.md").await.unwrap();
///     let mut decoder = Decoder::new();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_futures_03!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "futures-io-03")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-io-03")))]
#[macro_export]
macro_rules! decode_futures_03 {
    // Three arguments: use the stream as-is. The default `$input_stream` must accept two
    // arguments because it is invoked through `call_with2` with the stream and the position.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_futures_03!($decoder, $read, $parser, |input, _position| input)
    };

    // Four arguments: custom stream construction, no post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_futures_03!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$decoder` and `$read` are matched with `ref mut` so each argument expression
    // is evaluated once and then used through a mutable reference.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One decode attempt over the currently buffered data. `opt` is the parsed
                    // value (if any) and `removed` the amount reported by `stream::decode`.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);

                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember the position and run the post-decode hook before the result
                        // is inspected, so both happen on success and on failure.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => break 'outer Err($crate::stream::decoder::Error::Parse(err)),
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(&mut *read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: prepare for another attempt (presumably by reading more
                    // data from `read`; see the decoder) and report I/O failures.
                    match decoder.__before_parse_async(std::pin::Pin::new(&mut *read)).await {
                        Ok(_) => (),
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                }
            }
        }
    };
}

/// Parses an instance of `tokio::io::AsyncRead` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_tokio_02!` call is unlikely to work).
///
/// The macro takes the decoder, the reader and the parser, optionally followed by an
/// `$input_stream` closure (called with the raw stream and the current position; defaults to
/// `|input, _position| input`) and a `$post_decode` closure (called with the stream after each
/// decode attempt; defaults to `|x| x`). It contains an `.await` and must therefore be used
/// inside an `async` context.
///
/// ```
/// # use tokio_02_dep as tokio;
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use tokio::{
///     fs::File,
/// };
///
/// use combine::{decode_tokio_02, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
///
/// #[tokio::main]
/// async fn main() {
///     let mut read = BufReader::new(File::open("README.md").await.unwrap());
///     let mut decoder = Decoder::new_bufferless();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_tokio_02!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "tokio-02")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-02")))]
#[macro_export]
macro_rules! decode_tokio_02 {
    // Three arguments: use the stream as-is.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio_02!($decoder, $read, $parser, |input, _position| input)
    };

    // Four arguments: custom stream construction, no post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio_02!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$decoder` and `$read` are matched with `ref mut` so each argument expression
    // is evaluated once and then used through a mutable reference.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One decode attempt over the currently buffered data. `opt` is the parsed
                    // value (if any) and `removed` the amount reported by `stream::decode`.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember the position and run the post-decode hook before the result
                        // is inspected, so both happen on success and on failure.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: prepare for another attempt (presumably by reading more
                    // data from `read`; see the decoder) and report I/O failures.
                    match decoder
                        .__before_parse_tokio_02(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}

/// Parses an instance of `tokio::io::AsyncRead` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_tokio_03!` call is unlikely to work).
///
/// The macro takes the decoder, the reader and the parser, optionally followed by an
/// `$input_stream` closure (called with the raw stream and the current position; defaults to
/// `|input, _position| input`) and a `$post_decode` closure (called with the stream after each
/// decode attempt; defaults to `|x| x`). It contains an `.await` and must therefore be used
/// inside an `async` context.
///
/// ```
/// # use tokio_03_dep as tokio;
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use tokio::{
///     fs::File,
/// };
///
/// use combine::{decode_tokio_03, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
///
/// #[tokio::main]
/// async fn main() {
///     let mut read = BufReader::new(File::open("README.md").await.unwrap());
///     let mut decoder = Decoder::new_bufferless();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_tokio_03!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "tokio-03")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-03")))]
#[macro_export]
macro_rules! decode_tokio_03 {
    // Three arguments: use the stream as-is.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio_03!($decoder, $read, $parser, |input, _position| input)
    };

    // Four arguments: custom stream construction, no post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio_03!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$decoder` and `$read` are matched with `ref mut` so each argument expression
    // is evaluated once and then used through a mutable reference.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One decode attempt over the currently buffered data. `opt` is the parsed
                    // value (if any) and `removed` the amount reported by `stream::decode`.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember the position and run the post-decode hook before the result
                        // is inspected, so both happen on success and on failure.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: prepare for another attempt (presumably by reading more
                    // data from `read`; see the decoder) and report I/O failures.
                    match decoder
                        .__before_parse_tokio_03(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}

/// Parses an instance of `tokio::io::AsyncRead` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_tokio!` call is unlikely to work).
///
/// The macro takes the decoder, the reader and the parser, optionally followed by an
/// `$input_stream` closure (called with the raw stream and the current position; defaults to
/// `|input, _position| input`) and a `$post_decode` closure (called with the stream after each
/// decode attempt; defaults to `|x| x`). It contains an `.await` and must therefore be used
/// inside an `async` context.
///
/// ```
/// # use tokio_dep as tokio;
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use tokio::{
///     fs::File,
/// };
///
/// use combine::{decode_tokio, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
///
/// #[tokio::main]
/// async fn main() {
///     let mut read = BufReader::new(File::open("README.md").await.unwrap());
///     let mut decoder = Decoder::new_bufferless();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_tokio!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
#[macro_export]
macro_rules! decode_tokio {
    // Three arguments: use the stream as-is.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio!($decoder, $read, $parser, |input, _position| input)
    };

    // Four arguments: custom stream construction, no post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form. `$decoder` and `$read` are matched with `ref mut` so each argument expression
    // is evaluated once and then used through a mutable reference.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One decode attempt over the currently buffered data. `opt` is the parsed
                    // value (if any) and `removed` the amount reported by `stream::decode`.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember the position and run the post-decode hook before the result
                        // is inspected, so both happen on success and on failure.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: prepare for another attempt (presumably by reading more
                    // data from `read`; see the decoder) and report I/O failures.
                    match decoder
                        .__before_parse_tokio(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}

/// Calls `f` with `a` and `b` and returns its result.
///
/// Used by the `decode*!` macros to invoke the caller-supplied `$input_stream` closure.
#[doc(hidden)]
pub fn call_with2<F: FnOnce(A, B) -> R, A, B, R>(a: A, b: B, f: F) -> R {
    f(a, b)
}

/// Calls `f` with `a` and returns its result.
///
/// Used by the `decode*!` macros to invoke the caller-supplied `$post_decode` closure.
#[doc(hidden)]
pub fn call_with<F: FnOnce(A) -> R, A, R>(a: A, f: F) -> R {
    f(a)
}

#[cfg(test)]
mod tests {

    use super::*;

    /// Taking a range that ends exactly at the end of the input must succeed, including the
    /// zero-length range of an empty input.
    #[test]
    fn uncons_range_at_end() {
        assert_eq!("".uncons_range(0), Ok(""));
        assert_eq!("123".uncons_range(3), Ok("123"));
        assert_eq!((&[1][..]).uncons_range(1), Ok(&[1][..]));
        let s: &[u8] = &[];
        assert_eq!(SliceStream(s).uncons_range(0), Ok(&[][..]));
    }

    /// `distance` counts items, not bytes: with 4-byte `i32` items each `uncons` must move the
    /// distance by exactly one, and `reset` must bring it back to zero.
    #[test]
    fn larger_than_1_byte_items_return_correct_distance() {
        let mut input = &[123i32, 0i32][..];

        let before = input.checkpoint();
        assert_eq!(input.distance(&before), 0);

        input.uncons().unwrap();
        assert_eq!(input.distance(&before), 1);

        input.uncons().unwrap();
        assert_eq!(input.distance(&before), 2);

        input.reset(before).unwrap();
        assert_eq!(input.distance(&before), 0);
    }
}