我们从 Python 开源项目中提取了以下 43 个代码示例，用于说明如何使用 django.utils.six.BytesIO()。
def _load_stream(self):
    """
    Populate ``self._stream`` with a readable view of the request body.

    The stream is ``None`` when the declared content length is zero or
    unparsable, the underlying request itself while it has not been read
    yet, and otherwise an in-memory copy built from ``self.body``.
    """
    headers = self._request.META
    try:
        declared_length = int(
            headers.get('CONTENT_LENGTH', headers.get('HTTP_CONTENT_LENGTH', 0))
        )
    except (ValueError, TypeError):
        # A missing or malformed header is treated the same as "no body".
        declared_length = 0

    if declared_length == 0:
        self._stream = None
        return

    if self._request._read_started:
        # The request was already consumed; replay the body from memory.
        self._stream = six.BytesIO(self.body)
    else:
        self._stream = self._request
def get_ssr_qrcode(request, node_id):
    """Return an ``image/png`` response with a QR code of the node's SSR link."""
    # Resolve the account objects the link is generated for.
    ss_account = request.user.ss_user
    account = request.user

    # Look up the requested node.
    target_node = Node.objects.get(node_id=node_id)

    # Users below the node's level only get a plain message back.
    # NOTE(review): the message text appears mis-encoded in this copy of the
    # source; it is reproduced verbatim.
    if account.level < target_node.level:
        return HttpResponse('?????????????????????')

    # Render the link as a QR code into an in-memory buffer.
    link = target_node.get_ssr_link(ss_account)
    qr_image = qrcode.make(link)
    buffer = BytesIO()
    qr_image.save(buffer)

    # Ship the raw image bytes back to the client.
    return HttpResponse(buffer.getvalue(), content_type="image/png")
def get_ss_qrcode(request, node_id):
    """Return an ``image/png`` response with a QR code of the node's SS link."""
    # Resolve the account objects the link is generated for.
    ss_account = request.user.ss_user
    account = request.user

    # Look up the requested node.
    target_node = Node.objects.get(node_id=node_id)

    # Users below the node's level only get a plain message back.
    # NOTE(review): the message text appears mis-encoded in this copy of the
    # source; it is reproduced verbatim.
    if account.level < target_node.level:
        return HttpResponse('?????????????????????')

    # Render the link as a QR code into an in-memory buffer.
    link = target_node.get_ss_link(ss_account)
    qr_image = qrcode.make(link)
    buffer = BytesIO()
    qr_image.save(buffer)

    # Ship the raw image bytes back to the client.
    return HttpResponse(buffer.getvalue(), content_type="image/png")
def gen_face_pay_qrcode(request):
    """
    Return an ``image/png`` response with a QR code of the session's payment URL.

    Reads ``code_url``, ``out_trade_no`` and ``amount`` from the session,
    stores an ``AlipayRequest`` row for the current user, removes
    ``code_url`` and ``amount`` from the session and renders the URL as a
    QR code. Any ordinary failure along the way yields a plain
    ``'wrong request'`` response.
    """
    try:
        # Payment URL previously stored in the session.
        url = request.session.get('code_url', '')

        # Persist the payment request; the created row is not needed here.
        AlipayRequest.objects.create(
            username=request.user,
            info_code=request.session['out_trade_no'],
            amount=request.session['amount'],
        )

        # These keys are single-use; a missing key raises and is reported
        # as a wrong request below.
        del request.session['code_url']
        del request.session['amount']

        # Render the URL as a QR code into an in-memory buffer.
        img = qrcode.make(url)
        buf = BytesIO()
        img.save(buf)
        return HttpResponse(buf.getvalue(), content_type="image/png")
    except Exception:
        # Deliberately broad best-effort handling, but no longer a bare
        # ``except:`` so SystemExit / KeyboardInterrupt still propagate.
        return HttpResponse('wrong request')
