
    קg[                       d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZmZmZmZ d dlZd dlmZ d dlmZmZ d dlmZ dd	lm Z m!Z!m"Z" dd
l#m$Z$ erd dlm%Z% g dZ& ej'        e(          Z) G d de
          Z*e*j+        Z+e*j,        Z,e*j-        Z-e*j.        Z.e*j/        Z/e*j0        Z0e*j1        Z1e*j2        Z2e*j3        Z3e+Z4e,Z5e-Z6 ej7        d          Z8 G d de          Z9dee:eee9                  f         de;fdZ<	 d=dee:eee9                  f         de:de:de=fdZ> G d de          Z?d>deej@                 dee;         fdZA	 d>deej@                 dee;         dee:ej%        f         fdZB G d  d!e?          ZC G d" d#eC          ZD G d$ d%eC          ZE G d& d'eC          ZF	 d?d)eee9                  d*e:dee9         fd+ZGd)ee:ee9         f         d,ee:ge:f         de:dee:ee9         f         fd-ZH G d. d/e?          ZI G d0 d1eI          ZJ G d2 d3eI          ZK	 	 d@d4ZL G d5 d6eI          ZM G d7 d8eI          ZN G d9 d:eN          ZOd;e;fd<ZPdS )A    N)ABCabstractmethod)defaultdict)Enum)
AnyCallableDictList
NamedTupleOptionalSetTupleTYPE_CHECKINGUnion)
FSDPModuleUnshardHandle)record_function   )merge_chunkssplit_args_kwargs_into_chunksTensorChunkSpec)_PipelineStageBase)Work)	get_schedule_classPipelineScheduleSinglePipelineScheduleMultiSchedule1F1BScheduleFlexibleInterleaved1F1BScheduleGPipeScheduleInterleaved1F1BScheduleLoopedBFSScheduleInterleavedZeroBubblec                   N    e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zd
 Zed             ZdS )_ComputationTyper                        	   c                     t           j        dt           j        dt           j        dt           j        dt           j        dt           j        dt           j        dt           j        dt           j	        d	i	}||          S )
NFBWUNSHARDRESHARDSEND_FRECV_FSEND_BRECV_B)
r$   FORWARDBACKWARDWEIGHTr1   r2   r3   r4   r5   r6   )selfstr_maps     b/var/www/html/ai-engine/env/lib/python3.11/site-packages/torch/distributed/pipelining/schedules.py__str__z_ComputationType.__str__?   s]    $c%s#S$i$i#X#X#X#X

 t}    c                 j   | dk    rt           j        S | dk    rt           j        S | dk    rt           j        S | dk    rt           j        S | dk    rt           j        S | dk    rt           j        S | dk    rt           j        S | dk    rt           j        S | d	k    rt           j	        S t          d
|            )Nr.   r/   r0   r1   r2   r3   r4   r5   r6   zInvalid computation type )r$   r7   r8   r9   r1   r2   r3   r4   r5   r6   RuntimeError)actions    r<   from_strz_ComputationType.from_strM   s    S==#++s]]#,,s]]#**y  #++y  #++x#**x#**x#**x#**C6CCDDDr>   N)__name__
__module____qualname__r7   r8   r9   r1   r2   r3   r4   r5   r6   r=   staticmethodrB    r>   r<   r$   r$   3   sp        GHFGGFFFF   E E \E E Er>   r$   zD(\d+)([F,B,W]|UNSHARD|RESHARD|SEND_F|RECV_F|SEND_B|RECV_B{0,1})(\d*)c                   Z    e Zd ZU eed<   eed<   dZee         ed<   d Ze	d             Z
dS )_Actionstage_indexcomputation_typeNmicrobatch_indexc                     t          | j                  }|t          | j                  z  }| j        |t          | j                  z  }|S N)strrJ   rK   rL   )r:   reprs     r<   __repr__z_Action.__repr__   sJ    4#$$D)*** ,C-...Dr>   c                 p   t                               |           x}rl|                                \  }}}t          t	          |          t
                              |          t          |          rt	          |          nd          S | dk    s|                                 rdS t          d|  d          )z
        Reverse of __repr__

        String should be formatted as [stage][action type][(microbatch)]
            e.g. `2F0`, `1UNSHARD`, `3SEND_F1`
        N zInvalid action string: zD, should be formatted as [stage][action type][(microbatch)] e.g. 2F0)
_action_regexmatchgroupsrI   intr$   rB   lenisspacer@   )rO   rU   rJ   rK   rL   s        r<   rB   z_Action.from_str   s     "'',,,5 	>Cllnn;K)+;K   ))*:;;),-=)>)>H$%%%D  
 BYY#++--Y4ocooo
 
 	
r>   )rC   rD   rE   rW   __annotations__r$   rL   r   rQ   rF   rB   rG   r>   r<   rI   rI   z   sm         &&&&&*hsm***   
 
 \
 
 
r>   rI   pipeline_orderreturnc                    	
 t          d                                  D                       

fdt          
          D             }
 fdt                     D             }t	          t          j        |ddi          }t                     }d t          |          D             }d t          |g|R  D             	dt          |d	                   d
z   z  d	                    	fdt          |          D                       z   }	fdt          ||          D             }|dz   d	                    |          z   dz   }|S )z}
    Formats the pipeline order in a timestep (row) x rank (column) grid of actions
    and returns the formatted string
    c              3   4   K   | ]}t          |          V  d S rN   rX   ).0actionss     r<   	<genexpr>z)_format_pipeline_order.<locals>.<genexpr>   s(      HHWCLLHHHHHHr>   c                     g | ]D}d t          |                              t          t          dz
                                z   ES )zStep r   )rO   zfillrX   )r`   i	num_stepss     r<   
<listcomp>z*_format_pipeline_order.<locals>.<listcomp>   sQ       <=#a&&,,s3y1}#5#566777  r>   c                 D    g | ]}                     |d gz            S )rS   )get)r`   keyrf   r[   s     r<   rg   z*_format_pipeline_order.<locals>.<listcomp>   s=       693y 011  r>   	fillvaluerS   c                 2    g | ]}d t          |          z   S )zRank rO   )r`   re   s     r<   rg   z*_format_pipeline_order.<locals>.<listcomp>   s#    >>>7SVV#>>>r>   c                 @    g | ]}t          d  |D                       S )c              3   V   K   | ]$}|t          t          |                    ndV  %d S )Nr   )rX   rO   )r`   items     r<   rb   z4_format_pipeline_order.<locals>.<listcomp>.<genexpr>   s7      FF$d.CD		NNNAFFFFFFr>   )max)r`   cols     r<   rg   z*_format_pipeline_order.<locals>.<listcomp>   s?        	FF#FFFFF  r>    r   r%   c              3   8   K   | ]\  }}|d |          V  dS <NrG   )r`   re   labelmax_lengthss      r<   rb   z)_format_pipeline_order.<locals>.<genexpr>   sJ       < <)1E5$;q>$$$< < < < < <r>   c           	      ~    g | ]9\  }}| d d                     fdt          |          D                       z   :S ): rs   c              3   R   K   | ]!\  }}t          |          d |          V  "dS ru   rm   )r`   re   rp   rx   s      r<   rb   z4_format_pipeline_order.<locals>.<listcomp>.<genexpr>   s?      RR4c$ii3+a.333RRRRRRr>   )join	enumerate)r`   rw   rowrx   s      r<   rg   z*_format_pipeline_order.<locals>.<listcomp>   se        E3 
((RRRR9S>>RRR
R
R	S  r>   
)rq   valuesrangesortedlist	itertoolszip_longestrX   zipr|   r}   )r[   step_labelsrank_actionstransposed_actions	num_ranksrank_labels
header_rowformatted_rowsformatted_tablerx   rf   s   `        @@r<   _format_pipeline_orderr      s    HH0E0E0G0GHHHHHI   AFyAQAQ  K    =CN=S=S  L i3\PRPPQQN##I>>U9-=-=>>>K {8%7888  K
 KN++a/0388 < < < <5>{5K5K< < < 4 4 J    k+=>>  N !4'$))N*C*CCdJOr>   Fnum_microbatches
num_stagesenable_zero_bubblec           
      0   i }t          d |                                 D                       }t          |          D ]}g }g }t          t          |                     D ]Y}	|t          | |	                   k     r| |	         |         nd}
|
,|
j        }|t
          j        k    r|                    |
           Zd |D             }t          |          t          |          k    r|                    d           |D ]l}
|
j        }|
j        }|
j	        }|
J d            ||k    r|                    d| d           ||vr6|t
          j
        k    s|dk    r|                    d	|            ||f||<   }||         \  }}|t
          j
        k    r-||d
z
  k    r|d
z
  }t
          j        }nh|d
z   }t
          j
        }nV|t
          j        k    r3|dk    r|                    d|d            nx|d
z
  }t
          j        }nt          d| d          |$||k    r|                    d|d|d|           ||k    r|                    d|d|d|           ||f||<   n|s?t          |          dk    r)t          d| dd                    |          z              dS t          t          |                     D ]}	t                      }t                      }| |	         D ]}
|
|
j        }|
j        }|
j	        }|t
          j        k    r||                    ||f           D|t
          j        k    r]||f|vr|                    d|d|d           ||f|v r|                    d|d|d           ||                    ||f           t          |          t          |          k    r|                    d           t          |          dk    r)t          d| dd                    |          z             dS )a6  
    pipeline_order[rank] = [(computation_type, microbatch_index, stage_index), ...]
    Validating that the pipeline order follows the rules:
    1. Forward action for a microbatch must be before the Backward action for that microbatch
    2. Recv for a microbatch must be before the send for that microbatch
    3. Microbatch index is handled in sequential order for each stage
    4. A later stage cannot operate on a microbatch before any of the previous stages have operated on it
    5. Same microbatch cannot be handled in the same time step across ranks
    c              3   4   K   | ]}t          |          V  d S rN   r_   )r`   	rank_lists     r<   rb   z+_validate_pipeline_order.<locals>.<genexpr>   s(      OO)s9~~OOOOOOr>   Nc                     h | ]	}|j         
