from __future__ import annotations

import numbers
import subprocess
from io import BufferedReader, BytesIO, TextIOWrapper
from tempfile import SpooledTemporaryFile
from time import sleep
from typing import IO, TYPE_CHECKING, Any, Optional, TypeVar, cast

import emoji
import psutil

from unstructured.documents.coordinates import CoordinateSystem, PixelSpace
from unstructured.documents.elements import (
    TYPE_TO_TEXT_ELEMENT_MAP,
    CheckBox,
    CoordinatesMetadata,
    Element,
    ElementMetadata,
    ElementType,
    ListItem,
    PageBreak,
    Text,
)
from unstructured.logger import logger
from unstructured.nlp.patterns import ENUMERATED_BULLETS_RE, UNICODE_BULLETS_RE

if TYPE_CHECKING:
    from unstructured_inference.inference.layout import PageLayout
    from unstructured_inference.inference.layoutelement import LayoutElement


def normalize_layout_element(
    layout_element: LayoutElement | Element | dict[str, Any],
    coordinate_system: Optional[CoordinateSystem] = None,
    infer_list_items: bool = True,
    source_format: Optional[str] = "html",
) -> Element | list[Element]:
    """Converts an unstructured_inference LayoutElement object to an unstructured Element."""

    if isinstance(layout_element, Element) and source_format == "html":
        return layout_element

    # NOTE(alan): Won't the lines above ensure this never runs (PageBreak is a subclass of Element)?
    if isinstance(layout_element, PageBreak):
        return PageBreak(text="")

    if not isinstance(layout_element, dict):
        layout_dict = layout_element.to_dict()
    else:
        layout_dict = layout_element

    text = layout_dict.get("text", "")
    # Both `coordinates` and `coordinate_system` must be present
    # in order to add coordinates metadata to the element.
    coordinates = layout_dict.get("coordinates")
    element_type = layout_dict.get("type")
    prob = layout_dict.get("prob")
    aux_origin = layout_dict.get("source", None)
    origin = None
    if aux_origin:
        origin = aux_origin.value
    if prob and isinstance(prob, (int, str, float, numbers.Number)):
        class_prob_metadata = ElementMetadata(detection_class_prob=float(prob))  # type: ignore
    else:
        class_prob_metadata = ElementMetadata()
    common_kwargs = {
        "coordinates": coordinates,
        "coordinate_system": coordinate_system,
        "metadata": class_prob_metadata,
        "detection_origin": origin,
    }
    if element_type == ElementType.LIST:
        if infer_list_items:
            return layout_list_to_list_items(
                text,
                **common_kwargs,
            )
        else:
            return ListItem(
                text=text,
                **common_kwargs,
            )

    elif element_type in TYPE_TO_TEXT_ELEMENT_MAP:
        assert isinstance(element_type, str)  # Added to resolve type-error
        _element_class = TYPE_TO_TEXT_ELEMENT_MAP[element_type]
        _element_class = _element_class(
            text=text,
            **common_kwargs,
        )
        if element_type == ElementType.HEADLINE:
            _element_class.metadata.category_depth = 1
        elif element_type == ElementType.SUB_HEADLINE:
            _element_class.metadata.category_depth = 2
        return _element_class
    elif element_type in [
        ElementType.CHECK_BOX_CHECKED,
        ElementType.CHECK_BOX_UNCHECKED,
        ElementType.RADIO_BUTTON_CHECKED,
        ElementType.RADIO_BUTTON_UNCHECKED,
        ElementType.CHECKED,
        ElementType.UNCHECKED,
    ]:
        checked = element_type in [
            ElementType.CHECK_BOX_CHECKED,
            ElementType.RADIO_BUTTON_CHECKED,
            ElementType.CHECKED,
        ]
        return CheckBox(
            checked=checked,
            **common_kwargs,
        )
    else:
        return Text(
            text=text,
            **common_kwargs,
        )


def layout_list_to_list_items(
    text: Optional[str],
    coordinates: Optional[tuple[tuple[float, float], ...]],
    coordinate_system: Optional[CoordinateSystem],
    metadata: Optional[ElementMetadata],
    detection_origin: Optional[str],
) -> list[Element]:
    """Converts a list LayoutElement to a list of ListItem elements."""
    split_items = ENUMERATED_BULLETS_RE.split(text) if text else []
    # NOTE(robinson) - this means there wasn't a match for the enumerated bullets
    if len(split_items) == 1:
        split_items = UNICODE_BULLETS_RE.split(text) if text else []

    list_items: list[Element] = []
    for text_segment in split_items:
        if len(text_segment.strip()) > 0:
            # Both `coordinates` and `coordinate_system` must be present
            # in order to add coordinates metadata to the element.
            item = ListItem(
                text=text_segment.strip(),
                coordinates=coordinates,
                coordinate_system=coordinate_system,
                metadata=metadata,
                detection_origin=detection_origin,
            )
            list_items.append(item)

    return list_items


