Module unittesting
[hide private]
[frames | no frames]

Source Code for Module unittesting

  1  #!/usr/bin/env python 
  2  # vim: set fileencoding=UTF-8 : 
  3  """ 
  4  Unit testing of all modules happens here. 
  5  """ 
  6   
  7  # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #  
  8  # Copyright (c) 2007 Jakub Hrozek (jhrozek@redhat.com) 
  9  # 
 10  # This file is part of Gnome-network-monitor. 
 11  # 
 12  # Gnome-network-monitor is free software; you can redistribute it and/or modify 
 13  # it under the terms of the GNU General Public License as published by 
 14  # the Free Software Foundation, version 2 of the License. 
 15  #  
 16  # Gnome-network-monitor is distributed in the hope that it will be useful, 
 17  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 18  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 19  # GNU General Public License for more details. 
 20  #  
 21  # See the file LICENSE for full text of GPL version 2 
 22  # 
 23  # You should have received a copy of the GNU General Public License 
 24  # along with Gnome-network-monitor; if not, write to the Free Software 
 25  # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA 
 26  # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #  
 27   
 28  import unittest 
 29  import time 
 30  import os 
 31  import shutil 
 32  from pysqlite2 import dbapi2 as sqlite 
 33  import copy 
 34   
 35  # libraries under test 
 36  import iface 
 37  import snmp 
 38  from iface import InterfaceStats 
 39  from iptables import FwRecord, LogAnalyzer, LogTarget 
 40  from netstat import NetProcess, NetStat 
 41  from config import * 
 42   
