Python distutils.version 模块,LooseVersion() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用distutils.version.LooseVersion()。
def searchExploit(exploit_list, soft_name, soft_version):
    """Search affected packages in exploit_list.

    An exploit row is reported when its platform (``exploit[5]``) is listed in
    ``valid_platforms``, it passes the DoS, ``args.filter`` and ``args.type``
    command-line filters, its title (``exploit[2]``) matches the name pattern
    built from ``soft_name``, and at least one version extracted from the
    title is not lower than the searched version (every extracted version is
    accepted when ``args.level == 5``).

    Each hit is printed through ``printOutput``. When ``args.duplicates`` is
    ``False`` the hit is also removed from ``exploit_list`` in place.

    :param exploit_list: List of exploit rows; may be mutated as described.
    :param soft_name: Software name to look for in the exploit titles.
    :param soft_version: Software version being checked.
    :return: List of ``[exploit, soft_name, soft_version]`` entries.
    """
    result = []
    version_search = versionVartions(soft_version, args.level)
    # Escape the whole name so that every regex metacharacter in it (not only
    # '+') is matched literally; the pattern is the same for every row, so it
    # is compiled once.
    query = re.compile(
        r"(^(\w*\s){0,%s}|/\s?)%s(\s|\s.*\s|\/).* -" % (args.level, re.escape(soft_name)),
        re.IGNORECASE)
    # Walk a snapshot: hits may be removed from exploit_list below, and
    # removing from the list being iterated would skip the following row.
    for exploit in list(exploit_list):
        if exploit[5] not in valid_platforms:  # Platform
            continue
        if not (args.dos or exploit[6] != 'dos' or args.type == 'dos'):  # DoS
            continue
        if args.filter is not None and args.filter.lower() not in exploit[2].lower():  # Filter
            continue
        if args.type is not None and args.type != exploit[6]:  # Type
            continue
        if not query.search(exploit[2]):
            continue
        for affected_version in extractVersions(exploit[2]):
            if args.level == 5 or LooseVersion(version_search) <= LooseVersion(affected_version):
                # Exact comparison kept on purpose: a value of None must not
                # trigger the removal.
                if args.duplicates == False:  # Duplicates
                    exploit_list.remove(exploit)
                printOutput(exploit, soft_name, soft_version)
                result.append([exploit, soft_name, soft_version])
                break
    return result
def __init__(self, *args, **kwargs):
    """Connect to a Cb Response server and record its version.

    Accepts the same arguments as the parent constructor plus an optional
    ``timeout`` keyword (default 120).

    :raises UnauthorizedError: if ``self.info()`` raises ``UnauthorizedError``.
    :raises ApiError: if the reported server version is lower than 5.0.
    """
    # set default timeout period to two minutes, 2x the default nginx timeout
    request_timeout = kwargs.pop("timeout", 120)
    super(CbResponseAPI, self).__init__(product_name="response", timeout=request_timeout, *args, **kwargs)

    self._parsed_url = urllib.parse.urlparse(self.url)
    try:
        self.server_info = self.info()
    except UnauthorizedError:
        raise UnauthorizedError(uri=self.url, message="Invalid API token for server {0:s}.".format(self.url))

    reported_version = self.server_info['version']
    log.debug('Connected to Cb server version %s at %s' % (reported_version, self.session.server))
    self.cb_server_version = LooseVersion(reported_version)

    oldest_supported = LooseVersion('5.0')
    if self.cb_server_version < oldest_supported:
        raise ApiError("CbEnterpriseResponseAPI only supports Cb servers version >= 5.0.0")

    self._lr_scheduler = None
def group_by(self, field_name):
    """Set the group-by field name for this query. Typically, you will want to set this to 'id' if you only want
    one result per process.

    This method is only available for Cb Response servers 6.0 and above. Calling this on a Query object connected
    to a Cb Response 5.x server will simply result in a no-op.

    :param str field_name: Field name to group the result set by.
    :return: A clone of this query with ``cb.group`` set, or this same object on servers older than 6.0.
    :rtype: :py:class:`ProcessQuery`
    """
    if self._cb.cb_server_version < LooseVersion('6.0.0'):
        # The message names the same minimum version that the check enforces.
        log.debug("group_by only supported in Cb Response 6.0+")
        return self

    nq = self._clone()
    nq._default_args["cb.group"] = field_name
    return nq
def min_last_update(self, v):
    """Set the minimum last update time (relative to sensor) for this query. The timestamp can be expressed either
    as a ``datetime`` like object or as an ISO 8601 string formatted timestamp such as 2017-04-29T04:21:18Z.
    If a ``datetime`` like object is provided, it is assumed to be in GMT time zone.

    This option will limit the number of Solr cores that need to be searched for events that match the query.

    This method is only available for Cb Response servers 6.0 and above. Calling this on a Query object connected
    to a Cb Response 5.x server will simply result in a no-op.

    :param v: Timestamp (either string or datetime object).
    :return: A clone of this query with ``cb.min_last_update`` set, or this same object on servers older than 6.0.
    :rtype: :py:class:`ProcessQuery`
    """
    if self._cb.cb_server_version < LooseVersion('6.0.0'):
        # The message names the same minimum version that the check enforces.
        log.debug("min_last_update only supported in Cb Response 6.0+")
        return self

    nq = self._clone()
    try:
        # datetime-like objects are rendered as an ISO 8601 "Z" timestamp
        v = v.strftime("%Y-%m-%dT%H:%M:%SZ")
    except AttributeError:
        # anything without strftime is passed through as its string form
        v = str(v)
    nq._default_args["cb.min_last_update"] = v
    return nq
