
    קg^4                         d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	m
Z
mZ d dlZd dlmZ g dZ G d d          Z G d d	e          Z G d
 de	          Z G d d          ZdS )    N)ABCabstractmethod)TracebackType)AnyList
NamedTupleOptionalType)JoinHookJoinableJoinc                   *    e Zd ZdZddZdeddfdZdS )r   a  
    This defines a join hook, which provides two entry points in the join context manager.

    Entry points : a main hook, which is called repeatedly while there exists a non-joined
    process, and a post-hook, which is called once all processes have joined.

    To implement a join hook for the generic join context manager, define a
    class that inherits from :class:`JoinHook` and override ``main_hook()`` and
    ``post_hook()`` as appropriate.
    returnNc                     dS )zCall this hook while there exists a non-joined process to shadow collective communications in a training iteration.

        Training iteration i.e., in one forward pass, backward pass, and optimizer step.
        N selfs    ]/var/www/html/ai-engine/env/lib/python3.11/site-packages/torch/distributed/algorithms/join.py	main_hookzJoinHook.main_hook             is_last_joinerc                     dS )aK  
        Call hook after all processes have joined.

        It is passed an additional ``bool`` argument ``is_last_joiner``, which indicates if the rank is one of the last to join.

        Arguments:
            is_last_joiner (bool): ``True`` if the rank is one of the last to
                join; ``False`` otherwise.
        Nr   )r   r   s     r   	post_hookzJoinHook.post_hook    r   r   r   N)__name__
__module____qualname____doc__r   boolr   r   r   r   r   r      sT        	 	   	 	 	 	 	 	 	 	r   r   c                        e Zd ZdZed fd            Zedefd            Zeede	j
        fd                        Zeedefd                        Z xZS )	r   a_  
    This defines an abstract base class for joinable classes.

    A joinable class
    (inheriting from :class:`Joinable`) should implement :meth:`join_hook`,
    which returns a :class:`JoinHook` instance, in addition to
    :meth:`join_device` and :meth:`join_process_group` that return device and
    process group information, respectively.
    r   Nc                     t                                                       t                                          | _        d S N)super__init___JoinConfigconstruct_disabled_join_config_join_config)r   	__class__s    r   r%   zJoinable.__init__7   s3    'FFHHr   c                     dS )a  
        Return a :class:`JoinHook` instance for the given :class:`Joinable`.

        Arguments:
            kwargs (dict): a :class:`dict` containing any keyword arguments
                to modify the behavior of the join hook at run time; all
                :class:`Joinable` instances sharing the same join context
                manager are forwarded the same value for ``kwargs``.
        Nr   )r   kwargss     r   	join_hookzJoinable.join_hook<   s	     	r   c                     dS )zeReturn the device from which to perform collective communications needed by the join context manager.Nr   r   s    r   join_devicezJoinable.join_deviceI   	     	r   c                     dS )zfReturns the process group for the collective communications needed by the join context manager itself.Nr   r   s    r   join_process_groupzJoinable.join_process_groupO   r/   r   r   )r   r   r   r   r   r%   r   r,   propertytorchdevicer.   r   r1   __classcell__)r)   s   @r   r   r   ,   s          I I I I I ^I 
X 
 
 
 ^
 U\    ^ X C    ^ X    r   r   c                   H    e Zd ZU dZeed<   eed<   eed<   ed             ZdS )r&   zdThis includes all fields needed from a :class:`Joinable` instance for the join context manager side.enablethrow_on_early_terminationis_first_joinablec                  &    t          ddd          S )zReturn a :class:`_JoinConfig` instance indicating that join-related logic should be disabled.

        e.g. if the caller is not in a join context manager.
        Fr7   r8   r9   )r&   r   r   r   r'   z*_JoinConfig.construct_disabled_join_config]   s"     Ue
 
 
 	
r   N)r   r   r   r   r    __annotations__staticmethodr'   r   r   r   r&   r&   V   sV         ooLLL $$$$
 
 \
 
 
