Coverage for adhoc-cicd-odoo-odoo / odoo / orm / fields_binary.py: 63%

194 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:15 +0000

1from __future__ import annotations 

2 

3import base64 

4import binascii 

5import contextlib 

6import functools 

7import typing 

8import warnings 

9from operator import attrgetter 

10 

11import psycopg2 

12 

13from odoo.exceptions import UserError 

14from odoo.tools import SQL, human_size 

15from odoo.tools.mimetypes import guess_mimetype 

16 

17from .fields import Field 

18from .utils import SQL_OPERATORS 

19 

20if typing.TYPE_CHECKING: 

21 from odoo.tools import Query 

22 

23 from .models import BaseModel 

24 

25# http://initd.org/psycopg/docs/usage.html#binary-adaptation 

26# Received data is returned as buffer (in Python 2) or memoryview (in Python 3). 

27_BINARY = memoryview 

28 

29 

30class Binary(Field): 

31 """Encapsulates a binary content (e.g. a file). 

32 

33 :param bool attachment: whether the field should be stored as `ir_attachment` 

34 or in a column of the model's table (default: ``True``). 

35 """ 

36 type = 'binary' 

37 

38 prefetch = False # not prefetched by default 

39 _depends_context = ('bin_size',) # depends on context (content or size) 

40 attachment = True # whether value is stored in attachment 

41 

42 @functools.cached_property 

43 def column_type(self): 

44 return None if self.attachment else ('bytea', 'bytea') 

45 

46 def _get_attrs(self, model_class, name): 

47 attrs = super()._get_attrs(model_class, name) 

48 if not attrs.get('store', True): 

49 attrs['attachment'] = False 

50 return attrs 

51 

52 _description_attachment = property(attrgetter('attachment')) 

53 

54 def convert_to_column(self, value, record, values=None, validate=True): 

55 # Binary values may be byte strings (python 2.6 byte array), but 

56 # the legacy OpenERP convention is to transfer and store binaries 

57 # as base64-encoded strings. The base64 string may be provided as a 

58 # unicode in some circumstances, hence the str() cast here. 

59 # This str() coercion will only work for pure ASCII unicode strings, 

60 # on purpose - non base64 data must be passed as a 8bit byte strings. 

61 if not value: 61 ↛ 65line 61 didn't jump to line 65 because the condition on line 61 was always true

62 return None 

63 # Detect if the binary content is an SVG for restricting its upload 

64 # only to system users. 

65 magic_bytes = { 

66 b'P', # first 6 bits of '<' (0x3C) b64 encoded 

67 b'<', # plaintext XML tag opening 

68 } 

69 if isinstance(value, str): 

70 value = value.encode() 

71 if validate and value[:1] in magic_bytes: 

72 try: 

73 decoded_value = base64.b64decode(value.translate(None, delete=b'\r\n'), validate=True) 

74 except binascii.Error: 

75 decoded_value = value 

76 # Full mimetype detection 

77 if (guess_mimetype(decoded_value).startswith('image/svg') and 

78 not record.env.is_system()): 

79 raise UserError(record.env._("Only admins can upload SVG files.")) 

80 if isinstance(value, bytes): 

81 return psycopg2.Binary(value) 

82 try: 

83 return psycopg2.Binary(str(value).encode('ascii')) 

84 except UnicodeEncodeError: 

85 raise UserError(record.env._("ASCII characters are required for %(value)s in %(field)s", value=value, field=self.name)) 

86 

87 def get_column_update(self, record: BaseModel): 

88 # since the field depends on context, force the value where we have the data 

89 bin_size_name = 'bin_size_' + self.name 

90 record_no_bin_size = record.with_context(**{'bin_size': False, bin_size_name: False}) 

91 return self._get_cache(record_no_bin_size.env)[record.id] 

92 

93 def convert_to_cache(self, value, record, validate=True): 

94 if isinstance(value, _BINARY): 94 ↛ 95line 94 didn't jump to line 95 because the condition on line 94 was never true

95 return bytes(value) 

96 if isinstance(value, str): 

97 # the cache must contain bytes or memoryview, but sometimes a string 

98 # is given when assigning a binary field (test `TestFileSeparator`) 

99 return value.encode() 

100 if isinstance(value, int) and \ 100 ↛ 106line 100 didn't jump to line 106 because the condition on line 100 was never true

101 (record.env.context.get('bin_size') or 

102 record.env.context.get('bin_size_' + self.name)): 

103 # If the client requests only the size of the field, we return that 

104 # instead of the content. Presumably a separate request will be done 

105 # to read the actual content, if necessary. 

106 value = human_size(value) 

107 # human_size can return False (-> None) or a string (-> encoded) 

108 return value.encode() if value else None 

109 return None if value is False else value 

110 

111 def convert_to_record(self, value, record): 

112 if isinstance(value, _BINARY): 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 return bytes(value) 

114 return False if value is None else value 

115 

116 def compute_value(self, records): 

117 bin_size_name = 'bin_size_' + self.name 

