# mypy: allow-untyped-defs
# Copyright (c) Meta Platforms, Inc. and affiliates
import logging
import math
import threading
from functools import reduce
from itertools import chain
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING, Union

import torch
from torch.distributed import is_available
from torch.utils._typing_utils import not_none


__all__ = ["init_device_mesh", "DeviceMesh"]


if not is_available():
    import sys

    # We need to create the stubs when distributed is not available.
    # Otherwise, we would fail the doc tests (```./.ci/pytorch/docs-test.sh```),
    # since it would try to import ``torch.distributed.device_mesh`` or
    # ``torch.distributed.init_device_mesh`` but cannot find them.

    class _DeviceMeshStub:
        """Empty placeholder installed as ``DeviceMesh`` when distributed is unavailable."""

        pass

    def _init_device_mesh_stub():
        """No-op placeholder installed as ``init_device_mesh`` when distributed is unavailable."""
        pass

    # Attach both stubs to the ``torch.distributed.device_mesh`` module object so
    # the names listed in ``__all__`` can still be resolved.
    sys.modules["torch.distributed.device_mesh"].DeviceMesh = _DeviceMeshStub  # type: ignore[attr-defined]
    sys.modules[
        "torch.distributed.device_mesh"
    ].init_device_mesh = _init_device_mesh_stub  # type: ignore[attr-defined]


