我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用time.mktime()。
def __compile_policy_value(self, str_value, value_dict):
    """Expand ``$``-placeholders in a policy value and evaluate the result.

    If ``str_value`` contains no ``'$'`` it is returned unchanged.  Otherwise
    every key of ``value_dict`` is substituted (using the placeholder text
    produced by ``self.__get_compiled_key(key)``) with the string form of its
    value; ``datetime``/``date`` values are first converted to a local-time
    epoch timestamp.  The resulting expression is evaluated, truncated to an
    int and passed through ``self.__get_localtime``.

    Args:
        str_value: Policy value, possibly containing placeholders.
        value_dict: Mapping of placeholder key -> replacement value.

    Returns:
        ``str_value`` itself when it has no placeholder, otherwise whatever
        ``self.__get_localtime`` returns for the evaluated integer.
    """
    value = str_value
    if '$' not in value:
        return value

    self.__logger.debug("Compiling value %s", value)
    for key in value_dict:
        self.__logger.debug("Searching for key %s", key)
        val = value_dict[key]
        # Exact type match (not isinstance) is deliberate here: it keeps the
        # original behaviour for subclasses of datetime/date.
        if type(val) in (datetime, date):
            self.__logger.debug("Value in dictionary: %s -> %s", key, val)
            val = time.mktime(self.__get_localtime(val).timetuple())
            self.__logger.debug("Timestamp converted: %s -> %s", key, val)
        value = value.replace(self.__get_compiled_key(key), str(val))
    self.__logger.debug("Value after convertsion: %s", value)

    # NOTE(security): eval() executes arbitrary Python.  This is only safe
    # while policy strings and dictionary values come from trusted
    # configuration -- TODO confirm, or replace with a restricted evaluator.
    int_value = int(eval(value))
    compiled_value = self.__get_localtime(int_value)
    self.__logger.debug("Compiled value: %s", compiled_value)
    return compiled_value
def kill_invalid_connection():
    """Mark stale, unfinished ``Log`` records as finished.

    A record is considered stale when its ``<log_path>.log`` file has not been
    modified for more than an hour (a missing file counts as mtime 0).  For
    ``ssh`` logins the recorded pid is sent signal 9; other login types are
    left alone until they are at least one day old.
    """
    pending = Log.objects.filter(is_finished=False)
    current_dt = datetime.datetime.now()
    current_ts = int(time.mktime(current_dt.timetuple()))

    for record in pending:
        try:
            last_write = int(os.stat('%s.log' % record.log_path).st_mtime)
        except OSError:
            last_write = 0

        # Log file touched within the last hour: session is still alive.
        if (current_ts - last_write) <= 3600:
            continue

        if record.login_type == 'ssh':
            try:
                os.kill(int(record.pid), 9)
            except OSError:
                # Process already gone (or not ours) -- nothing to kill.
                pass
        elif (current_dt - record.start_time).days < 1:
            continue

        record.is_finished = True
        record.end_time = current_dt
        record.save()
        logger.warn('kill log %s' % record.log_path)
def _totimestamp(dt_obj):
    """Convert a datetime into whole seconds since the epoch.

    Args:
        dt_obj (:class:`datetime.datetime`): value to convert; any falsy
            value yields ``None``.

    Returns:
        int: the timestamp truncated to seconds, or ``None`` for falsy input.
    """
    if not dt_obj:
        return None
    try:
        # Python 3.3+
        seconds = dt_obj.timestamp()
    except AttributeError:
        # Python 3 (< 3.3) and Python 2: fall back to local-time mktime
        seconds = mktime(dt_obj.timetuple())
    return int(seconds)
def history_get(self, item_ID, date_from, date_till):
    '''
    return history of item
    [eg1]#zabbix_api history_get 23296 "2016-08-01 00:00:00" "2016-09-01 00:00:00"
    [note]The date_till time must be within the historical data retention time

    Both dates must use the format "%Y-%m-%d %H:%M:%S" and are interpreted
    as local time.  When either one cannot be parsed the problem is reported
    through err_msg() and no history request is made.
    '''
    dateFormat = "%Y-%m-%d %H:%M:%S"
    try:
        startTime = time.strptime(date_from, dateFormat)
        endTime = time.strptime(date_till, dateFormat)
    except (TypeError, ValueError):
        err_msg("???? ['2016-05-01 00:00:00'] ['2016-06-01 00:00:00']")
        # Without this return the code below would hit unbound
        # startTime/endTime whenever err_msg() does not exit the process.
        return
    time_from = int(time.mktime(startTime))
    time_till = int(time.mktime(endTime))
    history_type = self.__item_search(item_ID)
    self.__history_get(history_type, item_ID, time_from, time_till)
def timestamp_normalized(self):
    """
    Normalize ``self.timestamp`` to integer microseconds since the epoch.

    ``self.timestamp`` is expected to be a long, an int, a datetime, or a
    timedelta (interpreted as an offset from ``datetime.now()``).  Integer
    values are returned untouched and falsy values give ``None``.

    :return: int microseconds (local-time based), the original integer, or None
    """
    stamp = self.timestamp
    if not stamp:
        return None
    if isinstance(stamp, six.integer_types):
        return stamp

    if isinstance(stamp, timedelta):
        moment = datetime.now() + stamp
    else:
        moment = stamp
    whole_seconds = time.mktime(moment.timetuple())
    return int(whole_seconds * 1e+6 + moment.microsecond)