def min_last_server_update(self, v):
    """Set the minimum last update time (relative to server) for this query. The timestamp can be expressed either
    as a ``datetime`` like object or as an ISO 8601 string formatted timestamp such as 2017-04-29T04:21:18Z.
    If a ``datetime`` like object is provided, it is assumed to be in GMT time zone.

    This option will limit the number of Solr cores that need to be searched for events that match the query.

    This method is only available for Cb Response servers 6.0 and above. Calling this on a Query object connected
    to a Cb Response 5.x server will simply result in a no-op.

    :param v: Timestamp (either string or datetime object).
    :return: A clone of this query with ``cb.min_last_server_update`` set, or this same object on servers
        older than 6.0.
    :rtype: :py:class:`ProcessQuery`
    """
    if self._cb.cb_server_version < LooseVersion('6.0.0'):
        # The message names the same minimum version that the check enforces.
        log.debug("min_last_server_update only supported in Cb Response 6.0+")
        return self

    nq = self._clone()
    try:
        # datetime-like objects are rendered as an ISO 8601 "Z" timestamp
        v = v.strftime("%Y-%m-%dT%H:%M:%SZ")
    except AttributeError:
        # anything without strftime is passed through as its string form
        v = str(v)
    nq._default_args["cb.min_last_server_update"] = v
    return nq
def max_last_update(self, v):
    """Set the maximum last update time (relative to sensor) for this query. The timestamp can be expressed either
    as a ``datetime`` like object or as an ISO 8601 string formatted timestamp such as 2017-04-29T04:21:18Z.
    If a ``datetime`` like object is provided, it is assumed to be in GMT time zone.

    This option will limit the number of Solr cores that need to be searched for events that match the query.

    This method is only available for Cb Response servers 6.0 and above. Calling this on a Query object connected
    to a Cb Response 5.x server will simply result in a no-op.

    :param v: Timestamp (either string or datetime object).
    :return: A clone of this query with ``cb.max_last_update`` set, or this same object on servers older than 6.0.
    :rtype: :py:class:`ProcessQuery`
    """
    if self._cb.cb_server_version < LooseVersion('6.0.0'):
        # The message names the same minimum version that the check enforces.
        log.debug("max_last_update only supported in Cb Response 6.0+")
        return self

    nq = self._clone()
    try:
        # datetime-like objects are rendered as an ISO 8601 "Z" timestamp
        v = v.strftime("%Y-%m-%dT%H:%M:%SZ")
    except AttributeError:
        # anything without strftime is passed through as its string form
        v = str(v)
    nq._default_args["cb.max_last_update"] = v
    return nq
def max_last_server_update(self, v):
    """Set the maximum last update time (relative to server) for this query. The timestamp can be expressed either
    as a ``datetime`` like object or as an ISO 8601 string formatted timestamp such as 2017-04-29T04:21:18Z.
    If a ``datetime`` like object is provided, it is assumed to be in GMT time zone.

    This option will limit the number of Solr cores that need to be searched for events that match the query.

    This method is only available for Cb Response servers 6.0 and above. Calling this on a Query object connected
    to a Cb Response 5.x server will simply result in a no-op.

    :param v: Timestamp (either string or datetime object).
    :return: A clone of this query with ``cb.max_last_server_update`` set, or this same object on servers
        older than 6.0.
    :rtype: :py:class:`ProcessQuery`
    """
    if self._cb.cb_server_version < LooseVersion('6.0.0'):
        # The message names the same minimum version that the check enforces.
        log.debug("max_last_server_update only supported in Cb Response 6.0+")
        return self

    nq = self._clone()
    try:
        # datetime-like objects are rendered as an ISO 8601 "Z" timestamp
        v = v.strftime("%Y-%m-%dT%H:%M:%SZ")
    except AttributeError:
        # anything without strftime is passed through as its string form
        v = str(v)
    nq._default_args["cb.max_last_server_update"] = v
    return nq
def checkSettings(self, username):
    """Open the settings file for ``username`` and refresh the stored user agent.

    Unless ``self.customPath`` is set, the data directory is
    ``<directory of this file>/data/<username>/``. The directory is created
    when missing. The stored ``version`` is initialised to
    ``Constants.VERSION`` when absent, and the user agent is rebuilt whenever
    none is stored or the stored version is older than ``Constants.VERSION``.

    :param username: Account name used for the directory and file name.
    """
    if not self.customPath:
        module_dir = os.path.dirname(os.path.realpath(__file__))
        # The trailing '' keeps a path separator at the end of the directory.
        self.IGDataPath = os.path.join(module_dir, 'data', username, '')

    if not os.path.isdir(self.IGDataPath):
        # makedirs also creates the intermediate 'data' directory; a plain
        # mkdir fails when that parent does not exist yet.
        os.makedirs(self.IGDataPath, 0o777)

    self.settings = Settings(
        os.path.join(self.IGDataPath, 'settings-' + username + '.dat')
    )
    if self.settings.get('version') is None:
        self.settings.set('version', Constants.VERSION)

    if (self.settings.get('user_agent') is None) or (
            LooseVersion(self.settings.get('version')) < LooseVersion(Constants.VERSION)):
        userAgent = UserAgent(self)
        ua = userAgent.buildUserAgent()
        self.settings.set('version', Constants.VERSION)
        self.settings.set('user_agent', ua)
