Coverage for adhoc-cicd-odoo-odoo / odoo / tools / _vendor / sessions.py: 28%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:05 +0000

1# -*- coding: utf-8 -*- 

2r""" 

3 Vendored copy of https://github.com/pallets/werkzeug/blob/2b2c4c3dd3cf7389e9f4aa06371b7332257c6289/src/werkzeug/contrib/sessions.py 

4 

5 werkzeug.contrib was removed from werkzeug 1.0. sessions (and secure 

6 cookies) were moved to the secure-cookies package. Problem is distros 

7 are starting to update werkzeug to 1.0 without having secure-cookies 

8 (e.g. Arch has done so, Debian has updated python-werkzeug in 

9 "experimental"), which will be problematic once that starts trickling 

10 down onto more stable distros and people start deploying that. 

11 

12 Edited some to fix imports and remove some compatibility things 

13 (mostly PY2) and the unnecessary (to us) SessionMiddleware 

14 

15 :copyright: 2007 Pallets 

16 :license: BSD-3-Clause 

17""" 

18import logging 

19import os 

20import re 

21import json 

22import tempfile 

23from hashlib import sha1 

24from os import path, replace as rename 

25from time import time 

26 

27from werkzeug.datastructures import CallbackDict 

28 

# Logger named after this module.
_logger = logging.getLogger(__name__)
# Shape of a session id: 40 lowercase hexadecimal characters (a SHA-1 hex
# digest). NOTE: with ``match``, ``$`` also matches just before a trailing
# newline.
_sha1_re = re.compile(r"^[a-f0-9]{40}$")

31 

32 

def generate_key(salt=None):
    """Return a new 40-character hexadecimal session key.

    The key is the SHA-1 hex digest of the salt, the current time and 30
    bytes read from ``os.urandom``.

    :param salt: optional ``bytes`` or ``str`` mixed into the hash. A ``str``
                 salt is encoded as UTF-8. When ``None``, the ASCII encoding
                 of ``repr(None)`` is used instead.
    :return: a string of 40 lowercase hexadecimal characters.
    """
    if salt is None:
        salt = repr(salt).encode("ascii")
    elif isinstance(salt, str):
        # ``bytes.join`` only accepts bytes-like items; without this a text
        # salt would raise ``TypeError``.
        salt = salt.encode("utf-8")
    return sha1(b"".join([salt, str(time()).encode("ascii"), os.urandom(30)])).hexdigest()

37 

38 

class ModificationTrackingDict(CallbackDict):
    """Dict with a ``modified`` flag.

    ``modified`` starts as ``False``; the ``on_update`` callback handed to
    ``CallbackDict`` sets it to ``True`` (presumably ``CallbackDict`` invokes
    that callback on mutating operations -- confirm against werkzeug).
    """

    __slots__ = ("modified", "on_update")

    def __init__(self, *args, **kwargs):
        def on_update(self):
            self.modified = True

        self.modified = False
        CallbackDict.__init__(self, on_update=on_update)
        # Fill the initial content through the plain ``dict`` implementation
        # so that ``modified`` is still ``False`` right after construction.
        dict.update(self, *args, **kwargs)

    def copy(self):
        """Create a flat copy of the dict.

        The copy holds the same items and the same values for every
        attribute named in ``__slots__`` that is set on this instance.
        """
        missing = object()
        # ``object.__new__`` refuses to build ``dict`` subclasses (TypeError),
        # so the instance has to be allocated through ``dict.__new__``.
        result = dict.__new__(self.__class__)
        # Plain ``dict.update``: copying the items must not go through the
        # overridden, callback-triggering ``update``.
        dict.update(result, self)
        for name in self.__slots__:
            val = getattr(self, name, missing)
            if val is not missing:
                setattr(result, name, val)
        return result

    def __copy__(self):
        """Support ``copy.copy`` by delegating to :meth:`copy`."""
        return self.copy()

62 

63 

class Session(ModificationTrackingDict):
    """Modification-tracking dict carrying a session id.

    Only direct changes to the dict are tracked. When a mutable value stored
    in the session is changed in place, set `modified` to `True` by hand.
    """

    __slots__ = ModificationTrackingDict.__slots__ + ("sid", "new")

    def __init__(self, data, sid, new=False):
        """Build a session holding ``data``.

        :param data: initial content of the session.
        :param sid: the session id, stored as ``sid``.
        :param new: stored as ``new``; defaults to ``False``.
        """
        ModificationTrackingDict.__init__(self, data)
        self.sid = sid
        self.new = new

    def __repr__(self):
        # A trailing "*" flags a session that should be saved.
        class_name = self.__class__.__name__
        content = dict.__repr__(self)
        marker = "*" if self.should_save else ""
        return "<%s %s%s>" % (class_name, content, marker)

    @property
    def should_save(self):
        """Whether the session should be saved.

        .. versionchanged:: 0.6
           A session is saved only when it was modified; being new is no
           longer enough.
        """
        return self.modified

93 

94 