S rG   )rL   )r`   rA   s     r<   	<setcomp>z+_validate_pipeline_order.<locals>.<setcomp>   s(     %
 %
 %
(.F#%
 %
 %
r>   z<Duplicate microbatch index found in current_timestep_actionsCAll currently supported action types require valid microbatch_indexzMicrobatch index z out of ranger   zIncorrect start for microbatch r   z
[mb_index=z'] already finished backward computationzComputation type z not supportedz] expected_computation=z VS. actual computation_type=z] expected_stage=z VS. actual stage_index=zError at timestep rz   ,z	mb_index=z, stage_index=z Weight happened before bwdz Duplicated weight stepz'Length weight steps != Length bwd steps)rq   r   r   rX   rK   r$   r9   appendrJ   rL   r7   r8   
ValueErrorr@   r|   setadd)r[   r   r   r   microbatch_process_infomax_timesteptimestep	error_msgcurrent_timestep_actionsrankrA   rK   unique_microbatch_indicesrJ   mb_indexprev_computation
prev_stageexpected_stageexpected_computationbackward_stepsweight_stepss                        r<   _validate_pipeline_orderr      s     HJOO~7L7L7N7NOOOOOL,'' {X {X!	#% #n--.. 
	< 
	<D c."67777 t$X..  !#)#: #'7'>>>,33F;;;%
 %
2J%
 %
 %
! ())S1I-J-JJJN   / 6	 6	F ,K%6.H$$T %$$+++  !LX!L!L!LMMM 666#'7'???;RSCSCS$$%Qx%Q%QRRR5E{4S'11 0Gx/P, *#'7'???!Z!^33)3a/?/H,,)3a/?/G,,%)9)BBB!Q!((RRRR   )3a/?/H,,$L,<LLL   (3+/???!((ccc-AccP`cc   "[00$$THTTTTkTT  
 )"5'11
 " 	9~~"""55558K8KK   FF#n--.. 	L 	LD3655N14L(. B B>$0#)#: !2#'7'@@@+&**Hk+BCCC%)9)@@@ +.nDD!((UxUUKUUU   !+.,>>!((QxQQKQQQ    +$(((K)@AAA>""c,&7&777  !JKKKy>>Q@H@@@388ICVCVVWWW u{X {Xr>   c                   "   e Zd Z	 	 	 	 ddedeedej        f                  deee	df                  dee
ee	f                  deee
eef         ee         f                  f
dZd	 Zd
 Zd Ze	 	 	 	 ddee         dee         dee         dee         fd            Zeddddee         fd            Z	 	 	 	 ddee         dee         dee         dee         fdZd Z	 ddeedf         dee
eef                  fdZdee         defdZdS )_PipelineScheduleNn_microbatchesloss_fn.args_chunk_speckwargs_chunk_specoutput_merge_specc                     || _         || _        || _        || _        || _        	 | j        d u| _        g | _        t                              d| j	        j
                   d S )NzUsing %s)_n_microbatches_loss_fn_args_chunk_spec_kwargs_chunk_spec_output_merge_spec_has_backward_internal_lossesloggerinfo	__class__rC   )r:   r   r   r   r   r   s         r<   __init__z_PipelineSchedule.__init__T  sj      . /"3"3	 "]$6 57J 788888r>   c                     |j         r?| j        r:|                     |||                   }| j                            |           d S d S d S rN   )is_lastr   _compute_lossr   r   )r:   stageoutput
target_mbsr   losss         r<   _maybe_compute_lossz%_PipelineSchedule._maybe_compute_lossq  sa    = 	/T/ 	/%%fj.BCCD!((.....	/ 	/ 	/ 	/r>   c                     d|cxk    ot          | j                  k     nc }|j        r| j        r|r| j        |         S t          | j                  dk    r|st	          d| d| j                   d S )Nr   zLoss for microbatch z6 is not available. Available losses for microbatches: )rX   r   r   r   r@   )r:   r   r   valid_indexs       r<   _maybe_get_lossz!_PipelineSchedule._maybe_get_lossv  s    8@@@@c$*?&@&@@@@@= 	T/ 	K 	(22&''1,,[,Nx N N6:6KN N  
 4r>   c                    t          |t                    s|g}t          d |D                       }|ry|wt          | j                  | j        k    r,t          d| j         dt          | j                             |                                 |                    | j                   | j                                         dS )zB
        Update the losses to those in the internal state
        c              3   $   K   | ]}|j         V  d S rN   )r   r`   r   s     r<   rb   z3_PipelineSchedule._update_losses.<locals>.<genexpr>  s$      !D!DE%-!D!D!D!D!D!Dr>   N
Expecting z losses but got )	
isinstancer   anyrX   r   r   r@   clearextend)r:   stageslossescontains_last_stages       r<   _update_lossesz _PipelineSchedule._update_losses  s    
 &$'' 	XF!!D!DV!D!D!DDD  		16#54())T-AAA"c!5ccs4K`GaGacc  
 LLNNNMM$/000##%%%%%r>   arg_mbs	kwarg_mbsr   r   c                     t           )z
        Run one iteration of the pipeline schedule with list of microbatches.
        Will go through all the microbatches according to the schedule
        implementation.

        Args:
            microbatches: list of microbatch args.
        NotImplementedError)r:   r   r   r   r   s        r<   _step_microbatchesz$_PipelineSchedule._step_microbatches  s
      "!r>   targetr   c                    t           )  
        Run one iteration of the pipeline schedule with *whole-batch* input.
        Will chunk the input into microbatches automatically, and go through the
        microbatches according to the schedule implementation.

        args: positional arguments to the model (as in non-pipeline case).
        kwargs: keyword arguments to the model (as in non-pipeline case).
        target: target for the loss function.
        losses: a list to store the losses for each microbatch.
        r   )r:   r   r   argskwargss        r<   stepz_PipelineSchedule.step  s
     "!r>   c                     dt           f fd}| ||d           ndg j        z  }| ||d           ni g j        z  }| ||d           |4t          |t                    st	          dt          |                     ||fS )	z*
        Pre-process/check inputs
        namec           
          t          | t                    s!t          | dt          |                      t	          |           j        k    r*t          dj         d| dt	          |                      d S )Nz must be a list but got a r   rs   z	 but got )r   r   	TypeErrortyperX   r   r   )mbsr   r:   s     r<   check_type_and_lenz;_PipelineSchedule._check_inputs.<locals>.check_type_and_len  s    c4(( P4 N N499 N NOOO3xx4/// Q!5QQQQs3xxQQ   0/r>   Nr   rG   r   r   z losses must be a list but got a )rO   r   r   r   r   r   )r:   r   r   r   r   r   s   `     r<   _check_inputsz_PipelineSchedule._check_inputs  s    	# 	 	 	 	 	 	 w	2222dT11G y+6666t33I!z<888fd++ S Q4<< Q QRRR	!!r>   c                 .    |                      ||          S rN   )r   )r:   r   r   s      r<   r   z_PipelineSchedule._compute_loss  s    }}VV,,,r>   r   r   c                     |s|r)t          ||| j        | j        | j                  \  }}||fS dg| j        z  i g| j        z  fS )zj
        Splits a full-batch input into chunks (i.e. microbatches) and returns
        the chunks
        rG   )r   r   r   r   )r:   r   r   