def test_cmp(self):
    """Check LooseVersion ordering against a table of known comparisons.

    Each table entry is ``(v1, v2, wanted)`` where ``wanted`` is -1, 0 or 1
    for ``v1`` being lower than, equal to or greater than ``v2``.
    """
    versions = (('1.5.1', '1.5.2b2', -1),
                ('161', '3.10a', 1),
                ('8.02', '8.02', 0),
                ('3.4j', '1996.07.12', -1),
                ('3.2.pl0', '3.1.1.6', 1),
                ('2g6', '11g', -1),
                ('0.960923', '2.2beta29', -1),
                ('1.13++', '5.5.kw', -1))

    for v1, v2, wanted in versions:
        left = LooseVersion(v1)
        right = LooseVersion(v2)
        # Derive the three-way result from the rich comparison operators,
        # which work on Python 2 and 3 alike; LooseVersion has no __cmp__
        # method on Python 3.
        res = (left > right) - (left < right)
        self.assertEqual(res, wanted,
                         'cmp(%s, %s) should be %s, got %s' %
                         (v1, v2, wanted, res))
def checkForUpdate(self):
    """Fetch the latest release and offer to install it when it is newer.

    The release description is downloaded from ``self.latestReleaseUrl``.
    Pre-releases are ignored. For a regular release the user is asked whether
    to install it when ``self.currentVersion`` is older; otherwise an alert
    says that no update is available. A connection error is reported with an
    alert instead of being raised.
    """
    try:
        console.show_activity('Checking for update...')
        try:
            response = requests.get(self.latestReleaseUrl)
            data = json.loads(response.text)
            rel = release(data)
        finally:
            # Always clear the activity indicator, including when the
            # request or the parsing fails.
            console.hide_activity()
        if rel.prerelease == False:
            # Drop only a leading 'v' ("v1.2" -> "1.2"); replacing every 'v'
            # would also mangle suffixes such as "-dev".
            latest = rel.tag_name[1:] if rel.tag_name.startswith('v') else rel.tag_name
            if LooseVersion(self.currentVersion) < LooseVersion(latest):
                ret = console.alert('Update available', rel.tag_name + ' is available, would you like to install it. \n\n Details: ' + rel.body.replace('\r\n','\n'), hide_cancel_button=True, button1 = 'No', button2 = 'Yes')
                if ret == 2:
                    self.install(rel)
            else:
                console.alert('No update available', 'v' + self.currentVersion + ' is the current version.', hide_cancel_button=True, button1 = 'Ok')
    except requests.exceptions.ConnectionError as e:
        console.alert('Check your internet connection', 'Unable to check for update.', hide_cancel_button=True, button1 = 'Ok')
def get_version(self,*args,**kwds):
    """Return the compiler version, with fallbacks for AIX and Linux xlf installs.

    ``FCompiler.get_version`` is tried first. When it yields ``None``:

    * on AIX, the version is parsed from the output of ``lslpp -Lc xlfcmp``
      if both ``lslpp`` and ``xlf`` executables are found;
    * otherwise, if ``/etc/opt/ibmcmp/xlf`` exists, the highest-versioned
      sub-directory containing an ``xlf.cfg`` file is used and also stored
      in ``self.version`` as a ``LooseVersion``.

    :return: The detected version, or ``None`` when nothing was found.
    """
    version = FCompiler.get_version(self,*args,**kwds)

    if version is None and sys.platform.startswith('aix'):
        # use lslpp to find out xlf version
        lslpp = find_executable('lslpp')
        xlf = find_executable('xlf')
        # Guard against a missing executable first: os.path.exists(None)
        # raises TypeError instead of returning False.
        if xlf and lslpp and os.path.exists(xlf) and os.path.exists(lslpp):
            s, o = exec_command(lslpp + ' -Lc xlfcmp')
            m = re.search(r'xlfcmp:(?P<version>\d+([.]\d+)+)', o)
            if m:
                version = m.group('version')

    xlf_dir = '/etc/opt/ibmcmp/xlf'
    if version is None and os.path.isdir(xlf_dir):
        # linux:
        # If the output of xlf does not contain version info
        # (that's the case with xlf 8.1, for instance) then
        # let's try another method:
        candidates = [d for d in os.listdir(xlf_dir)
                      if os.path.isfile(os.path.join(xlf_dir, d, 'xlf.cfg'))]
        if candidates:
            from distutils.version import LooseVersion
            try:
                # Order by version, not by text: as plain strings "8.1"
                # sorts after "10.1".
                newest = max(candidates, key=LooseVersion)
            except TypeError:
                # Names mixing numeric and alphabetic parts may not be
                # orderable as versions; fall back to plain string order.
                newest = max(candidates)
            self.version = version = LooseVersion(newest)
    return version
def add_arguments(self, parser):
    """Register the worker command-line options on ``parser``.

    The positional ``args`` argument (queue names) is only registered when
    ``get_version()`` reports at least 1.10.

    :param parser: Argument parser exposing ``add_argument``.
    """
    option_table = (
        ('--worker-class', dict(action='store', dest='worker_class',
                                default='rq.Worker', help='RQ Worker class to use')),
        ('--pid', dict(action='store', dest='pid',
                       default=None, help='PID file to write the worker`s pid into')),
        ('--burst', dict(action='store_true', dest='burst',
                         default=False, help='Run worker in burst mode')),
        ('--name', dict(action='store', dest='name',
                        default=None, help='Name of the worker')),
        ('--queue-class', dict(action='store', dest='queue_class',
                               default='django_rq.queues.DjangoRQ', help='Queues class to use')),
        ('--worker-ttl', dict(action='store', type=int,
                              dest='worker_ttl', default=420,
                              help='Default worker timeout to be used')),
    )
    # Options are registered in table order.
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)

    accepts_positional = LooseVersion(get_version()) >= LooseVersion('1.10')
    if accepts_positional:
        parser.add_argument('args', nargs='*', type=str,
                            help='The queues to work on, separated by space')
