
    Ngz                     ,   d dl Z d dlZd dlmZ d dlmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZmZmZ d	Zd
Z G d dee          Z G d d          Z	 ddee         dededededeeeef                  ddfdZ G d d          ZdS )    N)Enum)Queueget_context)BaseContext)BaseProcess)Synchronized)Empty)AnyDictIterableListOptionalTypeiX     c                       e Zd ZdZdZdZdS )QueueSignalsstopconfirmerrorN)__name__
__module____qualname__r   r   r        \/var/www/html/ai-engine/env/lib/python3.11/site-packages/qdrant_client/parallel_processor.pyr   r      s        DGEEEr   r   c                   V    e Zd Zededd fd            Zdee         dee         fdZdS )Workerkwargsreturnc                     t                      NNotImplementedError)clsr   s     r   startzWorker.start   s    !###r   itemsc                     t                      r!   r"   )selfr&   s     r   processzWorker.process   s    !###r   N)r   r   r   classmethodr
   r%   r   r)   r   r   r   r   r      sj        $S $X $ $ $ [$$Xc] $x} $ $ $ $ $ $r   r   worker_classinput_queueoutput_queuenum_active_workers	worker_idr   r   c                    |i }t          j        d| dt          j                                	  | j        d	i |}dt
          t                   ffd}|                     |                      D ]}|                    |           nJ# t          $ r=}	t          j
        |	           |                    t          j                   Y d}	~	nd}	~	ww xY w|                                 |                                 |                                5  |xj        dz  c_        ddd           n# 1 swxY w Y   t          j        d| d           dS # |                                 |                                 |                                5  |xj        dz  c_        ddd           n# 1 swxY w Y   t          j        d| d           w xY w)
z
    A worker that pulls data pints off the input queue, and places the execution result on the output queue.
    When there are no data pints left on the input queue, it decrements
    num_active_workers to signal completion.
    NzReader worker: z PID: r   c               3   `   K   	                                  } | t          j        k    rd S | V  +r!   )getr   r   )itemr,   s    r   input_queue_iterablez%_worker.<locals>.input_queue_iterable6   s<      "((<,,,E


	r      zReader worker z	 finishedr   )logginginfoosgetpidr%   r   r
   r)   put	Exception	exceptionr   r   closejoin_threadget_lockvalue)
r+   r,   r-   r.   r/   r   workerr4   processed_itemes
    `        r   _workerrD       s    ~LA9AABIKKAABBB<##--f--	hsm 	 	 	 	 	 	 %nn-A-A-C-CDD 	- 	-N^,,,,	- - - -!+,,,,,,,,- 	  """((** 	* 	*$$)$$	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	:i:::;;;;; 	  """((** 	* 	*$$)$$	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	:i:::;;;;sf   AB E 
