| #!/usr/bin/python2 |
| |
| import os |
| import time |
| import unittest |
| import process_manager |
| |
| class test_linkfail(unittest.TestCase, process_manager.ProcessManager): |
| """Strategy link failure test scenario""" |
| |
| def setUp(self): |
| os.chdir("test_linkfail") |
| |
| def tearDown(self): |
| self.startProcess("stop", ["./stop.sh"], "-> Stopping NFD") |
| self.waitForProcessCompletion("stop", None) |
| os.chdir("..") |
| |
| def run_strategy_test(self, key, strategy, expectLoss=False): |
| print "[linkfail] testing", key |
| self.startProcess("start", ["./start.sh", key, strategy], "-> Starting NFD and creating faces") |
| self.waitForProcessCompletion("start", None) |
| self.startProcess("pingserver", ["./pingserver.sh", key], "-> Starting ping server on B") |
| time.sleep(1) |
| self.startProcess("ping", ["./ping.sh", key], "-> Starting ping client on A") |
| |
| time.sleep(5) |
| self.startProcess("ABfail", ["./linkfail.sh", "fail", "A1", "B1"], "-> Failing link A-B") |
| self.waitForProcessCompletion("ABfail", None) |
| time.sleep(5) |
| self.startProcess("ABrecover", ["./linkfail.sh", "recover", "A1", "B1"], "-> Recovering link A-B") |
| self.waitForProcessCompletion("ABrecover", None) |
| time.sleep(5) |
| self.startProcess("ACfail", ["./linkfail.sh", "fail", "A1", "C1"], "-> Failing link A-C") |
| self.waitForProcessCompletion("ACfail", None) |
| time.sleep(5) |
| self.startProcess("ACrecover", ["./linkfail.sh", "recover", "A1", "C1"], "-> Recovering link A-C") |
| self.waitForProcessCompletion("ACrecover", None) |
| self.waitForProcessCompletion("ping", None) |
| self.killProcess("pingserver") |
| |
| if expectLoss: |
| if self.getProcessReturnCode("ping") == 0: |
| self.fail("EXPECTED loss or delay; ACTUAL no loss or delay") |
| else: |
| if self.getProcessReturnCode("ping") <> 0: |
| self.fail("ping loss or delay") |
| |
| def test_broadcast(self): |
| self.run_strategy_test("broadcast", "ndn:/localhost/nfd/strategy/broadcast") |
| |
| def test_ncc(self): |
| self.run_strategy_test("ncc", "ndn:/localhost/nfd/strategy/ncc") |
| |
| def test_bestroute(self): |
| self.run_strategy_test("bestroute", "ndn:/localhost/nfd/strategy/best-route", True) |