Python magic 模块,from_file() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用magic.from_file()。
def _load_index(self):
    """Load and return the JSON index found at ``self.index_path()``.

    Returns an empty dict when the index file does not exist. The MIME type
    reported by libmagic selects the opener: plain ``open`` for
    ``text/plain`` and ``gzip.open`` for gzip data.

    Raises:
        ValueError: if the index has any other content type.
    """
    path = self.index_path()
    if not os.path.exists(path):
        return {}

    detected = magic.from_file(path, mime=True)
    if detected == 'text/plain':
        logger.debug('Detected plaintext encoding for reading index')
        opener = open
    elif detected in ('application/gzip', 'application/x-gzip'):
        logger.debug('Detected gzip encoding for reading index')
        opener = gzip.open
    else:
        raise ValueError('Index is of unknown type', detected)

    # Both openers accept text mode, so json can read either directly.
    with opener(path, 'rt') as handle:
        return json.load(handle)
def get_plaintext_document_body(fpath, keep_layout=False):
    """Given a file-path to a full-text, return a list of unicode strings
    whereby each string is a line of the fulltext.

    In the case of a plain-text document, this simply means reading the
    contents in from the file (decoded as UTF-8). In the case of a PDF
    however, this means converting the document to plaintext.

    @param fpath: (string) - the path to the fulltext file
    @param keep_layout: (bool) - passed through unchanged to
        ``convert_PDF_to_plaintext``
    @return: (list) of strings - each string being a line in the document.
    @raise UnknownDocumentTypeError: if libmagic reports the document as
        neither ``text/plain`` nor ``application/pdf``.
    """
    import io

    mime_type = magic.from_file(fpath, mime=True)
    if mime_type == "text/plain":
        # Decode while reading: text-mode lines are already ``str`` on
        # Python 3 and have no ``.decode()``. ``io.open`` behaves the same on
        # Python 2 and 3; an explicit newline leaves line endings untouched.
        with io.open(fpath, "r", encoding="utf-8", newline="\n") as f:
            return f.readlines()
    if mime_type == "application/pdf":
        return convert_PDF_to_plaintext(fpath, keep_layout)
    raise UnknownDocumentTypeError(mime_type)
def __init__(self, filename):
    """
    Set up the file object for one malware sample.

    :param filename: The file name of the available malware sample.
    :raises ValueError: If ``filename`` does not exist on disk.
    """
    sample_missing = not os.path.exists(filename)
    if sample_missing:
        raise ValueError("File {0} does not exist!".format(filename))

    # Defaults for the analysis members.
    self.running_entropy_data = None
    self.running_entropy_window_size = 0
    self.file_size = 0
    self.parsedfile = None

    # Sample identity plus the libmagic description of the file.
    self.filename = filename
    self.data = []
    self.filetype = magic.from_file(self.filename)

    self._read_file()
    self._parse_file_type()
def get_type(self):
    """Return the libmagic description of the file at ``self.path``.

    Three strategies are tried in order: the ``magic.open`` binding,
    ``magic.from_file`` and finally the ``file -b`` command. An empty
    string is returned when all of them fail.
    """
    ms = None
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        return ms.file(self.path)
    except Exception:
        pass
    finally:
        # Only close a handle that was actually opened.
        if ms is not None:
            try:
                ms.close()
            except Exception:
                pass

    try:
        return magic.from_file(self.path)
    except Exception:
        pass

    try:
        import subprocess
        file_process = subprocess.Popen(['file', '-b', self.path],
                                        stdout=subprocess.PIPE)
        # communicate() reads stdout to EOF and waits for the child, so
        # neither the pipe nor the process is left behind.
        output, _ = file_process.communicate()
        return output.strip()
    except Exception:
        return ''
def file_parser(fname, pages=None):
    """Extract the text of *fname* and return it UTF-8 encoded.

    Files that libmagic reports as ``application/pdf`` are walked through
    ``pdf.Document`` and their lines are joined with newlines, stopping
    after *pages* pages when *pages* is given. Any other file is handed to
    ``parser.from_file``. Failures are printed and ``None`` is returned.
    """
    if magic.from_file(fname, mime=True) == 'application/pdf':
        try:
            text_array = []
            # Pre-set so the summary below works for a document with no pages.
            page_no = 0
            for page_no, page in enumerate(pdf.Document(fname), start=1):
                for f in page:
                    for b in f:
                        for l in b:
                            text_array.append(l.text.encode('UTF-8'))
                if page_no == pages:  # break after x pages
                    break
            print("Processed %i pages" % (page_no))
            # The collected lines are bytes, so the separator must be too.
            return b'\n'.join(text_array)
        except Exception as e:
            # Single-argument print behaves the same on Python 2 and 3.
            print("PDF Parser Exception:  %s" % (e,))
    else:
        try:
            content = parser.from_file(fname)['content']
            return (content or '').encode('UTF-8')
        except Exception as e:
            print("File Parser Exception:  %s" % (e,))
def save_file(self, msg, msg_type):
    """Store an incoming attachment and return ``(full path, MIME type)``.

    ``msg['Text']`` is called with the target path (presumably it writes
    the download there -- confirm against the caller). The MIME type is
    sniffed with libmagic and a matching extension is appended to the file
    name; ``.unknown`` is used, with a warning, when none is known.
    """
    storage_dir = os.path.join("storage", self.channel_id)
    if not os.path.exists(storage_dir):
        os.makedirs(storage_dir)

    stem = "%s_%s_%s" % (msg_type, msg['NewMsgId'], int(time.time()))
    raw_path = os.path.join(storage_dir, stem)
    msg['Text'](raw_path)

    mime = magic.from_file(raw_path, mime=True)
    if isinstance(mime, bytes):
        mime = mime.decode()

    guessed = mimetypes.guess_extension(mime) or ".unknown"
    if guessed == ".unknown":
        self.logger.warning("File %s with mime %s has no matching extensions.", raw_path, mime)
    # JPEG files always get ".jpeg", whatever mimetypes suggests.
    suffix = guessed if mime != "image/jpeg" else ".jpeg"

    final_path = "%s%s" % (raw_path, suffix)
    os.rename(raw_path, final_path)
    self.logger.info("File saved from WeChat\nFull path: %s\nMIME: %s", final_path, mime)
    return final_path, mime