C3C
E CE D00D47D4=GF/#G/F33G6F37Gc            	           e Zd Zddedee         dee         fdZde	ddfdZ
d	ee	         d
e	de	dee	         fdZddee         ddfdZddZddZdS )ParallelWorkerPoolNnum_workersrA   start_methodc                     || _         || _        d | _        d | _        t	          |          | _        g | _        | j        t          z  | _        d | _	        d S r!   )
r+   rG   r,   r-   r   ctx	processesmax_internal_batch_size
queue_sizer.   )r(   rG   rA   rH   s       r   __init__zParallelWorkerPool.__init__U   sV    "&,0-1 +L 9 9,.*-DD7;r   r   r   c                 f   | j                             | j                  | _        | j                             | j                  | _        | j                             d| j                  }t          |t                    sJ || _	        t          d| j                  D ]}t          | j         d          sJ | j                             t          | j        | j        | j        | j	        ||                                f          }|                                 | j                            |           d S )Nir   Process)targetargs)rJ   r   rM   r,   r-   ValuerG   
isinstance	BaseValuer.   rangehasattrrQ   rD   r+   copyr%   rK   append)r(   r   	ctx_valuer/   r)   s        r   r%   zParallelWorkerPool.start`   s   8>>$/:: HNN4?;;HNN3(899	)Y/////"+q$"233 	+ 	+I48Y/////h&&%$%+KKMM ' 
 
G MMOOON!!'****	+ 	+r   streamrS   c              /     K   	  | j         d	i | | j        
J d            | j        
J d            d}d}|D ]}||z
  | j        k     r-	 | j                                        }n\# t
          $ r d }Y nNw xY w	 | j                            t                    }n(# t
          $ r}|                                  |d }~ww xY w|<|t          j
        k    r#|                                  t          d          |V  |dz  }| j                            |           |dz  }t          | j                  D ]&}	| j                            t          j                   '||k     rb| j                            t                    }|t          j
        k    r#|                                  t          d          |V  |dz  }||k     b| j        
J d            | j        
J d            | j                                         | j                                         d S # | j        
J d            | j        
J d            | j                                         | j                                         w xY w)
NzInput queue was not initializedz Output queue was not initializedr   timeoutzThread unexpectedly terminatedr5   zInput queue is NonezOutput queue is Noner   )r%   r,   r-   rM   
get_nowaitr	   r2   processing_timeoutjoin_or_terminater   r   RuntimeErrorr:   rW   rG   r   r=   )
r(   r\   rS   r   pushedreadr3   out_itemrC   _s
             r   unordered_mapz ParallelWorkerPool.unordered_mapx   s     -	&DJ     #//1R///$002T000FD  D=4?22(#'#4#?#?#A#A  ( ( (#'( #'#4#8#8AS#8#T#T       ..000  '<#555..000*+KLLL"NNNAID $$T***!4+,, 8 8 $$\%67777--,009K0LL|111**,,,&'GHHH	 -- #//1F///$002H000""$$$##%%%%% #//1F///$002H000""$$$##%%%%sV   AH 
A$#H $A30H 2A33H 7 BH 
B="B88B==DH AI-r5   r_   c                     | j         D ]@}|                    |           |                                r|                                 A| j                                          dS )zM
        Emergency shutdown
        @param timeout:
        @return:
        r^   N)rK   joinis_alive	terminateclear)r(   r_   r)   s      r   rb   z$ParallelWorkerPool.join_or_terminate   sj     ~ 	$ 	$GLLL)))!! $!!###r   c                 t    | j         D ]}|                                 | j                                          d S r!   )rK   rj   rm   r(   r)   s     r   rj   zParallelWorkerPool.join   s<    ~ 	 	GLLNNNNr   c                 B    | j         D ]}|                                 dS )a  
        Terminate processes if the user hasn't joined. This is necessary as
        leaving stray processes running can corrupt shared state. In brief,
        we've observed shared memory counters being reused (when the memory was
        free from the perspective of the parent process) while the stray
        workers still held a reference to them.
        For a discussion of using destructors in Python in this manner, see
        https://eli.thegreenplace.net/2009/06/12/safely-using-destructors-in-python/.
        N)rK   rl   ro   s     r   __del__zParallelWorkerPool.__del__   s4     ~ 	  	 G	  	 r   r!   )r5   )r   N)r   r   r   intr   r   r   strrN   r
   r%   r   rh   rb   rj   rq   r   r   r   rF   rF   T   s        	< 	<C 	<f 	<XVY] 	< 	< 	< 	<+c +d + + + +0.&HSM .&# .& .&QYZ]Q^ .& .& .& .&`
 
# 
t 
 
 
 
   
           r   rF   r!   )r6   r8   enumr   multiprocessingr   r   multiprocessing.contextr   multiprocessing.processr   multiprocessing.sharedctypesr   rV   queuer	   typingr
   r   r   r   r   r   ra   rL   rs   r   r   rr   rD   rF   r   r   r   <module>r{      s    				       . . . . . . . . / / / / / / / / / / / / B B B B B B       < < < < < < < < < < < < < < < <       3   $ $ $ $ $ $ $ $ (,1< 1<v,1<1< 1< "	1<
 1< T#s(^$1< 
1< 1< 1< 1<hp  p  p  p  p  p  p  p  p  p r   