def next_time(time_string):
    """Return the number of seconds to wait for ``time_string``.

    ``time_string`` is either a wall-clock time ``"HH:MM"`` -- the result is
    then the delay until the next occurrence of that local time (today, or
    tomorrow when it has already passed) -- or anything ``float()`` accepts,
    which is returned as a plain number of seconds.

    Raises:
        ValueError / TypeError: from ``float()`` when the value is neither an
            ``HH:MM`` string nor a number.
    """
    try:
        parsed = time.strptime(time_string, "%H:%M")
    except (TypeError, ValueError):
        return float(time_string)

    current = list(time.localtime())
    current[3:6] = parsed[3:6]  # keep today's date, replace h/m/s
    current_time = time.time()
    # time.mktime() requires a tuple or struct_time on Python 3; handing it
    # the list raised TypeError.
    delta = time.mktime(tuple(current)) - current_time
    if delta <= 0.0:
        # Already passed today: mktime() normalises the day overflow.
        current[2] += 1
        delta = time.mktime(tuple(current)) - current_time
    return delta
def get_creation_date(user):
    """Return the creation date of a Twitch account in hours since the epoch.

    Queries the Twitch "kraken" users endpoint and retries once per second
    until a 200 response arrives.  Only the ``YYYY-MM-DD`` part of
    ``created_at`` is used; it is interpreted as local midnight and the
    resulting timestamp is divided by 3600.

    Args:
        user: account identifier inserted into the request URL.

    Returns:
        int: whole hours since the epoch for the creation date.
    """
    client_id = 'jzkbprff40iqj646a697cyrvl0zt2m6'
    headers = {'Client-ID': client_id}
    url = 'https://api.twitch.tv/kraken/users/{}'.format(user)

    # Loop ends when a value is returned
    while True:
        # Request may time out or fail at the network level
        try:
            r = requests.get(url, headers=headers)
        except requests.RequestException:
            time.sleep(1)
            continue

        if r.status_code != 200:
            # Back off here too; previously a non-200 answer was retried
            # immediately in a tight loop.
            time.sleep(1)
            continue

        # Captures only YYYY-MM-DD
        date = re.match(r'([\d]{4}-[\d]{2}-[\d]{2})',
                        json.loads(r.text)['created_at'])
        epoch = datetime.datetime.strptime("{}".format(date.group(1)), "%Y-%m-%d")
        return int(time.mktime(epoch.timetuple()) / 3600)
def getDateSent(self):
    """Get the time of sending from the Date header

    Returns a timestamp computed with time.mktime from the parsed header,
    or None when the message has no Date header at all.  Not very
    reliable, because the Date header can be missing or spoofed (and
    often is, by spammers).

    Throws a MessageDateError if the Date header is present but cannot
    be converted.
    """
    header = self.getheader('Date')
    if header is None:
        return None
    try:
        return time.mktime(rfc822.parsedate(header))
    except (ValueError, TypeError, OverflowError):
        # TypeError gets thrown by mktime if parsedate returns None
        raise MessageDateError("message has missing or bad Date")
def db_value(self, value):
    """Convert a Python value into the integer stored in the database.

    ``None`` stays ``None``.  Plain numbers are scaled by ``self.resolution``
    and rounded.  ``date`` values are promoted to midnight ``datetime``;
    datetimes are converted to epoch seconds (UTC when ``self.utc`` is set,
    local time otherwise), microseconds are added, and the result is scaled
    by ``self.resolution`` when that is greater than 1.
    """
    if value is None:
        return None

    # datetime.datetime is a subclass of datetime.date, so this single check
    # separates "some kind of date" from plain numeric input.
    if not isinstance(value, datetime.date):
        return int(round(value * self.resolution))

    if not isinstance(value, datetime.datetime):
        value = datetime.datetime(value.year, value.month, value.day)

    if self.utc:
        seconds = calendar.timegm(value.utctimetuple())
    else:
        seconds = time.mktime(value.timetuple())
    seconds += (value.microsecond * .000001)

    if self.resolution > 1:
        seconds *= self.resolution
    return int(round(seconds))
def morsel_to_cookie(morsel):
    """Convert a Morsel object into a Cookie containing the one k/v pair.

    The expiry is taken from ``max-age`` (seconds from now) when present,
    otherwise from the ``expires`` attribute, which must look like
    ``'%a, %d-%b-%Y %H:%M:%S GMT'``.

    Raises:
        TypeError: if ``max-age`` is not an integer value.
        ValueError: if ``expires`` does not match the expected format.
    """
    import calendar

    expires = None
    if morsel['max-age']:
        # Morsel attributes are normally strings; adding one to time.time()
        # directly raised TypeError.
        try:
            expires = int(time.time() + int(morsel['max-age']))
        except ValueError:
            raise TypeError('max-age: %s must be integer' % morsel['max-age'])
    elif morsel['expires']:
        time_template = '%a, %d-%b-%Y %H:%M:%S GMT'
        # The string is GMT, so convert with timegm(); the previous
        # "mktime(...) - time.timezone" was off by an hour whenever the
        # local zone was observing DST at that date.
        expires = calendar.timegm(
            time.strptime(morsel['expires'], time_template))
    return create_cookie(
        comment=morsel['comment'],
        comment_url=bool(morsel['comment']),
        discard=False,
        domain=morsel['domain'],
        expires=expires,
        name=morsel.key,
        path=morsel['path'],
        port=None,
        rest={'HttpOnly': morsel['httponly']},
        rfc2109=False,
        secure=bool(morsel['secure']),
        value=morsel.value,
        version=morsel['version'] or 0,
    )