class TestInterfaceStats(unittest.TestCase):
    """
    Tests InterfaceStats class from iface.py
    """

    # Static copy of a /proc/net/dev listing used by most tests below.
    _FAKE_PROC = "../test_data/proc_net_dev"

    def _make_config(self, proc_file=_FAKE_PROC):
        """
        Returns a ConfigInterface whose "proc_net_dev" entry points at proc_file.
        """
        conf = ConfigInterface()
        conf["proc_net_dev"] = proc_file
        return conf

    @staticmethod
    def _bump_counters(lines, delta):
        """
        Returns a copy of lines in which every numeric field of the lines
        containing ":" is raised by delta. Lines without ":" (the header
        lines of the listing) are passed through untouched.

        NOTE(review): the original indentation was lost in the listing this
        was recovered from; header lines are assumed to be written back too.
        """
        bumped = []
        for line in lines:
            if ":" in line:
                (name, numbers) = line.split(":")
                numbers = " ".join([str(int(field) + delta) for field in numbers.split()])
                line = name + ":" + numbers + "\n"
            bumped.append(line)
        return bumped

    def testInit(self):
        """
        Tests if the object inits correctly.
        """
        iface.InterfaceStats(self._make_config(), history_size=60)

    def testInitBadSize(self):
        """
        Tests if history must have a sane size
        """
        conf = self._make_config()
        for bad_size in (-5, 0):
            self.assertRaises(ValueError, iface.InterfaceStats, conf, history_size=bad_size)

    def testInitBadProcFile(self):
        """
        Tests if the __init__ method checks for existing proc_net_dev file
        """
        conf = self._make_config("../test_data/nothere")
        self.assertRaises(IOError, iface.InterfaceStats, conf, history_size=5)

    def testIfaceFilterBadType(self):
        """
        Tests that the two most probable mistakes in the set_iface_filter
        call raise the appropriate exception
        """
        stats = iface.InterfaceStats(self._make_config(), history_size=60)
        for bad_filter in (None, "eth0"):
            self.assertRaises(TypeError, stats.set_iface_filter, bad_filter)

    def testGetInterfaces(self):
        """
        Test against known interfaces in the fakeproc file.
        """
        expected = ["lo", "eth1", "eth0", "tun0"]
        stats = iface.InterfaceStats(self._make_config(), history_size=60)
        self.assertEqual(stats.get_interfaces(), expected)

    def testGetPrettyName(self):
        """
        Test retrieving pretty human-readable names for various network
        interfaces
        """
        names = {"eth0": "Ethernet interface", "wlan1": "Wireless interface",
                 "ppp12": "Dial-up interface", "tun12": "Routed IP tunnel",
                 "sit00:1": "IPv6 tunnel", "tap123": "VPN tunnel", "xxx": "xxx"}
        stats = iface.InterfaceStats(self._make_config(), history_size=60)
        for (raw_name, pretty_name) in names.items():
            self.assertEqual(stats.get_pretty_name(raw_name), pretty_name)

    def testThreadWorkerFake(self):
        """
        Since the fakeproc file never changes, we will never see the
        bandwidth rise, just the value in the proc_net_dev file become the
        "previous".
        """
        expected = [{'in': {'hist': [0, 0, 0, 0, 0], 'prev': 3628518, 'max': 0}, 'out': {'hist': [0, 0, 0, 0, 0], 'prev': 3628518, 'max': 0}},
                    {'in': {'hist': [0, 0, 0, 0, 0], 'prev': 9568034, 'max': 0}, 'out': {'hist': [0, 0, 0, 0, 0], 'prev': 80775813, 'max': 0}},
                    {'in': {'hist': [0, 0, 0, 0, 0], 'prev': 51832244, 'max': 0}, 'out': {'hist': [0, 0, 0, 0, 0], 'prev': 7635513, 'max': 0}},
                    {'in': {'hist': [0, 0, 0, 0, 0], 'prev': 354071, 'max': 0}, 'out': {'hist': [0, 0, 0, 0, 0], 'prev': 192899, 'max': 0}}]

        ist = iface.InterfaceStats(self._make_config(), history_size=5)
        ist.start_bandwidth_history()
        try:
            time.sleep(4)
        finally:
            # always stop the collection, even if the sleep is interrupted
            ist.stop_bandwidth_history()

        ls = ist.get_interfaces()
        # pair interfaces with expectations positionally; the length check
        # keeps a surplus interface from being silently ignored
        self.assertEqual(len(ls), len(expected))
        for (name, wanted) in zip(ls, expected):
            self.assertEqual(ist.get_history(name), wanted)

    def testThreadWorkerReal(self):
        """
        This is a more advanced test - takes the fakeproc file and adds a
        known and rising value to the interface metrics in the file, so the
        resulting history is rising linearly.

        The test only prints a message and returns when the test_data
        directory is not usable, so a missing fixture does not fail the run.
        """
        expected = [{'in': {'hist': [0, 1, 2, 3, 4], 'prev': 3628528, 'max': 4}, 'out': {'hist': [0, 1, 2, 3, 4], 'prev': 3628528, 'max': 4}},
                    {'in': {'hist': [0, 1, 2, 3, 4], 'prev': 9568044, 'max': 4}, 'out': {'hist': [0, 1, 2, 3, 4], 'prev': 80775823, 'max': 4}},
                    {'in': {'hist': [0, 1, 2, 3, 4], 'prev': 51832254, 'max': 4}, 'out': {'hist': [0, 1, 2, 3, 4], 'prev': 7635523, 'max': 4}},
                    {'in': {'hist': [0, 1, 2, 3, 4], 'prev': 354081, 'max': 4}, 'out': {'hist': [0, 1, 2, 3, 4], 'prev': 192909, 'max': 4}}]

        # Check if we're able to write the temp file
        if not os.access("../test_data", os.R_OK | os.W_OK):
            print("the user running this test must have read and write access to the test_data directory")
            return

        try:
            source = open("../test_data/proc_net_dev", "r")
        except IOError:
            print("Could not open test_data/proc_net_dev, return the test to dealer for repair\n")
            return
        try:
            lines = source.readlines()
        finally:
            source.close()

        conf = self._make_config("../test_data/proc_net_dev_dyn")
        i_stat = InterfaceStats(conf, history_size=5)
        i_list = i_stat.get_interfaces()

        for step in xrange(5):
            # each pass raises the counters written by the previous pass
            lines = self._bump_counters(lines, step)
            dyn = open(conf["proc_net_dev"], "w")
            try:
                dyn.writelines(lines)
            finally:
                dyn.close()
            # now this is ugly. Never, ever do this outside testing code where the only reason
            # is avoiding race condition between updating the file and getting predictable results
            i_stat._InterfaceStats__bandwidth_thread_wrapper()

        self.assertEqual([i_stat.get_history(item) for item in i_list], expected)