def _load_stream(self):
    """
    Populate ``self._stream`` with a readable view of the request body.

    The stream is ``None`` when the declared content length is zero or
    unparsable, the underlying request itself when it exposes ``read``,
    and otherwise an in-memory copy built from ``self.raw_post_data``.
    """
    headers = self._request.META
    try:
        declared_length = int(
            headers.get('CONTENT_LENGTH', headers.get('HTTP_CONTENT_LENGTH', 0))
        )
    except (ValueError, TypeError):
        # A missing or malformed header is treated the same as "no body".
        declared_length = 0

    if declared_length == 0:
        self._stream = None
        return

    if hasattr(self._request, 'read'):
        self._stream = self._request
    else:
        # No file-like interface on the request; fall back to the raw body.
        self._stream = six.BytesIO(self.raw_post_data)
def test_runs_operations(self):
    # Record every invocation of the operation's ``run`` hook.
    recorder = Mock()

    def run(willow, image, env):
        recorder(willow, image, env)

    self.operation_instance.run = run

    image_filter = Filter(spec='operation1|operation2')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file(),
    )
    image_filter.run(test_image, BytesIO())

    # The spec names two operations, so the hook must have run twice.
    self.assertEqual(recorder.call_count, 2)
def parse(data):
    """
    Parse ``data`` with the parser registered for its leading format code.

    The payload is converted with ``_bytes`` and split once on ``_delim``
    into a format code and a body; the body is handed to the matching
    parser as a byte stream.

    Raises ``UnknownFormatError`` when no parser is registered for the code.
    """
    payload = _bytes(data)
    code, body = payload.split(_delim, 1)
    if code in _formats:
        return _formats[code]['parser'].parse(BytesIO(body))
    raise UnknownFormatError('Could not find parser for format %s' % code.decode())
def __init__(self, request, request_data):
    """
    Build a sub-request from one entry of a batch payload.

    ``request`` is the enclosing request, whose ``META`` and ``COOKIES``
    are reused. ``request_data`` is a mapping that must contain ``_body``,
    ``method`` and ``relative_url``; ``name`` and
    ``omit_response_on_success`` are optional.
    """
    super(BatchRequest, self).__init__()

    # Optional bookkeeping fields.
    self.name = request_data.get('name')
    self.omit_response_on_success = request_data.get('omit_response_on_success', False)

    # The body is exposed as an unread UTF-8 byte stream.
    encoded_body = request_data['_body'].encode('utf-8')
    self._stream = BytesIO(encoded_body)
    self._read_started = False

    self.method = request_data['method']
    relative_url = request_data['relative_url']
    self.path_info = self.path = relative_url

    # Environment and cookies come from the enclosing request.
    self.META = request.META
    self.COOKIES = request.COOKIES

    # Query parameters are taken from the relative URL.
    query_string = urlparse(self.path_info).query
    self.GET = parse_qs(query_string)
def as_bytes(self, unixfrom=False, linesep='\n'):
    """Return the entire formatted message as bytes.

    When ``unixfrom`` is true the Unix From_ envelope header is included.

    The message is flattened with ``mangle_from_=False`` so that lines
    beginning with 'From ' are left untouched. See bug #13433 for details.
    """
    buffer = six.BytesIO()
    writer = generator.BytesGenerator(buffer, mangle_from_=False)
    writer.flatten(self, unixfrom=unixfrom, linesep=linesep)
    return buffer.getvalue()
def test_json_to_StatusReport(self):
    # Round-trip the expected dict through the JSON renderer and parser.
    rendered = JSONRenderer().render(self.expected_dict)
    parsed = JSONParser().parse(BytesIO(rendered))

    # Deserialize onto the existing status object.
    serializer = StatusReportSerializer(self.new_status, data=parsed)
    self.assertTrue(serializer.is_valid())
    saved = serializer.save()

    self.assertEqual(self.new_status, saved)
    for field in ('id', 'status', 'when', 'user'):
        self.assertEqual(getattr(self.new_status, field), getattr(saved, field))
