import base64
from typing import Any, Dict, Optional
from urllib.parse import quote

import aiohttp
import requests
from langchain_core.utils import get_from_dict_or_env
from pydantic import BaseModel, ConfigDict, Field, model_validator


class DataForSeoAPIWrapper(BaseModel):
    """Wrapper around the DataForSeo API."""

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        extra="forbid",
    )

    default_params: dict = Field(
        default={
            "location_name": "United States",
            "language_code": "en",
            "depth": 10,
            "se_name": "google",
            "se_type": "organic",
        }
    )
    """Default parameters to use for the DataForSEO SERP API."""
    params: dict = Field(default={})
    """Additional parameters to pass to the DataForSEO SERP API."""
    api_login: Optional[str] = None
    """The API login to use for the DataForSEO SERP API."""
    api_password: Optional[str] = None
    """The API password to use for the DataForSEO SERP API."""
    json_result_types: Optional[list] = None
    """The JSON result types."""
    json_result_fields: Optional[list] = None
    """The JSON result fields."""
    top_count: Optional[int] = None
    """The number of top results to return."""
    aiosession: Optional[aiohttp.ClientSession] = None
    """The aiohttp session to use for the DataForSEO SERP API."""

    @model_validator(mode="before")
    @classmethod
    def validate_environment(cls, values: Dict) -> Any:
        """Validate that login and password exists in environment."""
        login = get_from_dict_or_env(values, "api_login", "DATAFORSEO_LOGIN")
        password = get_from_dict_or_env(values, "api_password", "DATAFORSEO_PASSWORD")
        values["api_login"] = login
        values["api_password"] = password
        return values

    async def arun(self, url: str) -> str:
        """Run request to DataForSEO SERP API and parse result async."""
        return self._process_response(await self._aresponse_json(url))

    def run(self, url: str) -> str:
        """Run request to DataForSEO SERP API and parse result async."""
        return self._process_response(self._response_json(url))

    def results(self, url: str) -> list:
        res = self._response_json(url)
        return self._filter_results(res)

    async def aresults(self, url: str) -> list:
        res = await self._aresponse_json(url)
        return self._filter_results(res)

    def _prepare_request(self, keyword: str) -> dict:
        """Prepare the request details for the DataForSEO SERP API."""
        if self.api_login is None or self.api_password is None:
            raise ValueError("api_login or api_password is not provided")
        cred = base64.b64encode(
            f"{self.api_login}:{self.api_password}".encode("utf-8")
        ).decode("utf-8")
        headers = {"Authorization": f"Basic {cred}", "Content-Type": "application/json"}
        obj = {"keyword": quote(keyword)}
        obj = {**obj, **self.default_params, **self.params}
        data = [obj]
        _url = (
            f"https://api.dataforseo.com/v3/serp/{obj['se_name']}"
            f"/{obj['se_type']}/live/advanced"
        )
        return {
            "url": _url,
            "headers": headers,
            "data": data,
        }

    def _check_response(self, response: dict) -> dict:
        """Check the response from the DataForSEO SERP API for errors."""
        if response.get("status_code") != 20000:
            raise ValueError(
                f"Got error from DataForSEO SERP API: {response.get('status_message')}"
            )
        return response

    def _response_json(self, url: str) -> dict:
        """Use requests to run request to DataForSEO SERP API and return results."""
        request_details = self._prepare_request(url)
        response = requests.post(
            request_details["url"],
            headers=request_details["headers"],
            json=request_details["data"],
        )
        response.raise_for_status()
        return self._check_response(response.json())

    async def _aresponse_json(self, url: str) -> dict:
        """Use aiohttp to request DataForSEO SERP API and return results async."""
        request_details = self._prepare_request(url)
        if not self.aiosession:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    request_details["url"],
                    headers=request_details["headers"],
                    json=request_details["data"],
                ) as response:
                    res = await response.json()
        else:
            async with self.aiosession.post(
                request_details["url"],
                headers=request_details["headers"],
                json=request_details["data"],
            ) as response:
                res = await response.json()
        return self._check_response(res)

    def _filter_results(self, res: dict) -> list:
        output = []
        types = self.json_result_types if self.json_result_types is not None else []
        for task in res.get("tasks", []):
            for result in task.get("result", []):
                for item in result.get("items", []):
                    if len(types) == 0 or item.get("type", "") in types:
                        self._cleanup_unnecessary_items(item)
                        if len(item) != 0:
                            output.append(item)
                    if self.top_count is not None and len(output) >= self.top_count:
                        break
        return output

    def _cleanup_unnecessary_items(self, d: dict) -> dict:
        fields = self.json_result_fields if self.json_result_fields is not None else []
        if len(fields) > 0:
            for k, v in list(d.items()):
                if isinstance(v, dict):
                    self._cleanup_unnecessary_items(v)
                    if len(v) == 0:
                        del d[k]
                elif k not in fields:
                    del d[k]

        if "xpath" in d:
            del d["xpath"]
        if "position" in d:
            del d["position"]
        if "rectangle" in d:
            del d["rectangle"]
        for k, v in list(d.items()):
            if isinstance(v, dict):
                self._cleanup_unnecessary_items(v)
        return d

    def _process_response(self, res: dict) -> str:
        """Process response from DataForSEO SERP API."""
        toret = "No good search result found"
        for task in res.get("tasks", []):
            for result in task.get("result", []):
                item_types = result.get("item_types")
                items = result.get("items", [])
                if "answer_box" in item_types:
                    toret = next(
                        item for item in items if item.get("type") == "answer_box"
                    ).get("text")
                elif "knowledge_graph" in item_types:
                    toret = next(
                        item for item in items if item.get("type") == "knowledge_graph"
                    ).get("description")
                elif "featured_snippet" in item_types:
                    toret = next(
                        item for item in items if item.get("type") == "featured_snippet"
                    ).get("description")
                elif "shopping" in item_types:
                    toret = next(
                        item for item in items if item.get("type") == "shopping"
                    ).get("price")
                elif "organic" in item_types:
                    toret = next(
                        item for item in items if item.get("type") == "organic"
                    ).get("description")
                if toret:
                    break
        return toret