171
class TestBandwidthHistory(unittest.TestCase):
    """
    Tests BandwidthHistory from iface.py
    """

    def testInitBadSize(self):
        """
        Size must be greater than 0
        """
        for bad_size in (0, -10):
            self.assertRaises(ValueError, iface.BandwidthHistory, bad_size, ["eth0", "eth1"])

    def testInitBadIfaceList(self):
        """
        Passes an empty interface list, then interface names as separate
        positional arguments instead of a list
        """
        self.assertRaises(ValueError, iface.BandwidthHistory, 1, [])
        self.assertRaises(TypeError, iface.BandwidthHistory, 1, "eth0", "eth1")

    def testSizeEnlarge(self):
        """
        Tests if size gets enlarged correctly
        """
        b = iface.BandwidthHistory(2, ["eth0", "eth1"])
        b.set_size(5)
        self.assertEqual(b.get_size(), 5)

    def testSizeShrink(self):
        """
        Tests if size gets shrunk correctly
        """
        b = iface.BandwidthHistory(5, ["eth0", "eth1"])
        b.set_size(2)
        self.assertEqual(b.get_size(), 2)

    def testSizeSanity(self):
        """
        Shrinks and then enlarges the size back, tests if it ends up the same
        """
        b = iface.BandwidthHistory(5, ["eth0", "eth1"])
        b.set_size(2)
        b.set_size(5)
        self.assertEqual(b.get_size(), 5)

    def testSizeZero(self):
        """
        Tests that size cannot be set to be zero
        """
        b = iface.BandwidthHistory(5, ["eth0", "eth1"])
        self.assertRaises(ValueError, b.set_size, 0)

    def testSizeNegative(self):
        """
        Tests that size cannot be set to be negative
        """
        b = iface.BandwidthHistory(5, ["eth0", "eth1"])
        self.assertRaises(ValueError, b.set_size, -10)

    def testGetSize(self):
        """
        Initializes history to a known size, tests if it gets
        returned correctly by get_size
        """
        b = iface.BandwidthHistory(2, ["eth0", "eth1"])
        self.assertEqual(b.get_size(), 2)

    def testGetHistory(self):
        """
        Tests if history after initialization matches expected data
        """
        expected = {'eth1': {'out': {'hist': [0, 0], 'prev': 0, 'max': 0}, 'in': {'hist': [0, 0], 'prev': 0, 'max': 0}},
                    'eth0': {'out': {'hist': [0, 0], 'prev': 0, 'max': 0}, 'in': {'hist': [0, 0], 'prev': 0, 'max': 0}}}
        b = iface.BandwidthHistory(2, ["eth0", "eth1"])
        self.assertEqual(b.get_history(), expected)

    def testGetHistoryIface(self):
        """
        Tests if history of an object tracking a single interface matches
        expected data after initialization
        """
        expected = {'eth1': {'out': {'hist': [0, 0], 'prev': 0, 'max': 0}, 'in': {'hist': [0, 0], 'prev': 0, 'max': 0}}}
        b = iface.BandwidthHistory(2, ["eth1"])
        self.assertEqual(b.get_history(), expected)

    # NOTE(review): the "def" line of this method is missing from the listing
    # this file was recovered from (its body had fused into the previous
    # test); the method name below is reconstructed -- confirm against the
    # repository copy.
    def testPushFirstValue(self):
        """
        Tests that the first value does not get stored so we avoid a spike
        at the beginning of the program
        """
        expected = {'eth1': {'out': {'hist': [0, 0], 'prev': 158, 'max': 0}, 'in': {'hist': [0, 0], 'prev': 12345, 'max': 0}}}
        b = iface.BandwidthHistory(2, ["eth1"])
        b.push_out("eth1", 158)
        b.push_in("eth1", 12345)
        self.assertEqual(b.get_history(), expected)

    def testPush(self):
        """
        Tests if data get stored correctly in the history object
        """
        expected = {'eth1': {'out': {'hist': [0, 1], 'prev': 2, 'max': 1}, 'in': {'hist': [0, 0], 'prev': 1, 'max': 0}},
                    'eth0': {'out': {'hist': [0, 5], 'prev': 10, 'max': 5}, 'in': {'hist': [0, 0], 'prev': 0, 'max': 0}}}
        b = iface.BandwidthHistory(2, ["eth1", "eth0"])

        b.push_out("eth1", 1)
        b.push_out("eth1", 2)
        b.push_in("eth1", 1)
        b.push_in("eth1", 1)

        b.push_out("eth0", 5)
        b.push_out("eth0", 10)
        b.push_in("eth0", 0)
        b.push_in("eth0", 0)

        self.assertEqual(b.get_history(), expected)

    def testPushAndEnlarge(self):
        """
        Tests if a history made larger than the items stored will contain
        zeroes
        """
        expected = {'eth1': {'out': {'hist': [1, 1, 0, 0], 'prev': 5, 'max': 1}, 'in': {'hist': [1, 1, 0, 0], 'prev': 5, 'max': 1}}}
        b = iface.BandwidthHistory(2, ["eth1"])
        for i in xrange(6):
            b.push_in("eth1", i)
            b.push_out("eth1", i)
        b.set_size(4)

        self.assertEqual(b.get_history(), expected)

    def testPushAndCrop(self):
        """
        Tests if history is sane if shrunk to smaller size than the amount
        of values it contains
        """
        expected = {'eth1': {'out': {'hist': [1, 1], 'prev': 5, 'max': 1}, 'in': {'hist': [1, 1], 'prev': 5, 'max': 1}}}
        b = iface.BandwidthHistory(4, ["eth1"])
        for i in xrange(6):
            b.push_in("eth1", i)
            b.push_out("eth1", i)
        b.set_size(2)

        self.assertEqual(b.get_history(), expected)

    def testPushOverflow(self):
        """
        Test if the values shift correctly if we push more
        data than the history can handle
        """
        expected = {'eth1': {'out': {'hist': [1, 1], 'prev': 5, 'max': 1}, 'in': {'hist': [1, 1], 'prev': 5, 'max': 1}}}
        b = iface.BandwidthHistory(2, ["eth1"])
        for i in xrange(6):
            b.push_in("eth1", i)
            b.push_out("eth1", i)

        self.assertEqual(b.get_history(), expected)

    def testPushMaxInOut(self):
        """
        Pushes a growing series of steps on "out" and a shrinking series on
        "in" and checks the 'max' reported for each direction
        """
        expected = {'eth1': {'out': {'hist': [7, 9], 'prev': 25, 'max': 9}, 'in': {'hist': [3, 1], 'prev': 25, 'max': 3}}}
        b = iface.BandwidthHistory(2, ["eth1"])

        limit = 6
        # out: 0, 1, 4, 9, 16, 25 (steps grow); in: 0, 9, 16, 21, 24, 25 (steps shrink)
        for i in xrange(limit):
            b.push_out("eth1", i**2)
            b.push_in("eth1", ((limit-1)**2) - ((limit-i-1)**2))

        self.assertEqual(b.get_history(), expected)