def add_element_metadata(
    element: Element,
    filename: Optional[str] = None,
    filetype: Optional[str] = None,
    page_number: Optional[int] = None,
    url: Optional[str] = None,
    text_as_html: Optional[str] = None,
    coordinates: Optional[tuple[tuple[float, float], ...]] = None,
    coordinate_system: Optional[CoordinateSystem] = None,
    image_path: Optional[str] = None,
    detection_origin: Optional[str] = None,
    languages: Optional[list[str]] = None,
    **kwargs: Any,
) -> Element:
    """Adds document metadata to the document element.

    Document metadata includes information like the filename, source url, and page number.
    """

    coordinates_metadata = (
        CoordinatesMetadata(
            points=coordinates,
            system=coordinate_system,
        )
        if coordinates is not None and coordinate_system is not None
        else None
    )
    links = element.links if hasattr(element, "links") and len(element.links) > 0 else None
    link_urls = [link.get("url") for link in links] if links else None
    link_texts = [link.get("text") for link in links] if links else None
    link_start_indexes = [link.get("start_index") for link in links] if links else None
    emphasized_texts = (
        element.emphasized_texts
        if hasattr(element, "emphasized_texts") and len(element.emphasized_texts) > 0
        else None
    )
    emphasized_text_contents = (
        [emphasized_text.get("text") for emphasized_text in emphasized_texts]
        if emphasized_texts
        else None
    )
    emphasized_text_tags = (
        [emphasized_text.get("tag") for emphasized_text in emphasized_texts]
        if emphasized_texts
        else None
    )
    depth = element.metadata.category_depth if element.metadata.category_depth else None

    metadata = ElementMetadata(
        coordinates=coordinates_metadata,
        filename=filename,
        filetype=filetype,
        page_number=page_number,
        url=url,
        text_as_html=text_as_html,
        link_urls=link_urls,
        link_texts=link_texts,
        link_start_indexes=link_start_indexes,
        emphasized_text_contents=emphasized_text_contents,
        emphasized_text_tags=emphasized_text_tags,
        category_depth=depth,
        image_path=image_path,
        languages=languages,
    )
    element.metadata.update(metadata)
    if detection_origin is not None:
        element.metadata.detection_origin = detection_origin
    return element


def remove_element_metadata(layout_elements: list[Element]) -> list[Element]:
    """Removes document metadata from the document element.

    Document metadata includes information like the filename, source url, and page number.
    """
    elements: list[Element] = []
    metadata = ElementMetadata()
    for layout_element in layout_elements:
        element = normalize_layout_element(layout_element)
        if isinstance(element, list):
            for _element in element:
                _element.metadata = metadata
            elements.extend(element)
        else:
            element.metadata = metadata
            elements.append(element)
    return elements


def _is_soffice_running():
    for proc in psutil.process_iter():
        try:
            if "soffice" in proc.name().lower():
                return True
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    return False


def convert_office_doc(
    input_filename: str,
    output_directory: str,
    target_format: str = "docx",
    target_filter: Optional[str] = None,
    wait_for_soffice_ready_time_out: int = 10,
):
    """Converts a .doc/.ppt file to a .docx/.pptx file using the libreoffice CLI.

    Parameters
    ----------
    input_filename: str
        The name of the .doc file to convert to .docx
    output_directory: str
        The output directory for the convert .docx file
    target_format: str
        The desired output format
    target_filter: str
        The output filter name to use when converting. See references below
        for details.
    wait_for_soffice_ready_time_out: int
        The max wait time in seconds for soffice to become available to run

    References
    ----------
    https://stackoverflow.com/questions/52277264/convert-doc-to-docx-using-soffice-not-working
    https://git.libreoffice.org/core/+/refs/heads/master/filter/source/config/fragments/filters

    """
    if target_filter is not None:
        target_format = f"{target_format}:{target_filter}"
    # NOTE(robinson) - In the future can also include win32com client as a fallback for windows
    # users who do not have LibreOffice installed
    # ref: https://stackoverflow.com/questions/38468442/
    #       multiple-doc-to-docx-file-conversion-using-python
    command = [
        "soffice",
        "--headless",
        "--convert-to",
        target_format,
        "--outdir",
        output_directory,
        input_filename,
    ]
    try:
        # only one soffice process can be ran
        wait_time = 0
        sleep_time = 0.1
        output = subprocess.run(command, capture_output=True)
        message = output.stdout.decode().strip()
        # we can't rely on returncode unfortunately because on macOS it would return 0 even when the
        # command failed to run; instead we have to rely on the stdout being empty as a sign of the
        # process failed
        while (wait_time < wait_for_soffice_ready_time_out) and (message == ""):
            wait_time += sleep_time
            if _is_soffice_running():
                sleep(sleep_time)
            else:
                output = subprocess.run(command, capture_output=True)
                message = output.stdout.decode().strip()
    except FileNotFoundError:
        raise FileNotFoundError(
            """soffice command was not found. Please install libreoffice
on your system and try again.

- Install instructions: https://www.libreoffice.org/get-help/install-howto/
- Mac: https://formulae.brew.sh/cask/libreoffice
- Debian: https://wiki.debian.org/LibreOffice""",
        )

    logger.info(message)
    if output.returncode != 0 or message == "":
        logger.error(
            "soffice failed to convert to format %s with code %i", target_format, output.returncode
        )
        logger.error(output.stderr.decode().strip())