def run_suricata():
    """Run Suricata over the first pcap in ``PCAP_FILES``.

    The command is built from the module-level ``IDS_BINARY``,
    ``IDS_CONFIG_FILE`` and ``IDS_LOG_DIRECTORY`` settings. Suricata's stdout
    and stderr are written to ``JOB_IDS_LOG``. The ``-k none`` option is added
    only when ``SENSOR_VERSION`` is greater than 2.0.
    """
    global IDS_BINARY, IDS_CONFIG_FILE, IDS_LOG_DIRECTORY, JOB_ALERT_LOG, PCAP_FILES, JOB_IDS_LOG
    print_debug("run_suricata() called")
    if not IDS_BINARY:
        print_error("No Suricata binary found on system.")
    print_msg("Running pcap(s) thru Suricata")
    # some Suri versions don't support all modern options like '-k' so try to deal with that here
    add_options = ""
    try:
        # LooseVersion must be given a string: passing the float 2.0 raises
        # inside the constructor, which the handler below would swallow and
        # the option would never be enabled.
        if LooseVersion(SENSOR_VERSION) > LooseVersion("2.0"):
            # not sure if the '-k' option was added is Suri 2.0 or some other time but setting it to this for now
            add_options = "-k none"
    except Exception:
        # Best effort: an unparseable or incomparable sensor version simply
        # means no extra options.
        add_options = ""
    suricata_command = "%s -c %s -l %s %s -r %s" % (IDS_BINARY, IDS_CONFIG_FILE, IDS_LOG_DIRECTORY, add_options, PCAP_FILES[0])
    print_debug("Running suricata with the following command:\n%s" % suricata_command)
    # NOTE(review): the command is a shell string; confirm the paths above
    # never come from untrusted input.
    with open(JOB_IDS_LOG, "wb") as suri_output_fh:
        # The context manager closes the log even if the call raises.
        subprocess.call(suricata_command, shell=True, stderr=subprocess.STDOUT, stdout=suri_output_fh)
    # generate fast pattern info; this requires a separate Suricata run
    # with the '--engine-analysis' flag set
def by_version(self, visible=False, *args, **kwargs):
    """Return the matching items ordered by version number.

    The queryset is filtered with ``*args`` / ``**kwargs`` and, when
    ``visible`` is true, restricted to ``hidden=False``. Items whose parsed
    version contains a string component are left out.

    :return: List of items sorted by ``versioner(item.number)``.
    """
    queryset = self.get_queryset().filter(*args, **kwargs)
    if visible:
        queryset = queryset.filter(hidden=False)

    def has_only_numeric_parts(item):
        parts = versioner(item.number).version
        return not any(isinstance(part, str) for part in parts)

    comparable_items = [item for item in queryset if has_only_numeric_parts(item)]
    comparable_items.sort(key=lambda entry: versioner(entry.number))
    return comparable_items
def check_update():
    """Update to a newer GitHub version according to the configured rule.

    The rule is read from the ``update`` / ``check_update`` config entry
    (default ``"stable"``). ``"dont-check"`` does nothing; any rule other
    than ``"stable"`` or ``"test"`` stops after ``check_push_update()``.
    Errors are logged rather than raised.
    """
    try:
        update_rule = config.get(["update", "check_update"], "stable")
        if update_rule == "dont-check":
            return

        check_push_update()

        if update_rule not in ("stable", "test"):
            return

        versions = update_from_github.get_github_versions()
        current_version = update_from_github.current_version()

        # Entry 0 of the version list is used for "test", entry 1 for "stable".
        if update_rule == "test":
            target, notice = versions[0][1], "update to test version %s"
        else:
            target, notice = versions[1][1], "update to stable version %s"

        if LooseVersion(current_version) < LooseVersion(target):
            xlog.info(notice, target)
            update_from_github.update_version(target)
    except IOError as e:
        xlog.warn("check update fail:%r", e)
    except Exception as e:
        xlog.exception("check_update fail:%r", e)
def check_update():
    """Check GitHub for a newer version and install it when one exists.

    Behaviour depends on the ``update`` / ``check_update`` config entry
    (default ``"stable"``): ``"dont-check"`` returns immediately, ``"test"``
    compares against the first listed version and ``"stable"`` against the
    second. Any other value only runs ``check_push_update()``. Failures are
    logged, not raised.
    """
    try:
        update_rule = config.get(["update", "check_update"], "stable")
        if update_rule == "dont-check":
            return

        check_push_update()

        is_test = update_rule == "test"
        is_stable = update_rule == "stable"
        if not (is_stable or is_test):
            return

        versions = update_from_github.get_github_versions()
        current_version = update_from_github.current_version()

        if is_test:
            newest = versions[0][1]
            if LooseVersion(current_version) < LooseVersion(newest):
                xlog.info("update to test version %s", newest)
                update_from_github.update_version(newest)
        else:
            newest = versions[1][1]
            if LooseVersion(current_version) < LooseVersion(newest):
                xlog.info("update to stable version %s", newest)
                update_from_github.update_version(newest)
    except IOError as e:
        xlog.warn("check update fail:%r", e)
    except Exception as e:
        xlog.exception("check_update fail:%r", e)
def run(self):
    """Build every extension after checking that CMake is recent enough.

    :raises RuntimeError: if ``cmake`` cannot be executed, if no version
        number can be found in the output of ``cmake --version``, or if the
        version is older than 2.8.0.
    """
    # Check cmake is installed and is sufficiently new.
    try:
        out = subprocess.check_output(["cmake", "--version"])
    except OSError:
        raise RuntimeError(
            "CMake must be installed to build the following extensions: " +
            ", ".join(e.name for e in self.extensions))

    version_match = re.search(r"version\s*([\d.]+)", out.decode())
    if version_match is None:
        # Report unexpected output clearly instead of failing with an
        # AttributeError on None.
        raise RuntimeError(
            "Could not determine the CMake version from `cmake --version`.")

    cmake_version = LooseVersion(version_match.group(1))
    if cmake_version < LooseVersion("2.8.0"):
        raise RuntimeError("CMake >= 2.8.0 is required.")

    # Build extensions
    for ext in self.extensions:
        self.build_extension(ext)
