o
    j9:j U                     @   s  d dl Z d dlZd dlmZ d dlmZ d dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZ g d	Ze eZ	 d
ZG dd dZG dd deZeedejZd ZG dd dZG dd dZdede de!e fddZ"dej#dede deej# fddZ$dd Z%		d)de&ed f d!e'e(ef dB d"e d#e&ed f dB d$e'e(ef dB de&e!e& e!e' f fd%d&Z)d"e!e fd'd(Z*dS )*    N)Sequence)Any)DTensor)	local_mapmap_aggregate)	BlockMask)tree_flattentree_maptree_unflatten)TensorChunkSpecsplit_args_kwargs_into_chunksmerge_chunksFc                   @   s   e Zd ZdZdd ZdS )_CustomReducera$  
    Custom reducer class that can be used to specify a custom operation that
    reduces losses of multiple microbatches into one value.

    Example:
    >>> # xdoctest: +SKIP
    >>> sum_reducer = _CustomReducer(
    >>>     torch.tensor(0.0),
    >>>     lambda a, b: a + b
    >>> )
    c                 C   s   || _ || _d S N)
init_value	reduce_fn)selfr   r    r   n/home/nk/hobo-godmode/plappi-mvp/.venv/lib/python3.10/site-packages/torch/distributed/pipelining/microbatch.py__init__-   s   
z_CustomReducer.__init__N)__name__
__module____qualname____doc__r   r   r   r   r   r       s    r   c                   @      e Zd ZdS )_LossReducerNr   r   r   r   r   r   r   r   2       r   g        c                   @   sf   e Zd ZU dZdd Zeed< dd Zdd Ze	d	e
ed
f fddZe	d	eeef fddZdS )r   z2
    Class used to specify chunking of inputs
    c                 C   s
   || _ d S r   	split_dim)r   r    r   r   r   r   B   s   
zTensorChunkSpec.__init__r    c                 C   s    | j j d| j j d| j dS )N.())	__class__r   r   r    r   r   r   r   __repr__G   s   zTensorChunkSpec.__repr__c                 C   s   d| j  dS )NzTensorChunkSpec(r#   r   r%   r   r   r   __str__L   s   zTensorChunkSpec.__str__
chunk_dims.c                 C      t | dd }|S )a  
        A helper for creating a tuple of `TensorChunkSpec` from a tuple of chunk
        dimensions (int's).
        Example:
            >>> # xdoctest: +SKIP
            >>> # There are three positional arguments to the model, and
            >>> # we are chunking them along dimension 0, 0 and 1, respectively
            >>> args_chunk_spec = TensorChunkSpec.from_tuple((0, 0, 1))
        c                 S      t | S r   r   dimr   r   r   <lambda>^       z,TensorChunkSpec.from_tuple.<locals>.<lambda>r   )r(   args_chunk_specr   r   r   
from_tupleO   s
   zTensorChunkSpec.from_tuplec                 C   r)   )a\  
        A helper for creating a dictionary of `TensorChunkSpec` from a
        dictionary of chunk dimensions (int's).
        Example:
            >>> # xdoctest: +SKIP
            >>> # Chunk dimension 0 for the "id" argument, 1 for the "mask" argument
            >>> kwargs_chunk_spec = TensorChunkSpec.from_dict({"id": 0, "mask": 1})
        c                 S   r*   r   r+   r,   r   r   r   r.   p   r/   z+TensorChunkSpec.from_dict.<locals>.<lambda>r   )r(   kwargs_chunk_specr   r   r   	from_dictb   s
   zTensorChunkSpec.from_dictN)r   r   r   r   r   int__annotations__r&   r'   staticmethodtupler1   dictstrr3   r   r   r   r   r   =   s   
 

r   c                   @   r   )
_ReplicateNr   r   r   r   r   r:   v   r   r:   
block_mask
num_chunksreturnc                    s   j ddkr g| S  j d|kstdd}t j ||}t j||} jdur8t j||ndg| } jdurJt j||ndg| }g }d}t|D ],}	 fdd}
|	t
j||	 ||	 ||	 ||	  j|
| jd |||	 d7 }qW|S )a	  Given a block mask, split the block mask along the batch dimension (dim0).

    Args:
        block_mask: Block mask to split
        num_chunks: Number of chunks to split the block mask into

    Returns:
        chunk_block_masks: List of chunked block masks
    r      z;Block mask has fewer batch size than the number of chunks. Nc                    s    fdd}|S )Nc                    s    t | } | | |||S r   )torch	full_likemask_mod)bhq_idxkv_idxb_offset)r;   idxr   r   batch_offset_mask_mod   s   zI_split_block_mask.<locals>.create_mask_mod.<locals>.batch_offset_mask_modr   )rG   rH   r;   )rG   r   create_mask_mod   s   z*_split_block_mask.<locals>.create_mask_mod)kv_num_blocks
