1
2
3 """
4 Unit testing of all modules happens here.
5 """
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 import unittest
29 import time
30 import os
31 import shutil
32 from pysqlite2 import dbapi2 as sqlite
33 import copy
34
35
36 import iface
37 import snmp
38 from iface import InterfaceStats
39 from iptables import FwRecord, LogAnalyzer, LogTarget
40 from netstat import NetProcess, NetStat
41 from config import *
42
44 """
45 Tests InterfaceStats class from iface.py
46 """
48 """
49 Tests if the object inits correctly.
50 """
51 conf = ConfigInterface()
52 conf["proc_net_dev"] = "../test_data/proc_net_dev"
53 i = iface.InterfaceStats(conf, history_size=60)
54
56 """
57 Tests if history must have a sane size
58 """
59 conf = ConfigInterface()
60 conf["proc_net_dev"] = "../test_data/proc_net_dev"
61 self.assertRaises(ValueError, iface.InterfaceStats, conf, history_size=-5)
62 self.assertRaises(ValueError, iface.InterfaceStats, conf, history_size=0)
63
65 """
66 Tests if the __init__ method checks for existing proc_net_dev file
67 """
68 conf = ConfigInterface()
69 conf["proc_net_dev"] = "../test_data/nothere"
70 self.assertRaises(IOError, iface.InterfaceStats, conf, history_size=5)
71
73 """
74 Tests that two most probable mistakes in the set_iface_filter call raise appropriate exception
75 """
76 conf = ConfigInterface()
77 conf["proc_net_dev"] = "../test_data/proc_net_dev"
78 i = iface.InterfaceStats(conf, history_size=60)
79 self.assertRaises(TypeError, i.set_iface_filter, None)
80 self.assertRaises(TypeError, i.set_iface_filter, "eth0")
81
83 """
84 Test against known interfaces in the fakeproc file.
85 """
86 conf = ConfigInterface()
87 conf["proc_net_dev"] = "../test_data/proc_net_dev"
88 expected = [ "lo", "eth1", "eth0", "tun0" ]
89 i = iface.InterfaceStats(conf, history_size=60)
90 self.assertEqual(i.get_interfaces(), expected)
91
93 """
94 Test retrieving pretty human-readable names for various ethernet interfaces
95 """
96 names = { "eth0" : "Ethernet interface", "wlan1" : "Wireless interface",
97 "ppp12" : "Dial-up interface", "tun12" : "Routed IP tunnel",
98 "sit00:1" : "IPv6 tunnel", "tap123" : "VPN tunnel", "xxx" : "xxx" }
99 conf = ConfigInterface()
100 conf["proc_net_dev"] = "../test_data/proc_net_dev"
101 i = iface.InterfaceStats(conf, history_size=60)
102 for (k, v) in names.items():
103 self.assertEqual(i.get_pretty_name(k), v)
104
106 """
107 Since the fakeproc file never changes, we will never see the bandwidth rise, just the value in the
108 proc_net_dev file become the "previous".
109 """
110 expected = [{'in': {'hist': [0, 0, 0, 0, 0], 'prev': 3628518, 'max' : 0}, 'out': {'hist': [0, 0, 0, 0, 0], 'prev': 3628518, 'max' : 0}},
111 {'in': {'hist': [0, 0, 0, 0, 0], 'prev': 9568034, 'max' : 0}, 'out': {'hist': [0, 0, 0, 0, 0], 'prev': 80775813, 'max' : 0}},
112 {'in': {'hist': [0, 0, 0, 0, 0], 'prev': 51832244, 'max' : 0}, 'out': {'hist': [0, 0, 0, 0, 0], 'prev': 7635513, 'max' : 0}},
113 {'in': {'hist': [0, 0, 0, 0, 0], 'prev': 354071, 'max' : 0}, 'out': {'hist': [0, 0, 0, 0, 0], 'prev': 192899, 'max' : 0}} ]
114
115 conf = ConfigInterface()
116 conf["proc_net_dev"] = "../test_data/proc_net_dev"
117 ist = iface.InterfaceStats(conf, history_size=5)
118 ist.start_bandwidth_history()
119 time.sleep(4)
120 ist.stop_bandwidth_history()
121
122 ls = ist.get_interfaces()
123 for i in ls:
124 self.assertEqual(ist.get_history(i), expected[ls.index(i)])
125
127 """
128 This is a more advanced test - takes the fakeproc file and adds a known, rising value to the interface
129 metrics in the file, so the resulting history is rising linearly.
130 """
131 expected = [{'in': {'hist': [0, 1, 2, 3, 4], 'prev': 3628528, 'max' : 4}, 'out': {'hist': [0, 1, 2, 3, 4], 'prev': 3628528, 'max' : 4}},
132 {'in': {'hist': [0, 1, 2, 3, 4], 'prev': 9568044, 'max' : 4}, 'out': {'hist': [0, 1, 2, 3, 4], 'prev': 80775823, 'max' : 4}},
133 {'in': {'hist': [0, 1, 2, 3, 4], 'prev': 51832254, 'max' : 4}, 'out': {'hist': [0, 1, 2, 3, 4], 'prev': 7635523, 'max' : 4}},
134 {'in': {'hist': [0, 1, 2, 3, 4], 'prev': 354081, 'max' : 4}, 'out': {'hist': [0, 1, 2, 3, 4], 'prev': 192909, 'max' : 4}}]
135
136
137 if not os.access("../test_data", os.R_OK | os.W_OK ):
138 print "the user running this test must have read and write access to the test_data directory"
139 return False
140
141 try:
142 dyn = open("../test_data/proc_net_dev", "r")
143 except IOError:
144 print "Could not open test_data/proc_net_dev, return the test to dealer for repair\n"
145 return False
146
147 conf = ConfigInterface()
148 conf["proc_net_dev"] = "../test_data/proc_net_dev_dyn"
149 i_stat = InterfaceStats(conf, history_size=5)
150 i_list = i_stat.get_interfaces()
151
152 for i in xrange(5):
153 lines = dyn.readlines()
154
155 dyn.close()
156 dyn = open(conf["proc_net_dev"], "w")
157 for l in lines:
158 if ":" in l:
159 (iface, numbers) = l.split(":")
160 numbers = " ".join([ str(int(field)+i) for field in numbers.split()] )
161 l = iface + ":" + numbers + "\n"
162 dyn.write(l)
163 dyn.seek(0)
164 dyn.close()
165 dyn = open(conf["proc_net_dev"], "r")
166
167
168 i_stat._InterfaceStats__bandwidth_thread_wrapper()
169
170 self.assertEqual([ i_stat.get_history(item) for item in i_list ], expected)
171
172 -class TestBandwidthHistory(unittest.TestCase):
173 """
174 Tests BandwidthHistory from iface.py
175 """
def testInitBadSize(self):
    """
    BandwidthHistory must refuse a zero or negative size by raising
    ValueError.
    """
    for bad_size in (0, -10):
        # A fresh interface list per attempt, exactly as separate calls would pass.
        self.assertRaises(ValueError, iface.BandwidthHistory,
                          bad_size, ["eth0", "eth1"])