def _get_tag_func(value):
    """Turn ``value`` into a one-argument predicate.

    * A callable is returned unchanged.
    * A string matching ``VERSION_REGEX`` (operator, version) yields a bound
      ``LooseVersion`` comparison method implementing that operator.
    * An instance of ``REGEX_TYPE`` yields its ``match`` method.
    * Anything else yields ``value.__eq__``.

    :raises ValueError: if the matched operator is not one of
        ``==``, ``>``, ``<``, ``>=``, ``<=``.
    """
    if callable(value):
        return value

    if isinstance(value, str):
        vm = VERSION_REGEX.match(value)
        if vm:
            version_comparison = vm.group(1)
            version = LooseVersion(vm.group(2))
            # The returned bound method has the parsed version on its
            # left-hand side, so each operator maps to its mirror image.
            mirrored_method = {
                '==': '__eq__',
                '>': '__lt__',
                '<': '__gt__',
                '>=': '__le__',
                '<=': '__ge__',
            }
            if version_comparison not in mirrored_method:
                # Should be excluded by regular expression.
                raise ValueError("Undefined version comparison.", version_comparison)
            return getattr(version, mirrored_method[version_comparison])
    elif isinstance(value, REGEX_TYPE):
        return value.match

    # Fall back to equality comparison.
    return value.__eq__
def __init__(self, module):
    """Store the module handle and verify the library prerequisites.

    ``module.fail_json`` is called when the CLC SDK or ``requests`` is not
    available, or when the installed ``requests`` is older than 2.5.0.
    Finally the user agent is set on the SDK.

    :param module: Object exposing ``fail_json(msg=...)``.
    """
    self.clc = clc_sdk
    self.module = module
    self.firewall_dict = {}

    if not CLC_FOUND:
        self.module.fail_json(msg='clc-python-sdk required for this module')
    if not REQUESTS_FOUND:
        self.module.fail_json(msg='requests library is required for this module')

    installed_requests = requests.__version__
    if installed_requests and LooseVersion(installed_requests) < LooseVersion('2.5.0'):
        self.module.fail_json(msg='requests library version should be >= 2.5.0')

    self._set_user_agent(self.clc)
def __init__(self, module):
    """Store the module handle and verify the library prerequisites.

    ``module.fail_json`` is called when the CLC SDK or ``requests`` is not
    available, or when the installed ``requests`` is older than 2.5.0.
    Finally the user agent is set on ``self.clc``.

    :param module: Object exposing ``fail_json(msg=...)``.
    """
    self.module = module

    if not CLC_FOUND:
        self.module.fail_json(msg='clc-python-sdk required for this module')
    if not REQUESTS_FOUND:
        self.module.fail_json(msg='requests library is required for this module')

    installed_requests = requests.__version__
    if installed_requests and LooseVersion(installed_requests) < LooseVersion('2.5.0'):
        self.module.fail_json(msg='requests library version should be >= 2.5.0')

    self._set_user_agent(self.clc)
def __init__(self, module):
    """Keep a reference to ``module`` and check the required libraries.

    Failures are reported through ``module.fail_json``: missing CLC SDK,
    missing ``requests``, or ``requests`` older than 2.5.0. The user agent
    is then set on ``self.clc``.

    :param module: Object exposing ``fail_json(msg=...)``.
    """
    self.module = module

    if not CLC_FOUND:
        self.module.fail_json(msg='clc-python-sdk required for this module')
    if not REQUESTS_FOUND:
        self.module.fail_json(msg='requests library is required for this module')

    requests_version = requests.__version__
    minimum_requests = '2.5.0'
    if requests_version and LooseVersion(requests_version) < LooseVersion(minimum_requests):
        self.module.fail_json(msg='requests library version should be >= 2.5.0')

    self._set_user_agent(self.clc)
def __init__(self, module):
    """Store the module handle and verify the library prerequisites.

    ``module.fail_json`` is called when the CLC SDK or ``requests`` is not
    available, or when the installed ``requests`` is older than 2.5.0.
    Finally the user agent is set on the SDK.

    :param module: Object exposing ``fail_json(msg=...)``.
    """
    self.clc = clc_sdk
    self.module = module
    self.lb_dict = {}

    if not CLC_FOUND:
        self.module.fail_json(msg='clc-python-sdk required for this module')
    if not REQUESTS_FOUND:
        self.module.fail_json(msg='requests library is required for this module')

    installed_requests = requests.__version__
    if installed_requests and LooseVersion(installed_requests) < LooseVersion('2.5.0'):
        self.module.fail_json(msg='requests library version should be >= 2.5.0')

    self._set_user_agent(self.clc)
def __init__(self, module):
    """Keep references to the SDK and ``module`` and check required libraries.

    Failures are reported through ``module.fail_json``: missing CLC SDK,
    missing ``requests``, or ``requests`` older than 2.5.0. The user agent
    is then set on the SDK.

    :param module: Object exposing ``fail_json(msg=...)``.
    """
    self.clc = clc_sdk
    self.module = module

    if not CLC_FOUND:
        self.module.fail_json(msg='clc-python-sdk required for this module')
    if not REQUESTS_FOUND:
        self.module.fail_json(msg='requests library is required for this module')

    requests_version = requests.__version__
    minimum_requests = '2.5.0'
    if requests_version and LooseVersion(requests_version) < LooseVersion(minimum_requests):
        self.module.fail_json(msg='requests library version should be >= 2.5.0')

    self._set_user_agent(self.clc)
