diff --git a/include/PgSQL_HostGroups_Manager.h b/include/PgSQL_HostGroups_Manager.h index 4f66c7ef80..b9d19e722c 100644 --- a/include/PgSQL_HostGroups_Manager.h +++ b/include/PgSQL_HostGroups_Manager.h @@ -48,6 +48,15 @@ #endif /* DEBUG */ #define MYHGM_PgSQL_REPLICATION_HOSTGROUPS "CREATE TABLE pgsql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>=0) , check_type VARCHAR CHECK (LOWER(check_type) IN ('read_only')) NOT NULL DEFAULT 'read_only' , comment VARCHAR NOT NULL DEFAULT '' , UNIQUE (reader_hostgroup))" +// AWS Aurora PostgreSQL hostgroups table definition +#define MYHGM_PgSQL_AWS_AURORA_HOSTGROUPS "CREATE TABLE pgsql_aws_aurora_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , " \ + "active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , aurora_port INT NOT NULL DEFAULT 5432 , domain_name VARCHAR NOT NULL DEFAULT '' , " \ + "max_lag_ms INT NOT NULL CHECK (max_lag_ms>= 10 AND max_lag_ms <= 600000) DEFAULT 600000 , check_interval_ms INT NOT NULL CHECK (check_interval_ms >= 100 AND check_interval_ms <= 600000) DEFAULT 1000 , " \ + "check_timeout_ms INT NOT NULL CHECK (check_timeout_ms >= 80 AND check_timeout_ms <= 3000) DEFAULT 800 , " \ + "writer_is_also_reader INT CHECK (writer_is_also_reader IN (0,1)) NOT NULL DEFAULT 0 , new_reader_weight INT CHECK (new_reader_weight >= 0 AND new_reader_weight <=10000000) NOT NULL DEFAULT 1 , " \ + "add_lag_ms INT NOT NULL CHECK (add_lag_ms >= 0 AND add_lag_ms <= 600000) DEFAULT 30 , min_lag_ms INT NOT NULL CHECK (min_lag_ms >= 0 AND min_lag_ms <= 600000) DEFAULT 30 , " \ + "lag_num_checks INT NOT NULL CHECK (lag_num_checks >= 1 AND lag_num_checks <= 16) DEFAULT 1 , comment VARCHAR NOT NULL DEFAULT '' , UNIQUE (reader_hostgroup))" + #define PGHGM_GEN_ADMIN_RUNTIME_SERVERS "SELECT hostgroup_id, hostname, port, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" WHEN 4 THEN \"SHUNNED\" END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers ORDER BY hostgroup_id, hostname, port" #define MYHGM_PgSQL_HOSTGROUP_ATTRIBUTES "CREATE TABLE pgsql_hostgroup_attributes (hostgroup_id INT NOT NULL PRIMARY KEY , max_num_online_servers INT CHECK (max_num_online_servers>=0 AND max_num_online_servers <= 1000000) NOT NULL DEFAULT 1000000 , autocommit INT CHECK (autocommit IN (-1, 0, 1)) NOT NULL DEFAULT -1 , free_connections_pct INT CHECK (free_connections_pct >= 0 AND free_connections_pct <= 100) NOT NULL DEFAULT 10 , init_connect VARCHAR NOT NULL DEFAULT '' , multiplex INT CHECK (multiplex IN (0, 1)) NOT NULL DEFAULT 1 , connection_warming INT CHECK (connection_warming IN (0, 1)) NOT NULL DEFAULT 0 , throttle_connections_per_sec INT CHECK (throttle_connections_per_sec >= 1 AND throttle_connections_per_sec <= 1000000) NOT NULL DEFAULT 1000000 , ignore_session_variables VARCHAR CHECK (JSON_VALID(ignore_session_variables) OR ignore_session_variables = '') NOT NULL DEFAULT '' , hostgroup_settings VARCHAR CHECK (JSON_VALID(hostgroup_settings) OR hostgroup_settings = '') NOT NULL DEFAULT '' , servers_defaults VARCHAR CHECK (JSON_VALID(servers_defaults) OR servers_defaults = '') NOT NULL DEFAULT '' , comment VARCHAR NOT NULL DEFAULT '')" @@ -379,6 +388,33 @@ struct PgSQL_srv_opts_t { int32_t use_ssl; }; +/** + * @brief AWS Aurora PostgreSQL configuration info + * @details Stores configuration for each Aurora PostgreSQL hostgroup pair + */ +class PgSQL_AWS_Aurora_Info { +public: + int writer_hostgroup; + int reader_hostgroup; + int aurora_port; + int max_lag_ms; + int add_lag_ms; + int min_lag_ms; + int lag_num_checks; + int check_interval_ms; + int check_timeout_ms; + bool active; + bool __active; // temporary flag for tracking during regeneration + int writer_is_also_reader; + int new_reader_weight; + char *domain_name; + char *comment; + + PgSQL_AWS_Aurora_Info(int w, int r, int _port, char *_domain, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c); + bool update(int r, int _port, char *_domain, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c); + ~PgSQL_AWS_Aurora_Info(); +}; + class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager { #if 0 SQLite3DB *admindb; @@ -547,6 +583,13 @@ class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager { */ SQLite3_result *incoming_replication_hostgroups; + // AWS Aurora PostgreSQL + void generate_pgsql_aws_aurora_hostgroups_table(); + SQLite3_result *incoming_aws_aurora_hostgroups; + + pthread_mutex_t AWS_Aurora_Info_mutex; + std::map AWS_Aurora_Info_Map; + void generate_pgsql_hostgroup_attributes_table(); SQLite3_result *incoming_hostgroup_attributes; @@ -839,6 +882,17 @@ class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager { void unshun_server_all_hostgroups(const char * address, uint16_t port, time_t t, int max_wait_sec, unsigned int *skip_hid); PgSQL_SrvC* find_server_in_hg(unsigned int _hid, const std::string& addr, int port); + // AWS Aurora PostgreSQL methods + bool aws_aurora_replication_lag_action(int _whid, int _rhid, char *server_id, float current_replication_lag_ms, bool enable, bool is_writer, bool verbose=true); + void update_aws_aurora_set_writer(int _whid, int _rhid, char *server_id, bool verbose=true); + void update_aws_aurora_set_reader(int _whid, int _rhid, char *server_id); + /** + * @brief Updates the resultset and corresponding checksum used by Monitor for AWS Aurora PostgreSQL. + * @param lock Whether if both 'AWS_Aurora_Info_mutex' and 'PgSQL_Monitor::aws_aurora_mutex' mutexes should + * be acquired before the update takes place or not. + */ + void update_aws_aurora_hosts_monitor_resultset(bool lock=false); + private: void update_hostgroup_manager_mappings(); uint64_t get_pgsql_servers_checksum(SQLite3_result* runtime_pgsql_servers = nullptr); diff --git a/include/PgSQL_Monitor.hpp b/include/PgSQL_Monitor.hpp index bd5a3b7b78..897fbabe1e 100644 --- a/include/PgSQL_Monitor.hpp +++ b/include/PgSQL_Monitor.hpp @@ -9,6 +9,8 @@ #include #include #include +#include +#include #define MONITOR_SQLITE_TABLE_PGSQL_SERVER_CONNECT_LOG "CREATE TABLE pgsql_server_connect_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , connect_success_time_us INT DEFAULT 0 , connect_error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" @@ -20,6 +22,20 @@ #define MONITOR_SQLITE_TABLE_PROXYSQL_SERVERS "CREATE TABLE proxysql_servers (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 6032 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostname, port) )" +// AWS Aurora PostgreSQL monitoring tables +#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_LOG "CREATE TABLE pgsql_server_aws_aurora_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , error VARCHAR , server_id VARCHAR NOT NULL DEFAULT '' , session_id VARCHAR , last_update_timestamp VARCHAR , replica_lag_in_msec INT NOT NULL DEFAULT 0 , estimated_lag_ms INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostname, port, time_start_us, server_id))" + +#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_CHECK_STATUS "CREATE TABLE pgsql_server_aws_aurora_check_status (writer_hostgroup INT NOT NULL , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , last_checked_at VARCHAR , checks_tot INT NOT NULL DEFAULT 0 , checks_ok INT NOT NULL DEFAULT 0 , last_error VARCHAR , PRIMARY KEY (writer_hostgroup, hostname, port))" + +#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_FAILOVERS "CREATE TABLE pgsql_server_aws_aurora_failovers (writer_hostgroup INT NOT NULL , hostname VARCHAR NOT NULL , inserted_at VARCHAR NOT NULL)" + +#define PGSQL_AWS_Aurora_Nentries 150 + +// Forward declarations +class PgSQL_AWS_Aurora_monitor_node; +class PgSQL_AWS_Aurora_status_entry; +class PgSQL_Monitor_Connection_Pool; + struct PgSQL_Monitor { // @brief Flags if monitoring threads should be shutdown. bool shutdown = false; @@ -54,6 +70,18 @@ struct PgSQL_Monitor { const_cast("pgsql_server_read_only_log"), const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_READ_ONLY_LOG) }, + { + const_cast("pgsql_server_aws_aurora_log"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_LOG) + }, + { + const_cast("pgsql_server_aws_aurora_check_status"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_CHECK_STATUS) + }, + { + const_cast("pgsql_server_aws_aurora_failovers"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_FAILOVERS) + }, }; std::vector tables_defs_monitor_internal { @@ -63,7 +91,29 @@ struct PgSQL_Monitor { } }; + // AWS Aurora PostgreSQL monitoring members - placed at end to avoid initialization issues + /////////////////////////////////////////////////////////////////////////// + pthread_mutex_t aws_aurora_mutex; // initialized in constructor like MySQL + SQLite3_result* AWS_Aurora_Hosts_resultset; + uint64_t AWS_Aurora_Hosts_resultset_checksum; + std::map AWS_Aurora_Hosts_Map; + PgSQL_Monitor_Connection_Pool* My_Conn_Pool; // Connection pool for Aurora monitoring + /////////////////////////////////////////////////////////////////////////// + PgSQL_Monitor(); + ~PgSQL_Monitor(); + + // AWS Aurora PostgreSQL methods + unsigned int estimate_lag(char* server_id, PgSQL_AWS_Aurora_status_entry** aase, unsigned int idx, + unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks); + void evaluate_pgsql_aws_aurora_results(unsigned int wHG, unsigned int rHG, + PgSQL_AWS_Aurora_status_entry** lasts_ase, unsigned int ase_idx, + unsigned int max_latency_ms, unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks); + bool server_responds_to_ping(const char* addr, int port); + + // Populate AWS Aurora monitoring tables + void populate_monitor_pgsql_server_aws_aurora_log(); + void populate_monitor_pgsql_server_aws_aurora_check_status(); }; struct pgsql_conn_t { @@ -74,6 +124,65 @@ struct pgsql_conn_t { mf_unique_ptr err {}; }; +/** + * @brief Represents a single row from aurora_replica_status() function + * @details PostgreSQL Aurora equivalent of AWS_Aurora_replica_host_status_entry + */ +class PgSQL_AWS_Aurora_replica_host_status_entry { +public: + char* server_id = nullptr; + char* session_id = nullptr; + char* last_update_timestamp = nullptr; + float replica_lag_ms = 0.0; + unsigned int estimated_lag_ms = 0; + bool is_current_master = false; + PgSQL_AWS_Aurora_replica_host_status_entry(char* serid, char* sessid, char* lut, float rlm, bool is_master); + PgSQL_AWS_Aurora_replica_host_status_entry(char* serid, char* sessid, char* lut, const char* rlm, bool is_master); + ~PgSQL_AWS_Aurora_replica_host_status_entry(); +}; + +/** + * @brief Represents a single check executed against a single Aurora node + * @details Can contain several PgSQL_AWS_Aurora_replica_host_status_entry + */ +class PgSQL_AWS_Aurora_status_entry { +public: + unsigned long long start_time; + unsigned long long check_time; + char* error; + std::vector* host_statuses; + PgSQL_AWS_Aurora_status_entry(unsigned long long st, unsigned long long ct, char* e); + void add_host_status(PgSQL_AWS_Aurora_replica_host_status_entry* hs); + ~PgSQL_AWS_Aurora_status_entry(); +}; + +/** + * @brief Represents a single Aurora node where checks are executed + * @details A single node will have a PgSQL_AWS_Aurora_status_entry per check + */ +class PgSQL_AWS_Aurora_monitor_node { +private: + int idx_last_entry; +public: + char* addr; + int port; + unsigned int writer_hostgroup; + uint64_t num_checks_tot; + uint64_t num_checks_ok; + time_t last_checked_at; + PgSQL_AWS_Aurora_status_entry* last_entries[PGSQL_AWS_Aurora_Nentries]; + PgSQL_AWS_Aurora_monitor_node(char* _a, int _p, int _whg); + ~PgSQL_AWS_Aurora_monitor_node(); + bool add_entry(PgSQL_AWS_Aurora_status_entry* ase); + PgSQL_AWS_Aurora_status_entry* last_entry() { + if (idx_last_entry == -1) return nullptr; + return last_entries[idx_last_entry]; + } +}; + void* PgSQL_monitor_scheduler_thread(); +void* PgSQL_monitor_AWS_Aurora_thread(void* arg); +void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg); +void* PgSQL_monitor_aws_aurora(void* arg); #endif diff --git a/include/ProxySQL_Admin_Tables_Definitions.h b/include/ProxySQL_Admin_Tables_Definitions.h index 6305c6026f..cfb3a4b345 100644 --- a/include/ProxySQL_Admin_Tables_Definitions.h +++ b/include/ProxySQL_Admin_Tables_Definitions.h @@ -221,6 +221,11 @@ #define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_AWS_AURORA_HOSTGROUPS "CREATE TABLE runtime_mysql_aws_aurora_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , aurora_port INT NOT NUlL DEFAULT 3306 , domain_name VARCHAR NOT NULL CHECK (SUBSTR(domain_name,1,1) = '.') , max_lag_ms INT NOT NULL CHECK (max_lag_ms>= 10 AND max_lag_ms <= 600000) DEFAULT 600000 , check_interval_ms INT NOT NULL CHECK (check_interval_ms >= 100 AND check_interval_ms <= 600000) DEFAULT 1000 , check_timeout_ms INT NOT NULL CHECK (check_timeout_ms >= 80 AND check_timeout_ms <= 3000) DEFAULT 800 , writer_is_also_reader INT CHECK (writer_is_also_reader IN (0,1)) NOT NULL DEFAULT 0 , new_reader_weight INT CHECK (new_reader_weight >= 0 AND new_reader_weight <=10000000) NOT NULL DEFAULT 1 , add_lag_ms INT NOT NULL CHECK (add_lag_ms >= 0 AND add_lag_ms <= 600000) DEFAULT 30 , min_lag_ms INT NOT NULL CHECK (min_lag_ms >= 0 AND min_lag_ms <= 600000) DEFAULT 30 , lag_num_checks INT NOT NULL CHECK (lag_num_checks >= 1 AND lag_num_checks <= 16) DEFAULT 1 , comment VARCHAR , UNIQUE (reader_hostgroup))" +// AWS Aurora PostgreSQL +#define ADMIN_SQLITE_TABLE_PGSQL_AWS_AURORA_HOSTGROUPS "CREATE TABLE pgsql_aws_aurora_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , aurora_port INT NOT NULL DEFAULT 5432 , domain_name VARCHAR NOT NULL CHECK (SUBSTR(domain_name,1,1) = '.') , max_lag_ms INT NOT NULL CHECK (max_lag_ms>= 10 AND max_lag_ms <= 600000) DEFAULT 600000 , check_interval_ms INT NOT NULL CHECK (check_interval_ms >= 100 AND check_interval_ms <= 600000) DEFAULT 1000 , check_timeout_ms INT NOT NULL CHECK (check_timeout_ms >= 80 AND check_timeout_ms <= 3000) DEFAULT 800 , writer_is_also_reader INT CHECK (writer_is_also_reader IN (0,1)) NOT NULL DEFAULT 0 , new_reader_weight INT CHECK (new_reader_weight >= 0 AND new_reader_weight <=10000000) NOT NULL DEFAULT 1 , add_lag_ms INT NOT NULL CHECK (add_lag_ms >= 0 AND add_lag_ms <= 600000) DEFAULT 30 , min_lag_ms INT NOT NULL CHECK (min_lag_ms >= 0 AND min_lag_ms <= 600000) DEFAULT 30 , lag_num_checks INT NOT NULL CHECK (lag_num_checks >= 1 AND lag_num_checks <= 16) DEFAULT 1 , comment VARCHAR , UNIQUE (reader_hostgroup))" + +#define ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_AWS_AURORA_HOSTGROUPS "CREATE TABLE runtime_pgsql_aws_aurora_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , aurora_port INT NOT NULL DEFAULT 5432 , domain_name VARCHAR NOT NULL CHECK (SUBSTR(domain_name,1,1) = '.') , max_lag_ms INT NOT NULL CHECK (max_lag_ms>= 10 AND max_lag_ms <= 600000) DEFAULT 600000 , check_interval_ms INT NOT NULL CHECK (check_interval_ms >= 100 AND check_interval_ms <= 600000) DEFAULT 1000 , check_timeout_ms INT NOT NULL CHECK (check_timeout_ms >= 80 AND check_timeout_ms <= 3000) DEFAULT 800 , writer_is_also_reader INT CHECK (writer_is_also_reader IN (0,1)) NOT NULL DEFAULT 0 , new_reader_weight INT CHECK (new_reader_weight >= 0 AND new_reader_weight <=10000000) NOT NULL DEFAULT 1 , add_lag_ms INT NOT NULL CHECK (add_lag_ms >= 0 AND add_lag_ms <= 600000) DEFAULT 30 , min_lag_ms INT NOT NULL CHECK (min_lag_ms >= 0 AND min_lag_ms <= 600000) DEFAULT 30 , lag_num_checks INT NOT NULL CHECK (lag_num_checks >= 1 AND lag_num_checks <= 16) DEFAULT 1 , comment VARCHAR , UNIQUE (reader_hostgroup))" + #define ADMIN_SQLITE_TABLE_MYSQL_HOSTGROUP_ATTRIBUTES_V2_5_0 "CREATE TABLE mysql_hostgroup_attributes (hostgroup_id INT NOT NULL PRIMARY KEY , max_num_online_servers INT CHECK (max_num_online_servers>=0 AND max_num_online_servers <= 1000000) NOT NULL DEFAULT 1000000 , autocommit INT CHECK (autocommit IN (-1, 0, 1)) NOT NULL DEFAULT -1 , free_connections_pct INT CHECK (free_connections_pct >= 0 AND free_connections_pct <= 100) NOT NULL DEFAULT 10 , init_connect VARCHAR NOT NULL DEFAULT '' , multiplex INT CHECK (multiplex IN (0, 1)) NOT NULL DEFAULT 1 , connection_warming INT CHECK (connection_warming IN (0, 1)) NOT NULL DEFAULT 0 , throttle_connections_per_sec INT CHECK (throttle_connections_per_sec >= 1 AND throttle_connections_per_sec <= 1000000) NOT NULL DEFAULT 1000000 , ignore_session_variables VARCHAR CHECK (JSON_VALID(ignore_session_variables) OR ignore_session_variables = '') NOT NULL DEFAULT '' , comment VARCHAR NOT NULL DEFAULT '')" #define ADMIN_SQLITE_TABLE_MYSQL_HOSTGROUP_ATTRIBUTES_V2_5_2 "CREATE TABLE mysql_hostgroup_attributes (hostgroup_id INT NOT NULL PRIMARY KEY , max_num_online_servers INT CHECK (max_num_online_servers>=0 AND max_num_online_servers <= 1000000) NOT NULL DEFAULT 1000000 , autocommit INT CHECK (autocommit IN (-1, 0, 1)) NOT NULL DEFAULT -1 , free_connections_pct INT CHECK (free_connections_pct >= 0 AND free_connections_pct <= 100) NOT NULL DEFAULT 10 , init_connect VARCHAR NOT NULL DEFAULT '' , multiplex INT CHECK (multiplex IN (0, 1)) NOT NULL DEFAULT 1 , connection_warming INT CHECK (connection_warming IN (0, 1)) NOT NULL DEFAULT 0 , throttle_connections_per_sec INT CHECK (throttle_connections_per_sec >= 1 AND throttle_connections_per_sec <= 1000000) NOT NULL DEFAULT 1000000 , ignore_session_variables VARCHAR CHECK (JSON_VALID(ignore_session_variables) OR ignore_session_variables = '') NOT NULL DEFAULT '' , servers_defaults VARCHAR CHECK (JSON_VALID(servers_defaults) OR servers_defaults = '') NOT NULL DEFAULT '' , comment VARCHAR NOT NULL DEFAULT '')" diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index bc8f35675b..01d553a454 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -200,11 +200,12 @@ struct peer_mysql_servers_v2_t { struct incoming_pgsql_servers_t { SQLite3_result* incoming_pgsql_servers_v2 = NULL; SQLite3_result* incoming_replication_hostgroups = NULL; + SQLite3_result* incoming_aurora_hostgroups = NULL; SQLite3_result* incoming_hostgroup_attributes = NULL; SQLite3_result* runtime_pgsql_servers = NULL; incoming_pgsql_servers_t(); - incoming_pgsql_servers_t(SQLite3_result*, SQLite3_result*, SQLite3_result*, SQLite3_result*); + incoming_pgsql_servers_t(SQLite3_result*, SQLite3_result*, SQLite3_result*, SQLite3_result*, SQLite3_result*); }; // Separate structs for runtime pgsql server and pgsql server v2 to avoid human error diff --git a/lib/Admin_Bootstrap.cpp b/lib/Admin_Bootstrap.cpp index 6e3ad7e9ba..0811484377 100644 --- a/lib/Admin_Bootstrap.cpp +++ b/lib/Admin_Bootstrap.cpp @@ -612,6 +612,8 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) { insert_into_tables_defs(tables_defs_admin, "runtime_pgsql_hostgroup_attributes", ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_HOSTGROUP_ATTRIBUTES); insert_into_tables_defs(tables_defs_admin, "pgsql_replication_hostgroups", ADMIN_SQLITE_TABLE_PGSQL_REPLICATION_HOSTGROUPS); insert_into_tables_defs(tables_defs_admin, "runtime_pgsql_replication_hostgroups", ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_REPLICATION_HOSTGROUPS); + insert_into_tables_defs(tables_defs_admin, "pgsql_aws_aurora_hostgroups", ADMIN_SQLITE_TABLE_PGSQL_AWS_AURORA_HOSTGROUPS); + insert_into_tables_defs(tables_defs_admin, "runtime_pgsql_aws_aurora_hostgroups", ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_AWS_AURORA_HOSTGROUPS); insert_into_tables_defs(tables_defs_admin, "pgsql_firewall_whitelist_users", ADMIN_SQLITE_TABLE_PGSQL_FIREWALL_WHITELIST_USERS); insert_into_tables_defs(tables_defs_admin, "runtime_pgsql_firewall_whitelist_users", ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_FIREWALL_WHITELIST_USERS); @@ -627,6 +629,7 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) { insert_into_tables_defs(tables_defs_config, "pgsql_query_rules_fast_routing", ADMIN_SQLITE_TABLE_PGSQL_QUERY_RULES_FAST_ROUTING); insert_into_tables_defs(tables_defs_config, "pgsql_hostgroup_attributes", ADMIN_SQLITE_TABLE_PGSQL_HOSTGROUP_ATTRIBUTES); insert_into_tables_defs(tables_defs_config, "pgsql_replication_hostgroups", ADMIN_SQLITE_TABLE_PGSQL_REPLICATION_HOSTGROUPS); + insert_into_tables_defs(tables_defs_config, "pgsql_aws_aurora_hostgroups", ADMIN_SQLITE_TABLE_PGSQL_AWS_AURORA_HOSTGROUPS); insert_into_tables_defs(tables_defs_config, "pgsql_firewall_whitelist_users", ADMIN_SQLITE_TABLE_PGSQL_FIREWALL_WHITELIST_USERS); insert_into_tables_defs(tables_defs_config, "pgsql_firewall_whitelist_rules", ADMIN_SQLITE_TABLE_PGSQL_FIREWALL_WHITELIST_RULES); insert_into_tables_defs(tables_defs_config, "pgsql_firewall_whitelist_sqli_fingerprints", ADMIN_SQLITE_TABLE_PGSQL_FIREWALL_WHITELIST_SQLI_FINGERPRINTS); diff --git a/lib/PgSQL_HostGroups_Manager.cpp b/lib/PgSQL_HostGroups_Manager.cpp index 8576d066ce..5938371733 100644 --- a/lib/PgSQL_HostGroups_Manager.cpp +++ b/lib/PgSQL_HostGroups_Manager.cpp @@ -8,6 +8,9 @@ using json = nlohmann::json; #include "PgSQL_PreparedStatement.h" #include "PgSQL_Data_Stream.h" +#include "PgSQL_Monitor.hpp" + +extern PgSQL_Monitor* GloPgMon; #include #include @@ -807,14 +810,17 @@ PgSQL_HostGroups_Manager::PgSQL_HostGroups_Manager() { mydb->execute(MYHGM_PgSQL_SERVERS); mydb->execute(MYHGM_PgSQL_SERVERS_INCOMING); mydb->execute(MYHGM_PgSQL_REPLICATION_HOSTGROUPS); + mydb->execute(MYHGM_PgSQL_AWS_AURORA_HOSTGROUPS); mydb->execute(MYHGM_PgSQL_HOSTGROUP_ATTRIBUTES); mydb->execute("CREATE INDEX IF NOT EXISTS idx_pgsql_servers_hostname_port ON pgsql_servers (hostname,port)"); MyHostGroups=new PtrArray(); runtime_pgsql_servers=NULL; incoming_replication_hostgroups=NULL; + incoming_aws_aurora_hostgroups=NULL; incoming_hostgroup_attributes = NULL; incoming_pgsql_servers_v2 = NULL; pgsql_servers_to_monitor = NULL; + pthread_mutex_init(&AWS_Aurora_Info_mutex, NULL); { static const char alphanum[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; @@ -1498,6 +1504,14 @@ bool PgSQL_HostGroups_Manager::commit( generate_pgsql_hostgroup_attributes_table(); } + // Aurora PostgreSQL hostgroups + if (incoming_aws_aurora_hostgroups) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM pgsql_aws_aurora_hostgroups\n"); + mydb->execute("DELETE FROM pgsql_aws_aurora_hostgroups"); + generate_pgsql_aws_aurora_hostgroups_table(); + // Note: generate_pgsql_aws_aurora_hostgroups_table() already calls update_aws_aurora_hosts_monitor_resultset() + } + uint64_t new_hash = commit_update_checksum_from_pgsql_servers_v2(peer_pgsql_servers_v2.resultset); { @@ -1817,7 +1831,10 @@ void PgSQL_HostGroups_Manager::update_table_pgsql_servers_for_monitor(bool lock) SQLite3_result * PgSQL_HostGroups_Manager::dump_table_pgsql(const string& name) { char * query = (char *)""; - if (name == "pgsql_replication_hostgroups") { + if (name == "pgsql_aws_aurora_hostgroups") { + query=(char *)"SELECT writer_hostgroup,reader_hostgroup,active,aurora_port,domain_name,max_lag_ms," + "check_interval_ms,check_timeout_ms,writer_is_also_reader,new_reader_weight,add_lag_ms,min_lag_ms,lag_num_checks,comment FROM pgsql_aws_aurora_hostgroups"; + } else if (name == "pgsql_replication_hostgroups") { query=(char *)"SELECT writer_hostgroup, reader_hostgroup, check_type, comment FROM pgsql_replication_hostgroups"; } else if (name == "pgsql_hostgroup_attributes") { query=(char *)"SELECT hostgroup_id, max_num_online_servers, autocommit, free_connections_pct, init_connect, multiplex, connection_warming, throttle_connections_per_sec, ignore_session_variables, hostgroup_settings, servers_defaults, comment FROM pgsql_hostgroup_attributes ORDER BY hostgroup_id"; @@ -2912,6 +2929,8 @@ void PgSQL_HostGroups_Manager::save_incoming_pgsql_table(SQLite3_result *s, cons SQLite3_result ** inc = NULL; if (name == "pgsql_replication_hostgroups") { inc = &incoming_replication_hostgroups; + } else if (name == "pgsql_aws_aurora_hostgroups") { + inc = &incoming_aws_aurora_hostgroups; } else if (name == "pgsql_hostgroup_attributes") { inc = &incoming_hostgroup_attributes; } else { @@ -2943,6 +2962,8 @@ void PgSQL_HostGroups_Manager::save_pgsql_servers_v2(SQLite3_result* s) { SQLite3_result* PgSQL_HostGroups_Manager::get_current_pgsql_table(const string& name) { if (name == "pgsql_replication_hostgroups") { return this->incoming_replication_hostgroups; + } else if (name == "pgsql_aws_aurora_hostgroups") { + return this->incoming_aws_aurora_hostgroups; } else if (name == "pgsql_hostgroup_attributes") { return this->incoming_hostgroup_attributes; } else if (name == "cluster_pgsql_servers") { @@ -4555,3 +4576,685 @@ void PgSQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(PgSQL_SrvC* srv->status = MYSQL_SERVER_STATUS_OFFLINE_HARD; srv->ConnectionsFree->drop_all_connections(); } + +// AWS Aurora PostgreSQL implementations + +PgSQL_AWS_Aurora_Info::PgSQL_AWS_Aurora_Info(int w, int r, int _port, char *_domain, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c) { + writer_hostgroup = w; + reader_hostgroup = r; + aurora_port = _port; + domain_name = _domain ? strdup(_domain) : strdup(""); + max_lag_ms = maxl; + add_lag_ms = al; + min_lag_ms = minl; + lag_num_checks = lnc; + check_interval_ms = ci; + check_timeout_ms = ct; + active = _a; + __active = true; + writer_is_also_reader = wiar; + new_reader_weight = nrw; + comment = c ? strdup(c) : strdup(""); +} + +bool PgSQL_AWS_Aurora_Info::update(int r, int _port, char *_domain, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c) { + bool ret = false; + __active = true; + if (reader_hostgroup != r) { + reader_hostgroup = r; + ret = true; + } + if (aurora_port != _port) { + aurora_port = _port; + ret = true; + } + if (strcmp(domain_name, _domain)) { + free(domain_name); + domain_name = strdup(_domain); + ret = true; + } + if (max_lag_ms != maxl) { + max_lag_ms = maxl; + ret = true; + } + if (add_lag_ms != al) { + add_lag_ms = al; + ret = true; + } + if (min_lag_ms != minl) { + min_lag_ms = minl; + ret = true; + } + if (lag_num_checks != lnc) { + lag_num_checks = lnc; + ret = true; + } + if (check_interval_ms != ci) { + check_interval_ms = ci; + ret = true; + } + if (check_timeout_ms != ct) { + check_timeout_ms = ct; + ret = true; + } + if (active != _a) { + active = _a; + ret = true; + } + if (writer_is_also_reader != wiar) { + writer_is_also_reader = wiar; + ret = true; + } + if (new_reader_weight != nrw) { + new_reader_weight = nrw; + ret = true; + } + if (strcmp(comment, c)) { + free(comment); + comment = strdup(c); + ret = true; + } + return ret; +} + +PgSQL_AWS_Aurora_Info::~PgSQL_AWS_Aurora_Info() { + if (domain_name) free(domain_name); + if (comment) free(comment); +} + +void PgSQL_HostGroups_Manager::generate_pgsql_aws_aurora_hostgroups_table() { + if (incoming_aws_aurora_hostgroups == nullptr) { + return; + } + int rc; + sqlite3_stmt *statement = nullptr; + char *query = (char *)"INSERT INTO pgsql_aws_aurora_hostgroups(writer_hostgroup,reader_hostgroup,active,aurora_port,domain_name,max_lag_ms,check_interval_ms," + "check_timeout_ms,writer_is_also_reader,new_reader_weight,add_lag_ms,min_lag_ms,lag_num_checks,comment) VALUES " + "(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + rc = mydb->prepare_v2(query, &statement); + ASSERT_SQLITE_OK(rc, mydb); + proxy_info("New pgsql_aws_aurora_hostgroups table\n"); + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + // Mark all existing entries as inactive + for (std::map::iterator it1 = AWS_Aurora_Info_Map.begin(); it1 != AWS_Aurora_Info_Map.end(); ++it1) { + PgSQL_AWS_Aurora_Info *info = nullptr; + info = it1->second; + info->__active = false; + } + // Process incoming entries + for (std::vector::iterator it = incoming_aws_aurora_hostgroups->rows.begin(); it != incoming_aws_aurora_hostgroups->rows.end(); ++it) { + SQLite3_row *r = *it; + int writer_hostgroup = atoi(r->fields[0]); + int reader_hostgroup = atoi(r->fields[1]); + int active = atoi(r->fields[2]); + int aurora_port = atoi(r->fields[3]); + int max_lag_ms = atoi(r->fields[5]); + int check_interval_ms = atoi(r->fields[6]); + int check_timeout_ms = atoi(r->fields[7]); + int writer_is_also_reader = atoi(r->fields[8]); + int new_reader_weight = atoi(r->fields[9]); + int add_lag_ms = atoi(r->fields[10]); + int min_lag_ms = atoi(r->fields[11]); + int lag_num_checks = atoi(r->fields[12]); + proxy_info("Loading AWS Aurora PostgreSQL info for (%d,%d,%s,%d,\"%s\",%d,%d,%d,%d,%d,%d,\"%s\")\n", writer_hostgroup, reader_hostgroup, (active ? "on" : "off"), aurora_port, + r->fields[4], max_lag_ms, add_lag_ms, min_lag_ms, lag_num_checks, check_interval_ms, check_timeout_ms, r->fields[13]); + rc = (*proxy_sqlite3_bind_int64)(statement, 1, writer_hostgroup); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 2, reader_hostgroup); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 3, active); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 4, aurora_port); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_text)(statement, 5, r->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 6, max_lag_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 7, check_interval_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 8, check_timeout_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 9, writer_is_also_reader); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 10, new_reader_weight); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 11, add_lag_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 12, min_lag_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 13, lag_num_checks); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_text)(statement, 14, r->fields[13], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); + + SAFE_SQLITE3_STEP2(statement); + rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mydb); + std::map::iterator it2; + it2 = AWS_Aurora_Info_Map.find(writer_hostgroup); + PgSQL_AWS_Aurora_Info *info = nullptr; + if (it2 != AWS_Aurora_Info_Map.end()) { + info = it2->second; + bool changed = false; + changed = info->update(reader_hostgroup, aurora_port, r->fields[4], max_lag_ms, add_lag_ms, min_lag_ms, lag_num_checks, check_interval_ms, check_timeout_ms, (bool)active, writer_is_also_reader, new_reader_weight, r->fields[13]); + if (changed) { + // info->need_converge = true; + } + } else { + info = new PgSQL_AWS_Aurora_Info(writer_hostgroup, reader_hostgroup, aurora_port, r->fields[4], max_lag_ms, add_lag_ms, min_lag_ms, lag_num_checks, check_interval_ms, check_timeout_ms, (bool)active, writer_is_also_reader, new_reader_weight, r->fields[13]); + AWS_Aurora_Info_Map.insert(AWS_Aurora_Info_Map.begin(), std::pair(writer_hostgroup, info)); + } + } + (*proxy_sqlite3_finalize)(statement); + delete incoming_aws_aurora_hostgroups; + incoming_aws_aurora_hostgroups = nullptr; + + // Remove inactive entries + for (auto it3 = AWS_Aurora_Info_Map.begin(); it3 != AWS_Aurora_Info_Map.end(); ) { + PgSQL_AWS_Aurora_Info *info = it3->second; + if (info->__active == false) { + delete info; + it3 = AWS_Aurora_Info_Map.erase(it3); + } else { + it3++; + } + } + + // Update monitor resultset + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + update_aws_aurora_hosts_monitor_resultset(false); + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); +} + +bool PgSQL_HostGroups_Manager::aws_aurora_replication_lag_action(int _whid, int _rhid, char *_server_id, float current_replication_lag_ms, bool enable, bool is_writer, bool verbose) { + bool ret = false; + bool reader_found_in_whg = false; + if (is_writer) { + ret = false; + } + unsigned port = 5432; + char *domain_name = strdup((char *)""); + { + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + std::map::iterator it2; + it2 = AWS_Aurora_Info_Map.find(_whid); + PgSQL_AWS_Aurora_Info *info = nullptr; + if (it2 != AWS_Aurora_Info_Map.end()) { + info = it2->second; + if (info->domain_name) { + free(domain_name); + domain_name = strdup(info->domain_name); + } + port = info->aurora_port; + } + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + } + char *address = (char *)malloc(strlen(_server_id) + strlen(domain_name) + 1); + sprintf(address, "%s%s", _server_id, domain_name); + + GloAdmin->pgsql_servers_wrlock(); + wrlock(); + int i, j; + for (i = 0; i < (int)MyHostGroups->len; i++) { + PgSQL_HGC *myhgc = (PgSQL_HGC *)MyHostGroups->index(i); + if (_whid != (int)myhgc->hid && _rhid != (int)myhgc->hid) continue; + for (j = 0; j < (int)myhgc->mysrvs->cnt(); j++) { + PgSQL_SrvC *mysrvc = (PgSQL_SrvC *)myhgc->mysrvs->servers->index(j); + if (strcmp(mysrvc->address, address) == 0 && mysrvc->port == (int)port) { + // Found the server + if (enable == false) { + if (mysrvc->status == MYSQL_SERVER_STATUS_ONLINE) { + if (verbose) { + proxy_warning("Aurora PostgreSQL: Shunning server %s:%d from HG %u with replication lag of %f ms\n", + address, port, myhgc->hid, current_replication_lag_ms); + } + mysrvc->status = MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG; + } + } else { + if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) { + if (verbose) { + proxy_warning("Aurora PostgreSQL: Re-enabling server %s:%d from HG %u with replication lag of %f ms\n", + address, port, myhgc->hid, current_replication_lag_ms); + } + mysrvc->status = MYSQL_SERVER_STATUS_ONLINE; + } + } + mysrvc->aws_aurora_current_lag_us = current_replication_lag_ms * 1000; + if (mysrvc->status == MYSQL_SERVER_STATUS_ONLINE || mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) { + if (ret) { + if (_whid == (int)myhgc->hid && is_writer == false) { + // Server should be a reader but is in writer hostgroup + ret = false; + reader_found_in_whg = true; + } + } else { + if (is_writer == true) { + if (_whid == (int)myhgc->hid) { + // Server is a writer and found in writer hostgroup + ret = true; + } + } else { + if (_rhid == (int)myhgc->hid) { + // Server is a reader and found in reader hostgroup + ret = true; + } + } + } + } + if (ret == false) + if (is_writer == true) + if (enable == true) + if (_whid == (int)myhgc->hid) + if (mysrvc->status == MYSQL_SERVER_STATUS_OFFLINE_HARD) { + mysrvc->status = MYSQL_SERVER_STATUS_ONLINE; + proxy_warning("Aurora PostgreSQL: Re-enabling server %s:%d from HG %u because it is a writer\n", + address, port, myhgc->hid); + ret = true; + } + } + } + } + wrunlock(); + GloAdmin->pgsql_servers_wrunlock(); + if (ret == true) { + if (reader_found_in_whg == true) { + ret = false; + } + } + free(address); + free(domain_name); + return ret; +} + +void PgSQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid, char *_server_id, bool verbose) { + int cols = 0; + int affected_rows = 0; + SQLite3_result *resultset = nullptr; + char *query = nullptr; + char *q = nullptr; + char *error = nullptr; + + int writer_is_also_reader = 0; + int new_reader_weight = 1; + bool found_writer = false; + bool found_reader = false; + int _writer_hostgroup = _whid; + int aurora_port = 5432; + char *domain_name = strdup((char *)""); + int read_HG = -1; + { + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + std::map::iterator it2; + it2 = AWS_Aurora_Info_Map.find(_writer_hostgroup); + PgSQL_AWS_Aurora_Info *info = nullptr; + if (it2 != AWS_Aurora_Info_Map.end()) { + info = it2->second; + writer_is_also_reader = info->writer_is_also_reader; + new_reader_weight = info->new_reader_weight; + read_HG = info->reader_hostgroup; + if (info->domain_name) { + free(domain_name); + domain_name = strdup(info->domain_name); + } + aurora_port = info->aurora_port; + } + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + } + + q = (char *)"SELECT hostgroup_id FROM pgsql_servers JOIN pgsql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s%s' AND port=%d AND status<>3 AND hostgroup_id IN (%d, %d)"; + query = (char *)malloc(strlen(q) + strlen(_server_id) + strlen(domain_name) + 1024); + sprintf(query, q, _server_id, domain_name, aurora_port, _whid, _rhid); + mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + if (error) { + free(error); + error = nullptr; + } + + if (resultset) { + if (resultset->rows_count) { + for (auto it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { + SQLite3_row *r = *it; + int hostgroup = atoi(r->fields[0]); + if (hostgroup == _writer_hostgroup) { + found_writer = true; + } + if (read_HG >= 0) { + if (hostgroup == read_HG) { + found_reader = true; + } + } + } + } + + if (found_writer) { + if ( + (writer_is_also_reader == 0 && found_reader == false) + || + (writer_is_also_reader > 0 && found_reader == true) + ) { + delete resultset; + resultset = nullptr; + } + } + } + + if (resultset) { + if (resultset->rows_count) { + GloAdmin->pgsql_servers_wrlock(); + mydb->execute("DELETE FROM pgsql_servers_incoming"); + q = (char *)"INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id=%d"; + sprintf(query, q, _rhid); + mydb->execute(query); + q = (char *)"INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; + sprintf(query, q, _writer_hostgroup, _server_id, domain_name, aurora_port); + mydb->execute(query); + q = (char *)"UPDATE OR IGNORE pgsql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; + sprintf(query, q, _writer_hostgroup, _server_id, domain_name, aurora_port, _writer_hostgroup); + mydb->execute(query); + q = (char *)"DELETE FROM pgsql_servers_incoming WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; + sprintf(query, q, _server_id, domain_name, aurora_port, _writer_hostgroup); + mydb->execute(query); + q = (char *)"UPDATE pgsql_servers_incoming SET status=0 WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; + sprintf(query, q, _server_id, domain_name, aurora_port, _writer_hostgroup); + mydb->execute(query); + + // Move the old writer into the reader HG + q = (char *)"DELETE FROM pgsql_servers_incoming WHERE status=3 AND hostgroup_id=%d"; + sprintf(query, q, _rhid); + mydb->execute(query); + q = (char *)"INSERT OR IGNORE INTO pgsql_servers_incoming SELECT %d, hostname, port, %d, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id=%d AND status=0"; + sprintf(query, q, _rhid, new_reader_weight, _whid); + mydb->execute(query); + + if (writer_is_also_reader && read_HG >= 0) { + q = (char *)"INSERT OR IGNORE INTO pgsql_servers_incoming (hostgroup_id,hostname,port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment) SELECT %d,hostname,port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM pgsql_servers_incoming WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; + sprintf(query, q, read_HG, _writer_hostgroup, _server_id, domain_name, aurora_port); + mydb->execute(query); + q = (char *)"UPDATE pgsql_servers_incoming SET weight=%d WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; + sprintf(query, q, new_reader_weight, read_HG, _server_id, domain_name, aurora_port); + mydb->execute(query); + } + + // Calculate checksums to check if update is actually needed + uint64_t checksum_current = 0; + uint64_t checksum_incoming = 0; + { + int chk_cols = 0; + int chk_affected_rows = 0; + SQLite3_result *resultset_servers = nullptr; + char *chk_query = nullptr; + char *chk_error = nullptr; + const char *q1 = "SELECT DISTINCT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, pgsql_servers.comment FROM pgsql_servers JOIN pgsql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; + const char *q2 = "SELECT DISTINCT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, pgsql_servers_incoming.comment FROM pgsql_servers_incoming JOIN pgsql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; + chk_query = (char *)malloc(strlen(q2) + 128); + sprintf(chk_query, q1, _writer_hostgroup); + mydb->execute_statement(chk_query, &chk_error, &chk_cols, &chk_affected_rows, &resultset_servers); + if (chk_error == nullptr) { + if (resultset_servers) { + checksum_current = resultset_servers->raw_checksum(); + } + } + if (chk_error) { free(chk_error); chk_error = nullptr; } + if (resultset_servers) { + delete resultset_servers; + resultset_servers = nullptr; + } + sprintf(chk_query, q2, _writer_hostgroup); + mydb->execute_statement(chk_query, &chk_error, &chk_cols, &chk_affected_rows, &resultset_servers); + if (chk_error == nullptr) { + if (resultset_servers) { + checksum_incoming = resultset_servers->raw_checksum(); + } + } + if (chk_error) { free(chk_error); chk_error = nullptr; } + if (resultset_servers) { + delete resultset_servers; + resultset_servers = nullptr; + } + free(chk_query); + } + + if (checksum_incoming != checksum_current) { + proxy_warning("Aurora PostgreSQL: setting host %s%s:%d as writer\n", _server_id, domain_name, aurora_port); + q = (char *)"INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id NOT IN (%d, %d)"; + sprintf(query, q, _rhid, _whid); + mydb->execute(query); + commit(); + wrlock(); + q = (char *)"DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)"; + sprintf(query, q, _whid, _rhid); + mydb->execute(query); + generate_pgsql_servers_table(&_whid); + generate_pgsql_servers_table(&_rhid); + + // Because 'commit' is called, we are required to update 'pgsql_servers_for_monitor'. + update_table_pgsql_servers_for_monitor(false); + + wrunlock(); + } else { + if (GloPTH->variables.hostgroup_manager_verbose > 1) { + proxy_warning("Aurora PostgreSQL: skipping setting node %s%s:%d from hostgroup %d as writer because won't change the list of ONLINE nodes in writer hostgroup\n", _server_id, domain_name, aurora_port, _writer_hostgroup); + } + } + GloAdmin->pgsql_servers_wrunlock(); + } else { + // Auto-discovery: server not found, create new entry (matching MySQL approach) + std::string full_hostname = std::string(_server_id) + std::string(domain_name); + + GloAdmin->pgsql_servers_wrlock(); + wrlock(); + + // Use create_new_server_in_hg to create the server in memory + PgSQL_srv_info_t srv_info { full_hostname, static_cast(aurora_port), "Aurora PG" }; + PgSQL_srv_opts_t wr_srv_opts { -1, -1, -1 }; + + int wr_res = create_new_server_in_hg(_writer_hostgroup, srv_info, wr_srv_opts); + int rd_res = -1; + + // WRITER can also be placed as READER, or could previously be one + if (writer_is_also_reader && read_HG >= 0) { + PgSQL_srv_opts_t rd_srv_opts { new_reader_weight, -1, -1 }; + rd_res = create_new_server_in_hg(read_HG, srv_info, rd_srv_opts); + } + + // A new server has been created, or an OFFLINE_HARD brought back as ONLINE + if (wr_res == 0 || rd_res == 0) { + proxy_info("Aurora PostgreSQL: setting new auto-discovered host %s:%d as writer\n", + full_hostname.c_str(), aurora_port); + + purge_pgsql_servers_table(); + + // Delete servers from the table before regenerating + char del_query[256]; + snprintf(del_query, sizeof(del_query), + "DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)", + _writer_hostgroup, _rhid); + mydb->execute(del_query); + + generate_pgsql_servers_table(&_whid); + generate_pgsql_servers_table(&_rhid); + + // Update the global checksums after 'pgsql_servers' regeneration + { + unique_ptr resultset { get_admin_runtime_pgsql_servers(mydb) }; + string pgsrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) }; + save_runtime_pgsql_servers(resultset.release()); + proxy_info("Checksum for table %s is %s\n", "pgsql_servers", pgsrvs_checksum.c_str()); + + pthread_mutex_lock(&GloVars.checksum_mutex); + update_glovars_pgsql_servers_checksum(pgsrvs_checksum); + pthread_mutex_unlock(&GloVars.checksum_mutex); + } + + // Because 'commit' isn't called, we are required to update 'pgsql_servers_for_monitor'. + update_table_pgsql_servers_for_monitor(false); + // Update AWS Aurora resultset used for monitoring + update_aws_aurora_hosts_monitor_resultset(true); + } + + wrunlock(); + GloAdmin->pgsql_servers_wrunlock(); + } + } + if (resultset) { + delete resultset; + resultset = nullptr; + } + if (query) { + free(query); + } + free(domain_name); +} + +void PgSQL_HostGroups_Manager::update_aws_aurora_set_reader(int _whid, int _rhid, char *_server_id) { + int cols = 0; + int affected_rows = 0; + SQLite3_result *resultset = nullptr; + char *query = nullptr; + char *q = nullptr; + char *error = nullptr; + int _writer_hostgroup = _whid; + int aurora_port = 5432; + int new_reader_weight = 1; + char *domain_name = strdup((char *)""); + { + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + std::map::iterator it2; + it2 = AWS_Aurora_Info_Map.find(_writer_hostgroup); + PgSQL_AWS_Aurora_Info *info = nullptr; + if (it2 != AWS_Aurora_Info_Map.end()) { + info = it2->second; + if (info->domain_name) { + free(domain_name); + domain_name = strdup(info->domain_name); + } + aurora_port = info->aurora_port; + new_reader_weight = info->new_reader_weight; + } + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + } + + q = (char *)"SELECT hostgroup_id FROM pgsql_servers JOIN pgsql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s%s' AND port=%d AND status<>3 AND hostgroup_id IN (%d,%d)"; + query = (char *)malloc(strlen(q) + strlen(_server_id) + strlen(domain_name) + 64); + sprintf(query, q, _server_id, domain_name, aurora_port, _whid, _rhid); + mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + if (error) { + free(error); + error = nullptr; + } + free(query); + + if (resultset) { + if (resultset->rows_count) { + proxy_warning("Aurora PostgreSQL: setting host %s%s:%d (part of cluster with writer_hostgroup=%d) as a reader, moving from writer_hostgroup %d to reader_hostgroup %d\n", + _server_id, domain_name, aurora_port, _whid, _whid, _rhid); + GloAdmin->pgsql_servers_wrlock(); + mydb->execute("DELETE FROM pgsql_servers_incoming"); + mydb->execute("INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers"); + // If server present as WRITER try moving it to reader_hostgroup + q = (char *)"UPDATE OR IGNORE pgsql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; + query = (char *)malloc(strlen(q) + strlen(_server_id) + strlen(domain_name) + 512); + sprintf(query, q, _rhid, _server_id, domain_name, aurora_port, _whid); + mydb->execute(query); + // If server is still in writer_hostgroup, remove it + q = (char *)"DELETE FROM pgsql_servers_incoming WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; + sprintf(query, q, _server_id, domain_name, aurora_port, _whid); + mydb->execute(query); + q = (char *)"UPDATE pgsql_servers_incoming SET status=0 WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; + sprintf(query, q, _server_id, domain_name, aurora_port, _rhid); + mydb->execute(query); + commit(); + wrlock(); + + q = (char *)"DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)"; + sprintf(query, q, _whid, _rhid); + mydb->execute(query); + generate_pgsql_servers_table(&_whid); + generate_pgsql_servers_table(&_rhid); + + // Because 'commit' is called, we are required to update 'pgsql_servers_for_monitor'. + update_table_pgsql_servers_for_monitor(false); + + wrunlock(); + GloAdmin->pgsql_servers_wrunlock(); + free(query); + } else { + // Auto-discovery: server not found, create new entry in reader hostgroup + // Following MySQL's pattern using create_new_server_in_hg + std::string full_hostname = std::string(_server_id) + std::string(domain_name); + GloAdmin->pgsql_servers_wrlock(); + wrlock(); + + PgSQL_srv_info_t srv_info { full_hostname, static_cast(aurora_port), "Aurora PG" }; + PgSQL_srv_opts_t srv_opts { new_reader_weight, -1, -1 }; + int wr_res = create_new_server_in_hg(_rhid, srv_info, srv_opts); + + // A new server has been created, or an OFFLINE_HARD brought back as ONLINE + if (wr_res == 0) { + purge_pgsql_servers_table(); + + char *q1 = (char *)"DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)"; + char *q2 = (char *)malloc(strlen(q1) + 64); + sprintf(q2, q1, _whid, _rhid); + mydb->execute(q2); + free(q2); + + generate_pgsql_servers_table(&_whid); + generate_pgsql_servers_table(&_rhid); + + // Update the global checksums after 'pgsql_servers' regeneration + { + unique_ptr resultset { get_admin_runtime_pgsql_servers(mydb) }; + string pgsrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) }; + save_runtime_pgsql_servers(resultset.release()); + proxy_info("Checksum for table %s is %s\n", "pgsql_servers", pgsrvs_checksum.c_str()); + + pthread_mutex_lock(&GloVars.checksum_mutex); + update_glovars_pgsql_servers_checksum(pgsrvs_checksum); + pthread_mutex_unlock(&GloVars.checksum_mutex); + } + + // Because 'commit' isn't called, we are required to update 'pgsql_servers_for_monitor'. + update_table_pgsql_servers_for_monitor(false); + // Update AWS Aurora resultset used for monitoring + update_aws_aurora_hosts_monitor_resultset(true); + } + + wrunlock(); + GloAdmin->pgsql_servers_wrunlock(); + } + } + if (resultset) { + delete resultset; + resultset = nullptr; + } + free(domain_name); +} + +const char SELECT_PGSQL_AWS_AURORA_SERVERS_FOR_MONITOR[] { + "SELECT writer_hostgroup, reader_hostgroup, hostname, port, MAX(use_ssl) use_ssl, max_lag_ms, check_interval_ms," + " check_timeout_ms, add_lag_ms, min_lag_ms, lag_num_checks FROM pgsql_servers" + " JOIN pgsql_aws_aurora_hostgroups ON" + " hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE active=1 AND status NOT IN (2,3)" + " GROUP BY writer_hostgroup, hostname, port" +}; + +void PgSQL_HostGroups_Manager::update_aws_aurora_hosts_monitor_resultset(bool lock) { + if (lock) { + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + } + + SQLite3_result* resultset = nullptr; + { + char* error = nullptr; + int cols = 0; + int affected_rows = 0; + mydb->execute_statement(SELECT_PGSQL_AWS_AURORA_SERVERS_FOR_MONITOR, &error, &cols, &affected_rows, &resultset); + if (error) { + proxy_error("Aurora PostgreSQL: Error executing monitor query: %s\n", error); + free(error); + } + } + + if (resultset) { + if (GloPgMon->AWS_Aurora_Hosts_resultset) { + delete GloPgMon->AWS_Aurora_Hosts_resultset; + } + GloPgMon->AWS_Aurora_Hosts_resultset = resultset; + GloPgMon->AWS_Aurora_Hosts_resultset_checksum = resultset->raw_checksum(); + } + + if (lock) { + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + } +} diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp index 4db02cf77a..47a3afcbba 100644 --- a/lib/PgSQL_Monitor.cpp +++ b/lib/PgSQL_Monitor.cpp @@ -100,7 +100,146 @@ void check_and_build_standard_tables(SQLite3DB& db, const vector& t db.execute("PRAGMA foreign_keys = ON"); } +/** + * @brief Server container for PostgreSQL Monitor connection pool + * @details Holds connections per server (hostname:port) for reuse + * Equivalent to MySQL's MonMySrvC class + */ +class MonPgSrvC { +public: + char* address; + uint16_t port; + std::unique_ptr conns; + MonPgSrvC(char* a, uint16_t p) { + address = strdup(a); + port = p; + conns = std::unique_ptr(new PtrArray()); + }; + ~MonPgSrvC() { + free(address); + if (conns) { + while (conns->len) { + PGconn* pg = static_cast(conns->index(0)); + if (pg) { + PQfinish(pg); + pg = nullptr; + } + conns->remove_index_fast(0); + } + } + } +}; + +/** + * @brief Connection pool for PostgreSQL Aurora monitoring + * @details Equivalent to MySQL_Monitor_Connection_Pool + * Pools connections for Aurora health checks to reduce connection overhead + */ +class PgSQL_Monitor_Connection_Pool { +private: + std::mutex mutex; + std::unique_ptr servers; +public: + PGconn* get_connection(char* hostname, int port); + void put_connection(char* hostname, int port, PGconn* pg); + void purge_some_connections(); + void purge_all_connections(); + PgSQL_Monitor_Connection_Pool() { + servers = std::unique_ptr(new PtrArray()); + }; + ~PgSQL_Monitor_Connection_Pool() { + purge_all_connections(); + } +}; + +PGconn* PgSQL_Monitor_Connection_Pool::get_connection(char* hostname, int port) { + std::lock_guard lock(mutex); + PGconn* pg = nullptr; + + for (unsigned int i = 0; i < servers->len; i++) { + MonPgSrvC* srv = (MonPgSrvC*)servers->index(i); + if (srv->port == port && strcmp(hostname, srv->address) == 0) { + if (srv->conns->len) { + while (srv->conns->len) { + unsigned int idx = rand() % srv->conns->len; + PGconn* pgconn = (PGconn*)srv->conns->remove_index_fast(idx); + + if (!pgconn) continue; + + // Check if connection is still alive + if (PQstatus(pgconn) != CONNECTION_OK) { + PQfinish(pgconn); + continue; + } + + pg = pgconn; + break; + } + } + return pg; + } + } + return pg; +} + +void PgSQL_Monitor_Connection_Pool::put_connection(char* hostname, int port, PGconn* pg) { + std::lock_guard lock(mutex); + for (unsigned int i = 0; i < servers->len; i++) { + MonPgSrvC* srv = (MonPgSrvC*)servers->index(i); + if (srv->port == port && strcmp(hostname, srv->address) == 0) { + srv->conns->add(pg); + return; + } + } + // if no server was found + MonPgSrvC* srv = new MonPgSrvC(hostname, port); + srv->conns->add(pg); + servers->add(srv); +} + +void PgSQL_Monitor_Connection_Pool::purge_some_connections() { + std::lock_guard lock(mutex); + for (unsigned int i = 0; i < servers->len; i++) { + MonPgSrvC* srv = (MonPgSrvC*)servers->index(i); + // Keep at most 4 connections per server (same as MySQL) + while (srv->conns->len > 4) { + PGconn* pg = (PGconn*)srv->conns->remove_index_fast(0); + if (pg) { + PQfinish(pg); + } + } + // Also check connection status and close dead connections + for (unsigned int j = 0; j < srv->conns->len; j++) { + PGconn* pg = (PGconn*)srv->conns->index(j); + if (pg && PQstatus(pg) != CONNECTION_OK) { + srv->conns->remove_index_fast(j); + PQfinish(pg); + j--; // Recheck this index + } + } + } +} + +void PgSQL_Monitor_Connection_Pool::purge_all_connections() { + std::lock_guard lock(mutex); + if (servers) { + while (servers->len) { + MonPgSrvC* srv = static_cast(servers->index(0)); + if (srv) { + delete srv; + } + servers->remove_index_fast(0); + } + } +} + PgSQL_Monitor::PgSQL_Monitor() { + // Initialize Aurora mutex and members like MySQL does + pthread_mutex_init(&aws_aurora_mutex, NULL); + AWS_Aurora_Hosts_resultset = nullptr; + AWS_Aurora_Hosts_resultset_checksum = 0; + My_Conn_Pool = new PgSQL_Monitor_Connection_Pool(); + int rc = monitordb.open( const_cast("file:mem_monitordb?mode=memory&cache=shared"), SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX @@ -125,6 +264,158 @@ PgSQL_Monitor::PgSQL_Monitor() { monitordb.execute("CREATE INDEX IF NOT EXISTS idx_connect_log_time_start ON pgsql_server_connect_log (time_start_us)"); monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON pgsql_server_ping_log (time_start_us)"); monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_2 ON pgsql_server_ping_log (hostname, port, time_start_us)"); + // Aurora specific indexes + monitordb.execute("CREATE INDEX IF NOT EXISTS idx_aurora_log_time_start ON pgsql_server_aws_aurora_log (time_start_us)"); +} + +PgSQL_Monitor::~PgSQL_Monitor() { + if (AWS_Aurora_Hosts_resultset) { + delete AWS_Aurora_Hosts_resultset; + AWS_Aurora_Hosts_resultset = nullptr; + } + // Clean up Aurora hosts map + for (auto& it : AWS_Aurora_Hosts_Map) { + delete it.second; + } + AWS_Aurora_Hosts_Map.clear(); + // Clean up connection pool + if (My_Conn_Pool) { + delete My_Conn_Pool; + My_Conn_Pool = nullptr; + } +} + +bool PgSQL_Monitor::server_responds_to_ping(const char* addr, int port) { + int max_fails = 3; // Could be made configurable + cfmt_t q_fmt { cstr_format(RESP_SERVERS_QUERY_T, addr, port, max_fails, max_fails) }; + + char* err { nullptr }; + unique_ptr result { monitordb.execute_statement(q_fmt.str.c_str(), &err) }; + + if (err || result == nullptr) { + free(err); + return false; + } + return !result->rows_count; +} + +unsigned int PgSQL_Monitor::estimate_lag(char* server_id, PgSQL_AWS_Aurora_status_entry** aase, unsigned int idx, + unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks) { + // Safety checks - return 0 if invalid input + // Use N_L_ASE (16) for array bounds, not PGSQL_AWS_Aurora_Nentries (150) + if (!aase || !server_id) { + return 0; + } + if (idx >= N_L_ASE) { + return 0; + } + + if (lag_num_checks > N_L_ASE) lag_num_checks = N_L_ASE; + if (lag_num_checks <= 0) lag_num_checks = 1; + + unsigned int mlag = 0; + unsigned int lag = 0; + + for (unsigned int i = 1; i <= lag_num_checks; i++) { + if (!aase[idx] || !aase[idx]->host_statuses) + break; + for (auto hse : *(aase[idx]->host_statuses)) { + // NULL check for hse->server_id + if (hse && hse->server_id && strcmp(server_id, hse->server_id) == 0 && (unsigned int)hse->replica_lag_ms != 0) { + unsigned int ms = std::max(((unsigned int)hse->replica_lag_ms + add_lag_ms), min_lag_ms); + if (ms > mlag) mlag = ms; + if (!lag) lag = ms; + } + } + if (idx == 0) idx = N_L_ASE; + idx--; + } + + return mlag; +} + +// AWS Aurora PostgreSQL class implementations + +PgSQL_AWS_Aurora_replica_host_status_entry::PgSQL_AWS_Aurora_replica_host_status_entry( + char* serid, char* sessid, char* lut, float rlm, bool is_master +) { + server_id = serid ? strdup(serid) : nullptr; + session_id = sessid ? strdup(sessid) : nullptr; + last_update_timestamp = lut ? strdup(lut) : nullptr; + replica_lag_ms = rlm; + is_current_master = is_master; +} + +PgSQL_AWS_Aurora_replica_host_status_entry::PgSQL_AWS_Aurora_replica_host_status_entry( + char* serid, char* sessid, char* lut, const char* rlm, bool is_master +) { + server_id = serid ? strdup(serid) : nullptr; + session_id = sessid ? strdup(sessid) : nullptr; + last_update_timestamp = lut ? strdup(lut) : nullptr; + replica_lag_ms = rlm ? atof(rlm) : 0.0f; + is_current_master = is_master; +} + +PgSQL_AWS_Aurora_replica_host_status_entry::~PgSQL_AWS_Aurora_replica_host_status_entry() { + if (server_id) free(server_id); + if (session_id) free(session_id); + if (last_update_timestamp) free(last_update_timestamp); +} + +PgSQL_AWS_Aurora_status_entry::PgSQL_AWS_Aurora_status_entry( + unsigned long long st, unsigned long long ct, char* e +) : start_time(st), check_time(ct), error(nullptr) { + if (e) error = strdup(e); + host_statuses = new std::vector(); +} + +void PgSQL_AWS_Aurora_status_entry::add_host_status(PgSQL_AWS_Aurora_replica_host_status_entry* hs) { + host_statuses->push_back(hs); +} + +PgSQL_AWS_Aurora_status_entry::~PgSQL_AWS_Aurora_status_entry() { + if (error) free(error); + for (auto hs : *host_statuses) { + delete hs; + } + delete host_statuses; +} + +PgSQL_AWS_Aurora_monitor_node::PgSQL_AWS_Aurora_monitor_node(char* _a, int _p, int _whg) { + addr = strdup(_a); + port = _p; + writer_hostgroup = _whg; + idx_last_entry = -1; + num_checks_tot = 0; + num_checks_ok = 0; + last_checked_at = 0; + for (int i = 0; i < PGSQL_AWS_Aurora_Nentries; i++) { + last_entries[i] = nullptr; + } +} + +PgSQL_AWS_Aurora_monitor_node::~PgSQL_AWS_Aurora_monitor_node() { + if (addr) free(addr); + for (int i = 0; i < PGSQL_AWS_Aurora_Nentries; i++) { + if (last_entries[i]) delete last_entries[i]; + } +} + +bool PgSQL_AWS_Aurora_monitor_node::add_entry(PgSQL_AWS_Aurora_status_entry* ase) { + num_checks_tot++; + if (ase->error == nullptr) { + num_checks_ok++; + } + last_checked_at = time(nullptr); + idx_last_entry++; + if (idx_last_entry >= PGSQL_AWS_Aurora_Nentries) { + idx_last_entry = 0; + } + if (last_entries[idx_last_entry]) { + delete last_entries[idx_last_entry]; + } + last_entries[idx_last_entry] = ase; + return true; } /** @@ -636,13 +927,9 @@ pair handle_async_connect_cont(state_t& st, short revent) { break; case PGRES_POLLING_FAILED: { // During connection phase use `PQerrorMessage` - const mon_srv_t& srv { st.task.op_st.srv_info }; + // Note: Error is recorded in pgsql_server_connect_log table; logging here would be noisy + // as this fires on every connection failure. The shunning logic will log when max_failures is reached. auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - - proxy_error( - "Monitor connect failed addr='%s:%d' error='%s'\n", - srv.addr.c_str(), srv.port, err.get() - ); set_failed_st(st, ASYNC_CONNECT_FAILED, std::move(err)); break; } @@ -896,7 +1183,7 @@ pgsql_conn_t create_new_conn(task_st_t& task_st) { if (pgconn.conn) { auto error { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - proxy_error( + proxy_debug(PROXY_DEBUG_MONITOR, 5, "Monitor connect failed addr='%s:%d' error='%s'\n", srv.addr.c_str(), srv.port, error.get() ); @@ -905,7 +1192,7 @@ pgsql_conn_t create_new_conn(task_st_t& task_st) { task_st.end = monotonic_time(); } else { mf_unique_ptr error { strdup("Out of memory") }; - proxy_error( + proxy_debug(PROXY_DEBUG_MONITOR, 5, "Monitor connect failed addr='%s:%d' error='%s'\n", srv.addr.c_str(), srv.port, "Out of memory" ); @@ -2088,6 +2375,18 @@ void* PgSQL_monitor_scheduler_thread() { workers.emplace_back(worker_thread_t { std::move(th), std::move(worker_queue) }); } + // Start Aurora PostgreSQL monitoring thread + pthread_t pgsql_monitor_aws_aurora_thread; + pthread_attr_t aurora_attr; + pthread_attr_init(&aurora_attr); + pthread_attr_setstacksize(&aurora_attr, 2048 * 1024); + if (pthread_create(&pgsql_monitor_aws_aurora_thread, &aurora_attr, PgSQL_monitor_aws_aurora, NULL) != 0) { + proxy_error("Failed to create Aurora PostgreSQL monitor thread\n"); + } else { + proxy_info("Started Aurora PostgreSQL monitor thread\n"); + } + pthread_attr_destroy(&aurora_attr); + uint64_t cur_intv_start = 0; tasks_intvs_t next_intvs {}; vector tasks_batches {}; @@ -2222,6 +2521,10 @@ void* PgSQL_monitor_scheduler_thread() { pthread_join(worker.first, NULL); } + // Wait for Aurora thread to exit + pthread_join(pgsql_monitor_aws_aurora_thread, NULL); + proxy_info("Aurora PostgreSQL monitor thread joined\n"); + // Cleanup the global connection pool; no mutex, threads joined for (auto& entry : mon_conn_pool.conn_map) { for (auto& conn : entry.second) { @@ -2233,3 +2536,757 @@ void* PgSQL_monitor_scheduler_thread() { return nullptr; } + +// ========================================================================= +// AWS Aurora PostgreSQL Monitoring Implementation +// ========================================================================= + +extern PgSQL_HostGroups_Manager* PgHGM; + +// Number of last Aurora status entries to keep +#define N_L_ASE 16 + +// Structure to hold host definitions for Aurora monitoring +struct pgsql_host_def_t { + char* host; + int port; + int use_ssl; +}; + +// Helper function to shuffle hosts array +static void shuffle_pgsql_hosts(pgsql_host_def_t* arr, unsigned int n) { + if (n <= 1) return; + for (unsigned int i = n - 1; i > 0; i--) { + unsigned int j = rand() % (i + 1); + if (i != j) { + pgsql_host_def_t tmp; + size_t stride = sizeof(pgsql_host_def_t); + memcpy(&tmp, arr + i * stride / sizeof(pgsql_host_def_t), sizeof(pgsql_host_def_t)); + memcpy(arr + i * stride / sizeof(pgsql_host_def_t), arr + j * stride / sizeof(pgsql_host_def_t), sizeof(pgsql_host_def_t)); + memcpy(arr + j * stride / sizeof(pgsql_host_def_t), &tmp, sizeof(pgsql_host_def_t)); + } + } +} + +#ifdef TEST_AURORA +static void print_pgsql_aws_aurora_status_entry(PgSQL_AWS_Aurora_status_entry* aase) { + if (aase && aase->start_time) { + if (aase->host_statuses->size()) { + for (PgSQL_AWS_Aurora_replica_host_status_entry* hse : *aase->host_statuses) { + if (hse) { + fprintf(stderr, "%s %s %s %f %u\n", hse->server_id, hse->session_id, + hse->last_update_timestamp, hse->replica_lag_ms, hse->estimated_lag_ms); + } + } + } + } +} +#endif // TEST_AURORA + +void PgSQL_Monitor::evaluate_pgsql_aws_aurora_results(unsigned int wHG, unsigned int rHG, + PgSQL_AWS_Aurora_status_entry** lasts_ase, unsigned int ase_idx, + unsigned int max_latency_ms, unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks) { +#ifdef TEST_AURORA + unsigned int i = 0; + bool verbose = false; + unsigned int action_yes = 0; + unsigned int action_no = 0; + unsigned int enabling = 0; + unsigned int disabling = 0; + if (rand() % 500 == 0) { + verbose = true; + bool ev = false; + if (rand() % 1000 == 0) { + ev = true; + } + for (i = 0; i < N_L_ASE; i++) { + PgSQL_AWS_Aurora_status_entry* aase_tmp = lasts_ase[i]; + if (ev == true || i == ase_idx) { + print_pgsql_aws_aurora_status_entry(aase_tmp); + } + } + } +#endif // TEST_AURORA + unsigned int prev_ase_idx = ase_idx; + if (prev_ase_idx == 0) prev_ase_idx = N_L_ASE; + prev_ase_idx--; + + PgSQL_AWS_Aurora_status_entry* aase = lasts_ase[ase_idx]; + PgSQL_AWS_Aurora_status_entry* prev_aase = lasts_ase[prev_ase_idx]; + + if (aase && aase->start_time) { + if (aase->host_statuses->size()) { + for (auto it3 = aase->host_statuses->begin(); it3 != aase->host_statuses->end(); ++it3) { + PgSQL_AWS_Aurora_replica_host_status_entry* hse = *it3; + if (!hse) continue; // Skip NULL entries + + bool run_action = true; + bool enable = true; + bool is_writer = false; + bool rla_rc = true; + + // Skip if server_id is NULL + if (!hse->server_id) { + proxy_warning("Aurora PostgreSQL: Skipping entry with NULL server_id\n"); + continue; + } + + unsigned int current_lag_ms = estimate_lag(hse->server_id, lasts_ase, ase_idx, add_lag_ms, min_lag_ms, lag_num_checks); + hse->estimated_lag_ms = current_lag_ms; + + if (current_lag_ms > max_latency_ms) { + enable = false; + } + + // PostgreSQL Aurora uses is_current_master instead of MASTER_SESSION_ID + if (hse->is_current_master) { + is_writer = true; + } + + // Determine if a change needs to be made by comparing with previous check + if (prev_aase && prev_aase->start_time) { + if (prev_aase->host_statuses->size()) { + for (auto it4 = prev_aase->host_statuses->begin(); it4 != prev_aase->host_statuses->end(); ++it4) { + PgSQL_AWS_Aurora_replica_host_status_entry* prev_hse = *it4; + if (!prev_hse || !prev_hse->server_id) continue; // Skip NULL entries + if (strcmp(prev_hse->server_id, hse->server_id) == 0) { + bool prev_enabled = true; + unsigned int prev_lag_ms = estimate_lag(hse->server_id, lasts_ase, prev_ase_idx, add_lag_ms, min_lag_ms, lag_num_checks); + if (prev_lag_ms > max_latency_ms) { + prev_enabled = false; + } + if (prev_enabled == enable) { + // Previous status is the same, no action needed + run_action = false; + } + } + } + } + } + + if (run_action) { +#ifdef TEST_AURORA + action_yes++; + (enable ? enabling++ : disabling++); + rla_rc = PgHGM->aws_aurora_replication_lag_action(wHG, rHG, hse->server_id, current_lag_ms, enable, is_writer, verbose); +#else + rla_rc = PgHGM->aws_aurora_replication_lag_action(wHG, rHG, hse->server_id, current_lag_ms, enable, is_writer); +#endif // TEST_AURORA + } else { +#ifdef TEST_AURORA + action_no++; +#endif // TEST_AURORA + if (is_writer) { + // If the server is a writer we run it anyway for sanity check + rla_rc = PgHGM->aws_aurora_replication_lag_action(wHG, rHG, hse->server_id, current_lag_ms, enable, is_writer); + } + } + + if (rla_rc == false) { + if (is_writer) { + // The server should be a writer but is not configured as one +#ifdef TEST_AURORA + proxy_info("Aurora PostgreSQL: Calling update_aws_aurora_set_writer for %s\n", hse->server_id); +#endif // TEST_AURORA + PgHGM->update_aws_aurora_set_writer(wHG, rHG, hse->server_id); + + // Log failover event + time_t __timer; + char lut[30]; + struct tm __tm_info; + time(&__timer); + localtime_r(&__timer, &__tm_info); + strftime(lut, 25, "%Y-%m-%d %H:%M:%S", &__tm_info); + + char* q1 = (char*)"INSERT INTO pgsql_server_aws_aurora_failovers VALUES (%d, '%s', '%s')"; + char* q2 = (char*)malloc(strlen(q1) + strlen(lut) + strlen(hse->server_id) + 32); + sprintf(q2, q1, wHG, hse->server_id, lut); + monitordb.execute(q2); + free(q2); + } else { +#ifdef TEST_AURORA + proxy_info("Aurora PostgreSQL: Calling update_aws_aurora_set_reader for %s\n", hse->server_id); +#endif // TEST_AURORA + PgHGM->update_aws_aurora_set_reader(wHG, rHG, hse->server_id); + } + } + } + } + } +#ifdef TEST_AURORA + if (verbose) { + proxy_info("Aurora PostgreSQL replication_lag_actions: YES=%u , NO=%u , enabling=%u , disabling=%u\n", action_yes, action_no, enabling, disabling); + } +#endif // TEST_AURORA +} + +/** + * @brief Aurora PostgreSQL monitoring thread for a specific hostgroup + * @details This thread periodically queries aurora_replica_status() to discover cluster topology + */ +void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { + unsigned int wHG = *(unsigned int*)arg; + unsigned int rHG = 0; + unsigned int num_hosts = 0; + unsigned int cur_host_idx = 0; + unsigned int max_lag_ms = 0; + unsigned int check_interval_ms = 0; + unsigned int check_timeout_ms = 0; + unsigned int add_lag_ms = 0; + unsigned int min_lag_ms = 0; + unsigned int lag_num_checks = 1; + + proxy_info("Started Aurora PostgreSQL Monitor thread for writer HG %u\n", wHG); + + // Initialize thread-local variables (matching MySQL pattern) + unsigned int PgSQL_Monitor__thread_PgSQL_Thread_Variables_version; + PgSQL_Thread* pgsql_thr = new PgSQL_Thread(); + pgsql_thr->curtime = monotonic_time(); + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = GloPTH->get_global_version(); + pgsql_thr->refresh_variables(); + + // Quick exit checks + if (!GloPTH) { + delete pgsql_thr; + return nullptr; + } + if (!GloPgMon) { + delete pgsql_thr; + return nullptr; + } + + // Get monitor credentials from GloPTH + char* monitor_user = GloPTH->get_variable_string((char*)"monitor_username"); + char* monitor_pass = GloPTH->get_variable_string((char*)"monitor_password"); + + uint64_t initial_raw_checksum = 0; + + // Static array of the latest reads + unsigned int ase_idx = 0; + PgSQL_AWS_Aurora_status_entry* lasts_ase[N_L_ASE]; + for (unsigned int i = 0; i < N_L_ASE; i++) { + lasts_ase[i] = nullptr; + } + + // Initialize hpa to NULL for proper cleanup + pgsql_host_def_t* hpa = nullptr; + + // Initial data load + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + initial_raw_checksum = GloPgMon->AWS_Aurora_Hosts_resultset_checksum; + + // Count the number of hosts + for (auto it = GloPgMon->AWS_Aurora_Hosts_resultset->rows.begin(); + it != GloPgMon->AWS_Aurora_Hosts_resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + if (atoi(r->fields[0]) == (int)wHG) { + num_hosts++; + if (max_lag_ms == 0) { + max_lag_ms = atoi(r->fields[5]); + } + if (check_interval_ms == 0) { + check_interval_ms = atoi(r->fields[6]); + } + if (check_timeout_ms == 0) { + check_timeout_ms = atoi(r->fields[7]); + } + if (rHG == 0) { + rHG = atoi(r->fields[1]); + } + add_lag_ms = atoi(r->fields[8]); + min_lag_ms = atoi(r->fields[9]); + lag_num_checks = atoi(r->fields[10]); + } + } + + if (num_hosts == 0) { + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + proxy_warning("Aurora PostgreSQL Monitor: No hosts found for writer HG %u\n", wHG); + // Cleanup before early return + if (monitor_user) free(monitor_user); + if (monitor_pass) free(monitor_pass); + return nullptr; + } + + hpa = (pgsql_host_def_t*)malloc(sizeof(pgsql_host_def_t) * num_hosts); + cur_host_idx = 0; + for (auto it = GloPgMon->AWS_Aurora_Hosts_resultset->rows.begin(); + it != GloPgMon->AWS_Aurora_Hosts_resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + if (atoi(r->fields[0]) == (int)wHG) { + hpa[cur_host_idx].host = strdup(r->fields[2]); + hpa[cur_host_idx].port = atoi(r->fields[3]); + hpa[cur_host_idx].use_ssl = atoi(r->fields[4]); + cur_host_idx++; + } + } + // NOTE: 'cur_host_idx' should never be higher than 'num_hosts' otherwise later an invalid memory access + // can take place later when accessing 'hpa[cur_host_idx]'. + if (cur_host_idx >= num_hosts) { + cur_host_idx = num_hosts - 1; + } + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + bool exit_now = false; + unsigned long long t1 = 0; + unsigned long long next_loop_at = 0; + + uint64_t current_raw_checksum = 0; + bool found_pingable_host = false; + + t1 = monotonic_time(); + unsigned long long start_time = t1; + + while (GloPgMon->shutdown == false && pgsql_thread___monitor_enabled == true && exit_now == false) { + unsigned int glover; + t1 = monotonic_time(); + + if (!GloPTH) { + goto __exit_pgsql_monitor_AWS_Aurora_thread_HG_now; + } + + // if variables has changed, triggers new checks + glover = GloPTH->get_global_version(); + if (PgSQL_Monitor__thread_PgSQL_Thread_Variables_version < glover) { + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = glover; + pgsql_thr->refresh_variables(); + next_loop_at = 0; + } + + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + current_raw_checksum = GloPgMon->AWS_Aurora_Hosts_resultset_checksum; + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + if (current_raw_checksum != initial_raw_checksum) { + // Content has changed, exit + exit_now = true; + break; + } + + if (t1 < next_loop_at) { + unsigned long long st = next_loop_at - t1; + if (st > 50000) { + st = 50000; + } + usleep(st); + continue; + } + + found_pingable_host = false; + + // Pick a random host + size_t rnd = (size_t)rand(); + rnd %= num_hosts; + if (GloPgMon->server_responds_to_ping(hpa[rnd].host, hpa[rnd].port)) { + found_pingable_host = true; + cur_host_idx = rnd; + } else { + // Try all hosts + shuffle_pgsql_hosts(hpa, num_hosts); + for (unsigned int i = 0; found_pingable_host == false && i < num_hosts; i++) { + if (GloPgMon->server_responds_to_ping(hpa[i].host, hpa[i].port)) { + found_pingable_host = true; + cur_host_idx = i; + } + } + } + + if (found_pingable_host == false) { + proxy_error("No node is pingable for AWS Aurora PostgreSQL cluster with writer HG %u\n", wHG); + next_loop_at = t1 + check_interval_ms * 1000; + continue; + } + + // Execute Aurora replica status query + start_time = t1; + char* error_msg = nullptr; + + // Try to get connection from pool first (crc=false means from pool, crc=true means new connection) + // Note: crc is kept for MySQL parity and potential future use (e.g., connection timeout tracking) + bool crc __attribute__((unused)) = false; + PGconn* conn = GloPgMon->My_Conn_Pool->get_connection(hpa[cur_host_idx].host, hpa[cur_host_idx].port); + if (!conn) { + // Build connection string with monitor credentials + // Note: dbname=postgres is used because aurora_replica_status() is a system function + char conninfo[1024]; + snprintf(conninfo, sizeof(conninfo), "host=%s port=%d dbname=postgres user=%s password=%s connect_timeout=%d", + hpa[cur_host_idx].host, hpa[cur_host_idx].port, + monitor_user ? monitor_user : "", + monitor_pass ? monitor_pass : "", + check_timeout_ms / 1000); + conn = PQconnectdb(conninfo); + crc = true; // Mark as new connection + } + + unsigned long long t2 = monotonic_time(); + PgSQL_AWS_Aurora_status_entry* ase = nullptr; + PgSQL_AWS_Aurora_status_entry* ase_l = nullptr; + + if (PQstatus(conn) != CONNECTION_OK) { + error_msg = strdup(PQerrorMessage(conn)); + proxy_error("Error on AWS Aurora PostgreSQL check for %s:%d after %llums. Unable to create a connection. Error: %s\n", + hpa[cur_host_idx].host, hpa[cur_host_idx].port, (t2 - start_time) / 1000, error_msg); + ase = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); + ase_l = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); + free(error_msg); + } else { + // Execute the aurora_replica_status() query + // Aurora PostgreSQL provides: server_id, session_id, replica_lag_in_msec + // Writer is identified by session_id = 'MASTER_SESSION_ID' + const char* query = "SELECT server_id, session_id, replica_lag_in_msec, " + "CASE WHEN session_id = 'MASTER_SESSION_ID' THEN true ELSE false END as is_writer " + "FROM aurora_replica_status()"; + + PGresult* res = PQexec(conn, query); + t2 = monotonic_time(); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + error_msg = strdup(PQerrorMessage(conn)); + proxy_error("Error on AWS Aurora PostgreSQL check for %s:%d after %llums. Query failed. Error: %s\n", + hpa[cur_host_idx].host, hpa[cur_host_idx].port, (t2 - start_time) / 1000, error_msg); + ase = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); + ase_l = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); + free(error_msg); + } else { + unsigned long long time_now = realtime_time(); + time_now = time_now - (t2 - start_time); + ase = new PgSQL_AWS_Aurora_status_entry(time_now, t2 - start_time, nullptr); + ase_l = new PgSQL_AWS_Aurora_status_entry(time_now, t2 - start_time, nullptr); + + int nrows = PQntuples(res); + for (int i = 0; i < nrows; i++) { + char* server_id = PQgetvalue(res, i, 0); + char* session_id = PQgetvalue(res, i, 1); + char* replica_lag_str = PQgetvalue(res, i, 2); + char* is_writer_str = PQgetvalue(res, i, 3); + + float replica_lag = replica_lag_str ? atof(replica_lag_str) : 0.0f; + bool is_writer = (is_writer_str && (strcmp(is_writer_str, "t") == 0 || strcmp(is_writer_str, "true") == 0 || strcmp(is_writer_str, "1") == 0)); + + // Use session_id as last_update placeholder (not available in aurora_replica_status()) + PgSQL_AWS_Aurora_replica_host_status_entry* arhse = + new PgSQL_AWS_Aurora_replica_host_status_entry(server_id, session_id, session_id, replica_lag, is_writer); + ase->add_host_status(arhse); + + PgSQL_AWS_Aurora_replica_host_status_entry* arhse_l = + new PgSQL_AWS_Aurora_replica_host_status_entry(server_id, session_id, session_id, replica_lag, is_writer); + ase_l->add_host_status(arhse_l); + } + // Query succeeded, return connection to pool + // Note: MySQL distinguishes between pool connections (crc=false) and new connections (crc=true) + // with set_wait_timeout() for new connections. PostgreSQL doesn't have this, so we always + // return the connection to pool on success regardless of crc flag. + GloPgMon->My_Conn_Pool->put_connection(hpa[cur_host_idx].host, hpa[cur_host_idx].port, conn); + conn = nullptr; // Mark as handled + } + PQclear(res); + } + // If connection wasn't returned to pool (error case), close it + // This matches MySQL's behavior: on error, connection is closed (not returned to pool) + if (conn) { + PQfinish(conn); + conn = nullptr; + } + + // Process results + if (lasts_ase[ase_idx]) { + delete lasts_ase[ase_idx]; + } + lasts_ase[ase_idx] = ase_l; + + GloPgMon->evaluate_pgsql_aws_aurora_results(wHG, rHG, &lasts_ase[0], ase_idx, max_lag_ms, add_lag_ms, min_lag_ms, lag_num_checks); + + // Copy estimated_lag_ms from ase_l to ase + for (auto h : *(ase_l->host_statuses)) { + for (auto h2 : *(ase->host_statuses)) { + if (strcmp(h2->server_id, h->server_id) == 0) { + h2->estimated_lag_ms = h->estimated_lag_ms; + } + } + } + + ase_idx++; + if (ase_idx == N_L_ASE) { + ase_idx = 0; + } + + // Store in Aurora hosts map for monitoring statistics + if (GloPgMon && ase && hpa && cur_host_idx < num_hosts && hpa[cur_host_idx].host) { + std::string key = std::string(hpa[cur_host_idx].host) + ":" + std::to_string(hpa[cur_host_idx].port); + + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + auto it2 = GloPgMon->AWS_Aurora_Hosts_Map.find(key); + PgSQL_AWS_Aurora_monitor_node* node = nullptr; + if (it2 != GloPgMon->AWS_Aurora_Hosts_Map.end()) { + node = it2->second; + node->add_entry(ase); + } else { + node = new PgSQL_AWS_Aurora_monitor_node(hpa[cur_host_idx].host, hpa[cur_host_idx].port, wHG); + node->add_entry(ase); + GloPgMon->AWS_Aurora_Hosts_Map.insert(std::make_pair(key, node)); + } + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + } else if (ase) { + // If we can't store it, delete to prevent memory leak + delete ase; + ase = nullptr; + } + + next_loop_at = t1 + (check_interval_ms * 1000); + } + +__exit_pgsql_monitor_AWS_Aurora_thread_HG_now: + // Cleanup + if (monitor_user) free(monitor_user); + if (monitor_pass) free(monitor_pass); + + if (hpa) { + for (unsigned int i = 0; i < num_hosts; i++) { + if (hpa[i].host) { + free(hpa[i].host); + } + } + free(hpa); + } + + for (unsigned int i = 0; i < N_L_ASE; i++) { + if (lasts_ase[i]) { + delete lasts_ase[i]; + } + } + + // Cleanup thread object + if (pgsql_thr) { + delete pgsql_thr; + } + + proxy_info("Stopping Aurora PostgreSQL Monitor thread for writer HG %u\n", wHG); + return nullptr; +} + +/** + * @brief Main Aurora PostgreSQL monitoring function + * @details Spawns per-hostgroup monitoring threads when Aurora hostgroups are configured + */ +void* PgSQL_monitor_aws_aurora(void* arg) { + (void)arg; // unused + if (!GloPgMon) return nullptr; + + // Initialize the PgSQL Thread (note: this is not a real thread, just the structures associated with it) + unsigned int PgSQL_Monitor__thread_PgSQL_Thread_Variables_version; + PgSQL_Thread* pgsql_thr = new PgSQL_Thread(); + pgsql_thr->curtime = monotonic_time(); + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = GloPTH->get_global_version(); + pgsql_thr->refresh_variables(); + if (!GloPTH) return nullptr; // quick exit during shutdown/restart + + uint64_t last_raw_checksum = 0; + unsigned int* hgs_array = nullptr; + pthread_t* pthreads_array = nullptr; + unsigned int hgs_num = 0; + + proxy_info("Started Aurora PostgreSQL Monitor main thread\n"); + + while (GloPgMon->shutdown == false && pgsql_thread___monitor_enabled == true) { + unsigned int glover; + + if (!GloPTH) { + goto __exit_pgsql_monitor_aws_aurora; + } + + // if variables has changed, triggers new checks + glover = GloPTH->get_global_version(); + if (PgSQL_Monitor__thread_PgSQL_Thread_Variables_version < glover) { + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = glover; + pgsql_thr->refresh_variables(); + } + + // Check if list of servers or HG or options has changed + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + uint64_t new_raw_checksum = 0; + if (GloPgMon->AWS_Aurora_Hosts_resultset) { + new_raw_checksum = GloPgMon->AWS_Aurora_Hosts_resultset->raw_checksum(); + } + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + if (new_raw_checksum != last_raw_checksum) { + proxy_info("Aurora PostgreSQL: Detected new/changed definition for monitoring\n"); + last_raw_checksum = new_raw_checksum; + + if (pthreads_array) { + // Wait for all threads to terminate + for (unsigned int i = 0; i < hgs_num; i++) { + pthread_join(pthreads_array[i], nullptr); + proxy_info("Stopped Aurora PostgreSQL Monitor thread for writer HG %u\n", hgs_array[i]); + } + free(pthreads_array); + free(hgs_array); + pthreads_array = nullptr; + hgs_array = nullptr; + hgs_num = 0; + } + + // Count unique writer hostgroups + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + if (GloPgMon->AWS_Aurora_Hosts_resultset && GloPgMon->AWS_Aurora_Hosts_resultset->rows_count) { + std::map unique_whgs; + for (auto it = GloPgMon->AWS_Aurora_Hosts_resultset->rows.begin(); + it != GloPgMon->AWS_Aurora_Hosts_resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + unsigned int whg = atoi(r->fields[0]); + unique_whgs[whg] = true; + } + hgs_num = unique_whgs.size(); + if (hgs_num) { + proxy_info("Activating Monitoring of %u AWS Aurora PostgreSQL clusters\n", hgs_num); + hgs_array = (unsigned int*)malloc(sizeof(unsigned int) * hgs_num); + pthreads_array = (pthread_t*)malloc(sizeof(pthread_t) * hgs_num); + unsigned int idx = 0; + for (auto& it : unique_whgs) { + hgs_array[idx] = it.first; + idx++; + } + } + } + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + // Start threads for each writer hostgroup + for (unsigned int i = 0; i < hgs_num; i++) { + proxy_info("Starting Monitor thread for AWS Aurora PostgreSQL writer HG %u\n", hgs_array[i]); + if (pthread_create(&pthreads_array[i], nullptr, PgSQL_monitor_AWS_Aurora_thread_HG, &hgs_array[i]) != 0) { + proxy_error("Thread creation failed for AWS Aurora PostgreSQL writer HG %u\n", hgs_array[i]); + } + } + } + + usleep(500000); // 500ms + } + +__exit_pgsql_monitor_aws_aurora: + // Cleanup thread object + if (pgsql_thr) { + delete pgsql_thr; + pgsql_thr = nullptr; + } + + // Cleanup on shutdown + if (pthreads_array) { + for (unsigned int i = 0; i < hgs_num; i++) { + pthread_join(pthreads_array[i], nullptr); + } + free(pthreads_array); + free(hgs_array); + } + + proxy_info("Stopping Aurora PostgreSQL Monitor main thread\n"); + return nullptr; +} + +void PgSQL_Monitor::populate_monitor_pgsql_server_aws_aurora_log() { + SQLite3DB* db = &monitordb; + int rc; + char *query1 = nullptr; + query1 = (char *)"INSERT OR IGNORE INTO pgsql_server_aws_aurora_log VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)"; + sqlite3_stmt *statement1 = nullptr; + char *query2 = nullptr; + query2 = (char *)"INSERT OR IGNORE INTO pgsql_server_aws_aurora_log (hostname, port, time_start_us, success_time_us, error) VALUES (?1, ?2, ?3, ?4, ?5)"; + sqlite3_stmt *statement2 = nullptr; + rc = db->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, db); + rc = db->prepare_v2(query2, &statement2); + ASSERT_SQLITE_OK(rc, db); + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + db->execute((char *)"DELETE FROM pgsql_server_aws_aurora_log"); + std::map::iterator it2; + PgSQL_AWS_Aurora_monitor_node *node = nullptr; + for (it2 = GloPgMon->AWS_Aurora_Hosts_Map.begin(); it2 != GloPgMon->AWS_Aurora_Hosts_Map.end(); ++it2) { + std::string s = it2->first; + node = it2->second; + std::size_t found = s.find_last_of(":"); + std::string host = s.substr(0, found); + std::string port = s.substr(found + 1); + int i; + for (i = 0; i < PGSQL_AWS_Aurora_Nentries; i++) { + PgSQL_AWS_Aurora_status_entry *aase = node->last_entries[i]; + if (aase && aase->start_time) { + if (aase->host_statuses->size()) { + for (std::vector::iterator it3 = aase->host_statuses->begin(); it3 != aase->host_statuses->end(); ++it3) { + PgSQL_AWS_Aurora_replica_host_status_entry *hse = *it3; + if (hse) { + rc = (*proxy_sqlite3_bind_text)(statement1, 1, host.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 2, atoi(port.c_str())); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 3, aase->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 4, aase->check_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 5, aase->error, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 6, hse->server_id, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 7, hse->session_id, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 8, hse->last_update_timestamp, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_double)(statement1, 9, hse->replica_lag_ms); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 10, hse->estimated_lag_ms); ASSERT_SQLITE_OK(rc, db); + SAFE_SQLITE3_STEP2(statement1); + rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db); + } + } + } else { + rc = (*proxy_sqlite3_bind_text)(statement2, 1, host.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement2, 2, atoi(port.c_str())); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement2, 3, aase->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement2, 4, aase->check_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement2, 5, aase->error, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + SAFE_SQLITE3_STEP2(statement2); + rc = (*proxy_sqlite3_clear_bindings)(statement2); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement2); ASSERT_SQLITE_OK(rc, db); + } + } + } + } + (*proxy_sqlite3_finalize)(statement1); + (*proxy_sqlite3_finalize)(statement2); + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); +} + +void PgSQL_Monitor::populate_monitor_pgsql_server_aws_aurora_check_status() { + SQLite3DB* db = &monitordb; + int rc; + char *query1 = nullptr; + query1 = (char *)"INSERT OR IGNORE INTO pgsql_server_aws_aurora_check_status VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"; + sqlite3_stmt *statement1 = nullptr; + rc = db->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, db); + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + db->execute((char *)"DELETE FROM pgsql_server_aws_aurora_check_status"); + std::map::iterator it2; + PgSQL_AWS_Aurora_monitor_node *node = nullptr; + for (it2 = GloPgMon->AWS_Aurora_Hosts_Map.begin(); it2 != GloPgMon->AWS_Aurora_Hosts_Map.end(); ++it2) { + std::string s = it2->first; + node = it2->second; + std::size_t found = s.find_last_of(":"); + std::string host = s.substr(0, found); + std::string port = s.substr(found + 1); + PgSQL_AWS_Aurora_status_entry *aase = node->last_entry(); + char *error_msg = nullptr; + if (aase && aase->start_time) { + if (aase->error) { + error_msg = aase->error; + } + } + char lut[30]; + struct tm __tm_info; + localtime_r(&node->last_checked_at, &__tm_info); + strftime(lut, 25, "%Y-%m-%d %H:%M:%S", &__tm_info); + + rc = (*proxy_sqlite3_bind_int64)(statement1, 1, node->writer_hostgroup); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 2, host.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 3, atoi(port.c_str())); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 4, lut, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 5, node->num_checks_tot); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 6, node->num_checks_ok); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 7, error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + SAFE_SQLITE3_STEP2(statement1); + rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db); + } + (*proxy_sqlite3_finalize)(statement1); + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); +} diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index daebe79ced..f34c05a97e 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -42,6 +42,7 @@ using json = nlohmann::json; #include "ProxySQL_Statistics.hpp" #include "MySQL_Logger.hpp" #include "PgSQL_Logger.hpp" +#include "PgSQL_Monitor.hpp" #include "SQLite3_Server.h" #include "Web_Interface.hpp" @@ -147,7 +148,7 @@ static const vector pgsql_servers_tablenames = { "pgsql_replication_hostgroups", // "pgsql_group_replication_hostgroups", // "pgsql_galera_hostgroups", -// "pgsql_aws_aurora_hostgroups", + "pgsql_aws_aurora_hostgroups", "pgsql_hostgroup_attributes", }; @@ -322,6 +323,7 @@ extern MySQL_Logger *GloMyLogger; extern PgSQL_Logger* GloPgSQL_Logger; extern MySQL_STMT_Manager_v14 *GloMyStmt; extern MySQL_Monitor *GloMyMon; +extern PgSQL_Monitor* GloPgMon; extern PgSQL_Threads_Handler* GloPTH; extern void (*flush_logs_function)(); @@ -872,11 +874,13 @@ incoming_pgsql_servers_t::incoming_pgsql_servers_t() {} incoming_pgsql_servers_t::incoming_pgsql_servers_t( SQLite3_result* incoming_pgsql_servers_v2, SQLite3_result* incoming_replication_hostgroups, + SQLite3_result* incoming_aurora_hostgroups, SQLite3_result* incoming_hostgroup_attributes, SQLite3_result* runtime_pgsql_servers ) : incoming_pgsql_servers_v2(incoming_pgsql_servers_v2), incoming_replication_hostgroups(incoming_replication_hostgroups), + incoming_aurora_hostgroups(incoming_aurora_hostgroups), incoming_hostgroup_attributes(incoming_hostgroup_attributes), runtime_pgsql_servers(runtime_pgsql_servers) {} @@ -1205,6 +1209,9 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign bool monitor_mysql_server_aws_aurora_log=false; bool monitor_mysql_server_aws_aurora_check_status=false; + bool monitor_pgsql_server_aws_aurora_log=false; + bool monitor_pgsql_server_aws_aurora_check_status=false; + bool stats_proxysql_servers_checksums = false; bool stats_proxysql_servers_metrics = false; bool stats_proxysql_message_metrics = false; @@ -1382,6 +1389,8 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign || strstr(query_no_space, "runtime_pgsql_replication_hostgroups") || + strstr(query_no_space, "runtime_pgsql_aws_aurora_hostgroups") + || strstr(query_no_space, "runtime_pgsql_hostgroup_attributes") ) { runtime_pgsql_servers = true; refresh = true; @@ -1465,6 +1474,12 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign if (strstr(query_no_space,"mysql_server_aws_aurora_check_status")) { monitor_mysql_server_aws_aurora_check_status=true; refresh=true; } + if (strstr(query_no_space,"pgsql_server_aws_aurora_log")) { + monitor_pgsql_server_aws_aurora_log=true; refresh=true; + } + if (strstr(query_no_space,"pgsql_server_aws_aurora_check_status")) { + monitor_pgsql_server_aws_aurora_check_status=true; refresh=true; + } // if (stats_mysql_processlist || stats_mysql_connection_pool || stats_mysql_query_digest || stats_mysql_query_digest_reset) { if (refresh==true) { //pthread_mutex_lock(&admin_mutex); @@ -1685,6 +1700,16 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign GloMyMon->populate_monitor_mysql_server_aws_aurora_check_status(); } } + if (monitor_pgsql_server_aws_aurora_log) { + if (GloPgMon) { + GloPgMon->populate_monitor_pgsql_server_aws_aurora_log(); + } + } + if (monitor_pgsql_server_aws_aurora_check_status) { + if (GloPgMon) { + GloPgMon->populate_monitor_pgsql_server_aws_aurora_check_status(); + } + } //pthread_mutex_unlock(&admin_mutex); } @@ -7165,6 +7190,56 @@ void ProxySQL_Admin::save_pgsql_servers_runtime_to_database(bool _runtime) { } if (resultset) delete resultset; resultset = NULL; + + // dump pgsql_aws_aurora_hostgroups + + if (_runtime) { + query = (char*)"DELETE FROM main.runtime_pgsql_aws_aurora_hostgroups"; + } else { + query = (char*)"DELETE FROM main.pgsql_aws_aurora_hostgroups"; + } + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute(query); + resultset = PgHGM->dump_table_pgsql("pgsql_aws_aurora_hostgroups"); + if (resultset) { + int rc; + sqlite3_stmt* statement = NULL; + + char* q = NULL; + if (_runtime) { + q = (char*)"INSERT INTO runtime_pgsql_aws_aurora_hostgroups(writer_hostgroup,reader_hostgroup,active,aurora_port,domain_name,max_lag_ms,check_interval_ms,check_timeout_ms,writer_is_also_reader,new_reader_weight,add_lag_ms,min_lag_ms,lag_num_checks,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + } else { + q = (char*)"INSERT INTO pgsql_aws_aurora_hostgroups(writer_hostgroup,reader_hostgroup,active,aurora_port,domain_name,max_lag_ms,check_interval_ms,check_timeout_ms,writer_is_also_reader,new_reader_weight,add_lag_ms,min_lag_ms,lag_num_checks,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + } + + rc = admindb->prepare_v2(q, &statement); + ASSERT_SQLITE_OK(rc, admindb); + + for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + rc = (*proxy_sqlite3_bind_int64)(statement, 1, atoi(r->fields[0])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 2, atoi(r->fields[1])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 3, atoi(r->fields[2])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 4, atoi(r->fields[3])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_text)(statement, 5, r->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 6, atoi(r->fields[5])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 7, atoi(r->fields[6])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 8, atoi(r->fields[7])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 9, atoi(r->fields[8])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 10, atoi(r->fields[9])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 11, atoi(r->fields[10])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 12, atoi(r->fields[11])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 13, atoi(r->fields[12])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_text)(statement, 14, r->fields[13], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb); + + SAFE_SQLITE3_STEP2(statement); + rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, admindb); + } + (*proxy_sqlite3_finalize)(statement); + } + if (resultset) delete resultset; + resultset = NULL; } @@ -7435,10 +7510,12 @@ void ProxySQL_Admin::load_pgsql_servers_to_runtime(const incoming_pgsql_servers_ SQLite3_result* resultset = NULL; SQLite3_result* resultset_servers = NULL; SQLite3_result* resultset_replication = NULL; + SQLite3_result* resultset_aws_aurora = NULL; SQLite3_result* resultset_hostgroup_attributes = NULL; SQLite3_result* runtime_pgsql_servers = incoming_pgsql_servers.runtime_pgsql_servers; SQLite3_result* incoming_replication_hostgroups = incoming_pgsql_servers.incoming_replication_hostgroups; + SQLite3_result* incoming_aurora_hostgroups = incoming_pgsql_servers.incoming_aurora_hostgroups; SQLite3_result* incoming_hostgroup_attributes = incoming_pgsql_servers.incoming_hostgroup_attributes; SQLite3_result* incoming_pgsql_servers_v2 = incoming_pgsql_servers.incoming_pgsql_servers_v2; @@ -7500,6 +7577,39 @@ void ProxySQL_Admin::load_pgsql_servers_to_runtime(const incoming_pgsql_servers_ //if (resultset) delete resultset; //resultset=NULL; + // support for AWS Aurora PostgreSQL, table pgsql_aws_aurora_hostgroups + + // look for invalid combinations + query = (char*)"SELECT a.* FROM pgsql_aws_aurora_hostgroups a JOIN pgsql_aws_aurora_hostgroups b ON a.writer_hostgroup=b.reader_hostgroup WHERE b.reader_hostgroup"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } + else { + for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + proxy_error("Incompatible entry in pgsql_aws_aurora_hostgroups will be ignored : ( %s , %s , %s , %s )\n", r->fields[0], r->fields[1], r->fields[2], r->fields[3]); + } + } + if (resultset) delete resultset; + resultset = NULL; + + query = (char*)"SELECT a.* FROM pgsql_aws_aurora_hostgroups a LEFT JOIN pgsql_aws_aurora_hostgroups b ON (a.writer_hostgroup=b.reader_hostgroup) WHERE b.reader_hostgroup IS NULL ORDER BY writer_hostgroup"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + if (incoming_aurora_hostgroups == nullptr) { + admindb->execute_statement(query, &error, &cols, &affected_rows, &resultset_aws_aurora); + } + else { + resultset_aws_aurora = incoming_aurora_hostgroups; + } + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } + else { + // Pass the resultset to PgHGM + PgHGM->save_incoming_pgsql_table(resultset_aws_aurora, "pgsql_aws_aurora_hostgroups"); + } // support for hostgroup attributes, table pgsql_hostgroup_attributes query = (char*)"SELECT * FROM pgsql_hostgroup_attributes ORDER BY hostgroup_id"; @@ -7535,6 +7645,10 @@ void ProxySQL_Admin::load_pgsql_servers_to_runtime(const incoming_pgsql_servers_ delete resultset_replication; resultset_replication = NULL; } + if (resultset_aws_aurora) { + //delete resultset_aws_aurora; // do not delete, resultset is stored in PgHGM + resultset_aws_aurora = NULL; + } if (resultset_hostgroup_attributes) { resultset_hostgroup_attributes = NULL; } diff --git a/lib/ProxySQL_Config.cpp b/lib/ProxySQL_Config.cpp index 1a0aecdc60..c19debd958 100644 --- a/lib/ProxySQL_Config.cpp +++ b/lib/ProxySQL_Config.cpp @@ -1786,6 +1786,62 @@ int ProxySQL_Config::Read_PgSQL_Servers_from_configfile(std::string& error) { rows++; } } + // AWS Aurora PostgreSQL + if (root.exists("pgsql_aws_aurora_hostgroups")==true) { + const Setting &pgsql_aws_aurora_hostgroups = root["pgsql_aws_aurora_hostgroups"]; + int count = pgsql_aws_aurora_hostgroups.getLength(); + char *q=(char *)"INSERT OR REPLACE INTO pgsql_aws_aurora_hostgroups (writer_hostgroup, reader_hostgroup, active, aurora_port, domain_name, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment ) VALUES (%d, %d, %d, %d, '%s', %d, %d, %d, %d, %d, %d, %d, %d, '%s')"; + for (i=0; i< count; i++) { + const Setting &line = pgsql_aws_aurora_hostgroups[i]; + int writer_hostgroup; + int reader_hostgroup; + int active=1; // default + int aurora_port; + int max_lag_ms; + int add_lag_ms; + int min_lag_ms; + int lag_num_checks; + int check_interval_ms; + int check_timeout_ms; + int writer_is_also_reader; + int new_reader_weight; + std::string comment=""; + std::string domain_name=""; + if (line.lookupValue("writer_hostgroup", writer_hostgroup)==false) { + proxy_error("Admin: detected a pgsql_aws_aurora_hostgroups in config file without a mandatory writer_hostgroup\n"); + continue; + } + if (line.lookupValue("reader_hostgroup", reader_hostgroup)==false) { + proxy_error("Admin: detected a pgsql_aws_aurora_hostgroups in config file without a mandatory reader_hostgroup\n"); + continue; + } + if (line.lookupValue("aurora_port", aurora_port)==false) aurora_port=5432; + if (line.lookupValue("max_lag_ms", max_lag_ms)==false) max_lag_ms=600000; + if (line.lookupValue("check_interval_ms", check_interval_ms)==false) check_interval_ms=1000; + if (line.lookupValue("check_timeout_ms", check_timeout_ms)==false) check_timeout_ms=800; + if (line.lookupValue("writer_is_also_reader", writer_is_also_reader)==false) writer_is_also_reader=0; + if (line.lookupValue("new_reader_weight", new_reader_weight)==false) new_reader_weight=1; + if (line.lookupValue("add_lag_ms", add_lag_ms)==false) add_lag_ms=30; + if (line.lookupValue("min_lag_ms", min_lag_ms)==false) min_lag_ms=30; + if (line.lookupValue("lag_num_checks", lag_num_checks)==false) lag_num_checks=1; + line.lookupValue("active", active); + line.lookupValue("comment", comment); + line.lookupValue("domain_name", domain_name); + char *o1=strdup(comment.c_str()); + char *o=escape_string_single_quotes(o1, false); + char *p1=strdup(domain_name.c_str()); + char *p=escape_string_single_quotes(p1, false); + char *query=(char *)malloc(strlen(q)+strlen(o)+strlen(p)+256); + sprintf(query,q, writer_hostgroup, reader_hostgroup, active, aurora_port, p, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, o); + admindb->execute(query); + if (o!=o1) free(o); + free(o1); + if (p!=p1) free(p); + free(p1); + free(query); + rows++; + } + } admindb->execute("PRAGMA foreign_keys = ON"); return rows; }