else:
    from torch.distributed.distributed_c10d import (
        _find_pg_by_ranks_and_tag,
        _get_default_group,
        _get_group_tag,
        get_backend,
        get_process_group_ranks,
        get_rank,
        get_world_size,
        init_process_group,
        is_initialized,
        new_group,
        ProcessGroup,
    )

    logger = logging.getLogger(__name__)

    # only import numpy typing when type checking
    if TYPE_CHECKING:
        try:
            from numpy.typing import ArrayLike
        except ImportError:
            logger.warning(
                "DeviceMesh requires numpy >= 1.21 to be installed for type checking"
            )

    class _MeshEnv(threading.local):
        def __init__(self) -> None:
            self.mesh_stack: List[DeviceMesh] = []
            self.child_to_root_mapping: Dict[DeviceMesh, DeviceMesh] = {}
            self.mesh_dim_group_options: Dict[
                int, Tuple[str, Optional[ProcessGroup.Options]]
            ] = {}
            self.root_to_flatten_mapping: Dict[DeviceMesh, Dict[str, DeviceMesh]] = {}
            # Record flatten mesh name to its mesh dim index in root mesh.
            self.flatten_name_to_root_dims: Dict[
                DeviceMesh, Dict[str, Tuple[int, ...]]
            ] = {}

        def get_current_mesh(self) -> "DeviceMesh":
            if len(self.mesh_stack) == 0:
                raise RuntimeError("No device mesh is currently active!")
            return self.mesh_stack[-1]

        def create_sub_mesh(
            self,
            device_mesh: "DeviceMesh",
            submesh_dim_names: Tuple[str, ...],
            submesh_dims: List[Tuple[int, ...]],
        ) -> "DeviceMesh":
            """
            Slice ``device_mesh`` and return the submesh that contains the current rank.

            Args:
                device_mesh: the mesh to slice.
                submesh_dim_names: names of the dimensions of the resulting submesh.
                submesh_dims: for each submesh dimension, the dim indices of
                    ``device_mesh`` it covers. A tuple with more than one index denotes
                    a flattened dimension, e.g. ``[(0, 1), (2,)]``; its process group
                    info is looked up in ``root_to_flatten_mapping``.

            Returns:
                The submesh containing the current rank. No new process groups are
                created; the submesh reuses existing ``_dim_group_infos`` entries and
                is recorded in ``child_to_root_mapping`` with ``device_mesh`` as root.
            """
            # Get the submesh dim size from the submesh_dims.
            # For example, if we have a 3D mesh with mesh_shape (2, 2, 2) mesh_dim_names ("dp", "cp", "tp") and we want
            # to slice out mesh["dp_cp"], then submesh_dims = [(0, 1), (2,)] and submesh_dim_size = [2 * 2, 2] = [4, 2].
            # If we want to slice out mesh["dp", "cp"], then submesh_dims = [(0,), (1,)] and submesh_dim_size = [2, 2].
            # NOTE: multiply the *sizes* of every covered dim. Folding with
            # `size(x) * size(y)` would feed a running product back in as a dim index
            # and break as soon as a flattened dim covers three or more root dims.
            slice_dim_size = [
                math.prod(device_mesh.mesh.size(dim) for dim in mesh_dim)
                for mesh_dim in submesh_dims
            ]

            mesh_tensor = device_mesh.mesh
            # slice_dim_idx could be different from submesh_dims, as we may need to flatten out some dims.
            slice_dim_idx = []
            slice_dim_group_info = []
            # keep track of the number of dims that have been flattened so we can get the correct slice_dim_idx in the
            # flattened mesh tensor.
            num_dims_flatten = 0
            for mesh_dim_indices, mesh_dim_name in zip(submesh_dims, submesh_dim_names):
                # Currently, this only allows slicing out a contiguous flattened dim.
                # TODO: we need to handle reconstructing a non-contiguous flattened dim.
                if len(mesh_dim_indices) > 1:
                    # We need to move the start_dim and end_dim to the left if some dims are already flattened.
                    mesh_tensor = mesh_tensor.flatten(
                        start_dim=mesh_dim_indices[0] - num_dims_flatten,
                        end_dim=mesh_dim_indices[-1] - num_dims_flatten,
                    )
                    # If some dims are already flattened, we need to adjust the slice_dim_idx accordingly.
                    # For example, if the submesh_dims = [(0, 1), (2,), (3, 4)] with 0-1 flattened and 3-4 flattened,
                    # then the final slice_dim_idx should be [0, 1, 2].
                    slice_dim_idx.append(mesh_dim_indices[0] - num_dims_flatten)
                    num_dims_flatten += len(mesh_dim_indices) - 1
                    # A flattened dim owns exactly one group: the one created for the flattened mesh.
                    slice_dim_group_info.append(
                        self.root_to_flatten_mapping[device_mesh][
                            mesh_dim_name
                        ]._dim_group_infos[0]
                    )
                else:
                    slice_dim_idx.append(mesh_dim_indices[0] - num_dims_flatten)
                    slice_dim_group_info.append(
                        device_mesh._dim_group_infos[mesh_dim_indices[0]]
                    )

            # mesh_tensor has already been flattened if needed. So mesh_tensor.ndim <= device_mesh.mesh.ndim now.
            mesh_dims_remained_idx = list(range(mesh_tensor.ndim))
            for idx in slice_dim_idx:
                mesh_dims_remained_idx.remove(idx)

            # pg_ranks_by_dim is the size of [number of local ranks of the outermost submesh dimension, *slice_dim_idx]
            # This means on each local rank of the outermost slice mesh dim, we have a tensor of submesh size with
            # the pg ranks of the submesh. From this, we can extract the submesh mesh tensor contains the current rank.
            pg_ranks_by_dim = mesh_tensor.permute(
                *mesh_dims_remained_idx, *slice_dim_idx
            ).reshape(-1, *slice_dim_size)

            cur_rank = device_mesh.get_rank()
            for mesh_nd in pg_ranks_by_dim:
                submesh = DeviceMesh(
                    device_mesh.device_type,
                    mesh_nd,
                    mesh_dim_names=submesh_dim_names,
                    _init_backend=False,
                )
                # Keep only the candidate submesh that the current rank belongs to.
                if cur_rank in mesh_nd:
                    res_submesh = submesh

            res_submesh._dim_group_infos = slice_dim_group_info  # type: ignore[possibly-undefined]
            self.child_to_root_mapping[res_submesh] = device_mesh

            return res_submesh

        def create_flatten_mesh(
            self, device_mesh: "DeviceMesh", mesh_dim_name: Optional[str] = None
        ) -> "DeviceMesh":
            root_mesh = _mesh_resources.get_root_mesh(device_mesh)

            flatten_dims_in_root = [
                not_none(root_mesh.mesh_dim_names).index(flattened_mesh_dim_name)
                for flattened_mesh_dim_name in not_none(device_mesh.mesh_dim_names)
            ]

            if not mesh_dim_name:
                mesh_dim_name = "_".join(
                    [
                        not_none(root_mesh.mesh_dim_names)[dim]
                        for dim in flatten_dims_in_root
                    ]
                )

            # Check whether the mesh_dim_name for flattened mesh is valid.
            self.flatten_name_to_root_dims.setdefault(root_mesh, {})
            invalid_dim_names = chain(
                *list(not_none(root_mesh.mesh_dim_names)),
                *self.flatten_name_to_root_dims[root_mesh].keys(),
            )
            if mesh_dim_name in invalid_dim_names:
                raise RuntimeError(
                    f"{mesh_dim_name} already exists for submesh of the {root_mesh}. ",
                    f"The mesh_dim_names of submesh and flattened mesh are {invalid_dim_names}. "
                    f"Please specify another valid mesh_dim_name.",
                )

            # Quick return if the flatten mesh has been created before.
            # TODO: If we decide to restrict flatten initialization once, we should remove
            # this check and throw an error if the flatten mesh is already created before.
            if (
                root_mesh in self.root_to_flatten_mapping
                and mesh_dim_name in self.root_to_flatten_mapping[root_mesh]
            ):
                return self.root_to_flatten_mapping[root_mesh][mesh_dim_name]

            flattened_mesh_dim_size = math.prod(device_mesh.mesh.size())

            remained_dims_in_root = list(range(root_mesh.mesh.ndim))
            for flatten_dim_in_root in flatten_dims_in_root:
                remained_dims_in_root.remove(flatten_dim_in_root)

            pg_ranks_by_dim = root_mesh.mesh.permute(
                *remained_dims_in_root, *flatten_dims_in_root
            ).reshape(-1, flattened_mesh_dim_size)

            cur_rank = root_mesh.get_rank()
            for mesh_nd in pg_ranks_by_dim:
                # need to init backend here since the flattened pg doesn't exist in root mesh.
                flattened_mesh = DeviceMesh(
                    root_mesh.device_type,
                    mesh_nd,
                    mesh_dim_names=(mesh_dim_name,),
                )
                if cur_rank in mesh_nd:
                    res_flattened_mesh = flattened_mesh
            self.child_to_root_mapping[res_flattened_mesh] = root_mesh  # type: ignore[possibly-undefined]
            self.root_to_flatten_mapping.setdefault(root_mesh, {})[mesh_dim_name] = res_flattened_mesh  # type: ignore[possibly-undefined]
            self.flatten_name_to_root_dims[root_mesh][mesh_dim_name] = tuple(flatten_dims_in_root)  # type: ignore[possibly-undefined]

            return res_flattened_mesh

        def get_root_mesh(self, device_mesh: "DeviceMesh") -> "DeviceMesh":
            # If a mesh could not be found in the child_to_root_mapping, it is a root mesh itself.
            # A root mesh is not created through slicing.
            # We considers the root mesh of a root mesh is itself.
            root_mesh = self.child_to_root_mapping.get(device_mesh, None)
            return device_mesh if not root_mesh else root_mesh

        def get_root_mesh_dim(self, device_mesh: "DeviceMesh") -> Optional[int]:
            """
            Returns the index of the mesh dim in the root mesh.
            The device_mesh passed in needs to be sliced out from the root mesh
            or submesh of the root mesh.
            """
            root_mesh = self.get_root_mesh(device_mesh)
            child_mesh_dim_names = device_mesh.mesh_dim_names
            if root_mesh and child_mesh_dim_names:
                assert (
                    len(child_mesh_dim_names) == 1
                ), "The submesh can only be a 1D mesh."
                child_mesh_dim_name = child_mesh_dim_names[0]
                return self.get_mesh_dim_by_name(root_mesh, child_mesh_dim_name)
            return None

        @staticmethod
        def num_devices_per_host(device_type: str) -> int:
            """Return ``device_count()`` of the device module for ``device_type``."""
            device_handle = _get_device_handle(device_type)
            return device_handle.device_count()

        @staticmethod
        def num_hosts(device_type: str) -> int:
            """
            Infer the number of hosts as world size divided by devices per host.

            ProcessGroup can't tell us this info, so it is inferred under the
            assumption of homogeneous hardware for now.
            """
            world_size = get_world_size()
            devices_per_host = _MeshEnv.num_devices_per_host(device_type)
            return world_size // devices_per_host

        def get_mesh_dim_by_name(
            self, device_mesh: "DeviceMesh", mesh_dim_name: str
        ) -> int:
            if (
                device_mesh.mesh_dim_names is None
                or len(device_mesh.mesh_dim_names) == 0
            ):
                raise KeyError(
                    "No `mesh_dim_names` found.",
                )
            if mesh_dim_name not in device_mesh.mesh_dim_names:
                raise KeyError(
                    f"Mesh dimension '{mesh_dim_name}' does not exist.",
                    f"Available mesh dimensions are: mesh_dim_names={device_mesh.mesh_dim_names}",
                )
            return not_none(device_mesh.mesh_dim_names.index(mesh_dim_name))

        def _set_mesh_dim_group_options(
            self,
            dim: int,
            backend: str,
            pg_options: Optional[ProcessGroup.Options] = None,
        ) -> None:
            self.mesh_dim_group_options[dim] = (backend, pg_options)

        def _get_slice_mesh_dims(
            self, device_mesh, mesh_dim_names
        ) -> List[Tuple[int, ...]]:
            """
            Validate whether the mesh_dim_names is valid for slicing the given device_mesh.
            If valid, return dim indexes of the slice mesh in the device mesh.
            """
            if device_mesh != self.get_root_mesh(device_mesh):
                raise RuntimeError("Cannot create a submesh from a submesh.")

            # The slice mesh_dim_names should consist either the device_mesh's mesh_dim_names
            # or its flattened mesh's mesh_dim_names.
            self.flatten_name_to_root_dims.setdefault(device_mesh, {})
            flatten_name_to_root_dims = self.flatten_name_to_root_dims[device_mesh]
            valid_mesh_dim_names = [
                *device_mesh.mesh_dim_names,
                *flatten_name_to_root_dims,
            ]

            if not all(
                mesh_dim_name in valid_mesh_dim_names
                for mesh_dim_name in mesh_dim_names
            ):
                raise KeyError(
                    f"Invalid mesh_dim_names {mesh_dim_names} specified. "
                    f"Valid mesh_dim_names are {valid_mesh_dim_names}."
                )

            # Validate the order of the slice mesh dim indices.
            # This needs to be in ascending order.
            curr_idx = -1
            slice_mesh_dims = []
            for mesh_dim_name in mesh_dim_names:
                if mesh_dim_name in flatten_name_to_root_dims:
                    mesh_indices = flatten_name_to_root_dims[mesh_dim_name]
                    # TODO: this doesn't allow non-contiguous slicing with flatten dim yet. next_idx
                    # should be mesh_indices[0] once we support non-contiguous slicing with flatten dim.
                    next_idx = mesh_indices[-1]
                    slice_mesh_dims.append(mesh_indices)
                else:
                    next_idx = device_mesh.mesh_dim_names.index(mesh_dim_name)
                    slice_mesh_dims.append((next_idx,))
                if next_idx <= curr_idx:
                    raise KeyError(
                        f"Invalid mesh_dim_names {mesh_dim_names} specified. ",
                        f"Found mesh dim indices to slice: {slice_mesh_dims}. ",
                        "Mesh dim indices should be in ascending order.",
                    )
                curr_idx = next_idx

            return slice_mesh_dims

        def _get_all_submeshes(
            self, device_mesh: "DeviceMesh", mesh_dim_name: str
        ) -> List["DeviceMesh"]:
            """
            Return every 1D submesh of ``device_mesh`` along ``mesh_dim_name``.

            Only the submesh that contains the current rank carries the parent's
            process group info for that dim; the others get an empty list.
            """
            mesh_dim = self.get_mesh_dim_by_name(device_mesh, mesh_dim_name)
            # Move the requested dim last, then collapse the rest so each row is
            # the rank list of one submesh.
            swapped = device_mesh.mesh.swapdims(-1, mesh_dim)
            pg_ranks_by_dim = swapped.reshape(-1, device_mesh.mesh.size(mesh_dim))

            cur_rank = device_mesh.get_rank()
            all_submeshes = []
            for ranks_1d in pg_ranks_by_dim:
                one_submesh = DeviceMesh(
                    device_mesh.device_type,
                    ranks_1d,
                    mesh_dim_names=(mesh_dim_name,),
                    _init_backend=False,
                )
                if cur_rank in ranks_1d:
                    one_submesh._dim_group_infos = [
                        device_mesh._dim_group_infos[mesh_dim]
                    ]
                else:
                    one_submesh._dim_group_infos = []
                all_submeshes.append(one_submesh)

            return all_submeshes

    _mesh_resources: _MeshEnv = _MeshEnv()

    def _get_device_handle(device_type: str = "cuda"):
        """
        Look up the ``torch`` attribute named ``device_type``.

        For example, ``device_type="cuda"`` yields the module ``torch.cuda``.
        ``None`` is returned when ``torch`` has no attribute with that name.
        """
        device_module = getattr(torch, device_type, None)
        return device_module

    class DeviceMesh:
        """
        DeviceMesh represents a mesh of devices, where the layout of devices can be
        represented as an n-dimensional array, and each value of the n-dimensional
        array is the global ID (rank) of a device in the default process group.

        DeviceMesh could be used to describe the layout of devices across the cluster,
        and serves as a proxy for communication among the device lists within the cluster.

        DeviceMesh can be used as a context manager.

        .. note::
            DeviceMesh follows SPMD programming model, which means the same PyTorch Python program
            is running on all processes/ranks in the cluster. Therefore, users need to make sure the
            `mesh` array (which describes the layout of devices) should be identical across all ranks.
            Inconsistent `mesh` will lead to silent hang.

        Args:
            device_type (str): The device type of the mesh. Currently supports: "cpu", "cuda/cuda-like".
            mesh (ndarray): A multi-dimensional array or an integer tensor describing the layout
                of devices, where the IDs are global IDs of the default process group.

        Returns:
            DeviceMesh: A :class:`DeviceMesh` object representing the device layout.

        The following program runs on each process/rank in an SPMD manner. In this example, we have 2
        hosts with 4 GPUs each.
        A reduction over the first dimension of mesh will reduce across
        columns (0, 4), .. and (3, 7), a reduction over the second dimension
        of mesh reduces across rows (0, 1, 2, 3) and (4, 5, 6, 7).

        Example::
            >>> # xdoctest: +SKIP("no rank")
            >>> from torch.distributed.device_mesh import DeviceMesh
            >>>
            >>> # Initialize device mesh as (2, 4) to represent the topology
            >>> # of cross-host(dim 0), and within-host (dim 1).
            >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
        """

        device_type: str
        mesh: torch.Tensor
        mesh_dim_names: Optional[Tuple[str, ...]]

        def __init__(
            self,
            device_type: str,
            mesh: Union[torch.Tensor, "ArrayLike"],
            *,
            mesh_dim_names: Optional[Tuple[str, ...]] = None,
            _init_backend: bool = True,
        ) -> None:
            self.device_type = device_type
            if isinstance(mesh, torch.Tensor) and mesh.device.type != "cpu":
                raise ValueError(f"`mesh` must be a CPU tensor, got {mesh}")
            self.mesh = (
                mesh.detach().to(dtype=torch.int)
                if isinstance(mesh, torch.Tensor)
                else torch.tensor(mesh, device="cpu", dtype=torch.int)
            )
            self.mesh_dim_names = tuple(mesh_dim_names) if mesh_dim_names else None

            # private field to pre-generate DeviceMesh's hash
            self._flatten_mesh_list = tuple(self.mesh.flatten().tolist())
            self._thread_id = None

            # Skip process group initialization if xla device or init backend is False
            # TODO(yeounoh) implement DeviceMesh backend and register XLA backend.
            if device_type != "xla":
                # always try to create default (world) pg, even if it is not initialized
                # already. The world pg is used for device mesh identity (rank) on each
                # process (we need to know if the current global rank is in the mesh or not).
                if _init_backend:
                    self._get_or_create_default_group()
                    self._init_process_groups()

                if is_initialized() and get_backend() == "threaded":
                    self._thread_id = threading.get_ident()

                # calculate the coordinates of the current global rank on the mesh
                rank_coords = (self.mesh == get_rank()).nonzero()
                assert rank_coords.size(0) in (0, 1)
                self._coordinate_on_dim: Optional[List[int]] = (
                    rank_coords[0].tolist() if rank_coords.size(0) > 0 else None
                )

        def _get_or_create_default_group(self):
            """
            Return the default process group, initializing it first if necessary.

            When this call is the one that initializes the default group and a device
            module exists for ``self.device_type``, the current device is also set to
            ``rank % device_count``.

            Raises:
                RuntimeError: if the mesh has more elements than the world size, or if
                    the world size is larger than, and not a multiple of, the number of
                    devices per host.
            """
            already_initialized = is_initialized()
            if not already_initialized:
                init_process_group()

            world_size = get_world_size()
            if self.mesh.numel() > world_size:
                raise RuntimeError(
                    f"Mesh should not be bigger than default world size {world_size}, but found {self.mesh.numel()} ranks!"
                )

            device_handle = _get_device_handle(self.device_type)
            # TODO: if user want to pass pg_options, offer a way to do it
            if not already_initialized and device_handle:
                # automatically set the current cuda/cuda-like device base on num of gpu devices available in each host
                # NOTE: This device selection would only work for homogeneous hardware.
                num_devices_per_host = device_handle.device_count()
                spans_hosts_unevenly = (
                    world_size > num_devices_per_host
                    and world_size % num_devices_per_host != 0
                )
                if spans_hosts_unevenly:
                    raise RuntimeError(
                        f"DeviceMesh only support homogeneous hardware, but found "
                        f"{world_size} ranks and {num_devices_per_host} {self.device_type} devices!"
                    )
                local_device_index = get_rank() % num_devices_per_host
                device_handle.set_device(local_device_index)

            return _get_default_group()

        def _init_process_groups(self):
            """
            Create (or reuse) one process group per mesh dimension and record, for the
            groups containing the current rank, a ``(tag, ranks, group_name)`` tuple in
            ``self._dim_group_infos``.

            Raises:
                RuntimeError: if the current rank appears in more than one subgroup of
                    the same mesh dimension.
            """
            # tag/ranks/group_name associated with each mesh dimension, each
            # mesh dimension should have one sub-group per rank
            #
            # TODO(yifu): remove tag and ranks once we fully migrate to native
            # functional collectives. See details in:
            # https://github.com/pytorch/pytorch/issues/93173#issuecomment-1907095208
            dim_group_infos: List[Tuple[str, List[int], str]] = []

            if self.mesh.ndim == 1 and self.mesh.numel() == get_world_size():
                # Append the default pg to the first dim groups only if the default pg is compatible with `self.device_type`.
                # Otherwise, create new pg.
                default_group = _get_default_group()
                ranks = list(range(get_world_size()))
                # A gloo default group on a CUDA-capable host is replaced by a new
                # group with a gloo (cpu) + nccl (cuda) backend.
                dim_group = (
                    new_group(backend="cpu:gloo,cuda:nccl", ranks=ranks)
                    if torch.cuda.is_available()
                    and get_backend(default_group) == "gloo"
                    else default_group
                )
                dim_group_infos.append(
                    (
                        _get_group_tag(dim_group),
                        ranks,
                        dim_group.group_name,
                    )
                )
            else:
                # create sub pgs base on the mesh argument specified
                for dim in range(self.mesh.ndim):
                    # swap the current dim to the last dim
                    # then reshape to flatten out other dims
                    pg_ranks_by_dim = self.mesh.swapdims(-1, dim).reshape(
                        -1, self.mesh.size(dim)
                    )
                    # multi-dim mesh, create subgroups by looping over the pg_ranks
                    # for each dim and append the groups
                    for dim_mesh in pg_ranks_by_dim:
                        subgroup_ranks = dim_mesh.tolist()

                        # Respect dim group options specified via _MeshEnv._set_mesh_dim_group_options().
                        # Inherit from the parent group if no options are specified for the group.
                        if dim in _mesh_resources.mesh_dim_group_options:
                            (
                                backend,
                                pg_options,
                            ) = _mesh_resources.mesh_dim_group_options[dim]
                        else:
                            backend, pg_options = None, None

                        # We temporarily revert the re-use subgroup, since it breaks two internal tests.
                        # Temporarily reverting to resolve test timeout while root-causing.
                        # TODO: Add two tests to cover internal tests scenarios and re-enable reuse subgroup if exists.
                        # NOTE: new_group is called for every subgroup, including
                        # those that do not contain the current rank.
                        dim_group = new_group(
                            ranks=subgroup_ranks,
                            backend=backend,
                            pg_options=pg_options,
                        )

                        # only add to dim_groups if the current rank in the subgroup
                        if self.get_rank() in subgroup_ranks:
                            # Dims are processed in order, so more than `dim` recorded
                            # entries means this dim already got a group for this rank.
                            if len(dim_group_infos) > dim:
                                raise RuntimeError(
                                    f"Each device mesh dimension should get only one process group, but got {self.get_rank()} "
                                    f"in {subgroup_ranks}!"
                                )
                            dim_group_infos.append(
                                (
                                    _get_group_tag(not_none(dim_group)),
                                    subgroup_ranks,
                                    dim_group.group_name,
                                )
                            )
            self._dim_group_infos = dim_group_infos

        def __enter__(self) -> "DeviceMesh":
            """Push this mesh onto the mesh env's stack, making it the current mesh."""
            active_meshes = _mesh_resources.mesh_stack
            active_meshes.append(self)
            return self

        # pyre-fixme[2]: Parameter must be annotated.
        def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
            """Pop the most recently entered mesh from the mesh env's stack."""
            active_meshes = _mesh_resources.mesh_stack
            active_meshes.pop()

        def __repr__(self) -> str:
            device_mesh_repr = (
                f"DeviceMesh('{self.device_type}', {self.mesh.tolist()})"
                if not self.mesh_dim_names
                else f"DeviceMesh('{self.device_type}', {self.mesh.tolist()}, mesh_dim_names={self.mesh_dim_names})"
            )
            return device_mesh_repr

        def __hash__(self):
            # lazily compute hash
            self._hash = getattr(self, "_hash", None)
            if not self._hash:
                self._hash = hash(
                    (
                        self._flatten_mesh_list,
                        self.mesh.shape,
                        self.device_type,
                        self.mesh_dim_names,
                        self._thread_id,
                    )
                )
            return self._hash

        def __eq__(self, other: object) -> bool:
            if not isinstance(other, DeviceMesh):
                return False
            if id(self) == id(other):
                return True
            else:
                return (
                    self._flatten_mesh_list == other._flatten_mesh_list
                    and self.mesh.shape == other.mesh.shape
                    and self.device_type == other.device_type
                    and self.mesh_dim_names == other.mesh_dim_names
                    and self._thread_id == other._thread_id
                )

        def __getitem__(
            self, mesh_dim_names: Union[str, Tuple[str, ...]]
        ) -> "DeviceMesh":
            """
            Slice the current DeviceMesh based on the mesh_dim_names given to create a submesh.
            The submesh created consists of the dimensions and the communicators indicated by
            ``mesh_dim_names``

            Args:
                mesh_dim_names (Union[str, Tuple[str]]): the name or the tuple of names of the
                mesh dimension of the DeviceMesh to create the submesh for.
            Returns:
                A :class:`DeviceMesh` object

            The following program runs on each process/rank in an SPMD manner in a world size of 8.
            In the first example:
                Calling mesh_2d["tp"] on rank 0, 1, 2, 3 returns a 1D submesh of DeviceMesh:([0, 1, 2, 3]).
                Calling mesh_2d["tp"] on rank 4, 5, 6, 7 returns a 1D submesh of  DeviceMesh:([4, 5, 6, 7]).
                Calling mesh_2d["dp"] on rank 0, 4 returns a 1D submesh of  DeviceMesh:([0, 4]).
                Calling mesh_2d["dp"] on rank 1, 5 returns a 1D submesh of  DeviceMesh:([1, 5]).
                Calling mesh_2d["dp"] on rank 2, 6 returns a 1D submesh of  DeviceMesh:([2, 6]).
                Calling mesh_2d["dp"] on rank 3, 7 returns a 1D submesh of  DeviceMesh:([3, 7]).

            In the second example:
                Calling mesh_3d["dp", "cp"] on rank 0, 1, 4, 5 returns a 2D submesh of DeviceMesh:([[0, 1], [4, 5]]).
                Calling mesh_3d["dp", "cp"] on rank 2, 3, 6, 7 returns a 2D submesh of DeviceMesh:([[2, 3], [6, 7]]).
                Calling mesh_3d["cp", "dp"] on rank 0, 1, 4, 5 returns a 2D submesh of DeviceMesh:([[0, 4], [1, 5]]).
                Calling mesh_3d["cp", "dp"] on rank 2, 3, 6, 7 returns a 2D submesh of DeviceMesh:([[2, 6], [3, 7]]).

            Example::
                >>> # xdoctest: +SKIP("no rank")
                >>> from torch.distributed.device_mesh import DeviceMesh
                >>>
                >>> # Initialize a 2D device mesh as (2, 4) to represent the topology
                >>> # of cross-host(dim 0), and within-host (dim 1).
                >>> mesh_2d = init_device_mesh(device_type="cuda", (2,4), mesh_dim_names=("dp", "tp"))
                >>> tp_mesh = mesh_2d["tp"]
                >>> dp_mesh = mesh_2d["dp"]
                >>>
                >>> # Initialize a 3D mesh.
                >>> mesh_3d = init_device_mesh(device_type="cuda", (2,2,2), mesh_dim_names=("dp", "pp", "cp"))
                >>> # The order of the mesh_dim_names provided deteremines the order of dimensions in the submesh.
                >>> dp_cp_mesh = mesh_3d["dp", "cp"]
                >>> cp_dp_mesh = mesh_3d["cp", "dp"]
            """
            if not self.mesh_dim_names:
                raise RuntimeError("Cannot slice a DeviceMesh without mesh_dim_names!")

            mesh_dim_names = (
                (mesh_dim_names,) if isinstance(mesh_dim_names, str) else mesh_dim_names
            )

            if mesh_dim_names == self.mesh_dim_names:
                return self
            else:
                slice_mesh_dims = _mesh_resources._get_slice_mesh_dims(
                    self, mesh_dim_names
                )
                submesh = _mesh_resources.create_sub_mesh(
                    self, mesh_dim_names, slice_mesh_dims
                )
                return submesh

        def get_group(self, mesh_dim: Optional[Union[int, str]] = None) -> ProcessGroup:
            """
            Returns the single ProcessGroup specified by mesh_dim, or, if mesh_dim is not specified and the
            DeviceMesh is 1-dimensional, returns the only ProcessGroup in the mesh.

            Args:
                mesh_dim (str/int, optional): it can be the name of the mesh dimension or the index
                of the mesh dimension. Default is None.

            Returns:
                A :class:`ProcessGroup` object.
            """
            if not hasattr(self, "_dim_group_infos"):
                raise RuntimeError("DeviceMesh process groups not initialized!")

            if self.mesh.ndim > 1 and mesh_dim is None:
                raise RuntimeError(
                    f"Found the DeviceMesh have {self.mesh.ndim} dimensions",
                    "Optional kwarg `mesh_dim` needs to be specified when device_mesh.ndim > 1.",
                    "If you want to get the list of all the ProcessGroups in the DeviceMesh,"
                    "please use `get_all_groups()` instead.",
                )

            # Quick return if the current device_mesh is a 1D mesh.
            if self.mesh.ndim == 1 and mesh_dim is None:
                return not_none(
                    _find_pg_by_ranks_and_tag(*self._dim_group_infos[0][:2])  # type: ignore[index]
                )

            root_mesh = _mesh_resources.get_root_mesh(self)
            root_to_flatten_mapping = _mesh_resources.root_to_flatten_mapping.get(
                root_mesh, None
            )
            if root_to_flatten_mapping and mesh_dim in root_to_flatten_mapping.keys():
                dim_group_infos = root_to_flatten_mapping[mesh_dim]._dim_group_infos[0][:2]  # type: ignore[index]
                return not_none(_find_pg_by_ranks_and_tag(*dim_group_infos))
            else:
                mesh_dim = (
                    _mesh_resources.get_mesh_dim_by_name(self, mesh_dim)
                    if isinstance(mesh_dim, str)
                    else mesh_dim
                )
                return not_none(
                    _find_pg_by_ranks_and_tag(*self._dim_group_infos[mesh_dim][:2])  # type: ignore[index]
                )

        def get_all_groups(self) -> List[ProcessGroup]:
            """
            Collects the ProcessGroup of every mesh dimension, in dimension order.

            Returns:
                A list of :class:`ProcessGroup` objects in which entry ``i`` is the
                result of ``self.get_group(i)``.
            """
            groups: List[ProcessGroup] = []
            for dim_idx in range(self.mesh.ndim):
                groups.append(self.get_group(dim_idx))
            return groups

        @staticmethod
        def from_group(
            group: Union[ProcessGroup, List[ProcessGroup]],
            device_type: str,
            mesh: Optional[Union[torch.Tensor, "ArrayLike"]] = None,
            *,
            mesh_dim_names: Optional[Tuple[str, ...]] = None,
        ) -> "DeviceMesh":
            """
            Constructs a :class:`DeviceMesh` with ``device_type`` from an
            existing :class:`ProcessGroup`.

            The constructed device mesh has number of dimensions equal to the
            number of groups passed. If a list of groups is passed, then the
            ``mesh`` argument is required.

            Args:
                group (ProcessGroup or List[ProcessGroup]): the existing group(s) to wrap.
                device_type (str): device type forwarded to the :class:`DeviceMesh` constructor.
                mesh (torch.Tensor or ArrayLike, optional): the rank layout. For a single group it is
                    only validated against the group's ranks; for a list of groups it defines the mesh.
                mesh_dim_names (Tuple[str], optional): names forwarded to the :class:`DeviceMesh` constructor.

            Returns:
                A :class:`DeviceMesh` whose ``_dim_group_infos`` describe the given group(s).

            Raises:
                ValueError: if ``mesh`` does not match the ranks of a single group, if the list of
                    groups is empty, if ``mesh`` is missing for a list of groups, or if ``mesh.ndim``
                    differs from the number of groups.
            """
            if isinstance(group, ProcessGroup):
                group_ranks = get_process_group_ranks(group)
                if mesh is not None:
                    # Compare plain Python lists. A Tensor is converted with tolist() and is
                    # never fed to `!=` against a list: that comparison is not a reliable
                    # equality test, so a tensor mesh matching the group's ranks must not be
                    # rejected because of it.
                    mesh_ranks = mesh.tolist() if isinstance(mesh, torch.Tensor) else mesh
                    if mesh_ranks != group_ranks:
                        raise ValueError(
                            f"Invalid mesh {str(mesh)} for ProcessGroup with ranks {group_ranks}"
                        )
                # The mesh of a single group is always rebuilt from the group's own ranks.
                mesh = torch.tensor(group_ranks, device="cpu", dtype=torch.int)
                device_mesh = DeviceMesh(
                    device_type,
                    mesh,
                    mesh_dim_names=mesh_dim_names,
                    _init_backend=False,
                )
                device_mesh._dim_group_infos = [
                    (_get_group_tag(group), group_ranks, group.group_name)
                ]
                return device_mesh
            groups = list(group)
            if len(groups) == 0:
                raise ValueError("Expects at least one ProcessGroup to be passed")
            if mesh is None:
                raise ValueError("Must pass mesh if passing multiple ProcessGroups")
            # Normalize the mesh to an int tensor on CPU, whatever form it was given in.
            mesh = (
                mesh.detach().to(dtype=torch.int, device="cpu")
                if isinstance(mesh, torch.Tensor)
                else torch.tensor(mesh, device="cpu", dtype=torch.int)
            )
            if mesh.ndim != len(groups):
                raise ValueError(
                    "Expects mesh with ndim equal to number of ProcessGroups but got "
                    f"mesh {mesh.tolist()} and {len(groups)} ProcessGroups"
                )
            device_mesh = DeviceMesh(
                device_type, mesh, mesh_dim_names=mesh_dim_names, _init_backend=False
            )
            # One (tag, ranks, group name) entry per mesh dimension, in the order given.
            device_mesh._dim_group_infos = [
                (
                    _get_group_tag(pg),
                    get_process_group_ranks(pg),
                    pg.group_name,
                )
                for pg in groups
            ]
            return device_mesh

        def size(self, mesh_dim: Optional[int] = None) -> int:
            """
            Returns the element count of the mesh, or the extent of one mesh dimension.

            Args:
                mesh_dim (int, optional): index of the mesh dimension to measure. Default is None.

            Returns:
                ``self.mesh.numel()`` when ``mesh_dim`` is None, otherwise ``self.mesh.size(mesh_dim)``.
            """
            if mesh_dim is not None:
                return self.mesh.size(mesh_dim)
            return self.mesh.numel()

        @property
        def ndim(self) -> int:
            """Number of dimensions of ``self.mesh``."""
            num_dims: int = self.mesh.ndim
            return num_dims

        @property
        def shape(self) -> Tuple[int, ...]:
            """Extent of ``self.mesh`` along each dimension, as a plain tuple."""
            mesh_extents = self.mesh.shape
            return tuple(mesh_extents)

        def get_rank(self) -> int:
            """
            Returns the current global rank.

            Delegates to the ``get_rank`` function imported at module level, called
            without a group argument.
            """
            global_rank = get_rank()
            return global_rank

        def get_local_rank(self, mesh_dim: Optional[Union[int, str]] = None) -> int:
            """
            Returns the local rank of the given mesh_dim of the DeviceMesh.

            Args:
                mesh_dim (str/int, optional): it can be the name of the mesh dimension or the index
                of the mesh dimension. Default is None.

            Returns:
                An integer denotes the local rank.

            The following program runs on each process/rank in an SPMD manner. In this example, we have 2
            hosts with 4 GPUs each.
            Calling mesh_2d.get_local_rank(mesh_dim=0) on rank 0, 1, 2, 3 would return 0.
            Calling mesh_2d.get_local_rank(mesh_dim=0) on rank 4, 5, 6, 7 would return 1.
            Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 0, 4 would return 0.
            Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 1, 5 would return 1.
            Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 2, 6 would return 2.
            Calling mesh_2d.get_local_rank(mesh_dim=1) on rank 3, 7 would return 3.

            Example::
                >>> # xdoctest: +SKIP("no rank")
                >>> from torch.distributed.device_mesh import DeviceMesh
                >>>
                >>> # Initialize device mesh as (2, 4) to represent the topology
                >>> # of cross-host(dim 0), and within-host (dim 1).
                >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
            """
            if self.ndim > 1 and mesh_dim is None:
                raise RuntimeError(
                    f"Found the DeviceMesh have {self.mesh.ndim} dimensions",
                    "Optional kwarg `mesh_dim` needs to be specified when device_mesh.ndim > 1.",
                )
            elif mesh_dim is None:
                mesh_dim = 0

            mesh_dim_group = not_none(self.get_group(mesh_dim))
            assert isinstance(
                mesh_dim_group, ProcessGroup
            ), "We expect ProcessGroup before calling `get_rank`!"
            return not_none(get_rank(mesh_dim_group))

        def get_coordinate(self) -> Optional[List[int]]:
            """
            Return the indices of this rank along all dimensions of the mesh.

            Returns:
                ``self._coordinate_on_dim`` when it is non-empty; None otherwise
                (per the original contract, None means this rank is not part of the mesh).
            """
            coordinate = self._coordinate_on_dim
            if not coordinate:
                return None
            return coordinate

        def _flatten(self, mesh_dim_name: Optional[str] = None) -> "DeviceMesh":
            """
            Returns a 1D DeviceMesh by flattening the current DeviceMesh.

            If no mesh_dim_name is provided, the default is a string concatenating the mesh_dim_names of the
            given submesh with each mesh_dim_name separated by "_". For example, if we have a 3D mesh
            DeviceMesh([[[0, 1], [2, 3]], [[4, 5], [6, 7]]], mesh_dim_names=("dp", "cp", "tp")), calling
            mesh_3d["dp", "cp"]._flatten() will create a 1D submesh DeviceMesh([0, 1, 2, 3], mesh_dim_names=("dp_cp",))
            on rank 0, 1, 2, 3 and a 1D submesh DeviceMesh([4, 5, 6, 7], mesh_dim_names=("dp_cp",)) on rank 4, 5, 6, 7.

            After the flattened dimension is created, to access the flattened dimension in mesh_3d, one can use the
            existing slicing method to obtain the flattened mesh through calling mesh_3d["dp_cp"].

            Args:
                mesh_dim_name (str, optional): name for the flattened dimension. Default is None.

            Raises:
                RuntimeError: if this mesh has no ``mesh_dim_names``.
            """
            if self.mesh_dim_names:
                # The flattened mesh is created by the shared mesh-resources object.
                return _mesh_resources.create_flatten_mesh(self, mesh_dim_name)

            raise RuntimeError(
                "Cannot flatten a DeviceMesh without mesh_dim_names!"
            )

    def init_device_mesh(
        device_type: str,
        mesh_shape: Tuple[int, ...],
        *,
        mesh_dim_names: Optional[Tuple[str, ...]] = None,
    ) -> DeviceMesh:
        """
        Initializes a `DeviceMesh` based on `device_type`, `mesh_shape`, and `mesh_dim_names` parameters.

        This creates a DeviceMesh with an n-dimensional array layout, where `n` is the length of `mesh_shape`.
        If `mesh_dim_names` is provided, each dimension is labeled as `mesh_dim_names[i]`.

        .. note::
            `init_device_mesh` follows SPMD programming model, meaning the same PyTorch Python program
            runs on all processes/ranks in the cluster. Ensure `mesh_shape` (the dimensions of the nD array
            describing device layout) is identical across all ranks. Inconsistent `mesh_shape` may lead to hanging.

        .. note::
            If no process group is found, init_device_mesh will initialize distributed process group/groups
            required for distributed communications behind the scene.

        Args:
            device_type (str): The device type of the mesh. Currently supports: "cpu", "cuda/cuda-like".
                Passing in a device type with a GPU index, such as "cuda:0", is not allowed.
            mesh_shape (Tuple[int]): A tuple defining the dimensions of the multi-dimensional array
                describing the layout of devices.
            mesh_dim_names (Tuple[str], optional): A tuple of mesh dimension names to assign to each dimension
                of the multi-dimensional array describing the layout of devices. Its length must match the length
                of `mesh_shape`. Each string in `mesh_dim_names` must be unique.

        Returns:
            DeviceMesh: A :class:`DeviceMesh` object representing the device layout.

        Example::
            >>> # xdoctest: +SKIP("no rank")
            >>> from torch.distributed.device_mesh import init_device_mesh
            >>>
            >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
            >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))

        """
        if mesh_dim_names is not None:
            if len(set(mesh_dim_names)) != len(mesh_dim_names):
                raise RuntimeError(
                    "Each mesh_dim_name must be unique.",
                    f"Found repeated mesh_dim_name in mesh_dim_names {mesh_dim_names}",
                )

            if len(mesh_shape) != len(mesh_dim_names):
                raise RuntimeError(
                    "mesh_shape and mesh_dim_names should have same length!",
                    f"Found len(mesh_dim_names): {len(mesh_dim_names)} and len(mesh_shape):{len(mesh_shape)}.",
                )

        # assume valid device types are all letters
        if device_type and not device_type.isalpha():
            raise RuntimeError(
                f"Device type with GPU index is not supported but got {device_type}. ",
                "If you maintained a 'torch.device' object, it's recommended to pass in 'device.type'.",
            )

        # Always initialize the mesh's tensor on CPU, regardless of what the
        # external device type has been set to be (e.g. meta)
        with torch.device("cpu"):
            mesh = torch.arange(math.prod(mesh_shape), dtype=torch.int).view(mesh_shape)
        device_mesh = DeviceMesh(
            device_type=device_type,
            mesh=mesh,
            mesh_dim_names=mesh_dim_names,
        )

        return device_mesh