def __init__(self, module):
    """Store the module handle and verify the library prerequisites.

    ``module.fail_json`` is called when the CLC SDK or ``requests`` is not
    available, or when the installed ``requests`` is older than 2.5.0.
    Finally the user agent is set on the SDK.

    :param module: Object exposing ``fail_json(msg=...)``.
    """
    self.clc = clc_sdk
    self.module = module
    self.group_dict = {}

    if not CLC_FOUND:
        self.module.fail_json(msg='clc-python-sdk required for this module')
    if not REQUESTS_FOUND:
        self.module.fail_json(msg='requests library is required for this module')

    installed_requests = requests.__version__
    if installed_requests and LooseVersion(installed_requests) < LooseVersion('2.5.0'):
        self.module.fail_json(msg='requests library version should be >= 2.5.0')

    self._set_user_agent(self.clc)
def __init__(self, module):
    """Keep a reference to ``module`` and check the required libraries.

    Failures are reported through ``module.fail_json``: missing CLC SDK,
    missing ``requests``, or ``requests`` older than 2.5.0. The user agent
    is then set on ``self.clc``.

    :param module: Object exposing ``fail_json(msg=...)``.
    """
    self.module = module
    self.policy_dict = {}

    if not CLC_FOUND:
        self.module.fail_json(msg='clc-python-sdk required for this module')
    if not REQUESTS_FOUND:
        self.module.fail_json(msg='requests library is required for this module')

    requests_version = requests.__version__
    minimum_requests = '2.5.0'
    if requests_version and LooseVersion(requests_version) < LooseVersion(minimum_requests):
        self.module.fail_json(msg='requests library version should be >= 2.5.0')

    self._set_user_agent(self.clc)
def has_juju_version(minimum_version):
    """Tell whether the running Juju is at least ``minimum_version``.

    :param minimum_version: Lowest acceptable version string.
    :return: Result of comparing ``juju_version()`` against it with
        ``LooseVersion`` ordering.
    """
    running = LooseVersion(juju_version())
    required = LooseVersion(minimum_version)
    return running >= required
def parse_version(version):
    """Parse ``version`` with pkg_resources, or distutils when that is missing.

    The first call rebinds the module-level name ``parse_version`` to the
    imported implementation, so later lookups of that name reach it directly.

    :param version: Version string to parse.
    :return: Whatever the selected implementation returns for ``version``.
    """
    global parse_version
    try:
        from pkg_resources import parse_version as _implementation
    except ImportError:
        from distutils.version import LooseVersion as _implementation
    parse_version = _implementation
    return _implementation(version)
def _sort_key(self):
    """Build the tuple used for ordering.

    :return: ``(name, parsed version, negated rank tuple, filename)`` taken
        from ``self.parsed_filename``, ``self.rank`` and ``self.filename``.
    """
    name = self.parsed_filename.group('name')
    version = parse_version(self.parsed_filename.group('ver'))
    # Ranks are negated, which reverses their ordering inside the key.
    negated_rank = tuple(-component for component in self.rank)
    return (name, version, negated_rank, self.filename)
def __lt__(self, other):
    """Order two objects by their ``_sort_key``.

    :param other: Object to compare against; must share this object's
        ``context``.
    :return: ``True`` when this object's ``_sort_key`` is lower.
    :raises TypeError: if the two objects have different ``context`` values.
    """
    if self.context != other.context:
        raise TypeError("{0}.context != {1}.context".format(self, other))

    # The field-by-field comparison that used to follow this return could
    # never execute and has been dropped.
    return self._sort_key < other._sort_key
def version_compare(v1, v2, op=None):
    """
    Compare two version strings using ``LooseVersion`` ordering.

    :param v1: Version to compare
    :param v2: Recommended version
    :param op: Compare operation; one of ``<``, ``lt``, ``<=``, ``le``,
        ``>``, ``gt``, ``>=``, ``ge``, ``==``, ``eq``, ``!=``, ``ne``,
        ``<>``, or ``None``
    :return: ``False`` when ``v1`` is empty, ``None`` or only whitespace.
        Otherwise, without ``op``: -1, 0 or 1 for ``v1`` lower than, equal
        to or greater than ``v2``; with ``op``: whether the relation holds.
    """
    _map = {
        '<': [-1],
        'lt': [-1],
        '<=': [-1, 0],
        'le': [-1, 0],
        '>': [1],
        'gt': [1],
        '>=': [1, 0],
        'ge': [1, 0],
        '==': [0],
        'eq': [0],
        '!=': [-1, 1],
        'ne': [-1, 1],
        '<>': [-1, 1]
    }
    # Test emptiness first so that None is rejected here instead of raising
    # AttributeError on .isspace().
    if not v1 or v1.isspace():
        return False
    v1 = LooseVersion(v1)
    v2 = LooseVersion(v2)
    # Three-way result spelled with rich comparisons: the ``cmp`` builtin
    # does not exist on Python 3.
    result = (v1 > v2) - (v1 < v2)
    if op:
        assert op in _map
        return result in _map[op]
    return result
def parse_version(version):
    """Parse ``version`` using pkg_resources when importable, else distutils.

    On the first call the module-level name ``parse_version`` is rebound to
    the imported implementation, so later lookups of that name skip this
    wrapper.

    :param version: Version string to parse.
    :return: Whatever the selected implementation returns for ``version``.
    """
    global parse_version
    try:
        from pkg_resources import parse_version as _parser
    except ImportError:
        from distutils.version import LooseVersion as _parser
    parse_version = _parser
    return _parser(version)
