Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code refactoring around postgres versions. #89

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 9 additions & 21 deletions pg_view/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ def parse_args():
action='store', dest='tick', type='int', default=1)
parser.add_option('-o', '--output-method', help='send output to the following source', action='store',
default=OUTPUT_METHOD.curses, dest='output_method')
parser.add_option('-V', '--use-version',
help='version of the instance to monitor (in case it can\'t be autodetected)',
action='store', dest='version', type='float')
parser.add_option('-l', '--log-file', help='direct log output to the file', action='store',
dest='log_file')
parser.add_option('-R', '--reset-output', help='clear screen after each tick', action='store_true', default=False,
Expand Down Expand Up @@ -190,7 +187,6 @@ def main():
clusters = []

config = read_configuration(options.config_file) if options.config_file else None
dbversion = None
# configuration file takes priority over the rest of database connection information sources.
if config:
for instance in config:
Expand Down Expand Up @@ -221,29 +217,24 @@ def main():

# get all PostgreSQL instances
for result_work_dir, data in postmasters.items():
(ppid, dbversion, dbname) = data
# if user requested a specific database name and version - don't try to connect to others
(ppid, version, dbname) = data
# if user requested a specific database don't try to connect to others
if options.instance:
if dbname != options.instance or not result_work_dir or not ppid:
continue
if options.version is not None and dbversion != options.version:
continue
try:
conndata = detect_db_connection_arguments(
result_work_dir, ppid, dbversion, options.username, options.dbname)
result_work_dir, ppid, version, options.username, options.dbname)
if conndata is None:
continue
host = conndata['host']
port = conndata['port']
conn = build_connection(host, port, options.username, options.dbname)
pgcon = psycopg2.connect(**conn)
psycopg2.connect(**conn).close() # test if we can connect
desc = make_cluster_desc(name=dbname, version=version, workdir=result_work_dir, conn=conn)
clusters.append(desc)
except Exception as e:
logger.error('PostgreSQL exception {0}'.format(e))
pgcon = None
if pgcon:
desc = make_cluster_desc(name=dbname, version=dbversion, workdir=result_work_dir,
pid=ppid, pgcon=pgcon, conn=conn)
clusters.append(desc)
collectors = []
groups = {}
try:
Expand All @@ -255,19 +246,16 @@ def main():

# initialize the disks stat collector process and create an exchange queue
q = JoinableQueue(1)
work_directories = [cl['wd'] for cl in clusters if 'wd' in cl]
dbversion = dbversion or clusters[0]['ver']

collector = DetachedDiskStatCollector(q, work_directories, dbversion)
collector = DetachedDiskStatCollector(q, clusters)
collector.start()
consumer = DiskCollectorConsumer(q)

collectors.append(HostStatCollector())
collectors.append(SystemStatCollector())
collectors.append(MemoryStatCollector())
for cl in clusters:
part = PartitionStatCollector(cl['name'], cl['ver'], cl['wd'], consumer)
pg = PgstatCollector(cl['pgcon'], cl['reconnect'], cl['pid'], cl['name'], cl['ver'], options.pid)
part = PartitionStatCollector(cl['name'], cl['version'], cl['wd'], consumer)
pg = PgstatCollector(cl['name'], cl['reconnect'], options.pid)
groupname = cl['wd']
groups[groupname] = {'pg': pg, 'partitions': part}
collectors.append(part)
Expand Down
57 changes: 30 additions & 27 deletions pg_view/collectors/partition_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ class PartitionStatCollector(StatCollector):
XLOG_NAME = 'xlog'
BLOCK_SIZE = 1024