args_splitkwargs_splits        r<   _split_inputsz_PipelineSchedule._split_inputs  so      	L6 	L'D$%'( ($J |++ 4$..t7K0KKKr>   output_chunksr\   c                 ,    t          || j                  S )z
        Merge output chunks back to a batch state.
        If output_merge_spec is None, the utility will merge output chunks by dimension 0 (batch dim).
        )r   r   )r:   r   s     r<   _merge_outputsz _PipelineSchedule._merge_outputs  s    
 #
 
 	
r>   NNNNrN   )rC   rD   rE   rW   r   r   torchTensorr   r   r	   rO   r   r   r   r   r   r   r   r
   r   r   r   r   r   r   rG   r>   r<   r   r   S  sd        :>AEBFIM9 99 (3#4569 "%(<"=>	9
 $Do)=$>?9 $E$sCx.%**D$EF9 9 9 9:/ / /

 
 
& & &.  #'$(%)!%" "$" D>" TN	"
 " " " ^"" !% " " "x~ " " " ^" #'$(%)!%$" $"$$" D>$" TN	$"
 $" $" $" $"L- - - ,0L LCHoL c3h(L L L L.
DI 
# 
 
 
 
 
 
r>   r   p2p_opsdescc                     t          |           dk    rdS |r| dnd}t                              d||            t          j        |                                           S )zt
    Simple wrapper over batch_isend_irecv from torch.distributed, which just adds a descriptive logger on top.
    r   Nz, rS   zbatch_p2p %s%s)rX   r   debugdistbatch_isend_irecvpop)r   r   desc_strs      r<   
_batch_p2pr     sb     7||qt"*${{{{H
LL!8W555!'**..000r>   c                    t          t                    }i }t          |           dk    r|S | D ]"}||j                                     |           #t          |                                          D ]\  }}t          ||          ||<   |S )z
    Sorts the list of P2P ops by the peer rank, and then calls
    batch_isend_irecv. Return a dictionary of works by peer rank. This function
    helps us avoid hangs in case of skip connections.
    r   r   )r   r   rX   peerr   r   itemsr   )r   r   ops_by_peerwork_by_peeropr  opss          r<   _sorted_batch_p2pr    s     0;4/@/@K)+L
7||q  ( (BG##B'''' K--//00 8 8	c'$777Tr>   c                        e Zd ZdZ	 	 	 	 ddededee         deee	df                  dee
ee	f                  d	eee
eef         ee         f                  f fd
Zddddee         fdZ xZS )r   z
    Base class for single-stage schedules.
    Implements the `step` method.
    Derived classes should implement `_step_microbatches`.
    Nr   r   r   r   .r   r   c                    t                                          |||||           || _        |j        | _        | j        | j        _        |                    |           | j        r|                    |           d S d S )Nr   r   r   r   r   )	superr   _stager   _num_stagesr   has_backward_prepare_forward_infra_prepare_backward_infra)r:   r   r   r   r   r   r   r   s          r<   r   zPipelineScheduleSingle.__init__1  s     	)+// 	 	
 	
 	
  +#'#5  	$$^444 	:)).99999	: 	:r>   r   r   c                H   | j                                          |                     ||          \  }}|(t          t	          j        || j                            }nd}|                     ||||           | j         j        r| 	                    | j         j
                  S dS r   N)r  clear_runtime_statesr   r   r   tensor_splitr   r   r   r   r   )r:   r   r   r   r   r   r   targets_splits           r<   r   zPipelineScheduleSingle.stepN  s     	((*** $(#5#5dF#C#C 
L  !3FD<P!Q!QRRMM M 	
L-PPP ; 	&&t{'@AAA4r>   r   )rC   rD   rE   __doc__r   rW   r   r   r   r   r	   rO   r   r   r   r
   r   __classcell__r   s   @r<   r   r   *  s         '+AEBFIM: :!: : (#	:
 "%(<"=>: $Do)=$>?: $E$sCx.%**D$EF: : : : : :: "&   x~        r>   r   c            
       d    e Zd ZdZ	 	 	 	 ddee         dee         dee         dee         fdZdS )	_ScheduleForwardOnlyzo
    The forward-only schedule.
    Will go through all the microbatches and perform only the forward pass
    Nr   r   r   r   c                 
   ||t          d          |                     ||||          \  }}g }t          | j                  D ]#}t	          d|           5  | j                            |          }t          |d          }|                                D ]}	|		                                 | j        
                    |||         ||                    | j                            |          }t          |d          }|                    |                                           ddd           n# 1 swxY w Y   t                              d| j        j        |           %|D ]}	|		                                 dS )z<
        Run one iteration of the pipeline schedule
        Nz7Forward-only schedule does not support loss computationForward fwd_recvr  fwd_send[%s] Forwarded microbatch %s)r@   r   r   r   r   r  get_fwd_recv_opsr  r   waitforward_one_chunkget_fwd_send_opsr   r   r   rJ   )
r:   r   r   r   r   fwd_sends_to_waitre   r  worksworks
             r<   r   z'_ScheduleForwardOnly._step_microbatchesv  s    !V%7I   "//JPVWW .0 t+,, 	U 	UA A00 
9 
9k22155)#J???!LLNN    DIIKKKK--aYq\JJJk22155)#J???!((888
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 LL79PRSTTTT
 & 	 	DIIKKKK	 	s   CD77D;	>D;	r   rC   rD   rE   r  r   r
   r   rG   r>   r<   r  r  p  s{          #'$(%)!%( ($( D>( TN	(
 ( ( ( ( ( (r>   r  c            
       d    e Zd ZdZ	 	 	 	 ddee         dee         dee         dee         fdZdS )	r   z^
    The GPipe schedule.
    Will go through all the microbatches in a fill-drain manner.
    Nr   r   r   r   c                 ,   |                      ||||          \  }}g }t          | j                  D ]@}t          d|           5  | j                            |          }t          |d          }|                                D ]}	|	                                 | j        	                    |||         ||                   }
| j        
                    |          }t          |d          }|                    |                                           ddd           n# 1 swxY w Y   t                              d| j        j        |           |                     | j        |
||           B|D ]}	|	                                 | j        sdS g }t          | j                  D ]2}t          d|           5  | j                            |          }t          |d          }|                                D ]}	|	                                 |                     | j        |          }| j                            ||	           | j                            |          }t          |d
          }|                    |                                           ddd           n# 1 swxY w Y   t                              d| j        j        |           4|                     | j        |           |D ]}	|	                                 dS )z
        Run one iteration of the pipeline schedule with list of microbatches.
        Will go through all the microbatches according to the GPipe schedule.

        Args:
            microbatches: list of microbatch args.
        r  r  r  r  Nr   z	Backward bwd_recvr   bwd_sendz[%s] Backwarded microbatch %s)r   r   r   r   r  r!  r  r   r"  r#  r$  r   r   r   rJ   r   r   get_bwd_recv_opsr   backward_one_chunkget_bwd_send_opsr   )r:   r   r   r   r   r%  re   r  r&  r'  r   bwd_sends_to_waitr   s                r<   r   z ScheduleGPipe._step_microbatches  s    "//JPVWW .0 t+,, 	I 	IA A00 
9 
9k22155)#J???!LLNN    DIIKKKK66q'!*iPQlSSk22155)#J???!((888
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 
9 LL79PRSTTT$$T[&*aHHHH
 & 	 	DIIKKKK ! 	F .0t+,, 	V 	VA Q11 9 9k22155)#J???!LLNN    DIIKKKK++DK;;..qt.<<<k22155)#J???!((8889 9 9 9 9 9 9 9 9 9 9 9 9 9 9 LL8$+:QSTUUUU 	DK000 & 	 	DIIKKKK	 	s&   CD$$D(	+D(	C J--J1	4J1	r   r(  rG   r>   r<   r   r     s          #'$(%)!%G G$G D>G TN	G
 G G G G G Gr>   r   c            
       d    e Zd ZdZ	 	 	 	 ddee         dee         dee         dee         fdZdS )	r   zo
    The 1F1B schedule.
    Will perform one forward and one backward on the microbatches in steady state.
    Nr   r   r   r   c                 :   |                      ||||          \  }}t          | j        | j        | j        j        z
            }d}d}d}d}	g }
t          |          D ]}| j                            |          }t          |d          x}r|	                                 | j        
                    |||         ||                   }|	r|		                                 | j                            |          }
||dz
  k    rt          |
d          }	|                     | j        |||           |dz  }	 | j                            |          }t          |
|z   d          x}r|	                                 |                     | j        |          }| j                            ||	           | j                            |          }|dz  }|| j        k    rn| j                            |          }t          ||z   d
          x}r|	                                 | j        
                    |||         ||                   }|                     | j        |||           | j                            |          }
