diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2ea2489 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*.pyc +*.zip +*~ +/.idea/ +/log/ +/state/ diff --git a/config.py b/config.py index a82b3aa..d39364d 100644 --- a/config.py +++ b/config.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2017 Sophos Limited +# Copyright 2019 Sophos Limited # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. @@ -12,6 +12,10 @@ # License. # +import unittest +import shutil +import tempfile +import os import re try: import ConfigParser @@ -20,9 +24,9 @@ class Config: - "Class providing config values" + """Class providing config values""" def __init__(self, path): - "Open the config file" + """Open the config file""" self.config = ConfigParser.ConfigParser() self.config.read(path) @@ -32,8 +36,9 @@ def __getattr__(self, name): class Token: def __init__(self, token_txt): - "Initialize with the token text" - rex = re.compile(r"url\: (?Phttps\://.+), x-api-key\: (?P.+), Authorization\: (?P.+)$") + """Initialize with the token text""" + rex_txt = r"url\: (?Phttps\://.+), x-api-key\: (?P.+), Authorization\: (?P.+)$" + rex = re.compile(rex_txt) m = rex.search(token_txt) self.url = m.group("url") self.api_key = m.group("api_key") @@ -46,11 +51,8 @@ def __init__(self, token_txt): # -import unittest, copy, shutil, tempfile, os - - class TestConfig(unittest.TestCase): - "Test Config file items are exposed as attributes on config object" + """Test Config file items are exposed as attributes on config object""" def setUp(self): self.tmpdir = tempfile.mkdtemp(prefix="config_test", dir=".") @@ -68,7 +70,7 @@ def testReadingWhenAttributeExists(self): class TestToken(unittest.TestCase): - "Test the token gets parsed" + """Test the token gets parsed""" def testParse(self): txt = " url: https://anywhere.com/api, x-api-key: random, Authorization: Basic KJNKLJNjklNLKHB= " t = Token(txt) @@ -79,4 +81,3 @@ def testParse(self): if __name__ == '__main__': unittest.main() - diff --git a/name_mapping.py b/name_mapping.py index 56b0e9b..fef9a4f 100644 --- a/name_mapping.py +++ b/name_mapping.py @@ -1,4 +1,4 @@ -# Copyright 2017 Sophos Limited +# Copyright 2019 Sophos Limited # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. @@ -11,6 +11,8 @@ # import re +import unittest +import copy threat_regex = re.compile("'(?P.*?)'.+'(?P.*?)'") @@ -18,35 +20,35 @@ # What to do with the different types of event. None indicates drop the event, otherwise a regex extracts the # various fields and inserts them into the event dictionary. TYPE_HANDLERS = { - "Event::Endpoint::Threat::Detected" : threat_regex, - "Event::Endpoint::Threat::CleanedUp" : threat_regex, - "Event::Endpoint::Threat::HIPSDismissed" : threat_regex, - "Event::Endpoint::Threat::HIPSDetected" : threat_regex, - "Event::Endpoint::Threat::PuaDetected" : threat_regex, - "Event::Endpoint::Threat::PuaCleanupFailed" : threat_regex, - "Event::Endpoint::Threat::HIPSDetected" : threat_regex, - "Event::Endpoint::Threat::HIPSDismissed" : threat_regex, - "Event::Endpoint::Threat::CleanupFailed" : threat_regex, - "Event::Endpoint::Threat::CommandAndControlDismissed" : threat_regex, - "Event::Endpoint::Threat::HIPSCleanupFailed" : threat_regex, - "Event::Endpoint::DataLossPreventionUserAllowed" : - re.compile(u"An \u2033(?P.+)\u2033.+ Username: (?P.+?) Rule names: '(?P.+?)' " - "User action: (?P.+?) Application Name: (?P.+?) Data Control action: (?P.+?) " - "File type: (?P.+?) File size: (?P\\d+?) Source path: (?P.+)$"), - - "Event::Endpoint::NonCompliant" : None, # None == ignore the event - "Event::Endpoint::Compliant" : None, - "Event::Endpoint::Device::AlertedOnly" : None, - "Event::Endpoint::UpdateFailure" : None, - "Event::Endpoint::SavScanComplete" : None, - "Event::Endpoint::Application::Allowed" : None, - "Event::Endpoint::UpdateSuccess" : None, - "Event::Endpoint::WebControlViolation" : None, - "Event::Endpoint::WebFilteringBlocked" : None, + "Event::Endpoint::Threat::Detected": threat_regex, + "Event::Endpoint::Threat::CleanedUp": threat_regex, + "Event::Endpoint::Threat::HIPSDismissed": threat_regex, + "Event::Endpoint::Threat::HIPSDetected": threat_regex, + "Event::Endpoint::Threat::PuaDetected": threat_regex, + "Event::Endpoint::Threat::PuaCleanupFailed": threat_regex, + "Event::Endpoint::Threat::CleanupFailed": threat_regex, + "Event::Endpoint::Threat::CommandAndControlDismissed": threat_regex, + "Event::Endpoint::Threat::HIPSCleanupFailed": threat_regex, + "Event::Endpoint::DataLossPreventionUserAllowed": + re.compile(u"An \u2033(?P.+)\u2033.+ Username: (?P.+?) {2}" + u"Rule names: \u2032(?P.+?)\u2032 {2}" + "User action: (?P.+?) {2}Application Name: (?P.+?) {2}" + "Data Control action: (?P.+?) {2}" + "File type: (?P.+?) {2}File size: (?P\\d+?) {2}" + "Source path: (?P.+)$"), + + "Event::Endpoint::NonCompliant": None, # None == ignore the event + "Event::Endpoint::Compliant": None, + "Event::Endpoint::Device::AlertedOnly": None, + "Event::Endpoint::UpdateFailure": None, + "Event::Endpoint::SavScanComplete": None, + "Event::Endpoint::Application::Allowed": None, + "Event::Endpoint::UpdateSuccess": None, + "Event::Endpoint::WebControlViolation": None, + "Event::Endpoint::WebFilteringBlocked": None, } - def update_fields(log, data): """ Split 'name' field into multiple fields based on regex and field names specified in TYPE_HANDLERS @@ -62,7 +64,7 @@ def update_fields(log, data): return result = prog_regex.search(data[u'name']) if not result: - log("Failed to split name field for event type %s" % data[u'type']) + log("Failed to split name field for event type %r" % data[u'type']) return # Make sure record has a name field corresponding to the first field (for the CEF format) @@ -80,10 +82,12 @@ def update_fields(log, data): # -import unittest, copy +def contains(dict_outer, dict_inner): + return all(item in dict_outer.items() for item in dict_inner.items()) + class TestNameExtraction(unittest.TestCase): - "Test logging output" + """Test logging output""" def setUp(self): self.output = [] @@ -94,16 +98,15 @@ def tearDown(self): def log(self, s): self.output.append(s) - def contains(self, dict_outer, dict_inner): - return all(item in dict_outer.items() for item in dict_inner.items()) - def testUpdateNameDLPValid(self): - "DLP event with data that can be extracted" + """DLP event with data that can be extracted""" data = { "type": "Event::Endpoint::DataLossPreventionUserAllowed", - "name": u"An \u2033allow transfer on acceptance by user\u2033 action was taken. Username: WIN10CLOUD2\\Sophos " - "Rule names: 'test' User action: File open Application Name: Google Chrome Data Control action: Allow " - "File type: Plain text (ASCII/UTF-8) File size: 36 Source path: C:\\Users\\Sophos\\Desktop\\test.txt" + "name": u"An \u2033allow transfer on acceptance by user\u2033 action was taken. " + u"Username: WIN10CLOUD2\\Sophos Rule names: \u2032test\u2032 User action: File open " + u"Application Name: Google Chrome Data Control action: Allow " + u"File type: Plain text (ASCII/UTF-8) File size: 36 " + u"Source path: C:\\Users\\Sophos\\Desktop\\test.txt" } expected = { "type": "Event::Endpoint::DataLossPreventionUserAllowed", @@ -118,12 +121,12 @@ def testUpdateNameDLPValid(self): "file_path": "C:\\Users\\Sophos\\Desktop\\test.txt" } update_fields(self.log, data) - self.assertTrue(self.contains(data, expected)) + self.assertTrue(all(item in data.items() for item in expected.items())) self.assertEqual(len(self.output), 0) def testUpdateNameThreatValid(self): - "Threat event with data that can be extracted" - data = { "type": "Event::Endpoint::Threat::CleanedUp", "name": u"Threat 'EICAR' in 'myfile.com' "} + """Threat event with data that can be extracted""" + data = {"type": "Event::Endpoint::Threat::CleanedUp", "name": u"Threat 'EICAR' in 'myfile.com' "} expected = { "type": "Event::Endpoint::Threat::CleanedUp", "name": u"EICAR", @@ -131,11 +134,11 @@ def testUpdateNameThreatValid(self): "detection_identity_name": "EICAR" } update_fields(self.log, data) - self.assertTrue(self.contains(data, expected)) # expected data present + self.assertTrue(contains(data, expected)) # expected data present self.assertEqual(len(self.output), 0) # no error def testUpdateNameInvalid(self): - "A known type, but information can't be extracted (regex mismatch)" + """A known type, but information can't be extracted (regex mismatch)""" data = {"type": "Event::Endpoint::DataLossPreventionUserAllowed", "name": u"XXXX Garbage data XXXX"} before = copy.copy(data) update_fields(self.log, data) @@ -143,7 +146,7 @@ def testUpdateNameInvalid(self): self.assertEqual(data, before) # ... and data remains unchanged def testUpdateNameFromDescription(self): - "Ensure the name gets updated from the description, if present" + """Ensure the name gets updated from the description, if present""" data = {"type": "", "description": "XXX"} expected = copy.copy(data) expected["name"] = "XXX" @@ -151,22 +154,22 @@ def testUpdateNameFromDescription(self): self.assertEqual(data, expected) def testInvalidType(self): - "Ensure that nothing gets changed when the type isn't recognised" - data = {"type": "<>", "name":"some name"} + """Ensure that nothing gets changed when the type isn't recognised""" + data = {"type": "<>", "name": "some name"} expected = copy.copy(data) update_fields(self.log, data) self.assertEqual(len(self.output), 0) # not considered an error self.assertEqual(data, expected) def testSkippedType(self): - "Ensure that entry is skipped if it's to be ignored." + """Ensure that entry is skipped if it's to be ignored.""" # First find an event type that is set to 'None' toskip = None - for k,v in TYPE_HANDLERS.items(): + for k, v in TYPE_HANDLERS.items(): if not v: - toskip = k - break - data = {"type": toskip, "name":"some name"} + toskip = k + break + data = {"type": toskip, "name": "some name"} expected = copy.copy(data) update_fields(self.log, data) self.assertEqual(len(self.output), 0) # not considered an error @@ -175,4 +178,3 @@ def testSkippedType(self): if __name__ == '__main__': unittest.main() - diff --git a/siem.py b/siem.py index e9b8592..a5d8426 100755 --- a/siem.py +++ b/siem.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2017 Sophos Limited +# Copyright 2019 Sophos Limited # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. @@ -41,10 +41,17 @@ import config +def get_syslog_facilities(): + """Create a mapping between our names and the python syslog defines""" + out = {} + possible = "auth cron daemon kern lpr mail news syslog user uucp " \ + "local0 local1 local2 local3 local4 local5 local6 local7".split() + for facility in possible: + out[facility] = getattr(logging.handlers.SysLogHandler, "LOG_%s" % facility.upper()) + return out + -SYSLOG_FACILITY = {} -for facility in ['auth','cron','daemon','kern','lpr','mail','news','syslog','user','uucp','local0','local1','local2','local3','local4','local5','local6','local7']: - SYSLOG_FACILITY[facility] = getattr(logging.handlers.SysLogHandler, "LOG_%s" % facility.upper()) +SYSLOG_FACILITY = get_syslog_facilities() SYSLOG_SOCKTYPE = {'udp': socket.SOCK_DGRAM, @@ -68,7 +75,11 @@ 'very_high': 10} -NOISY_EVENTTYPES = [k for k,v in name_mapping.TYPE_HANDLERS.items() if not v] +def get_noisy_event_types(): + return [k for k, v in name_mapping.TYPE_HANDLERS.items() if not v] + + +NOISY_EVENTTYPES = get_noisy_event_types() EVENTS_V1 = '/siem/v1/events' ALERTS_V1 = '/siem/v1/alerts' @@ -92,7 +103,7 @@ # CEF_header_prefix: JSON_key "device_event_class_id": "type", "name": "name", - "severity" :"severity", + "severity": "severity", # json to CEF extension mapping # Format @@ -325,7 +336,6 @@ def call_endpoint(opener, endpoint, since, cursor, state_file_path, token): if DEBUG: log("RESPONSE: %s" % events_response) events = json.loads(events_response) - # events looks like this # { @@ -469,5 +479,6 @@ def format_cef(data): def remove_null_values(data): return {k: v for k, v in data.items() if v is not None} + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/test_regression.py b/test_regression.py index 3500707..6653421 100644 --- a/test_regression.py +++ b/test_regression.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2017 Sophos Limited +# Copyright 2019 Sophos Limited # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. @@ -30,71 +30,79 @@ from ConfigParser import ConfigParser -import unittest, copy, shutil, tempfile, os, json, re, sys -import urllib2, glob +import unittest +import shutil +import tempfile +import os +import json +import re +import urllib2 +import glob from StringIO import StringIO from zipfile import ZipFile from subprocess import Popen, PIPE import pycef +def find_python(ver): + """Try to find Python of the given version. Attempt to work on Windows and Linux""" + win_python2 = glob.glob("c:\\python%d*\\python.exe" % ver) # assume Active Python on Windows + if win_python2: + return win_python2[0] + else: + return "python%d" % ver + + class SIEMRunner: - "Manage SIEM executions in a clean temporary directory, collect results" + """Manage SIEM executions in a clean temporary directory, collect results""" def __init__(self, name): self.root = tempfile.mkdtemp(prefix=name, dir=".") self.cfg_path = os.path.join(self.root, "config.ini") - self.keyval_rex = re.compile(r"^(?P\d\d\d\d-\d\d-\d\dT\d\d\:\d\d\:\d\d\.\d+Z)\s(?P.+)$") + self.keyval_rex = re.compile(r"^(?P\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z)\s(?P.+)$") - def SetConfig(self, name, value): - "Set a config value in the config.ini file" + def set_config(self, name, value): + """Set a config value in the config.ini file""" config = ConfigParser() config.read(self.cfg_path) config.set('login', name, value) with open(self.cfg_path, 'w') as fp: config.write(fp) - def GetConfig(self, name): - "Read a config value from the config.ini file" + def get_config(self, name): + """Read a config value from the config.ini file""" config = ConfigParser() config.read(self.cfg_path) return config.get('login', name) - def RunPython(self, pypath, *args): - "Run the script with given python interpreter (pypath) supplying *args" + def run_python(self, pypath, *args): + """Run the script with given python interpreter (pypath) supplying *args""" before = os.getcwd() os.chdir(self.root) - p = Popen([pypath, "siem.py"]+ list(args), stdout = PIPE, stderr = PIPE) - stdout, stderr = p.communicate("") + p = Popen([pypath, "siem.py"] + list(args), stdout=PIPE, stderr=PIPE) + p.communicate("") retcode = p.wait() os.chdir(before) return retcode - def FindPython(self, ver): - "Try to find Python of the given version. Attempt to work on Windows and Linux" - win_python2 = glob.glob("c:\\python%d*\\python.exe" % ver) # assume Active Python on Windows - if win_python2: - return win_python2[0] - else: - return "python%d" % ver + def run_python_version(self, ver, *args): + exe = find_python(ver) + self.run_python(exe, *args) - def RunPythonVer(self, ver, *args): - exe = self.FindPython(ver) - self.RunPython(exe, *args) - - def ResetState(self): - "Remove state files, start over" + def reset_state(self): + """Remove state files, start over""" os.unlink(os.path.join(self.root, "state", "siem_lastrun_events.obj")) - def GetResults(self): + def get_results(self): """ Find out where the results went, read, parse and return them if possible. """ - ofile = self.GetConfig("filename") + ofile = self.get_config("filename") out = [] with open(os.path.join(self.root, "log", ofile)) as fp: for line in fp.readlines(): data = line.strip() - if not data: continue + if not data: + continue if data.startswith("CEF"): out.append(pycef.parse(data)) continue @@ -108,70 +116,74 @@ def GetResults(self): return out +def get_release(): + """Return the latest version of Sophos SIEM from github. Cache it in the file 'master.zip'""" + zip_location = "https://github.com/sophos/Sophos-Central-SIEM-Integration/archive/master.zip" + if not os.path.exists("master.zip"): + fp = urllib2.urlopen(zip_location) + zip_data = fp.read(int(fp.headers["Content-Length"])) + fp.close() + open("master.zip", "wb").write(zip_data) + return open("master.zip", "rb").read() + + +def write_from_zip(zip_data, root): + """Given a zip file as data, write the contents to the given root dir""" + zf = StringIO(zip_data) + input_zip = ZipFile(zf) + for name in input_zip.namelist(): + data = input_zip.read(name) + ofile = os.path.basename(name) + if ofile == "": + continue # don't extract directory entry. + opath = os.path.join(root, ofile) + with open(opath, "wb") as fp: + fp.write(data) + + +def write_from_dir(srcdir, root): + """Copy contents of the srcdir, to the root""" + for i in os.listdir(srcdir): + e = os.path.splitext(i)[1] + if e in [".py", ".ini", ".txt"]: + shutil.copyfile(i, os.path.join(root, i)) + + class BaseTest(unittest.TestCase): - "Some utility functions used in the other tests" - - def GetRelease(self): - "Return the latest version of Sophos SIEM from github. Cache it in the file 'master.zip'" - zip_location = "https://github.com/sophos/Sophos-Central-SIEM-Integration/archive/master.zip" - if not os.path.exists("master.zip"): - fp = urllib2.urlopen(zip_location) - zip_data = fp.read(int(fp.headers["Content-Length"])) - fp.close() - open("master.zip","wb").write(zip_data) - return open("master.zip", "rb").read() - - def WriteFromZip(self, zip_data, root): - "Given a zip file as data, write the contents to the given root dir" - zf = StringIO(zip_data) - input_zip=ZipFile(zf) - for name in input_zip.namelist(): - data = input_zip.read(name) - ofile = os.path.basename(name) - if ofile == "": continue # don't extract directory entry. - opath = os.path.join(root, ofile) - with open(opath, "wb") as fp: - fp.write(data) - - def WriteFromDir(self, srcdir, root): - "Copy contents of the srcdir, to the root" - for i in os.listdir(srcdir): - e = os.path.splitext(i)[1] - if e in [".py", ".ini", ".txt"]: - shutil.copyfile(i, os.path.join(root, i)) - - def ConfigureAll(self, name, value): - "Write a config value to both SIEM Runners" + """Some utility functions used in the other tests""" + + def configure_all(self, name, value): + """Write a config value to both SIEM Runners""" for runner in [self.orig_runner, self.new_runner]: - runner.SetConfig(name, value) + runner.set_config(name, value) class TestCompareOutput(BaseTest): - "Test that old and new versions of the software are the same" + """Test that old and new versions of the software are the same""" def setUp(self): # Create the original version install self.orig_runner = SIEMRunner("orig_py2") - self.zip_data = self.GetRelease() - self.WriteFromZip(self.zip_data, self.orig_runner.root) + self.zip_data = get_release() + write_from_zip(self.zip_data, self.orig_runner.root) # Create the new version install self.new_runner = SIEMRunner("new_py2") - self.WriteFromDir(".", self.new_runner.root) + write_from_dir(".", self.new_runner.root) # Configure downloaded (original) version with the token from the version in cwd - orig_token = self.orig_runner.GetConfig("token_info") + orig_token = self.orig_runner.get_config("token_info") self.assertTrue(orig_token.startswith("<")) # we've got the as-shipped token text. Expected. - new_token = self.new_runner.GetConfig("token_info") + new_token = self.new_runner.get_config("token_info") self.assertTrue(new_token.startswith("url:")) # Check the substitution - self.orig_runner.SetConfig("token_info", new_token) + self.orig_runner.set_config("token_info", new_token) # Set config common to all tests - self.ConfigureAll("filename", "result.txt") - self.ConfigureAll("endpoint", "event") + self.configure_all("filename", "result.txt") + self.configure_all("endpoint", "event") def tearDown(self): for i in [self.orig_runner.root, self.new_runner.root]: @@ -179,20 +191,20 @@ def tearDown(self): shutil.rmtree(i) def RunBoth(self, ver, *args): - "Run both versions and collect results as tuple." - self.orig_runner.RunPythonVer(ver, *args) - self.new_runner.RunPythonVer(ver, *args) - return self.orig_runner.GetResults(), self.new_runner.GetResults() + """Run both versions and collect results as tuple.""" + self.orig_runner.run_python_version(ver, *args) + self.new_runner.run_python_version(ver, *args) + return self.orig_runner.get_results(), self.new_runner.get_results() def testJson(self): - "Test the json output is identical between versions" - self.ConfigureAll("format", "json") + """Test the json output is identical between versions""" + self.configure_all("format", "json") orig, new = self.RunBoth(2) self.assertEqual(orig, new) def testCEF(self): - "Test the CEF output is identical between versions" - self.ConfigureAll("format", "cef") + """Test the CEF output is identical between versions""" + self.configure_all("format", "cef") orig, new = self.RunBoth(2) self.assertEqual(orig, new) @@ -204,7 +216,7 @@ def XXtestKeyValue(self): If you need this test, comment it in (Remove XX above) and keep running it. It will pass approx 50% of the time. """ - self.ConfigureAll("format", "keyvalue") + self.configure_all("format", "keyvalue") orig, new = self.RunBoth(2) self.assertEqual(orig, new) @@ -213,12 +225,12 @@ def testJsonDifferentPython(self): Run the new version with Python3, old version with Python2 and compare output. This should result in the same output. """ - self.ConfigureAll("format", "json") - self.orig_runner.RunPythonVer(2) - orig = self.orig_runner.GetResults() + self.configure_all("format", "json") + self.orig_runner.run_python_version(2) + orig = self.orig_runner.get_results() - self.new_runner.RunPythonVer(3) - new = self.new_runner.GetResults() + self.new_runner.run_python_version(3) + new = self.new_runner.get_results() self.assertEqual(orig, new) @@ -226,36 +238,36 @@ class TestNewFunctionality(BaseTest): def setUp(self): self.runner = SIEMRunner("new_py2") - self.WriteFromDir(".", self.runner.root) - self.runner.SetConfig("format", "json") - self.runner.SetConfig("filename", "result.txt") - self.runner.SetConfig("endpoint", "event") + write_from_dir(".", self.runner.root) + self.runner.set_config("format", "json") + self.runner.set_config("filename", "result.txt") + self.runner.set_config("endpoint", "event") def tearDown(self): if os.path.exists(self.runner.root): shutil.rmtree(self.runner.root) def testRunTwice(self): - "Run the program twice, make sure the results file doesn't change in size" - self.runner.RunPythonVer(2) - first_run = self.runner.GetResults() - self.runner.RunPythonVer(2) - second_run = self.runner.GetResults() + """Run the program twice, make sure the results file doesn't change in size""" + self.runner.run_python_version(2) + first_run = self.runner.get_results() + self.runner.run_python_version(2) + second_run = self.runner.get_results() self.assertEqual(first_run, second_run) def testRunWithStaleResults(self): - "Run the program with some old data, make sure it doesn't get overwritten" + """Run the program with some old data, make sure it doesn't get overwritten""" logdir = os.path.join(self.runner.root, "log") os.makedirs(logdir) - ofile = os.path.join(logdir, self.runner.GetConfig("filename")) + ofile = os.path.join(logdir, self.runner.get_config("filename")) marker = '["SOME OLD JSON LOG DATA"]\r\n' with open(ofile, "wb") as fp: fp.write(marker) before_size = os.stat(ofile).st_size self.assertEqual(before_size, len(marker)) - self.runner.RunPythonVer(2) + self.runner.run_python_version(2) after_size = os.stat(ofile).st_size new_marker = open(ofile, "rb").read(len(marker)) self.assertEqual(marker, new_marker) @@ -263,25 +275,21 @@ def testRunWithStaleResults(self): def testLightMode(self): noisy = ["Event::Endpoint::UpdateSuccess"] - self.runner.RunPythonVer(2, "--light") - for i in self.runner.GetResults(): + self.runner.run_python_version(2, "--light") + for i in self.runner.get_results(): # We know for sure this event will always be noisy. Could check for the others. self.assertTrue(i["type"] not in noisy) # Make sure the - self.runner.ResetState() - self.runner.RunPythonVer(2) + self.runner.reset_state() + self.runner.run_python_version(2) found = False - for i in self.runner.GetResults(): + for i in self.runner.get_results(): # We know for sure this event will always be noisy. Could check for the others. if i["type"] in noisy: found = True self.assertTrue(found) # we expect this event to appear all over the place. - if __name__ == '__main__': unittest.main() - - -