def getYYYYMMDD(minLen=1, returnTimeStamp=False, returnDateTime=False, alternateValue=None):
    """Read the current command argument as a date in YYYYMMDD_FORMAT.

    Args:
        minLen: when 0, an empty argument is accepted and returned as u''.
        returnTimeStamp: return the date as a local-time epoch value
            multiplied by 1000 instead of the string.
        returnDateTime: return the parsed ``datetime`` instead of the string
            (only consulted when ``returnTimeStamp`` is false).
        alternateValue: a literal (compared case-insensitively) that is
            accepted in place of a date; it yields ``None``.

    Returns:
        The argument string, a timestamp, a ``datetime``, ``None`` or u''
        depending on the options above.
    """
    if Cmd.ArgumentsRemaining():
        argstr = Cmd.Current().strip()
        if argstr:
            if alternateValue is not None and argstr.lower() == alternateValue.lower():
                Cmd.Advance()
                return None
            # A leading sign marks a relative date; getDeltaDate() resolves it
            # and the result is re-formatted so it goes through the same
            # parsing below.
            if argstr[0] in [u'+', u'-']:
                argstr = getDeltaDate(argstr).strftime(YYYYMMDD_FORMAT)
            try:
                dateTime = datetime.datetime.strptime(argstr, YYYYMMDD_FORMAT)
                # Only consume the argument once it is known to be valid.
                Cmd.Advance()
                if returnTimeStamp:
                    return time.mktime(dateTime.timetuple())*1000
                if returnDateTime:
                    return dateTime
                return argstr
            except ValueError:
                # presumably terminates the program -- verify against the helper
                invalidArgumentExit(YYYYMMDD_FORMAT_REQUIRED)
        elif minLen == 0:
            Cmd.Advance()
            return u''
    # Reached when no argument is left or an empty one is not allowed.
    missingArgumentExit(YYYYMMDD_FORMAT_REQUIRED)
def time_parse(self, s):
    """Parse ``s`` into integer epoch seconds.

    Accepted forms, tried in order: an integer literal, ``YYYY-MM-DD`` and
    ``YYYY-MM-DD HH:MM:SS`` (both interpreted as local time).

    Raises:
        ValueError: when none of the forms matches.
    """
    try:
        return int(s)
    except ValueError:
        pass

    for layout in ('%Y-%m-%d', '%Y-%m-%d %H:%M:%S'):
        try:
            return int(time.mktime(time.strptime(s, layout)))
        except ValueError:
            continue

    raise ValueError('Invalid time: "%s"' % s)
def seconds_until(until=9):
    """ Counts the seconds until it is a certain hour again.

    Keyword Arguments:
        until (int): the hour of the day (local time) to count to
            (default: {9})

    Returns:
        (float): how many seconds until the next occurrence of that hour.
    """
    now = time.localtime()
    elapsed_today = (60 * 60 * now.tm_hour) + (60 * now.tm_min) + now.tm_sec
    delta = (until * 60 * 60) - elapsed_today
    if now.tm_hour >= until:
        # Target hour already reached today: aim for tomorrow.
        delta += 60 * 60 * 24

    # Round-trip through localtime()/mktime() as the original did.
    target = time.localtime(time.mktime(now) + delta)
    return time.mktime(target) - time.time()
def test_crontab():
    """Check that Crontab.actions()/actions_ts() return the matching actions."""
    c = Crontab()
    # 'boo' has no schedule arguments, 'foo' gets a first argument of 0 and
    # 'bar' gets explicit values for every field.
    c.add('boo')
    c.add('foo', 0)
    c.add('bar', [1, 3], -5, -1, -1, 0)
    assert c.actions(0, 1, 1, 1, 1) == {'boo', 'foo'}
    assert c.actions(1, 1, 1, 1, 1) == {'boo'}
    assert c.actions(1, 5, 1, 1, 7) == {'boo', 'bar'}
    assert c.actions(3, 5, 1, 1, 7) == {'boo', 'bar'}
    # Same lookup through a local-time epoch timestamp (2016-01-17 05:01).
    ts = mktime(datetime(2016, 1, 17, 5, 1).timetuple())
    assert c.actions_ts(ts) == {'boo', 'bar'}
def _get_date_and_size(zip_stat):
    """Return ``(timestamp, size)`` for a zip entry.

    ``zip_stat.date_time`` holds (year, month, day, hour, minute, second);
    it is padded with wday=0, yday=0 and isdst=-1 so ``time.mktime`` can
    turn it into a local-time timestamp.  The 1980 offset is already applied.
    """
    # ymdhms + wday, yday, dst
    time_fields = zip_stat.date_time + (0, 0, -1)
    return time.mktime(time_fields), zip_stat.file_size
