Coverage for adhoc-cicd-odoo-odoo / odoo / service / server.py: 23%

1069 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:15 +0000

1#----------------------------------------------------------- 

2# Threaded, Gevent and Prefork Servers 

3#----------------------------------------------------------- 

4import contextlib 

5import collections 

6import datetime 

7import errno 

8import logging 

9import os 

10import os.path 

11import platform 

12import random 

13import select 

14import signal 

15import socket 

16import subprocess 

17import sys 

18import threading 

19import time 

20from collections import deque 

21from io import BytesIO 

22 

23import psutil 

24import werkzeug.serving 

25from werkzeug .urls import uri_to_iri 

26 

27if os.name == 'posix': 27 ↛ 40line 27 didn't jump to line 40 because the condition on line 27 was always true

28 # Unix only for workers 

29 import fcntl 

30 import resource 

31 try: 

32 import inotify 

33 from inotify.adapters import InotifyTrees 

34 from inotify.constants import IN_MODIFY, IN_CREATE, IN_MOVED_TO 

35 INOTIFY_LISTEN_EVENTS = IN_MODIFY | IN_CREATE | IN_MOVED_TO 

36 except ImportError: 

37 inotify = None 

38else: 

39 # Windows shim 

40 signal.SIGHUP = -1 

41 inotify = None 

42 

43if not inotify: 43 ↛ 52line 43 didn't jump to line 52 because the condition on line 43 was always true

44 try: 

45 import watchdog 

46 from watchdog.observers import Observer 

47 from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileMovedEvent 

48 except ImportError: 

49 watchdog = None 

50 

51# Optional process names for workers 

52try: 

53 from setproctitle import setproctitle 

54except ImportError: 

55 setproctitle = lambda x: None 

56 

57from odoo import api, sql_db 

58from odoo.modules.registry import Registry 

59from odoo.release import nt_service_name 

60from odoo.tools import config, gc, osutil, OrderedSet, profiler 

61from odoo.tools.cache import log_ormcache_stats 

62from odoo.tools.misc import stripped_sys_argv, dumpstacks 

63from .db import list_dbs 

64 

# Module-level logger shared by every server class in this file.
_logger = logging.getLogger(__name__)

# Number of seconds used as the base sleep / polling period below.
SLEEP_INTERVAL = 60  # 1 min


# A global-ish object, each thread/worker uses its own
thread_local = threading.local()

# the model and method name that was called via rpc, for logging
# NOTE: attributes of a threading.local are per-thread, so this default is
# only visible from the thread that imports this module.
thread_local.rpc_model_method = ''

75 

76 

def memory_info(process):
    """Report how much memory ``process`` uses, in bytes.

    :param process: object exposing ``memory_info()`` or, failing that, the
        legacy ``get_memory_info()`` (psutil-style process handle)
    :return: the ``rss`` figure on macOS, the ``vms`` figure everywhere else
    """
    # psutil < 2.0 only has get_memory_info, psutil >= 3.0 only has memory_info
    reader = getattr(process, 'memory_info', None) or process.get_memory_info
    usage = reader()
    # macOS hands very large virtual address spaces to every process, so only
    # the resident set size is monitored there.
    on_macos = platform.system() == 'Darwin'
    return usage.rss if on_macos else usage.vms

87 

88 

def set_limit_memory_hard():
    """Apply the configured hard memory limit as the soft ``RLIMIT_AS`` value.

    Does nothing outside Linux or when no limit is configured. In evented
    mode, ``limit_memory_hard_gevent`` takes precedence when it is set. The
    existing hard rlimit is left untouched.
    """
    if platform.system() == 'Linux':
        ceiling = config['limit_memory_hard']
        import odoo  # needed for the ``evented`` flag
        gevent_ceiling_applies = odoo.evented and config['limit_memory_hard_gevent']
        if gevent_ceiling_applies:
            ceiling = config['limit_memory_hard_gevent']
        if not ceiling:
            return
        # Only the soft value is replaced; the current hard value is passed back.
        _, hard_cap = resource.getrlimit(resource.RLIMIT_AS)
        resource.setrlimit(resource.RLIMIT_AS, (ceiling, hard_cap))

100 

def empty_pipe(fd):
    """Drain everything currently readable from the file descriptor ``fd``.

    Reading stops when a read returns no data (end of file) or when the
    descriptor reports ``EAGAIN`` (nothing left to read on a non-blocking fd).

    :param fd: file descriptor to read from
    :raise OSError: for any read error other than ``EAGAIN``
    """
    try:
        # Read in chunks rather than one byte per system call; the data is
        # discarded either way.
        while os.read(fd, 4096):
            pass
    except OSError as e:
        if e.errno not in [errno.EAGAIN]:
            raise

108 

109 

def cron_database_list():
    """Return the configured ``db_name`` value, or ``list_dbs(True)`` when it is empty."""
    configured = config['db_name']
    if configured:
        return configured
    return list_dbs(True)

112 

113 

114#---------------------------------------------------------- 

115# Werkzeug WSGI servers patched 

116#---------------------------------------------------------- 

class LoggingBaseWSGIServerMixIn(object):
    """Mix-in that sends request-processing errors to the module logger."""

    def handle_error(self, request, client_address):
        """Log the exception currently being handled, except broken pipes.

        Must be called while an exception is being handled, since the
        exception is read from ``sys.exc_info()``.

        :param request: the request being processed (unused)
        :param client_address: peer address, included in the log message
        """
        _, exc, _ = sys.exc_info()
        # Python 3 raises BrokenPipeError, a subclass of OSError, for EPIPE.
        # Comparing the exception type for equality with socket.error (an
        # alias of OSError) never matches it, hence isinstance.
        if isinstance(exc, OSError) and exc.errno == errno.EPIPE:
            # broken pipe, ignore error
            return
        _logger.exception('Exception happened during processing of request from %s', client_address)

124 

class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
    """ werkzeug Base WSGI Server that never serves on its own socket.

    PreforkServer uses this class, sets the socket itself and calls
    process_request() manually.
    """

    def __init__(self, app):
        # The mix-in defines no __init__, so this resolves to
        # werkzeug.serving.BaseWSGIServer.__init__.
        super().__init__("127.0.0.1", 0, app, handler=CommonRequestHandler)
        # Close the socket right away. It will be replaced by WorkerHTTP when
        # processing requests.
        own_socket = self.socket
        if own_socket:
            own_socket.close()

    def server_activate(self):
        # Intentionally a no-op: listening is done on PreforkServer#socket.
        pass

138 

class CommonRequestHandler(werkzeug.serving.WSGIRequestHandler):
    """Request handler with colourised request logging."""

    def log_request(self, code="-", size="-"):
        """Log one request line, styled according to the response status.

        :param code: response status code (converted with ``str``)
        :param size: response size, logged as given
        """
        try:
            path = uri_to_iri(self.path)
            rpc_target = thread_local.rpc_model_method
            if rpc_target:
                path += '#' + rpc_target
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        code = str(code)
        family = code[0]

        # Exact status codes win over their family; an empty tuple means
        # "leave the message unstyled".
        by_code = {
            "200": (),            # 2xx - Success
            "304": ("cyan",),     # 304 - Resource Not Modified
            "404": ("yellow",),   # 404 - Resource Not Found
        }
        by_family = {
            "1": ("bold",),         # 1xx - Informational
            "3": ("green",),        # 3xx - Redirection
            "4": ("bold", "red"),   # 4xx - Client Error
        }
        styles = by_code.get(code)
        if styles is None:
            # 5xx, or any other response
            styles = by_family.get(family, ("bold", "magenta"))
        if styles:
            msg = werkzeug.serving._ansi_style(msg, *styles)

        self.log("info", '"%s" %s %s', msg, code, size)

169 

170 