|dz  }Pt          |d          }	|| j        k     r| j                            |          }t          |d          x}r|	                                 |                     | j        |          }| j                            ||	           |	r|		                                 | j                            |          }t          |d          }	|dz  }|| j        k     |	r|		                                 |                     | j        |           dS )z
        Run one iteration of the pipeline schedule with list of microbatches.
        Will go through all the microbatches according to the 1F1B schedule.

        Args:
            microbatches: list of microbatch args.
        r   Nr  r  r   r  Tfwd_send_bwd_recvr,  bwd_send_fwd_recvr-  r+  )r   minr   r  r  rJ   r   r!  r   r"  r#  r$  r   r.  r   r/  r0  r   )r:   r   r   r   r   warmup_chunksfwd_mb_indexbwd_mb_indexweight_stage_mb_index	send_work	fwd_sends_	fwd_recvs	recv_workr   	bwd_recvs	fuse_workr   	bwd_sendss                      r<   r   zSchedule1F1B._step_microbatches  s    "//JPVWW  t{66
 
  ! 		}%% 	 	A44\BBI&yzBBBBy !    [22<AVXabnXoppF  !    44\BBI}q000&yzBBB	
 $$T[&*lSSSALL
#	44\BBI 'y9'<CVWWWWy !    ''\BBDK**<d*CCC 44\BBIALt333 44\BBI 'y9'<CVWWWWy !    [22<AVXabnXoppF $$T[&*lSSS 44\BBIALG#	L yz:::	 T11144\BBI&yzBBBBy !    ''\BBDK**<d*CCC  !    44\BBI"9:>>>IAL# T111(  	NN 	DK00000r>   r   r(  rG   r>   r<   r   r     s          #'$(%)!%1 1$1 D>1 TN	1
 1 1 1 1 1 1r>   r   r&   compute_actionsmax_active_stagesc                   
 dt           dt          t          t                            dt          t                    fd}t	                      
g dt           f
fd}dt           f
fd}t          |           D ]\  }}| ||| |d                   t          t          
fd	                    }t          t          fd

                    }|D ]}	 ||	           |D ]}	 ||	                               |           S )aR  Given a basic schedule involving only compute actions (F,B,W), add UNSHARD/RESHARD actions for FSDP.

    UNSHARD refers to fetching the full contents of an FSDP-sharded layer, requiring an all-gather operation.
    RESHARD does the opposite, releasing memory (but doing no commmunication)

    We abandon the "timestep lock"  during lowering

    max_active_stages controls how many prefetches we allow. It should be measured in mb and tuneable but in practice
    3 stages is probably the thing we want?
    (to account for having one f and one b active, and something else prefetching?)
    countnext_actionsr\   c                     t                      }g }|D ]V}|R|j        |vrI|                    |j                   |                    |j                   t	          |          | k    r nW|S )zdRemove duplicates (same stage, different microbatch), find next 'count' stages that will do compute.)r   rJ   r   r   rX   )rF  rG  seenretas        r<   next_stage_indicesz0_add_unshard_reshard.<locals>.next_stage_indices  sv      	 	A}d!:!:'''

1=)))s88u$$E
r>   rJ   c                                          |                                t          | t          d                      d S rN   )r   r   rI   r1   rJ   active_stagesfsdp_aware_actionss    r<   _unshardz&_add_unshard_reshard.<locals>._unshard  s=    +&&&!!'+w"E"EFFFFFr>   c                                          |                                t          | t          d                      d S rN   )remover   rI   r2   rN  s    r<   _reshardz&_add_unshard_reshard.<locals>._reshard  s=    [)))!!'+w"E"EFFFFFr>   Nc                     | vS rN   rG   )srO  s    r<   <lambda>z&_add_unshard_reshard.<locals>.<lambda>  s    a}&< r>   c                     | vS rN   rG   )rV  next_ns    r<   rW  z&_add_unshard_reshard.<locals>.<lambda>  s    avo r>   )	rW   r
   r   rI   r   r}   r   filterr   )rC  rD  rL  rQ  rT  re   rA   fetchevictr   rO  rP  rY  s             @@@r<   _add_unshard_reshardr]  y  s    "&x'8"9	c    "eeM(*Gc G G G G G G GGc G G G G G G G // * *	6> $#$5qrr7JKKV<<<<fEEFFV5555}EEFF  	 	EHUOOOO 	 	EHUOOOO!!&))))r>   stage_to_rankc                 <   d | D             }dt           dt          ffddt           dt          t           t           f         ffd}dt          t                    dt          t                    dt          ffd}| rd}t          t          |                     D ]}t          | |                   d	k    sJ | |         d	         } ||||                   s>|x||                             |            |          rR ||          \  }	}
||                             |	           | ||
j                                               |
           | |         	                    d	           t          | |                   d	k    r| |= d
}|s
J d            | |S )Nc                     i | ]}|g S rG   rG   )r`   r   s     r<   
<dictcomp>z"_add_send_recv.<locals>.<dictcomp>  s    -S-S-S4dB-S-S-Sr>   rA   r\   c                 z    | j         t          k    r| j        dz
  k    S | j         t          k    r| j        dk    S dS )Nr   r   F)rK   r.   rJ   r/   )rA   r   s    r<   
_has_commsz"_add_send_recv.<locals>._has_comms  sD    "a''%a77$))%**ur>   c                 0    |           sJ |  d            | j         }| j        }| j        }t          ||t          k    rt
          nt          |          }|t          k    r|dz   n|dz
  }t          ||t          k    rt          nt          |          }||fS )Nz is not a valid comm actionr   )	rJ   rK   rL   rI   r.   r3   r5   r4   r6   )rA   	stage_idxctypemb_idxsendrecv_stage_idxrecvrc  s          r<   
_get_commsz"_add_send_recv.<locals>._get_comms  s    z&!!IIf#I#I#IIII&	'(yEQJJ&&FFKK*/1**Q)a-~!vvPPTzr>   prev_actionsc                 n   | dS | j         t          k    rF| j        dk    s;t          | j        | j         t          k    rt          nt
          | j                  }||v S | j         t          k    rI| j        dz
  k    s;t          | j        | j         t          k    rt          nt
          | j                  }||v S dS )a  We don't put our own recv ops in the schedule, we let a sender on another rank put our recv ops in place.
        This helps ensure a sane (non-hanging) ordering of sends and recvs.
        But it also means we might not be able to schedule our next compute action yet.
        NTr   r   )rK   r.   rJ   rI   r4   r6   rL   r/   )rA   rl  expected_recvr   s      r<   _ready_to_schedulez*_add_send_recv.<locals>._ready_to_schedule  s     >4$))&2D2I2I#" 1Q66F' M
 !L00$))&2D
UV2V2V#" 1Q66F' M
 !L004r>   Fr   Tz6Malformed compute schedule, can't schedule sends/recvs)
rI   boolr   r   r
   r   rX   r   rJ   r   )rC  r^  r   comm_actionsrk  ro  progressr   rA   rh  rj  rc  s     `        @r<   _add_send_recvrs    s"   
 .T-S?-S-S-SL7 t      7 uWg-='>      !15g	     4  R#o..// 	 	Dt,--1111$T*1-F%%fl4.@AA !T"))&111:f%% O!+F!3!3JD$ !&--d333 t/?!@!@AHHNNND!%%a(((?4())Q..#D)HHQQQQQQ/  R0 r>   c                   |    e Zd ZdZ	 	 	 	 	 	 ddee         dedee         dee	e
df                  d	eeee
f                  d
eeeeef         e	e         f                  deeeef                  def fdZd Zd ZddZddddee         fdZ	 	 	 	 ddee         dee         dee         dee         fdZ xZS )r   zQ
    Base class for multi-stage schedules.
    Implements the `step` method.
    NTr   r   r   r   .r   r   stage_index_to_group_rankuse_full_backwardc	                 P    t          |          dk    rt          dt          |                     t                                          |||||           | _        |d         j         _        |d         j         _        |d         j	         _
        | j        D ]	}	||	_        
|d         j         _         j        D ]}	 j        |	_         fd _        i  _        | _         j        D ]3}	|	                    |            j        r|	                    |           4d S )Nr   z9Multi-stage schedule expects at least two stages but got r  r   c                 $    | j         oj        d uS rN   )r   r   )r   r:   s    r<   rW  z0PipelineScheduleMulti.__init__.<locals>.<lambda>9  s    %-EDM,E r>   )rX   r   r  r   _stagesr   r  
group_sizepp_group_size
group_rankr   ru  r   r  _should_compute_lossr[   rv  r  r  )r:   r   r   r   r   r   r   ru  rv  r   r   s   `         r<   r   zPipelineScheduleMulti.__init__  sr    v;;!YCPVKKYY   	)+// 	 	
 	
 	
 !!9/#AY11I(	$0 L L2K//)/)L& \ 	4 	4E!%!3E FEEE 	!
 CE!2 \ 	> 	>E((888! >--n===	> 	>r>   c                     t          |dd          5 }t          j        |          }| j        D ]"}|                    | j        |                    #	 ddd           dS # 1 swxY w Y   dS )zQDump a CSV representation of the schedule into a file with the provided filename.wrS   newlineN)opencsvwriterr[   writerowr:   filenamecsvfiler  r   s        r<   	_dump_csvzPipelineScheduleMulti._dump_csvG  s    (C,,, 	;Z((F+ ; ; 3D 9::::;	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	; 	;s   ?A  A$'A$c                     dt           t          t          t          d z           f         dt          dt          f fd}t	           j                   j        k    s'J d j         dt	           j                               t           j                  D ]}| j        v sJ d|              | j         j         j	                   d S )Nra   r   r   c                 b   d t          |          D             }| D ]=}| |         D ]0}|t          |t                    sJ d| d            |j        }|j        }|j        }|t          k    r'||         t                                       |           p|t          k    rM|||         t                   v sJ d| d| d            ||         t                                       |           |t          k    r]
j
        r
J d            |||         t                   v sJ d| d| d	            ||         t                                       |           2?|D ]P}t          t          t          fD ]9}t          ||         |                   }	|	|k    sJ d
|	 d| d| d|             :Qd S )Nc           
          i | ]>}|t           t                      t          t                      t          t                      i?S rG   )r.   r   r/   r0   )r`   stage_ids     r<   ra  z\PipelineScheduleMulti._validate_schedule.<locals>._validate_rank_actions.<locals>.<dictcomp>W  sP     E E E  suusuusuuE E Er>   zGot an invalid action: z, expected instance of _ActionzRunning Backward for stage z, microbatch z without first running ForwardzESchedule contains 'W' actions, but is configured to use full backwardzRunning Weight for stage z without first running BackwardzGot rs   z microbatches for stage z, expected )r   r   rI   rJ   rK   rL   r.   r   r/   r0   rv  rX   )ra   r   r   stage_actionsr   rA   s_idrf  mb_idstage_mbr:   s             r<   _validate_rank_actionszHPipelineScheduleMulti._validate_schedule.<locals>._validate_rank_actionsP  sd   E E !&j 1 1E E EM   : :%dm : :F~ %  X XWWWWX X X "-D"3E"3Ezz%d+A.2259999!!]4%8%;;;;qqqEqqq <;;%d+A.2259999! $ 6c cbc c c "]4%8%;;;;ptpp%ppp <;;%d+A.225999/:2 & l lAY l lE"=#6u#=>>H $4444khkkkkkkYikk 5444ll lr>   z2Schedule has incorrect number of ranks - expected z	, actual z%Schedule is missing actions for rank )
r	   rW   r
   rI   rX   r[   r{  r   r  r   )r:   r  r   s   `  r<   _validate_schedulez(PipelineScheduleMulti._validate_scheduleN  s   .	l#tGdN334.	l.	l ".	l .	l .	l .	l .	l .	lb #$$(::::w@Rww]`aeat]u]uww ;::$,-- 	> 	>D++++=t== ,+++ 	
 	
 	
 	
 	
r>   compute_onlyc                    |dk    sJ t          |d          5 }t          j        |          }t          |          D ]\  }}d |D             | j        |<   	 ddd           n# 1 swxY w Y   |                                  dS )zLoad a CSV representation of the schedule from a file with the provided filename.
        This API will most likely get renamed/refactored so is marked as internal for now.

        format must be "compute_only" for PipelineScheduleMulti
        r  rS   r  c                 B    g | ]}t                               |          S rG   rI   rB   r`   rV  s     r<   rg   z3PipelineScheduleMulti._load_csv.<locals>.<listcomp>  s&    ,N,N,NQW-=-=a-@-@,N,N,Nr>   N)r  r  readerr}   r[   r  )r:   r  formatr  r  r   r~   s          r<   	_load_csvzPipelineScheduleMulti._load_csv  s     ''''(B''' 	O7Z((F&v.. O O	c,N,N#,N,N,N#D))O	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	O 	!!!!!s   >A%%A),A)r   r   c                V   | j         D ]}|                                 |                     ||          \  }}|(t          t	          j        || j                            }nd}|                     ||||           | j         D ]%}|j        r| 	                    |j
                  c S &dS r  )ry  r  r   r   r   r  r   r   r   r   r   )	r:   r   r   r   r   r   r   r   r  s	            r<   r   zPipelineScheduleMulti.step  s     \ 	) 	)E&&(((( $(#5#5dF#C#C 
L  !3FD<P!Q!QRRMM M 	
L-PPP \ 	@ 	@E} @**5+>?????@ tr>   r   r   r   c           	      H	   |                      ||||          \  }}d | j        D             }t                      }t                      }|                                D ]\}|dk    r#|                    | j        |dz
                      || j        dz
  k     r#|                    | j        |dz                       ]t          | j        | j	                           D ]\  }	}
	 g }|