182
184 """
185 Passes empty list of dictionaries into the history
186 """
187 self.assertRaises(ValueError, iface.BandwidthHistory, 1, [])
188 self.assertRaises(TypeError, iface.BandwidthHistory, 1, "eth0", "eth1")
189
def testSizeEnlarge(self):
    """
    Grows a history from two to five slots and checks that get_size
    reports the new size.
    """
    history = iface.BandwidthHistory(2, ["eth0", "eth1"])
    history.set_size(5)
    reported = history.get_size()
    self.assertEqual(reported, 5)
197
def testSizeShrink(self):
    """
    Shrinks a history from five to two slots and checks that get_size
    reports the new size.
    """
    history = iface.BandwidthHistory(5, ["eth0", "eth1"])
    history.set_size(2)
    reported = history.get_size()
    self.assertEqual(reported, 2)
205
def testSizeSanity(self):
    """
    Shrinks the history and then enlarges it back; the reported size
    must equal the last size that was set.
    """
    history = iface.BandwidthHistory(5, ["eth0", "eth1"])
    for size in (2, 5):
        history.set_size(size)
    self.assertEqual(history.get_size(), 5)
214
def testSizeZero(self):
    """
    set_size must reject a size of zero with ValueError.
    """
    history = iface.BandwidthHistory(5, ["eth0", "eth1"])
    resize = history.set_size
    self.assertRaises(ValueError, resize, 0)
