Coverage for ingadhoc-odoo-saas / saas_client / models / res_users.py: 33%

95 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:22 +0000

1############################################################################## 

2# For copyright and license notices, see __manifest__.py file in module root 

3# directory 

4############################################################################## 

5import hmac 

6import json 

7import logging 

8from datetime import datetime 

9from hashlib import sha256 

10from time import mktime 

11 

12import requests 

13from odoo import _, api, fields, models, tools 

14from odoo.api import SUPERUSER_ID 

15from odoo.exceptions import AccessDenied, ValidationError 

16from odoo.fields import Domain 

17from odoo.http import request 

18 

19_logger = logging.getLogger(__name__) 

20 

21 

22class ResUsers(models.Model): 

23 _inherit = "res.users" 

24 

25 saas_provider_uuid = fields.Char( 

26 "SaaS Provider UUID", 

27 readonly=False, 

28 copy=False, 

29 ) 

30 

31 def _get_login_domain(self, login): 

32 """Allow to connect with __system__ and admin""" 

33 res = super()._get_login_domain(login=login) 

34 if login in ["__system__", "admin"]: 

35 res &= Domain([("active", "=", False)]) | Domain([("active", "=", True)]) 

36 return res 

37 

38 @api.model 

39 @tools.ormcache("uid", "passwd") 

40 def _check_uid_passwd(self, uid, passwd): 

41 """Verifies that the given (uid, password) is authorized and 

42 raise an exception if it is not.""" 

43 if uid in (SUPERUSER_ID, 2): 

44 # superuser and admin: always allowed 

45 if not passwd: 

46 # empty passwords disallowed for obvious security reasons 

47 raise AccessDenied() 

48 with self._assert_can_auth(user=uid): 

49 user = self.with_user(uid).env.user 

50 credential = {"login": user.login, "password": passwd, "type": "password"} 

51 user._check_credentials(credential, {"interactive": False}) 

52 else: 

53 super()._check_uid_passwd(uid, passwd) 

54 

55 @api.constrains("group_ids") 

56 def _check_at_least_one_administrator(self): 

57 """Call the original method with active_test=False to include the archived admin user.""" 

58 return super(ResUsers, self.with_context(active_test=False))._check_at_least_one_administrator() 

59 

60 def action_reset_password(self): 

61 self._check_password_change() 

62 return super(ResUsers, self).action_reset_password() 

63 

64 @api.constrains("password", "login", "active", "name", "email") 

65 def _check_password_change(self): 

66 for rec in self: 

67 if rec.id in [SUPERUSER_ID, 2] and rec.env.user.id not in [SUPERUSER_ID, 2]: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true

68 raise ValidationError(_("Only Admin can change his password and login")) 

69 

70 @api.constrains("group_ids") 

71 def _check_groups_change(self): 

72 for rec in self: 

73 if rec.id in [SUPERUSER_ID, 2] and not rec.has_group("base.group_system"): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true

74 raise ValidationError( 

75 _("The group '%s' can't be removed from user '%s'") 

76 % ( 

77 self.env.ref("base.group_system").name, 

78 rec.name, 

79 ) 

80 ) 

81 

82 def _check_credentials(self, credential, env): 

83 try: 

84 return super(ResUsers, self)._check_credentials(credential, env) 

85 except AccessDenied: 

86 # If `auth_admin` is set on the session, it's because we have authenticated 

87 # with the controller. See: meth:`controllers.auth.OAuthController.saas_auth` 

88 if request and hasattr(request, "session") and request.session.get("auth_admin"): 

89 _logger.warning( 

90 "_check_credentials for user id: " 

91 + str(request.session.uid) 

92 + " original user id: " 

93 + str(request.session.get("auth_admin")) 

94 ) 

95 return { 

96 "uid": request.session.uid, 

97 "auth_method": "password", 

98 "mfa": "default", 

99 } 

100 # Otherwise, we attempt to login with the hashed password. 

101 # This is used when connecting through odooly, mostly. 

