Coverage for adhoc-cicd-odoo-odoo / odoo / orm / fields_binary.py: 59%
194 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-09 18:22 +0000
1from __future__ import annotations
3import base64
4import binascii
5import contextlib
6import functools
7import typing
8import warnings
9from operator import attrgetter
11import psycopg2
13from odoo.exceptions import UserError
14from odoo.tools import SQL, human_size
15from odoo.tools.mimetypes import guess_mimetype
17from .fields import Field
18from .utils import SQL_OPERATORS
20if typing.TYPE_CHECKING:
21 from odoo.tools import Query
23 from .models import BaseModel
# http://initd.org/psycopg/docs/usage.html#binary-adaptation
# Received data is returned as buffer (in Python 2) or memoryview (in Python 3).
# Alias used throughout this module for isinstance() checks on values coming
# back from psycopg2 (bytea columns are fetched as memoryview objects).
_BINARY = memoryview
class Binary(Field):
    """Encapsulates a binary content (e.g. a file).

    :param bool attachment: whether the field should be stored as `ir_attachment`
        or in a column of the model's table (default: ``True``).
    """
    type = 'binary'

    prefetch = False                    # not prefetched by default
    _depends_context = ('bin_size',)    # depends on context (content or size)
    attachment = True                   # whether value is stored in attachment

    @functools.cached_property
    def column_type(self):
        # attachment-backed fields have no SQL column of their own
        return None if self.attachment else ('bytea', 'bytea')

    def _get_attrs(self, model_class, name):
        # a non-stored field cannot be backed by an ir_attachment record
        attrs = super()._get_attrs(model_class, name)
        if not attrs.get('store', True):
            attrs['attachment'] = False
        return attrs

    _description_attachment = property(attrgetter('attachment'))

    def convert_to_column(self, value, record, values=None, validate=True):
        """Convert ``value`` to a psycopg2-adapted value for the bytea column.

        Returns ``None`` for falsy values.  When ``validate`` is true, SVG
        content (detected both in plaintext ``<`` form and in its base64
        encoding starting with ``P``) is rejected for non-system users.

        :raises UserError: on SVG upload by a non-system user, or when a
            non-bytes value contains non-ASCII characters
        """
        # Binary values may be byte strings (python 2.6 byte array), but
        # the legacy OpenERP convention is to transfer and store binaries
        # as base64-encoded strings. The base64 string may be provided as a
        # unicode in some circumstances, hence the str() cast here.
        # This str() coercion will only work for pure ASCII unicode strings,
        # on purpose - non base64 data must be passed as a 8bit byte strings.
        if not value:
            return None
        # Detect if the binary content is an SVG for restricting its upload
        # only to system users.
        magic_bytes = {
            b'P',  # first 6 bits of '<' (0x3C) b64 encoded
            b'<',  # plaintext XML tag opening
        }
        if isinstance(value, str):
            value = value.encode()
        if validate and value[:1] in magic_bytes:
            try:
                # strip CR/LF before decoding: base64 payloads may be wrapped
                decoded_value = base64.b64decode(value.translate(None, delete=b'\r\n'), validate=True)
            except binascii.Error:
                # not valid base64: inspect the raw bytes instead
                decoded_value = value
            # Full mimetype detection
            if (guess_mimetype(decoded_value).startswith('image/svg') and
                    not record.env.is_system()):
                raise UserError(record.env._("Only admins can upload SVG files."))
        if isinstance(value, bytes):
            return psycopg2.Binary(value)
        try:
            return psycopg2.Binary(str(value).encode('ascii'))
        except UnicodeEncodeError:
            raise UserError(record.env._("ASCII characters are required for %(value)s in %(field)s", value=value, field=self.name))

    def get_column_update(self, record: BaseModel):
        # since the field depends on context, force the value where we have the data
        bin_size_name = 'bin_size_' + self.name
        record_no_bin_size = record.with_context(**{'bin_size': False, bin_size_name: False})
        return self._get_cache(record_no_bin_size.env)[record.id]

    def convert_to_cache(self, value, record, validate=True):
        """Normalize ``value`` to its cache representation: ``bytes`` for
        content (or a human-readable size when ``bin_size`` is requested),
        ``None`` for no value.
        """
        if isinstance(value, _BINARY):
            return bytes(value)
        if isinstance(value, str):
            # the cache must contain bytes or memoryview, but sometimes a string
            # is given when assigning a binary field (test `TestFileSeparator`)
            return value.encode()
        if isinstance(value, int) and \
                (record.env.context.get('bin_size') or
                 record.env.context.get('bin_size_' + self.name)):
            # If the client requests only the size of the field, we return that
            # instead of the content. Presumably a separate request will be done
            # to read the actual content, if necessary.
            value = human_size(value)
            # human_size can return False (-> None) or a string (-> encoded)
            return value.encode() if value else None
        return None if value is False else value

    def convert_to_record(self, value, record):
        # record-level value is bytes, or False when empty
        if isinstance(value, _BINARY):
            return bytes(value)
        return False if value is None else value

    def compute_value(self, records):
        """Compute the field value.  With ``bin_size`` in the context, the
        value is always computed *without* ``bin_size``, and the size-only
        cache entries are then derived manually from the computed data.
        """
        bin_size_name = 'bin_size_' + self.name
        if records.env.context.get('bin_size') or records.env.context.get(bin_size_name):
            # always compute without bin_size
            records_no_bin_size = records.with_context(**{'bin_size': False, bin_size_name: False})
            super().compute_value(records_no_bin_size)
            # manually update the bin_size cache
            field_cache_data = self._get_cache(records_no_bin_size.env)
            field_cache_size = self._get_cache(records.env)
            for record in records:
                try:
                    value = field_cache_data[record.id]
                    # don't decode non-attachments to be consistent with pg_size_pretty
                    if not (self.store and self.column_type):
                        with contextlib.suppress(TypeError, binascii.Error):
                            value = base64.b64decode(value)
                    try:
                        if isinstance(value, (bytes, _BINARY)):
                            value = human_size(len(value))
                    except (TypeError):
                        pass
                    cache_value = self.convert_to_cache(value, record)
                    # the dirty flag is independent from this assignment
                    field_cache_size[record.id] = cache_value
                except KeyError:
                    # record not computed; skip its size entry
                    pass
        else:
            super().compute_value(records)

    def read(self, records):
        """Fetch attachment-stored values for ``records`` into the cache.

        With ``bin_size`` in the context, the attachment's ``file_size``
        (human-readable, encoded to bytes) is cached instead of the content.
        Records without a matching attachment get ``None`` in cache.
        """
        def _encode(s: str | bool) -> bytes | bool:
            if isinstance(s, str):
                return s.encode("utf-8")
            return s

        # values are stored in attachments, retrieve them
        assert self.attachment
        domain = [
            ('res_model', '=', records._name),
            ('res_field', '=', self.name),
            ('res_id', 'in', records.ids),
        ]
        bin_size = records.env.context.get('bin_size')
        data = {
            att.res_id: _encode(human_size(att.file_size)) if bin_size else att.datas
            for att in records.env['ir.attachment'].sudo().search_fetch(domain)
        }
        self._insert_cache(records, map(data.get, records._ids))

    def create(self, record_values):
        """Create the ``ir.attachment`` records holding the field values.

        :param record_values: list of ``(record, value)`` pairs; falsy values
            get no attachment
        """
        assert self.attachment
        if not record_values:
            return
        # create the attachments that store the values
        env = record_values[0][0].env
        env['ir.attachment'].sudo().create([
            {
                'name': self.name,
                'res_model': self.model_name,
                'res_field': self.name,
                'res_id': record.id,
                'type': 'binary',
                'datas': value,
            }
            for record, value in record_values
            if value
        ])

    def write(self, records, value):
        """Write ``value`` on ``records``, keeping the backing attachments in
        sync: update existing ones, create missing ones, or unlink them all
        when the value is falsy.  Column-stored fields defer to the default
        implementation.
        """
        records = records.with_context(bin_size=False)
        if not self.attachment:
            super().write(records, value)
            return

        # discard recomputation of self on records
        records.env.remove_to_compute(self, records)

        # update the cache, and discard the records that are not modified
        cache_value = self.convert_to_cache(value, records)
        records = self._filter_not_equal(records, cache_value)
        if not records:
            return
        if self.store:
            # determine records that are known to be not null
            # (must be computed BEFORE the cache update below)
            not_null = self._filter_not_equal(records, None)

        self._update_cache(records, cache_value)

        # retrieve the attachments that store the values, and adapt them
        if self.store and any(records._ids):
            real_records = records.filtered('id')
            atts = records.env['ir.attachment'].sudo()
            if not_null:
                atts = atts.search([
                    ('res_model', '=', self.model_name),
                    ('res_field', '=', self.name),
                    ('res_id', 'in', real_records.ids),
                ])
            if value:
                # update the existing attachments
                atts.write({'datas': value})
                atts_records = records.browse(atts.mapped('res_id'))
                # create the missing attachments
                missing = (real_records - atts_records)
                if missing:
                    atts.create([{
                        'name': self.name,
                        'res_model': record._name,
                        'res_field': self.name,
                        'res_id': record.id,
                        'type': 'binary',
                        'datas': value,
                    }
                        for record in missing
                    ])
            else:
                atts.unlink()

    def condition_to_sql(self, field_expr: str, operator: str, value, model: BaseModel, alias: str, query: Query) -> SQL:
        """Build the SQL condition for set/unset tests on attachment-backed
        fields, using an ``ir_attachment`` subquery instead of a column test.
        Column-stored fields defer to the default implementation.
        """
        if not self.attachment or field_expr != self.name:
            return super().condition_to_sql(field_expr, operator, value, model, alias, query)
        assert operator in ('in', 'not in') and set(value) == {False}, "Should have been done in Domain optimization"
        # "in [False]" means "is not set", i.e. no attachment row exists
        return SQL(
            "%s%s(SELECT res_id FROM ir_attachment WHERE res_model = %s AND res_field = %s)",
            model._field_to_sql(alias, 'id', query),
            SQL_OPERATORS['not in' if operator in ('in', '=') else 'in'],
            model._name,
            self.name,
        )