def exactly_one(**kwargs: Any) -> None:
    """
    Verify arguments; exactly one of all keyword arguments must not be None.

    Example:
        >>> exactly_one(filename=filename, file=file, text=text, url=url)
    """
    if sum([(arg is not None and arg != "") for arg in kwargs.values()]) != 1:
        names = list(kwargs.keys())
        if len(names) > 1:
            message = f"Exactly one of {', '.join(names[:-1])} and {names[-1]} must be specified."
        else:
            message = f"{names[0]} must be specified."
        raise ValueError(message)


_T = TypeVar("_T")


def spooled_to_bytes_io_if_needed(file: _T | SpooledTemporaryFile[bytes]) -> _T | BytesIO:
    """Convert `file` to `BytesIO` when it is a `SpooledTemporaryFile`.

    Note that `file` does not need to be IO[bytes]. It can be `None` or `bytes` and this function
    will not complain.

    In Python <3.11, `SpooledTemporaryFile` does not implement `.readable()` or `.seekable()` which
    triggers an exception when the file is loaded by certain packages. In particular, the stdlib
    `zipfile.Zipfile` raises on opening a `SpooledTemporaryFile` as does `Pandas.read_csv()`.
    """
    if isinstance(file, SpooledTemporaryFile):
        file.seek(0)
        return BytesIO(cast(bytes, file.read()))

    # -- return `file` unchanged otherwise --
    return file


def convert_to_bytes(file: bytes | IO[bytes]) -> bytes:
    """Extract the bytes from `file` without preventing it from being read again later.

    As a convenience to simplify client code, also returns `file` unchanged if it is already bytes.
    """
    if isinstance(file, bytes):
        return file

    if isinstance(file, SpooledTemporaryFile):
        file.seek(0)
        f_bytes = file.read()
        file.seek(0)
        return f_bytes

    if isinstance(file, BytesIO):
        return file.getvalue()

    if isinstance(file, (TextIOWrapper, BufferedReader)):
        with open(file.name, "rb") as f:
            return f.read()

    raise ValueError("Invalid file-like object type")


def contains_emoji(s: str) -> bool:
    """
    Check if the input string contains any emoji characters.

    Parameters:
    - s (str): The input string to check.

    Returns:
    - bool: True if the string contains any emoji, False otherwise.
    """

    return bool(emoji.emoji_count(s))


def get_page_image_metadata(page: PageLayout) -> dict[str, Any]:
    """Retrieve image metadata and coordinate system from a page."""

    image = getattr(page, "image", None)
    image_metadata = getattr(page, "image_metadata", None)

    if image:
        image_format = image.format
        image_width = image.width
        image_height = image.height
    elif image_metadata:
        image_format = image_metadata.get("format")
        image_width = image_metadata.get("width")
        image_height = image_metadata.get("height")
    else:
        image_format = None
        image_width = None
        image_height = None

    return {
        "format": image_format,
        "width": image_width,
        "height": image_height,
    }


def ocr_data_to_elements(
    ocr_data: list["LayoutElement"],
    image_size: tuple[int | float, int | float],
    common_metadata: Optional[ElementMetadata] = None,
    infer_list_items: bool = True,
    source_format: Optional[str] = None,
) -> list[Element]:
    """Convert OCR layout data into `unstructured` elements with associated metadata."""

    image_width, image_height = image_size
    coordinate_system = PixelSpace(width=image_width, height=image_height)
    elements: list[Element] = []
    for layout_element in ocr_data:
        element = normalize_layout_element(
            layout_element,
            coordinate_system=coordinate_system,
            infer_list_items=infer_list_items,
            source_format=source_format if source_format else "html",
        )

        if common_metadata:
            element.metadata.update(common_metadata)

        elements.append(element)

    return elements
