authen
.public
Tables
(current)
Columns
Constraints
Relationships
Orphan Tables
Anomalies
Routines
sp_sync_rate_interface
Parameters
Name
Type
Mode
Definition
DECLARE v_service_code text; v_conns text[]; v_conn text; v_row jsonb; v_conn_name text; r_enabled boolean; r_prop_uuid uuid; r_max_sync_days integer; r_last_sync timestamptz; r_channel_code text; r_api_key text; r_endpoint text; sql_call_get_params text; sql_call_get_channel_params text; sql_call_sp_get_sync_roomrate text; sql_update_channel_response text; sql_commit_state text; v_new_stamp timestamptz; -- Parallel execution variables v_conn_index int; v_total_conns int; v_pending_conns text[]; v_pending_traces uuid[]; v_pending_data jsonb[]; v_trace_id uuid; v_conn_params record; v_sql text; v_start_time timestamptz; v_timeout_seconds int := 300; -- 5 นาที timeout สำหรับแต่ละ connection v_elapsed double precision; v_has_result boolean; -- Retry logic variables v_max_retries int := 2; v_retry_count int := 0; v_retry_delay_ms int := 500; -- 500ms delay between retries v_conn_established boolean := false; BEGIN SELECT array_agg(fn_prop_connection(id)) INTO v_conns FROM property WHERE enabled = true; IF v_conns IS NULL OR array_length(v_conns,1) IS NULL THEN PERFORM public.sp_system_notify( 'SYNC RATE SKIP', 'No property connections found for sync.', NULL, NULL ); RETURN; END IF; v_total_conns := array_length(v_conns, 1); -- ============================================================================ -- PHASE 1: ส่ง Query ทั้งหมดพร้อมกัน (Non-blocking) -- ============================================================================ <<parallel_send>> FOR v_conn_index IN 1..v_total_conns LOOP v_conn := v_conns[v_conn_index]; v_trace_id := gen_random_uuid(); v_conn_name := 'conn_' || v_trace_id::text; BEGIN -- 0) Validate connection before use with retry logic v_conn_established := false; v_retry_count := 0; <<connect_retry>> LOOP BEGIN -- Disconnect first if connection exists from previous retry BEGIN EXECUTE format('SELECT dblink_disconnect(%L)', v_conn_name); EXCEPTION WHEN OTHERS THEN NULL; -- Ignore if connection doesn't exist yet END; -- Try to connect EXECUTE format('SELECT dblink_connect(%L, %L)', v_conn_name, v_conn); v_conn_established := true; EXIT connect_retry; -- Connection successful, exit retry loop EXCEPTION WHEN OTHERS THEN v_retry_count := v_retry_count + 1; IF v_retry_count <= v_max_retries THEN -- Cleanup failed connection before retry BEGIN EXECUTE format('SELECT dblink_disconnect(%L)', v_conn_name); EXCEPTION WHEN OTHERS THEN NULL; END; -- Wait before retry (using pg_sleep for milliseconds) PERFORM pg_sleep(v_retry_delay_ms / 1000.0); CONTINUE connect_retry; ELSE -- Max retries exceeded, report error and continue to next connection -- Cleanup failed connection BEGIN EXECUTE format('SELECT dblink_disconnect(%L)', v_conn_name); EXCEPTION WHEN OTHERS THEN NULL; END; PERFORM public.sp_system_notify( 'SYNC RATE CONNECTION FAILED', format('Cannot establish dblink connection after %s retries | Conn Hash: %s | Trace: %s | Error: %s | SQLState: %s', v_max_retries, md5(v_conn), v_trace_id, SQLERRM, SQLSTATE), NULL, NULL ); RETURN NEXT jsonb_build_object( 'connection', v_conn, 'connection_hash', md5(v_conn), 'trace_id', v_trace_id, 'error', SQLERRM, 'sqlstate', SQLSTATE, 'phase', 'connection_establishment', 'retries_attempted', v_retry_count ); EXIT connect_retry; END IF; END; END LOOP connect_retry; -- Skip this connection if we couldn't establish it IF NOT v_conn_established THEN CONTINUE parallel_send; END IF; -- Connection successful, now proceed with queries -- 1) อ่าน NBIS params sql_call_get_params := $SQL$ SELECT jsonb_build_object( 'enabled', COALESCE(fn_intf_param('NBIS','INTERFACE_RATE_ENABLED','false')::boolean, false), 'max_days', COALESCE(fn_intf_param('NBIS','MAX_SYNC_DAYS','400')::int, 400), 'interface_code', COALESCE(NULLIF(fn_intf_param('NBIS','INTERFACE_CODE','')::text,''), 'CHANNEX') )::jsonb $SQL$; BEGIN SELECT (j->>'enabled')::boolean, (j->>'max_days')::int, (j->>'interface_code')::text INTO r_enabled, r_max_sync_days, r_channel_code FROM ( SELECT value AS j FROM dblink(v_conn_name, sql_call_get_params) AS t(value jsonb) ) s; EXCEPTION WHEN OTHERS THEN PERFORM public.sp_system_notify( 'SYNC RATE FAILED', format('Failed to read NBIS params | Conn Hash: %s | Trace: %s | Error: %s | SQLState: %s', md5(v_conn), v_trace_id, SQLERRM, SQLSTATE), NULL, NULL ); RETURN NEXT jsonb_build_object( 'connection', v_conn, 'connection_hash', md5(v_conn), 'trace_id', v_trace_id, 'error', SQLERRM, 'sqlstate', SQLSTATE, 'phase', 'read_nbis_params' ); -- Disconnect on error BEGIN EXECUTE format('SELECT dblink_disconnect(%L)', v_conn_name); EXCEPTION WHEN OTHERS THEN NULL; END; CONTINUE parallel_send; END; IF NOT r_enabled THEN -- PERFORM public.sp_system_notify( -- 'SYNC RATE SKIP', -- format('INTERFACE_RATE_ENABLED=false | Conn Hash: %s | Trace: %s', md5(v_conn), v_trace_id), -- NULL, NULL --); RETURN NEXT jsonb_build_object( 'connection_hash', md5(v_conn), 'trace_id', v_trace_id, 'note', 'INTERFACE_RATE_ENABLED=false' ); CONTINUE parallel_send; END IF; v_service_code := 'SYNC-RATE-INTERFACE-' || r_channel_code; -- 2) อ่าน channel params sql_call_get_channel_params := format($SQL$ SELECT jsonb_build_object( 'prop_uuid', fn_intf_param('%1$s','PROPERTY_ID','')::uuid, 'api_key', fn_intf_param('%1$s','API-KEY',''), 'endpoint', fn_intf_param('%1$s','ENDPOINT','') )::jsonb $SQL$, r_channel_code); BEGIN SELECT (j->>'prop_uuid')::uuid, (j->>'api_key')::text, (j->>'endpoint')::text INTO r_prop_uuid, r_api_key, r_endpoint FROM ( SELECT value AS j FROM dblink(v_conn_name, sql_call_get_channel_params) AS t(value jsonb) ) s; EXCEPTION WHEN OTHERS THEN PERFORM public.sp_system_notify( 'SYNC RATE FAILED', format('Failed to read channel params | Channel: %s | Conn Hash: %s | Trace: %s | Error: %s | SQLState: %s', r_channel_code, md5(v_conn), v_trace_id, SQLERRM, SQLSTATE), NULL, NULL ); RETURN NEXT jsonb_build_object( 'connection', v_conn, 'connection_hash', md5(v_conn), 'trace_id', v_trace_id, 'error', SQLERRM, 'sqlstate', SQLSTATE, 'phase', 'read_channel_params' ); -- Disconnect on error BEGIN EXECUTE format('SELECT dblink_disconnect(%L)', v_conn_name); EXCEPTION WHEN OTHERS THEN NULL; END; CONTINUE parallel_send; END; IF r_prop_uuid IS NULL THEN PERFORM public.sp_system_notify( 'SYNC RATE FAILED', format('Missing PROPERTY_ID | Channel: %s | Conn: %s', r_channel_code, v_conn), NULL, NULL ); RETURN NEXT jsonb_build_object('connection', v_conn, 'trace_id', v_trace_id, 'error', 'Missing PROPERTY_ID'); CONTINUE parallel_send; END IF; IF r_endpoint IS NULL OR r_endpoint = '' THEN PERFORM public.sp_system_notify( 'SYNC RATE FAILED', format('Missing ENDPOINT | Channel: %s | Conn: %s', r_channel_code, v_conn), NULL, NULL ); RETURN NEXT jsonb_build_object('connection', v_conn, 'trace_id', v_trace_id, 'error', 'Missing ENDPOINT'); CONTINUE parallel_send; END IF; -- อ่าน last_sync จาก sync_stamp (ใช้สำหรับกำหนด sync_all flag) SELECT last_sync INTO r_last_sync FROM sync_stamp WHERE service_code = v_service_code AND property_id = r_prop_uuid; -- 3) สร้าง SQL สำหรับเรียก sp_get_sync_roomrate sql_call_sp_get_sync_roomrate := format($Q$ SELECT public.sp_get_sync_roomrate( jsonb_build_object( 'trace_id', %L, 'db_name', '%s', 'source', 'cron.sp_sync_rate_interface', 'sync_times', NOW() )::jsonb, jsonb_build_object( 'last_sync', %L::timestamptz, 'sync_all', %s, 'roomtypeno', NULL, 'rate_id', NULL, 'rateplan_id', NULL, 'max_days', %s, 'property_id', %L )::jsonb )::jsonb $Q$, v_trace_id, current_database(), r_last_sync, CASE WHEN r_last_sync IS NULL THEN 'true' ELSE 'false' END, COALESCE(r_max_sync_days, 400), r_prop_uuid::text ); -- 4) ส่ง query แบบ non-blocking (parallel) PERFORM dblink_send_query(v_conn_name, sql_call_sp_get_sync_roomrate); -- 5) เก็บข้อมูลไว้ใช้ตอนรับผล v_pending_conns := array_append(v_pending_conns, v_conn_name); v_pending_traces := array_append(v_pending_traces, v_trace_id); v_pending_data := array_append(v_pending_data, jsonb_build_object( 'connection', v_conn, 'connection_name', v_conn_name, 'trace_id', v_trace_id, 'service_code', v_service_code, 'prop_uuid', r_prop_uuid, 'channel_code', r_channel_code, 'api_key', r_api_key, 'endpoint', r_endpoint, 'last_sync', r_last_sync, 'max_days', r_max_sync_days )); EXCEPTION WHEN OTHERS THEN PERFORM public.sp_system_notify( 'SYNC RATE CONNECTION FAILED', format('Conn Hash: %s | Trace: %s | Error: %s | SQLState: %s', md5(v_conn), v_trace_id, SQLERRM, SQLSTATE), NULL, NULL ); RETURN NEXT jsonb_build_object( 'connection', v_conn, 'connection_hash', md5(v_conn), 'trace_id', v_trace_id, 'error', SQLERRM, 'sqlstate', SQLSTATE, 'phase', 'send_query' ); -- Cleanup connection on error BEGIN EXECUTE format('SELECT dblink_disconnect(%L)', v_conn_name); EXCEPTION WHEN OTHERS THEN NULL; END; END; END LOOP parallel_send; -- ============================================================================ -- PHASE 2: รอรับผลลัพธ์จากทุก Connections (พร้อมกัน) -- ============================================================================ <<parallel_receive>> FOR v_conn_index IN 1..COALESCE(array_length(v_pending_conns, 1), 0) LOOP v_conn_name := v_pending_conns[v_conn_index]; v_trace_id := v_pending_traces[v_conn_index]; v_start_time := clock_timestamp(); v_conn := v_pending_data[v_conn_index]->>'connection'; -- รอผลลัพธ์ด้วย timeout LOOP -- ตรวจสอบ timeout SELECT EXTRACT(EPOCH FROM (clock_timestamp() - v_start_time)) INTO v_elapsed; IF v_elapsed > v_timeout_seconds THEN PERFORM public.sp_system_notify( 'SYNC RATE TIMEOUT', format('Conn Hash: %s | Trace: %s | Timeout: %s seconds', md5(v_conn), v_trace_id, v_timeout_seconds), NULL, NULL ); RETURN NEXT jsonb_build_object( 'connection', v_conn, 'connection_hash', md5(v_conn), 'trace_id', v_trace_id, 'error', 'Timeout exceeded', 'timeout_seconds', v_timeout_seconds, 'phase', 'receive_result' ); -- Cleanup connection on timeout BEGIN EXECUTE format('SELECT dblink_disconnect(%L)', v_conn_name); EXCEPTION WHEN OTHERS THEN NULL; END; EXIT; END IF; -- พยายามรับผลลัพธ์ BEGIN FOR v_row IN SELECT jsonb_build_object('connection', v_conn, 'data', value, 'trace_id', v_trace_id) FROM dblink_get_result(v_conn_name) AS t(value jsonb) LOOP -- เรียก function แยกเพื่อประมวลผลผลลัพธ์ RETURN QUERY SELECT * FROM public.fn_process_sync_result( v_row, v_pending_data[v_conn_index] ); END LOOP; -- Cleanup connection after receiving result BEGIN EXECUTE format('SELECT dblink_disconnect(%L)', v_conn_name); EXCEPTION WHEN OTHERS THEN NULL; END; EXIT; -- ได้รับผลแล้ว EXCEPTION WHEN OTHERS THEN PERFORM public.sp_system_notify( 'SYNC RATE PROCESS FAILED', format('Conn Hash: %s | Trace: %s | Error: %s | SQLState: %s', md5(v_conn), v_trace_id, SQLERRM, SQLSTATE), NULL, NULL ); RETURN NEXT jsonb_build_object( 'connection', v_conn, 'connection_hash', md5(v_conn), 'trace_id', v_trace_id, 'error', SQLERRM, 'sqlstate', SQLSTATE, 'phase', 'receive_result' ); -- Cleanup connection on error BEGIN EXECUTE format('SELECT dblink_disconnect(%L)', v_conn_name); EXCEPTION WHEN OTHERS THEN NULL; END; EXIT; END; END LOOP; END LOOP parallel_receive; RETURN; END;