def scroll(query, begin, until, prefix=None):
    """Yield ``(timestamp, row)`` pairs for ``query`` in 4-minute windows.

    The range ``[begin, until)`` is walked in steps of four minutes; for each
    step ``query`` is %-formatted with the padded window bounds and run
    through ``DB.query``.

    Args:
        query: query template with two ``%s`` slots (window start and end).
        begin: start of the range (must support ``+ timedelta``).
        until: end of the range.
        prefix: optional string prepended to every column name that is not
            listed in ``SKIP_PREFIX`` and does not already carry it.

    Yields:
        tuple: local-time epoch seconds parsed from ``row["time"]`` and the
        (mutated) row dictionary.
    """
    diff = timedelta(minutes=4)
    while begin < until:
        to = min(begin + diff, until)
        res = DB.query(query % (pad(begin), pad(to)))
        for batch in res:
            for row in batch:
                # truncate longer ids to match with shorter host names
                if "container_id" in row:
                    row["container_id"] = row["container_id"][0:11]
                # Drop the last character and anything beyond microsecond
                # precision (presumably a trailing "Z" and nanoseconds --
                # verify against the data source).
                time_col = row["time"][0:min(26, len(row["time"]) - 1)]
                if len(time_col) == 19:
                    t = time.strptime(time_col, "%Y-%m-%dT%H:%M:%S")
                else:
                    t = time.strptime(time_col, "%Y-%m-%dT%H:%M:%S.%f")
                if prefix is not None:
                    # Iterate over a snapshot of the keys: the loop renames
                    # entries, and mutating a dict while iterating it is
                    # undefined (and `iterkeys` does not exist on Python 3).
                    for key in list(row):
                        if (key not in SKIP_PREFIX) and ((prefix + "|") not in key):
                            row[APP_METRIC_DELIMITER.join((prefix, key))] = row.pop(key)
                yield (time.mktime(t), row)
        begin = to
def parse_retry_after(self, retry_after):
    """Convert a ``Retry-After`` header value into a delay in seconds.

    Args:
        retry_after: header value, either a non-negative integer number of
            seconds or an HTTP-date.

    Returns:
        The number of seconds to wait, never negative.

    Raises:
        InvalidHeader: if the value is neither an integer nor a parseable date.
    """
    # Whitespace: https://tools.ietf.org/html/rfc7230#section-3.2.4
    if re.match(r"^\s*[0-9]+\s*$", retry_after):
        seconds = int(retry_after)
    else:
        retry_date_tuple = email.utils.parsedate_tz(retry_after)
        if retry_date_tuple is None:
            raise InvalidHeader("Invalid Retry-After header: %s" % retry_after)
        if retry_date_tuple[9] is None:
            # No zone information: HTTP-dates are GMT, so assume UTC
            # rather than the local zone.
            retry_date_tuple = retry_date_tuple[:9] + (0,) + retry_date_tuple[10:]
        # mktime_tz() honours the zone offset; time.mktime() treated the GMT
        # date as local time and skewed the delay by the local UTC offset.
        retry_date = email.utils.mktime_tz(retry_date_tuple)
        seconds = retry_date - time.time()

    if seconds < 0:
        seconds = 0

    return seconds
def uptime(args):
    """Print the number of seconds the container ``args.container`` has been up.

    Runs ``docker inspect`` on the container and prints ``0`` when it does
    not exist or is not running; otherwise prints the whole seconds elapsed
    since its ``StartedAt`` time.
    """
    import calendar

    # NOTE(review): the container name is concatenated into a shell command;
    # only safe while it comes from a trusted operator.
    with os.popen("docker inspect -f '{{json .State}}' " + args.container + " 2>&1") as pipe:
        status = pipe.read().strip()

    if "No such image or container" in status:
        print("0")
        return

    statusjs = json.loads(status)
    if statusjs["Running"]:
        started_at = statusjs["StartedAt"]
        start = time.strptime(started_at[:19], "%Y-%m-%dT%H:%M:%S")
        # Docker reports StartedAt in UTC, so convert with timegm();
        # time.mktime() would read it as local time and skew the result by
        # the host's UTC offset.
        print(int(time.time() - calendar.timegm(start)))
    else:
        print("0")

# get the approximate disk usage
# alt docker inspect -s -f {{.SizeRootFs}} 49219085bdaa
# alt docker exec " + args.container + " du -s -b / 2> /dev/null
def convert(timeString):
    """Build a ``CF.UTCTime`` from ``'now'`` or a colon-separated time string.

    The string must split on ``':'`` into exactly seven fields
    (year, month, day, <ignored>, hours, minutes, seconds); the seconds
    field may carry a fraction.  Anything else gives ``CF.UTCTime(0,0,0)``.
    """
    if timeString == 'now':
        return now()

    fields = timeString.split(':')
    if len(fields) != 7:
        return CF.UTCTime(0,0,0)

    _year, _month, _day, _blank, _hours, _minutes, _seconds = fields
    _full_seconds = float(_seconds)
    # mktime() with isdst=0 gives local standard time; subtracting
    # time.timezone shifts that to GMT.
    local_fields = (int(_year), int(_month), int(_day),
                    int(_hours), int(_minutes), int(_full_seconds),
                    0, 0, 0)
    _time = time.mktime(local_fields)-time.timezone
    fraction = _full_seconds - int(_full_seconds)
    return CF.UTCTime(1, _time, fraction)