class RequestHandler(CommonRequestHandler):
    """Request handler of the threaded server, with websocket adjustments."""

    def setup(self):
        """Prepare the handler and name the current thread after its ident."""
        if config['test_enable']:
            # timeout to avoid chrome headless preconnect during tests
            self.timeout = 5
        super().setup()
        # flag the current thread as handling a http request
        current = threading.current_thread()
        current.name = 'odoo.service.http.request.%s' % (current.ident,)

    def make_environ(self):
        """Build the WSGI environ and expose the raw connection under ``'socket'``."""
        environ = super().make_environ()
        # The websocket code needs the TCP socket itself.
        environ['socket'] = self.connection
        wants_websocket = self.headers.get('Upgrade') == 'websocket'
        if wants_websocket:
            # The upgrade header was introduced in HTTP 1.1; Firefox refuses
            # a websocket connection answered with version 1.0.
            self.protocol_version = "HTTP/1.1"
        return environ

    def send_header(self, keyword, value):
        """Send a header, except ``Connection: close`` on websocket upgrades."""
        # `WSGIRequestHandler` (werkzeug >= 2.1.1) wants to send a connection
        # close header, which is incompatible with websocket.
        is_close_header = keyword == 'Connection' and value == 'close'
        if self.headers.get('Upgrade') == 'websocket' and is_close_header:
            # Do not keep processing requests.
            self.close_connection = True
        else:
            super().send_header(keyword, value)

    def end_headers(self, *a, **kw):
        """Finish the headers, then detach rfile/wfile for websocket upgrades."""
        super().end_headers(*a, **kw)
        # At this point, Werkzeug assumes the connection is closed and will
        # discard any incoming data, which must not happen for WebSocket
        # connections. Swapping rfile/wfile prevents any further action
        # (compatibility with werkzeug >= 2.3.x).
        # See: https://github.com/pallets/werkzeug/blob/2.3.x/src/werkzeug/serving.py#L334
        if self.headers.get('Upgrade') != 'websocket':
            return
        self.rfile = BytesIO()
        self.wfile = BytesIO()

    def log_error(self, format, *args):
        """Log request timeouts at info level in test mode, defer otherwise."""
        timed_out_in_tests = format == "Request timed out: %r" and config['test_enable']
        if not timed_out_in_tests:
            super().log_error(format, *args)
            return
        _logger.info(format, *args)

217 