o|
j
        }|
j        }|
j        }|
J d            |t          j        k    rl||         }|                    |||         ||                   }|                     ||||           |                    |                    |                     n|t          j        k    rd||         }|                     ||          }|                    ||| j                   |                    |                    |                     n^|t          j        k    r<| j        rt3          d| j                  ||         }|                    |           nt3          d|           |D ]}| j        |         }d}|	t7          |          k     r||	         }||j
        }|j        }|j        }|
J d            |t          j        k    r;|dz   |v r3||dz            }|                    |                    |                     |t          j        k    s|t          j        k    rt3          d|           |D ]}| j        |         }d}|	t7          |          k     r||	         }||j
        }|j        }|j        }|
J d            |t          j        k    s|t          j        k    rp|t          j        k    r;|dz
  |v r3||dz
           }|                    |                    |                     t3          d|           |r!t=          |                                           @# t@          $ ra}tB          "                    d	| j	        | j#        j$        |	|
           tB          "                    d
tK          | j                             |d}~ww xY w| &                    | j        |           dS )
        Operate on the microbatches for looped schedules (multiple stages on each rank).

        TODO: Does not use sorted_batch_isend_irecv(). As a result, this schedule does
        not support models with skip connections.
        c                     i | ]
}|j         |S rG   rJ   r   s     r<   ra  z<PipelineScheduleMulti._step_microbatches.<locals>.<dictcomp>  *     ?
 ?
 ?
).Eu?
 ?
 ?
r>   r   r   Nr   r   full_backwardzqWe detected a weight update in the pipeline schedule, but                                 self.use_full_backward=zUnknown computation type zy[Rank %s] pipeline schedule %s caught the following exception                      at time_step %s when running action %sz%s)'r   ry  r   keysr   ru  r  r}   r[   r   rK   rL   rJ   r$   r7   r#  r   r   r$  r8   r   r/  rv  r0  r9   r   backward_weight_one_chunkrX   r!  r.  r   r"  	Exceptionr   errorr   rC   r   r   )r:   r   r   r   r   stage_index_to_stageall_prev_ranksall_next_ranksrJ   	time_steprA   r  rK   r   r   r   r   	prev_rankprev_rank_opsprev_rank_action	next_ranknext_rank_opsnext_rank_actiones                           r<   r   z(PipelineScheduleMulti._step_microbatches  s    "//JPVWW?
 ?