# Break out the whole seconds into a GMT time
# Insert the arithmetic functions as operators on the PrecisionUTCTime class
def purge_expired_peers():
    """
    Removes peers who haven't announced in the last interval.

    Should be set as a recurring event source in your Zappa config.
    """
    # The cut-off is the same for every peer, so compute it once instead of
    # once per peer.
    window = datetime.now() - timedelta(seconds=ANNOUNCE_INTERVAL)
    window_unix = int(time.mktime(window.timetuple()))

    if DATASTORE == "DynamoDB":
        # This is a costly operation, but I think it has to be done.
        # Optimizations (pagination? queries? batching?) welcomed.
        all_torrents = table.scan()
        for torrent in all_torrents['Items']:
            for peer_id in torrent['peers']:
                peer_last_announce = int(torrent['peers'][peer_id][0]['last_announce'])
                if peer_last_announce < window_unix:
                    remove_peer_from_info_hash(torrent['info_hash'], peer_id)
    else:
        # There must be a better way to do this.
        # Also, it should probably be done as a recurring function and cache,
        # not dynamically every time.
        listing = s3_client.list_objects(Bucket=BUCKET_NAME)
        # The response carries no 'Contents' key when the bucket is empty.
        for key in listing.get('Contents', []):
            if 'peers.json' in key['Key']:
                remote_object = s3.Object(BUCKET_NAME, key['Key']).get()
                content = remote_object['Body'].read().decode('utf-8')
                torrent = json.loads(content)
                for peer_id in torrent['peers']:
                    peer_last_announce = int(torrent['peers'][peer_id]['last_announce'])
                    if peer_last_announce < window_unix:
                        remove_peer_from_info_hash(torrent['info_hash'], peer_id)
    return

##
# Database
##
def expireat(self, name, when):
    """
    Set an expire flag on key ``name``. ``when`` can be represented
    as an integer indicating unix time or a Python datetime object
    (converted with local-time ``mktime``).
    """
    deadline = when
    if isinstance(deadline, datetime.datetime):
        local_fields = deadline.timetuple()
        deadline = int(mod_time.mktime(local_fields))
    return self.execute_command('EXPIREAT', name, deadline)
def pexpireat(self, name, when):
    """
    Set an expire flag on key ``name``. ``when`` can be represented
    as an integer representing unix time in milliseconds (unix time * 1000)
    or a Python datetime object (converted with local-time ``mktime``,
    keeping millisecond precision).
    """
    deadline = when
    if isinstance(deadline, datetime.datetime):
        millis = int(deadline.microsecond / 1000)
        whole_seconds = int(mod_time.mktime(deadline.timetuple()))
        deadline = whole_seconds * 1000 + millis
    return self.execute_command('PEXPIREAT', name, deadline)
def _handle_header_line(self, line):
    """Extract the timestamp from a ``#YYMMDD HH:MM:SS server id N`` line.

    The date and time are parsed as local time and stored, as integer epoch
    seconds, in ``self.header_timestamp``.  A line that does not match the
    pattern raises ``AttributeError`` (the match object is ``None``).
    """
    found = re.search("\\#(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+server\\s+id\\s+\\d+", line)
    date_part, time_part = found.group(1), found.group(2)
    parsed = datetime.datetime.strptime("%s %s" % (date_part, time_part),
                                        '%y%m%d %H:%M:%S')
    self.header_timestamp = int(time.mktime(parsed.timetuple()))
def createTimeStamp(datestr, format="%Y-%m-%d %H:%M:%S"):
    """Parse ``datestr`` with ``format`` and return local-time epoch seconds (float)."""
    parsed = time.strptime(datestr, format)
    return time.mktime(parsed)
def getdate(self, name):
    """Retrieve a date field from a header.

    Looks up the named header and returns the result of ``parsedate`` for
    it -- a tuple compatible with time.mktime() -- or None when the header
    is absent.
    """
    try:
        raw = self[name]
    except KeyError:
        return None
    else:
        return parsedate(raw)
def getdate_tz(self, name):
    """Retrieve a date field from a header as a 10-tuple.

    The first 9 elements make up a tuple compatible with time.mktime(),
    and the 10th is the offset of the poster's time zone from GMT/UTC.
    Returns None when the header is absent.
    """
    try:
        raw = self[name]
    except KeyError:
        return None
    else:
        return parsedate_tz(raw)

# Access as a dictionary (only finds *last* header of each type):
def mktime_tz(data):
    """Turn a 10-tuple as returned by parsedate_tz() into a UTC timestamp.

    ``data[9]`` is the zone offset in seconds east of UTC, or None when the
    date carried no zone information (local time is then assumed).
    """
    import calendar

    if data[9] is None:
        # No zone info, so localtime is better assumption than GMT
        return time.mktime(data[:8] + (-1,))
    # Treat the fields as UTC and then remove the zone offset.  The earlier
    # "mktime(...) - time.timezone" round trip depended on the local zone's
    # DST rules and could be off by an hour.
    return calendar.timegm(data) - data[9]