221
223 """
224 Tests that size cannot be set to be negative
225 """
226 b = iface.BandwidthHistory(5, ["eth0", "eth1"])
227 self.assertRaises(ValueError, b.set_size, -10)
228
def testGetSize(self):
    """
    Initializes a history with a known size and checks that get_size
    returns that same size.
    """
    initial_size = 2
    history = iface.BandwidthHistory(initial_size, ["eth0", "eth1"])
    self.assertEqual(history.get_size(), 2)
236
def testGetHistory(self):
    """
    A freshly created history must hold zeroed 'in' and 'out' records
    for every interface it was created with.
    """
    def blank():
        # Zeroed record for one direction of a two-slot history.
        return {'hist': [0, 0], 'prev': 0, 'max': 0}

    expected = dict(
        (name, {'out': blank(), 'in': blank()})
        for name in ("eth1", "eth0")
    )
    history = iface.BandwidthHistory(2, ["eth0", "eth1"])
    self.assertEqual(history.get_history(), expected)
245
247 """
248 Tests if history for a given interface after initialization matches expected data
249 """
250 expected = {'eth1': {'out': {'hist': [0, 0], 'prev': 0, 'max' : 0}, 'in': {'hist': [0, 0], 'prev': 0, 'max' : 0}}}
251 b = iface.BandwidthHistory(2, ["eth1"])
252 self.assertEqual(b.get_history(), expected)
253
255 """
256 Tests that the first value does not get stored so we avoid a spike at the beginning of the program
257 """
258 expected = {'eth1': {'out': {'hist': [0, 0], 'prev': 158, 'max' : 0}, 'in': {'hist': [0, 0], 'prev': 12345, 'max' : 0}}}
259 b = iface.BandwidthHistory(2, ["eth1"])
260 b.push_out("eth1", 158)
261 b.push_in("eth1", 12345)
262 self.assertEqual(b.get_history(), expected)
263
def testPush(self):
    """
    Pushes a known sequence of counter values for two interfaces and
    checks the resulting history against the expected records.
    """
    history = iface.BandwidthHistory(2, ["eth1", "eth0"])

    # (push method, interface, counter value), applied in this exact order.
    samples = [
        (history.push_out, "eth1", 1),
        (history.push_out, "eth1", 2),
        (history.push_in, "eth1", 1),
        (history.push_in, "eth1", 1),
        (history.push_out, "eth0", 5),
        (history.push_out, "eth0", 10),
        (history.push_in, "eth0", 0),
        (history.push_in, "eth0", 0),
    ]
    for push, name, value in samples:
        push(name, value)

    expected = {
        'eth1': {
            'out': {'hist': [0, 1], 'prev': 2, 'max': 1},
            'in': {'hist': [0, 0], 'prev': 1, 'max': 0},
        },
        'eth0': {
            'out': {'hist': [0, 5], 'prev': 10, 'max': 5},
            'in': {'hist': [0, 0], 'prev': 0, 'max': 0},
        },
    }
    self.assertEqual(history.get_history(), expected)
283
285 """
286 Tests if a history made larger than items stored will contain zeroes
287 """
288 expected = {'eth1': {'out': {'hist': [1, 1, 0, 0], 'prev': 5, 'max' : 1}, 'in': {'hist': [1, 1, 0, 0], 'prev': 5, 'max' : 1}}}
289 b = iface.BandwidthHistory(2, ["eth1"])
290 for i in xrange(6):
291 b.push_in("eth1", i)
292 b.push_out("eth1", i)
293 b.set_size(4)
294
295 self.assertEqual(b.get_history(), expected)
296
def testPushAndCrop(self):
    """
    Pushes six rising values into a four-slot history, shrinks it to
    two slots and checks the records that remain.
    """
    history = iface.BandwidthHistory(4, ["eth1"])
    for sample in xrange(6):
        history.push_in("eth1", sample)
        history.push_out("eth1", sample)
    history.set_size(2)

    expected = {
        'eth1': {
            'out': {'hist': [1, 1], 'prev': 5, 'max': 1},
            'in': {'hist': [1, 1], 'prev': 5, 'max': 1},
        },
    }
    self.assertEqual(history.get_history(), expected)