def __init__(self, dbname, dbversion, work_directory, consumer):
def __init__(self, dbname, version, work_directory, consumer):
super(PartitionStatCollector, self).__init__(ticks_per_refresh=1)
self.dbname = dbname
self.dbver = dbversion
self.version = version
self.queue_consumer = consumer
self.work_directory = work_directory
self.df_list_transformation = [{'out': 'dev', 'in': 0, 'fn': self._dereference_dev_name},
Expand Down Expand Up @@ -130,7 +130,7 @@ def __init__(self, dbname, dbversion, work_directory, consumer):
self.postinit()

def ident(self):
return '{0} ({1}/{2})'.format(super(PartitionStatCollector, self).ident(), self.dbname, self.dbver)
return '{0} ({1}/{2})'.format(super(PartitionStatCollector, self).ident(), self.dbname, self.version)

@staticmethod
def _dereference_dev_name(devname):
Expand Down Expand Up @@ -163,7 +163,7 @@ def refresh(self):
self._do_refresh([result[PartitionStatCollector.DATA_NAME], result[PartitionStatCollector.XLOG_NAME]])

@staticmethod
def calculate_time_until_full(colname, prev, cur):
def calculate_time_until_full(_, prev, cur):
# both should be expressed in common units, guaranteed by BLOCK_SIZE
if (cur.get('path_size', 0) > 0 and
prev.get('path_size', 0) > 0 and
Expand All @@ -178,8 +178,8 @@ def get_io_data(pnames):
result = {}
found = 0 # stop if we found records for all partitions
total = len(pnames)
fp = None
try:
fp = None
fp = open(PartitionStatCollector.DISK_STAT_FILE, 'rU')
for l in fp:
elements = l.split()
Expand Down Expand Up @@ -208,22 +208,21 @@ class DetachedDiskStatCollector(Process):
OLD_WAL_SUBDIR = '/pg_xlog/'
WAL_SUBDIR = '/pg_wal/'

NEW_WAL_SINCE = 10.0
NEW_WAL_SINCE = 100000

def __init__(self, q, work_directories, db_version):
def __init__(self, q, clusters):
super(DetachedDiskStatCollector, self).__init__()
self.work_directories = work_directories
self.q = q
self.daemon = True
self.db_version = db_version
self.clusters = clusters
self.df_cache = {}

@property
def wal_directory(self):
@staticmethod
def wal_directory(version):
""" Since Postgresql 10.0 wal directory was renamed, so we need to
choose actual wal directory based on a db_version.
"""
if self.db_version < DetachedDiskStatCollector.NEW_WAL_SINCE:
if version < DetachedDiskStatCollector.NEW_WAL_SINCE:
return DetachedDiskStatCollector.OLD_WAL_SUBDIR
else:
return DetachedDiskStatCollector.WAL_SUBDIR
Expand All @@ -234,32 +233,36 @@ def run(self):
self.q.join()
result = {}
self.df_cache = {}
for wd in self.work_directories:
du_data = self.get_du_data(wd)
df_data = self.get_df_data(wd)
result[wd] = [du_data, df_data]
for cluster in self.clusters:
work_directory = cluster['wd']
wal_directory = self.wal_directory(cluster['version'])
du_data = self.get_du_data(work_directory, wal_directory)
df_data = self.get_df_data(work_directory, wal_directory)
result[work_directory] = [du_data, df_data]
self.q.put(result)
time.sleep(consts.TICK_LENGTH)

def get_du_data(self, wd):
def get_du_data(self, work_directory, wal_directory):
data_size = 0
xlog_size = 0

result = {'data': [], 'xlog': []}
try:
data_size = self.run_du(wd, BLOCK_SIZE)
xlog_size = self.run_du(wd + self.wal_directory, BLOCK_SIZE)
data_size = self.run_du(work_directory, BLOCK_SIZE)
xlog_size = self.run_du(work_directory + wal_directory, BLOCK_SIZE)
except Exception as e:
logger.error('Unable to read free space information for the pg_xlog and data directories for the directory\
{0}: {1}'.format(wd, e))
{0}: {1}'.format(work_directory, e))
else:
# XXX: why do we pass the block size there?
result['data'] = str(data_size), wd
result['xlog'] = str(xlog_size), wd + self.wal_directory
result['data'] = str(data_size), work_directory
result['xlog'] = str(xlog_size), work_directory + wal_directory
return result

@staticmethod
def run_du(pathname, block_size=BLOCK_SIZE, exclude=['lost+found']):
def run_du(pathname, block_size=BLOCK_SIZE, exclude=None):
if exclude == None:
exclude = ["lost+found"]
size = 0
folders = [pathname]
root_dev = os.lstat(pathname).st_dev
Expand All @@ -285,21 +288,21 @@ def run_du(pathname, block_size=BLOCK_SIZE, exclude=['lost+found']):
size += st.st_size
return long(size / block_size)

def get_df_data(self, work_directory):
def get_df_data(self, work_directory, wal_directory):
""" Retrive raw data from df (transformations are performed via df_list_transformation) """

result = {'data': [], 'xlog': []}
# obtain the device names
data_dev = self.get_mounted_device(self.get_mount_point(work_directory))
xlog_dev = self.get_mounted_device(self.get_mount_point(work_directory + self.wal_directory))
xlog_dev = self.get_mounted_device(self.get_mount_point(work_directory + wal_directory))
if data_dev not in self.df_cache:
data_vfs = os.statvfs(work_directory)
self.df_cache[data_dev] = data_vfs
else:
data_vfs = self.df_cache[data_dev]

if xlog_dev not in self.df_cache:
xlog_vfs = os.statvfs(work_directory + self.wal_directory)
xlog_vfs = os.statvfs(work_directory + wal_directory)
self.df_cache[xlog_dev] = xlog_vfs
else:
xlog_vfs = self.df_cache[xlog_dev]
Expand Down Expand Up @@ -353,7 +356,7 @@ def get_mounted_device(pathname):
@staticmethod
def get_mount_point(pathname):
"""Get the mounlst point of the filesystem containing pathname"""

mount_point = None
pathname = os.path.normcase(os.path.realpath(pathname))
parent_device = path_device = os.stat(pathname).st_dev
while parent_device == path_device:
Expand Down
47 changes: 24 additions & 23 deletions pg_view/collectors/pg_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pg_view.collectors.base_collector import StatCollector
from pg_view.loggers import logger
from pg_view.models.outputs import COLSTATUS, COLALIGN
from pg_view.utils import MEM_PAGE_SIZE, dbversion_as_float
from pg_view.utils import MEM_PAGE_SIZE

if sys.hexversion >= 0x03000000:
long = int
Expand All @@ -20,22 +20,17 @@ class PgstatCollector(StatCollector):

STATM_FILENAME = '/proc/{0}/statm'

def __init__(self, pgcon, reconnect, pid, dbname, dbver, always_track_pids):
def __init__(self, dbname, reconnect, always_track_pids):
super(PgstatCollector, self).__init__()
self.postmaster_pid = pid
self.pgcon = pgcon
self.reconnect = reconnect
self.reconnect_fn = reconnect
self.pids = []
self.rows_diff = []
self.rows_diff_output = []
# figure out our backend pid
self.connection_pid = pgcon.get_backend_pid()
self.max_connections = self._get_max_connections()
# figure out connection invariants
self.connect_to_postgres()
self.recovery_status = self._get_recovery_status()
self.always_track_pids = always_track_pids
self.dbname = dbname
self.dbver = dbver
self.server_version = pgcon.get_parameter_status('server_version')
self.filter_aux_processes = True
self.total_connections = 0
self.active_connections = 0
Expand Down Expand Up @@ -240,7 +235,7 @@ def idle_format_fn(self, text):
if not r:
return text
else:
if self.dbver >= 9.2:
if self.pgversion >= 90200:
return 'idle in transaction for ' + StatCollector.time_pretty_print(int(r.group(1)))
else:
return 'idle in transaction ' + StatCollector.time_pretty_print(int(r.group(1))) \
Expand All @@ -256,7 +251,7 @@ def query_status_fn(self, row, col):
return {-1: COLSTATUS.cs_ok}

def ident(self):
return '{0} ({1}/{2})'.format('postgres', self.dbname, self.dbver)
return '{0} ({1}/{2})'.format('postgres', self.dbname, self.pgversion)

@staticmethod
def _get_psinfo(cmdline):
Expand Down Expand Up @@ -293,6 +288,16 @@ def ncurses_filter_row(self, row):
else:
return False

def connect_to_postgres(self):
self.pgcon, self.postmaster_pid = self.reconnect_fn()
self._read_connection_invariants(self.pgcon)

def _read_connection_invariants(self, pgcon):
self.connection_pid = self.pgcon.get_backend_pid()
self.max_connections = self._get_max_connections(pgcon)
self.pgversion = int(self.pgcon.server_version)
self.server_version_as_string = self.pgcon.get_parameter_status('server_version')

def refresh(self):
""" Reads data from /proc and PostgreSQL stats """
result = []
Expand All @@ -302,11 +307,7 @@ def refresh(self):
if not self.pgcon:
# if we've lost the connection, try to reconnect and
# re-initialize all connection invariants
self.pgcon, self.postmaster_pid = self.reconnect()
self.connection_pid = self.pgcon.get_backend_pid()
self.max_connections = self._get_max_connections()
self.dbver = dbversion_as_float(self.pgcon)
self.server_version = self.pgcon.get_parameter_status('server_version')
self.connect_to_postgres()
stat_data = self._read_pg_stat_activity()
except psycopg2.OperationalError as e:
logger.info("failed to query the server: {}".format(e))
Expand Down Expand Up @@ -408,10 +409,10 @@ def _get_memory_usage(self, pid):
uss = (long(statm[1]) - long(statm[2])) * MEM_PAGE_SIZE
return uss

def _get_max_connections(self):
def _get_max_connections(self, pgcon):
""" Read max connections from the database """

cur = self.pgcon.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur = pgcon.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute('show max_connections')
result = cur.fetchone()
cur.close()
Expand All @@ -434,7 +435,7 @@ def _read_pg_stat_activity(self):

# the pg_stat_activity format has been changed to 9.2, avoiding ambigiuous meanings for some columns.
# since it makes more sense then the previous layout, we 'cast' the former versions to 9.2
if self.dbver < 9.2:
if self.pgversion < 90200:
cur.execute("""
SELECT datname,
procpid as pid,
Expand Down Expand Up @@ -486,7 +487,7 @@ def _read_pg_stat_activity(self):
WHERE procpid != pg_backend_pid()
GROUP BY 1,2,3,4,5,6,7,9
""")
elif self.dbver < 9.6:
elif self.pgversion < 90600:
cur.execute("""
SELECT datname,
a.pid as pid,
Expand Down Expand Up @@ -607,15 +608,15 @@ def ncurses_produce_prefix(self):
if self.pgcon:
return "{dbname} {version} {role} connections: {conns} of {max_conns} allocated, {active_conns} active\n". \
format(dbname=self.dbname,
version=self.server_version,
version=self.server_version_as_string,
role=self.recovery_status,
conns=self.total_connections,
max_conns=self.max_connections,
active_conns=self.active_connections)
else:
return "{dbname} {version} (offline)\n". \
format(dbname=self.dbname,
version=self.server_version)
version=self.server_version_as_string)

@staticmethod
def process_sort_key(process):
Expand Down
Loading