Coverage for adhoc-cicd-odoo-odoo / odoo / tools / _vendor / sessions.py: 28%
122 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:05 +0000
1# -*- coding: utf-8 -*-
2r"""
3 Vendored copy of https://github.com/pallets/werkzeug/blob/2b2c4c3dd3cf7389e9f4aa06371b7332257c6289/src/werkzeug/contrib/sessions.py
5 werkzeug.contrib was removed from werkzeug 1.0. sessions (and secure
6 cookies) were moved to the secure-cookies package. Problem is distros
7 are starting to update werkzeug to 1.0 without having secure-cookies
8 (e.g. Arch has done so, Debian has updated python-werkzeug in
9 "experimental"), which will be problematic once that starts trickling
10 down onto more stable distros and people start deploying that.
12 Edited some to fix imports and remove some compatibility things
13 (mostly PY2) and the unnecessary (to us) SessionMiddleware
15 :copyright: 2007 Pallets
16 :license: BSD-3-Clause
17"""
18import logging
19import os
20import re
21import json
22import tempfile
23from hashlib import sha1
24from os import path, replace as rename
25from time import time
27from werkzeug.datastructures import CallbackDict
29_logger = logging.getLogger(__name__)
30_sha1_re = re.compile(r"^[a-f0-9]{40}$")
def generate_key(salt=None):
    """Return a fresh session key: a 40-character lowercase hex SHA-1 digest.

    The digest is computed over the salt, the current ``time()`` rendered as
    text, and 30 bytes from ``os.urandom``.

    :param salt: optional extra data mixed into the hash. May be ``bytes``
                 or ``str`` (encoded as UTF-8). When ``None``, the bytes
                 ``b"None"`` are used.
    """
    if salt is None:
        salt = repr(salt).encode("ascii")
    elif isinstance(salt, str):
        # b"".join() only accepts bytes-like items; a text salt used to
        # raise TypeError here.
        salt = salt.encode("utf-8")
    return sha1(b"".join([salt, str(time()).encode("ascii"), os.urandom(30)])).hexdigest()
class ModificationTrackingDict(CallbackDict):
    """Dict whose ``modified`` attribute is set to ``True`` by its update callback.

    The callback is handed to ``CallbackDict`` as ``on_update``; a freshly
    constructed instance starts with ``modified`` set to ``False``.
    """

    __slots__ = ("modified", "on_update")

    def __init__(self, *args, **kwargs):
        def on_update(self):
            self.modified = True

        self.modified = False
        CallbackDict.__init__(self, on_update=on_update)
        # The initial contents are loaded through the plain ``dict.update``
        # (presumably so that populating the dict does not fire the callback
        # and flag a brand new instance as modified).
        dict.update(self, *args, **kwargs)

    def copy(self):
        """Create a flat copy of the dict.

        The copy is an instance of the same class, carries the values of
        every attribute named in ``__slots__`` that is set on the original,
        and holds the same key/value pairs (values are not copied).
        """
        missing = object()
        # A dict subclass must be allocated through ``dict.__new__``:
        # ``object.__new__`` refuses to build it and raises TypeError.
        result = dict.__new__(self.__class__)
        for name in self.__slots__:
            val = getattr(self, name, missing)
            if val is not missing:
                setattr(result, name, val)
        # Copy the items with the plain dict method, mirroring __init__.
        dict.update(result, self)
        return result

    def __copy__(self):
        """Support ``copy.copy()`` by delegating to :meth:`copy`."""
        return self.copy()
class Session(ModificationTrackingDict):
    """Dict subclass that keeps track of direct object changes.

    Changes made inside mutable values stored in the session are not
    tracked; for those, set `modified` to `True` by hand.

    :param data: initial content of the session.
    :param sid: identifier of the session.
    :param new: stored as-is on the ``new`` attribute.
    """

    __slots__ = ModificationTrackingDict.__slots__ + ("sid", "new")

    def __init__(self, data, sid, new=False):
        ModificationTrackingDict.__init__(self, data)
        self.sid = sid
        self.new = new

    def __repr__(self):
        # A trailing "*" marks a session that is due to be saved.
        save_marker = "*" if self.should_save else ""
        class_name = self.__class__.__name__
        content = dict.__repr__(self)
        return "<{} {}{}>".format(class_name, content, save_marker)

    @property
    def should_save(self):
        """True if the session should be saved.

        .. versionchanged:: 0.6
            By default the session is now only saved if the session is
            modified, not if it is new like it was before.
        """
        return self.modified
class SessionStore(object):
    """Baseclass for all session stores. The Werkzeug contrib module does not
    implement any useful stores besides the filesystem store, application
    developers are encouraged to create their own stores.

    In this base class :meth:`save` and :meth:`delete` do nothing; concrete
    stores override them.

    :param session_class: The session class to use. Defaults to
                          :class:`Session`.
    """

    def __init__(self, session_class=None):
        if session_class is None:
            session_class = Session
        self.session_class = session_class

    def is_valid_key(self, key):
        """Check if a key has the correct format.

        A valid key is a string made of exactly 40 lowercase hexadecimal
        characters. Anything else, including non-string values, is invalid.
        """
        # fullmatch() rather than match(): with match(), the "$" anchor also
        # accepts a key followed by a trailing newline.
        return isinstance(key, str) and _sha1_re.fullmatch(key) is not None

    def generate_key(self, salt=None):
        """Simple function that generates a new session key."""
        return generate_key(salt)

    def new(self):
        """Generate a new, empty session with a fresh key, flagged as new."""
        return self.session_class({}, self.generate_key(), True)

    def save(self, session):
        """Save a session. Does nothing in the base class."""

    def save_if_modified(self, session):
        """Save if a session class wants an update."""
        if session.should_save:
            self.save(session)

    def delete(self, session):
        """Delete a session. Does nothing in the base class."""

    def get(self, sid):
        """Get a session for this sid or a new session object. This method
        has to check if the session key is valid and create a new session if
        that wasn't the case.

        The base implementation performs no check: it returns an empty
        session carrying ``sid``, flagged as new.
        """
        return self.session_class({}, sid, True)