def test_json_to_new_StatusReport(self):
    # Round-trip the expected dict through the JSON renderer and parser.
    rendered = JSONRenderer().render(self.expected_dict)
    parsed = JSONParser().parse(BytesIO(rendered))

    # No instance is passed, so the serializer is used with data only.
    serializer = StatusReportSerializer(data=parsed)
    self.assertTrue(serializer.is_valid())
    saved = serializer.save()

    self.assertEqual(self.new_status.status, saved.status)
    self.assertIsNotNone(saved.when)
    self.assertEqual(self.new_status.user, saved.user)
def from_native(self, data):
    """
    Validate that the uploaded file data is an image the imaging library
    can open (GIF, JPG, PNG, possibly others).

    Returns the value produced by the parent field (rewound to the start
    when it is seekable), or ``None`` when the parent returns ``None``.
    Raises ``ValidationError`` with the ``invalid_image`` message when the
    data cannot be verified as an image.
    """
    uploaded = super(ImageField, self).from_native(data)
    if uploaded is None:
        return None

    from api.compat import Image
    assert Image is not None, 'Either Pillow or PIL must be installed for ImageField support.'

    # The imaging library needs a path or a file object: prefer a path on
    # disk, otherwise read the data into memory.
    if hasattr(data, 'temporary_file_path'):
        image_source = data.temporary_file_path()
    elif hasattr(data, 'read'):
        image_source = six.BytesIO(data.read())
    else:
        image_source = six.BytesIO(data['content'])

    try:
        # load() could spot a truncated JPEG, but it loads the entire
        # image in memory, which is a DoS vector. See #3848 and #18520.
        # verify() must be called immediately after the constructor.
        Image.open(image_source).verify()
    except ImportError:
        # Under PyPy the library may import while its underlying C module
        # is unavailable; let that ImportError propagate unchanged.
        raise
    except Exception:
        # The imaging library does not recognize the data as an image.
        raise ValidationError(self.error_messages['invalid_image'])

    # Rewind so later consumers read the upload from the start.
    if hasattr(uploaded, 'seek') and callable(uploaded.seek):
        uploaded.seek(0)
    return uploaded
def parse_json_response(json):
    """Render ``json`` with JSONRenderer, then parse the bytes back with JSONParser."""
    payload = BytesIO(JSONRenderer().render(json))
    return JSONParser().parse(payload)
def _get_image_or_404(identifier, load_image=False):
    """
    Look up the ICEkitImage whose ``id`` equals ``identifier``.

    Returns a ``(record, pil_image)`` pair. ``pil_image`` is ``None``
    unless ``load_image`` is true, in which case the stored file is opened,
    fully loaded and passed through the EXIF orientation helper.
    """
    record = get_object_or_404(ICEkitImage, id=identifier)

    loaded = None
    if load_image:
        # Loading sequence borrowed from easy-thumbnails' ``pil_image``.
        loaded = Image.open(BytesIO(record.image.read()))
        try:
            # An "Image file truncated" error can occur for images that
            # are still mostly valid, so the first failure is ignored.
            loaded.load()
        except IOError:
            pass
        # A second attempt surfaces any other problem with the contents.
        loaded.load()
        # Honour EXIF orientation data.
        loaded = et_utils.exif_orientation(loaded)

    return record, loaded
def save_image(image, destination=None, filename=None, **options):
    """
    Save a PIL image and return the destination it was written to.

    The format is chosen from the extension of ``filename`` and falls back
    to JPEG. ``destination`` defaults to a new in-memory buffer and is
    rewound to the start when it supports ``seek``. Extra ``options`` are
    forwarded to ``image.save``.
    """
    if destination is None:
        destination = BytesIO()
    filename = filename or ''

    # Ensure plugins are fully loaded so that Image.EXTENSION is populated.
    Image.init()
    extension = os.path.splitext(filename)[1].lower()
    image_format = Image.EXTENSION.get(extension, 'JPEG')

    if image_format in ('JPEG', 'WEBP'):
        options.setdefault('quality', 85)

    needs_plain_save = True
    if image_format == 'JPEG':
        progressive_threshold = settings.THUMBNAIL_PROGRESSIVE
        if progressive_threshold and max(image.size) >= progressive_threshold:
            options['progressive'] = True
        try:
            image.save(destination, format=image_format, optimize=1, **options)
        except IOError:
            # PIL cannot optimize an image larger than ImageFile.MAXBLOCK
            # (64k by default); fall through to the unoptimized save below.
            # Recent versions of pillow rarely hit this limitation.
            pass
        else:
            needs_plain_save = False

    if needs_plain_save:
        image.save(destination, format=image_format, **options)

    if hasattr(destination, 'seek'):
        destination.seek(0)
    return destination
