1
1
Fork 0

Improved image validation

dev
Jan Kužílek 5 years ago
parent dbfa73a6ac
commit 0aea787ff5

@ -21,7 +21,7 @@ flask-wtf = "*"
flask-assets = "*"
# pyscss = "*"
cssmin = "*"
# python-magic = "*"
python-magic = "*"
# flask-mail = "*"
sqlalchemy-utc = "*"
# flask-admin = "*"

@ -130,12 +130,12 @@ def upload():
form = UploadForm(request.form)
if request.method == 'POST' and form.validate():
file = request.files.get(form.post_img.name)
file.data = io.BytesIO(file.read())
# file.data = io.BytesIO(file.read())
f_tags = form.tags.data.split()
tags = Tag.query.filter(Tag.content.in_(f_tags)).all()
post = Post(file, source=form.sauce.data, tags=tags, rating=RATING[form.rating.data], author=current_user)
post = Post(file, **Post.fileinfo(file), source=form.sauce.data, tags=tags, rating=form.rating.data, author=current_user)
db.session.add(post)
db.session.commit()

@ -4,10 +4,11 @@ from wtforms.validators import DataRequired, InputRequired, Email, EqualTo, AnyO
from werkzeug.utils import cached_property
from flask import current_app
from flask import current_app, flash
from flask_wtf.csrf import _FlaskFormCSRF
from yadc.models import USER_STATUS, OP_LEVEL, RATING, POST_STATUS, TAG_CATEGORY
from yadc.models import User, Post
class CSRFForm(Form):
class Meta:
@ -27,17 +28,10 @@ class LoginForm(CSRFForm):
remember_me = BooleanField('Remember me')
submit = SubmitField('Log In')
from yadc.models import User
class ResetPasswordForm(CSRFForm):
email = StringField('Email', validators=[DataRequired(), Email()], render_kw=dict(placeholder="Your email address"))
submit = SubmitField('Reset password')
# def validate_email(form, field):
# email = User.query.filter_by(email=field.data).first()
# if not email:
# raise ValidationError('This')
class RegisterForm(CSRFForm):
username = StringField('Username', validators=[DataRequired()], render_kw=dict(placeholder="Username"))
email = StringField('Email', validators=[DataRequired(), Email()], render_kw=dict(placeholder="Email"))
@ -56,7 +50,8 @@ class RegisterForm(CSRFForm):
raise ValidationError('This email address is already registered. Maybe try logging in instead?')
from flask import request
# from magic import Magic
from magic import Magic
from PIL import Image
def validate_file(form, field):
file = request.files.get(field.name)
@ -77,11 +72,30 @@ class UploadForm(CSRFForm):
file = request.files.get(field.name)
client_mimetype = file.mimetype
# Not sure if safe
# real_mimetype = Magic(mime=True).from_buffer(file.stream.read())
if client_mimetype not in ['image/png','image/jpeg']:
file.seek(0)
real_mimetype = Magic(mime=True).from_buffer(file.read())
# flash(client_mimetype)
# flash(real_mimetype)
# if client_mimetype != real_mimetype or client_mimetype not in ['image/png','image/jpeg']:
if real_mimetype not in ['image/png','image/jpeg']:
raise ValidationError('Please select an image file of PNG or JPEG format')
try:
Image.open(file).verify()
except Exception as e:
raise ValidationError('Image is corrupted.')
fileinfo = Post.fileinfo(file)
post = Post.query.filter_by(md5=fileinfo['md5']).first()
if post is not None:
raise ValidationError('Image with same hash already exists. Reposting is not allowed.')
width, height = fileinfo['resolution']
if width*height<1280*720 or width<720 or height<720:
raise ValidationError('Image has too low resolution, according to rules.')
# Change user section
class ChangeUserInfoForm(CSRFForm):
bio = TextAreaField('Biography')

