# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022-2024)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A script which is run when the Streamlit package is executed."""

from __future__ import annotations

import os
import sys
from typing import TYPE_CHECKING, Any, Callable, TypeVar

# We cannot lazy-load click here because its used via decorators.
import click

import streamlit.runtime.caching as caching
import streamlit.web.bootstrap as bootstrap
from streamlit import config as _config
from streamlit.runtime.credentials import Credentials, check_credentials
from streamlit.web.cache_storage_manager_config import (
    create_default_cache_storage_manager,
)

if TYPE_CHECKING:
    from streamlit.config_option import ConfigOption

ACCEPTED_FILE_EXTENSIONS = ("py", "py3")

LOG_LEVELS = ("error", "warning", "info", "debug")


def _convert_config_option_to_click_option(
    config_option: ConfigOption,
) -> dict[str, Any]:
    """Composes given config option options as options for click lib."""
    option = f"--{config_option.key}"
    param = config_option.key.replace(".", "_")
    description = config_option.description
    if config_option.deprecated:
        if description is None:
            description = ""
        description += (
            f"\n {config_option.deprecation_text} - {config_option.expiration_date}"
        )

    return {
        "param": param,
        "description": description,
        "type": config_option.type,
        "option": option,
        "envvar": config_option.env_var,
        "multiple": config_option.multiple,
    }


def _make_sensitive_option_callback(config_option: ConfigOption):
    def callback(_ctx: click.Context, _param: click.Parameter, cli_value) -> None:
        if cli_value is None:
            return None
        raise SystemExit(
            f"Setting {config_option.key!r} option using the CLI flag is not allowed. "
            f"Set this option in the configuration file or environment "
            f"variable: {config_option.env_var!r}"
        )

    return callback


F = TypeVar("F", bound=Callable[..., Any])


def configurator_options(func: F) -> F:
    """Decorator that adds config param keys to click dynamically."""
    for _, value in reversed(_config._config_options_template.items()):
        parsed_parameter = _convert_config_option_to_click_option(value)
        if value.sensitive:
            # Display a warning if the user tries to set sensitive
            # options using the CLI and exit with non-zero code.
            click_option_kwargs = {
                "expose_value": False,
                "hidden": True,
                "is_eager": True,
                "callback": _make_sensitive_option_callback(value),
            }
        else:
            click_option_kwargs = {
                "show_envvar": True,
                "envvar": parsed_parameter["envvar"],
            }
        config_option = click.option(
            parsed_parameter["option"],
            parsed_parameter["param"],
            help=parsed_parameter["description"],
            type=parsed_parameter["type"],
            multiple=parsed_parameter["multiple"],
            **click_option_kwargs,
        )
        func = config_option(func)
    return func


def _download_remote(main_script_path: str, url_path: str) -> None:
    """Fetch remote file at url_path to main_script_path"""
    import requests

    with open(main_script_path, "wb") as fp:
        try:
            resp = requests.get(url_path)
            resp.raise_for_status()
            fp.write(resp.content)
        except requests.exceptions.RequestException as e:
            raise click.BadParameter(f"Unable to fetch {url_path}.\n{e}")


@click.group(context_settings={"auto_envvar_prefix": "STREAMLIT"})
@click.option("--log_level", show_default=True, type=click.Choice(LOG_LEVELS))
@click.version_option(prog_name="Streamlit")
def main(log_level="info"):
    """Try out a demo with:

        $ streamlit hello

    Or use the line below to run your own script:

        $ streamlit run your_script.py
    """

    if log_level:
        from streamlit.logger import get_logger

        LOGGER = get_logger(__name__)
        LOGGER.warning(
            "Setting the log level using the --log_level flag is unsupported."
            "\nUse the --logger.level flag (after your streamlit command) instead."
        )


@main.command("help")
def help():
    """Print this help message."""
    # We use _get_command_line_as_string to run some error checks but don't do
    # anything with its return value.
    _get_command_line_as_string()

    assert len(sys.argv) == 2  # This is always true, but let's assert anyway.

    # Pretend user typed 'streamlit --help' instead of 'streamlit help'.
    sys.argv[1] = "--help"
    main(prog_name="streamlit")


@main.command("version")
def main_version():
    """Print Streamlit's version number."""
    # Pretend user typed 'streamlit --version' instead of 'streamlit version'
    import sys

    # We use _get_command_line_as_string to run some error checks but don't do
    # anything with its return value.
    _get_command_line_as_string()

    assert len(sys.argv) == 2  # This is always true, but let's assert anyway.
    sys.argv[1] = "--version"
    main()


@main.command("docs")
def main_docs():
    """Show help in browser."""
    click.echo("Showing help page in browser...")
    from streamlit import cli_util

    cli_util.open_browser("https://docs.streamlit.io")