334
class TestSnmp(unittest.TestCase):
    """
    Tests Snmp from snmp.py
    """

    # Saved copy of a /proc/net/snmp listing the expectations below refer to.
    _SNMP_FIXTURE = "../test_data/proc_net_snmp"

    def testInit(self):
        """
        Snmp must construct both without arguments and with an explicit
        file name
        """
        snmp.Snmp()
        snmp.Snmp(self._SNMP_FIXTURE)

    def testBadInit(self):
        """
        Snmp constructor must raise IOError when pointed at a file that
        does not exist
        """
        self.assertRaises(IOError, snmp.Snmp, "file_that_is_not_here")

    def testGetTcpStats(self):
        """
        get_tcp_stats() must return the TCP counters of the fixture
        """
        expected = {'RtoAlgorithm': '1', 'RetransSegs': '1003', 'InErrs': '0',
                    'CurrEstab': '0', 'RtoMin': '200', 'OutSegs': '113061',
                    'OutRsts': '549', 'PassiveOpens': '29', 'RtoMax': '120000',
                    'InSegs': '119838', 'ActiveOpens': '6073', 'MaxConn': '-1',
                    'EstabResets': '540', 'AttemptFails': '18'}
        stats = snmp.Snmp(self._SNMP_FIXTURE)
        self.assertEqual(stats.get_tcp_stats(), expected)

    def testGetUdpStats(self):
        """
        get_udp_stats() must return the UDP counters of the fixture
        """
        expected = {'InDatagrams': '8707', 'OutDatagrams': '19162',
                    'RcvbufErrors': '0', 'InErrors': '0',
                    'SndbufErrors': '0', 'NoPorts': '88'}
        stats = snmp.Snmp(self._SNMP_FIXTURE)
        self.assertEqual(stats.get_udp_stats(), expected)

    def testGetIpStats(self):
        """
        get_ip_stats() must return the IP counters of the fixture
        """
        expected = {'FragCreates': '0', 'ReasmFails': '0', 'Forwarding': '2',
                    'ReasmOKs': '0', 'ReasmReqds': '0', 'ReasmTimeout': '0',
                    'FragFails': '0', 'InReceives': '128859', 'OutDiscards': '0',
                    'InUnknownProtos': '0', 'InAddrErrors': '82', 'FragOKs': '0',
                    'InHdrErrors': '0', 'InDelivers': '128674', 'OutNoRoutes': '230',
                    'InDiscards': '0', 'DefaultTTL': '64', 'ForwDatagrams': '0', 'OutRequests': '133124'}
        stats = snmp.Snmp(self._SNMP_FIXTURE)
        self.assertEqual(stats.get_ip_stats(), expected)

    def testGetIcmpStats(self):
        """
        get_icmp_stats() must return the ICMP counters of the fixture
        """
        expected = {'InRedirects': '0', 'InMsgs': '41', 'InSrcQuenchs': '0',
                    'InAddrMasks': '0', 'InTimestamps': '0', 'OutEchos': '0',
                    'OutParmProbs': '0', 'InTimestampReps': '0', 'InParmProbs': '0',
                    'OutMsgs': '184', 'OutTimestamps': '0', 'OutAddrMaskReps': '0',
                    'OutTimeExcds': '0', 'OutErrors': '0', 'InDestUnreachs': '37',
                    'InTimeExcds': '0', 'OutSrcQuenchs': '0', 'OutEchoReps': '0',
                    'OutAddrMasks': '0', 'OutDestUnreachs': '184',
                    'OutRedirects': '0', 'InEchoReps': '4', 'OutTimestampReps': '0',
                    'InEchos': '0', 'InErrors': '6', 'InAddrMaskReps': '0'}
        stats = snmp.Snmp(self._SNMP_FIXTURE)
        self.assertEqual(stats.get_icmp_stats(), expected)