def cert_time_to_seconds(cert_time):
    """Takes a date-time string in standard ASN1_print form
    ("MON DAY 24HOUR:MINUTE:SEC YEAR TIMEZONE") and return
    a Python time value in seconds past the epoch.

    The timezone must be the literal ``GMT``; the fields are therefore
    converted as UTC, independent of the local time zone.

    Raises:
        ValueError: if ``cert_time`` does not match the expected format.
    """
    import calendar
    import time
    # time.mktime() would interpret the GMT fields as local time.
    return calendar.timegm(time.strptime(cert_time, "%b %d %H:%M:%S %Y GMT"))
def __init__(self, allow_negative_scores=False, **kwargs):
    """Set up the validation tables and store the report fields.

    Args:
        allow_negative_scores: whether negative scores are permitted.
        **kwargs: report fields; stored as ``self.data``.  A ``timestamp``
            of "now" (integer epoch seconds) is added when none is given.
    """
    # negative scores introduced in CB 4.2
    # negative scores indicate a measure of "goodness" versus "badness"
    self.allow_negative_scores = allow_negative_scores

    # these fields are required in every report
    self.required = ["iocs", "timestamp", "link", "title", "id", "score"]

    # these fields must be of type string
    self.typestring = ["link", "title", "id", "description"]

    # these fields must be of type int
    self.typeint = ["timestamp", "score"]

    # these fields are optional
    self.optional = ["tags", "description"]

    # valid IOC types are "md5", "ipv4", "dns", "query"
    self.valid_ioc_types = ["md5", "ipv4", "dns", "query"]

    # valid index_type options for "query" IOC
    self.valid_query_ioc_types = ["events", "modules"]

    if "timestamp" not in kwargs:
        # time.mktime(time.gmtime()) fed UTC fields to a local-time
        # conversion and was off by the local UTC offset; time.time() is
        # already the epoch value.
        kwargs["timestamp"] = int(time.time())

    self.data = kwargs
def date_to_timestamp(date):
    """Convert a date string into local-time epoch seconds.

    Args:
        date: Date as string in one of these formats: "YYYY-MM-DD" or
            "YYYY-MM-DD HH:MM:SS".  A 10-character value is treated as a
            bare date and midnight is assumed.

    Returns:
        float: the timestamp produced by ``time.mktime``.
    """
    # format input string if only date but no time is provided
    full_text = "{} 00:00:00".format(date) if len(date) == 10 else date
    parsed = time.strptime(full_text, '%Y-%m-%d %H:%M:%S')
    return time.mktime(parsed)

#
def _get_timestamp(self, date):
    """Return ``date`` as integer epoch seconds, interpreting it as local time."""
    local_fields = date.timetuple()
    return int(time.mktime(local_fields))
def parse_header(help_file, header_line):
    """
    Given the name of a help file and its first line, return back a HeaderData
    that provides the information from the header.

    Recognised keys are "title" and "date" (``YYYY-MM-DD``, converted to a
    local-time timestamp); other keys, and dates that cannot be parsed, are
    logged and ignored.  Defaults are "No Title provided" and 0.0.

    The return value is None if the line is not a valid file header.
    """
    if not _header_prefix_re.match(header_line):
        return None

    result = re.findall(_header_keypair_re, header_line)

    title = "No Title provided"
    date = 0.0
    for match in result:
        if match[0] == "title":
            title = match[1]
        elif match[0] == "date":
            try:
                date = time.mktime(time.strptime(match[1], "%Y-%m-%d"))
            except (ValueError, OverflowError):
                # Only conversion failures are expected here; a bare except
                # would also hide KeyboardInterrupt and programming errors.
                _log("Ignoring invalid file date '%s'", match[1])
                date = 0.0
        else:
            _log("Ignoring header key '%s' in '%s'", match[0], help_file)

    return HeaderData(help_file, title, date)
def format_rep_date(date_source):
    """Extract the first ``Y-M-D H:M`` date in ``date_source`` and normalise it.

    Args:
        date_source: text containing a date such as ``2017-1-5 8:07``.

    Returns:
        str: the date as ``YYYY-MM-DD HH:MM:SS`` (seconds are always ``00``),
        or ``''`` when no date is found or it is not a valid date.
    """
    found = re.search(r'\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}', date_source)
    if found is None:
        # Previously this raised AttributeError outside the try block;
        # return the same empty result used for unparsable dates.
        return ''
    try:
        timestamp = time.mktime(time.strptime(found.group(0), '%Y-%m-%d %H:%M'))
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
    except (ValueError, OverflowError):
        return ''