26,?
 ?
 ?
 $'55#&55/4466 	T 	TKQ""4#A+PQ/#RSSST-111""4#A+PQ/#RSSS!*4+>ty+I!J!J r	 r	Ivq(*%'-'>$%6H"("4K ,,\ -,,'+;+CCC 4[ A!&!8!8$gh&789L" " 00
HUUU

5#9#9(#C#CDDDD)-=-FFF 4[ A#33E8DD00$4t?U 1    

5#9#9(#C#CDDDD)-=-DDD1 ",!;!%!7!; !;# #  !5[ A77AAAA()WEU)W)WXXX "0  I$($7	$BM'+$ 3}#5#555+8+C('3+;+L(#3#D&6&B$00` 100 ,/?/GGG*Q2FFF )=[1_(M #

5+A+A(+K+K L L L,0@0III/3C3JJJ !", N<L N N# # - 42 "0  I$($7	$BM'+$ 3}#5#555+8+C('3+;+L(#3#D&6&B$00` 100 -0@0HHH/3C3JJJ !-1A1JJJ*Q2FFF )=[1_(M #

5+A+A(+K+K L L L", N<L N N# # - 46  +sOO((*** 
 
 
=IN+   T#9$:M#N#NOOO
 	DL&11111s    L7P
R#AQ??R)NNNNNTr  r   )rC   rD   rE   r  r
   r   rW   r   r   r   r   r	   rO   r   r   rp  r   r  r  r  r   r   r  r  s   @r<   r   r     s         '+AEBFIM>B"&3> 3>'(3> 3> (#	3>
 "%(<"=>3> $Do)=$>?3> $E$sCx.%**D$EF3> $,DcN#;3>  3> 3> 3> 3> 3> 3>j; ; ;=
 =
 =
~" " " " "& ! ! !x~ ! ! ! !J #'$(%)!%T2 T2$T2 D>T2 TN	T2
 T2 T2 T2 T2 T2 T2 T2 T2r>   r   c            
            e Zd ZdZ	 ddeeeee                  f         de	fdZ
dde	de	f fdZde	fdZ	 	 	 	 dd
ee         dee         dee         dee         fdZ xZS )_PipelineScheduleRuntimea%  
    Provides a simple runtime that requires a 'schedule IR' including specified communication operations.

    Can be instantiated directly by creating _PipelineScheduleRuntime and calling load_csv, or can be
    subclassed and the subclass can be responsible for creating a schedule IR.
    r  ra   r  c                      j         
J d            i  _        |dk    r@|D ];}g  j        |<   ||         D ]&}|J  j        |                             |           '<dS |dk    rH|D ]}t          ||                    j        |<    t	           j         fd j                   _        dS t          d|d          )	z
        Given an in-memory representation for a simple compute-only schedule, lower it to a complex schedule including
        communication actions.  Stores the schedule in self, and must be called before running step_mo()
        NzAstage_index_to_group_rank is required for PipelineScheduleRuntimecompute_commsr  c                     j         |          S rN   )ru  )rV  r:   s    r<   rW  z8_PipelineScheduleRuntime._load_actions.<locals>.<lambda>y  s    (Fq(I r>   )r^  r   format= is not implemented)ru  pipeline_order_with_commsr   r]  rs  r  r   )r:   ra   r  r   rA   s   `    r<   _load_actionsz&_PipelineScheduleRuntime._load_actions\  s2    *66N 766CE&_$$ H H79.t4%dm H HF!---248??GGGGHH H ~%%  7KDM8 8.t44 .<.IIII+. . .D*** &&E&E&E&EFFFr>   r  c                    |dk    r=t                                          |           |                     | j                   dS |dk    r}i }t	          |d          5 }t          j        |          }t          |          D ]\  }}d |D             ||<   |                     ||           ddd           dS # 1 swxY w Y   dS t          d|d	          )
a  Loads a csv in simple format and then lowers it to include comunication actions

        format must be either "compute_only" or "compute_comms".  If compute_only, the lowering passes
        will automatically be run to generate a compute_comms schedule.
        r  r  rS   r  c                 B    g | ]}t                               |          S rG   r  r  s     r<   rg   z6_PipelineScheduleRuntime._load_csv.<locals>.<listcomp>  s&    $F$F$FQW%5%5a%8%8$F$F$Fr>   )r  Nr  r  )	r  r  r  r[   r  r  r  r}   r   )	r:   r  r  ra   r  r  r   r~   r   s	           r<   r  z"_PipelineScheduleRuntime._load_csv  sC    ^##GGh'''t233333&&Gh+++ ;wG,,!*6!2!2 G GID#$F$F#$F$F$FGDMM""76":::	; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; ; &&E&E&E&EFFFs   AB;;B?B?c                     | j         
J d            t          |dd          5 }t          j        |          }| j         D ]"}|                    | j         |                    #	 ddd           dS # 1 swxY w Y   dS )zaDump a CSV representation of the compute + comms schedule into a file with the provided filename.Nz6Must initialize compute_comms schedule before dump_csvr  rS   r  )r  r  r  r  r  r  s        r<   r  z"_PipelineScheduleRuntime._dump_csv  s    
 *66C 766(C,,, 	FZ((F6 F F >t DEEEEF	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	Fs   ?A11A58A5Nr   r   r   r   c                 	   |                      ||||          \  }}d | j        D             }| j        
J d            i }i }g }i t                      dt          ffd}	t          | j        | j                           D ]\  }
}	 |j        }|j        |j        nd}|dk    s|t          t          fv sJ d|d	            |j        }||         }t          |j        t                    }t                              d
|
|           |t"          k    r7|                    t'          |                    |                               n|t*          k    r7|                    t'          |                    |                               n|t.          k    r9||f|vs
J d            t'          |                    |                    |||f<   nZ|t2          k    r9||f|vs
J d            t'          |                    |                    |||f<   n|t          k    r8|r4|vr|vsJ d|d            |j                            d          |<   n|t          k    rA|r=|v sJ d|d            |vsJ d|d            |j                                         n|t:          k    r|r |	|           |j        s=||f|v sJ d|d            |                    ||f                                            |!                    |||         ||                   }| "                    ||||           n|tF          k    r|r |	|           |j$        s=||f|v sJ d|d            |                    ||f                                            | %                    ||          }|&                    ||| j'                   n_|tP          k    rA|r |	|           | j'        rtS          d| j'                  |*                    |           ntS          d|d          # tV          $ rD}t          ,                    d|
|           t[          t]          | j                             |d}~ww xY wt_          |          r5|                                                                  t_          |          5t_                    dk    s
J d            | 0                    | j        |           dS )r  c                     i | ]
}|j         |S rG   r  r   s     r<   ra  z?_PipelineScheduleRuntime._step_microbatches.<locals>.<dictcomp>  r  r>   Nz=Must call _load_actions() before calling _step_microbatches()re  c                     | v r2|                                            | =                     |            | v sJ d|             dS )zQIf an unshard is active for `stage_idx`, wait() it and mark `stage_idx` unshared.z*Attempted to compute on sharded stage_idx=N)r"  r   )re  unshard_opsunsharded_stagess    r<   _assert_unshardedzF_PipelineScheduleRuntime._step_microbatches.<locals>._assert_unsharded  sl    K''I&++---	* $$Y///---->)>> .----r>   r   zaction=z missing mb_indexz8_PipelineScheduleRuntime running time_step %d, action %szARecv twice for {stage_idx=} {mb_index=} without executing forwardzBRecv twice for {stage_idx=} {mb_index=} without executing backwardzUnsharding the same stage_idx=z twiceT)async_opzResharding stage_idx=z without unshardingz before finishing unshardzComputing action=z before receiving inputz Attempted to run compute action=r  zmWe detected a weight update in the pipeline schedule, but                             self.use_full_backward=z is unknown or unsupportedz\_PipelineScheduleRuntime caught exception at step %s when running action %s.  Full Schedule:zUnused unshard operations)1r   ry  r  r   rW   r}   r   rK   rL   r1   r2   rJ   r   submodr   r   r   r3   r   r   r$  r5   r0  r4   r!  r6   r.  unshardreshardr7   is_firstr   r"  r#  r   r8   r   r   r/  rv  r9   r   r  r  r  printr   rX   r   )r:   r   r   r   r   r  bwd_recv_opsfwd_recv_opssend_opsr  r  rA   	comp_typer   re  r   stage_uses_fsdpr   r   r  r  r  s                       @@r<   r   z+_PipelineScheduleRuntime._step_microbatches  s    "//JPVWW?
 ?
26,?
 ?
 ?

 *66J 766 5746  " 1355	? 	? 	? 	? 	? 	? 	? 	? "+4+I$)+T!U!U r	 r	Ivq"3	 .: ++ 
  1}}	6 ) ) ) 1f000) ) ) #.	,Y7",U\:"F"FN   &&OOJu/E/Eh/O/O$P$PQQQQ&((OOJu/E/Eh/O/O$P$PQQQQ&((!  ** * * ,o* * * ;E..x88; ;L)X!677 &((!  ** * * ,p* * * ;E..x88; ;L)X!677 '))& U%-=== ) < < <D)DDD != < <161E1Et1E1T1TI.'))& /%)9999HHHH :99 &[888NNNN 988,,...'))& 5)))444 > G%$  * *  *  * ,Y+X+X+X *  *  * %(()X)>??DDFFF"44 '("3Yx5H F ,,UFJQQQQ(**& 5)))444 = G%$  * *  *  * YXXX	 *  *  * %(()X)>??DDFFF//x@@D,, t4;Q -     &((& 5)))444- (7!37 7   33H====$%K%K%K%KLLL 	 	 	r   ,T-KLLMMM	 (mm 	"LLNN!!! (mm 	" ;1$$$&A$$$ 	DL&11111s   
N"P..
Q<8?Q77Q<r  r   )rC   rD   rE   r  r	   rW   r
   r   rI   rO   r  r  r  r   r  r  s   @r<   r  r  T  s9         %!G !Gc4 1223!G !G !G !G !GFG G# Gs G G G G G G*
F# 
F 
F 
F 
F #'$(%)!%i2 i2$i2 D>i2 TN	i2
 i2 i2 i2 i2 i2 i2 i2 i2r>   r  c                        e Zd ZdZ	 	 d	dee         dedee         dee	e
eef         ee         f                  f fdZd Z xZS )
r!   ai  
    Breadth-First Pipeline Parallelism.
    See https://arxiv.org/abs/2211.05953 for details.
    Simliar to Interleaved 1F1B, Looped BFS supports multiple stages per rank.
    What is different is that when microbatches are ready for multiple local
    stages, Loops BFS will prioritizes the earlier stage, running all available
    microbatches at once.
    Nr   r   r   r   c                     t                                          ||||           i | _        t          | j                  D ]!}|                     |          }|| j        |<   "d S )N)r   r   r   r   )r  r   r[   r   r{  !_calculate_single_rank_operations)r:   r   r   r   r   r   rank_opsr   s          r<   r   zScheduleLoopedBFS.__init__V  s     	)/	 	 	
 	
 	
 CE$,-- 	1 	1D==dCCH(0D%%	1 	1r>   c           	      j   t          | j                  }t          || j        |z  | j                  }g }t          |          D ]}|                    d            |D ]G}t          | j                  D ]0}|                    t          |t          j        |                     1Hd| j        dz
  |z
  z  }|	                    d g|z             t          |          D ]T}t          t          | j                            D ]0}|                    t          |t          j        |                     1U|S )Nr%   r   )rX   ry  r   r{  r   r   rI   r$   r7   r   reversedr8   )	r:   r   n_local_stagesstage_indicesr  r=  rJ   r   post_warmup_opss	            r<   r  z3ScheduleLoopedBFS._calculate_single_rank_operationsm  sd   T\**$$~5t7I
 

 -/t 	" 	"AOOD!!!!( 	 	K!$"677  K)9)A8LL    t1A5<=0111#M22 	 	K$U4+?%@%@AA  K)9)BHMM    r>   )NN)rC   rD   rE   r  r
   r   rW   r   r   r   r	   rO   r   r   r   r  r  r  s   @r<   r!   r!   L  s          '+IM1 1'(1 1 (#	1
 $E$sCx.%**D$EF1 1 1 1 1 1.      r>   r!   c
                    t          t                    }
t          t                    }t          t                    }g }t          |          D ]}|                    d            | |z  d|dz
  |z
  z  z   ||z   z
  }|	r||z
  dz
  }||z   |z   }g }d}t          |          D ]f}||k     rl ||          }|
|         x}dz   |
|<   |                    t	          |t
          j        |                     ||dz
  k    r|                    d g|z             u||cxk    r
||z   k     rn n ||          }|
|         x}dz   |
|<   |                    t	          |t
          j        |                      ||          }||         x}dz   ||<   |                    t	          |t
          j        |                     |                    |           |	r]||z
  |k    rT |||                   }||         x}dz   ||<   |                    t	          |t
          j	        |                     |dz  }|	s|                    d             ||          }||         x}dz   ||<   |                    t	          |t
          j        |                     |                    |           |	r]||z
  |k    rT |||                   }||         x}dz   ||<   |                    t	          |t
          j	        |                     |dz  }h|	r||t          |          k     ri |||                   }||         x}dz   ||<   |                    t	          |t
          j	        |                     |dz  }|	r|t          |          k     i|S )Nr%   r   r   )r   rW   r   r   rI   r$   r7   r   r8   r9   rX   )r  r{  
warmup_opsfwd_bwd_opscooldown_opsr   forward_stage_indexbackward_stage_indexnum_1f1b_microbatchesr   fwd_stage_mb_indexbwd_stage_mb_indexr:  r  r=  r  	total_opsbackward_op_idsweight_op_countr  fwd_stage_indexr   r8  bwd_stage_indexr9  weight_stage_indexweight_mb_indexs                              r<   _get_1f1b_rank_opsr    s    *5S)9)9)4S)9)9,7,<,< )+H4[[   	&ma.?$.F)GG	d	O  3'$.2[(<7IOOI I% I%
??11"55O /??3/ OO)9)A8LL   Z!^## 899928888
[ 88888811"55O 2? CC3/ OO)9)A<PP   32266O 2? CC3/ OO)9)BLQQ   ""2&&&! %b:o9N&N&N%9%9#O4& &" (==O'PPO=%&89 *,<,C_   
  1$
 & &%%%22266O 2? CC3/ OO)9)BLQQ   ""2&&&! %b:o9N&N&N%9%9#O4& &" (==O'PPO=%&89 *,<,C_   
  1$
 33G3G!G!G11//2RSS45GHHO501 	&(8(?QQ	
 	
 	
 	1  33G3G!G!G Or>   c                        e Zd ZdZ	 	 	 	 ddee         dedee         dee	e
df                  deeee
f                  d	eeeeef         e	e         f                  f fd
Zdeee                  fdZ xZS )r    ak  
    The Interleaved 1F1B schedule.
    See https://arxiv.org/pdf/2104.04473 for details.
    Will perform one forward and one backward on the microbatches in steady
    state and supports multiple stages per rank. When microbatches are ready for
    multiple local stages, Interleaved 1F1B prioritizes the earlier microbatch
    (also called "depth first").
    Nr   r   r   r   .r   r   c                    |d         j         | _        || j        z  dk    rt          d| d| j         d          t                                          ||||||           t          |          | _        |d         j        | _        |d         j	        | _	        i | _
        t          | j                  D ]!}|                     |          }|| j
        |<   "d S )Nr   z?Interleaved 1F1B schedule requires the number of microbatches (zD)                 to be a multiple of the number of pipeline ranks (z).)r   r   r   r   r   r   )rz  r{  r   r  r   rX   r  r|  r   groupr[   r   r  )
r:   r   r   r   r   r   r   r   r  r   s
            r<   r   z ScheduleInterleaved1F1B.__init__  s    $AY1D..!33ZR` Z ZCGCUZ Z Z  
 	)+// 	 	
 	
 	
 "&kk1I(	AY_

 CE$,-- 	1 	1D==dCCH(0D%%	1 	1r>   r\   c           
         	  fd} |          	 j          j        z  }|	z
  }||z
  }	|z   |z   }t                              d	|||            fd} 	fd}t	           j          j        	||||          S )Nc                     j         dz
  j        z  }|dj        dz
  | z
  z  z   }t          |j        j         z            S Nr   r%   )r  r{  r6  r   )r   warmups_ops_last_stager  r:   s      r<   get_rank_warmup_opszVScheduleInterleaved1F1B._calculate_single_rank_operations.<locals>.get_rank_warmup_ops?  sQ    &*&9A&=AS%S"/!8JQ8NRV7V2WWJz4#7$:M#MNNNr>   =rank %s, warmup_ops %s, 1f1b %s, cooldown_ops %s total_ops %sc                 B    | j         z  j        z  }|j         z  z   S rN   )r{  r  r   local_indexr   r:   s     r<   r  zVScheduleInterleaved1F1B._calculate_single_rank_operations.<locals>.forward_stage_index[  s+    4#559LLK$"44<<r>   c                 ^    j         dz
  | z
  j        z  j         z  z
  }|j        z  z   S Nr   )r  r{  r   r  r   r:   r  s     r<   r  zWScheduleInterleaved1F1B._calculate_single_rank_operations.<locals>.backward_stage_index`  sI    #:%$*<<@SST 
  $"44<<r>   )r  r   r   r   r  r{  )
r:   r   r  microbatch_opsr  r  r  r  r  r  s
   ``       @r<   r  z9ScheduleInterleaved1F1B._calculate_single_rank_operations>  s   	O 	O 	O 	O 	O )(..
,t/CC$z1%3,|;	 	K	
 	
 	
	= 	= 	= 	= 	= 	=
	= 	= 	= 	= 	= 	= 	= " 	
 	
 		
r>   r   )rC   rD   rE   r  r
   r   rW   r   r   r   r   r	   rO   r   r   r   rI   r  r  r  s   @r<   r    r      s          '+AEBFIM%1 %1'(%1 %1 (#	%1
 "%(<"=>%1 $Do)=$>?%1 $E$sCx.%**D$EF%1 %1 %1 %1 %1 %1N3
hw>O9P 3
 3
 3
 3
 3
 3
 3
 3
r>   r    c                        e Zd ZdZ	 	 	 	 	 ddee         dedee         dee	e
df                  d	eeee
f                  d
eeeeef         e	e         f                  def fdZdeee                  fdZd Z xZS )r   a  
    The Flexible Interleaved 1F1B schedule.

    This schedule is mostly similar to the interleaved 1F1B schedule.
    It differs by being relaxing the requirement of num_microbatch % pp_size == 0.
    Using the flex_pp schedule, we will have num_rounds = max(1, n_microbatches // pp_group_size) and
    it works as long as n_microbatches % num_rounds is 0. As a few examples, support

    1. pp_group_size = 4, n_microbatches = 10. We will have num_rounds = 2 and n_microbatches % 2 is 0.
    2. pp_group_size = 4, n_microbatches = 3. We will have num_rounds = 1 and n_microbatches % 1 is 0.

    When enable_zero_bubble is True, we will use the ZB1P schedule in https://openreview.net/pdf?id=tuzTN0eIO5
    NFr   r   r   r   .r   r   r   c           	      J   |d         j         | _        t                                          |||||||            t	          |          | _        |d         j        | _        t          d|| j        z            | _	        || j	        z  | _
        || _        || j	        z  dk    rt          d| j	         d| d          i | _        t          | j                  D ]!}|                     |          }	|	| j        |<   "|                     | j        | j        z            | _        d S )Nr   )r   r   r   r   r   r   rv  r   zhFlexible Interleaved 1F1B requires the number of microbatches to be a multiple of the number of rounds (z), but got .)rz  r{  r  r   rX   r  r|  r   rq   number_of_roundsmicrobatches_per_roundr   r   r[   r   r  _add_bubbles_to_actions)r:   r   r   r   r   r   r   r   r   r  r   s             r<   r   z(ScheduleFlexibleInterleaved1F1B.__init__  s`    $AY1)+//"44 	 	
 	
 	
 "&kk1I(	 #A~9K'K L L&48M&M#"4D11Q66-595J- -)- - -   CE$,-- 	1 	1D==dCCH(0D%%
 #::$"44
 
r>   r\   c                 Z   
  fd} |          
 j          j        z  }|