404 405
class TestFwRecord(unittest.TestCase):
    """
    A test for the class "FwRecord" from iptables.py
    """

    # NOTE(review): the line breaks inside the samples follow the original,
    # but spacing within and at the edges of each line was normalised by the
    # listing this file was recovered from -- confirm against the repository
    # copy.
    sample_tcp = """Apr 2 21:01:25 jarilo kernel: [IPTABLES DROP] : IN=eth0 OUT=
MAC=00:11:43:2b:99:fe:00:19:aa:d6:24:bf:08:00 SRC=85.70.30.130 DST=10.34.32.27
LEN=349 TOS=0x0 0 PREC=0x00 TTL=114 ID=34270 DF PROTO=TCP SPT=33597 DPT=60375
WINDOW=65470 RES=0x00 ACK PSH URGP=0 OPT (0101080A000415C625823E7F)"""

    sample_icmp = """Apr 2 21:15:40 jarilo kernel: [IPTABLES DROP] : IN= OUT=eth0
SRC=10.34.32.27 DST=209.85.129.147 LEN=84 TOS=0x00 PREC=0x00 TTL=64 ID=0 DF
PROTO=ICMP TYPE=8 CODE=0 ID=36386 SEQ=1"""

    sample_udp = """Apr 2 21:08:13 jarilo kernel: [IPTABLES DROP] : IN=eth0 OUT=
MAC=ff:ff:ff:ff:ff:ff:00:e0:81:2f:58:19:08:00 SRC=10.34.32.101 DST=255.255.255.255
LEN=119 TOS=0x00 PREC=0x00 TTL=64 ID=0 DF PROTO=UDP SPT=631 DPT=631 LEN=99

"""

    def testInitGood(self):
        """
        Tests initializing based on records from different protocols as the
        format of their records varies, and with no record at all.
        """
        for sample in (self.sample_tcp, self.sample_icmp, self.sample_udp):
            FwRecord(sample)
        FwRecord()

    def testFilter(self):
        """
        Tests adding a new value to the filter, raising an exception
        when an unknown key is referenced and returning N/A upon referencing
        a known key with no value
        """
        save = copy.deepcopy(FwRecord.fields)
        try:
            FwRecord.fields += ["TTL"]
            fwr = FwRecord(self.sample_tcp)
            self.assertEqual(fwr["TTL"], "114")

            try:
                fwr["XYZ"]
            except KeyError:
                pass
            else:
                self.fail("Getting unknown value does not raise KeyError")

            FwRecord.fields += ["XYZ"]
            self.assertEqual(fwr["XYZ"], "N/A")
        finally:
            # FwRecord.fields is class-level state shared with every other
            # test; put it back even when an assertion above fails
            FwRecord.fields = save

    def testItemGetValue(self):
        """
        Test correctness of values retrieved
        """
        fwr = FwRecord(self.sample_tcp)
        self.assertEqual(fwr["SRC"], "85.70.30.130")
        self.assertEqual(fwr["DST"], "10.34.32.27")
        self.assertEqual(fwr["SPT"], "33597")
        self.assertEqual(fwr["DPT"], "60375")
        self.assertEqual(fwr["IN"], "eth0")
        fwr = FwRecord("")
        self.assertEqual(str(fwr), "N/A;N/A;N/A;N/A;N/A;N/A;N/A")

    def testComparison(self):
        """
        Create two records from the same data, see if they are equal,
        then compare with a different one and expect assertEqual to fail
        """
        fwr = FwRecord(self.sample_tcp)
        fwr_tcp = FwRecord(self.sample_tcp)
        fwr_udp = FwRecord(self.sample_udp)
        self.assertEqual(fwr, fwr_tcp)
        self.assertRaises(AssertionError, self.assertEqual, fwr, fwr_udp)

    def stressTest(self):
        """
        A real world test with a big file

        The name lacks the "test" prefix, so the default unittest loader
        does not collect it; it has to be requested by name.
        """
        log = open("../test_data/iptables")
        try:
            for line in log:
                FwRecord(line)
        finally:
            log.close()