140#: used for temporary files by the filesystem session store
141_fs_transaction_suffix = ".__wz_sess"
class FilesystemSessionStore(SessionStore):
    """Simple example session store that saves sessions on the filesystem.
    This store works best on POSIX systems and Windows Vista / Windows
    Server 2008 and newer.

    Each session is stored as a JSON file inside ``path``.

    .. versionchanged:: 0.6
        `renew_missing` was added. Previously this was considered `True`,
        now the default changed to `False` and it can be explicitly
        enabled.

    :param path: the path to the folder used for storing the sessions.
                 If not provided the default temporary directory is used.
    :param filename_template: a string template used to give the session
                              a filename. ``%s`` is replaced with the
                              session id.
    :param session_class: The session class to use. Defaults to
                          :class:`Session`.
    :param renew_missing: set to `True` if you want the store to
                          give the user a new sid if the session was
                          not yet saved.
    :param mode: permission bits applied to a session file with
                 ``os.chmod`` once it has been written.
    """

    def __init__(
        self,
        path=None,
        filename_template="werkzeug_%s.sess",
        session_class=None,
        renew_missing=False,
        mode=0o644,
    ):
        SessionStore.__init__(self, session_class)
        if path is None:
            path = tempfile.gettempdir()
        self.path = path
        # A template ending with the transaction suffix would make finished
        # session files look like in-progress ones, which list() skips.
        assert not filename_template.endswith(_fs_transaction_suffix), (
            "filename templates may not end with %s" % _fs_transaction_suffix
        )
        self.filename_template = filename_template
        self.renew_missing = renew_missing
        self.mode = mode

    def get_session_filename(self, sid):
        """Return the path of the file that stores the session ``sid``."""
        # out of the box, this should be a strict ASCII subset but
        # you might reconfigure the session object to have a more
        # arbitrary string.
        return path.join(self.path, self.filename_template % sid)

    @staticmethod
    def _remove_quietly(filename):
        """Best-effort removal of ``filename``; a failure to unlink is ignored."""
        try:
            os.unlink(filename)
        except OSError:
            pass

    def save(self, session):
        """Write the session content as JSON to its session file.

        The data is first written to a temporary file in the store folder,
        which is then renamed onto the final filename. A failure to rename
        or to change the file mode is swallowed (the session is then simply
        not saved); a failure to serialize the session is propagated. In
        both failure cases the temporary file is removed.
        """
        fn = self.get_session_filename(session.sid)
        fd, tmp = tempfile.mkstemp(suffix=_fs_transaction_suffix, dir=self.path)
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(dict(session), f)
        except BaseException:
            # mkstemp() leaves deletion to the caller: do not leak the
            # half-written file when serialization fails.
            self._remove_quietly(tmp)
            raise
        try:
            rename(tmp, fn)
        except OSError:
            _logger.debug('Could not save session to disk.', exc_info=True)
            self._remove_quietly(tmp)
            return
        try:
            os.chmod(fn, self.mode)
        except OSError:
            pass

    def delete(self, session):
        """Remove the file of ``session``; a failure to unlink is ignored."""
        fn = self.get_session_filename(session.sid)
        try:
            os.unlink(fn)
        except OSError:
            pass

    def get(self, sid):
        """Return the session stored for ``sid``.

        An invalid ``sid`` yields a brand new session. When the session file
        cannot be opened, a new session is returned if ``renew_missing`` is
        set, otherwise an empty session keeping ``sid``. Unreadable session
        data also results in an empty session keeping ``sid``.
        """
        if not self.is_valid_key(sid):
            return self.new()
        try:
            f = open(self.get_session_filename(sid), "r", encoding="utf-8")
        except IOError:
            _logger.debug('Could not load session from disk. Use empty session.', exc_info=True)
            if self.renew_missing:
                return self.new()
            data = {}
        else:
            with f:
                try:
                    data = json.load(f)
                except Exception:
                    _logger.debug('Could not load session data. Use empty session.', exc_info=True)
                    data = {}
        return self.session_class(data, sid, False)

    def list(self):
        """Lists all sessions in the store.

        Returns the part of each filename in the store folder that stands
        for ``%s`` in ``filename_template``, for every file matching the
        template.

        .. versionadded:: 0.6
        """
        before, after = self.filename_template.split("%s", 1)
        filename_re = re.compile(
            r"%s(.{5,})%s$" % (re.escape(before), re.escape(after))
        )
        result = []
        for filename in os.listdir(self.path):
            #: this is a session that is still being saved.
            if filename.endswith(_fs_transaction_suffix):
                continue
            match = filename_re.match(filename)
            if match is not None:
                result.append(match.group(1))
        return result