def setUp(self):
    # Give each test an in-memory byte stream to use as its source.
    payload = six.b('file-contents')
    self.source = BytesIO(payload)
def image_from_b64(data):
    """Decode base64 ``data`` and open the resulting bytes with ``Image.open``."""
    raw_bytes = base64.b64decode(data)
    return Image.open(BytesIO(raw_bytes))
def test_not_image(self):
    """
    Data that is not an image makes ``pil_image`` raise ``IOError``.
    """
    bogus = BytesIO(six.b('not an image'))
    with self.assertRaises(IOError):
        source_generators.pil_image(bogus)
def test_nearly_image(self):
    """
    A slightly truncated image still loads and keeps its dimensions.
    """
    data = self.create_image(None, None)
    reference = source_generators.pil_image(data)

    # Rebuild the same image with the last ten bytes cut off.
    data.seek(0)
    truncated = BytesIO(data.read()[:-10])

    im = source_generators.pil_image(truncated)
    # The pixel data is incomplete, but the size must match the original.
    # No IOError is expected here, so none is asserted.
    self.assertEqual(im.size, reference.size)
def test_switch_off_exif_orientation(self):
    """
    With ``exif_orientation=False`` an image carrying EXIF orientation
    data is not reoriented.
    """
    reference = image_from_b64(EXIF_REFERENCE)
    encoded = EXIF_ORIENTATION[2]

    # Opened as-is, the sample differs from the reference.
    untouched = image_from_b64(encoded)
    self.assertFalse(near_identical(reference, untouched))

    # With reorientation switched off it must still differ.
    generated = source_generators.pil_image(
        BytesIO(base64.b64decode(encoded)), exif_orientation=False)
    self.assertFalse(
        near_identical(reference, generated),
        'Image should not have been modified')
def create_image(self, storage, filename, size=(800, 600),
                 image_mode='RGB', image_format='JPEG'):
    """
    Generate a test image.

    With a truthy ``storage`` the image is saved under ``filename`` and the
    result of ``storage.save`` is returned. Otherwise the rewound BytesIO
    holding the image data is returned.
    """
    buffer = BytesIO()
    Image.new(image_mode, size).save(buffer, image_format)
    buffer.seek(0)
    if storage:
        return storage.save(filename, ContentFile(buffer.read()))
    return buffer
def pil_image(source, exif_orientation=True, **options):
    """
    Open ``source`` with PIL and return the fully loaded image.

    Returns ``None`` when ``source`` is falsy.

    exif_orientation
        When true, the image is passed through the EXIF orientation helper
        before being returned.
    """
    if not source:
        return

    # Copy the data into a BytesIO first: some image types need tell and
    # seek, which not every storage File object provides.
    buffered = BytesIO(source.read())
    image = Image.open(buffered)

    try:
        # An "Image file truncated" error can occur for images that are
        # still mostly valid, so the first failure is ignored.
        image.load()
    except IOError:
        pass
    # A second attempt surfaces any other problem with the contents.
    image.load()

    return utils.exif_orientation(image) if exif_orientation else image
def test_bad_username(self):
    """
    Tests the error handling when the specified user does not exist.
    """
    test_json = textwrap.dedent("""
        {
            "model": "config_models.ExampleDeserializeConfig",
            "data": [{"name": "dino"}]
        }
        """)
    # BytesIO needs bytes on Python 3; passing the str would raise a
    # TypeError before deserialize_json gets to look up the user.
    stream = BytesIO(test_json.encode('utf-8'))
    with self.assertRaisesRegexp(Exception, "User matching query does not exist"):
        deserialize_json(stream, "unknown_username")
