Skip to content

Commit

Permalink
Merge pull request #6 from rdobson/remotetool
Browse files Browse the repository at this point in the history
Add an module with CLI support for displaying hardware info collected by the parsers.
  • Loading branch information
rdobson committed Jul 15, 2014
2 parents 0054ede + 0186837 commit 426379a
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 24 deletions.
82 changes: 58 additions & 24 deletions hwinfo/tools/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
from argparse import ArgumentParser
from prettytable import PrettyTable
import paramiko
import subprocess
import os

from hwinfo.pci import PCIDevice
from hwinfo.pci.lspci import *
from hwinfo.host.dmidecode import *

from hwinfo.host import dmidecode

def remote_command(host, username, password, cmd):
client = paramiko.SSHClient()
Expand All @@ -18,12 +21,19 @@ def remote_command(host, username, password, cmd):
output = stdout.readlines()
error = stderr.readlines()
if error:
print "stderr: %s" % error
raise Exception("stderr: %s" % error)
client.close()
return ''.join(output)

def local_command(cmd):
return cmd
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode == 0:
return str(stdout).strip()
else:
print "RC: %s" % process.returncode
print stdout
raise Exception("stderr: %s" % str(stderr))

class Host(object):

Expand All @@ -38,18 +48,51 @@ def exec_command(self, cmd):
else:
return remote_command(self.host, self.username, self.password, cmd)

def get_lspci_data(self):
return self.exec_command(['lspci', '-nnmm'])

def get_dmidecode_data(self):
return self.exec_command(['dmidecode'])

def get_pci_devices(self):
data = self.exec_command(['lspci', '-nnmm'])
data = self.get_lspci_data()
parser = LspciNNMMParser(data)
devices = parser.parse_items()
return [PCIDevice(device) for device in devices]

def get_info(self):
data = self.exec_command(['dmidecode'])
parser = DmidecodeParser(data)
data = self.get_dmidecode_data()
parser = dmidecode.DmidecodeParser(data)
rec = parser.parse()
return rec

def search_for_file(dirname, filename):
for root, _, files in os.walk(dirname):
if filename in files:
return os.path.join(root, filename)
raise Exception("Could not find '%s' in directory '%s'" % (filename, dirname))

def read_from_file(filename):
fh = open(filename, 'r')
data = fh.read()
fh.close()
return data

class HostFromLogs(Host):

def __init__(self, dirname):
self.dirname = dirname

def _load_from_file(self, filename):
filename = search_for_file(self.dirname, filename)
return read_from_file(filename)

def get_lspci_data(self):
return self._load_from_file('lspci-nnm.out')

def get_dmidecode_data(self):
return self._load_from_file('dmidecode.out')

def pci_filter(devices, types):
res = []
for device in devices:
Expand All @@ -71,19 +114,6 @@ def pci_filter_for_gpu(devices):
gpu_types = ['03']
return pci_filter(devices, gpu_types)

def print_lines(lines):
max_len = 0
output = []
for line in lines:
output.append(line)
if len(line) > max_len:
max_len = len(line)
print ""
print "-" * max_len
print '\n'.join(output)
print "-" * max_len
print ""

def rec_to_table(rec):
table = PrettyTable(["Key", "Value"])
table.align['Key'] = 'l'
Expand Down Expand Up @@ -115,14 +145,18 @@ def main():
parser = ArgumentParser(prog="hwinfo")

filter_choices = ['bios', 'nic', 'storage', 'gpu']
parser.add_argument("-f", "--filter", choices=filter_choices)
parser.add_argument("-m", "--machine", default='localhost')
parser.add_argument("-u", "--username")
parser.add_argument("-p", "--password")
parser.add_argument("-f", "--filter", choices=filter_choices, help="Query a specific class.")
parser.add_argument("-m", "--machine", default='localhost', help="Remote host address.")
parser.add_argument("-u", "--username", help="Username for remote host.")
parser.add_argument("-p", "--password", help="Password for remote host.")
parser.add_argument("-l", "--logs", help="Path to the directory with the logfiles.")

args = parser.parse_args()

host = Host(args.machine, args.username, args.password)
if args.logs:
host = HostFromLogs(args.logs)
else:
host = Host(args.machine, args.username, args.password)

options = []

Expand Down
145 changes: 145 additions & 0 deletions hwinfo/tools/tests/test_inspector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import unittest
import mock
from mock import patch
from StringIO import StringIO

from hwinfo.tools import inspector

class HostObjectTests(unittest.TestCase):

@patch('hwinfo.tools.inspector.local_command')
def test_local_exec_command(self, local_command):
host = inspector.Host()
host.exec_command('ls')
inspector.local_command.assert_called_once_with('ls')

@patch('hwinfo.tools.inspector.remote_command')
def test_remote_exec_command(self, remote_command):
host = inspector.Host('mymachine', 'root', 'pass')
host.exec_command('ls')
inspector.remote_command.assert_called_once_with('mymachine', 'root', 'pass', 'ls')