r   r&   c                       e Zd ZdZ	 	 ddee         dedefdZdd
ZddZ	d Z
deee                  dee         dee         fdZd Zd Zedefd            Zd	S )r   a
  
    This class defines the generic join context manager, which allows custom hooks to be called after a process joins.

    These hooks should shadow the
    collective communications of non-joined processes to prevent hanging and
    erroring and to ensure algorithmic correctness. Refer to :class:`JoinHook`
    for details about the hook definition.

    .. warning::
        The context manager requires each participating :class:`Joinable` to
        call the method :meth:`notify_join_context()` before its own per-
        iteration collective communications to ensure correctness.

    .. warning::
        The context manager requires that all ``process_group`` attributes in
        the :class:`JoinHook` objects are the same. If there are multiple
        :class:`JoinHook` objects, then the ``device`` of the first is used.
        The process group and device information is used for checking for non-
        joined processes and for notifying processes to throw an exception if
        ``throw_on_early_termination`` is enabled, both of which using an all-
        reduce.

    Arguments:
        joinables (List[Joinable]): a list of the participating
            :class:`Joinable` s; their hooks are iterated over in the given
            order.

        enable (bool): a flag enabling uneven input detection; setting to
            ``False`` disables the context manager's functionality and should
            only be set when the user knows the inputs will not be uneven
            (default: ``True``).

        throw_on_early_termination (bool): a flag controlling whether to throw an
            exception upon detecting uneven inputs (default: ``False``).

    Example::

        >>> import os
        >>> import torch
        >>> import torch.distributed as dist
        >>> import torch.multiprocessing as mp
        >>> # xdoctest: +SKIP
        >>> import torch.nn.parallel.DistributedDataParallel as DDP
        >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
        >>> from torch.distributed.algorithms.join import Join
        >>>
        >>> # On each spawned worker
        >>> def worker(rank):
        >>>     dist.init_process_group("nccl", rank=rank, world_size=2)
        >>>     model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
        >>>     optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
        >>>     # Rank 1 gets one more input than rank 0
        >>>     inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
        >>>     with Join([model, optim]):
        >>>         for input in inputs:
        >>>             loss = model(input).sum()
        >>>             loss.backward()
        >>>             optim.step()
        >>>     # All ranks reach here without hanging/erroring
    TF	joinablesr7   r8   c                     t          |          dk    rt          d          || _        fd| j        D             | _        || _        || _        |                                  |                                  d S )Nr   z7The join context manager requires at least one joinablec                 *    g | ]} |j         d i S )r   )r,   ).0joinabler+   s     r   