def file_magic(in_file):
    """Print the libmagic description of *in_file*."""
    # A single parenthesised argument prints the same text whether ``print``
    # is the Python 2 statement or the Python 3 function.
    print("\n\t\tFile Type : %s" % (magic.from_file(in_file),))
def do_sample_type_detect(datafile):
    """
    Detect the type of ``datafile`` with libmagic.

    Returns a ``(mime type, textual description)`` tuple.
    """
    mime_type = magic.from_file(datafile, mime=True)
    description = magic.from_file(datafile)
    return mime_type, description
def _process_cache(self, split="\n", rstrip=True):
    """Yield the lines of the cached file at ``self.cache``.

    gzip and zip caches are read through the matching ``csirtg_smrt``
    decoder, which receives ``split``; any other file is iterated as a
    plain text file. ``rstrip`` is accepted but not used in this body.

    Raises:
        RuntimeError: if neither magic API can report the file type.
    """
    try:
        ftype = magic.from_file(self.cache, mime=True)
    except AttributeError:
        # No from_file(): try the open()/load()/file() style API instead.
        try:
            detector = magic.open(magic.MAGIC_MIME)
            detector.load()
            ftype = detector.file(self.cache)
        except AttributeError:
            raise RuntimeError('unable to detect cached file type')

    if PYVERSION < 3:
        ftype = ftype.decode('utf-8')

    if ftype.startswith(('application/x-gzip', 'application/gzip')):
        from csirtg_smrt.decoders.zgzip import get_lines
        for line in get_lines(self.cache, split=split):
            yield line
        return

    if ftype == "application/zip":
        from csirtg_smrt.decoders.zzip import get_lines
        for line in get_lines(self.cache, split=split):
            yield line
        return

    # all others, mostly txt, etc...
    with open(self.cache) as handle:
        for line in handle:
            yield line
def get_mimetype(f):
    """Return the MIME type of the file at path *f*.

    ``magic.from_file`` is tried first; when the installed ``magic`` module
    lacks it, the ``open``/``load``/``file`` API is used. On Python 2 the
    result is decoded to unicode.

    Raises:
        RuntimeError: if neither API is available.
    """
    try:
        detected = magic.from_file(f, mime=True)
    except AttributeError:
        try:
            detector = magic.open(magic.MAGIC_MIME)
            detector.load()
            detected = detector.file(f)
        except AttributeError:
            raise RuntimeError('unable to detect cached file type')

    return detected.decode('utf-8') if PYVERSION < 3 else detected
def preprocess(sample):
    """Preprocess files after upload.

    When the stored sample is a zip archive whose MIME type is not listed
    in ``skip_mimes``, each archive member is read, written to the samples
    directory under its SHA-256 digest (unless that file already exists)
    and recorded as a child ``Sample`` of *sample*.

    :param sample: :class:`~app.models.Sample`
    :return: ``None`` (explicitly for skipped MIME types, implicitly
        otherwise)
    """
    hash_path = os.path.join(
        current_app.config['APP_UPLOADS_SAMPLES'],
        sample.sha256
    )
    if zipfile.is_zipfile(hash_path):
        mt = magic.from_file(hash_path, mime=True)
        # MIME types listed in skip_mimes are left unextracted.
        if mt in skip_mimes:
            return None
        current_app.log.debug('Extracting {}'.format(hash_path))
        # NOTE(review): the ZipFile is never closed explicitly.
        zfile = zipfile.ZipFile(hash_path)
        for zipfo in zfile.namelist():
            cfg = current_app.config
            # Members with compression method 99 are handed to the external
            # 7z tool instead of zipfile (presumably because zipfile cannot
            # decrypt them -- TODO confirm).
            if zfile.getinfo(zipfo).compress_type == 99:  # PK compat. v5.1
                pwd = '-p{}'.format(cfg['INFECTED_PASSWD'])
                # NOTE(review): the 7z command is given the archive path but
                # not the member name -- confirm it yields only this member.
                with popen('7z', 'e', '-so', pwd, hash_path) as zproc:
                    buf, stderr = zproc.communicate()
            else:
                buf = zfile.read(zipfo,
                                 pwd=bytes(cfg['INFECTED_PASSWD'], 'utf-8'))
            digests = get_hashes(buf)
            # NOTE(review): hash_path is rebound to the extracted member's
            # path here, so a later 7z call in this loop receives that path
            # rather than the archive's -- verify this is intended.
            hash_path = os.path.join(cfg['APP_UPLOADS_SAMPLES'],
                                     digests.sha256)
            # Content-addressed storage: skip the write if already on disk.
            if not os.path.isfile(hash_path):
                with open(hash_path, 'wb') as wf:
                    wf.write(buf)
            s = Sample(user_id=sample.user_id, filename=zipfo,
                       parent_id=sample.id,
                       md5=digests.md5, sha1=digests.sha1,
                       sha256=digests.sha256, sha512=digests.sha512,
                       ctph=digests.ctph)
            db.session.add(s)
            db.session.commit()