class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
    """ werkzeug Threaded WSGI Server able to reuse a listen socket handed
    over by the environment. Autoreload relies on this to keep the listen
    socket open across a reload.
    """

    def __init__(self, host, port, app):
        # ODOO_MAX_HTTP_THREADS caps the number of concurrent socket
        # connections accepted by a threaded server, which implicitly caps the
        # number of threads handling http requests concurrently.
        raw_limit = os.environ.get("ODOO_MAX_HTTP_THREADS")
        self.max_http_threads = raw_limit
        if raw_limit:
            try:
                thread_cap = int(raw_limit)
            except ValueError:
                # Not an integer: fall back to half the size of db_maxconn,
                # because although most requests won't borrow cursors
                # concurrently, some controllers may allocate two or more.
                thread_cap = max((config['db_maxconn'] - config['max_cron_threads']) // 2, 1)
            self.max_http_threads = thread_cap
            self.http_threads_sem = threading.Semaphore(thread_cap)
        super().__init__(host, port, app, handler=RequestHandler)

        # See https://github.com/pallets/werkzeug/pull/770
        # Request threads are not daemonic so that the server waits for them
        # when shutting down gracefully.
        self.daemon_threads = False

    def server_bind(self):
        """Bind normally, or adopt the socket passed through socket activation."""
        SD_LISTEN_FDS_START = 3
        env = os.environ
        socket_activated = env.get('LISTEN_FDS') == '1' and env.get('LISTEN_PID') == str(os.getpid())
        self.reload_socket = bool(socket_activated)
        if socket_activated:
            self.socket = socket.fromfd(SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM)
            _logger.info('HTTP service (werkzeug) running through socket activation')
        else:
            super().server_bind()
            _logger.info('HTTP service (werkzeug) running on %s:%s', self.server_name, self.server_port)

    def server_activate(self):
        """Start listening, unless the socket was inherited."""
        if self.reload_socket:
            return
        super().server_activate()

    def process_request(self, request, client_address):
        """
        Handle the request in a freshly started thread.

        Replaces socketserver.ThreadingMixIn's implementation so that the
        thread object can be tagged with its type and start time before it
        runs.
        """
        worker = threading.Thread(
            target=self.process_request_thread,
            args=(request, client_address),
        )
        worker.daemon = self.daemon_threads
        worker.type = 'http'
        worker.start_time = time.time()
        worker.start()

    def _handle_request_noblock(self):
        if self.max_http_threads:
            slot_acquired = self.http_threads_sem.acquire(timeout=0.1)
            if not slot_acquired:
                # The semaphore is full: return to the caller (most probably
                # socketserver.BaseServer's serve_forever loop), which retries
                # immediately because the selector still sees a pending
                # connection on the socket. The 100 ms acquire timeout avoids
                # a cpu bound loop while waiting for the semaphore.
                return
        # upstream _handle_request_noblock will handle errors and call shutdown_request in any cases
        super()._handle_request_noblock()

    def shutdown_request(self, request):
        """Give the thread slot back (when limited) and shut the request down."""
        if self.max_http_threads:
            # upstream is supposed to call this function no matter what happens during processing
            self.http_threads_sem.release()
        super().shutdown_request(request)

289 

290#---------------------------------------------------------- 

291# FileSystem Watcher for autoreload and cache invalidation 

292#---------------------------------------------------------- 

293class FSWatcherBase(object): 

294 def handle_file(self, path): 

295 if path.endswith('.py') and not os.path.basename(path).startswith('.~'): 

296 try: 

297 source = open(path, 'rb').read() + b'\n' 

298 compile(source, path, 'exec') 

299 except IOError: 

300 _logger.error('autoreload: python code change detected, IOError for %s', path) 

301 except SyntaxError: 

302 _logger.error('autoreload: python code change detected, SyntaxError in %s', path) 

303 else: 

304 if not server_phoenix: 

305 _logger.info('autoreload: python code updated, autoreload activated') 

306 restart() 

307 return True 

308 

309 

class FSWatcherWatchdog(FSWatcherBase):
    """Autoreload watcher built on the ``watchdog`` package."""

    def __init__(self):
        self.observer = Observer()
        import odoo.addons  # noqa: PLC0415
        for addons_dir in odoo.addons.__path__:
            _logger.info('Watching addons folder %s', addons_dir)
            # This object is the event handler: the observer calls dispatch().
            self.observer.schedule(self, addons_dir, recursive=True)

    def dispatch(self, event):
        """Forward file creations, modifications and moves to handle_file()."""
        relevant = (FileCreatedEvent, FileModifiedEvent, FileMovedEvent)
        if not isinstance(event, relevant) or event.is_directory:
            return
        # Prefer the destination path when the event has a non-empty one.
        changed_path = getattr(event, 'dest_path', '') or event.src_path
        self.handle_file(changed_path)

    def start(self):
        self.observer.start()
        _logger.info('AutoReload watcher running with watchdog')

    def stop(self):
        observer = self.observer
        observer.stop()
        observer.join()

331 

332 

class FSWatcherInotify(FSWatcherBase):
    """Autoreload watcher built on the ``inotify`` package."""

    def __init__(self):
        self.started = False
        # ignore warnings from inotify in case we have duplicate addons paths.
        inotify.adapters._LOGGER.setLevel(logging.ERROR)
        import odoo.addons  # noqa: PLC0415
        # Hand InotifyTrees a fresh list: its __init__ deletes the list's items.
        watched_dirs = list(odoo.addons.__path__)
        for addons_dir in watched_dirs:
            _logger.info('Watching addons folder %s', addons_dir)
        self.watcher = InotifyTrees(watched_dirs, mask=INOTIFY_LISTEN_EVENTS, block_duration_s=.5)

    def run(self):
        """Poll inotify events until stopped or until a file triggers a reload."""
        _logger.info('AutoReload watcher running with inotify')
        dir_creation_events = {'IN_MOVED_TO', 'IN_CREATE'}
        while self.started:
            for _header, type_names, parent, name in self.watcher.event_gen(timeout_s=0, yield_nones=False):
                if 'IN_ISDIR' in type_names:
                    if not dir_creation_events.intersection(type_names):
                        continue
                    # A directory appeared: check every file below it.
                    new_dir = os.path.join(parent, name)
                    for root, _dirs, files in os.walk(new_dir):
                        for file in files:
                            if self.handle_file(os.path.join(root, file)):
                                return
                    continue
                # despite not having IN_DELETE in the watcher's mask, the
                # watcher sends these events when a directory is deleted.
                if 'IN_DELETE' in type_names:
                    continue
                if self.handle_file(os.path.join(parent, name)):
                    return

    def start(self):
        self.started = True
        worker = threading.Thread(target=self.run, name="odoo.service.autoreload.watcher")
        worker.daemon = True
        self.thread = worker
        worker.start()

    def stop(self):
        self.started = False
        self.thread.join()
        del self.watcher  # ensures inotify watches are freed up before reexec

376 

377 

378#---------------------------------------------------------- 

379# Servers: Threaded, Gevented and Prefork 

380#---------------------------------------------------------- 

381 

class CommonServer(object):
    """Base class of the servers: configuration, socket and stop-hook helpers."""

    # Cleanup callables registered through on_stop(). Defined on the class,
    # so the registry is shared by every server class and instance.
    _on_stop_funcs = []

    def __init__(self, app):
        """
        :param app: the application object served by this server
        """
        self.app = app
        # config
        self.interface = config['http_interface'] or '0.0.0.0'
        self.port = config['http_port']
        # runtime
        self.pid = os.getpid()

    def close_socket(self, sock):
        """ Closes a socket instance cleanly
        :param sock: the network socket to close
        :type sock: socket.socket
        :raise OSError: when the shutdown fails for a reason other than the
            ones tolerated below
        """
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error as e:
            if e.errno == errno.EBADF:
                # Werkzeug > 0.9.6 closes the socket itself (see commit
                # https://github.com/mitsuhiko/werkzeug/commit/4d8ca089)
                return
            # On OSX, socket shutdowns both sides if any side closes it
            # causing an error 57 'Socket is not connected' on shutdown
            # of the other side (or something), see
            # http://bugs.python.org/issue4397
            # note: stdlib fixed test, not behavior
            if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
                raise
        sock.close()

    @classmethod
    def on_stop(cls, func):
        """ Register a cleanup function to be executed when the server stops """
        cls._on_stop_funcs.append(func)

    def stop(self):
        """Run every registered cleanup function, logging (not raising) failures."""
        for func in self._on_stop_funcs:
            try:
                _logger.debug("on_close call %s", func)
                func()
            except Exception:
                # Not every callable has a __name__ (e.g. functools.partial);
                # fall back to the callable itself so that reporting a failure
                # cannot raise and skip the remaining cleanup functions.
                _logger.warning("Exception in %s", getattr(func, '__name__', func), exc_info=True)

426 

427 

class ThreadedServer(CommonServer):
    """Server running the HTTP service and the cron workers as threads of one process."""

    def __init__(self, app):
        super(ThreadedServer, self).__init__(app)
        # ident of the thread that instantiated the server; stop() never
        # waits on that thread.
        self.main_thread_id = threading.current_thread().ident
        # Variable keeping track of the number of calls to the signal handler defined
        # below. This variable is monitored by ``quit_on_signals()``.
        self.quit_signals_received = 0

        #self.socket = None
        self.httpd = None
        # Threads currently exceeding a limit, and the time at which the set
        # became non-empty (None while it is empty); see process_limit().
        self.limits_reached_threads = set()
        self.limit_reached_time = None

    def signal_handler(self, sig, frame):
        """React to a signal: shutdown (INT/TERM), hard exit (XCPU) or reload (HUP).

        Any other signal is ignored.

        :param sig: signal number
        :param frame: current stack frame (unused)
        :raise KeyboardInterrupt: on the first INT/TERM and on HUP, to
            interrupt run()
        """
        if sig in [signal.SIGINT, signal.SIGTERM]:
            # shutdown on kill -INT or -TERM
            self.quit_signals_received += 1
            if self.quit_signals_received > 1:
                # logging.shutdown was already called at this point.
                sys.stderr.write("Forced shutdown.\n")
                os._exit(0)
            # interrupt run() to start shutdown
            raise KeyboardInterrupt()
        elif hasattr(signal, 'SIGXCPU') and sig == signal.SIGXCPU:
            sys.stderr.write("CPU time limit exceeded! Shutting down immediately\n")
            sys.stderr.flush()
            os._exit(0)
        elif sig == signal.SIGHUP:
            # restart on kill -HUP
            global server_phoenix  # noqa: PLW0603
            server_phoenix = True
            self.quit_signals_received += 1
            # interrupt run() to start shutdown
            raise KeyboardInterrupt()

    def process_limit(self):
        """Refresh ``limits_reached_threads`` and ``limit_reached_time``.

        The calling thread is recorded when the process exceeds
        ``limit_memory_soft``; every monitored thread running for longer than
        its real time limit is recorded as well.
        """
        memory = memory_info(psutil.Process(os.getpid()))
        if config['limit_memory_soft'] and memory > config['limit_memory_soft']:
            _logger.warning('Server memory limit (%s) reached.', memory)
            self.limits_reached_threads.add(threading.current_thread())

        for thread in threading.enumerate():
            thread_type = getattr(thread, 'type', None)
            # `and` binds tighter than `or`: (non-daemon and not websocket) or cron
            if not thread.daemon and thread_type != 'websocket' or thread_type == 'cron':
                # We apply the limits on cron threads and HTTP requests,
                # websocket requests excluded.
                if getattr(thread, 'start_time', None):
                    thread_execution_time = time.time() - thread.start_time
                    thread_limit_time_real = config['limit_time_real']
                    if (getattr(thread, 'type', None) == 'cron' and
                            config['limit_time_real_cron'] and config['limit_time_real_cron'] > 0):
                        thread_limit_time_real = config['limit_time_real_cron']
                    if thread_limit_time_real and thread_execution_time > thread_limit_time_real:
                        _logger.warning(
                            'Thread %s virtual real time limit (%d/%ds) reached.',
                            thread, thread_execution_time, thread_limit_time_real)
                        self.limits_reached_threads.add(thread)
        # Clean-up threads that are no longer alive
        # e.g. threads that exceeded their real time,
        # but which finished before the server could restart.
        for thread in list(self.limits_reached_threads):
            if not thread.is_alive():
                self.limits_reached_threads.remove(thread)
        if self.limits_reached_threads:
            # keep the time at which the first limit was reached
            self.limit_reached_time = self.limit_reached_time or time.time()
        else:
            self.limit_reached_time = None

    def cron_thread(self, number):
        """Body of cron worker ``number``: wait for triggers and process cron jobs.

        :param number: index of the worker; also used to stagger its wake-ups
        """
        # Steve Reich timing style with thundering herd mitigation.
        #
        # On startup, all workers bind on a notification channel in
        # postgres so they can be woken up at will. At worst they wake
        # up every SLEEP_INTERVAL with a jitter. The jitter creates a
        # chorus effect that helps distribute on the timeline the moment
        # when individual worker wake up.
        #
        # On NOTIFY, all workers are awaken at the same time, sleeping
        # just a bit prevents they all poll the database at the exact
        # same time. This is known as the thundering herd effect.

        from odoo.addons.base.models.ir_cron import IrCron  # noqa: PLC0415

        def _run_cron(cr):
            # Returns when the connection is found closed, or once the
            # worker has been alive for longer than limit_time_worker_cron
            # (when that option is > 0).
            pg_conn = cr._cnx
            # LISTEN / NOTIFY doesn't work in recovery mode
            cr.execute("SELECT pg_is_in_recovery()")
            in_recovery = cr.fetchone()[0]
            if not in_recovery:
                cr.execute("LISTEN cron_trigger")
            else:
                _logger.warning("PG cluster in recovery mode, cron trigger not activated")
            cr.commit()
            check_all_time = 0.0  # last time that we listed databases, initialized far in the past
            all_db_names = []
            alive_time = time.monotonic()
            while config['limit_time_worker_cron'] <= 0 or (time.monotonic() - alive_time) <= config['limit_time_worker_cron']:
                # wait for activity on the connection, or for the timeout
                select.select([pg_conn], [], [], SLEEP_INTERVAL + number)
                time.sleep(number / 100)
                try:
                    pg_conn.poll()
                except Exception:
                    if pg_conn.closed:
                        # connection closed, just exit the loop
                        return
                    raise
                notified = OrderedSet(
                    notif.payload
                    for notif in pg_conn.notifies
                    if notif.channel == 'cron_trigger'
                )
                pg_conn.notifies.clear()  # free resources

                if time.time() - SLEEP_INTERVAL > check_all_time:
                    # check all databases
                    # last time we checked them was `now - SLEEP_INTERVAL`
                    check_all_time = time.time()
                    # process notified databases first, then the other ones
                    all_db_names = OrderedSet(cron_database_list())
                    db_names = [
                        *(db for db in notified if db in all_db_names),
                        *(db for db in all_db_names if db not in notified),
                    ]
                else:
                    # restrict to notified databases only
                    db_names = notified.intersection(all_db_names)
                    if not db_names:
                        continue

                _logger.debug('cron%d polling for jobs (notified: %s)', number, notified)
                for db_name in db_names:
                    thread = threading.current_thread()
                    # start_time is what process_limit() reads to enforce
                    # the real time limit on this thread
                    thread.start_time = time.time()
                    try:
                        IrCron._process_jobs(db_name)
                    except Exception:
                        _logger.warning('cron%d encountered an Exception:', number, exc_info=True)
                    thread.start_time = None

        while True:
            conn = sql_db.db_connect('postgres')
            with contextlib.closing(conn.cursor()) as cr:
                _run_cron(cr)
                cr._cnx.close()
            _logger.info('cron%d max age (%ss) reached, releasing connection.', number, config['limit_time_worker_cron'])

    def cron_spawn(self):
        """ Start the above runner function in a daemon thread.

        The thread is a typical daemon thread: it will never quit and must be
        terminated when the main process exits - with no consequence (the processing
        threads it spawns are not marked daemon).

        """
        for i in range(config['max_cron_threads']):
            t = threading.Thread(target=self.cron_thread, args=(i,), name=f"odoo.service.cron.cron{i}")
            t.daemon = True
            # tag read by process_limit() to recognise cron threads
            t.type = 'cron'
            t.start()
            _logger.debug("cron%d started!", i)

    def http_spawn(self):
        """Create the threaded WSGI server and serve it from a daemon thread."""
        self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, self.app)
        threading.Thread(
            target=self.httpd.serve_forever,
            name="odoo.service.httpd",
            daemon=True,
        ).start()

    def start(self, stop=False):
        """Apply the memory limit, install signal handlers and spawn the HTTP thread.

        :param stop: when true, the HTTP server is only spawned if
            ``test_enable`` is set
        """
        _logger.debug("Setting signal handlers")
        set_limit_memory_hard()
        if os.name == 'posix':
            signal.signal(signal.SIGINT, self.signal_handler)
            signal.signal(signal.SIGTERM, self.signal_handler)
            # signal_handler has no branch for SIGCHLD, so it is a no-op here
            signal.signal(signal.SIGCHLD, self.signal_handler)
            signal.signal(signal.SIGHUP, self.signal_handler)
            signal.signal(signal.SIGXCPU, self.signal_handler)
            signal.signal(signal.SIGQUIT, dumpstacks)
            signal.signal(signal.SIGUSR1, log_ormcache_stats)
            signal.signal(signal.SIGUSR2, log_ormcache_stats)
        elif os.name == 'nt':
            import win32api
            win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)

        if config['test_enable'] or (config['http_enable'] and not stop):
            # some tests need the http daemon to be available...
            self.http_spawn()

    def stop(self):
        """ Shutdown the WSGI server. Wait for non daemon threads.
        """
        if server_phoenix:
            _logger.info("Initiating server reload")
        else:
            _logger.info("Initiating shutdown")
            _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")

        stop_time = time.time()

        if self.httpd:
            self.httpd.shutdown()

        # run the registered on_stop() cleanup functions
        super().stop()

        # Manually join() all threads before calling sys.exit() to allow a second signal
        # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
        # threading.Thread.join() should not mask signals (at least in python 2.5).
        me = threading.current_thread()
        _logger.debug('current thread: %r', me)
        for thread in threading.enumerate():
            _logger.debug('process %r (%r)', thread, thread.daemon)
            if (thread != me and not thread.daemon and thread.ident != self.main_thread_id and
                    thread not in self.limits_reached_threads):
                # stop_time is shared, so the 1 second budget covers all threads together
                while thread.is_alive() and (time.time() - stop_time) < 1:
                    # We wait for requests to finish, up to 1 second.
                    _logger.debug('join and sleep')
                    # Need a busyloop here as thread.join() masks signals
                    # and would prevent the forced shutdown.
                    thread.join(0.05)
                    time.sleep(0.05)

        sql_db.close_all()

        current_process = psutil.Process()
        children = current_process.children(recursive=False)
        for child in children:
            _logger.info('A child process was found, pid is %s, process may hang', child)

        _logger.debug('--')
        logging.shutdown()

    def run(self, preload=None, stop=False):
        """ Start the http server and the cron thread then wait for a signal.

        The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
        a second one if any will force an immediate exit.

        :param preload: passed as is to ``preload_registries``
        :param stop: when true, stop right after preloading and return the
            value of ``preload_registries``
        """
        with Registry._lock:
            self.start(stop=stop)
            rc = preload_registries(preload)

        if stop:
            if config['test_enable']:
                from odoo.tests.result import _logger as logger  # noqa: PLC0415
                with Registry.registries._lock:
                    for db, registry in Registry.registries.items():
                        report = registry._assertion_report
                        # error on failure, warning when no test ran, info otherwise
                        log = logger.error if not report.wasSuccessful() \
                         else logger.warning if not report.testsRun \
                         else logger.info
                        log("%s when loading database %r", report, db)
            self.stop()
            return rc

        self.cron_spawn()

        # Wait for a first signal to be handled. (time.sleep will be interrupted
        # by the signal handler)
        try:
            while self.quit_signals_received == 0:
                self.process_limit()
                if self.limit_reached_time:
                    has_other_valid_requests = any(
                        not t.daemon and
                        t not in self.limits_reached_threads
                        for t in threading.enumerate()
                        if getattr(t, 'type', None) == 'http')
                    if (not has_other_valid_requests or
                            (time.time() - self.limit_reached_time) > SLEEP_INTERVAL):
                        # We wait there is no processing requests
                        # other than the ones exceeding the limits, up to 1 min,
                        # before asking for a reload.
                        _logger.info('Dumping stacktrace of limit exceeding threads before reloading')
                        dumpstacks(thread_idents=[thread.ident for thread in self.limits_reached_threads])
                        self.reload()
                        # `reload` increments `self.quit_signals_received`
                        # and the loop will end after this iteration,
                        # therefore leading to the server stop.
                        # `reload` also sets the `server_phoenix` flag
                        # to tell the server to restart the server after shutting down.
                    else:
                        time.sleep(1)
                else:
                    time.sleep(SLEEP_INTERVAL)
        except KeyboardInterrupt:
            pass

        self.stop()

    def reload(self):
        """Send SIGHUP to this process; signal_handler() turns it into a reload."""
        os.kill(self.pid, signal.SIGHUP)