<listcomp>z!Join.__init__.<locals>.<listcomp>   s:     
 
 
-5H((((
 
 
r   )len
ValueError
_joinables_join_hooks_enable_throw_on_early_termination_set_joinable_configs_extract_dist_info)r   r?   r7   r8   r+   s       `r   r%   zJoin.__init__   s     y>>QVWWW#
 
 
 
9=
 
 
 +E(""$$$!!!!!r   r   Nc                     t          | j                  dk    sJ d}| j        D ]%}t          | j        | j        |          |_        d}&dS )zESet the :class:`_JoinConfig` of each participating :class:`Joinable`.r   Tr;   FN)rE   rG   r&   rI   rJ   r(   )r   r9   rC   s      r   rK   zJoin._set_joinable_configs   sn    4?##a''''  	& 	&H$/|+/+K"3% % %H!
 !&	& 	&r   c                     d}d}| j         D ]/}||j        }n||j        k    rt          d          ||j        }0|| _        t          j        | j                  | _        || _        dS )a  
        Extract the process group and device information from the joinables.

        If there are multiple joinables, then the context manager uses the
        first specified device.

        Preconditions:
            ``self._joinables`` is not ``None`` and is non-empty.

        Raises:
            ValueError
                If there are multiple conflicting ``process_group`` attributes
                among the ``Joinable`` objects.
        Nz7Using join context manager with multiple process groups)	rG   r1   rF   r.   _process_groupdistget_rank_rank_device)r   process_groupr4   rC   s       r   rL   zJoin._extract_dist_info   s      	. 	.H$ ( ;("=== M   ~!-+]4#677
r   c                     d S r#   r   r   s    r   	__enter__zJoin.__enter__   s    r   typevalue	tracebackc           	         | j         r|rdS d}d}d}d}t          j        d           |s||k    r#t          j        d| d| j         d	| d
           |                                 }|dk    rd}n@| j        r|                                  | j        D ]}	|		                                 d}|dz  }|| j        D ]}	|	
                    |           dS )z
        Repeatedly runs the main hooks until all processes join; then, runs the post-hooks.

        Raises:
            RuntimeError
                If ``throw_on_early_termination=True``.
        NFTr   i  oncez+Detected uneven input skew of greater than z. This means that rank z has at least zz fewer inputs than other currently-active ranks. This level of skew could lead to performance degradation during training.   )rI   warningssimplefilterwarnrR   _get_num_nonjoined_procsrJ   _notify_procs_to_terminaterH   r   r   )
r   rW   rX   rY   all_procs_joinedr   iWARN_THRESHOLDnum_nonjoined_procsr,   s
             r   __exit__zJoin.__exit__   sU    | 	t 	F f%%%" 	>!!3%3 3z3 31?3 3 3   #'"?"?"A"A"a''#'  3 633555 "&!1 * *I''))))!&Q/ # 	4 ) 	0 	0I////	0 	0r   c                     t          j        d| j                  }t          j        || j                   |                                S )zaReturn the number of non-joined processes by shadowing an all-reduce in the non-joined processes.r\   r4   group)r3   zerosrS   rP   
all_reducerO   item)r   re   s     r   r`   zJoin._get_num_nonjoined_procs  sD    #k!DLAAA+43FGGGG"'')))r   c                     t          j        d| j                  }t          j        || j                   t          d| j         d          )zSchedule an all-reduce to notify non-joined processes to terminate.

        Also raise a ``RuntimeError`` indicating that the current process has exhausted its inputs.
        r\   rh   ri   zRank z exhausted all inputs.)r3   onesrS   rP   rl   rO   RuntimeErrorrR   )r   ro   s     r   ra   zJoin._notify_procs_to_terminate  sN    
 z!DL111D$78888E4:EEEFFFr   rC   c                    t          | d          sJ dt          |            d            | j        }|j        r|j        sdS | j        }| j        }t          j        d|          }t          j
        ||d          }|j        rQt          j        d|          }t          j
        ||	           |                                }|rt          d
          |S )aH  
        Notifies the join context manager that the calling process has not yet joined.

        Then, if ``throw_on_early_termination=True``, checks if uneven inputs have been detected
        (i.e. if one process has already joined) and throws an exception if so.

        This method should be called from a :class:`Joinable` object before
        its per-iteration collective communications. For example, this should
        be called at the beginning of the forward pass in
        :class:`DistributedDataParallel`.

        Only the first :class:`Joinable` object passed into the context
        manager performs the collective communications in this method, and
        for the others, this method is vacuous.

        Arguments:
            joinable (Joinable): the :class:`Joinable` object calling this
                method.

        Returns:
            An async work handle for the all-reduce meant to notify the context
            manager that the process has not yet joined if ``joinable`` is the
            first one passed into the context manager; ``None`` otherwise.
        r(   zCheck that the z/ constructor calls the ``Joinable`` constructorNr\   rh   T)rj   async_opri   zLDetected at least one rank that exhausted inputs. Throwing across all ranks.)hasattrrW   r(   r9   r7   r.   r1   r3   ro   rP   rl   r8   rk   rm   rp   )rC   join_configr4   rT   ro   workrk   should_throws           r   notify_join_contextzJoin.notify_join_context(  s   4 x00 	
 	
'd8nn ' ' '	
 	
 	

 +, 	K4F 	4% 3 z!F+++t=4HHH1 		K&111EOE7777 ::<<L "1   r   )TFr   )r   r   r   r   r   r   r    r%   rK   rL   rV   r	   r
   BaseExceptionr   rf   r`   ra   r=   rw   r   r   r   r   r   h   s$       ; ;@ +0	" ">" " %)	" " " "$
& 
& 
& 
&   <  20tM*+20 &20 M*	20 20 20 20h* * *G G G 4h 4 4 4 \4 4 4r   r   )r]   abcr   r   typesr   typingr   r   r   r	   r
   r3   torch.distributeddistributedrP   __all__r   r   r&   r   r   r   r   <module>r      sC    # # # # # # # #       8 8 8 8 8 8 8 8 8 8 8 8 8 8              +
*
*       <' ' ' ' 's ' ' 'T
 
 
 
 
* 
 
 
$u u u u u u u u u ur   