def _check(self, file):
    """
    Pick the check class for `file` and return it applied to the file's text.

    The extension is looked up in ``self.extension_map`` first; if it is not
    there, the first ``self.magic_map`` entry whose name occurs in the
    libmagic description is used. Raises an Error when the file is missing,
    of unknown type, or not decodable as text.
    """
    if not os.path.exists(file):
        raise Error("file \"{}\" not found".format(file))

    suffix = os.path.splitext(file)[1]
    try:
        check = self.extension_map[suffix[1:]]
    except KeyError:
        description = magic.from_file(file)
        for name, cls in self.magic_map.items():
            if name in description:
                check = cls
                break
        else:
            raise Error("unknown file type \"{}\", skipping...".format(file))

    try:
        with open(file) as source:
            code = source.read()
    except UnicodeDecodeError:
        raise Error("file does not seem to contain text, skipping...")

    # Ensure we don't warn about adding trailing newline
    if code and not code.endswith('\n'):
        code += '\n'

    return check(code)
def handle(cls, user, club, file):
    """Process an uploaded file and return the result of ``obj.create()``.

    The upload is saved under a random name in ``/tmp`` and sniffed with
    libmagic. MIME types missing from ``cls._mimedict`` raise
    ``UploadNotSupported``. ``cls._thumb`` then writes the permanent copy
    and the temporary file is always removed.
    """
    random_name = os.urandom(8).encode('hex')
    scratch_path = os.path.join('/tmp', random_name)
    file.save(scratch_path)
    try:
        # Don't use mimetypes.guess_type(temppath) -- Faked extensions
        mime = magic.from_file(scratch_path, mime=True)
        if mime not in cls._mimedict:
            raise UploadNotSupported
        stored_name = random_name + cls._mimedict[mime]
        final_path = cls.mk_internal_path(stored_name)
        final_dir = os.path.dirname(final_path)
        if not os.path.isdir(final_dir):
            os.makedirs(final_dir, 0o755)
        # resize to 600, 450
        cls._thumb(scratch_path, final_path)
        fs.watch(final_path)
    finally:
        os.remove(scratch_path)

    obj = cls.new()
    obj.club = club
    obj.uploader = user
    obj._location = stored_name
    obj.mime = mime
    return obj.create()
def check(filepath):
    """Return True when libmagic's MIME type for *filepath* starts with ``application/pdf``."""
    detected = magic.from_file(filepath, mime=True)
    return bool(re.match('application/pdf', detected))
def get_magic(filename):
    """Describe *filename* via the module-level ``g_m`` handle when it is set,
    otherwise via ``magic.from_file``.
    """
    if not g_m:
        return magic.from_file(filename)
    return g_m.file(filename)
def guess_mime_type_from_file_contents(file_path):
    """Get the MIME type from the file's magic bytes.

    Returns ``None`` when libmagic gives back an empty result.
    """
    detected = magic.from_file(file_path, mime=True)
    return detected or None
def _compute_default_properties(self):
    """Fill in the default properties derived from ``self['filepath']``.

    Sets the file names, the libmagic description and MIME type, an empty
    analysis list and a per-module antivirus status that starts at False,
    then calls ``self._set_type()``.
    """
    filepath = self['filepath']
    self['names'] = [os.path.basename(filepath)]
    self['detailed_type'] = magic.from_file(filepath)
    self['mime'] = magic.from_file(filepath, mime=True)
    self['analysis'] = []

    # Init antivirus status
    self['antivirus'] = {}
    for av_module in dispatcher.get_antivirus_modules():
        self['antivirus'][av_module.name] = False

    self._set_type()
# Convert mime/types into clearer type
def create_by_old_paste(cls, filehash):
    """Build a ``cls`` instance for the stored file identified by *filehash*.

    The MIME type comes from libmagic and the size from ``os.stat``.
    """
    path = get_file_path(filehash)
    detected_mime = magic.from_file(path, mime=True)
    byte_count = os.stat(path).st_size
    return cls(filehash, detected_mime, byte_count, filehash=filehash)
def create_by_old_paste(cls, filehash, symlink):
    """Build a ``cls`` instance for the stored file identified by *filehash*.

    The MIME type comes from libmagic and the size from ``os.stat``;
    *symlink* is forwarded to the constructor unchanged.
    """
    path = get_file_path(filehash)
    detected_mime = magic.from_file(path, mime=True)
    byte_count = os.stat(path).st_size
    return cls(filehash, detected_mime, byte_count,
               filehash=filehash, symlink=symlink)
def create_by_old_paste(cls, filehash):
    """Return a new ``cls`` describing the existing file for *filehash*.

    The file's MIME type (libmagic) and size in bytes are passed to the
    constructor together with the hash.
    """
    stored_at = get_file_path(filehash)
    mime = magic.from_file(stored_at, mime=True)
    stat_result = os.stat(stored_at)
    return cls(filehash, mime, stat_result.st_size, filehash=filehash)
def create_by_old_paste(cls, filehash):
    """Construct ``cls`` from a file that is already stored under *filehash*.

    Passes the hash, the libmagic MIME type and the on-disk size to the
    constructor.
    """
    location = get_file_path(filehash)
    mime_type = magic.from_file(location, mime=True)
    n_bytes = os.stat(location).st_size
    instance = cls(filehash, mime_type, n_bytes, filehash=filehash)
    return instance
def create_by_old_paste(cls, filehash):
    """Create a ``cls`` object for the previously stored file *filehash*.

    The MIME type is sniffed with libmagic; the size is read via ``os.stat``.
    """
    existing_path = get_file_path(filehash)
    sniffed = magic.from_file(existing_path, mime=True)
    length = os.stat(existing_path).st_size
    return cls(filehash, sniffed, length, filehash=filehash)