487
class TestLogAnalyzer(unittest.TestCase):
    """
    Tests class LogAnalyzer from iptables.py
    """

    # Both fixtures are built once, when the class body is executed.
    test_target = LogTarget("../test_data/iptables.short", ["[IPTABLES DROP]"])
    config = Config(debug=True).iptables

    def _parsed_analyzer(self):
        """
        Returns a LogAnalyzer over the shared fixtures with parse_file()
        already called
        """
        analyzer = LogAnalyzer(self.test_target, self.config)
        analyzer.parse_file()
        return analyzer

    def testCleanInit(self):
        """
        LogAnalyzer must construct from the shared target and config
        """
        LogAnalyzer(self.test_target, self.config)

    def testInitBadLogfile(self):
        """
        LogTarget must raise IOError for a log file that is not there
        """
        self.assertRaises(IOError, LogTarget, "nothere", ["[IPTABLES DROP]"])

    def testGuessTagSyslog(self):
        """
        Placeholder, nothing is tested yet
        """
        pass

    def testGuessTagMessages(self):
        """
        Placeholder, nothing is tested yet
        """
        pass

    def testStoreAndRetreive(self):
        """
        The string form of the first parsed record must match the expected
        text
        """
        expected = "{'SRC': '10.34.32.27', 'DPT': '80', 'DST': '217.132.226.190', 'SPT': '48767', 'DATETIME': datetime.datetime(1900, 4, 2, 21, 19, 10), 'IN': 'N/A', 'OUT': 'eth0'}"

        first_record = self._parsed_analyzer().get_log()[0]
        self.assertEqual(str(first_record), expected)

    def testTopPorts(self):
        """
        Checks top_ports() against the values expected for the short
        iptables log fixture
        """
        expected = [('1234', '5'), ('555', '3'), ('80', '2')]
        self.assertEqual(self._parsed_analyzer().top_ports(), expected)

    def testTopHosts(self):
        """
        Checks top_hosts() against the values expected for the short
        iptables log fixture
        """
        expected = [('10.34.32.27', '8'), ('24.2.173.101', '1'), ('58.167.42.182', '1')]
        self.assertEqual(self._parsed_analyzer().top_hosts(), expected)
