
    iP'                         S SK r S SKrS SKJrJrJr  S SKJr  S SKJ	r	  S SK
JrJr  S SKJr    " S S\ R                  5      r\S	:X  a  \ R$                  " 5         gg)
    N)Mockpatch	MagicMock)ABNF)	WebSocket)WebSocketProtocolExceptionWebSocketPayloadException)SSLErrorc                   D    \ rS rSrS rS rS rS rS rS r	S r
S	 rS
rg)LargePayloadTest   c                    / SQnU H  u  p#U R                  X#S9   SU-  n[        R                  " U[        R                  5      nUR	                  5       nU R                  U[        5        U R                  [        U5      U:  5        U R                  [        UR                  5      U5        SSS5        M     g! , (       d  f       M  = f)z:Test WebSocket frame length encoding at various boundaries)	)}   zSingle byte length)~   zTwo byte length start)   zTwo byte length)  zTwo byte length max)   zEight byte length start) @  z16KB boundary)@  zJust over 16KB)   32KB)i   128KB)lengthdescription   AN)subTestr   create_frameOPCODE_BINARYformatassertIsInstancebytes
assertTruelenassertEqualdata)self
test_casesr   r   payloadframe	formatteds          PC:\des-py\RoboSAPF\venv\Lib\site-packages\websocket/tests/test_large_payloads.py%test_frame_length_encoding_boundaries6LargePayloadTest.test_frame_length_encoding_boundaries   s    


 $.FVE- ))'43E3EF "LLN	 %%i7I& 89   UZZ&9 FE $.EEs   BB;;