def test_invalid_json(self):
    """
    Tests the error handling when there is invalid JSON.
    """
    # The document is deliberately cut off before its closing brackets.
    test_json = textwrap.dedent("""
        {
            "model": "config_models.ExampleDeserializeConfig",
            "data": [{"name": "dino"
        """)
    # BytesIO needs bytes on Python 3; passing the str would raise a
    # TypeError instead of the JSON parse error under test.
    stream = BytesIO(test_json.encode('utf-8'))
    with self.assertRaisesRegexp(Exception, "JSON parse error"):
        deserialize_json(stream, self.test_username)
def test_invalid_model(self):
    """
    Tests the error handling when the configuration model specified does not exist.
    """
    test_json = textwrap.dedent("""
        {
            "model": "xxx.yyy",
            "data":[{"name": "dino"}]
        }
        """)
    # BytesIO needs bytes on Python 3; passing the str would raise a
    # TypeError before the model lookup under test is reached.
    stream = BytesIO(test_json.encode('utf-8'))
    with self.assertRaisesRegexp(Exception, "No installed app"):
        deserialize_json(stream, self.test_username)
def get_test_image_file(filename='test.png', colour='white', size=(640, 480)):
    """Return an ImageFile named ``filename`` wrapping an in-memory PNG of one solid colour."""
    buffer = BytesIO()
    PIL.Image.new('RGB', size, colour).save(buffer, 'PNG')
    return ImageFile(buffer, name=filename)
def get_test_image_file_jpeg(filename='test.jpg', colour='white', size=(640, 480)):
    """Return an ImageFile named ``filename`` wrapping an in-memory JPEG of one solid colour."""
    buffer = BytesIO()
    PIL.Image.new('RGB', size, colour).save(buffer, 'JPEG')
    return ImageFile(buffer, name=filename)
def test_runs_operations_without_env_argument(self):
    # The "env" argument was added in Wagtail 1.5. Operations written for
    # 1.4, whose ``run`` takes only (willow, image), must keep working.
    recorder = Mock()

    def run(willow, image):
        recorder(willow, image)

    self.operation_instance.run = run

    image_filter = Filter(spec='operation1|operation2')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file(),
    )

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')

        image_filter.run(test_image, BytesIO())

        # Two warnings are expected, the first a RemovedInWagtail19Warning.
        self.assertEqual(len(caught), 2)
        self.assertIs(caught[0].category, RemovedInWagtail19Warning)

    self.assertEqual(recorder.call_count, 2)
def test_jpeg(self):
    # A 'format-jpeg' spec must yield output whose format_name is 'jpeg'.
    image_filter = Filter(spec='width-400|format-jpeg')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file(),
    )
    result = image_filter.run(test_image, BytesIO())
    self.assertEqual(result.format_name, 'jpeg')
def test_gif(self):
    # A 'format-gif' spec must yield output whose format_name is 'gif'.
    image_filter = Filter(spec='width-400|format-gif')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file(),
    )
    result = image_filter.run(test_image, BytesIO())
    self.assertEqual(result.format_name, 'gif')
def test_invalid(self):
    # The 'format-foo' spec must be rejected when the filter runs.
    image_filter = Filter(spec='width-400|format-foo')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file(),
    )
    output = BytesIO()
    with self.assertRaises(InvalidFilterSpecError):
        image_filter.run(test_image, output)
def test_default_quality(self):
    # Without a jpegquality operation the JPEG is saved with quality=85.
    image_filter = Filter(spec='width-400')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file_jpeg(),
    )
    output = BytesIO()
    # Patch PIL's save so the arguments passed to it can be inspected.
    with patch('PIL.Image.Image.save') as save_mock:
        image_filter.run(test_image, output)

    save_mock.assert_called_with(output, 'JPEG', quality=85, optimize=True, progressive=True)