def load_pickle(pickle_path, dataset_path):
    """Return the dataset file index, building and caching it on first use.

    When *pickle_path* does not exist, *dataset_path* is walked, every
    ``*.JPEG`` file is collected, and ``{'image_path': <paths>}`` is pickled
    to *pickle_path*. Otherwise the existing pickle is loaded.

    The paths are a flat numpy array when at least one directory was
    visited and an empty list otherwise.
    """
    if os.path.exists(pickle_path):
        # NOTE: unpickling executes arbitrary code; only use trusted files.
        with open(pickle_path, "rb") as cached:
            return pickle.load(cached)

    per_directory = []
    for directory, _, _ in os.walk(dataset_path):
        # may be JPEG, depending on your image files
        per_directory.append(glob.glob(os.path.join(directory, '*.JPEG')))

    image_files = np.hstack(per_directory) if per_directory else per_directory
    dataset_filenames = {'image_path': image_files}

    # The context manager guarantees the pickle is flushed and closed.
    with open(pickle_path, "wb") as out:
        pickle.dump(dataset_filenames, out)
    return dataset_filenames
# return a pd object
def get_executables(files):
    """
    Return only those entries of ``files`` whose libmagic description
    contains the word "executable".
    """
    return [path for path in files if "executable" in magic.from_file(path)]
def _get_and_cache(file_path, supported_formats):
    """Look up the format for *file_path* and remember it in the mixin cache.

    The libmagic MIME type is used as the key into *supported_formats*.
    Raises ``UnsupportedFormat`` (HTTP status 500) when the MIME type is
    not a key of that mapping.
    """
    detected = from_file(file_path, mime=True)
    try:
        fmt = supported_formats[detected]
        MagicCharacterizerMixin._cache[file_path] = fmt
        return fmt
    except KeyError:
        template = '{0} characterized as {1} format, which is not supported'
        raise UnsupportedFormat(template.format(file_path, detected),
                                http_status_code=500)
def file_info(self, report):
    """Return descriptive lines about ``self.filename``.

    The file is always read first. For ``report == "output"`` an empty
    string is returned; otherwise the result is a list with name, size,
    MIME type, MD5 and SHA1, plus an ssdeep line when ``ssdeep_r`` is set.
    """
    with open(self.filename, 'rb') as handle:
        payload = handle.read()

    if report == "output":
        return ""

    info = [
        "File: {}".format(self.filename),
        "Size: {} bytes".format(os.path.getsize(self.filename)),
        "Type: {}".format(magic.from_file(self.filename, mime=True)),
        "MD5: {}".format(hashlib.md5(payload).hexdigest()),
        "SHA1: {}".format(hashlib.sha1(payload).hexdigest()),
    ]
    if ssdeep_r:
        info.append("ssdeep: {}".format(self.get_ssdeep()))
    return info
def file_info(filename):
    """Return a list of descriptive lines about *filename*.

    The list holds name, size, libmagic MIME type, MD5 and SHA1, plus an
    ssdeep hash line when ``ssdeep_r`` is set.
    """
    with open(filename, 'rb') as handle:
        payload = handle.read()

    info = [
        "File: {}".format(filename),
        "Size: {} bytes".format(os.path.getsize(filename)),
        "Type: {}".format(magic.from_file(filename, mime=True)),
        "MD5: {}".format(hashlib.md5(payload).hexdigest()),
        "SHA1: {}".format(hashlib.sha1(payload).hexdigest()),
    ]
    if ssdeep_r:
        info.append("ssdeep: {}".format(ssdeep.hash_from_file(filename)))
    return info
def post_file():
    """Handle a multipart file upload and return the file record as JSON.

    The upload is written to a temporary file under ``/tmp``, hashed,
    registered (or renewed) in the database, uploaded to swift and
    announced on the worker queue. The temporary file is removed whether
    or not processing succeeds.

    Raises:
        BadRequestException: if the request has no ``file`` form field.
    """
    file_uuid = secure_filename(str(uuid.uuid4()))
    filename = '/tmp/%s' % file_uuid
    try:
        file = request.files['file']
    except Exception:
        raise BadRequestException("Not a valid multipart upload form with "
                                  "key named file.")
    try:
        if 'Content-Range' in request.headers:
            # Extract starting byte from Content-Range header string.
            range_str = request.headers['Content-Range']
            start_bytes = int(range_str.split(' ')[1].split('-')[0])
            # Append chunk to the file on disk, or create new. The stream
            # yields bytes, so the file must be opened in binary mode.
            # NOTE(review): in append mode writes go to the end of the file
            # regardless of seek(); and since the file name is a fresh UUID
            # per request, confirm chunked uploads are reassembled at all.
            with open(filename, 'ab') as f:
                f.seek(start_bytes)
                f.write(file.stream.read())
        else:
            # This is not a chunked request, so just save the whole file.
            file.save(filename)
        # Generate hash of file, and create new, or renew existing db row.
        file_hashes = get_all_hashes(filename)
        file_size = os.path.getsize(filename)
        file_type = magic.from_file(filename, mime=True)
        file = create_or_renew_by_hash(file_hashes, file_size, file_type)
        file_id = file.file_id
        file_dict = file.to_dict()
        # Upload to swift and remove the local temp file.
        upload_to_swift(filename, file_uuid)
    finally:
        # Clean up even on failure; the existence check keeps a missing
        # file from masking the original exception.
        if os.path.exists(filename):
            os.remove(filename)
    # Send message to worker queue with file details.
    worker_msg = {"file_uuid": file_uuid, "file_id": file_id}
    submit_worker_notification(worker_msg)
    return jsonify(file_dict)