310
312 """
313 Test if the values shift correctly if we push more
314 data than the history can handle
315 """
316 expected = {'eth1': {'out': {'hist': [1, 1], 'prev': 5, 'max' : 1}, 'in': {'hist': [1, 1], 'prev': 5, 'max' : 1}}}
317 b = iface.BandwidthHistory(2, ["eth1"])
318 for i in xrange(6):
319 b.push_in("eth1", i)
320 b.push_out("eth1", i)
321
322 self.assertEqual(b.get_history(), expected)
323
325 expected = {'eth1': {'out': {'hist': [7, 9], 'prev': 25, 'max' : 9}, 'in': {'hist': [3, 1], 'prev': 25, 'max' : 3}}}
326 b = iface.BandwidthHistory(2, ["eth1"])
327
328 limit = 6
329 for i in xrange(6):
330 b.push_out("eth1", i**2)
331 b.push_in("eth1", ((limit-1)**2) - ((limit-i-1)**2))
332
333 self.assertEqual(b.get_history(), expected)
334
336 """
337 Tests Snmp from snmp.py
338 """
340 """
341 Tests if correct initialization of Snmp goes OK
342 """
343 obj = snmp.Snmp()
344 obj = snmp.Snmp("../test_data/proc_net_snmp")
345
347 """
348 Tests if Snmp constructor raises appropriate exception when
349 fetched with bogus data
350 """
351 self.assertRaises(IOError, snmp.Snmp, "file_that_is_not_here")
352
354 """
355 Tests if TCP statistics returned match expected
356 """
357 obj = snmp.Snmp("../test_data/proc_net_snmp")
358 expected = {'RtoAlgorithm': '1', 'RetransSegs': '1003', 'InErrs': '0',
359 'CurrEstab': '0', 'RtoMin': '200', 'OutSegs': '113061',
360 'OutRsts': '549', 'PassiveOpens': '29', 'RtoMax': '120000',
361 'InSegs': '119838', 'ActiveOpens': '6073', 'MaxConn': '-1',
362 'EstabResets': '540', 'AttemptFails': '18'}
363 self.assertEquals(obj.get_tcp_stats(), expected)
364
366 """
367 Tests if UDP statistics returned match expected
368 """
369 obj = snmp.Snmp("../test_data/proc_net_snmp")
370 expected = {'InDatagrams': '8707', 'OutDatagrams': '19162',
371 'RcvbufErrors': '0', 'InErrors': '0',
372 'SndbufErrors': '0', 'NoPorts': '88'}
373 self.assertEquals(obj.get_udp_stats(), expected)
374
376 """
377 Tests if IP statistics returned match expected
378 """
379 obj = snmp.Snmp("../test_data/proc_net_snmp")
380 expected = {'FragCreates': '0', 'ReasmFails': '0', 'Forwarding': '2',
381 'ReasmOKs': '0', 'ReasmReqds': '0', 'ReasmTimeout': '0',
382 'FragFails': '0', 'InReceives': '128859', 'OutDiscards': '0',
383 'InUnknownProtos': '0', 'InAddrErrors': '82', 'FragOKs': '0',
384 'InHdrErrors': '0', 'InDelivers': '128674', 'OutNoRoutes': '230',
385 'InDiscards': '0', 'DefaultTTL': '64', 'ForwDatagrams': '0', 'OutRequests': '133124'}
386 self.assertEquals(obj.get_ip_stats(), expected)
387
388
390 """
391 Tests if ICMP statistics returned match expected
392 """
393 obj = snmp.Snmp("../test_data/proc_net_snmp")
394 expected = {'InRedirects': '0', 'InMsgs': '41', 'InSrcQuenchs': '0',
395 'InAddrMasks': '0', 'InTimestamps': '0', 'OutEchos': '0',
396 'OutParmProbs': '0', 'InTimestampReps': '0', 'InParmProbs': '0',
397 'OutMsgs': '184', 'OutTimestamps': '0', 'OutAddrMaskReps': '0',
398 'OutTimeExcds': '0', 'OutErrors': '0', 'InDestUnreachs': '37',
399 'InTimeExcds': '0', 'OutSrcQuenchs': '0', 'OutEchoReps': '0',
400 'OutAddrMasks': '0', 'OutDestUnreachs': '184',
401 'OutRedirects': '0', 'InEchoReps': '4', 'OutTimestampReps': '0',
402 'InEchos': '0', 'InErrors': '6', 'InAddrMaskReps': '0'}
403 self.assertEquals(obj.get_icmp_stats(), expected)
404
405
407 """
408 A test for the class "FwRecord" from iptables.py
409 """
410
411 sample_tcp = """Apr 2 21:01:25 jarilo kernel: [IPTABLES DROP] : IN=eth0 OUT=
412 MAC=00:11:43:2b:99:fe:00:19:aa:d6:24:bf:08:00 SRC=85.70.30.130 DST=10.34.32.27
413 LEN=349 TOS=0x0 0 PREC=0x00 TTL=114 ID=34270 DF PROTO=TCP SPT=33597 DPT=60375
414 WINDOW=65470 RES=0x00 ACK PSH URGP=0 OPT (0101080A000415C625823E7F)"""
415
416 sample_icmp = """Apr 2 21:15:40 jarilo kernel: [IPTABLES DROP] : IN= OUT=eth0
417 SRC=10.34.32.27 DST=209.85.129.147 LEN=84 TOS=0x00 PREC=0x00 TTL=64 ID=0 DF
418 PROTO=ICMP TYPE=8 CODE=0 ID=36386 SEQ=1"""
419
420 sample_udp = """Apr 2 21:08:13 jarilo kernel: [IPTABLES DROP] : IN=eth0 OUT=
421 MAC=ff:ff:ff:ff:ff:ff:00:e0:81:2f:58:19:08:00 SRC=10.34.32.101 DST=255.255.255.255
422 LEN=119 TOS=0x00 PREC=0x00 TTL=64 ID=0 DF PROTO=UDP SPT=631 DPT=631 LEN=99
423
424 """
426 """
427 Tests initializing based on records from different protocols as format of
428 their records vary.
429 """
430 fwr = FwRecord(self.sample_tcp)
431 fwr = FwRecord(self.sample_icmp)
432 fwr = FwRecord(self.sample_udp)
433 fwr = FwRecord()
434
436 """
437 Tests adding a new value to the filter, raising an exception
438 when unknown key is referenced and returning N/A upon referencing
439 known key with no value
440 """
441 save = copy.deepcopy(FwRecord.fields)
442 FwRecord.fields += ["TTL"]
443 fwr = FwRecord(self.sample_tcp)
444 self.assertEqual(fwr["TTL"], "114")
445
446 try:
447 fwr["XYZ"]
448 except KeyError:
449 pass
450 else:
451 raise AssertionError("Getting unknown value does not raise KeyError")
452
453 FwRecord.fields += ["XYZ"]
454 self.assertEqual(fwr["XYZ"], "N/A")
455 FwRecord.fields = save
456
458 """
459 Test correctness of values retrieved
460 """
461 fwr = FwRecord(self.sample_tcp)
462 self.assertEqual(fwr["SRC"], "85.70.30.130")
463 self.assertEqual(fwr["DST"], "10.34.32.27")
464 self.assertEqual(fwr["SPT"], "33597")
465 self.assertEqual(fwr["DPT"], "60375")
466 self.assertEqual(fwr["IN"], "eth0")
467 fwr = FwRecord("")
468 self.assertEqual(str(fwr), "N/A;N/A;N/A;N/A;N/A;N/A;N/A")
469
471 """
472 Create two records from the same data, see if they are equal,
473 then compare with a different one, see the exception
474 """
475 fwr = FwRecord(self.sample_tcp)
476 fwr_tcp = FwRecord(self.sample_tcp)
477 fwr_udp = FwRecord(self.sample_udp)
478 self.assertEqual(fwr, fwr_tcp)
479 self.assertRaises(AssertionError, self.assertEqual, fwr, fwr_udp)
480
482 """
483 A real world test with a big file
484 """
485 file = open("../test_data/iptables")
486 for l in file: FwRecord(l)
487
489 """
490 Tests class LogAnalyzer from iptables.py
491 """
492 test_target = LogTarget("../test_data/iptables.short", ["[IPTABLES DROP]"])
493 config = Config(debug=True).iptables
494
497
499 self.assertRaises(IOError, LogTarget, "nothere",["[IPTABLES DROP]"])
500
503
506
508 expected = "{'SRC': '10.34.32.27', 'DPT': '80', 'DST': '217.132.226.190', 'SPT': '48767', 'DATETIME': datetime.datetime(1900, 4, 2, 21, 19, 10), 'IN': 'N/A', 'OUT': 'eth0'}"
509
510 i = LogAnalyzer(self.test_target, self.config)
511 i.parse_file()
512 a = str(i.get_log()[0])
513 b = expected
514 self.assertEqual(a,b )
515
516
518 """
519 Test retrieving number of most denied ports against a known iptables
520 log file.
521 """
522 expected = [('1234', '5'), ('555', '3'), ('80', '2')]
523 i = LogAnalyzer(self.test_target, self.config)
524 i.parse_file()
525 self.assertEqual(i.top_ports(), expected)
526
528 """
529 Test retrieving number of most denied hosts against a known iptables
530 log file.
531 """
532 expected = [('10.34.32.27', '8'), ('24.2.173.101', '1'), ('58.167.42.182', '1')]
533 i = LogAnalyzer(self.test_target, self.config)
534 i.parse_file()
535 self.assertEqual(i.top_hosts(), expected)
536
538 tcp = "7: 0200A8C0:9F90 1809BCCD:1446 01 00000000:00000000 00:00000000 00000000 500 0 126442 1 d5cc4a80 326 40 14 2 100"
539 udp = "33: 00000000:0FA1 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 8841 2 dc71f2c0"
540
547
549 """
550 Init raises exception when fetched with bogus data
551 """
552 self.assertRaises(IndexError, NetProcess, "")
553
555 """
556 Another badness suitable only for testing code
557 """
558 data = { "0100007F:0277" : ('127.0.0.1', 631), "0200A8C0:9CD1" : ('192.168.0.2', 40145) }
559 t = NetProcess(TestNetProcess.tcp)
560
561 for (k,v) in data.items():
562 self.assertEquals(v, t._NetProcess__parse_address(k))
563
565 """
566 Tests if an attempt to retrieve a correct key works OK and
567 an attempt to retrieve a bogus key raises KeyError.
568 """
569 t = NetProcess(TestNetProcess.tcp)
570
571 for key in NetProcess.keys:
572 foo = t[key]
573
574 for key in [ "foo", "bar" ]:
575 self.assertRaises(KeyError, t._NetProcess__check_key, key)
576
578 repr = "addr_local 192.168.0.2 port_local 40848 addr_remote 205.188.9.24 port_remote 5190 pid N/A inode 126442 uid 500 state 1 progname N/A state Established"
579 expected = dict(zip(repr.split()[::2], repr.split()[1::2]))
580 t = NetProcess(TestNetProcess.tcp)
581 for key in NetProcess.keys:
582 if key == "icon": continue
583 self.assertEqual(str(t[key]), str(expected[key]))
584
586 t = NetProcess(TestNetProcess.tcp)
587 for key in NetProcess.keys:
588 if key == "state": continue
589 t[key] = "XXX"
590 self.assertEqual(str(t[key]), "XXX")
591
592 t["state"] = 1
593 self.assertEqual(str(t["state"]), "Established")
594
607
611
613 """
614 Set up a chroot /proc and find a matching pid for the open socket
615 """
616
# Script entry point: run every test case defined in this module.
if __name__ == "__main__":
    unittest.main()
619