z
  }||z
  }
|z   |z   }t                              d
|||            fd} 
fd} j        r%}	t           j          j        
|||||	d
  
        S t           j          j        
||||          S )Nc                     j         dz
  j        z  }j        rdnd}||j        dz
  | z
  z  z   }t	          |j        j         z            S r  )r  r   r   r{  r6  r   )r   r  multiply_factorr  r:   s       r<   r  z^ScheduleFlexibleInterleaved1F1B._calculate_single_rank_operations.<locals>.get_rank_warmup_ops  so     #a'+&," $(#:AaaO//#a'4/3 J
 z4#7$:M#MNNNr>   r  c                 B    | j         z  j        z  }|j        z  z   S rN   )r   r  r{  r  s     r<   r  z^ScheduleFlexibleInterleaved1F1B._calculate_single_rank_operations.<locals>.forward_stage_index  s+    4#>>$BUUK$"44<<r>   c                 ^    j         dz
  | z
  j        z  j         z  z
  }|j        z  z   S r  )r  r   r{  r  s     r<   r  z_ScheduleFlexibleInterleaved1F1B._calculate_single_rank_operations.<locals>.backward_stage_index  sK    #:%$*EE%&&   $"44<<r>   T)r   )r  r   r   r   r   r  r{  )r:   r   r  r  r  r  r  r  r  r  r  s   ``        @r<   r  zAScheduleFlexibleInterleaved1F1B._calculate_single_rank_operations  sQ   	O 	O 	O 	O 	O )(..
,t/CC$z1%3,|;	K	
 	
 	
	= 	= 	= 	= 	= 	=
	= 	= 	= 	= 	= 	= 	= " 	$(!%#"#$%#'    " 	
 	
 		
r>   c                 t   | j         }| j        s|S d }t                      }i }i }i }d}t          | j                  D ]}	g ||	<   d||	<   d||	<   	 d}
t                      }t          | j                  D ]}	||	         }|t          ||	                   k    r%d}
||	         |         ||	         |         }|J |\  }}} ||||||          sR||	                             ||	         |                    ||                    |||f           ||	xx         dz  cc<   ||	                             d            ||	xx         dz  cc<   ||	xx         dz  cc<   ||	                             d            |                    |           |
rnE|dk    rt          
                    d||           |S )Nc                     |t           j        k    r| dk    r| dz
  ||f|vrdS n4|t           j        k    r$| |dz
  k    r| t           j        |f|vS | dz   ||f|vS dS )Nr   r   TF)r$   r7   r8   )r   r  