536
class TestNetProcess(unittest.TestCase):
    """
    Tests class NetProcess from netstat.py
    """

    # NOTE(review): column padding inside these sample lines was normalised
    # by the listing this file was recovered from -- confirm against the
    # repository copy.
    tcp = "7: 0200A8C0:9F90 1809BCCD:1446 01 00000000:00000000 00:00000000 00000000 500 0 126442 1 d5cc4a80 326 40 14 2 100"
    udp = "33: 00000000:0FA1 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 8841 2 dc71f2c0"

    def testCleanInit(self):
        """
        Test that a NetProcess class initializes without any exceptions
        """
        for sample in (TestNetProcess.tcp, TestNetProcess.udp):
            NetProcess(sample)

    def testBadInit(self):
        """
        Init raises exception when fetched with bogus data
        """
        self.assertRaises(IndexError, NetProcess, "")

    def testParseAddress(self):
        """
        Calls the name-mangled private __parse_address directly; another
        badness suitable only for testing code
        """
        data = {"0100007F:0277": ('127.0.0.1', 631), "0200A8C0:9CD1": ('192.168.0.2', 40145)}
        t = NetProcess(TestNetProcess.tcp)

        for (raw, parsed) in data.items():
            self.assertEqual(parsed, t._NetProcess__parse_address(raw))

    def testCheckKey(self):
        """
        Tests if an attempt to retrieve a correct key works OK and
        an attempt to retrieve a broken one raises an exception.
        """
        t = NetProcess(TestNetProcess.tcp)
        # these should pass
        for key in NetProcess.keys:
            t[key]
        # these should not
        for key in ["foo", "bar"]:
            self.assertRaises(KeyError, t._NetProcess__check_key, key)

    def testGetItem(self):
        """
        Compares the string form of every key except "icon" with the values
        expected for the TCP sample
        """
        # flat "key value key value ..." list; "state" appears twice and the
        # later pair ("Established") is the one that ends up in the dict
        flat = "addr_local 192.168.0.2 port_local 40848 addr_remote 205.188.9.24 port_remote 5190 pid N/A inode 126442 uid 500 state 1 progname N/A state Established"
        tokens = flat.split()
        expected = dict(zip(tokens[::2], tokens[1::2]))
        t = NetProcess(TestNetProcess.tcp)
        for key in NetProcess.keys:
            if key == "icon":
                continue
            self.assertEqual(str(t[key]), str(expected[key]))

    def testSetItem(self):
        """
        Every key must accept a new value; "state" is set from a number and
        read back as its textual name
        """
        t = NetProcess(TestNetProcess.tcp)
        for key in NetProcess.keys:
            if key == "state":
                continue  # will test separately
            t[key] = "XXX"
            self.assertEqual(str(t[key]), "XXX")

        t["state"] = 1
        self.assertEqual(str(t["state"]), "Established")

    def testComparison(self):
        """
        Create two records from the same data, see if they are equal,
        then compare with a different one and expect assertEqual to fail
        """
        a = NetProcess(TestNetProcess.tcp)
        b = NetProcess(TestNetProcess.tcp)
        c = NetProcess(TestNetProcess.udp)

        # The original asserted a == c and then asserted that the very same
        # comparison fails, so the test could never pass; the equality check
        # belongs to the two records built from the same line.
        self.assertEqual(a, b)
        self.assertRaises(AssertionError, self.assertEqual, a, c)
607
class TestNetStat(unittest.TestCase):
    """
    Placeholder tests around ConfigNetstat / NetStat
    """

    def testCleanInit(self):
        """
        Only looks up the ConfigNetstat name; nothing is instantiated.

        NOTE(review): presumably a ConfigNetstat instance was meant to be
        created here -- confirm.
        """
        netstat_config = ConfigNetstat

    def testFindPid(self):
        """
        Set up a chroot /proc and find a matching pid for the open socket
        """

if __name__ == "__main__":
    # Executed as a script: let unittest discover and run every TestCase
    # defined in this module.
    unittest.main()