class Image(Binary):
    """Encapsulates an image, extending :class:`Binary`.

    If image size is greater than the ``max_width``/``max_height`` limit of pixels, the image will be
    resized to the limit by keeping aspect ratio.

    :param int max_width: the maximum width of the image (default: ``0``, no limit)
    :param int max_height: the maximum height of the image (default: ``0``, no limit)
    :param bool verify_resolution: whether the image resolution should be verified
        to ensure it doesn't go over the maximum image resolution (default: ``True``).
        See :class:`odoo.tools.image.ImageProcess` for maximum image resolution (default: ``50e6``).

    .. note::

        If no ``max_width``/``max_height`` is specified (or is set to 0) and ``verify_resolution`` is False,
        the field content won't be verified at all and a :class:`Binary` field should be used.
    """
    max_width = 0       # maximum width in pixels (0 = no limit)
    max_height = 0      # maximum height in pixels (0 = no limit)
    verify_resolution = True

    def setup(self, model):
        super().setup(model)
        # warn when the model lacks log-access columns required by Image fields
        if not model._abstract and not model._log_access:
            warnings.warn(f"Image field {self} requires the model to have _log_access = True", stacklevel=1)

    def create(self, record_values):
        """Process each image before delegating attachment creation to
        :meth:`Binary.create`.

        :param record_values: list of ``(record, value)`` pairs
        """
        new_record_values = []
        for record, value in record_values:
            new_value = self._image_process(value, record.env)
            new_record_values.append((record, new_value))
            # when setting related image field, keep the unprocessed image in
            # cache to let the inverse method use the original image; the image
            # will be resized once the inverse has been applied
            cache_value = self.convert_to_cache(value if self.related else new_value, record)
            self._update_cache(record, cache_value)
        super().create(new_record_values)

    def write(self, records, value):
        """Process the image, then write it via :meth:`Binary.write`.

        Invalid base64 assigned to *new* records is silently ignored (the
        client may send the "bin size" placeholder in an onchange); for
        existing records the :class:`UserError` propagates.
        """
        try:
            new_value = self._image_process(value, records.env)
        except UserError:
            if not any(records._ids):
                # Some crap is assigned to a new record. This can happen in an
                # onchange, where the client sends the "bin size" value of the
                # field instead of its full value (this saves bandwidth). In
                # this case, we simply don't assign the field: its value will be
                # taken from the records' origin.
                return
            raise

        super().write(records, new_value)
        # keep the unprocessed image in cache for related fields (see create)
        cache_value = self.convert_to_cache(value if self.related else new_value, records)
        self._update_cache(records, cache_value, dirty=True)

    def _inverse_related(self, records):
        super()._inverse_related(records)
        if not (self.max_width and self.max_height):
            return
        # the inverse has been applied with the original image; now we fix the
        # cache with the resized value
        for record in records:
            value = self._process_related(record[self.name], record.env)
            self._update_cache(record, value, dirty=True)

    def _image_process(self, value, env):
        """Verify/resize the base64-encoded image ``value`` according to
        ``max_width``/``max_height``/``verify_resolution``.

        Readonly fields whose processing would be a no-op (no size limit, or
        a related Image field with identical limits) are returned unchanged.
        For webp images, a pre-resized variant is looked up among existing
        attachments instead of re-encoding.

        :raises UserError: if ``value`` is not valid base64
        :return: processed base64 value, or ``False`` when empty
        """
        if self.readonly and (
            (not self.max_width and not self.max_height)
            or (
                isinstance(self.related_field, Image)
                and self.max_width == self.related_field.max_width
                and self.max_height == self.related_field.max_height
            )
        ):
            # no need to process images for computed fields, or related fields
            return value
        try:
            img = base64.b64decode(value or '') or False
        except Exception as e:
            raise UserError(env._("Image is not encoded in base64.")) from e

        if img and guess_mimetype(img, '') == 'image/webp':
            if not self.max_width and not self.max_height:
                return value
            # Fetch resized version.
            Attachment = env['ir.attachment']
            checksum = Attachment._compute_checksum(img)
            origins = Attachment.search([
                ['id', '!=', False],  # No implicit condition on res_field.
                ['checksum', '=', checksum],
            ])
            if origins:
                origin_ids = [attachment.id for attachment in origins]
                resized_domain = [
                    ['id', '!=', False],  # No implicit condition on res_field.
                    ['res_model', '=', 'ir.attachment'],
                    ['res_id', 'in', origin_ids],
                    ['description', '=', 'resize: %s' % max(self.max_width, self.max_height)],
                ]
                resized = Attachment.sudo().search(resized_domain, limit=1)
                if resized:
                    # Fallback on non-resized image (value).
                    return resized.datas or value
            # no resized variant available: keep the webp as-is
            return value

        # delay import of image_process until this point
        from odoo.tools.image import image_process  # noqa: PLC0415
        return base64.b64encode(image_process(img,
            size=(self.max_width, self.max_height),
            verify_resolution=self.verify_resolution,
        ) or b'') or False

    def _process_related(self, value, env):
        """Override to resize the related value before saving it on self."""
        try:
            return self._image_process(super()._process_related(value, env), env)
        except UserError:
            # Avoid the following `write` to fail if the related image was saved
            # invalid, which can happen for pre-existing databases.
            return False