def maybe_gunzip(fname, base, ext):
    """Decompress *fname* when libmagic describes it as gzip data.

    Returns the path of the decompressed copy (named by
    ``safe_fname(base, ext)``), or *fname* unchanged when it is empty or
    not gzip.
    """
    if not fname or 'gzip' not in magic.from_file(fname):
        return fname

    started = time.time()
    print("Gunzip file " + str(fname))
    target = safe_fname(base, ext)
    sh("gunzip", fname, "-c >", target)
    print("Gunzip took %g seconds" % (time.time() - started))
    return target
def get_filetype(fpath):
    """Return the mime-style filetype string libmagic reports for *fpath*."""
    mime_string = magic.from_file(fpath, mime=True)
    return mime_string
def file_is(file_description, fmt):
    """Tell whether `file_description` holds a `fmt` document.

    :file_description: Full path for a `fmt` file (``str``) or a buffer
        containing `fmt` data (``bytes``).
    :fmt: Pattern searched for, case-insensitively, in the libmagic MIME
        type.
    :returns: True if is `fmt` and False otherwise (including when
        `file_description` is neither ``str`` nor ``bytes``).
    """
    import magic
    logger.debug("Checking filetype")
    pattern = r".*%s.*" % fmt
    # Start from "no match" so unsupported argument types return False
    # instead of failing on an unbound name.
    result = None
    if isinstance(file_description, str):
        # This means that the file_description is a string
        result = re.match(
            pattern, magic.from_file(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "File %s appears to be of type %s" % (file_description, fmt)
            )
    elif isinstance(file_description, bytes):
        # Suppose that file_description is a buffer. Matched with the same
        # flags as the path branch so both inputs behave alike.
        result = re.match(
            pattern, magic.from_buffer(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "Buffer appears to be of type %s" % (fmt)
            )
    return bool(result)
def register_files(self):
    """Record every regular file under ``self.extracted_path`` as a FileModel.

    Each file is identified by the MD5 of ``"<name>:<content>"``. A file
    whose hash already exists is linked to ``self.firmware``; otherwise a
    new ``FileModel`` is created, typed with libmagic and passed to
    ``self.find_loots``.
    """
    print("Start registering files")
    for root, dirs, files in os.walk(self.extracted_path):
        for file in files:
            full_path = os.path.join(root, file)
            if not os.path.isfile(full_path):
                continue
            # Path of the file relative to the extraction root.
            path = full_path.replace(self.extracted_path, "")
            content = ""
            hash = ""
            with open(full_path, "rb") as fd:
                content = fd.read()
            # NOTE(review): content was read in binary mode; on Python 3 the
            # "%s" below formats the bytes repr, on Python 2 the encode()
            # may fail for non-ASCII content -- confirm the target version.
            hash_content = "%s:%s" % (file, content)
            hash = hashlib.md5(hash_content.encode('utf-8')).hexdigest()
            try:
                # Known file: just attach it to this firmware.
                file_obj = FileModel.objects.get(hash=hash)
                file_obj.firmware.add(self.firmware)
                file_obj.save()
            except FileModel.DoesNotExist:
                try:
                    file_obj = FileModel()
                    file_obj.filepath = os.path.join(root, file)
                    file_obj.hash = hash
                    file_obj.filesize = len(content)
                    file_obj.filename = path
                    # Saved before the firmware relation is added.
                    file_obj.save()
                    file_obj.firmware.add(self.firmware)
                    file_obj.file_type = magic.from_file(os.path.join(root,
                                                                      file))
                    file_obj.save()
                    self.find_loots(file_obj)
                    # Performance tweak
                    # NOTE(review): nb_loots is assigned after the last
                    # save() in this block -- confirm it is persisted
                    # elsewhere.
                    file_obj.nb_loots = file_obj.loots.all().count()
                except:
                    # NOTE(review): any failure above is swallowed and the
                    # "unknown" type is not saved here.
                    file_obj.file_type = "unknown"
    print("Files registered")
def parse_file_info(file_path, dir_path):
    """Dispatch *file_path* to the handler registered for its MIME type.

    Returns the handler's result, or ``None`` when the MIME type has no
    entry in ``file_mimetype_relation``.
    """
    print("entering parse_file_info")
    detected = magic.from_file(file_path, mime=True)
    print(detected)
    print(file_path)
    if detected not in file_mimetype_relation:
        return None
    handler = file_mimetype_relation[detected]
    return handler(file_path, dir_path)
def _get_file_type(full_targ_path):
    """Return the file type of the target sample at *full_targ_path* as a string.

    Uses the ``magic.open`` API when available and falls back to
    ``magic.from_file`` when that raises AttributeError.
    """
    try:
        detector = magic.open(magic.MAGIC_NONE)
        detector.load()
        description = str(detector.file(full_targ_path))
    except AttributeError:
        description = str(magic.from_file(full_targ_path))
    return description
def _get_file_type(full_targ_path):
    """Return (and print) the MIME type of the target sample at *full_targ_path*.

    A ``magic.Magic`` instance is created with a fixed GnuWin32 magic
    database path; if that raises AttributeError, ``magic.from_file`` is
    used and the output is flagged.
    """
    try:
        # The magic.open()/load()/file() variant is disabled in favour of
        # the explicit magic-file path below.
        detector = magic.Magic(magic_file=r'C:/Program Files (x86)/GnuWin32/share/misc/magic', mime=True)
        detected = str(detector.from_file(full_targ_path))
        print(detected)
    except AttributeError:
        detected = str(magic.from_file(full_targ_path))
        print(detected+" ERROR?!?!?!!?")
    return detected
def get_type(self):
    """Get the file type description of ``self.file_path``.

    When ``HAVE_MAGIC`` is set, the ``magic.open`` API (following symlinks)
    is tried first, then ``magic.from_file``. If neither gives a result the
    ``file -b -L`` command is used.

    @return: file type, or None when every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_SYMLINK)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path)
            except Exception:
                pass
        finally:
            # Only close a handle that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", "-L", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() reads stdout to EOF and waits for the child, so
            # neither the pipe nor the process is leaked.
            file_type = p.communicate()[0].strip()
        except Exception:
            pass
    return file_type
def get_content_type(self):
    """Get MIME content file type (example: image/jpeg).

    When ``HAVE_MAGIC`` is set, the ``magic.open`` API (MIME output,
    following symlinks) is tried first, then ``magic.from_file``. If
    neither gives a result the ``file -b -L --mime-type`` command is used.

    @return: file content type, or None when every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_MIME | magic.MAGIC_SYMLINK)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path, mime=True)
            except Exception:
                pass
        finally:
            # Only close a handle that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", "-L", "--mime-type", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() reads stdout to EOF and waits for the child, so
            # neither the pipe nor the process is leaked.
            file_type = p.communicate()[0].strip()
        except Exception:
            pass
    return file_type