118 if records.env.context.get('bin_size') or records.env.context.get(bin_size_name): 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was never true

119 # always compute without bin_size 

120 records_no_bin_size = records.with_context(**{'bin_size': False, bin_size_name: False}) 

121 super().compute_value(records_no_bin_size) 

122 # manually update the bin_size cache 

123 field_cache_data = self._get_cache(records_no_bin_size.env) 

124 field_cache_size = self._get_cache(records.env) 

125 for record in records: 

126 try: 

127 value = field_cache_data[record.id] 

128 # don't decode non-attachments to be consistent with pg_size_pretty 

129 if not (self.store and self.column_type): 

130 with contextlib.suppress(TypeError, binascii.Error): 

131 value = base64.b64decode(value) 

132 try: 

133 if isinstance(value, (bytes, _BINARY)): 

134 value = human_size(len(value)) 

135 except (TypeError): 

136 pass 

137 cache_value = self.convert_to_cache(value, record) 

138 # the dirty flag is independent from this assignment 

139 field_cache_size[record.id] = cache_value 

140 except KeyError: 

141 pass 

142 else: 

143 super().compute_value(records) 

144 

145 def read(self, records): 

146 def _encode(s: str | bool) -> bytes | bool: 

147 if isinstance(s, str): 

148 return s.encode("utf-8") 

149 return s 

150 

151 # values are stored in attachments, retrieve them 

152 assert self.attachment 

153 domain = [ 

154 ('res_model', '=', records._name), 

155 ('res_field', '=', self.name), 

156 ('res_id', 'in', records.ids), 

157 ] 

158 bin_size = records.env.context.get('bin_size') 

159 data = { 

160 att.res_id: _encode(human_size(att.file_size)) if bin_size else att.datas 

161 for att in records.env['ir.attachment'].sudo().search_fetch(domain) 

162 } 

163 self._insert_cache(records, map(data.get, records._ids)) 

164 

165 def create(self, record_values): 

166 assert self.attachment 

167 if not record_values: 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true

168 return 

169 # create the attachments that store the values 

170 env = record_values[0][0].env 

171 env['ir.attachment'].sudo().create([ 

172 { 

173 'name': self.name, 

174 'res_model': self.model_name, 

175 'res_field': self.name, 

176 'res_id': record.id, 

177 'type': 'binary', 

178 'datas': value, 

179 } 

180 for record, value in record_values 

181 if value 

182 ]) 

183 

184 def write(self, records, value): 

185 records = records.with_context(bin_size=False) 

186 if not self.attachment: 

187 super().write(records, value) 

188 return 

189 

190 # discard recomputation of self on records 

191 records.env.remove_to_compute(self, records) 

192 

193 # update the cache, and discard the records that are not modified 

194 cache_value = self.convert_to_cache(value, records) 

195 records = self._filter_not_equal(records, cache_value) 

196 if not records: 

197 return 

198 if self.store: 198 ↛ 202line 198 didn't jump to line 202 because the condition on line 198 was always true

199 # determine records that are known to be not null 

200 not_null = self._filter_not_equal(records, None) 

201 

202 self._update_cache(records, cache_value) 

203 

204 # retrieve the attachments that store the values, and adapt them 

205 if self.store and any(records._ids): 205 ↛ exitline 205 didn't return from function 'write' because the condition on line 205 was always true

206 real_records = records.filtered('id') 

207 atts = records.env['ir.attachment'].sudo() 

208 if not_null: 

209 atts = atts.search([ 

210 ('res_model', '=', self.model_name), 

211 ('res_field', '=', self.name), 

212 ('res_id', 'in', real_records.ids), 

213 ]) 

214 if value: 

215 # update the existing attachments 

216 atts.write({'datas': value}) 

217 atts_records = records.browse(atts.mapped('res_id')) 

218 # create the missing attachments 

219 missing = (real_records - atts_records) 

220 if missing: 

221 atts.create([{ 

222 'name': self.name, 

223 'res_model': record._name, 

224 'res_field': self.name, 

225 'res_id': record.id, 

226 'type': 'binary', 

227 'datas': value, 

228 } 

229 for record in missing 

230 ]) 

231 else: 

232 atts.unlink() 

233 

234 def condition_to_sql(self, field_expr: str, operator: str, value, model: BaseModel, alias: str, query: Query) -> SQL: 

235 if not self.attachment or field_expr != self.name: 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true

236 return super().condition_to_sql(field_expr, operator, value, model, alias, query) 

237 assert operator in ('in', 'not in') and set(value) == {False}, "Should have been done in Domain optimization" 

238 return SQL( 

239 "%s%s(SELECT res_id FROM ir_attachment WHERE res_model = %s AND res_field = %s)", 

240 model._field_to_sql(alias, 'id', query), 

241 SQL_OPERATORS['not in' if operator in ('in', '=') else 'in'], 

242 model._name, 

243 self.name, 

244 ) 

245 

246 

247class Image(Binary): 