def _sort_key(self):
    """Return the ordering key for this object.

    :return: Tuple of the ``name`` group of ``self.parsed_filename``, the
        parsed ``ver`` group, the element-wise negated ``self.rank`` and
        ``self.filename``.
    """
    package_name = self.parsed_filename.group('name')
    parsed_version = parse_version(self.parsed_filename.group('ver'))
    # Negating each rank component reverses the rank ordering in the key.
    inverted_rank = tuple(-value for value in self.rank)
    return (package_name, parsed_version, inverted_rank, self.filename)
def __lt__(self, other):
    """Compare two objects through their ``_sort_key``.

    :param other: Object to compare against; its ``context`` must equal this
        object's ``context``.
    :return: ``True`` when this object's ``_sort_key`` sorts first.
    :raises TypeError: if the ``context`` values differ.
    """
    if self.context != other.context:
        raise TypeError("{0}.context != {1}.context".format(self, other))

    # Everything that used to follow this return was unreachable and has
    # been removed.
    return self._sort_key < other._sort_key
def check_cli_version():
    """Check if the current cli version satisfies the server requirements.

    Exits with status 1 when the server's version information cannot be
    fetched. When the installed CLI is older than the server's minimum
    version, the user is offered an upgrade (or shown the manual command)
    and the process exits with status 0. When it is only older than the
    latest version, an upgrade hint is printed.
    """
    try:
        server_version = PolyaxonClients().version.get_cli_version()
    except (PolyaxonHTTPError, PolyaxonShouldExitError) as e:
        Printer.print_error('Could not get cli version.')
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    current_version = pkg_resources.get_distribution(PROJECT_CLI_NAME).version
    upgrade_command = "pip install -U polyaxon-cli"

    if LooseVersion(current_version) < LooseVersion(server_version.min_version):
        click.echo("""Your version of CLI ({}) is no longer compatible with server.""".format(
            current_version))
        upgrade_prompt = "Do you want to upgrade to version {} now?".format(
            server_version.latest_version)
        if click.confirm(upgrade_prompt):
            pip_upgrade()
        else:
            clint.textui.puts("Your can manually run:")
            with clint.textui.indent(4):
                clint.textui.puts(upgrade_command)
            clint.textui.puts(
                "to upgrade to the latest version `{}`".format(server_version.latest_version))
        # Both outcomes end the process successfully.
        sys.exit(0)
    elif LooseVersion(current_version) < LooseVersion(server_version.latest_version):
        clint.textui.puts("New version of CLI ({}) is now available. To upgrade run:".format(
            server_version.latest_version
        ))
        with clint.textui.indent(4):
            clint.textui.puts(upgrade_command)
def __init__(self, vstring=None):
    """Parse ``vstring`` into this version object.

    ``None`` is parsed as the empty string. A ``unicode`` value is encoded
    to UTF-8 first; every value is passed through ``str`` before parsing.

    :param vstring: Version text, or ``None``.
    """
    if vstring is None:
        # treat None like an empty string
        self.parse('')
        return

    if isinstance(vstring, unicode):
        # unicode string! Why? Oh well...
        # convert to string so version.LooseVersion doesn't choke
        vstring = vstring.encode('UTF-8')
    self.parse(str(vstring))
def has_juju_version(minimum_version):
    """Report whether ``juju_version()`` is ``minimum_version`` or newer.

    :param minimum_version: Lowest acceptable version string.
    :return: Outcome of the ``LooseVersion`` comparison.
    """
    installed = LooseVersion(juju_version())
    threshold = LooseVersion(minimum_version)
    return installed >= threshold
def has_juju_version(minimum_version):
    """Check the Juju version against a required minimum.

    :param minimum_version: Lowest acceptable version string.
    :return: Whether ``juju_version()`` compares greater than or equal to it
        under ``LooseVersion`` ordering.
    """
    actual = LooseVersion(juju_version())
    wanted = LooseVersion(minimum_version)
    return actual >= wanted
def main():
    """Parse the command line and run the requested storage-partition command.

    Sub-commands: ``list``, ``create``, ``delete``, ``mount`` and
    ``unmount``; the last three require ``-N/--name``. Servers older than
    6.1.0 are rejected through ``parser.error``.

    :return: The selected handler's return value, ``1`` after the version
        error, or ``None`` when no known sub-command was given.
    """
    parser = build_cli_parser()
    commands = parser.add_subparsers(help="Storage Partition commands", dest="command_name")

    commands.add_parser("list", help="List all storage partitions")
    commands.add_parser("create", help="Create new active writer partition")

    # Sub-commands that operate on one named partition.
    named_commands = (
        ("delete", "Delete partition", "Name of partition to delete."),
        ("mount", "Mount partition", "Name of partition to mount."),
        ("unmount", "Unmount partition", "Name of partition to unmount."),
    )
    for command, summary, name_help in named_commands:
        subparser = commands.add_parser(command, help=summary)
        subparser.add_argument("-N", "--name", help=name_help, required=True)

    args = parser.parse_args()
    cb = get_cb_response_object(args)

    if cb.cb_server_version < LooseVersion("6.1.0"):
        parser.error("This script can only work with server versions >= 6.1.0; {0} is running {1}"
                     .format(cb.url, cb.cb_server_version))
        return 1

    requested = args.command_name
    if requested == "list":
        return list_partitions(cb, parser, args)
    if requested == "create":
        return create_partition(cb, parser, args)
    if requested == "delete":
        return delete_partition(cb, parser, args)
    if requested == "mount":
        return mount_partition(cb, parser, args)
    if requested == "unmount":
        return unmount_partition(cb, parser, args)