kv_indicesfull_kv_num_blocksfull_kv_indices
BLOCK_SIZErA   seq_lengths)rK   sizeAssertionErrorr?   tensor_splitrL   rM   rN   rangeappendr   from_kv_blocksrO   rP   )r;   r<   	batch_dimkv_num_blocks_chunkskv_indices_chunksfull_kv_num_blocks_chunksfull_kv_indices_chunkschunk_block_masksbatch_offset	chunk_idxrJ   r   rI   r   _split_block_maskz   sH   


r_   tensorspecc                    s  |  j kstd|  j dt| t}|r3| j}t fdd|f  |fd}|| }nt|  j}| j	rJ| j
rJ|D ]}|  qCtsN|S dtjdtjdttjd	f ffd
d}|r| j}t|}	t||f|	 |f|f|	  d}
t|
| g|R  S t|| g|R  S )zGiven a tensor, and a chunking spec, split the tensor.
    Args:

        tensor: Tensor to split
        spec: Chunking spec
        num_chunks: Number of chunks to split the tensor into

    Returns:
        chunk_tensors: List of chunked tensors
    zTensor size z is smaller than num_chunksc                    s   t |  jS r   )r?   rS   r    )tr<   ra   r   r   r.          z_split_tensor.<locals>.<lambda>out_placementsin_placementsorigchunksr=   .c                    sv   g }d}|D ]0}t | }|| j }td g|j }t||| j< |||< || || j7 }qt|S )Nr   )r?   
zeros_likerQ   r    slicendimrU   r7   )rh   ri   expandedrG   chunknew_valupperslices)ra   r   r   _expand_chunks   s   