def transfer_date(date_source):
    """Normalise a ``YYYY-MM-DD HH:MM:SS`` string, falling back to "now".

    Args:
        date_source: date text in the format ``%Y-%m-%d %H:%M:%S``.

    Returns:
        str: the zero-padded date in the same format, or the current local
        time in that format when ``date_source`` cannot be parsed.
    """
    try:
        timestamp = time.mktime(time.strptime(date_source, '%Y-%m-%d %H:%M:%S'))
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
    except (TypeError, ValueError, OverflowError):
        # Only parse/conversion failures trigger the fallback; a bare except
        # would also swallow KeyboardInterrupt and SystemExit.
        return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def format_date(date_source):
    """Reduce a ``YYYY-MM-DD HH:MM:SS`` string to its ``YYYY-MM-DD`` date.

    Args:
        date_source: date text in the format ``%Y-%m-%d %H:%M:%S``.

    Returns:
        str: the zero-padded date, or today's local date when
        ``date_source`` cannot be parsed.
    """
    try:
        timestamp = time.mktime(time.strptime(date_source, '%Y-%m-%d %H:%M:%S'))
        return time.strftime('%Y-%m-%d', time.localtime(timestamp))
    except (TypeError, ValueError, OverflowError):
        # Only parse/conversion failures trigger the fallback; a bare except
        # would also swallow KeyboardInterrupt and SystemExit.
        return datetime.datetime.now().strftime("%Y-%m-%d")
def format_date_short(date_source):
    """Normalise a ``YYYY-MM-DD`` string, falling back to today's date.

    Args:
        date_source: date text in the format ``%Y-%m-%d``.

    Returns:
        str: the zero-padded date, or today's local date when
        ``date_source`` cannot be parsed.
    """
    try:
        timestamp = time.mktime(time.strptime(date_source, '%Y-%m-%d'))
        return time.strftime('%Y-%m-%d', time.localtime(timestamp))
    except (TypeError, ValueError, OverflowError):
        # Only parse/conversion failures trigger the fallback; a bare except
        # would also swallow KeyboardInterrupt and SystemExit.
        return datetime.datetime.now().strftime("%Y-%m-%d")
def compare_date_short(date1, date2):
    """Return the later of two ``YYYY-MM-DD HH:MM:SS`` strings.

    When both parse, ``date1`` is returned only if it is strictly later;
    otherwise ``date2``.  When either cannot be parsed, ``date2`` is returned
    if ``date1`` is the empty string, else ``date1``.
    """
    layout = '%Y-%m-%d %H:%M:%S'
    try:
        timestamp1 = time.mktime(time.strptime(date1, layout))
        timestamp2 = time.mktime(time.strptime(date2, layout))
    except (TypeError, ValueError, OverflowError):
        # Only parse/conversion failures select the fallback; a bare except
        # would also swallow KeyboardInterrupt and SystemExit.
        if date1 == '':
            return date2
        return date1
    if timestamp1 > timestamp2:
        return date1
    return date2
def get_date_diff_seconds(date1, date2):
    """Return ``date2 - date1`` in whole seconds.

    Both arguments are ``YYYY-MM-DD HH:MM:SS`` strings interpreted as local
    time; ``ValueError`` from ``time.strptime`` propagates for bad input.
    """
    layout = '%Y-%m-%d %H:%M:%S'
    start = time.mktime(time.strptime(date1, layout))
    end = time.mktime(time.strptime(date2, layout))
    return int(end - start)
def prepare_time_range(time_from, time_to, relative_time_range, date_from=None, date_to=None):
    """
    Prepare time range based on given options. Options are validated in advance.

    Precedence: a relative range wins, then explicit epoch seconds, then
    ``YYYY-MM-DD HH:MM:SS`` date strings (local time).  Absolute bounds are
    returned in milliseconds.  Returns None when no usable pair is given.
    """
    if relative_time_range:
        return {"time_range": relative_time_range}

    if time_from and time_to:
        return {"from": int(time_from) * 1000, "to": int(time_to) * 1000}

    if date_from and date_to:
        def to_millis(text):
            parsed = time.strptime(text, "%Y-%m-%d %H:%M:%S")
            return int(time.mktime(parsed)) * 1000

        from_ts = to_millis(date_from)
        to_ts = to_millis(date_to)
        return {"from": from_ts, "to": to_ts}

    return None
def __updateHead(self):
    """Apply the "General" and "Style" configuration to the font's ``head`` table.

    Does nothing when the font has no ``head`` table.  From "General" it
    copies ``version`` (as a non-negative float revision) and the created /
    modified datetimes (converted to seconds and shifted by ``epoch_diff``).
    From "Style" it rewrites bits of ``macStyle`` according to ``styleLink``
    and ``widthScale``.
    """
    headT = self.font.get("head")
    if not headT:
        return
    general = self.config.get("General")
    style = self.config.get("Style")
    if general:
        version = general.get("version")
        createdTime = general.get("createdTime")
        modifiedTime = general.get("modifiedTime")
        if isinstance(version, float) or isinstance(version, int):
            headT.fontRevision = float(abs(version))
        # mktime() interprets the datetime fields as local time.
        if isinstance(createdTime, datetime):
            headT.created = long(mktime(datetime.timetuple(createdTime)) - epoch_diff)
        if isinstance(modifiedTime, datetime):
            headT.modified = long(mktime(datetime.timetuple(modifiedTime)) - epoch_diff)
            # NOTE(review): nesting reconstructed from flattened source --
            # confirm this flag is only cleared when a modified time is set.
            self.font.recalcTimestamp = False
    if style:
        styleLink = style.get("styleLink")
        widthScale = style.get("widthScale")
        if styleLink in range(0, 5):
            # Clear related bits first
            headT.macStyle &= ~0b11
            # Bit 0 is set for bold, bit 1 for italic, both for bold-italic.
            if styleLink == Constants.STYLELINK_BOLD:
                headT.macStyle |= 1
            elif styleLink == Constants.STYLELINK_ITALIC:
                headT.macStyle |= 1<<1
            elif styleLink == Constants.STYLELINK_BOLDITALIC:
                headT.macStyle |= 1
                headT.macStyle |= 1<<1
            else:
                pass
        if widthScale in range(1, 10):
            # Clear bits 5 and 6, then set bit 5 for values below 5 and
            # bit 6 for values above 5; exactly 5 leaves both cleared.
            headT.macStyle &= ~(0b11<<5)
            if widthScale < 5:
                headT.macStyle |= 1<<5
            elif widthScale > 5:
                headT.macStyle |= 1<<6
            else:
                pass
    return
