
    Ngh=                         d dl Zd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZmZmZmZ 	 e G d d                      Z G d d	          Z G d
 dej                  Z G d dej                  ZdS )    N)	dataclass)Queue)Thread)CallableIOOptionalUnionc                   (    e Zd ZU dZeed<   eed<   dS )PathDataa  
    Manage the IO job queue and polling thread for a single
    path. This is done to ensure that write calls to the same
    path are serialized so they are written in the same order
    as they were called.

    On each `f.write` call where `f` is of type `NonBlockingIO`,
    we send the job to the manager where it is enqueued to the
    Queue. The polling Thread picks up on the job, executes it,
    waits for it to finish, and then continues to poll.
    queuethreadN)__name__
__module____qualname____doc__r   __annotations__r        Y/var/www/html/ai-engine/env/lib/python3.11/site-packages/iopath/common/non_blocking_io.pyr   r      s.         
 
 LLLNNNNNr   r   c                   ,   e Zd ZdZ	 	 ddee         deej        j                 ddfdZ		 	 dd	e
d
eee
         ee         f         deedgdf                  dee         dej        f
dZdeeg df                  ddfdZdd	ee
         defdZdefdZdS )NonBlockingIOManagera5  
    All `opena` calls pass through this class so that it can
    keep track of the threads for proper cleanup at the end
    of the script. Each path that is opened with `opena` is
    assigned a single queue and polling thread that is kept
    open until it is cleaned up by `PathManager.async_join()`.
    FNbufferedexecutorreturnc                     i | _         || _        | j        rt          nt          | _        |pt
          j                                        | _        dS )av  
        Args:
            buffered (bool): IO instances will be `NonBlockingBufferedIO`
                or `NonBlockingIO` based on this value. This bool is set
                manually for each `PathHandler` in `_opena`.
            executor: User can optionally attach a custom executor to
                perform async operations through `PathHandler.__init__`.
        N)	_path_to_data	_bufferedNonBlockingBufferedIONonBlockingIO_IO
concurrentfuturesThreadPoolExecutor_pool)selfr   r   s      r   __init__zNonBlockingIOManager.__init__3   sF      !,0NM((H!3!F!F!H!H


r   pathio_objcallback_after_file_close	bufferingc                 >     j         s|dk    rt          d| d           j        vrQt                      }t	           j        |f          }|                                 t          ||           j        <    j         si nd|i}  j        d fd||d|S )	a  
        Called by `PathHandler._opena` with the path and returns a
        `NonBlockingIO` instance.

        Args:
            path (str): A path str to operate on. This path should be
                simplified to ensure that each absolute path has only a single
                path str that maps onto it. For example, in `NativePathHandler`,
                we can use `os.path.normpath`.
            io_obj (IO): a reference to the IO object returned by the
                `PathHandler._open` function.
            callback_after_file_close (Callable): An optional argument that can
                be passed to perform operations that depend on the asynchronous
                writes being completed. The file is first written to the local
                disk and then the callback is executed.
            buffering (int): An optional argument to set the buffer size for
                buffered asynchronous writing.
        r'   z`NonBlockingIO is not using a buffered writer but `buffering` arg is set to non-default value of z != -1.)targetargsr+   c                 N    j                  j                            |           S N)r   r   put)io_callabler(   r%   s    r   <lambda>z:NonBlockingIOManager.get_non_blocking_io.<locals>.<lambda>r   s$    "4(.22;?? r   )notify_managerr)   r*   r   )	r   
ValueErrorr   r   r   
_poll_jobsstartr   r    )r%   r(   r)   r*   r+   r   tkwargss   ``      r   get_non_blocking_ioz(NonBlockingIOManager.get_non_blocking_ioG   s    2 ~ 	)r//I6?I I I  
 t)))GGEdoUH===AGGIII'/q'9'9Dt$>GY/G tx 
     &?
 
 
 
 	
r   r   c                     	 |                                 }|dS | j                            |                                           E)a  
        A single thread runs this loop. It waits for an IO callable to be
        placed in a specific path's `Queue` where the queue contains
        callable functions. It then waits for the IO job to be completed
        before looping to ensure write order.
        TN)getr$   submitresult)r%   r   funcs      r   r6   zNonBlockingIOManager._poll_jobsz   sG    
	- 99;;D|Jd##**,,,
	-r   c                    |r|| j         vrt          | d| d          |r|gn%t          | j                                                   }d}|D ]}	 | j                             |          }|j                            d           |j                                         Q# t          $ r7 t          j        t                    }|                    d| d           d}Y w xY w|S )a  
        Waits for write jobs for a specific path or waits for all
        write jobs for the path handler if no path is provided.

        Args:
            path (str): Pass in a file path and will wait for the
                asynchronous jobs to be completed for that file path.
                If no path is passed in, then all threads operating
                on all file paths will be joined.
        z6 has no async IO associated with it. Make sure `opena(z)` is called first.TNz`NonBlockingIO` thread for z failed to join.F)r   r5   listkeyspopr   r1   r   join	Exceptionlogging	getLoggerr   	exception)r%   r(   paths_to_closesuccess_path	path_dataloggers          r   _joinzNonBlockingIOManager._join   s+     	D 222 > >$(> > >   $(L$T$2D2I2I2K2K-L-L# 	  	 E  .22599	##D))) %%''''       *844  !Vu!V!V!VWWW  s   AB!!>C"!C"c                     	 | j                                          n?# t          $ r2 t          j        t
                    }|                    d           Y dS w xY wdS )z(
        Closes the ThreadPool.
        z,`NonBlockingIO` thread pool failed to close.FT)r$   shutdownrE   rF   rG   r   rH   )r%   rM   s     r   _close_thread_poolz'NonBlockingIOManager._close_thread_pool   sl    	J!!!! 	 	 	&x00FKLLL55	 ts    8AA)FNNr'   r0   )r   r   r   r   r   boolr!   r"   Executorr&   strr	   r   bytesr   intioIOBaser:   r6   rN   rQ   r   r   r   r   r   *   sd         $):>I I4.I :-67I 
	I I I I0 GK#%1
 1