720 

721class GeventServer(CommonServer): 

722 def __init__(self, app): 

723 super(GeventServer, self).__init__(app) 

724 self.port = config['gevent_port'] 

725 self.httpd = None 

726 

727 def process_limits(self): 

728 restart = False 

729 if self.ppid != os.getppid(): 

730 _logger.warning("Gevent Parent changed: %s", self.pid) 

731 restart = True 

732 memory = memory_info(psutil.Process(self.pid)) 

733 limit_memory_soft = config['limit_memory_soft_gevent'] or config['limit_memory_soft'] 

734 if limit_memory_soft and memory > limit_memory_soft: 

735 _logger.warning('Gevent virtual memory limit reached: %s', memory) 

736 restart = True 

737 if restart: 

738 # suicide !! 

739 os.kill(self.pid, signal.SIGTERM) 

740 

741 def watchdog(self, beat=4): 

742 import gevent 

743 self.ppid = os.getppid() 

744 while True: 

745 self.process_limits() 

746 gevent.sleep(beat) 

747 

def start(self):
    """Build the gevent WSGI server and serve requests until it stops.

    Defines a request handler subclass (``ProxyHandler``), installs the
    process-level safeguards (memory limit, signal handlers, watchdog
    greenlet on posix), creates the ``WSGIServer`` bound to
    ``(self.interface, self.port)`` and blocks in ``serve_forever()``.
    Any error escaping the main loop is logged and re-raised.
    """
    import gevent
    try:
        from gevent.pywsgi import WSGIServer, WSGIHandler
    except ImportError:
        # fallback import location when gevent.pywsgi is not available
        from gevent.wsgi import WSGIServer, WSGIHandler

    class ProxyHandler(WSGIHandler):
        """ When logging requests, try to get the client address from
        the environment so we get proxyfix's modifications (if any).

        Derived from werkzeug.serving.WSGIRequestHandler.log
        / werkzeug.serving.WSGIRequestHandler.address_string
        """
        def _connection_upgrade_requested(self):
            # True when either the Connection or the Upgrade header asks
            # for a protocol upgrade (case-insensitive comparison).
            if self.headers.get('Connection', '').lower() == 'upgrade':
                return True
            if self.headers.get('Upgrade', '').lower() == 'websocket':
                return True
            return False

        def format_request(self):
            # Temporarily swap client_address so the parent formatter logs
            # the address found in the WSGI environ; restored afterwards.
            old_address = self.client_address
            if getattr(self, 'environ', None):
                self.client_address = self.environ['REMOTE_ADDR']
            elif not self.client_address:
                self.client_address = '<local>'
            # other cases are handled inside WSGIHandler
            try:
                return super().format_request()
            finally:
                self.client_address = old_address

        def finalize_headers(self):
            # We need to make gevent.pywsgi stop dealing with chunks when the
            # connection is being upgraded.
            # See https://github.com/gevent/gevent/issues/1712
            super().finalize_headers()
            if self.code == 101:
                # Switching Protocols. Disable chunked writes.
                self.response_use_chunked = False

        def get_environ(self):
            # Add the TCP socket to environ in order for the websocket
            # connections to use it.
            environ = super().get_environ()
            environ['socket'] = self.socket
            # Disable support for HTTP chunking on reads which cause
            # an issue when the connection is being upgraded, see
            # https://github.com/gevent/gevent/issues/1712
            if self._connection_upgrade_requested():
                environ['wsgi.input'] = self.rfile
                environ['wsgi.input_terminated'] = False
            return environ

    # Set process memory limit as an extra safeguard
    set_limit_memory_hard()
    if os.name == 'posix':
        # Diagnostic signal handlers plus the limits watchdog greenlet
        signal.signal(signal.SIGQUIT, dumpstacks)
        signal.signal(signal.SIGUSR1, log_ormcache_stats)
        signal.signal(signal.SIGUSR2, log_ormcache_stats)
        gevent.spawn(self.watchdog)

    self.httpd = WSGIServer(
        (self.interface, self.port), self.app,
        log=logging.getLogger('longpolling'),
        error_log=logging.getLogger('longpolling'),
        handler_class=ProxyHandler,
    )
    _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
    try:
        self.httpd.serve_forever()
    except:
        # Deliberately broad: whatever interrupts the loop is logged, then propagated.
        _logger.exception("Evented Service (longpolling): uncaught error during main loop")
        raise

