Skip to content

Commit

Permalink
Add handling of "logical replication parallel worker" (#67)
Browse files Browse the repository at this point in the history
it was introduced in PG16 and responsible for parallel apply.

Besides that split logical replication worker to "apply" and "tablesync" workers, like it was done in PG17.
  • Loading branch information
CyberDem0n authored Jul 1, 2024
1 parent 1c54846 commit f265f19
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 14 deletions.
6 changes: 4 additions & 2 deletions bg_mon.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ static const char *process_type(pg_stat_activity p)
QUOTE(WAL_SUMMARIZER_PROC_NAME),
QUOTE(PARALLEL_WORKER_NAME),
QUOTE(LOGICAL_LAUNCHER_NAME),
QUOTE(LOGICAL_WORKER_NAME)
QUOTE(LOGICAL_TABLESYNC_WORKER_NAME),
QUOTE(LOGICAL_APPLY_WORKER_NAME),
QUOTE(LOGICAL_PARALLEL_WORKER_NAME)
};

if (p.type == PG_BG_WORKER)
Expand All @@ -332,7 +334,7 @@ static const char *process_type(pg_stat_activity p)

static const char *get_query(pg_stat_activity s)
{
if (s.type == PG_LOGICAL_WORKER)
if (s.type >= PG_LOGICAL_TABLESYNC_WORKER)
return s.ps.cmdline;

switch (s.state)
Expand Down
36 changes: 26 additions & 10 deletions postgres_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,17 @@ static void merge_stats(pg_stat_activity_list *pg_stats, proc_stat_list proc_sta

#define PARALLEL_WORKER_PROC_NAME PARALLEL_WORKER_NAME " for PID"
#define LOGICAL_LAUNCHER_PROC_NAME LOGICAL_LAUNCHER_NAME SUFFIX_PATTERN
#define LOGICAL_WORKER_PROC_NAME LOGICAL_WORKER_NAME " for"
#if PG_VERSION_NUM >= 170000
#define LOGICAL_TABLESYNC_WORKER_PROC_NAME "logical replication tablesync worker for"
#else
#define LOGICAL_TABLESYNC_WORKER_PROC_NAME "logical replication worker for"
#endif
#if PG_VERSION_NUM < 160000
#define LOGICAL_APPLY_WORKER_PROC_NAME "logical replication worker for"
#else
#define LOGICAL_APPLY_WORKER_PROC_NAME "logical replication apply worker for"
#endif
#define LOGICAL_PARALLEL_WORKER_PROC_NAME "logical replication parallel apply worker for"

#define BACKEND_ENTRY(CMDLINE_PATTERN, TYPE) TAB_ENTRY(CMDLINE_PATTERN " ", PG_##TYPE)

Expand Down Expand Up @@ -785,7 +795,6 @@ static void merge_stats(pg_stat_activity_list *pg_stats, proc_stat_list proc_sta

static PgBackendType parse_cmdline(const char * const buf, const char **rest)
{
PgBackendType type = PG_UNDEFINED;
*rest = buf;
if (strncmp(buf, cmdline_prefix, cmdline_prefix_len) == 0) {
int j;
Expand All @@ -806,7 +815,11 @@ static PgBackendType parse_cmdline(const char * const buf, const char **rest)
#endif
#if PG_VERSION_NUM >= 100000
BGWORKER(LOGICAL_LAUNCHER),
BGWORKER(LOGICAL_WORKER),
BGWORKER(LOGICAL_APPLY_WORKER),
#endif
#if PG_VERSION_NUM >= 160000
BGWORKER(LOGICAL_PARALLEL_WORKER),
BGWORKER(LOGICAL_TABLESYNC_WORKER),
#endif
#if PG_VERSION_NUM < 110000
OTH_BACKEND(BG_WORKER),
Expand All @@ -826,22 +839,25 @@ static PgBackendType parse_cmdline(const char * const buf, const char **rest)

for (j = 0; backend_tab[j].name != NULL; ++j)
if (strncmp(cmd, backend_tab[j].name, backend_tab[j].name_len) == 0) {
type = backend_tab[j].type;
*rest = cmd + backend_tab[j].name_len;
break;
#if PG_VERSION_NUM < 160000
if (backend_tab[j].type == PG_LOGICAL_APPLY_WORKER && strstr(*rest, " sync "))
return PG_LOGICAL_TABLESYNC_WORKER;
#endif
return backend_tab[j].type;
}

#if PG_VERSION_NUM >= 110000
if (backend_tab[j].name == NULL) {
{
size_t len = strlen(cmd) - sizeof(SUFFIX_PATTERN);
if (len > 0 && strcmp(cmd + len, " " SUFFIX_PATTERN) == 0) {
type = PG_BG_WORKER;
*rest = cmd;
return PG_BG_WORKER;
}
}
#endif
}
return type;
return PG_UNDEFINED;
}

static void read_proc_cmdline(pg_stat_activity *stat)
Expand Down Expand Up @@ -872,7 +888,7 @@ static void read_proc_cmdline(pg_stat_activity *stat)
rest += len + 1;
}
stat->query = json_escape_string(rest);
} else if ((type == PG_LOGICAL_WORKER || type == PG_BG_WORKER) && *rest)
} else if ((type >= PG_LOGICAL_TABLESYNC_WORKER || type == PG_BG_WORKER) && *rest)
stat->ps.cmdline = json_escape_string_len(rest, strlen(rest) - sizeof(SUFFIX_PATTERN));
else if (type == PG_PARALLEL_WORKER && *rest)
stat->parent_pid = strtoul(rest, NULL, 10);
Expand Down Expand Up @@ -910,7 +926,7 @@ static void diff_pg_stat_activity(pg_stat_activity_list old_activity, pg_stat_ac
old_activity.values[old_pos].ps.free_cmdline = true;
else if (old_activity.values[old_pos].type != PG_UNDEFINED && new_activity.values[new_pos].type == PG_UNDEFINED) {
new_activity.values[new_pos].type = old_activity.values[old_pos].type;
if (new_activity.values[new_pos].type == PG_LOGICAL_WORKER || new_activity.values[new_pos].type == PG_BG_WORKER)
if (new_activity.values[new_pos].type >= PG_LOGICAL_TABLESYNC_WORKER || new_activity.values[new_pos].type == PG_BG_WORKER)
new_activity.values[new_pos].ps.cmdline = old_activity.values[old_pos].ps.cmdline;
}

Expand Down
8 changes: 6 additions & 2 deletions postgres_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ typedef enum PgBackendType
PG_WAL_SUMMARIZER,
PG_PARALLEL_WORKER,
PG_LOGICAL_LAUNCHER,
PG_LOGICAL_WORKER
PG_LOGICAL_TABLESYNC_WORKER,
PG_LOGICAL_APPLY_WORKER,
PG_LOGICAL_PARALLEL_WORKER
} PgBackendType;

#define UNKNOWN_NAME "not initialized"
Expand All @@ -44,7 +46,9 @@ typedef enum PgBackendType
#define STATS_COLLECTOR_PROC_NAME "stats collector"
#define PARALLEL_WORKER_NAME "parallel worker"
#define LOGICAL_LAUNCHER_NAME "logical replication launcher"
#define LOGICAL_WORKER_NAME "logical replication worker"
#define LOGICAL_TABLESYNC_WORKER_NAME "logical replication tablesync worker"
#define LOGICAL_APPLY_WORKER_NAME "logical replication apply worker"
#define LOGICAL_PARALLEL_WORKER_NAME "logical replication parallel worker"

typedef struct {
bool available;
Expand Down

0 comments on commit f265f19

Please sign in to comment.