Skip to content

Commit

Permalink
feat: add support for pgpro_stats 1.8
Browse files Browse the repository at this point in the history
  • Loading branch information
mstyushin committed Jan 14, 2025
1 parent 3465255 commit f9e8623
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 11 deletions.
22 changes: 22 additions & 0 deletions mamonsu/plugins/pgsql/driver/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(self, params=None):
"bootstrap": {"storage": {}, "counter": 0, "cache": 10, "version": False},
"recovery": {"storage": {}, "counter": 0, "cache": 10},
"extension_schema": {"pg_buffercache": {}, "pg_stat_statements": {}, "pg_wait_sampling": {}, "pgpro_stats": {}},
"extension_versions" : {},
"pgpro": {"storage": {}},
"pgproee": {"storage": {}}
}
Expand All @@ -135,6 +136,19 @@ def server_version(self, db=None):
result.decode("ascii"))
return self._cache["server_version"]["storage"][db]

def extension_version(self, extension, db=None):
db = self._normalize_db(db)
if extension in self._cache["extension_versions"] and db in self._cache["extension_versions"][extension][db]:
return self._cache["extension_versions"][extension][db]

version_string = self.query("select extversion from pg_catalog.pg_extension where lower(extname) = lower('{0}');".format(extension), db)[0][0]
result = bytes(
version_string.split(" ")[0], "utf-8")
self._cache["extension_versions"][extension] = {}
self._cache["extension_versions"][extension][db] = "{0}".format(
result.decode("ascii"))
return self._cache["extension_versions"][extension][db]

def server_version_greater(self, version, db=None):
db = self._normalize_db(db)
return packaging.version.parse(self.server_version(db)) >= packaging.version.parse(version)
Expand Down Expand Up @@ -229,6 +243,14 @@ def is_pgpro_ee(self, db=None):
self._cache["pgproee"][db] = False
return self._cache["pgproee"][db]

def extension_version_greater(self, extension, version, db=None):
db = self._normalize_db(db)
return packaging.version.parse(self.extension_version(extension, db)) >= packaging.version.parse(version)

def extension_version_less(self, extension, version, db=None):
db = self._normalize_db(db)
return packaging.version.parse(self.extension_version(extension, db)) <= packaging.version.parse(version)