def processDownload(tmpFilePath, fileName, fileUrl):
    """Vet a downloaded file and hand it to Viper.

    Files larger than 10 MB, files whose hash is not accepted and files
    whose libmagic MIME type is not on the accepted list are cleaned up
    and rejected with ``False``. Otherwise the file is moved into the
    output folder under its SHA-256, uploaded, cached and cleaned up.
    Returns the result of ``uploadToViper``.
    """
    acceptedMimeTypes = [
        'application/octet-stream',
        'application/x-dosexec',
        'application/x-msdownload',
        'application/x-ms-installer',
        'application/pdf',
        'application/x-pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.template',
        'application/vnd.ms-word.document.macroEnabled',
        'application/vnd.ms-excel',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.template',
        'application/vnd.ms-excel.sheet.macroEnabled',
        'application/vnd.ms-excel.template.macroEnabled',
        'application/vnd.ms-excel.addin.macroEnabled',
        'application/vnd.ms-excel.sheet.binary.macroEnabled',
        'application/x-shockwave-flash',
    ]

    logging.info('Downloaded as temporary file: {0}. Beginning processing...'.format(tmpFilePath))

    # Size in whole megabytes.
    sizeInMb = os.path.getsize(tmpFilePath) >> 20
    if sizeInMb > 10:
        logging.error('File is {0}MB. Too large to process.'.format(sizeInMb))
        cleanUp(tmpFilePath)
        return False

    fileHash = sha256SumFile(tmpFilePath)
    if not isAcceptedHash(fileHash):
        cleanUp(tmpFilePath)
        return False

    filePath = os.path.join(baseConfig.outputFolder, fileHash)
    os.rename(tmpFilePath, filePath)

    # Trust only the content type of the downloaded file.
    mimeType = magic.from_file(filePath, mime=True)
    if mimeType not in acceptedMimeTypes:
        logging.error('Detected non-binary or executable file type ({0}). Skipping: {1}'.format(mimeType, filePath))
        cleanUp(filePath)
        return False

    logging.info('File with hash: {0} identified as type: {1}'.format(fileHash, mimeType))
    uploaded = uploadToViper(filePath, fileName, fileHash, fileUrl)
    addToHashCache(fileHash)
    cleanUp(filePath)
    return uploaded
def validate_elm_make(ctx, param, value):
    """Validate the elm-make option value and return it unchanged.

    ``None`` is passed through. Otherwise the value must resolve to an
    existing file, either directly or via ``shutil.which``, and that file
    must not be reported by libmagic as a text type.

    Raises:
        click.BadParameter: if the file cannot be found or is a text file.
    """
    if value is None:
        return value

    resolved = os.path.realpath(value)
    if not os.path.isfile(resolved):
        resolved = shutil.which(value)
    if resolved is None or not os.path.isfile(resolved):
        raise click.BadParameter('{} not found'.format(value))

    if not magic.from_file(resolved, mime=True).startswith('text'):
        return value

    # A text file here is assumed to be a wrapper script; point the user at
    # where the real binary might live relative to it.
    wrapper_dir = os.path.dirname(resolved)
    perhaps_binwrap_of = os.path.normpath(os.path.join(
        wrapper_dir, os.pardir, 'elm', 'Elm-Platform', '*',
        '.cabal-sandbox', 'bin', 'elm-make'))
    message = ('should be the real elm-make binary; this looks like a text file.\n'
               'if you installed Elm through npm, then try {}')
    raise click.BadParameter(message.format(perhaps_binwrap_of))
def libmagic_file_type(self):
    """
    Returns:
        The MIME type libmagic reports for ``self.path``.
    """
    detected = magic.from_file(self.path, mime=True)
    return detected
def get_type(self):
    """Get the file type description of ``self.file_path``.

    When ``HAVE_MAGIC`` is set, the ``magic.open`` API is tried first, then
    ``magic.from_file``. If neither gives a result the ``file -b`` command
    is used. Failures of the fallbacks are logged at debug level.

    @return: file type, or None when every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path)
            except Exception as e:
                log.debug("Error getting magic from file %s: %s",
                          self.file_path, e)
        finally:
            # Only close a handle that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() reads stdout to EOF and waits for the child, so
            # neither the pipe nor the process is leaked.
            file_type = p.communicate()[0].strip()
        except Exception as e:
            log.debug("Error running file(1) on %s: %s",
                      self.file_path, e)
    return file_type
def get_content_type(self):
    """Get MIME content file type (example: image/jpeg).

    When ``HAVE_MAGIC`` is set, the ``magic.open`` API (MIME output) is
    tried first, then ``magic.from_file``. If neither gives a result the
    ``file -b --mime-type`` command is used.

    @return: file content type, or None when every method failed.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_MIME)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path, mime=True)
            except Exception:
                pass
        finally:
            # Only close a handle that was actually opened, instead of
            # relying on a swallowed NameError.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            args = ["file", "-b", "--mime-type", self.file_path]
            file_type = subprocess.check_output(args).strip()
        except Exception:
            pass
    return file_type