822 

def stop(self):
    """Stop the WSGI server, run the parent class ``stop()``, then shut gevent down.

    ``self.httpd`` is only set by ``start()``; calling this method before the
    server object has been created would fail on ``self.httpd.stop()``.
    """
    import gevent
    self.httpd.stop()
    super().stop()
    # NOTE(review): confirm the installed gevent version still exposes shutdown()
    gevent.shutdown()

828 

def run(self, preload, stop):
    """Start the server, then stop it once ``start()`` returns.

    ``preload`` and ``stop`` are accepted but are not used by this
    implementation. Nothing is returned.
    """
    self.start()
    self.stop()

832 

class PreforkServer(CommonServer):
    """ Multiprocessing inspired by (g)unicorn.
    PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
    method between workers but we plan to replace it by a more intelligent
    dispatcher that will parse the first HTTP request line.
    """
    def __init__(self, app):
        super().__init__(app)
        # config
        self.population = config['workers']
        self.timeout = config['limit_time_real']
        self.limit_request = config['limit_request']
        self.cron_timeout = config['limit_time_real_cron'] or None
        if self.cron_timeout == -1:
            # -1 means "use the same timeout as the other workers"
            self.cron_timeout = self.timeout
        # working vars
        self.beat = 4
        self.socket = None
        self.workers_http = {}
        self.workers_cron = {}
        self.workers = {}
        self.generation = 0
        self.queue = collections.deque()
        self.long_polling_pid = None

    def pipe_new(self):
        """Return a new ``os.pipe()`` pair, both ends non-blocking and close-on-exec."""
        pipe = os.pipe()
        for fd in pipe:
            # non_blocking
            flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
            # close_on_exec
            flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
        return pipe

    def pipe_ping(self, pipe):
        """Write one byte to the write end of ``pipe``; EAGAIN and EINTR are ignored."""
        try:
            os.write(pipe[1], b'.')
        except IOError as e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise

    def signal_handler(self, sig, frame):
        """Queue ``sig`` for ``process_signals()`` and wake the main loop up.

        At most 5 signals are kept pending; beyond that only SIGCHLD is still
        queued, the others are dropped with a warning.
        """
        if len(self.queue) < 5 or sig == signal.SIGCHLD:
            self.queue.append(sig)
            self.pipe_ping(self.pipe)
        else:
            _logger.warning("Dropping signal: %s", sig)

    def worker_spawn(self, klass, workers_registry):
        """Fork a worker of class ``klass``.

        In the parent, the worker is registered under its pid in both
        ``self.workers`` and ``workers_registry`` and is returned. In the
        child, the worker runs and the process then exits with status 0.
        """
        self.generation += 1
        worker = klass(self)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.workers[pid] = worker
            workers_registry[pid] = worker
            return worker
        else:
            worker.run()
            sys.exit(0)

    def long_polling_spawn(self):
        """Start the gevent process as a subprocess and remember its pid."""
        nargs = stripped_sys_argv()
        cmd = [sys.executable, sys.argv[0], 'gevent'] + nargs[1:]
        popen = subprocess.Popen(cmd)
        self.long_polling_pid = popen.pid

    def worker_pop(self, pid):
        """Forget the process ``pid`` and close the pipes of its worker, if any."""
        if pid == self.long_polling_pid:
            self.long_polling_pid = None
        if pid in self.workers:
            _logger.debug("Worker (%s) unregistered", pid)
            try:
                self.workers_http.pop(pid, None)
                self.workers_cron.pop(pid, None)
                u = self.workers.pop(pid)
                u.close()
            except OSError:
                return

    def worker_kill(self, pid, sig):
        """Send ``sig`` to ``pid``.

        The worker is unregistered right away when the signal is SIGKILL or
        when the process no longer exists (ESRCH). Other ``OSError`` values
        are ignored.
        """
        try:
            os.kill(pid, sig)
            if sig == signal.SIGKILL:
                self.worker_pop(pid)
        except OSError as e:
            if e.errno == errno.ESRCH:
                self.worker_pop(pid)

    def process_signals(self):
        """Handle every queued signal, in arrival order.

        :raises KeyboardInterrupt: on SIGINT/SIGTERM (stop) and on SIGHUP
            (restart, after setting the module-level ``server_phoenix`` flag)
        """
        global server_phoenix  # noqa: PLW0603
        while self.queue:
            sig = self.queue.popleft()
            if sig in [signal.SIGINT, signal.SIGTERM]:
                raise KeyboardInterrupt
            elif sig == signal.SIGHUP:
                # restart on kill -HUP
                server_phoenix = True
                raise KeyboardInterrupt
            elif sig == signal.SIGQUIT:
                # dump stacks on kill -3
                dumpstacks()
            elif sig in [signal.SIGUSR1, signal.SIGUSR2]:
                # log ormcache stats on kill -SIGUSR1 or kill -SIGUSR2
                log_ormcache_stats(sig)
            elif sig == signal.SIGTTIN:
                # increase number of workers
                self.population += 1
            elif sig == signal.SIGTTOU:
                # decrease number of workers
                self.population -= 1

    def process_zombie(self):
        """Reap terminated children without blocking and unregister them.

        :raises Exception: when a child exited with status 3, which is
            treated as a critical worker error
        """
        # reap dead workers
        while True:
            try:
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    break
                if (status >> 8) == 3:
                    msg = "Critial worker error (%s)"
                    _logger.critical(msg, wpid)
                    raise Exception(msg % wpid)
                self.worker_pop(wpid)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    # no child left to wait for
                    break
                raise

    def process_timeout(self):
        """SIGKILL every worker whose last watchdog ping is older than its timeout."""
        now = time.time()
        # iterate over a copy: worker_kill() unregisters the worker
        for (pid, worker) in list(self.workers.items()):
            if worker.watchdog_timeout is not None and \
                    (now - worker.watchdog_time) >= worker.watchdog_timeout:
                _logger.error("%s (%s) timeout after %ss",
                              worker.__class__.__name__,
                              pid,
                              worker.watchdog_timeout)
                self.worker_kill(pid, signal.SIGKILL)

    def process_spawn(self):
        """Spawn the missing HTTP workers, gevent process and cron workers."""
        # Before spawning any process, check the registry signaling
        registries = Registry.registries.snapshot

        def check_registries():
            # check the registries on the first call only!
            if not registries:
                return
            for registry in registries.values():
                with registry.cursor() as cr:
                    registry.check_signaling(cr)
            registries.clear()
            # Close all opened cursors
            sql_db.close_all()

        if config['http_enable']:
            while len(self.workers_http) < self.population:
                check_registries()
                self.worker_spawn(WorkerHTTP, self.workers_http)
            if not self.long_polling_pid:
                check_registries()
                self.long_polling_spawn()
        while len(self.workers_cron) < config['max_cron_threads']:
            check_registries()
            self.worker_spawn(WorkerCron, self.workers_cron)

    def sleep(self):
        """Wait up to ``self.beat`` seconds for a worker ping or an internal wakeup.

        Workers whose watchdog pipe became readable get their
        ``watchdog_time`` refreshed; every readable pipe is drained.
        """
        try:
            # map of fd -> worker
            fds = {w.watchdog_pipe[0]: w for w in self.workers.values()}
            fd_in = list(fds) + [self.pipe[0]]
            # check for ping or internal wakeups
            ready = select.select(fd_in, [], [], self.beat)
            # update worker watchdogs
            for fd in ready[0]:
                if fd in fds:
                    fds[fd].watchdog_time = time.time()
                empty_pipe(fd)
        except select.error as e:
            if e.args[0] not in [errno.EINTR]:
                raise

    def start(self):
        """Create the wakeup pipe, install the signal handlers and set up the HTTP socket."""
        # wakeup pipe, python doesn't throw EINTR when a syscall is interrupted
        # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
        # signal handler to overcome this behaviour
        self.pipe = self.pipe_new()
        # set signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGHUP, self.signal_handler)
        signal.signal(signal.SIGCHLD, self.signal_handler)
        signal.signal(signal.SIGTTIN, self.signal_handler)
        signal.signal(signal.SIGTTOU, self.signal_handler)
        signal.signal(signal.SIGQUIT, dumpstacks)
        signal.signal(signal.SIGUSR1, log_ormcache_stats)
        signal.signal(signal.SIGUSR2, log_ormcache_stats)

        if config['http_enable']:
            if config.http_socket_activation:
                _logger.info('HTTP service (werkzeug) running through socket activation')
            else:
                _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)

            if os.environ.get('ODOO_HTTP_SOCKET_FD'):
                # reload
                self.socket = socket.socket(fileno=int(os.environ.pop('ODOO_HTTP_SOCKET_FD')))
            elif config.http_socket_activation:
                # socket activation
                SD_LISTEN_FDS_START = 3
                self.socket = socket.fromfd(SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM)
            else:
                # default
                family = socket.AF_INET
                if ':' in self.interface:
                    family = socket.AF_INET6
                self.socket = socket.socket(family, socket.SOCK_STREAM)
                self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.socket.setblocking(0)
                self.socket.bind((self.interface, self.port))
                self.socket.listen(8 * self.population)

    def fork_and_reload(self):
        """Fork; the parent re-executes the server, the child waits for it to be ready.

        The parent clears close-on-exec on the listening socket and exports
        its file descriptor and the child pid through environment variables
        before calling ``_reexec()``. The child waits up to 60 seconds for a
        SIGHUP, then returns to its caller.
        """
        _logger.info("Reloading server")
        pid = os.fork()
        if pid != 0:
            # keep the http listening socket open during _reexec() to ensure uptime
            http_socket_fileno = self.socket.fileno()
            flags = fcntl.fcntl(http_socket_fileno, fcntl.F_GETFD)
            fcntl.fcntl(http_socket_fileno, fcntl.F_SETFD, flags & ~fcntl.FD_CLOEXEC)
            os.environ['ODOO_HTTP_SOCKET_FD'] = str(http_socket_fileno)
            os.environ['ODOO_READY_SIGHUP_PID'] = str(pid)
            _reexec()  # stops execution

        # child process handles old server shutdown
        _logger.info("Waiting for new server to start ...")
        phoenix_hatched = False

        def sighup_handler(sig, frame):
            nonlocal phoenix_hatched
            phoenix_hatched = True

        signal.signal(signal.SIGHUP, sighup_handler)

        reload_timeout = time.monotonic() + 60
        while not phoenix_hatched and time.monotonic() < reload_timeout:
            time.sleep(0.1)

        if not phoenix_hatched:
            _logger.error("Server reload timed out (check the updated code)")
        else:
            _logger.info("New server has started")

    def stop_workers_gracefully(self):
        """SIGKILL the gevent process, SIGINT the workers and wait for them to exit.

        The wait loop ends when no worker is registered anymore or when a
        stop signal is received (forced shutdown). Workers exceeding their
        watchdog timeout are killed by ``process_timeout()``.
        """
        _logger.info("Stopping workers gracefully")

        if self.long_polling_pid is not None:
            # FIXME make longpolling process handle SIGTERM correctly
            self.worker_kill(self.long_polling_pid, signal.SIGKILL)
            self.long_polling_pid = None

        # Signal workers to finish their current workload then stop.
        # Iterate over a copy: worker_kill() unregisters a worker whose
        # process is already gone (ESRCH), which mutates self.workers.
        for pid in list(self.workers):
            self.worker_kill(pid, signal.SIGINT)

        is_main_server = self.pid == os.getpid()  # False if server reload, cannot reap children -> use psutil
        if not is_main_server:
            processes = {}
            for pid in self.workers:
                with contextlib.suppress(psutil.NoSuchProcess):
                    processes[pid] = psutil.Process(pid)

        self.beat = 0.1
        while self.workers:
            try:
                self.process_signals()
            except KeyboardInterrupt:
                _logger.info("Forced shutdown.")
                break

            if is_main_server:
                self.process_zombie()
            else:
                for pid, proc in list(processes.items()):
                    if not proc.is_running():
                        self.worker_pop(pid)
                        processes.pop(pid)

            self.sleep()
            self.process_timeout()

    def stop(self, graceful=True):
        """Stop the server.

        When a restart was requested (``server_phoenix``), the flag is reset
        and the server reloads through ``fork_and_reload()`` before stopping
        the old workers gracefully. Otherwise the listening socket is closed
        and the workers are stopped gracefully, or sent SIGTERM when
        ``graceful`` is false.
        """
        global server_phoenix  # noqa: PLW0603
        if server_phoenix:
            # PreforkServer reloads gracefully, disable outdated mechanism
            server_phoenix = False

            self.fork_and_reload()
            self.stop_workers_gracefully()

            _logger.info("Old server stopped")
            return

        if self.socket:
            self.socket.close()
        if graceful:
            super().stop()
            self.stop_workers_gracefully()
        else:
            _logger.info("Stopping forcefully")
            for pid in list(self.workers):
                self.worker_kill(pid, signal.SIGTERM)

    def run(self, preload, stop):
        """Start the server, preload the registries and run the supervision loop.

        :param preload: database names handed to ``preload_registries``
        :param stop: when true, stop right after preloading and return its result
        :return: the preload result when ``stop`` is true, ``-1`` when the
            loop ends on an unexpected exception, ``None`` after a clean stop
        """
        self.start()

        rc = preload_registries(preload)

        if stop:
            self.stop()
            return rc

        # Empty the cursor pool, we dont want them to be shared among forked workers.
        sql_db.close_all()

        if os.environ.get('ODOO_READY_SIGHUP_PID'):
            # tell the process waiting in fork_and_reload() that we are ready
            os.kill(int(os.environ.pop('ODOO_READY_SIGHUP_PID')), signal.SIGHUP)

        _logger.debug("Multiprocess starting")
        while True:
            try:
                self.process_signals()
                self.process_zombie()
                self.process_timeout()
                self.process_spawn()
                self.sleep()
            except KeyboardInterrupt:
                _logger.debug("Multiprocess clean stop")
                self.stop()
                break
            except Exception as e:
                _logger.exception(e)
                self.stop(False)
                return -1

