o
    j9:jG                     @   s  d dl Z d dlZd dlmZ d dlmZ d dlmZmZ ddddZ	dd Z
ejfd	d
Zd ejfddZd ejfddZejejfddZejejfddZejfddZejfddZejfddZddejfddZejejfddZG dd deZG dd  d eZG d!d" d"eZG d#d$ d$eZG d%d& d&eZG d'd( d(eZG d)d* d*eZG d+d, d,eZG d-d. d.eZG d/d0 d0eZ dS )1    N)Function)groupReduceOp
suggestionc                C   s(   d|  d}|r|d| d7 }t |)N torch.distributed.nn.functional.z& is not supported under torch.compile.z Use 	 instead.)RuntimeError)namer   msg r   f/home/nk/hobo-godmode/plappi-mvp/.venv/lib/python3.10/site-packages/torch/distributed/nn/functional.py_not_supported_under_compile   s
   
r   c                 C   s"   t jd|  d| dtdd d S )Nr   z is deprecated, use r      )category
stacklevel)warningswarnFutureWarning)r
   r   r   r   r   _deprecated   s   
r   c                 C   .   t j rtddd tdd t||| S )a  
    Broadcasts the tensor to the whole group.

    ``tensor`` must have the same number of elements in all processes
    participating in the collective.

    Arguments:
        tensor (Tensor): Data to be sent if ``src`` is the rank of current
            process.
        src (int): Source rank.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Received tensor from the broadcast op.

    	broadcastz3torch.distributed._functional_collectives.broadcastr   )torchcompileris_compilingr   r   
_Broadcastapply)tensorsrcr   r   r   r   r       s   

r   c                 C   s    t j r	td t||| S )aT  
    Gathers a list of tensors in a single process.

    Arguments:
        tensor (Tensor): Input tensor.
        dst (int, optional): Destination rank (default is 0).
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        tuple[Tensor]: List of appropriately-sized tensors with the gathered data.
    gather)r   r   r   r   _Gatherr   )r   dstr   r   r   r   r   :   s   