class SessionStore(object):
    """Baseclass for all session stores. The Werkzeug contrib module does not
    implement any useful stores besides the filesystem store, application
    developers are encouraged to create their own stores.

    :param session_class: The session class to use. Defaults to
                          :class:`Session`.
    """

    def __init__(self, session_class=None):
        if session_class is None:
            session_class = Session
        self.session_class = session_class

    def is_valid_key(self, key):
        """Check if a key has the correct format.

        A valid key is a ``str`` made of exactly 40 lowercase hexadecimal
        characters. Anything else, including ``None`` or ``bytes``, is
        reported as invalid instead of raising.
        """
        # ``fullmatch`` rather than ``match``: with ``match`` the trailing
        # ``$`` would also accept a key followed by a newline.
        return isinstance(key, str) and _sha1_re.fullmatch(key) is not None

    def generate_key(self, salt=None):
        """Simple function that generates a new session key."""
        return generate_key(salt)

    def new(self):
        """Generate a new session: empty, with a fresh key, flagged as new."""
        return self.session_class({}, self.generate_key(), True)

    def save(self, session):
        """Save a session. Does nothing in this base class."""

    def save_if_modified(self, session):
        """Save if a session class wants an update."""
        if session.should_save:
            self.save(session)

    def delete(self, session):
        """Delete a session. Does nothing in this base class."""

    def get(self, sid):
        """Get a session for this sid or a new session object. This method
        has to check if the session key is valid and create a new session if
        that wasn't the case.

        This base implementation performs no check and always returns an
        empty session flagged as new that carries ``sid``.
        """
        return self.session_class({}, sid, True)

138 

139 

#: Suffix of the temporary files the filesystem session store writes
#: before renaming them to the final session file name.
_fs_transaction_suffix = ".__wz_sess"

142 

143 

class FilesystemSessionStore(SessionStore):
    """Simple example session store that saves sessions on the filesystem.
    This store works best on POSIX systems and Windows Vista / Windows
    Server 2008 and newer.

    Each session is stored as a JSON file of its own.

    .. versionchanged:: 0.6
       `renew_missing` was added. Previously this was considered `True`,
       now the default is `False` and it has to be enabled explicitly.

    :param path: the path to the folder used for storing the sessions.
                 If not provided the default temporary directory is used.
    :param filename_template: a string template used to give the session
                              a filename.  ``%s`` is replaced with the
                              session id.
    :param session_class: The session class to use.  Defaults to
                          :class:`Session`.
    :param renew_missing: set to `True` if you want the store to
                          give the user a new sid if the session was
                          not yet saved.
    :param mode: permission bits applied to a session file (with
                 ``os.chmod``) once it has been written.
    """

    def __init__(
        self,
        path=None,
        filename_template="werkzeug_%s.sess",
        session_class=None,
        renew_missing=False,
        mode=0o644,
    ):
        SessionStore.__init__(self, session_class)
        if path is None:
            path = tempfile.gettempdir()
        self.path = path
        # Final names must never look like in-progress temporary files,
        # which `list` skips based on that suffix.
        assert not filename_template.endswith(_fs_transaction_suffix), (
            "filename templates may not end with %s" % _fs_transaction_suffix
        )
        self.filename_template = filename_template
        self.renew_missing = renew_missing
        self.mode = mode

    @staticmethod
    def _unlink_quietly(filename):
        """Remove ``filename``, ignoring any ``OSError`` (e.g. already gone)."""
        try:
            os.unlink(filename)
        except OSError:
            pass

    def get_session_filename(self, sid):
        """Return the path of the file that stores the session ``sid``."""
        # out of the box, this should be a strict ASCII subset but
        # you might reconfigure the session object to have a more
        # arbitrary string.
        return path.join(self.path, self.filename_template % sid)

    def save(self, session):
        """Write the session content as JSON to its file.

        The data goes to a temporary file in the store folder first and is
        then renamed over the final name. A failing rename or chmod is
        logged and otherwise ignored; a serialization error is raised. The
        temporary file is removed on either failure.
        """
        fn = self.get_session_filename(session.sid)
        fd, tmp = tempfile.mkstemp(suffix=_fs_transaction_suffix, dir=self.path)
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(dict(session), f)
        except BaseException:
            # Do not leave a half-written temporary file behind.
            self._unlink_quietly(tmp)
            raise
        try:
            rename(tmp, fn)
            os.chmod(fn, self.mode)
        except OSError:
            # Saving stays best-effort, but leave a trace and clean up. If
            # only chmod failed, the temporary name no longer exists and
            # the unlink is a no-op.
            _logger.debug('Could not save session to disk.', exc_info=True)
            self._unlink_quietly(tmp)

    def delete(self, session):
        """Remove the file of ``session``; a failing removal is ignored."""
        self._unlink_quietly(self.get_session_filename(session.sid))

    def get(self, sid):
        """Return the session stored under ``sid``.

        An invalid ``sid`` yields a brand new session. When the file cannot
        be opened, the result is a new session if ``renew_missing`` is set,
        else an empty session keeping ``sid``. Content that cannot be
        loaded as JSON also yields an empty session keeping ``sid``.
        """
        if not self.is_valid_key(sid):
            return self.new()
        try:
            f = open(self.get_session_filename(sid), "r", encoding="utf-8")
        except IOError:
            _logger.debug('Could not load session from disk. Use empty session.', exc_info=True)
            if self.renew_missing:
                return self.new()
            data = {}
        else:
            with f:
                try:
                    data = json.load(f)
                except Exception:
                    _logger.debug('Could not load session data. Use empty session.', exc_info=True)
                    data = {}
        return self.session_class(data, sid, False)

    def list(self):
        """Lists all sessions in the store.

        Returns the session ids extracted from the names of the files in
        the store folder that match the filename template.

        .. versionadded:: 0.6
        """
        before, after = self.filename_template.split("%s", 1)
        filename_re = re.compile(
            r"%s(.{5,})%s$" % (re.escape(before), re.escape(after))
        )
        matches = (
            filename_re.match(filename)
            for filename in os.listdir(self.path)
            #: files with this suffix are sessions still being saved.
            if not filename.endswith(_fs_transaction_suffix)
        )
        return [match.group(1) for match in matches if match is not None]