def _to_timestamp(self, date):
    """Convert ``date`` (anything with ``timetuple()``) to integer local-time epoch seconds."""
    seconds = time.mktime(date.timetuple())
    return int(seconds)
def post(self):
    """Handle a Pub/Sub push: cache the newest top-10 payload in memcache.

    Requests whose ``token`` parameter does not match the subscription
    token get a 404.  The message data is a comma-separated list: a
    timestamp followed by alternating entity names and integer counts.  The
    cached value is replaced only when the incoming timestamp is newer.
    """
    if pubsub_utils.SUBSCRIPTION_UNIQUE_TOKEN != self.request.get('token'):
        self.response.status = 404
        return

    # Decode the pushed message (URL-quoted JSON envelope, base64 payload).
    envelope = json.loads(urllib.unquote(self.request.body).rstrip('='))
    message_body = base64.b64decode(str(envelope['message']['data']))
    message = message_body.split(',')
    # The last five characters of the time field are dropped before
    # parsing; the remainder is converted as local time.
    d = datetime.strptime(message[0][:-5], '%Y-%m-%dT%H:%M:%S')
    timestamp = time.mktime(d.timetuple())
    message = message[1:]
    entities = zip(message[::2], map(int, message[1::2]))

    # Read the cache once: a second memcache.get() could return None if the
    # entry was evicted in between, making json.loads() fail.
    data_raw = memcache.get(MC_OSCARS_TOP10)
    data = json.loads(data_raw) if data_raw else None

    if data is None or data['timestamp'] < timestamp:
        memcache.set(MC_OSCARS_TOP10, json.dumps({
            'timestamp': timestamp,
            'entities': entities
        }))
def local_time(time_utc):
    """Format a UTC datetime as Asia/Shanghai local time.

    Args:
        time_utc: ``datetime`` expressed in UTC (naive, or timezone-aware).

    Returns:
        str: the moment as ``YYYY-MM-DD HH:MM:SS`` in the Asia/Shanghai zone.
    """
    import calendar

    # The input is UTC, so convert with timegm(); time.mktime() would read
    # the fields as the server's local time and shift the result by the
    # server's UTC offset.
    u = calendar.timegm(time_utc.utctimetuple())
    time_local = datetime.datetime.fromtimestamp(u, pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')
    return str(time_local)
def format_http_datetime(stamp):
    """ Formats datetime to a string following rfc1123 pattern.

        >>> now = datetime(2011, 9, 19, 10, 45, 30, 0, UTC)
        >>> format_http_datetime(now)
        'Mon, 19 Sep 2011 10:45:30 GMT'

        if timezone is not set in datetime instance the ``stamp``
        is assumed to be in UTC (``datetime.utcnow``).

        >>> now = datetime(2011, 9, 19, 10, 45, 30, 0)
        >>> format_http_datetime(now)
        'Mon, 19 Sep 2011 10:45:30 GMT'

        >>> now = datetime.utcnow()
        >>> assert format_http_datetime(now)

        if ``stamp`` is a string just return it

        >>> format_http_datetime('x')
        'x'

        >>> format_http_datetime(100) # doctest: +ELLIPSIS
        Traceback (most recent call last):
            ...
        TypeError: ...
    """
    if isinstance(stamp, str):
        return stamp
    if not isinstance(stamp, datetime):
        raise TypeError('Expecting type ``datetime.datetime``.')

    if stamp.tzinfo:
        fields = stamp.astimezone(UTC).timetuple()
    else:
        # Round trip through mktime()/localtime() keeps the wall-clock
        # fields and fills in the weekday.
        fields = localtime(mktime(stamp.timetuple()))

    year, month, day, hh, mm, ss, wd, y, z = fields
    return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
        WEEKDAYS[wd], day, MONTHS[month], year, hh, mm, ss
    )
def QA_util_date_stamp(date):
    """Return the local-midnight timestamp of ``date``.

    Only the first ten characters of ``str(date)`` are used and must look
    like ``YYYY-MM-DD``; any time-of-day part is ignored.

    Returns:
        float: epoch seconds from ``time.mktime``.
    """
    day_text = str(date)[0:10]
    parsed_day = time.strptime(day_text, '%Y-%m-%d')
    return time.mktime(parsed_day)