Coverage for ingadhoc-odoo-saas / saas_client / models / res_users.py: 33%
95 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:05 +0000
1##############################################################################
2# For copyright and license notices, see __manifest__.py file in module root
3# directory
4##############################################################################
5import hmac
6import json
7import logging
8from datetime import datetime
9from hashlib import sha256
10from time import mktime
12import requests
13from odoo import _, api, fields, models, tools
14from odoo.api import SUPERUSER_ID
15from odoo.exceptions import AccessDenied, ValidationError
16from odoo.fields import Domain
17from odoo.http import request
19_logger = logging.getLogger(__name__)
class ResUsers(models.Model):
    """SaaS-client extension of ``res.users``."""

    _inherit = "res.users"

    # Sent to the SaaS provider as ``saas_uuid`` by
    # ``_post_request_on_saas_provider`` unless overridden through the context.
    saas_provider_uuid = fields.Char(
        string="SaaS Provider UUID",
        readonly=False,
        copy=False,
    )
31 def _get_login_domain(self, login):
32 """Allow to connect with __system__ and admin"""
33 res = super()._get_login_domain(login=login)
34 if login in ["__system__", "admin"]:
35 res &= Domain([("active", "=", False)]) | Domain([("active", "=", True)])
36 return res
38 @api.model
39 @tools.ormcache("uid", "passwd")
40 def _check_uid_passwd(self, uid, passwd):
41 """Verifies that the given (uid, password) is authorized and
42 raise an exception if it is not."""
43 if uid in (SUPERUSER_ID, 2):
44 # superuser and admin: always allowed
45 if not passwd:
46 # empty passwords disallowed for obvious security reasons
47 raise AccessDenied()
48 with self._assert_can_auth(user=uid):
49 user = self.with_user(uid).env.user
50 credential = {"login": user.login, "password": passwd, "type": "password"}
51 user._check_credentials(credential, {"interactive": False})
52 else:
53 super()._check_uid_passwd(uid, passwd)
55 @api.constrains("group_ids")
56 def _check_at_least_one_administrator(self):
57 """Call the original method with active_test=False to include the archived admin user."""
58 return super(ResUsers, self.with_context(active_test=False))._check_at_least_one_administrator()
60 def action_reset_password(self):
61 self._check_password_change()
62 return super(ResUsers, self).action_reset_password()
64 @api.constrains("password", "login", "active", "name", "email")
65 def _check_password_change(self):
66 for rec in self:
67 if rec.id in [SUPERUSER_ID, 2] and rec.env.user.id not in [SUPERUSER_ID, 2]: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true
68 raise ValidationError(_("Only Admin can change his password and login"))
70 @api.constrains("group_ids")
71 def _check_groups_change(self):
72 for rec in self:
73 if rec.id in [SUPERUSER_ID, 2] and not rec.has_group("base.group_system"): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true
74 raise ValidationError(
75 _("The group '%s' can't be removed from user '%s'")
76 % (
77 self.env.ref("base.group_system").name,
78 rec.name,
79 )
80 )
82 def _check_credentials(self, credential, env):
83 try:
84 return super(ResUsers, self)._check_credentials(credential, env)
85 except AccessDenied:
86 # If `auth_admin` is set on the session, it's because we have authenticated
87 # with the controller. See: meth:`controllers.auth.OAuthController.saas_auth`
88 if request and hasattr(request, "session") and request.session.get("auth_admin"):
89 _logger.warning(
90 "_check_credentials for user id: "
91 + str(request.session.uid)
92 + " original user id: "
93 + str(request.session.get("auth_admin"))
94 )
95 return {
96 "uid": request.session.uid,
97 "auth_method": "password",
98 "mfa": "default",
99 }
100 # Otherwise, we attempt to login with the hashed password.
101 # This is used when connecting through odooly, mostly.
102 elif (
103 credential["type"] == "password"
104 and (password := credential.get("password"))
105 and password.startswith("_hash##")
106 ):
107 dummy, u_login, e_expires, o_org_user_id, hash_ = password.split("##")
108 user = self._check_admin_auth_login(u_login, e_expires, o_org_user_id, hash_)
109 if user.id:
110 return {
111 "uid": user.id,
112 "auth_method": "password",
113 "mfa": "default",
114 }
115 else:
116 raise AccessDenied("Credentials error")
118 @api.model
119 def _check_admin_auth_login(self, u_login, e_expires, o_org_user_id, hash_):
120 """
121 Checks that the parameters are valid and that the user exists.
123 :param u_login: Desired user id to login as.
124 :param e_expires: Expiration timestamp
125 :param o_org_user_id: Original user id.
126 :param hash_: HMAC generated hash
127 :return: `res.users`
128 """
130 now = datetime.utcnow()
131 now = int(mktime(now.timetuple()))
133 key = self.env["ir.config_parameter"].sudo().get_param("saas_client.database_uuid")
134 if not key:
135 raise AccessDenied("No saas_client.database_uuid on client database")
137 myh = hmac.new(key.encode(), str(str(u_login) + str(e_expires) + str(o_org_user_id)).encode(), sha256)
139 if not hmac.compare_digest(hash_, myh.hexdigest()):
140 raise AccessDenied("Invalid Request")
142 # TODO: Check cache and reincorporate
143 # fifteen = now + (15 * 60)
144 # if not (now <= int(e_expires) <= fifteen):
145 # raise AccessDenied('Expired')
147 user = self.env["res.users"].sudo().with_context(active_test=False).search([("login", "=", u_login)], limit=1)
148 if not user.id:
149 raise AccessDenied("Invalid User")
150 return user
152 def _post_request_on_saas_provider(self, path, params=None):
153 """
154 Permitimos mandar por contexto host, saas_db_uuid y saas_provider_uuid
155 para poder usarlo desde nuestra base de datos en la carga de tickets.
156 Tal vez se pueda usar para algo mas
157 """
158 self and self.ensure_one()
159 host = self.env.context.get(
160 "host",
161 self.env["ir.config_parameter"]
162 .sudo()
163 .get_param(
164 "saas_client.provider_url",
165 ),
166 )
167 saas_db_uuid = self.env.context.get(
168 "saas_db_uuid",
169 self.env["ir.config_parameter"]
170 .sudo()
171 .get_param(
172 "saas_client.database_uuid",
173 ),
174 )
176 if not host or not saas_db_uuid:
177 raise ValidationError(_("Not Provider url or saas database uuid configured"))
179 url = "%s%s" % (host, path)
180 headers = {"content-type": "application/json"}
182 if not params:
183 params = {}
184 params.update(
185 {
186 "saas_db_uuid": saas_db_uuid,
187 "saas_uuid": self.env.context.get("saas_provider_uuid", self.saas_provider_uuid),
188 }
189 )
191 data = {
192 "jsonrpc": "2.0",
193 "method": "call",
194 "params": params,
195 }
197 res = requests.post(
198 url,
199 data=json.dumps(data),
200 headers=headers,
201 timeout=30,
202 )
203 result = res.json().get("result")
204 if not isinstance(result, dict):
205 raise ValidationError(
206 _('Post "%s" on host "%s" must return a dictionary but returns:\n%s') % (path, host, result)
207 )
208 if result.get("error"):
209 _logger.info('Could not post "%s" on host "%s". This is what we get: %s', path, host, result.get("error"))
210 raise ValidationError(result.get("error"))
211 return result