def extension_installed(self, ext, db=None):
db = self._normalize_db(db)
result = self.query("""
Expand Down
70 changes: 60 additions & 10 deletions mamonsu/plugins/pgsql/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Statements(Plugin):

query_info = """
SELECT {metrics}
FROM {extension_schema}.pg_stat_statements_info;
FROM {extension_schema}.{info_view_name};
"""
key = "pgsql."
# zbx_key, sql, desc, unit, delta, (Graph, color, side)
Expand Down Expand Up @@ -88,6 +88,32 @@ class Statements(Plugin):
("PostgreSQL Statements Info: Last Statistics Reset Time", "9C8A4E", 0))
]

Items_pgpro_stats_1_8 = [
("stat[read_bytes]",
"(sum(shared_blks_read+local_blks_read+temp_blks_read)*8*1024)::bigint",
"Read bytes/s", Plugin.UNITS.bytes_per_second, Plugin.DELTA.speed_per_second,
("PostgreSQL Statements: Bytes", "87C2B9", 0)),
("stat[write_bytes]",
"(sum(shared_blks_written+local_blks_written+temp_blks_written)*8*1024)::bigint",
"Write bytes/s", Plugin.UNITS.bytes_per_second, Plugin.DELTA.speed_per_second,
("PostgreSQL Statements: Bytes", "793F5D", 0)),
("stat[dirty_bytes]",
"(sum(shared_blks_dirtied+local_blks_dirtied)*8*1024)::bigint",
"Dirty bytes/s", Plugin.UNITS.bytes_per_second, Plugin.DELTA.speed_per_second,
("PostgreSQL Statements: Bytes", "9C8A4E", 0)),
("stat[read_time]",
"(sum(shared_blk_read_time+local_blk_read_time+temp_blk_read_time)/float4(100))::bigint",
"Read IO Time", Plugin.UNITS.s, Plugin.DELTA.speed_per_second,
("PostgreSQL Statements: Spent Time", "87C2B9", 0)),
("stat[write_time]",
"(sum(shared_blk_write_time+local_blk_write_time+temp_blk_write_time)/float4(100))::bigint",
"Write IO Time", Plugin.UNITS.s, Plugin.DELTA.speed_per_second,
("PostgreSQL Statements: Spent Time", "793F5D", 0)),
["stat[other_time]",
"(sum(total_exec_time+total_plan_time-shared_blk_read_time-local_blk_read_time-temp_blk_read_time-shared_blk_write_time-local_blk_write_time-temp_blk_write_time)/float4(100))::bigint",
"Other (mostly CPU) Time", Plugin.UNITS.s, Plugin.DELTA.speed_per_second,
("PostgreSQL Statements: Spent Time", "9C8A4E", 0)]]

all_graphs = [
("PostgreSQL Statements: Bytes", None),
("PostgreSQL Statements: Spent Time", 1),
Expand Down Expand Up @@ -115,21 +141,45 @@ def run(self, zbx):

# TODO: add 13 and 14 items when pgpro_stats added new WAL metrics
all_items = self.Items.copy()
if Pooler.server_version_greater("14"):

if Pooler.extension_version_greater("pgpro_stats", "1.8"):
info_view = 'pg_stat_statements_info'
if self.extension == "pgpro_stats":
info_view = 'pgpro_stats_info'

info_items = self.Items_pg_14
info_params = [x[1] for x in info_items]
info_result = Pooler.query(
self.query_info.format(metrics=(", ".join(info_params)), extension_schema=extension_schema, info_view_name=info_view))
for key, value in enumerate(info_result[0]):
zbx_key, value = "pgsql.{0}".format(
info_items[key][0]), int(value)
zbx.send(zbx_key, value, info_items[key][4])

all_items = self.Items_pgpro_stats_1_8.copy()
all_items += self.Items_pg_13

elif Pooler.server_version_greater("14"):
self.Items[5][1] = self.Items[5][1].format("total_exec_time+total_plan_time")
all_items += self.Items_pg_13
info_view = 'pgpro_stats_info'
if self.extension == "pg_stat_statements":
info_items = self.Items_pg_14
info_params = [x[1] for x in info_items]
info_result = Pooler.query(
self.query_info.format(metrics=(", ".join(info_params)), extension_schema=extension_schema))
for key, value in enumerate(info_result[0]):
zbx_key, value = "pgsql.{0}".format(
info_items[key][0]), int(value)
zbx.send(zbx_key, value, info_items[key][4])
info_view = 'pg_stat_statements_info'
info_items = self.Items_pg_14
info_params = [x[1] for x in info_items]
info_result = Pooler.query(
self.query_info.format(metrics=(", ".join(info_params)),
extension_schema=extension_schema,
info_view_name=info_view))
for key, value in enumerate(info_result[0]):
zbx_key, value = "pgsql.{0}".format(
info_items[key][0]), int(value)
zbx.send(zbx_key, value, info_items[key][4])

elif Pooler.server_version_greater("13"):
self.Items[5][1] = self.Items[5][1].format("total_exec_time+total_plan_time")
all_items += self.Items_pg_13

else:
self.Items[5][1] = self.Items[5][1].format("total_time")
columns = [x[1] for x in all_items]
Expand Down
8 changes: 7 additions & 1 deletion mamonsu/tools/bootstrap/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,13 @@ def run_deploy():
if Pooler.is_pgpro() or Pooler.is_pgpro_ee():
bootstrap_extension_queries = fill_query_params(CreateWaitSamplingFunctionsSQL)
Pooler.query(bootstrap_extension_queries)
if Pooler.server_version_greater("12"):
if Pooler.extension_version_greater("pgpro_stats", "1.8"):
statements_items = [x[1] for x in Statements.Items_pgpro_stats_1_8] + [x[1] for x in Statements.Items_pg_13]
statements_columns = [x[0][x[0].find("[")+1:x[0].find("]")] for x in Statements.Items_pgpro_stats_1_8] + [x[0][x[0].find("[")+1:x[0].find("]")] for x in Statements.Items_pg_13]
bootstrap_extension_queries = CreateStatementsFunctionsSQL.format(
columns=" bigint, ".join(statements_columns) + " bigint", metrics=(", ".join(statements_items)))
Pooler.query(bootstrap_extension_queries)
elif Pooler.server_version_greater("12"):
statements_items = [x[1] for x in Statements.Items] + ([x[1] for x in Statements.Items_pg_13] if Pooler.server_version_greater("13") else [])
statements_items[5] = statements_items[5].format("total_exec_time+total_plan_time")
statements_columns = [x[0][x[0].find("[")+1:x[0].find("]")] for x in Statements.Items] + ([x[0][x[0].find("[")+1:x[0].find("]")] for x in Statements.Items_pg_13] if Pooler.server_version_greater("13") else [])
Expand Down

0 comments on commit f9e8623

Please sign in to comment.