248 """Encapsulates an image, extending :class:`Binary`. 

249 

250 If image size is greater than the ``max_width``/``max_height`` limit of pixels, the image will be 

251 resized to the limit by keeping aspect ratio. 

252 

253 :param int max_width: the maximum width of the image (default: ``0``, no limit) 

254 :param int max_height: the maximum height of the image (default: ``0``, no limit) 

255 :param bool verify_resolution: whether the image resolution should be verified 

256 to ensure it doesn't go over the maximum image resolution (default: ``True``). 

257 See :class:`odoo.tools.image.ImageProcess` for maximum image resolution (default: ``50e6``). 

258 

259 .. note:: 

260 

261 If no ``max_width``/``max_height`` is specified (or is set to 0) and ``verify_resolution`` is False, 

262 the field content won't be verified at all and a :class:`Binary` field should be used. 

263 """ 

264 max_width = 0 

265 max_height = 0 

266 verify_resolution = True 

267 

268 def setup(self, model): 

269 super().setup(model) 

270 if not model._abstract and not model._log_access: 270 ↛ 271line 270 didn't jump to line 271 because the condition on line 270 was never true

271 warnings.warn(f"Image field {self} requires the model to have _log_access = True", stacklevel=1) 

272 

273 def create(self, record_values): 

274 new_record_values = [] 

275 for record, value in record_values: 

276 new_value = self._image_process(value, record.env) 

277 new_record_values.append((record, new_value)) 

278 # when setting related image field, keep the unprocessed image in 

279 # cache to let the inverse method use the original image; the image 

280 # will be resized once the inverse has been applied 

281 cache_value = self.convert_to_cache(value if self.related else new_value, record) 

282 self._update_cache(record, cache_value) 

283 super().create(new_record_values) 

284 

285 def write(self, records, value): 

286 try: 

287 new_value = self._image_process(value, records.env) 

288 except UserError: 

289 if not any(records._ids): 

290 # Some crap is assigned to a new record. This can happen in an 

291 # onchange, where the client sends the "bin size" value of the 

292 # field instead of its full value (this saves bandwidth). In 

293 # this case, we simply don't assign the field: its value will be 

294 # taken from the records' origin. 

295 return 

296 raise 

297 

298 super().write(records, new_value) 

299 cache_value = self.convert_to_cache(value if self.related else new_value, records) 

300 self._update_cache(records, cache_value, dirty=True) 

301 

302 def _inverse_related(self, records): 

303 super()._inverse_related(records) 

304 if not (self.max_width and self.max_height): 

305 return 

306 # the inverse has been applied with the original image; now we fix the 

307 # cache with the resized value 

308 for record in records: 

309 value = self._process_related(record[self.name], record.env) 

310 self._update_cache(record, value, dirty=True) 

311 

312 def _image_process(self, value, env): 

313 if self.readonly and ( 313 ↛ 322line 313 didn't jump to line 322 because the condition on line 313 was never true

314 (not self.max_width and not self.max_height) 

315 or ( 

316 isinstance(self.related_field, Image) 

317 and self.max_width == self.related_field.max_width 

318 and self.max_height == self.related_field.max_height 

319 ) 

320 ): 

321 # no need to process images for computed fields, or related fields 

322 return value 

323 try: 

324 img = base64.b64decode(value or '') or False 

325 except Exception as e: 

326 raise UserError(env._("Image is not encoded in base64.")) from e 

327 

328 if img and guess_mimetype(img, '') == 'image/webp': 

329 if not self.max_width and not self.max_height: 329 ↛ 332line 329 didn't jump to line 332 because the condition on line 329 was always true

330 return value 

331 # Fetch resized version. 

332 Attachment = env['ir.attachment'] 

333 checksum = Attachment._compute_checksum(img) 

334 origins = Attachment.search([ 

335 ['id', '!=', False], # No implicit condition on res_field. 

336 ['checksum', '=', checksum], 

337 ]) 

338 if origins: 

339 origin_ids = [attachment.id for attachment in origins] 

340 resized_domain = [ 

341 ['id', '!=', False], # No implicit condition on res_field. 

342 ['res_model', '=', 'ir.attachment'], 

343 ['res_id', 'in', origin_ids], 

344 ['description', '=', 'resize: %s' % max(self.max_width, self.max_height)], 

345 ] 

346 resized = Attachment.sudo().search(resized_domain, limit=1) 

347 if resized: 

348 # Fallback on non-resized image (value). 

349 return resized.datas or value 

350 return value 

351 

352 # delay import of image_process until this point 

353 from odoo.tools.image import image_process # noqa: PLC0415 

354 return base64.b64encode(image_process(img, 

355 size=(self.max_width, self.max_height), 

356 verify_resolution=self.verify_resolution, 

357 ) or b'') or False 

358 

359 def _process_related(self, value, env): 

360 """Override to resize the related value before saving it on self.""" 

361 try: 

362 return self._image_process(super()._process_related(value, env), env) 

363 except UserError: 

364 # Avoid the following `write` to fail if the related image was saved 

365 # invalid, which can happen for pre-existing databases. 

366 return False