@patch('hwinfo.tools.inspector.Host.exec_command')
def test_get_pci_devices(self, exec_command):
host = inspector.Host()
devs = host.get_pci_devices()
exec_command.assert_called_once_with(['lspci', '-nnmm'])

@patch('hwinfo.host.dmidecode.DmidecodeParser')
@patch('hwinfo.tools.inspector.Host.exec_command')
def test_get_info(self, mock_exec_command, mock_dmidecode_parser_cls):
mock_exec_command.return_value = 'blah'
mparser = mock_dmidecode_parser_cls.return_value = mock.Mock()
mparser.parse.return_value = {'key':'value'}
host = inspector.Host()
rec = host.get_info()
self.assertEqual(rec, {'key':'value'})


class RemoteCommandTests(unittest.TestCase):

def setUp(self):
self.stdout = StringIO('')
self.stdin = StringIO('')
self.stderr = StringIO('')

@patch('paramiko.SSHClient')
def test_ssh_connect(self, ssh_client_cls):
client = ssh_client_cls.return_value = mock.Mock()
client.exec_command.return_value = self.stdout, self.stdin, self.stderr
inspector.remote_command('test', 'user', 'pass', 'ls')
client.connect.assert_called_with('test', password='pass', username='user', timeout=10)

@patch('paramiko.SSHClient')
def test_ssh_connect_error(self, ssh_client_cls):
client = ssh_client_cls.return_value = mock.Mock()
client.exec_command.return_value = self.stdout, self.stdin, StringIO("Error")
with self.assertRaises(Exception) as context:
inspector.remote_command('test', 'user', 'pass', 'ls')
self.assertEqual(context.exception.message, "stderr: ['Error']")

class LocalCommandTests(unittest.TestCase):

@patch('subprocess.Popen')
def test_local_call(self, mock_popen_cls):
mprocess =mock_popen_cls.return_value = mock.MagicMock()
mprocess.communicate.return_value = 'test', None
mprocess.returncode = 0
stdout = inspector.local_command("echo 'test'")
self.assertEqual(stdout, 'test')

@patch('subprocess.Popen')
def test_local_call_error(self, mock_popen_cls):
mprocess =mock_popen_cls.return_value = mock.MagicMock()
mprocess.communicate.return_value = 'test', 'my error'
mprocess.returncode = 1
with self.assertRaises(Exception) as context:
stdout = inspector.local_command("echo 'test'")
self.assertEqual(context.exception.message, "stderr: my error")


class PCIFilterTests(unittest.TestCase):

def setUp(self):
device_a = mock.MagicMock()
device_b = mock.MagicMock()
device_c = mock.MagicMock()
device_d = mock.MagicMock()

device_a.get_pci_class.return_value = '0230'
device_b.get_pci_class.return_value = '0340'
device_c.get_pci_class.return_value = '0210'
device_d.get_pci_class.return_value = '0100'

self.devices = [device_a, device_b, device_c, device_d]

def test_pci_filter_match_all(self):
devs = inspector.pci_filter(self.devices, ['0'])
self.assertEqual(len(devs), len(self.devices))

def test_pci_filter_match_two(self):
devs = inspector.pci_filter(self.devices, ['02'])
for dev in devs:
print dev.get_pci_class()
self.assertEqual(len(devs), 2)

def test_pci_filter_match_one(self):
devs = inspector.pci_filter(self.devices, ['023'])
self.assertEqual(len(devs), 1)
self.assertEqual(devs[0].get_pci_class(), '0230')

def test_pci_filter_match_none(self):
devs = inspector.pci_filter(self.devices, ['0234'])
self.assertEqual(devs, [])

def test_pci_filter_for_nics(self):
devs = inspector.pci_filter_for_nics(self.devices)
self.assertEqual(len(devs), 2)

def test_pci_filter_for_storage(self):
devs = inspector.pci_filter_for_storage(self.devices)
self.assertEqual(len(devs), 1)
self.assertEqual(devs[0].get_pci_class(), '0100')

def test_pci_filter_for_gpu(self):
devs = inspector.pci_filter_for_gpu(self.devices)
self.assertEqual(len(devs), 1)
self.assertEqual(devs[0].get_pci_class(), '0340')


class TabulateTests(unittest.TestCase):

@patch('hwinfo.tools.inspector.PrettyTable')
def test_rec_to_table(self, mock_pt_cls):
mock_table = mock_pt_cls.return_value = mock.MagicMock()
rec = {'one': 1, 'two': 2, 'three': 3}
inspector.rec_to_table(rec)
self.assertEqual(mock_table.add_row.call_count, 3)
expected_calls = [
mock.call(['one', 1]),
mock.call(['two', 2]),
mock.call(['three', 3]),
]
mock_table.add_row.assert_has_calls(expected_calls, any_order=True)


0 comments on commit 426379a

Please sign in to comment.