r   c                 C   s&   t j r	td tj||g| R  S )a  
    Scatters a list of tensors to all processes in a group.

    Each process will receive exactly one tensor and store its data in the
    ``tensor`` argument.

    Arguments:
        tensors (list[Tensor]): List of tensors to scatter on the source rank.
            Receivers must pass ``None`.
        src (int, optional): Source rank (default is 0).
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Output tensor from the scatter operation.

    scatter)r   r   r   r   _Scatterr   )tensorsr   r   r   r   r   r"   K   s   
r"   c                 C   s"   t j r	td t|||| S )a  
    Reduces the tensor data across all machines.

    Only the process with rank ``dst`` is going to receive the final result.

    Arguments:
        tensor (Tensor): Input of the collective.
        dst (int): Destination rank.
        op (optional): One of the values from
            ``torch.distributed.ReduceOp``
            enum.  Specifies an operation used for element-wise reductions.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Output of the collective.

    reduce)r   r   r   r   _Reducer   )r   r!   opr   r   r   r   r%   a   s   
r%   c                 C   s6   t j rtddd tdd tj||| g|R  S )a  
    Reduces, then scatters a list of tensors to all processes in a group.

    Arguments:
        output (Tensor): Output tensor.
        input_list (list[Tensor]): List of tensors to reduce and scatter.
        op (optional): One of the values from
            ``torch.distributed.ReduceOp``
            enum.  Specifies an operation used for element-wise reductions.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Output of the collective.

    reduce_scatterz?torch.distributed._functional_collectives.reduce_scatter_tensorr   )r   r   r   r   r   _Reduce_Scatterr   )output
input_listr'   r   r   r   r   r(   x   s   
r(   c                 C   s,   t j rtddd tdd t|| S )a  
    Gathers tensors from the whole group in a list.

    Arguments:
        tensor (Tensor): Tensor to be broadcast from current process.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        tuple([Tensor]): Output of the collective.

    
all_gatherz;torch.distributed._functional_collectives.all_gather_tensorr   )r   r   r   r   r   
_AllGatherr   )r   r   r   r   r   r,      s   
r,   c                 C   s    t j r	td t| ||S )a  
    Single tensor all gather. Gathers a single tensor from all ranks, and puts them in a single output tensor.

    Args:
        output_tensor (Tensor): Output tensor. It should contain
            correctly-sized tensors to be used for output of the collective.
        input_tensor (Tensor): Tensor to be broadcast from current process.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used.

    Examples:
        >>> # All tensors below are of torch.int64 dtype.
        >>> # We have 2 process groups, 2 ranks.
        >>> # xdoctest: +SKIP("incorrect want text")
        >>> output_tensor = torch.zeros(2, dtype=torch.int64)
        >>> output_tensor
        [tensor([0, 0])] # Rank 0 and 1
        >>> tensor = torch.arange(1, dtype=torch.int64) + 1 + rank
        >>> tensor
        tensor([1]) # Rank 0
        tensor([2]) # Rank 1
        >>> dist.all_gather_base(output_tensor, tensor)
        >>> output_tensor
        tensor([1,2]) # Rank 0
        tensor([1,2]) # Rank 1

    .. warning::
        `_all_gather_base` is experimental and subject to change.
        It is the caller's responsibility to ensure the output_tensor
        is correctly sized.

    _all_gather_base)r   r   r   r   _AllGatherBaser   )output_tensorinput_tensorr   r   r   r   r.      s   
!r.   c                 C   s&   t j r	td tj|| g|R  S )a  
    Each process scatters list of input tensors to all processes in a group and return gathered list of tensors in output list.

    Arguments:
        output_tensor_list (list[Tensor]): list of tensors to gather one per rank.
        input_tensor_list (list[Tensor]): List of tensors to scatter one per rank.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        tuple([Tensor]): Output of the collective.

    
all_to_all)r   r   r   r   	_AlltoAllr   )output_tensor_listinput_tensor_listr   r   r   r   r2      s   
r2   c                 C   s2   t j rtddd tdd t|| |||S )a  
    Each process splits input tensor and then scatters the split list to all processes in a group.

    Then concatenate the received tensors from all the processes in the group and return single output tensor.

    Arguments:
        output (Tensor): Gathered concatenated output tensor.
        input (Tensor): Input tensor to scatter.
        output_split_sizes: (list[Int], optional): Output split sizes for dim 0
            if specified None or empty, dim 0 of ``output`` tensor must divide
            equally by ``world_size``.
        input_split_sizes: (list[Int], optional): Input split sizes for dim 0
            if specified None or empty, dim 0 of ``input`` tensor must divide
            equally by ``world_size``.

    Returns:
        Tensor: Output of the collective.

    all_to_all_singlez;torch.distributed._functional_collectives.all_to_all_singler   )r   r   r   r   r   _AlltoAllSingler   )r*   inputoutput_split_sizesinput_split_sizesr   r   r   r   r6      s   

r6   c                 C   r   )a&  
    Reduces the tensor data across all machines in such a way that all get the final result.

    After the call the returned tensor is going to be bitwise
    identical in all processes.

    Arguments:
        tensor (Tensor): Input of the collective.
        op (optional): One of the values from
            ``torch.distributed.ReduceOp``
            enum.  Specifies an operation used for element-wise reductions.
        group (ProcessGroup, optional): The process group to work on.

    Returns:
        Tensor: Output of the collective

    
all_reducez4torch.distributed._functional_collectives.all_reducer   )r   r   r   r   r   
_AllReducer   )r   r'   r   r   r   r   r;     s   

r;   c                   @   $   e Zd Zedd Zedd ZdS )r   c                 C   s6   || _ || _tj|d| _| }tj|||d |S Nr   )r   r   distget_rankrankcloner   )ctxr   r   r   r   r   r   forward'  s   z_Broadcast.forwardc                 C   s4   t | jtj| j|}| j| jkr|  d d |fS N)r&   r   r   r   SUMr   rB   zero_)rD   grad_outputgxr   r   r   backward3  s   
z_Broadcast.backwardN__name__
__module____qualname__staticmethodrE   rK   r   r   r   r   r   &  s
    

r   c                   @   r=   )r    c                    sv   || _ || _ fddttj|dD }   tj|d|kr.tj |||d t|S tj d ||d t|S )Nc                       g | ]}t  qS r   )r   
zeros_like.0ir   r   r   
<listcomp>F      
z#_Gather.forward.<locals>.<listcomp>r?   )	r!   r   ranger@   get_world_size
contiguousrA   r   tuple)rD   r!   r   r   tensor_listr   rV   r   rE   =  s   
z_Gather.forwardc                 G   s   dt j| j| jg|R  f S NNN)r#   r   r!   r   )rD   grad_outputsr   r   r   rK   Q  s   z_Gather.backwardNrL   r   r   r   r   r    <  s
    
r    c                   @   r=   )r#   c                    st   || _ || _t fdd D stt d }tj|d|kr/tj|t	 ||d |S tj|d ||d |S )Nc                 3   s$    | ]}|   d    kV  qdS )r   NsizerT   tr$   r   r   	<genexpr>\  s   " z#_Scatter.forward.<locals>.<genexpr>r   r?   )
r   r   allAssertionErrorr   rR   r@   rA   r"   list)rD   r   r   r$   r*   r   re   r   rE   W  s   z_Scatter.forwardc                 C   s   dt | j| j| S r^   )r    r   r   r   rD   rI   r   r   r   rK   e  s   z_Scatter.backwardNrL   r   r   r   r   r#   V  s
    
r#   c                   @   r=   )r&   c                 C   s*   || _ || _| }tj||||d |S )Nr'   r   )r   r   rC   r@   r%   )rD   r   r'   r   r   r   r   r   rE   l  s
   z_Reduce.forwardc                 C      dt | j| j|f S N)NNN)r   r   r   r   rj   r   r   r   rK   u     z_Reduce.backwardNrL   r   r   r   r   r&   k  
    
r&   c                   @   r=   )r)   c                 G   s:   || _ | }tdd |D }tj|t|||d |S )Nc                 s       | ]}|  V  qd S rF   r[   rc   r   r   r   rf         z*_Reduce_Scatter.forward.<locals>.<genexpr>rk   )r   r[   r\   r@   r(   ri   )rD   r'   r   r   r5   r   r   r   rE   |  s
   z_Reduce_Scatter.forwardc                 C   s   dt | j| S rm   )r-   r   r   rj   r   r   r   rK     s   z_Reduce_Scatter.backwardNrL   r   r   r   r   r)   {  s
    
r)   c                   @   r=   )r-   c                    sD       || _ fddttj|dD }tj| |d t|S )Nc                    rQ   r   r   
empty_like)rT   _rV   r   r   rW     rX   z&_AllGather.forward.<locals>.<listcomp>r?   )r[   r   rY   r@   rZ   r,   r\   )rD   r   r   out_tensor_listr   rV   r   rE     s   
z_AllGather.forwardc                 G   s   t j| jdt jjt jjfv r.t j| jd}t|| }t	j
tj| j|g|R  }d |fS dd |D }tj
| j|g|R  }tjt|dd}d |fS )Nr?   c                 S   s   g | ]}t |qS r   rs   )rT   r   r   r   r   rW     s    z'_AllGather.backward.<locals>.<listcomp>r   )dim)r@   get_backendr   BackendNCCLXCCLrA   r   rt   r)   r   r   rG   r3   sumstack)rD   r`   rB   rJ   r]   gxsr   r   r   rK     s   z_AllGather.backwardNrL   r   r   r   r   r-     s
    