z%_split_tensor.<locals>._expand_chunks)rQ   r    rR   
isinstancer   
placementsr   r?   rS   requires_gradis_leafretain_grad_debug_mask_minibatchesTensorr7   lenlist)r`   ra   r<   _is_dtensorrt   split_fnchunk_tensorsrn   rr   n	expand_fnr   rc   r   _split_tensor   sH   


r   c                    sP  | sdd t |D S t| t|ks%tdt|   dt|  |du r-tdt| dd d	\} t|d
d d	\}}g }t||ddD ]o\}}|tu sWt|tr]|	| qJt|t
jr{t|tsqtdt| |	||j qJt|trt|tstdt| |jdkstd|jddkr|	| qJ|	|jd qJtd| d| dtg ||R  }	dd t |	D }
t||ddD ]K\}}g }|tu st|tr|g|	 }n$t|t
jrt|||	}nt|trt||	}ntd| d| dt|
|ddD ]
\}}|	| qqӇ fdd|
D S )aW  
    Given a dictionary of args, and a dictionary of chunking specs, shard the
    args according to the chunking specs.

    Args:
        args_dict: Dictionary of args
        args_chunk_spec: Dictionary of chunking specs
        num_chunks: Number of chunks to shard the args into

    Returns:
        args_split: List of sharded args
    c                 S   s   g | ]}i qS r   r   .0_r   r   r   
<listcomp>  rd   z'_shard_dict_of_args.<locals>.<listcomp>zargs_dict.keys() = z args_chunk_spec.keys() = Nz.args_chunk_spec should have been set by callerc                 S   
   t | tS r   rs   r   xr   r   r   r.   %     
 z%_shard_dict_of_args.<locals>.<lambda>rv   c                 S   r   r   r   r   r   r   r   r.   (  r   TstrictzExpected TensorChunkSpec, got r   z#BlockMask only supports split_dim=0r>   zUnsupported chunk spec: z and value: z combination.c                 S   s   g | ]}g qS r   r   r   r   r   r   r   F  rd   c                    s   g | ]}t | qS r   )r   )r   _flat_split_result	tree_specr   r   r   Y  s    )rT   rz   rR   r{   keysr	   zipr:   rs   rU   r?   ry   r   typerQ   r    r   rK   
ValueErrorminr   r_   )	args_dictr0   r<   valueschunk_specsr   split_sizesvra   result_num_chunksflat_split_resultsv_splitsr   _v_splitr   r   r   _shard_dict_of_args  sp   







r   args.kwargsri   r0   r2   c           
      C   s   |du ri }dd }|du rt || dd d}|du r$t ||dd d}ttt| tt||}t|}t|||}t||k rTt|}ttt| tt||}t|t|krjtdt| d	t| d
d |D }	|	|fS )a  
    Given a sequence of args and kwargs, split them into a number of chunks
    according to  their respective chunking specs.

    Args:
        args: Tuple of args
        kwargs: Dict of kwargs
        chunks: Number of chunks to split the args and kwargs into
        args_chunk_spec: chunking specs for args, in same shape as args
        kwargs_chunk_spec: chunking specs for kwargs, in same shape as kwargs

    Returns:
        args_split: List of sharded args
        kwargs_split: List of sharded kwargs
    Nc                 S   s   t | tjtB rttS t S r   )rs   r?   ry   r   r   DEFAULT_CHUNK_DIMr:   r   r   r   r   default_spec  s   z3split_args_kwargs_into_chunks.<locals>.default_specc                 S   r   r   r   r   r   r   r   r.     r   z/split_args_kwargs_into_chunks.<locals>.<lambda>r   c                 S   r   r   r   r   r   r   r   r.     r   z;args and kwargs are split into different number of chunks: z, c                    s*   g | ] t  fd dtt D qS )c                 3   s    | ]} | V  qd S r   r   )r   i
chunk_argsr   r   	<genexpr>  s    z;split_args_kwargs_into_chunks.<locals>.<listcomp>.<genexpr>)r7   rT   rz   )r   r   r   r   r     s    z1split_args_kwargs_into_chunks.<locals>.<listcomp>)r
   r   r8   	enumeraterz   RuntimeError)
r   r   ri   r0   r2   r   args_split_dictreal_num_chunkskwargs_split
args_splitr   r   r   r   _  sR   8





r   c              	      s:  |durt |\}}nt | d \}}ttgt| }g | D ]}t |\}}t|t|kr:td| d| | q g }t|D ]P\ t trGfddttD }	t	r|	d j
}
|	dd D ]}|j
|
ks~td|
 d	|j
 qltjtj|
d
dit|	 jd}g }d}t|	t|kstdt|	 dt| t|	|ddD ])\}}|| j }tdddg|j }t||| j< || }|| |}qn|	}dd |D }t|r;t|std|d jt|dd dD ]\}}|jkrtd| d d	|j q t fddftfddtt|D d}|||  qF|tj| jd qFt trj j}ttD ]} ||  }qV|| qFd  }tdtD ]}|  |kstd| d	|   qw|| qFt||S )z
    Given a list of chunks, merge them into a single value according to
    the chunk spec.

    Args:
        chunks: list of chunks
        chunk_spec: Chunking spec for the chunks

    Returns:
        value: Merged value
    Nr   zChunk z did not match chunk spec c                    s   g | ]}|   qS r   r   )r   r^   )arg_idxchunks_flattenedr   r   r     s    
z merge_chunks.<locals>.<listcomp>r>   zExpected shape z, got devicemeta)sectionsr-   z6Expected len(partial_values) == len(meta_chunks), got z != Tr   c                 S   s   g | ]}t |tqS r   )rs   r   )r   r   r   r   r   r   C  s    zRmerge_chunks: expected all values to be DTensors or none to be DTensors, got a mixz*merge_chunks: placement mismatch at chunk z: expected c                     s   t j|  jdS )Nr,   )r?   catr    )ri   )argr   r   r.   S  rd   zmerge_chunks.<locals>.<lambda>c                 3   s    | ]} V  qd S r   r   r   )rt   r   r   r   U  s    zmerge_chunks.<locals>.<genexpr>re   r,   z	Expected )r	   r   r   rz   r   rU   r   rs   rT   rx   shaperR   r?   rS   emptyr    r   rQ   rk   rl   anyallrt   r   r7   r   r   r   r   r   )ri   
chunk_specspec_flattenedflatten_specchunk0_flatrn   chunk_flattenedr   args_flattenedpartial_valuesoverall_shapevalmeta_chunksvalues_to_catchunk_start_idxpartial_value
meta_chunkchunk_end_idxslice_indicessliceddtensor_flagsr   r   cat_fnreduced_valr^   valuer   )r   r   r   rt   r   r     s   -







r   )NN)+loggingoperatorcollections.abcr   typingr   r?   torch.distributed.tensorr   %torch.distributed.tensor.experimentalr   torch.fx.noder   !torch.nn.attention.flex_attentionr   torch.utils._pytreer	   r
   r   __all__	getLoggerr   loggerrx   r   r   r`   addsum_reducerr   r   r:   r4   r{   r_   ry   r   r   r7   r8   r9   r   r   r   r   r   r   <module>   sn   
9
A
L\

s