1180 

class Worker:
    """ Workers """
    def __init__(self, multi):
        self.multi = multi
        self.watchdog_time = time.time()
        self.watchdog_pipe = multi.pipe_new()
        self.eintr_pipe = multi.pipe_new()
        self.wakeup_fd_r, self.wakeup_fd_w = self.eintr_pipe
        # Can be set to None if no watchdog is desired.
        self.watchdog_timeout = multi.timeout
        self.ppid = os.getpid()
        self.pid = None
        self.alive = True
        # should we rename into lifetime ?
        self.request_max = multi.limit_request
        self.request_count = 0

    def setproctitle(self, title=""):
        """Set the process title to ``odoo: <class name> <pid> <title>``."""
        setproctitle('odoo: %s %s %s' % (self.__class__.__name__, self.pid, title))

    def close(self):
        """Close both ends of the watchdog and wakeup pipes.

        Every descriptor is attempted even when an earlier one fails, so a
        single bad descriptor does not leak the others.

        :raises OSError: the first error met while closing, once all four
            descriptors have been attempted
        """
        first_error = None
        for fd in (*self.watchdog_pipe, *self.eintr_pipe):
            try:
                os.close(fd)
            except OSError as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    def signal_handler(self, sig, frame):
        """Ask the run loop to stop after the current unit of work."""
        self.alive = False

    def signal_time_expired_handler(self, n, stack):
        """SIGXCPU handler: log the CPU limit and raise to abort the current work."""
        # TODO: print actual RUSAGE_SELF (since last check_limits) instead of
        #       just repeating the config setting
        _logger.info('Worker (%d) CPU time limit (%s) reached.', self.pid, config['limit_time_cpu'])
        # We dont suicide in such case
        raise Exception('CPU time limit exceeded.')

    def sleep(self):
        """Wait up to one beat for the listening socket or the wakeup pipe to be readable."""
        try:
            select.select([self.multi.socket, self.wakeup_fd_r], [], [], self.multi.beat)
            # clear wakeup pipe if we were interrupted
            empty_pipe(self.wakeup_fd_r)
        except select.error as e:
            if e.args[0] not in [errno.EINTR]:
                raise

    def check_limits(self):
        """Flag the worker as not alive when a limit is hit and refresh resource limits."""
        # If our parent changed suicide
        if self.ppid != os.getppid():
            _logger.info("Worker (%s) Parent changed", self.pid)
            self.alive = False
        # check for lifetime
        if self.request_count >= self.request_max:
            _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
            self.alive = False
        # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
        memory = memory_info(psutil.Process(os.getpid()))
        if config['limit_memory_soft'] and memory > config['limit_memory_soft']:
            _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, memory)
            self.alive = False      # Commit suicide after the request.

        set_limit_memory_hard()

        # update RLIMIT_CPU so limit_time_cpu applies per unit of work
        r = resource.getrusage(resource.RUSAGE_SELF)
        cpu_time = r.ru_utime + r.ru_stime
        _, hard = resource.getrlimit(resource.RLIMIT_CPU)
        resource.setrlimit(resource.RLIMIT_CPU, (int(cpu_time + config['limit_time_cpu']), hard))

    def process_work(self):
        """Perform one unit of work; does nothing in the base class."""
        pass

    def start(self):
        """Initialise the worker process: title, RNG, socket flags and signal handlers."""
        self.pid = os.getpid()
        self.setproctitle()
        _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
        # Reseed the random number generator
        random.seed()
        if self.multi.socket:
            # Prevent fd inheritance: close_on_exec
            flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
            fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
            # reset blocking status
            self.multi.socket.setblocking(0)

        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGXCPU, self.signal_time_expired_handler)

        # restore the default disposition of the signals handled by the parent server
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGHUP, signal.SIG_DFL)
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        signal.signal(signal.SIGTTIN, signal.SIG_DFL)
        signal.signal(signal.SIGTTOU, signal.SIG_DFL)

        # received signals write to the wakeup pipe, which interrupts sleep()
        signal.set_wakeup_fd(self.wakeup_fd_w)

    def stop(self):
        """Release worker resources; does nothing in the base class."""
        pass

    def run(self):
        """Start the worker, run its loop in a daemon thread and wait for it to finish.

        Any exception is logged and the process exits with status 1.
        """
        try:
            self.start()
            t = threading.Thread(name="Worker %s (%s) workthread" % (self.__class__.__name__, self.pid), target=self._runloop)
            t.daemon = True
            t.start()
            t.join()
            _logger.info("Worker (%s) exiting. request_count: %s, registry count: %s.",
                         self.pid, self.request_count,
                         len(Registry.registries))
            self.stop()
        except Exception:
            _logger.exception("Worker (%s) Exception occurred, exiting...", self.pid)
            # should we use 3 to abort everything ?
            sys.exit(1)

    def _runloop(self):
        """Work loop: check limits, ping the watchdog, sleep, then process one unit of work."""
        # these signals are blocked in the work thread
        signal.pthread_sigmask(signal.SIG_BLOCK, {
            signal.SIGXCPU,
            signal.SIGINT, signal.SIGQUIT,
            signal.SIGUSR1, signal.SIGUSR2,
        })
        try:
            while self.alive:
                self.check_limits()
                self.multi.pipe_ping(self.watchdog_pipe)
                self.sleep()
                if not self.alive:
                    break
                self.process_work()
        except:
            _logger.exception("Worker %s (%s) Exception occurred, exiting...", self.__class__.__name__, self.pid)
            sys.exit(1)