r-   c                   @   r=   )r/   c                 C   s   || _ tj|| |d |S r>   )r   r@   r.   r[   )rD   r0   r1   r   r   r   r   rE     s   z_AllGatherBase.forwardc                 C   s   t j| jdt jjt jjfv rPt j| jd}t| }|d | dkr.t	d| d| |d t j| jd |d< t
j||j|jd}t ||tj| j nt	dd |d fS )Nr?   r   zTensor with dimensions: z8 does not have first dimension divisible by world_size: devicedtypezBackend not supported!)r@   rx   r   ry   rz   r{   rZ   ri   rb   r	   r   emptyr   r   _reduce_scatter_baser   rG   )rD   rI   
world_sizeout_sizerJ   r   r   r   rK     s    

z_AllGatherBase.backwardNrL   r   r   r   r   r/     s
    
r/   c                   @   r=   )r3   c                    s   || _  fddttj|dD | _tj|d}tdd  D  tj|dtjj	u rPttj|dD ]}d }||kr@t
 }tj|| |||d q4t|S tj|t
 |d t|S )Nc                    s   g | ]} |   qS r   ra   rS   re   r   r   rW     s    z%_AlltoAll.forward.<locals>.<listcomp>r?   c                 s   rp   rF   rq   rc   r   r   r   rf     rr   z$_AlltoAll.forward.<locals>.<genexpr>)r   rY   r@   rZ   input_tensor_size_listrA   r\   rx   ry   GLOOri   r"   r2   )rD   r   rv   r$   my_rankrU   to_sendr   re   r   rE     s&   
z_AlltoAll.forwardc                    s.    fdd| j D }dtj| j|g R   S )Nc                    s(   g | ]}t j| d  j d  jdqS )r   r   )r   r   r   r   )rT   rb   r`   r   r   rW     s    z&_AlltoAll.backward.<locals>.<listcomp>r_   )r   r3   r   r   )rD   r`   r]   r   r   r   rK     s   
z_AlltoAll.backwardNrL   r   r   r   r   r3     s
    
r3   c                   @   r=   )r7   c                 C   s4   || _ | | _|| _|| _tj|||||d |S )N)r9   r:   r   )r   rb   
input_sizer9   r:   r@   r6   )rD   r   r*   r9   r:   r8   r   r   r   rE     s   
z_AlltoAllSingle.forwardc              	   C   s8   t j| j|j|jd}dt| j|| j| j	|
 f S )Nr   )NNNN)r   r   r   r   r   r7   r   r   r9   r:   r[   )rD   rI   r   r   r   r   rK     s   z_AlltoAllSingle.backwardNrL   r   r   r   r   r7     s
    
r7   c                   @   r=   )r<   c                 C   s.   || _ || _|jtjd}tj|||d |S )N)memory_formatrk   )r   r'   rC   r   contiguous_formatr@   r;   )rD   r'   r   r   r   r   r   rE     s
   z_AllReduce.forwardc                 C   rl   r^   )r<   r   r'   r   rj   r   r   r   rK     rn   z_AllReduce.backwardNrL   r   r   r   r   r<     ro   r<   )!r   r   torch.distributeddistributedr@   torch.autogradr   r   r   r   r   WORLDr   r   r"   rG   r%   r(   r,   r.   r2   r6   r;   r   r    r#   r&   r)   r-   r/   r3   r7   r<   r   r   r   r   <module>   s<   		&
($"