def guess_mimetype(path):
    """Return the MIME type of *path* as text.

    ``audio/x-m4a`` is reported as ``audio/mp4``; every other type is
    returned as detected by libmagic.
    """
    magic_mimetype = magic.from_file(str(path), mime=True)
    # Depending on the python-magic release the result is bytes or str;
    # normalise to str so neither the comparison nor the return can fail.
    if isinstance(magic_mimetype, bytes):
        magic_mimetype = magic_mimetype.decode("utf-8")
    if magic_mimetype == "audio/x-m4a":
        return "audio/mp4"
    return magic_mimetype
def inspect(self, sample):
    """Store the libmagic description and MIME type of ``sample.path``
    in ``sample.info`` under this inspector's ``NAME``.
    """
    description = magic.from_file(sample.path)
    mime_type = magic.from_file(sample.path, mime=True)
    sample.info[self.NAME] = {"magic": description, "mime": mime_type}
def get_mime(self):
    """Return the MIME type of ``self.path``, or '' when detection fails.

    The ``magic.open`` API is tried first; on any failure a
    ``magic.Magic(mime=True)`` instance is used instead.
    """
    try:
        handle = magic.open(magic.MIME)
        handle.load()
        return handle.file(self.path)
    except:
        # Deliberately broad: any failure falls through to the second API.
        pass

    try:
        return magic.Magic(mime=True).from_file(self.path)
    except:
        return ''
def mime(self):
    """Return the MIME type of the file named by ``self.fetch('filename')``.

    Works with either flavour of the ``magic`` module: the PyPI package
    (``from_file``) or the bindings shipped with the ``file`` command
    (``open``).

    Raises:
        ImportError: if the imported ``magic`` module offers neither API.
    """
    has_pypi_api = hasattr(magic, "from_file")
    if not has_pypi_api and not hasattr(magic, "open"):
        raise ImportError(
            'The `magic` module that was found is not the expected pypi '
            'package python-magic (https://pypi.python.org/pypi/python-magic) '
            'nor file\'s (http://www.darwinsys.com/file/) package.')

    if has_pypi_api:
        # Use https://pypi.python.org/pypi/python-magic
        return magic.from_file(self.fetch('filename'), mime=True)

    # Use the python-magic library in distro repos from the `file`
    # command - http://www.darwinsys.com/file/
    detector = magic.open(magic.MAGIC_MIME)
    detector.load()
    return detector.file(self.fetch('filename'))
def create_pads_from_files(job_id, attachment, email, client_id, client_secret):
    """ For each HTML file in zipped attachment, create a new pad, return the number of
    created pads

    The archive is unpacked into ``./data/<attachment name>``. Files that
    libmagic does not report as ``text/html`` are logged and ignored. An
    error e-mail is sent when the created and skipped counts do not add up
    to the number of unpacked files.

    Returns a ``(pads_created, pads_skipped)`` tuple.
    """
    logging.info("Opening attached zip %s." % attachment)
    # Raw string so that "\." reaches the regex engine as written.
    # NOTE(review): an attachment path not matching this pattern makes
    # ``m`` None and the next line fail -- confirm callers guarantee it.
    m = re.search(r'^.+attachments/(.+)\.zip$', attachment)
    directory = './data/' + m.group(1)
    unzip_attachment(attachment, directory)
    files = os.listdir(directory)
    hackpad = Hackpad(api_scheme = os.getenv('HACKPAD_API_SCHEME') or 'http',
                      api_domain = os.getenv('HACKPAD_API_DOMAIN') or 'hackpad.dev',
                      sub_domain = os.getenv('HACKPAD_SUB_DOMAIN') or '',
                      consumer_key = client_id,
                      consumer_secret = client_secret)
    pads_created = pads_skipped = 0
    for file_name in files:
        file_path = directory + '/' + file_name
        # check if it is really an html file
        file_type = magic.from_file(file_path, mime=True)
        if file_type != 'text/html':
            logging.info('Invalid file type for file %s :%s' % (file_path, file_type))
            continue
        # The context manager closes the file even if the import raises.
        with open(file_path) as fh:
            logging.info('importing for %s: %s' % (email, file_name))
            if insert_pad_from_file(job_id, hackpad, fh, file_name, client_id, client_secret):
                pads_created += 1
            else:
                pads_skipped += 1
    # Check if all files are imported
    if pads_created + pads_skipped != len(files):
        email_error("Not all files were processed", job_id)
    return pads_created, pads_skipped
def attachFile(attachList, filename, pos=None, replace=False):
    """Check a path and add it to the attachment list

    With no pos the file name is appended. With a pos (1-based) it is
    inserted at that position, or overwrites the entry there when replace
    is True. A pos outside 1..len(attachList) is reported and nothing is
    changed. Unreadable or missing files only produce a warning; the name
    is added regardless.
    """
    index = None
    if pos is not None:
        if not 1 <= pos <= len(attachList):
            print("Bad position. {} not between 1 and {}".format(pos, len(attachList)))
            return
        # Adjust from human position to index
        index = pos - 1

    try:
        st = os.stat(filename)
    except OSError as err:
        import errno
        # Can't read it. Is it because it doesn't exist?
        if err.errno == errno.ENOENT:
            print("WARNING: Given file doesn't currently exist. Adding to list anyway. We'll try reading it again when completing the message")
        else:
            print("WARNING: Couldn't get information about the file: %s" % err.strerror)
            print("Adding to list anyway. We'll try reading it again when completing the message.")
    else:
        if os.access(filename, os.R_OK):
            print("Attachment added to list. Raw size is currently %i bytes. Note: we'll actually read the data when completing the message" % st.st_size)
            mtype = magic.from_file(filename, mime=True)
            print("Mime type appears to be %s" % mtype)
        else:
            print("WARNING: Can't read existing file. Adding to list anyway. We'll try again when completing the message.")

    if index is None:
        attachList.append(filename)
    elif replace == False:
        attachList.insert(index, filename)
    else:
        attachList[index] = filename