1312 

class WorkerHTTP(Worker):
    """ HTTP Request workers """
    def __init__(self, multi):
        super().__init__(multi)

        # The ODOO_HTTP_SOCKET_TIMEOUT environment variable allows to control socket timeout for
        # extreme latency situations. It's generally better to use a good buffering reverse proxy
        # to quickly free workers rather than increasing this timeout to accommodate high network
        # latencies & b/w saturation. This timeout is also essential to protect against accidental
        # DoS due to idle HTTP connections.
        configured_timeout = os.environ.get("ODOO_HTTP_SOCKET_TIMEOUT")
        if configured_timeout:
            self.sock_timeout = float(configured_timeout)
        else:
            self.sock_timeout = 2

    def process_request(self, client, addr):
        """Serve one accepted connection and count it as a handled request.

        A broken pipe (the client closed the socket before receiving the
        full reply) is tolerated; other I/O errors propagate.
        """
        client.setblocking(1)
        client.settimeout(self.sock_timeout)
        client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Prevent fd inheritance: close_on_exec
        fd_flags = fcntl.fcntl(client, fcntl.F_GETFD)
        fcntl.fcntl(client, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
        # do request using BaseWSGIServerNoBind monkey patched with socket
        self.server.socket = client
        try:
            self.server.process_request(client, addr)
        except IOError as exc:
            if exc.errno != errno.EPIPE:
                raise
        self.request_count += 1

    def process_work(self):
        """Accept one pending connection, if any, and process it."""
        ignorable = (errno.EAGAIN, errno.ECONNABORTED)
        try:
            connection, peer = self.multi.socket.accept()
            self.process_request(connection, peer)
        except socket.error as exc:
            if exc.errno not in ignorable:
                raise

    def start(self):
        """Run the common worker start-up, then create the WSGI server object."""
        Worker.start(self)
        self.server = BaseWSGIServerNoBind(self.multi.app)

1355 

class WorkerCron(Worker):
    """ Cron workers """

    def __init__(self, multi):
        super().__init__(multi)
        self.alive_time = time.monotonic()
        self.watchdog_timeout = multi.cron_timeout  # Use a distinct value for CRON Worker
        # process_work() below processes a single database per call.
        # self.db_queue keeps track of the databases to process (in order, from left to right).
        self.db_queue: deque[str] = deque()
        self.db_count: int = 0

    def sleep(self):
        """Sleep only when no database is left in the queue.

        The wait ends early when the wakeup pipe or the PostgreSQL
        connection becomes readable.
        """
        if self.db_queue:
            # still databases to process: no pause between them
            return

        interval = SLEEP_INTERVAL + self.pid % 10   # chorus effect

        # simulate interruptible sleep with select(wakeup_fd, timeout)
        try:
            select.select([self.wakeup_fd_r, self.dbcursor._cnx], [], [], interval)
            # clear pg_conn/wakeup pipe if we were interrupted
            time.sleep(self.pid / 100 % .1)
            self.dbcursor._cnx.poll()
            empty_pipe(self.wakeup_fd_r)
        except select.error as exc:
            if e_code(exc) != errno.EINTR:
                raise

    def check_limits(self):
        """Apply the common limits plus the maximum age of a cron worker."""
        super().check_limits()

        max_age = config['limit_time_worker_cron']
        if max_age > 0 and (time.monotonic() - self.alive_time) > max_age:
            _logger.info('WorkerCron (%s) max age (%ss) reached.', self.pid, config['limit_time_worker_cron'])
            self.alive = False

    def _refill_db_queue(self):
        """Rebuild the queue of databases, notified databases first."""
        # list databases
        known = OrderedSet(cron_database_list())
        connection = self.dbcursor._cnx
        triggered = OrderedSet(
            notification.payload
            for notification in connection.notifies
            if notification.channel == 'cron_trigger'
        )
        connection.notifies.clear()  # free resources
        # add notified databases (in order) first in the queue
        self.db_queue.extend(name for name in triggered if name in known)
        self.db_queue.extend(name for name in known if name not in triggered)
        self.db_count = len(self.db_queue)

    def process_work(self):
        """Process a single database."""
        _logger.debug("WorkerCron (%s) polling for jobs", self.pid)

        if not self.db_queue:
            self._refill_db_queue()
            if not self.db_count:
                return

        # pop the leftmost element (because notified databases appear first)
        current_db = self.db_queue.popleft()
        self.setproctitle(current_db)

        from odoo.addons.base.models.ir_cron import IrCron  # noqa: PLC0415
        IrCron._process_jobs(current_db)

        # dont keep cursors in multi database mode
        if self.db_count > 1:
            sql_db.close_db(current_db)

        self.request_count += 1
        if self.request_count >= self.request_max and self.request_max < self.db_count:
            _logger.error(
                "There are more dabatases to process than allowed "
                "by the `limit_request` configuration variable: %s more.",
                self.db_count - self.request_max,
            )

    def start(self):
        """Lower the process priority, run the common start-up and open the notification cursor."""
        os.nice(10)     # mommy always told me to be nice with others...
        Worker.start(self)
        if self.multi.socket:
            self.multi.socket.close()

        self.dbcursor = sql_db.db_connect('postgres').cursor()
        # LISTEN / NOTIFY doesn't work in recovery mode
        self.dbcursor.execute("SELECT pg_is_in_recovery()")
        in_recovery = self.dbcursor.fetchone()[0]
        if in_recovery:
            _logger.warning("PG cluster in recovery mode, cron trigger not activated")
        else:
            self.dbcursor.execute("LISTEN cron_trigger")
        self.dbcursor.commit()

    def stop(self):
        """Close the PostgreSQL connection and cursor opened by ``start()``."""
        super().stop()
        self.dbcursor._cnx.close()
        self.dbcursor.close()


def e_code(exc):
    """Return the first argument of ``exc`` (the errno of a select error)."""
    return exc.args[0]

1452 

1453#---------------------------------------------------------- 

1454# start/stop public api 

1455#---------------------------------------------------------- 

1456 

# Server instance created by start(); None until then.
server = None

# Restart request flag: set to True when a SIGHUP is processed by the prefork
# server, and checked by start() to re-execute the process once the server
# has stopped.
server_phoenix = False

1459 

1460 

def load_server_wide_modules():
    """Load every module listed in the ``server_wide_modules`` configuration.

    Loading happens with the garbage collector disabled. A module that fails
    to load is logged (with an extra hint for the ``web`` module) and does
    not prevent the following ones from being loaded.
    """
    from odoo.modules.module import load_openerp_module  # noqa: PLC0415
    web_hint = """
The `web` module is provided by the addons found in the `openerp-web` project.
Maybe you forgot to add those addons in your addons_path configuration."""
    with gc.disabling_gc():
        for module_name in config['server_wide_modules']:
            try:
                load_openerp_module(module_name)
            except Exception:
                hint = web_hint if module_name == 'web' else ''
                _logger.exception('Failed to load server-wide module `%s`.%s', module_name, hint)

1474 

1475 

def _reexec(updated_modules=None):
    """Re-execute the server process with (nearly) the same arguments.

    :param updated_modules: optional iterable of module names, appended to
        the command line as ``-u name1,name2,...``

    When running as a Windows service, the service is stopped and started
    again first. The current process image is then replaced through
    ``os.execve``.
    """
    if osutil.is_running_as_nt_service():
        restart_service = 'net stop {0} && net start {0}'.format(nt_service_name)
        subprocess.call(restart_service, shell=True)

    interpreter = os.path.basename(sys.executable)
    argv = stripped_sys_argv()
    if updated_modules:
        argv += ["-u", ','.join(updated_modules)]
    # make sure the interpreter name comes first in the argument list
    if not (argv and argv[0] == interpreter):
        argv.insert(0, interpreter)
    # We should keep the LISTEN_* environment variables in order to support socket activation on reexec
    os.execve(sys.executable, argv, os.environ)

1488 

1489 

def preload_registries(dbnames):
    """ Preload the registries of the given databases, possibly running the
    post-install tests.

    :param dbnames: iterable of database names; a falsy value means none
    :return: ``-1`` as soon as one database fails to initialize, otherwise
        the number of databases whose assertion report is not successful
    """
    # TODO: move all config checks to args dont check tools.config here
    dbnames = dbnames or []
    rc = 0

    # no-op context manager unless profiling is requested through the environment
    preload_profiler = contextlib.nullcontext()

    for dbname in dbnames:
        if os.environ.get('ODOO_PROFILE_PRELOAD'):
            interval = float(os.environ.get('ODOO_PROFILE_PRELOAD_INTERVAL', '0.1'))
            collectors = [profiler.PeriodicCollector(interval=interval)]
            if os.environ.get('ODOO_PROFILE_PRELOAD_SQL'):
                collectors.append('sql')
            preload_profiler = profiler.Profiler(db=dbname, collectors=collectors)
        try:
            with preload_profiler:
                threading.current_thread().dbname = dbname
                update_module = config['init'] or config['update'] or config['reinit']

                registry = Registry.new(dbname, update_module=update_module, install_modules=config['init'], upgrade_modules=config['update'], reinit_modules=config['reinit'])

                # run post-install tests
                if config['test_enable']:
                    from odoo.tests import loader  # noqa: PLC0415
                    t0 = time.time()
                    t0_sql = sql_db.sql_counter
                    # test the updated modules, or every initialized module when nothing was updated
                    module_names = (registry.updated_modules if update_module else
                                    sorted(registry._init_modules))
                    _logger.info("Starting post tests")
                    tests_before = registry._assertion_report.testsRun
                    post_install_suite = loader.make_suite(module_names, 'post_install')
                    if post_install_suite.has_http_case():
                        # generate the asset bundles up front when HTTP test cases are present
                        with registry.cursor() as cr:
                            env = api.Environment(cr, api.SUPERUSER_ID, {})
                            env['ir.qweb']._pregenerate_assets_bundles()
                    result = loader.run_suite(post_install_suite, global_report=registry._assertion_report)
                    registry._assertion_report.update(result)
                    _logger.info("%d post-tests in %.2fs, %s queries",
                                 registry._assertion_report.testsRun - tests_before,
                                 time.time() - t0,
                                 sql_db.sql_counter - t0_sql)

                    registry._assertion_report.log_stats()
                if registry._assertion_report and not registry._assertion_report.wasSuccessful():
                    rc += 1
        except Exception:
            # the first failing database aborts the whole preload
            _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True)
            return -1
    return rc