1
 bgr%y()1
 $,HdVT\,B#C	1

 C=1
 
1
 1
 1
 1
f-"d(); < - - - - -& (3- 4    >
D 
 
 
 
 
 
r   r   c            
           e Zd Z	 ddeeg df         gdf         deee         ee         f         deedgdf                  ddf fdZ	de
fdZde
fdZde
fd	Zd
eeef         ddfdZddededefdZdefdZddedefdZddZ xZS )r   Nr4   r)   r*   r   c                     t                                                       || _        || _        || _        d| _        dS )a  
        Returned to the user on an `opena` call. Uses a Queue to manage the
        IO jobs that need to be run to ensure order preservation and a
        polling Thread that checks the Queue. Implementation for these are
        lifted to `NonBlockingIOManager` since `NonBlockingIO` closes upon
        leaving the context block.

        NOTE: Writes to the same path are serialized so they are written in
        the same order as they were called but writes to distinct paths can
        happen concurrently.

        Args:
            notify_manager (Callable): a callback function passed in from the
                `NonBlockingIOManager` so that all IO jobs can be stored in
                the manager. It takes in a single argument, namely another
                callable function.
                Example usage:
                ```
                    notify_manager(lambda: file.write(data))
                    notify_manager(lambda: file.close())
                ```
                Here, we tell `NonBlockingIOManager` to add a write callable
                to the path's Queue, and then to add a close callable to the
                path's Queue. The path's polling Thread then executes the write
                callable, waits for it to finish, and then executes the close
                callable. Using `lambda` allows us to pass callables to the
                manager.
            io_obj (IO): a reference to the IO object returned by the
                `PathHandler._open` function.
            callback_after_file_close (Callable): An optional argument that can
                be passed to perform operations that depend on the asynchronous
                writes being completed. The file is first written to the local
                disk and then the callback is executed.
        FN)superr&   _notify_manager_io_callback_after_file_close_close_called)r%   r4   r)   r*   	__class__s       r   r&   zNonBlockingIO.__init__   sA    P 	-*C'"r   c                     dS NFr   r%   s    r   readablezNonBlockingIO.readable       ur   c                     dS NTr   rd   s    r   writablezNonBlockingIO.writable       tr   c                     dS rh   r   rd   s    r   seekablezNonBlockingIO.seekable   rj   r   bc                 <                            fd           dS )Q
        Called on `f.write()`. Gives the manager the write job to call.
        c                  8    j                                        S r0   r^   write)rm   r%   s   r   r3   z%NonBlockingIO.write.<locals>.<lambda>   s    TX^^A%6%6 r   Nr]   )r%   rm   s   ``r   rr   zNonBlockingIO.write   s-     	6666677777r   r   offsetwhencec                 @                            fd           dS )z'
        Called on `f.seek()`.
        c                  :    j                                        S r0   )r^   seek)rt   r%   ru   s   r   r3   z$NonBlockingIO.seek.<locals>.<lambda>  s    TX]]66%B%B r   Nrs   )r%   rt   ru   s   ```r   rx   zNonBlockingIO.seek   s1     	BBBBBBCCCCCr   c                      t          d          )z'
        Called on `f.tell()`.
        z2ioPath async writes does not support `tell` calls.)r5   rd   s    r   tellzNonBlockingIO.tell  s     MNNNr   sizec                 <                            fd           dS )z+
        Called on `f.truncate()`.
        c                  8     j                                       S r0   )r^   truncater%   r{   s   r   r3   z(NonBlockingIO.truncate.<locals>.<lambda>  s    TX%6%6t%<%< r   Nrs   r   s   ``r   r~   zNonBlockingIO.truncate  s-     	<<<<<=====r   c                                             fd            j        s! j        r                       j                   d _        dS )
        Called on `f.close()` or automatically by the context manager.
        We add the `close` call to the file's queue to make sure that
        the file is not closed before all of the write jobs are complete.
        c                  6     j                                         S r0   r^   closerd   s   r   r3   z%NonBlockingIO.close.<locals>.<lambda>      TX^^%5%5 r   TN)r]   r`   r_   rd   s   `r   r   zNonBlockingIO.close  sa     	5555666! 	Bd&E 	B   !@AAA!r   r0   )r   r   N)r   r   r   r   r	   r   rU   rV   r   r&   rS   re   ri   rl   	bytearrayrr   rW   rx   rz   r~   r   __classcell__ra   s   @r   r   r      s       
 GK	-# -# (2t8"4!5t!;<-# bgr%y()-# $,HdVT\,B#C	-#
 
-# -# -# -# -# -#^$    $    $    8uUI-. 84 8 8 8 8D D3 D DC D D D DOc O O O O> >S >C > > > >" " " " " " " "r   r   c                        e Zd ZdZ	 	 ddeeg df         gdf         deee         ee         f         de	edgdf                  de
ddf
 fd	Zdefd
ZdefdZdefdZdeeef         ddfdZddZddZ xZS )r   i   Nr'   r4   r)   r*   r+   r   c                     t                                                       || _        || _        || _        t          j                    g| _        |dk    r|n| j        | _	        d| _
        dS )aW  
        Buffered version of `NonBlockingIO`. All write data is stored in an
        IO buffer until the buffer is full, or `flush` or `close` is called.

        Args:
            Same as `NonBlockingIO` args.
            buffering (int): An optional argument to set the buffer size for
                buffered asynchronous writing.
        r   FN)r\   r&   r]   r^   r_   rX   BytesIO_buffersMAX_BUFFER_BYTES_buffer_sizer`   )r%   r4   r)   r*   r+   ra   s        r   r&   zNonBlockingBufferedIO.__init__+  sf      	-*C')2QIID<Q"r   c                     dS rc   r   rd   s    r   re   zNonBlockingBufferedIO.readableE  rf   r   c                     dS rh   r   rd   s    r   ri   zNonBlockingBufferedIO.writableH  rj   r   c                     dS rc   r   rd   s    r   rl   zNonBlockingBufferedIO.seekableK  rf   r   rm   c                     | j         d         }t          |          5 }|                    |           ddd           n# 1 swxY w Y   |                                | j        k     rdS |                                  dS )ro   r'   N)r   