@main.command("hello")
@configurator_options
def main_hello(**kwargs):
    """Runs the Hello World script."""
    from streamlit.hello import streamlit_app

    bootstrap.load_config_options(flag_options=kwargs)
    filename = streamlit_app.__file__
    _main_run(filename, flag_options=kwargs)


@main.command("run")
@configurator_options
@click.argument("target", required=True, envvar="STREAMLIT_RUN_TARGET")
@click.argument("args", nargs=-1)
def main_run(target: str, args=None, **kwargs):
    """Run a Python script, piping stderr to Streamlit.

    The script can be local or it can be an url. In the latter case, Streamlit
    will download the script to a temporary file and runs this file.

    """
    from streamlit import url_util

    bootstrap.load_config_options(flag_options=kwargs)

    _, extension = os.path.splitext(target)
    if extension[1:] not in ACCEPTED_FILE_EXTENSIONS:
        if extension[1:] == "":
            raise click.BadArgumentUsage(
                "Streamlit requires raw Python (.py) files, but the provided file has no extension.\nFor more information, please see https://docs.streamlit.io"
            )
        else:
            raise click.BadArgumentUsage(
                f"Streamlit requires raw Python (.py) files, not {extension}.\nFor more information, please see https://docs.streamlit.io"
            )

    if url_util.is_url(target):
        from streamlit.temporary_directory import TemporaryDirectory

        with TemporaryDirectory() as temp_dir:
            from urllib.parse import urlparse

            path = urlparse(target).path
            main_script_path = os.path.join(
                temp_dir, path.strip("/").rsplit("/", 1)[-1]
            )
            # if this is a GitHub/Gist blob url, convert to a raw URL first.
            target = url_util.process_gitblob_url(target)
            _download_remote(main_script_path, target)
            _main_run(main_script_path, args, flag_options=kwargs)
    else:
        if not os.path.exists(target):
            raise click.BadParameter(f"File does not exist: {target}")
        _main_run(target, args, flag_options=kwargs)


def _get_command_line_as_string() -> str | None:
    import subprocess

    parent = click.get_current_context().parent
    if parent is None:
        return None

    if "streamlit.cli" in parent.command_path:
        raise RuntimeError(
            "Running streamlit via `python -m streamlit.cli <command>` is"
            " unsupported. Please use `python -m streamlit <command>` instead."
        )

    cmd_line_as_list = [parent.command_path]
    cmd_line_as_list.extend(sys.argv[1:])
    return subprocess.list2cmdline(cmd_line_as_list)


def _main_run(
    file,
    args: list[str] | None = None,
    flag_options: dict[str, Any] | None = None,
) -> None:
    if args is None:
        args = []

    if flag_options is None:
        flag_options = {}

    is_hello = _get_command_line_as_string() == "streamlit hello"

    check_credentials()

    bootstrap.run(file, is_hello, args, flag_options)


# SUBCOMMAND: cache


@main.group("cache")
def cache():
    """Manage the Streamlit cache."""
    pass


@cache.command("clear")
def cache_clear():
    """Clear st.cache_data and st.cache_resource caches."""

    # in this `streamlit cache clear` cli command we cannot use the
    # `cache_storage_manager from runtime (since runtime is not initialized)
    # so we create a new cache_storage_manager instance that used in runtime,
    # and call clear_all() method for it.
    # This will not remove the in-memory cache.
    cache_storage_manager = create_default_cache_storage_manager()
    cache_storage_manager.clear_all()
    caching.cache_resource.clear()


# SUBCOMMAND: config


@main.group("config")
def config():
    """Manage Streamlit's config settings."""
    pass


@config.command("show")
@configurator_options
def config_show(**kwargs):
    """Show all of Streamlit's config settings."""

    bootstrap.load_config_options(flag_options=kwargs)

    _config.show_config()


# SUBCOMMAND: activate


@main.group("activate", invoke_without_command=True)
@click.pass_context
def activate(ctx):
    """Activate Streamlit by entering your email."""
    if not ctx.invoked_subcommand:
        Credentials.get_current().activate()


@activate.command("reset")
def activate_reset():
    """Reset Activation Credentials."""
    Credentials.get_current().reset()


# SUBCOMMAND: test


@main.group("test", hidden=True)
def test():
    """Internal-only commands used for testing.

    These commands are not included in the output of `streamlit help`.
    """
    pass


@test.command("prog_name")
def test_prog_name():
    """Assert that the program name is set to `streamlit test`.

    This is used by our cli-smoke-tests to verify that the program name is set
    to `streamlit ...` whether the streamlit binary is invoked directly or via
    `python -m streamlit ...`.
    """
    # We use _get_command_line_as_string to run some error checks but don't do
    # anything with its return value.
    _get_command_line_as_string()

    parent = click.get_current_context().parent

    assert parent is not None
    assert parent.command_path == "streamlit test"


if __name__ == "__main__":
    main()