@ -185,49 +185,42 @@ class Post(TimestampMixin, db.Model):
def __init__(self, file, **kwargs):
super().__init__(**kwargs)
self.md5 = hashlib.md5(file.data.getbuffer()).hexdigest()
self.filetype = FILETYPE[file.mimetype.split('/')[1]]
# self.md5 = hashlib.md5(file.data.getbuffer()).hexdigest()
# self.filetype = FILETYPE[file.mimetype.split('/')[1]]
with Image.open(file.data) as im:
self.width, self.height = im.width, im.height
self.filesize = file.data.getbuffer().nbytes
# with Image.open(file.data) as im:
# self.width, self.height = im.width, im.height
# self.filesize = file.data.getbuffer().nbytes
self.origin_filename = file.filename
# self.origin_filename = file.filename
self.generate_image_files(file)
def __repr__(self):
return('<Post #{} by {}, {} of {} bytes>'.format(self.id, self.author, self.filetype.name, self.filesize))
@cached_property
def flex(self):
return "flex: {0:.2f} 1 {0:.2f}px;".format(self.width/self.height*current_app.config.get('POST_LIST_THUMB_HEIGHT', 240))
@property
def file_uri(self):
# filename = "{}.{}".format('maybe_later_generated_cute_filename', 'jpg' if self.filetype is FILETYPE.jpeg else 'png')
# filename = 'maybe_later_generated_cute_filename'
filename = "{} - {} {}".format(current_app.config.get('INSTANCE_NAME'), self.id, " ".join(tag.content.replace(' ', '_') for tag in self.tags))
return os.path.join(self.md5, filename)
def resolution(self):
return (self.width, self.height)
def url(self, path, endpoint='img'):
if endpoint == 'img':
return url_for('main.uploaded_img', path="{}.{}".format(path,'jpg' if self.filetype is FILETYPE.jpeg else 'png'))
elif endpoint == 'jpeg':
return url_for('main.uploaded_jpeg' if not self.filetype is FILETYPE.jpeg else 'main.uploaded_img', path="{}.{}".format(path,'jpg'))
elif endpoint == 'sample':
return url_for('main.uploaded_sample', path="{}.{}".format(path,'jpg'))
elif endpoint == 'thumb':
return url_for('main.uploaded_thumb', path="{}.{}".format(path,'jpg'))
@resolution.setter
def resolution(self, value):
self.width, self.height = value
@staticmethod
def fileinfo(file):
return dict(
md5=(file.seek(0) or hashlib.md5(file.read()).hexdigest()),
filetype=FILETYPE[file.mimetype.split('/')[1]], # not safe
resolution=Image.open(file).size,
filesize=(file.seek(0,2) or file.tell()),
origin_filename=file.filename
)
def generate_image_files(self, file):
with open(self.image_files['image'], "wb") as f:
f.write(file.data.getbuffer())
# file.seek(0)
# file.save(post.image_path)
# with open(self.image_files['image'], "wb") as f:
# f.write(file.data.getbuffer())
file.seek(0)
file.save(self.image_files['image'])
with Image.open(file.data) as im:
with Image.open(file) as im:
im = im.convert('RGB')
if self.image_files['jpeg'] is not None:
@ -256,6 +249,29 @@ class Post(TimestampMixin, db.Model):
sample=os.path.join(current_app.config.get('POST_UPLOADS'), 'sample', "{}.{}".format(self.md5, 'jpg')),
thumb=os.path.join(current_app.config.get('POST_UPLOADS'), 'thumb', "{}.{}".format(self.md5, 'jpg'))
)
def __repr__(self):
return('<Post #{} by {}, {} of {} bytes>'.format(self.id, self.author, self.filetype.name, self.filesize))
@property
def file_uri(self):
# filename = "{}.{}".format('maybe_later_generated_cute_filename', 'jpg' if self.filetype is FILETYPE.jpeg else 'png')
# filename = 'maybe_later_generated_cute_filename'
filename = "{} - {} {}".format(current_app.config.get('INSTANCE_NAME'), self.id, " ".join(tag.content.replace(' ', '_') for tag in self.tags))
return os.path.join(self.md5, filename)
def url(self, path, endpoint='img'):
if endpoint == 'img':
return url_for('main.uploaded_img', path="{}.{}".format(path,'jpg' if self.filetype is FILETYPE.jpeg else 'png'))
elif endpoint == 'jpeg':
return url_for('main.uploaded_jpeg' if not self.filetype is FILETYPE.jpeg else 'main.uploaded_img', path="{}.{}".format(path,'jpg'))
elif endpoint == 'sample':
return url_for('main.uploaded_sample', path="{}.{}".format(path,'jpg'))
elif endpoint == 'thumb':
return url_for('main.uploaded_thumb', path="{}.{}".format(path,'jpg'))
@cached_property
def flex(self):
return "flex: {0:.2f} 1 {0:.2f}px;".format(self.width/self.height*current_app.config.get('POST_LIST_THUMB_HEIGHT', 240))
@cached_property
def image_resolution(self):

Loading…
Cancel
Save