memoryviewrr   rz   r   flush)r%   rm   bufferviews       r   rr   zNonBlockingBufferedIO.writeN  s     r"]] 	dLL	 	 	 	 	 	 	 	 	 	 	 	 	 	 	;;==4,,,F

s   ?AAc                                                                               fd                                 fd            j        s! j        r                      j                   d _        dS )r   c                  B     j         d                                         S rR   )r   r   rd   s   r   r3   z-NonBlockingBufferedIO.close.<locals>.<lambda>a  s    T]2%6%<%<%>%> r   c                  6     j                                         S r0   r   rd   s   r   r3   z-NonBlockingBufferedIO.close.<locals>.<lambda>c  r   r   TN)r   r]   r`   r_   rd   s   `r   r   zNonBlockingBufferedIO.closeY  s     	

>>>>???5555666! 	Bd&E 	B   !@AAA!r   c                      j         d                                         dk    rdS d}                    dt          j                  }                                }||k     r<||| j        z            }                     |f fd	           | j        z  }||k     <                     fd            j                             t          j	                               dS )aR  
        Called on `f.write()` if the buffer is filled (or overfilled). Can
        also be explicitly called by user.
        NOTE: Buffering is used in a strict manner. Any buffer that exceeds
        `self._buffer_size` will be broken into multiple write jobs where
        each has a write call with `self._buffer_size` size.
        r'   r   Nc                 8    j                             |           S r0   rq   )itemr%   s    r   r3   z-NonBlockingBufferedIO.flush.<locals>.<lambda>~  s    48>>$3G3G r   c                  ,                                      S r0   )r   )r   s   r   r3   z-NonBlockingBufferedIO.flush.<locals>.<lambda>  s    V\\^^ r   )
r   rz   rx   rX   SEEK_END	getbufferr   r]   appendr   )r%   pos
total_sizer   r   r   s   `    @r   r   zNonBlockingBufferedIO.flushj  s     r";;==AF[[BK00
!!JcD$5556D   T!G!G!G!G!GHHH4$$C J 	3333444RZ\\*****r   rR   r   )r   r   r   r   r   r	   r   rU   rV   r   rW   r&   rS   re   ri   rl   r   rr   r   r   r   r   s   @r   r   r   '  s_       ' GK# # (2t8"4!5t!;<# bgr%y()# $,HdVT\,B#C	#
 # 
# # # # # #4$    $    $    	uUI-. 	4 	 	 	 	" " " ""+ + + + + + + +r   r   )concurrent.futuresr!   rX   rF   dataclassesr   r   r   	threadingr   typingr   r   r   r	   r   r   rY   r   r   r   r   r   <module>r      s^       				  ! ! ! ! ! !             0 0 0 0 0 0 0 0 0 0 0 0        $L L L L L L L L`h" h" h" h" h"BI h" h" h"Z\+ \+ \+ \+ \+BI \+ \+ \+ \+ \+r   