def __init__(self, doc_class, cb, query=None, raw_query=None):
    """Initialise the query and its default request arguments.

    :param doc_class: Model class the query returns; its ``default_sort``
        attribute, when present, becomes the initial sort order.
    :param cb: API object; ``cb_server_version`` is read through ``self._cb``.
    :param query: Passed to the parent constructor.
    :param raw_query: Optional query string, parsed with ``parse_qs``.
    """
    super(Query, self).__init__(doc_class, cb, query=query)

    self._raw_query = urllib.parse.parse_qs(raw_query) if raw_query else None
    self._sort_by = getattr(self._doc_class, 'default_sort', None)

    default_args = {"cb.urlver": 1}
    # FIX: Cb Response server version 5.1.0-3 throws an exception after returning HTTP 504 when facet=false in the
    # HTTP request. Work around this by only setting facet=false on 5.1.1 and above server versions.
    self._default_args = default_args
    if self._cb.cb_server_version >= LooseVersion('5.1.1'):
        default_args["facet"] = "false"
def _query_implementation(cls, cb):
    """Return the query object used to search for this model.

    :param cls: Model class being queried.
    :param cb: API object handed to the query.
    :return: A ``SensorQuery`` built from ``cls`` and ``cb``.
    """
    # ** Disable the paginated query implementation for now **
    # if cb.cb_server_version >= LooseVersion("5.2.0"):
    #     return SensorPaginatedQuery(cls, cb)
    # else:
    #     return SensorQuery(cls, cb)
    query = SensorQuery(cls, cb)
    return query
def _update_object(self):
    """Create or update this object on the server.

    Servers at 6.1.0 or newer use the parent implementation. On older
    servers a new object is sent without its ``id`` and with ``teams``
    reduced to the list of team IDs; an existing object is sent as-is.

    :return: Result of ``self._refresh_if_needed`` on the server response,
        or the parent implementation's return value.
    """
    if not (self._cb.cb_server_version < LooseVersion("6.1.0")):
        return super(User, self)._update_object()

    model = self.__class__
    creating = model.primary_key in self._dirty_attributes.keys() or self._model_unique_id is None

    if not creating:
        log.debug(
            "Updating {0:s} with unique ID {1:s}".format(model.__name__, str(self._model_unique_id)))
        ret = self._cb.api_json_request(model._change_object_http_method,
                                        self._build_api_request_uri(), data=self._info)
        return self._refresh_if_needed(ret)

    payload = deepcopy(self._info)
    try:
        del payload["id"]
    except KeyError:
        pass

    # only include IDs of the teams and not the entire dictionary
    team_ids = (team.get("id") for team in payload["teams"])
    payload["teams"] = [team_id for team_id in team_ids if team_id]

    try:
        if not self._new_object_needs_primary_key:
            del payload[model.primary_key]
    except Exception:
        pass

    log.debug("Creating a new {0:s} object".format(model.__name__))
    ret = self._cb.api_json_request(model._new_object_http_method, self.urlobject,
                                    data=payload)
    return self._refresh_if_needed(ret)
def get_segments(self):
    """Return the segment IDs of this process, fetching them on first use.

    Servers older than 6.0.0 are queried with a ``process_id`` search and
    the IDs are sorted; newer servers use the
    ``/api/v1/process/<id>/segment`` route. The result is stored in
    ``self._segments``.

    :return: List of segment IDs.
    """
    if self._segments:
        return self._segments

    if self._cb.cb_server_version < LooseVersion('6.0.0'):
        log.debug("using process_id search for cb response server < 6.0")
        segment_query = Query(Process, self._cb, query="process_id:{0}".format(self.id)).sort("")
        segment_ids = sorted(hit["segment_id"] for hit in segment_query._search())
    else:
        log.debug("using segment API route for cb response server >= 6.0")
        response = self._cb.get_object("/api/v1/process/{0}/segment".format(self.id))
        segments = response.get("process", {}).get("segments", {})
        segment_ids = [self.parse_guid(entry["unique_id"])[1] for entry in segments]

    self._segments = segment_ids
    return self._segments
def __init__(self, *args, **kwargs):
    """Connect to a Cb Protection server and record its version.

    Arguments are forwarded to the parent constructor with
    ``product_name="protection"``.

    :raises UnauthorizedError: if populating the server info raises
        ``UnauthorizedError``.
    """
    super(CbProtectionAPI, self).__init__(product_name="protection", *args, **kwargs)

    self._server_info = None
    try:
        self._populate_server_info()
    except UnauthorizedError:
        raise UnauthorizedError(uri=self.url, message="Invalid API token for server {0:s}.".format(self.url))

    parity_version = self._server_info['ParityServerVersion']
    log.debug('Connected to Cb server version %s at %s' % (parity_version, self.session.server))
    self.cb_server_version = LooseVersion(parity_version)
def _minimum_server_version(cls):
    """Return the lowest server version this model supports (8.0)."""
    minimum = LooseVersion("8.0")
    return minimum
def _minimum_server_version(cls):
    """Return the minimum supported server version, 8.0, as a LooseVersion."""
    required = LooseVersion("8.0")
    return required
def _minimum_server_version(cls):
    """Return ``LooseVersion("8.0")``, the oldest server version supported."""
    oldest_supported = LooseVersion("8.0")
    return oldest_supported
def _minimum_server_version(cls):
    """Return the server version floor for this model: 8.0."""
    floor = LooseVersion("8.0")
    return floor
def _minimum_server_version(cls):
    """Return version 8.0, the minimum server version, as a LooseVersion."""
    min_version = LooseVersion("8.0")
    return min_version
def _minimum_server_version(cls):
    """Return the minimum server version (8.0) required by this model."""
    baseline = LooseVersion("8.0")
    return baseline