1540 

def start(preload=None, stop=False):
    """ Start the odoo http server and cron processor.

    The server class is chosen from the runtime mode: ``GeventServer`` when
    ``odoo.evented`` is set, ``PreforkServer`` when workers are configured,
    ``ThreadedServer`` otherwise. The instance is stored in the module-level
    ``server`` variable and run with the given arguments.

    :param preload: passed through to ``server.run``
    :param stop: passed through to ``server.run``
    :return: the value returned by ``server.run``, or ``0`` when it is falsy
    """
    global server

    load_server_wide_modules()
    import odoo.http  # noqa: PLC0415

    if odoo.evented:
        server = GeventServer(odoo.http.root)
    elif config['workers']:
        if config['test_enable']:
            _logger.warning("Unit testing in workers mode could fail; use --workers 0.")

        server = PreforkServer(odoo.http.root)
    else:
        if platform.system() == "Linux" and sys.maxsize > 2**32 and "MALLOC_ARENA_MAX" not in os.environ:
            # glibc's malloc() uses arenas [1] in order to efficiently handle memory allocation of multi-threaded
            # applications. This allows better memory allocation handling in case of multiple threads that
            # would be using malloc() concurrently [2].
            # Due to the python's GIL, this optimization has no effect on multithreaded python programs.
            # Unfortunately, a downside of creating one arena per cpu core is the increase of virtual memory
            # which Odoo is based upon in order to limit the memory usage for threaded workers.
            # On 32bit systems the default size of an arena is 512K while on 64bit systems it's 64M [3],
            # hence a threaded worker will quickly reach its default memory soft limit upon concurrent requests.
            # We therefore set the maximum arenas allowed to 2 unless the MALLOC_ARENA_MAX env variable is set.
            # Note: Setting MALLOC_ARENA_MAX=0 allows to explicitly set the default glibc's malloc() behaviour.
            #
            # [1] https://sourceware.org/glibc/wiki/MallocInternals#Arenas_and_Heaps
            # [2] https://www.gnu.org/software/libc/manual/html_node/The-GNU-Allocator.html
            # [3] https://sourceware.org/git/?p=glibc.git;a=blob;f=malloc/malloc.c;h=00ce48c;hb=0a8262a#l862
            try:
                import ctypes
                libc = ctypes.CDLL("libc.so.6")
                M_ARENA_MAX = -8
                # Call mallopt() in a plain statement: an `assert` would be
                # removed under `python -O` and the limit never applied.
                if not libc.mallopt(ctypes.c_int(M_ARENA_MAX), ctypes.c_int(2)):
                    raise RuntimeError("mallopt(M_ARENA_MAX, 2) reported a failure")
            except Exception:
                _logger.warning("Could not set ARENA_MAX through mallopt()")
        server = ThreadedServer(odoo.http.root)

    watcher = None
    if 'reload' in config['dev_mode'] and not odoo.evented:
        if inotify:
            watcher = FSWatcherInotify()
            watcher.start()
        elif watchdog:
            watcher = FSWatcherWatchdog()
            watcher.start()
        else:
            # name the module expected on this platform in the warning
            if os.name == 'posix' and platform.system() != 'Darwin':
                module = 'inotify'
            else:
                module = 'watchdog'
            _logger.warning("'%s' module not installed. Code autoreload feature is disabled", module)

    rc = server.run(preload, stop)

    if watcher:
        watcher.stop()
    # like the legend of the phoenix, all ends with beginnings
    if server_phoenix:
        _reexec()

    return rc if rc else 0

1605 

def restart():
    """ Restart the server.

    On Windows (``os.name == 'nt'``) the re-execution runs in a separate
    thread; on other systems SIGHUP is sent to the server process.
    """
    if os.name != 'nt':
        os.kill(server.pid, signal.SIGHUP)
        return
    # run in a thread to let the current thread return response to the caller.
    reexec_thread = threading.Thread(target=_reexec)
    reexec_thread.start()