C
	c                 $  ^^	 SS-  n/ m	Sn[        S[        U5      U5       H  nT	R                  XX2-    5        M     SmUU	4S jnSSKJn  U" USS9nUR                  [        U5      5      nU R                  Xq5        U R                  TS	5        g
)zHTest receiving large payloads in chunks (simulating the 16KB recv issue)   Br   r   r   c                 <   > T[        T5      :  a  gTT   nTS-  mU$ N       r#   bufsizeresult
call_countchunkss     r+   	mock_recvCLargePayloadTest.test_recv_large_payload_chunked.<locals>.mock_recvO   *    S[(J'F!OJMr2   frame_bufferTskip_utf8_validationr3   N)ranger#   appendwebsocket._abnfr>   recv_strictr$   assertGreater)
r&   large_payload
chunk_sizeir:   r>   fbr7   r8   r9   s
           @@r+   test_recv_large_payload_chunked0LargePayloadTest.test_recv_large_payload_chunkedA   s     u 
q#m,j9AMM-AN;< : 
	 	1)$? M 23/:q)r2   c                   ^ Sn/ mU4S jnSSK Jn  U" USS9nUR                  U5      nU R                  [	        U5      U5        U R                  [        S T 5       5      5        U R                  [	        T5      S5        g	)
z&Simulate SSL BAD_LENGTH error scenarior   c                 f   > TR                  U 5        U S:  a  [        S5      eS[        U S5      -  $ )Nr   z[SSL: BAD_LENGTH] unknown error   C)rB   r
   min)r6   
recv_callss    r+   mock_recv_with_ssl_limitTLargePayloadTest.test_ssl_large_payload_simulation.<locals>.mock_recv_with_ssl_limitk   s6    g&@AA#gu---r2   r   r=   Tr?   c              3   *   #    U  H	  oS :*  v   M     g7f)r   N ).0calls     r+   	<genexpr>ELargePayloadTest.test_ssl_large_payload_simulation.<locals>.<genexpr>|   s     AjdEMjs   r3   N)rC   r>   rD   r$   r#   r"   allrE   )r&   payload_sizerQ   r>   rI   r7   rP   s         @r+   !test_ssl_large_payload_simulation2LargePayloadTest.test_ssl_large_payload_simulationc   sw     
	. 	12N - 	Vl3AjAAB3z?A.r2   c                    / SQnU H  nU R                  US9   SU-  n[        R                  " U[        R                  5      nUR	                  5       nU R                  U[        5        U R                  [        UR                  5      U5        SnU[        R                  :  a  SU-   nOU[        R                  :  a  SU-   nOSU-   nU R                  [        U5      Xr-   5        SSS5        M     g! , (       d  f       M  = f)z6Test frame formatting with various large payload sizes)i?  r   r   r   r   r   )size   D      
   N)r   r   r   r   r   r    r!   r$   r#   r%   LENGTH_7	LENGTH_16)r&   
test_sizesr^   r(   r)   r*   	mask_sizeexpected_header_sizes           r+    test_frame_format_large_payloads1LargePayloadTest.test_frame_format_large_payloads   s     @
D4(+))'43E3EF "LLN	 %%i7  UZZ$7 	$--' I ) DNN* I ) Y )   Y1E1LM= )( ((s   CC44
D	c                 0  ^ [        5       n/ mU4S jnX!l        SUR                  l        [	        5       nXl        SUl        SS-  n[        S5       nS Ul        UR                  U5      nU R                  US5        S	S	S	5        g	! , (       d  f       g	= f)
z?Test that large payloads are sent in chunks to avoid SSL issuesc                 N   > TR                  [        U 5      5        [        U 5      $ N)rB   r#   )r%   sent_chunkss    r+   	mock_sendDLargePayloadTest.test_send_large_payload_chunking.<locals>.mock_send   s    s4y)t9r2   g      >@T   Er   zwebsocket._core.sendc                     [        U5      $ rl   r4   )sockr%   s     r+   <lambda>CLargePayloadTest.test_send_large_payload_chunking.<locals>.<lambda>   s    CIr2   r   N)r   send
gettimeoutreturn_valuer   rr   	connectedr   side_effectsend_binaryrE   )r&   	mock_sockrn   wsrF   mock_send_funcr7   rm   s          @r+    test_send_large_payload_chunking1LargePayloadTest.test_send_large_payload_chunking   s     F	 	 #,0	) [ u )*n)EN& ^^M2F vq) +**s   ,B
Bc           	         SS-  n[         R                  " U[         R                  5      nUR                  5       nU R	                  U[
        5        [        R                  " SS5      S-   n[        SSSS[         R                  SU5      nU R                  [        5         UR                  SS	9  S
S
S
5        g
! , (       d  f       g
= f)z.Test UTF-8 validation with large text payloadsu   Hello 世界! i  z!Hi  s    invalid utf8r3   r   Fr?   N)r   r   OPCODE_TEXTr   r    r!   structpackOPCODE_CLOSEassertRaisesr   validate)r&   
large_textr)   r*   invalid_utf8_close_datas        r+   test_utf8_validation_large_text0LargePayloadTest.test_utf8_validation_large_text   s     &,
 !!*d.>.>? LLN	i/ #)++dD"9<T"T Q1a!2!2A7NO 9:NNN6 ;::s   B88
Cc                   ^^ SS-  n[        S[        U5      S5       Vs/ s H	  o!X"S-    PM     snmSmUU4S jnSSKJn  U" USS9nUR	                  S5      nU R                  Xa5        U R                  TS	5        g
s  snf )z9Test frame buffer with edge cases that could trigger bugs   Fr   r   i   c                 <   > T[        T5      :  a  gTT   nTS-  mU$ r1   r4   r5   s     r+   r:   @LargePayloadTest.test_frame_buffer_edge_cases.<locals>.mock_recv   r<   r2   r=   Tr?   r`   N)rA   r#   rC   r>   rD   r$   )	r&   payload_16krH   r:   r>   rI   r7   r8   r9   s	          @@r+   test_frame_buffer_edge_cases-LargePayloadTest.test_frame_buffer_edge_cases   s     Ul 6;1c+>NPT5UV5Uad(+5UV
	 	1)$?&-Q'' Ws   A>c                     SnSU-  n[         R                  " U[         R                  5      nUR                  5       nU R	                  U[
        5        U R                  [        UR                  5      U5        g)z4Test behavior at WebSocket maximum frame size limitsi      GN)	r   r   r   r   r    r!   r$   r#   r%   )r&   
large_sizer(   r)   r*   s        r+   test_max_frame_size_limits+LargePayloadTest.test_max_frame_size_limits   sd     !
# !!'4+=+=> LLN	i/ 	UZZ*5r2   rT   N)__name__
__module____qualname____firstlineno__r,   rJ   r[   rh   r~   r   r   r   __static_attributes__rT   r2   r+   r   r      s/     :D *D/:%NN*B7.(86r2   r   __main__)unittestr   unittest.mockr   r   r   rC   r   websocket._corer   websocket._exceptionsr   r	   websocket._ssl_compatr
   TestCaser   r   mainrT   r2   r+   <module>r      sO      0 0   % W *&o6x(( o6d zMMO r2   