import concurrent
import logging
import random
from pathlib import Path
from typing import Any, Callable, Iterator, List, Optional, Sequence, Tuple, Type, Union

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.document_loaders.csv_loader import CSVLoader
from langchain_community.document_loaders.html_bs import BSHTMLLoader
from langchain_community.document_loaders.text import TextLoader
from langchain_community.document_loaders.unstructured import UnstructuredFileLoader

FILE_LOADER_TYPE = Union[
    Type[UnstructuredFileLoader], Type[TextLoader], Type[BSHTMLLoader], Type[CSVLoader]
]
logger = logging.getLogger(__name__)


def _is_visible(p: Path) -> bool:
    parts = p.parts
    for _p in parts:
        if _p.startswith("."):
            return False
    return True


class DirectoryLoader(BaseLoader):
    """Load from a directory."""

    def __init__(
        self,
        path: str,
        glob: Union[List[str], Tuple[str], str] = "**/[!.]*",
        silent_errors: bool = False,
        load_hidden: bool = False,
        loader_cls: FILE_LOADER_TYPE = UnstructuredFileLoader,
        loader_kwargs: Union[dict, None] = None,
        recursive: bool = False,
        show_progress: bool = False,
        use_multithreading: bool = False,
        max_concurrency: int = 4,
        *,
        exclude: Union[Sequence[str], str] = (),
        sample_size: int = 0,
        randomize_sample: bool = False,
        sample_seed: Union[int, None] = None,
    ):
        """Initialize with a path to directory and how to glob over it.

        Args:
            path: Path to directory.
            glob: A glob pattern or list of glob patterns to use to find files.
                Defaults to "**/[!.]*" (all files except hidden).
            exclude: A pattern or list of patterns to exclude from results.
                Use glob syntax.
            silent_errors: Whether to silently ignore errors. Defaults to False.
            load_hidden: Whether to load hidden files. Defaults to False.
            loader_cls: Loader class to use for loading files.
              Defaults to UnstructuredFileLoader.
            loader_kwargs: Keyword arguments to pass to loader_cls. Defaults to None.
            recursive: Whether to recursively search for files. Defaults to False.
            show_progress: Whether to show a progress bar. Defaults to False.
            use_multithreading: Whether to use multithreading. Defaults to False.
            max_concurrency: The maximum number of threads to use. Defaults to 4.
            sample_size: The maximum number of files you would like to load from the
                directory.
            randomize_sample: Shuffle the files to get a random sample.
            sample_seed: set the seed of the random shuffle for reproducibility.

        Examples:

            .. code-block:: python
                from langchain_community.document_loaders import DirectoryLoader

                # Load all non-hidden files in a directory.
                loader = DirectoryLoader("/path/to/directory")

                # Load all text files in a directory without recursion.
                loader = DirectoryLoader("/path/to/directory", glob="*.txt")

                # Recursively load all text files in a directory.
                loader = DirectoryLoader(
                    "/path/to/directory", glob="*.txt", recursive=True
                )

                # Load all files in a directory, except for py files.
                loader = DirectoryLoader("/path/to/directory", exclude="*.py")

                # Load all files in a directory, except for py or pyc files.
                loader = DirectoryLoader(
                    "/path/to/directory", exclude=["*.py", "*.pyc"]
                )
        """
        if loader_kwargs is None:
            loader_kwargs = {}
        if isinstance(exclude, str):
            exclude = (exclude,)
        self.path = path
        self.glob = glob
        self.exclude = exclude
        self.load_hidden = load_hidden
        self.loader_cls = loader_cls
        self.loader_kwargs = loader_kwargs
        self.silent_errors = silent_errors
        self.recursive = recursive
        self.show_progress = show_progress
        self.use_multithreading = use_multithreading
        self.max_concurrency = max_concurrency
        self.sample_size = sample_size
        self.randomize_sample = randomize_sample
        self.sample_seed = sample_seed

    def load(self) -> List[Document]:
        """Load documents."""
        return list(self.lazy_load())

    def lazy_load(self) -> Iterator[Document]:
        """Load documents lazily."""
        p = Path(self.path)
        if not p.exists():
            raise FileNotFoundError(f"Directory not found: '{self.path}'")
        if not p.is_dir():
            raise ValueError(f"Expected directory, got file: '{self.path}'")

        # glob multiple patterns if a list is provided, e.g., multiple file extensions
        if isinstance(self.glob, (list, tuple)):
            paths = []
            for pattern in self.glob:
                paths.extend(
                    list(p.rglob(pattern) if self.recursive else p.glob(pattern))
                )
        elif isinstance(self.glob, str):
            paths = list(p.rglob(self.glob) if self.recursive else p.glob(self.glob))
        else:
            raise TypeError(
                f"Expected glob to be str or sequence of str, but got {type(self.glob)}"
            )

        items = [
            path
            for path in paths
            if not (self.exclude and any(path.match(glob) for glob in self.exclude))
            and path.is_file()
        ]

        if self.sample_size > 0:
            if self.randomize_sample:
                randomizer = random.Random(
                    self.sample_seed if self.sample_seed else None
                )
                randomizer.shuffle(items)
            items = items[: min(len(items), self.sample_size)]

        pbar = None
        if self.show_progress:
            try:
                from tqdm import tqdm

                pbar = tqdm(total=len(items))
            except ImportError as e:
                logger.warning(
                    "To log the progress of DirectoryLoader you need to install tqdm, "
                    "`pip install tqdm`"
                )
                if self.silent_errors:
                    logger.warning(e)
                else:
                    raise ImportError(
                        "To log the progress of DirectoryLoader "
                        "you need to install tqdm, "
                        "`pip install tqdm`"
                    )

        if self.use_multithreading:
            futures = []
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=self.max_concurrency
            ) as executor:
                for i in items:
                    futures.append(
                        executor.submit(
                            self._lazy_load_file_to_non_generator(self._lazy_load_file),
                            i,
                            p,
                            pbar,
                        )
                    )
                for future in concurrent.futures.as_completed(futures):
                    for item in future.result():
                        yield item
        else:
            for i in items:
                yield from self._lazy_load_file(i, p, pbar)

        if pbar:
            pbar.close()

    def _lazy_load_file_to_non_generator(self, func: Callable) -> Callable:
        def non_generator(item: Path, path: Path, pbar: Optional[Any]) -> List:
            return [x for x in func(item, path, pbar)]

        return non_generator

    def _lazy_load_file(
        self, item: Path, path: Path, pbar: Optional[Any]
    ) -> Iterator[Document]:
        """Load a file.

        Args:
            item: File path.
            path: Directory path.
            pbar: Progress bar. Defaults to None.

        """
        if item.is_file():
            if _is_visible(item.relative_to(path)) or self.load_hidden:
                try:
                    logger.debug(f"Processing file: {str(item)}")
                    loader = self.loader_cls(str(item), **self.loader_kwargs)
                    try:
                        for subdoc in loader.lazy_load():
                            yield subdoc
                    except NotImplementedError:
                        for subdoc in loader.load():
                            yield subdoc
                except Exception as e:
                    if self.silent_errors:
                        logger.warning(f"Error loading file {str(item)}: {e}")
                    else:
                        logger.error(f"Error loading file {str(item)}")
                        raise e
                finally:
                    if pbar:
                        pbar.update(1)