microbatchnum_stages_globalseen_opss        r<   need_bubblezLScheduleFlexibleInterleaved1F1B._add_bubbles_to_actions.<locals>.need_bubble  s    %---A::519b*"=X"M"M4'000-111!#3#;ZHPXXX	2z2(BB5r>   r   TFr   z?Non zero bubbles added: total_bubbles_added=%s bubbles_added=%s)r[   r   r   r   r{  rX   r   r   updater   warning)r:   r
  ra   r  r  resultnext_pointerbubbles_addedtotal_bubbles_addedr   should_stoptemp_seen_ops	timestamptemp_actionrJ   r  r	  s                    r<   r  z7ScheduleFlexibleInterleaved1F1B._add_bubbles_to_actions  sV   %& 	N	 	 	 <?5557')(*$,-- 	$ 	$DF4L!"L"#M$ 	KDGEEMd011 . .(.	GDM 2 222#4=+7")$-	":K&2222=/KZ&;#R5F  	1 t++GDM),DEEE%1)--{B
.KLLL$T***a/****t++D111%d+++q0++++ &&&!+&&&4L''----OOM*** A 	D ""NNQ#  
 r>   )NNNNF)rC   rD   rE   r  r
   r   rW   r   r   r   r   r	   rO   r   r   rp  r   rI   r  r  r  r  s   @r<   r   r   t  s'        $ '+AEBFIM#(,
 ,
'(,
 ,
 (#	,

 "%(<"=>,
 $Do)=$>?,
 $E$sCx.%**D$EF,
 !,
 ,
 ,
 ,
 ,
 ,
\J
hw>O9P J
 J
 J
 J
XB B B B B B Br>   r   c                        e Zd ZdZ	 	 	 	 ddee         dedee         dee	e
df                  deeee
f                  d	eeeeef         e	e         f                  f fd
Z xZS )r"   a/  
    The Interleaved Zero Bubble schedule.
    See https://arxiv.org/pdf/2401.10241 for details.
    Will perform one forward and one backward on inputs for the microbatches in steady
    state and supports multiple stages per rank. Uses the backward for weights to fill in
    the pipeline bubble.
    Nr   r   r   r   .r   r   c           	      X    t                                          ||||||d           d S )NT)r   r   r   r   r   r   r   )r  r   )r:   r   r   r   r   r   r   r   s          r<   r   z&ScheduleInterleavedZeroBubble.__init__K  sE     	)+//# 	 	
 	
 	
 	
 	
r>   r   )rC   rD   rE   r  r
   r   rW   r   r   r   r   r	   rO   r   r   r   r  r  s   @r<   r"   r"   B  s          '+AEBFIM
 
'(
 
 (#	

 "%(<"=>
 $Do)=$>?
 $E$sCx.%**D$EF
 
 
 
 
 
 
 
 
 
r>   r"   schedule_namec           	          t           t          t          t          t          t
          t          t          d}| |vrt          d|            ||          S )z
    Maps a schedule name to its corresponding class object.

    Args:
        schedule_name (str): The name of the schedule.
    )1F1BInterleaved1F1BGPipeFlexibleInterleaved1F1B	LoopedBFSInterleavedZeroBubbler   r   zUnknown schedule name: )	r   r    r   r   r!   r"   r   r   r   )r  schedule_maps     r<   r   r   _  sW     2#B&!>"8!6	 	L L((B=BBCCC&&r>   )FrN   )r&   )r   F)Qr  r   loggingreabcr   r   collectionsr   enumr   typingr   r   r	   r
   r   r   r   r   r   r   r   torch.distributeddistributedr   .torch.distributed._composable.fsdp.fully_shardr   r   torch.profilerr   r	  r   r   r   r   r   r   __all__	getLoggerrC   r   r$   r7   r8   r9   r1   r2   r3   r4   r5   r6   r.   r/   r0   compilerT   rI   rW   rO   r   rp  r   r   P2POpr   r  r   r  r   r   r]  rs  r   r  r!   r  r    r   r"   r   rG   r>   r<   <module>r0     s   


      				 # # # # # # # # # # # # # #                                           T T T T T T T T * * * * * * T T T T T T T T T T % % % % % %  '&&&&&&
 
 
 
	8	$	$/E /E /E /E /Et /E /E /Ed 
"$		 

"

"		 		 		 		  
 
K 

 
 
 
 
j 
 
 
D$4T(7:K5L0L+M $RU $ $ $ $V  %	MX MXd8G#4556MXMX MX 	MX MX MX MX`n
 n
 n
 n
 n
 n
 n
 n
b1 1TZ( 1 1 1 1 1 6: $*%-c]	#ty.   6C C C C C. C C CL. . . . .1 . . .bM M M M M* M M M`E1 E1 E1 E1 E1) E1 E1 E1T C C(7+,CC 
']C C C CLJ#tG},-JSE3J'J J 
#tG}
	J J J JZE2 E2 E2 E2 E2- E2 E2 E2P
u2 u2 u2 u2 u24 u2 u2 u2p= = = = =- = = =R ~ ~ ~ ~Bd
 d
 d
 d
 d
3 d
 d
 d
NK K K K K&; K K K\
 
 
 
 
$C 
 
 
:'c ' ' ' ' ' 'r>   