def _download_file(self, tg_msg, file_obj, msg_type):
    """
    Download media file from telegram platform.

    The MIME type reported on ``file_obj`` is used when present; libmagic
    is consulted only when it is missing or empty.

    Args:
        tg_msg: Telegram message instance
        file_obj: File object
        msg_type: Type of message

    Returns:
        tuple of str[2]: Full path of the file, MIME type

    Raises:
        EFBMessageError: if the attachment exceeds the download size limit.
    """
    path = os.path.join("storage", self.channel_id)
    if not os.path.exists(path):
        os.makedirs(path)
    size = getattr(file_obj, "file_size", None)
    file_id = file_obj.file_id
    if size and size > telegram.constants.MAX_FILESIZE_DOWNLOAD:
        raise EFBMessageError("Attachment is too large. Maximum 20 MB. (AT01)")
    f = self.bot.bot.getFile(file_id)
    fname = "%s_%s_%s_%s" % (msg_type, tg_msg.chat.id, tg_msg.message_id, int(time.time()))
    fullpath = os.path.join(path, fname)
    f.download(fullpath)
    # Sniff the file only when needed: a getattr() default is evaluated
    # eagerly, and an attribute that exists but is None would otherwise
    # reach guess_extension() below.
    mime = getattr(file_obj, "mime_type", None)
    if not mime:
        mime = magic.from_file(fullpath, mime=True)
    if isinstance(mime, bytes):
        mime = mime.decode()
    guess_ext = mimetypes.guess_extension(mime) or ".unknown"
    if guess_ext == ".unknown":
        self.logger.warning("File %s with mime %s has no matching extensions.", fullpath, mime)
    # JPEG files always get ".jpeg", whatever mimetypes suggests.
    ext = ".jpeg" if mime == "image/jpeg" else guess_ext
    os.rename(fullpath, "%s%s" % (fullpath, ext))
    fullpath = "%s%s" % (fullpath, ext)
    return fullpath, mime
def _produce_one_sample(self):
    """Build the tensors for one input/output image pair.

    ``self.path`` is a text file listing image names; the images live in
    the ``input`` and ``output`` sub-directories next to it. libmagic
    decides whether each side is 16-bit, and the file extension decides
    between the JPEG and PNG decoders. Both images are scaled by their
    white level, concatenated along the channel axis and passed to
    ``self._augment_data``.

    Returns:
        dict with keys ``lowres_input``, ``lowres_output``,
        ``image_input`` and ``image_output``.

    Raises:
        ValueError: if the directory containing ``self.path`` is invalid.
    """
    dirname = os.path.dirname(self.path)
    if not check_dir(dirname):
        raise ValueError("Invalid data path.")
    with open(self.path, 'r') as fid:
        # Iterating the file object directly replaces the Python-2-only
        # xreadlines().
        flist = [l.strip() for l in fid]

    if self.shuffle:
        random.shuffle(flist)

    input_files = [os.path.join(dirname, 'input', f) for f in flist]
    output_files = [os.path.join(dirname, 'output', f) for f in flist]

    self.nsamples = len(input_files)

    # 0o123 (decimal 83) is the value the legacy octal literal 0123 had;
    # this spelling is accepted by both Python 2.6+ and Python 3.
    input_queue, output_queue = tf.train.slice_input_producer(
        [input_files, output_files], shuffle=self.shuffle,
        seed=0o123, num_epochs=self.num_epochs)

    # The first file of each list decides the bit depth for that side.
    if '16-bit' in magic.from_file(input_files[0]):
        input_dtype = tf.uint16
        input_wl = 65535.0
    else:
        input_wl = 255.0
        input_dtype = tf.uint8
    if '16-bit' in magic.from_file(output_files[0]):
        output_dtype = tf.uint16
        output_wl = 65535.0
    else:
        output_wl = 255.0
        output_dtype = tf.uint8

    input_file = tf.read_file(input_queue)
    output_file = tf.read_file(output_queue)

    if os.path.splitext(input_files[0])[-1] == '.jpg':
        im_input = tf.image.decode_jpeg(input_file, channels=3)
    else:
        im_input = tf.image.decode_png(input_file, dtype=input_dtype, channels=3)

    if os.path.splitext(output_files[0])[-1] == '.jpg':
        im_output = tf.image.decode_jpeg(output_file, channels=3)
    else:
        im_output = tf.image.decode_png(output_file, dtype=output_dtype, channels=3)

    # normalize input/output
    sample = {}
    with tf.name_scope('normalize_images'):
        im_input = tf.to_float(im_input)/input_wl
        im_output = tf.to_float(im_output)/output_wl

    # Stack both images on the channel axis so they are augmented together.
    inout = tf.concat([im_input, im_output], 2)
    fullres, inout = self._augment_data(inout, 6)

    # Channels 0-2 are the input image, 3-5 the output image.
    sample['lowres_input'] = inout[:, :, :3]
    sample['lowres_output'] = inout[:, :, 3:]
    sample['image_input'] = fullres[:, :, :3]
    sample['image_output'] = fullres[:, :, 3:]
    return sample