102 elif ( 

103 credential["type"] == "password" 

104 and (password := credential.get("password")) 

105 and password.startswith("_hash##") 

106 ): 

107 dummy, u_login, e_expires, o_org_user_id, hash_ = password.split("##") 

108 user = self._check_admin_auth_login(u_login, e_expires, o_org_user_id, hash_) 

109 if user.id: 

110 return { 

111 "uid": user.id, 

112 "auth_method": "password", 

113 "mfa": "default", 

114 } 

115 else: 

116 raise AccessDenied("Credentials error") 

117 

118 @api.model 

119 def _check_admin_auth_login(self, u_login, e_expires, o_org_user_id, hash_): 

120 """ 

121 Checks that the parameters are valid and that the user exists. 

122 

123 :param u_login: Desired user id to login as. 

124 :param e_expires: Expiration timestamp 

125 :param o_org_user_id: Original user id. 

126 :param hash_: HMAC generated hash 

127 :return: `res.users` 

128 """ 

129 

130 now = datetime.utcnow() 

131 now = int(mktime(now.timetuple())) 

132 

133 key = self.env["ir.config_parameter"].sudo().get_param("saas_client.database_uuid") 

134 if not key: 

135 raise AccessDenied("No saas_client.database_uuid on client database") 

136 

137 myh = hmac.new(key.encode(), str(str(u_login) + str(e_expires) + str(o_org_user_id)).encode(), sha256) 

138 

139 if not hmac.compare_digest(hash_, myh.hexdigest()): 

140 raise AccessDenied("Invalid Request") 

141 

142 # TODO: Check cache and reincorporate 

143 # fifteen = now + (15 * 60) 

144 # if not (now <= int(e_expires) <= fifteen): 

145 # raise AccessDenied('Expired') 

146 

147 user = self.env["res.users"].sudo().with_context(active_test=False).search([("login", "=", u_login)], limit=1) 

148 if not user.id: 

149 raise AccessDenied("Invalid User") 

150 return user 

151 

152 def _post_request_on_saas_provider(self, path, params=None): 

153 """ 

154 Permitimos mandar por contexto host, saas_db_uuid y saas_provider_uuid 

155 para poder usarlo desde nuestra base de datos en la carga de tickets. 

156 Tal vez se pueda usar para algo mas 

157 """ 

158 self and self.ensure_one() 

159 host = self.env.context.get( 

160 "host", 

161 self.env["ir.config_parameter"] 

162 .sudo() 

163 .get_param( 

164 "saas_client.provider_url", 

165 ), 

166 ) 

167 saas_db_uuid = self.env.context.get( 

168 "saas_db_uuid", 

169 self.env["ir.config_parameter"] 

170 .sudo() 

171 .get_param( 

172 "saas_client.database_uuid", 

173 ), 

174 ) 

175 

176 if not host or not saas_db_uuid: 

177 raise ValidationError(_("Not Provider url or saas database uuid configured")) 

178 

179 url = "%s%s" % (host, path) 

180 headers = {"content-type": "application/json"} 

181 

182 if not params: 

183 params = {} 

184 params.update( 

185 { 

186 "saas_db_uuid": saas_db_uuid, 

187 "saas_uuid": self.env.context.get("saas_provider_uuid", self.saas_provider_uuid), 

188 } 

189 ) 

190 

191 data = { 

192 "jsonrpc": "2.0", 

193 "method": "call", 

194 "params": params, 

195 } 

196 

197 res = requests.post( 

198 url, 

199 data=json.dumps(data), 

200 headers=headers, 

201 timeout=30, 

202 ) 

203 result = res.json().get("result") 

204 if not isinstance(result, dict): 

205 raise ValidationError( 

206 _('Post "%s" on host "%s" must return a dictionary but returns:\n%s') % (path, host, result) 

207 ) 

208 if result.get("error"): 

209 _logger.info('Could not post "%s" on host "%s". This is what we get: %s', path, host, result.get("error")) 

210 raise ValidationError(result.get("error")) 

211 return result