def test_jpeg_quality_filter(self):
    # 'jpegquality-40' in the spec must reach PIL's save as quality=40.
    image_filter = Filter(spec='width-400|jpegquality-40')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file_jpeg(),
    )
    output = BytesIO()
    # Patch PIL's save so the arguments passed to it can be inspected.
    with patch('PIL.Image.Image.save') as save_mock:
        image_filter.run(test_image, output)

    save_mock.assert_called_with(output, 'JPEG', quality=40, optimize=True, progressive=True)
def test_jpeg_quality_filter_invalid(self):
    # A non-numeric jpegquality value must be rejected when the filter runs.
    image_filter = Filter(spec='width-400|jpegquality-abc')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file_jpeg(),
    )
    output = BytesIO()
    with self.assertRaises(InvalidFilterSpecError):
        image_filter.run(test_image, output)
def test_jpeg_quality_filter_too_big(self):
    # A jpegquality of 101 must be rejected when the filter runs.
    image_filter = Filter(spec='width-400|jpegquality-101')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file_jpeg(),
    )
    output = BytesIO()
    with self.assertRaises(InvalidFilterSpecError):
        image_filter.run(test_image, output)
def test_jpeg_quality_setting(self):
    # NOTE(review): quality=50 is expected here although the spec has no
    # jpegquality operation; presumably a settings override applied outside
    # this snippet supplies it - confirm against the full test module.
    image_filter = Filter(spec='width-400')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file_jpeg(),
    )
    output = BytesIO()
    # Patch PIL's save so the arguments passed to it can be inspected.
    with patch('PIL.Image.Image.save') as save_mock:
        image_filter.run(test_image, output)

    save_mock.assert_called_with(output, 'JPEG', quality=50, optimize=True, progressive=True)
def test_jpeg_quality_filter_overrides_setting(self):
    # 'jpegquality-40' in the spec must reach PIL's save as quality=40.
    # NOTE(review): the test name implies a quality setting is also active;
    # any such override is not visible in this snippet - confirm.
    image_filter = Filter(spec='width-400|jpegquality-40')
    test_image = Image.objects.create(
        title="Test image",
        file=get_test_image_file_jpeg(),
    )
    output = BytesIO()
    # Patch PIL's save so the arguments passed to it can be inspected.
    with patch('PIL.Image.Image.save') as save_mock:
        image_filter.run(test_image, output)

    save_mock.assert_called_with(output, 'JPEG', quality=40, optimize=True, progressive=True)
def get_rendition(self, filter):
    """
    Return the rendition of this image for ``filter``, creating it if needed.

    ``filter`` may be a Filter instance or a spec string, which is wrapped
    in a Filter. An existing rendition matching the spec and the filter's
    cache key is reused; otherwise the filter is run and the result stored.
    """
    if isinstance(filter, string_types):
        filter = Filter(spec=filter)

    cache_key = filter.get_cache_key(self)
    Rendition = self.get_rendition_model()

    try:
        rendition = self.renditions.get(
            filter_spec=filter.spec,
            focal_point_key=cache_key,
        )
    except Rendition.DoesNotExist:
        # Nothing stored yet: generate the rendition image in memory.
        generated_image = filter.run(self, BytesIO())

        # Derive the output name from the source file's base name.
        source_name = os.path.basename(self.file.name)
        stem, _source_extension = os.path.splitext(source_name)

        # File extension for each supported output format.
        extension_by_format = {
            'jpeg': '.jpg',
            'png': '.png',
            'gif': '.gif',
        }

        suffix = filter.spec.replace('|', '.') + extension_by_format[generated_image.format_name]
        if cache_key:
            suffix = cache_key + '.' + suffix

        # Trim the stem so stem + '.' + suffix stays within 60 characters.
        # NOTE(review): when the suffix is longer than 59 characters the
        # slice bound goes negative and trims from the end instead.
        trimmed_stem = stem[:(59 - len(suffix))]
        output_filename = trimmed_stem + '.' + suffix

        rendition, _created = self.renditions.get_or_create(
            filter_spec=filter.spec,
            focal_point_key=cache_key,
            defaults={'file': File(generated_image.f, name=output_filename)},
        )

    return rendition