Coverage for adhoc-cicd-odoo-odoo / odoo / service / server.py: 20%
1069 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-03-09 18:22 +0000
#-----------------------------------------------------------
# Threaded, Gevent and Prefork Servers
#-----------------------------------------------------------
import contextlib
import collections
import datetime
import errno
import logging
import os
import os.path
import platform
import random
import select
import signal
import socket
import subprocess
import sys
import threading
import time
from collections import deque
from io import BytesIO

import psutil
import werkzeug.serving
from werkzeug.urls import uri_to_iri

if os.name == 'posix':
    # Unix only for workers
    import fcntl
    import resource
    try:
        import inotify
        from inotify.adapters import InotifyTrees
        from inotify.constants import IN_MODIFY, IN_CREATE, IN_MOVED_TO
        INOTIFY_LISTEN_EVENTS = IN_MODIFY | IN_CREATE | IN_MOVED_TO
    except ImportError:
        inotify = None
else:
    # Windows shim
    signal.SIGHUP = -1
    inotify = None

# fall back on watchdog-based file watching when inotify is unavailable
if not inotify:
    try:
        import watchdog
        from watchdog.observers import Observer
        from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileMovedEvent
    except ImportError:
        watchdog = None

# Optional process names for workers
try:
    from setproctitle import setproctitle
except ImportError:
    setproctitle = lambda x: None

from odoo import api, sql_db
from odoo.modules.registry import Registry
from odoo.release import nt_service_name
from odoo.tools import config, gc, osutil, OrderedSet, profiler
from odoo.tools.cache import log_ormcache_stats
from odoo.tools.misc import stripped_sys_argv, dumpstacks
from .db import list_dbs
_logger = logging.getLogger(__name__)

# base wake-up period of the cron workers, in seconds
SLEEP_INTERVAL = 60  # 1 min

# A global-ish object, each thread/worker uses its own
thread_local = threading.local()
# the model and method name that was called via rpc, for logging
thread_local.rpc_model_method = ''
def memory_info(process):
    """Return the relevant memory usage of *process* in bytes.

    :param process: a psutil-like process object
    :return: ``rss`` on macOS (where ``vms`` is artificially huge for every
        process), ``vms`` everywhere else.
    """
    # psutil < 2.0 does not have memory_info, >= 3.0 does not have get_memory_info
    getter = getattr(process, 'memory_info', None)
    if getter is None:
        getter = process.get_memory_info
    usage = getter()
    # MacOSX allocates very large vms to all processes so we only monitor the rss usage.
    return usage.rss if platform.system() == 'Darwin' else usage.vms
def set_limit_memory_hard():
    """Apply the configured hard memory limit (RLIMIT_AS) to this process.

    Linux only — the function is a no-op on other platforms. When running
    evented (gevent), a dedicated ``limit_memory_hard_gevent`` value takes
    precedence over the generic ``limit_memory_hard``.
    """
    if platform.system() != 'Linux':
        return
    limit_memory_hard = config['limit_memory_hard']
    import odoo  # deferred import, only needed to read odoo.evented
    if odoo.evented and config['limit_memory_hard_gevent']:
        limit_memory_hard = config['limit_memory_hard_gevent']
    if limit_memory_hard:
        rlimit = resource.RLIMIT_AS
        soft, hard = resource.getrlimit(rlimit)
        # tighten the soft address-space limit, keep the existing hard cap
        resource.setrlimit(rlimit, (limit_memory_hard, hard))
def empty_pipe(fd):
    """Drain every pending byte from the non-blocking pipe *fd*.

    Returns silently once the pipe is empty (EAGAIN); any other OS error
    is propagated to the caller.
    """
    try:
        chunk = os.read(fd, 1)
        while chunk:
            chunk = os.read(fd, 1)
    except OSError as exc:
        if exc.errno != errno.EAGAIN:
            raise
def cron_database_list():
    """Return the databases the cron workers must poll.

    An explicit ``db_name`` configuration wins; otherwise every database
    known to this server is returned.
    """
    configured = config['db_name']
    if configured:
        return configured
    return list_dbs(True)
#----------------------------------------------------------
# Werkzeug WSGI servers patched
#----------------------------------------------------------
class LoggingBaseWSGIServerMixIn(object):
    """Mixin routing WSGI server errors to the odoo logger.

    Broken pipes (the client went away mid-response) are silently ignored;
    every other error is logged with its traceback.
    """

    def handle_error(self, request, client_address):
        exc_type, exc_value, _tb = sys.exc_info()
        broken_pipe = exc_type == socket.error and exc_value.errno == errno.EPIPE
        if not broken_pipe:
            _logger.exception('Exception happened during processing of request from %s', client_address)
class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
    """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
    use this class, sets the socket and calls the process_request() manually
    """
    def __init__(self, app):
        # bind a throwaway local address: we only want werkzeug's setup,
        # the socket itself is never used by this class
        werkzeug.serving.BaseWSGIServer.__init__(self, "127.0.0.1", 0, app, handler=CommonRequestHandler)
        # Directly close the socket. It will be replaced by WorkerHTTP when processing requests
        if self.socket:
            self.socket.close()

    def server_activate(self):
        # dont listen as we use PreforkServer#socket
        pass
class CommonRequestHandler(werkzeug.serving.WSGIRequestHandler):
    """Request handler with odoo-flavoured access logging.

    The logged path is augmented with the rpc model/method recorded on the
    current thread (as a ``#fragment``) and the message is colored by status
    class, mirroring werkzeug's own ``log_request``.
    """

    def log_request(self, code = "-", size = "-"):
        try:
            path = uri_to_iri(self.path)
            fragment = thread_local.rpc_model_method
            if fragment:
                path += '#' + fragment
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        code = str(code)

        # NOTE(review): _ansi_style is a private werkzeug helper; may break
        # on a werkzeug upgrade — confirm when bumping the dependency
        if code[0] == "1":  # 1xx - Informational
            msg = werkzeug.serving._ansi_style(msg, "bold")
        elif code == "200":  # 2xx - Success
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = werkzeug.serving._ansi_style(msg, "cyan")
        elif code[0] == "3":  # 3xx - Redirection
            msg = werkzeug.serving._ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = werkzeug.serving._ansi_style(msg, "yellow")
        elif code[0] == "4":  # 4xx - Client Error
            msg = werkzeug.serving._ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = werkzeug.serving._ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)
class RequestHandler(CommonRequestHandler):
    """HTTP request handler of the threaded server, with websocket support."""

    def setup(self):
        # timeout to avoid chrome headless preconnect during tests
        if config['test_enable']:
            self.timeout = 5
        # flag the current thread as handling a http request
        super(RequestHandler, self).setup()
        me = threading.current_thread()
        me.name = 'odoo.service.http.request.%s' % (me.ident,)

    def make_environ(self):
        environ = super().make_environ()
        # Add the TCP socket to environ in order for the websocket
        # connections to use it.
        environ['socket'] = self.connection
        if self.headers.get('Upgrade') == 'websocket':
            # Since the upgrade header is introduced in version 1.1, Firefox
            # won't accept a websocket connection if the version is set to
            # 1.0.
            self.protocol_version = "HTTP/1.1"
        return environ

    def send_header(self, keyword, value):
        # Prevent `WSGIRequestHandler` from sending the connection close header (compatibility with werkzeug >= 2.1.1 )
        # since it is incompatible with websocket.
        if self.headers.get('Upgrade') == 'websocket' and keyword == 'Connection' and value == 'close':
            # Do not keep processing requests.
            self.close_connection = True
            return
        super().send_header(keyword, value)

    def end_headers(self, *a, **kw):
        super().end_headers(*a, **kw)
        # At this point, Werkzeug assumes the connection is closed and will discard any incoming
        # data. In the case of WebSocket connections, data should not be discarded. Replace the
        # rfile/wfile of this handler to prevent any further action (compatibility with werkzeug >= 2.3.x).
        # See: https://github.com/pallets/werkzeug/blob/2.3.x/src/werkzeug/serving.py#L334
        if self.headers.get('Upgrade') == 'websocket':
            self.rfile = BytesIO()
            self.wfile = BytesIO()

    def log_error(self, format, *args):
        # timeouts are expected during tests (5s timeout set in setup());
        # log them quietly instead of as errors
        if format == "Request timed out: %r" and config['test_enable']:
            _logger.info(format, *args)
        else:
            super().log_error(format, *args)
class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
    """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
    given by the environment, this is used by autoreload to keep the listen
    socket open when a reload happens.
    """
    def __init__(self, host, port, app):
        # The ODOO_MAX_HTTP_THREADS environment variable allows to limit the amount of concurrent
        # socket connections accepted by a threaded server, implicitly limiting the amount of
        # concurrent threads running for http requests handling.
        self.max_http_threads = os.environ.get("ODOO_MAX_HTTP_THREADS")
        if self.max_http_threads:
            try:
                self.max_http_threads = int(self.max_http_threads)
            except ValueError:
                # If the value can't be parsed to an integer then it's computed in an automated way to
                # half the size of db_maxconn because while most requests won't borrow cursors concurrently
                # there are some exceptions where some controllers might allocate two or more cursors.
                self.max_http_threads = max((config['db_maxconn'] - config['max_cron_threads']) // 2, 1)
            self.http_threads_sem = threading.Semaphore(self.max_http_threads)
        super(ThreadedWSGIServerReloadable, self).__init__(host, port, app,
                                                           handler=RequestHandler)

        # See https://github.com/pallets/werkzeug/pull/770
        # This allow the request threads to not be set as daemon
        # so the server waits for them when shutting down gracefully.
        self.daemon_threads = False

    def server_bind(self):
        # systemd socket activation: fd 3 is the first listen socket passed in
        SD_LISTEN_FDS_START = 3
        if os.environ.get('LISTEN_FDS') == '1' and os.environ.get('LISTEN_PID') == str(os.getpid()):
            self.reload_socket = True
            self.socket = socket.fromfd(SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM)
            _logger.info('HTTP service (werkzeug) running through socket activation')
        else:
            self.reload_socket = False
            super(ThreadedWSGIServerReloadable, self).server_bind()
            _logger.info('HTTP service (werkzeug) running on %s:%s', self.server_name, self.server_port)

    def server_activate(self):
        # a socket inherited through socket activation is already listening
        if not self.reload_socket:
            super(ThreadedWSGIServerReloadable, self).server_activate()

    def process_request(self, request, client_address):
        """
        Start a new thread to process the request.
        Override the default method of class socketserver.ThreadingMixIn
        to be able to get the thread object which is instantiated
        and set its start time as an attribute
        """
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        # tag the thread so process_limit() can apply the http time limits
        t.type = 'http'
        t.start_time = time.time()
        t.start()

    def _handle_request_noblock(self):
        if self.max_http_threads and not self.http_threads_sem.acquire(timeout=0.1):
            # If the semaphore is full we will return immediately to the upstream (most probably
            # socketserver.BaseServer's serve_forever loop which will retry immediately as the
            # selector will find a pending connection to accept on the socket. There is a 100 ms
            # penalty in such case in order to avoid cpu bound loop while waiting for the semaphore.
            return
        # upstream _handle_request_noblock will handle errors and call shutdown_request in any cases
        super(ThreadedWSGIServerReloadable, self)._handle_request_noblock()

    def shutdown_request(self, request):
        if self.max_http_threads:
            # upstream is supposed to call this function no matter what happens during processing
            self.http_threads_sem.release()
        super().shutdown_request(request)
#----------------------------------------------------------
# FileSystem Watcher for autoreload and cache invalidation
#----------------------------------------------------------
class FSWatcherBase(object):
    """Shared logic of the autoreload file-system watchers.

    Subclasses feed changed paths to :meth:`handle_file`, which triggers a
    server restart when a valid Python source file was modified.
    """

    def handle_file(self, path):
        """Inspect a changed file and restart the server if it is valid Python.

        :param str path: filesystem path of the created/modified file
        :return: True if a restart was triggered, None otherwise
        """
        # ignore non-python files and editor backup files (".~" prefix)
        if path.endswith('.py') and not os.path.basename(path).startswith('.~'):
            try:
                # use a context manager so the file handle is always released
                # (the original left the handle open until GC)
                with open(path, 'rb') as source_file:
                    source = source_file.read() + b'\n'
                compile(source, path, 'exec')
            except IOError:
                _logger.error('autoreload: python code change detected, IOError for %s', path)
            except SyntaxError:
                _logger.error('autoreload: python code change detected, SyntaxError in %s', path)
            else:
                if not server_phoenix:
                    _logger.info('autoreload: python code updated, autoreload activated')
                    restart()
                    return True
class FSWatcherWatchdog(FSWatcherBase):
    """Autoreload watcher backed by the ``watchdog`` library.

    A recursive observer is scheduled on every addons directory; file
    creation/modification/move events are forwarded to ``handle_file``.
    """

    def __init__(self):
        self.observer = Observer()
        import odoo.addons  # noqa: PLC0415
        for addons_dir in odoo.addons.__path__:
            _logger.info('Watching addons folder %s', addons_dir)
            self.observer.schedule(self, addons_dir, recursive=True)

    def dispatch(self, event):
        """Watchdog callback: route relevant file events to handle_file."""
        relevant = isinstance(event, (FileCreatedEvent, FileModifiedEvent, FileMovedEvent))
        if not relevant or event.is_directory:
            return
        # moves carry a destination path; other events only a source path
        changed_path = getattr(event, 'dest_path', '') or event.src_path
        self.handle_file(changed_path)

    def start(self):
        """Start watching in the observer's own thread."""
        self.observer.start()
        _logger.info('AutoReload watcher running with watchdog')

    def stop(self):
        """Stop the observer thread and wait for it to terminate."""
        self.observer.stop()
        self.observer.join()
class FSWatcherInotify(FSWatcherBase):
    """Autoreload watcher backed by inotify (Linux only)."""

    def __init__(self):
        self.started = False
        # ignore warnings from inotify in case we have duplicate addons paths.
        inotify.adapters._LOGGER.setLevel(logging.ERROR)
        # recreate a list as InotifyTrees' __init__ deletes the list's items
        paths_to_watch = []
        import odoo.addons  # noqa: PLC0415
        for path in odoo.addons.__path__:
            paths_to_watch.append(path)
            _logger.info('Watching addons folder %s', path)
        self.watcher = InotifyTrees(paths_to_watch, mask=INOTIFY_LISTEN_EVENTS, block_duration_s=.5)

    def run(self):
        """Poll inotify events until stop() clears ``self.started``."""
        _logger.info('AutoReload watcher running with inotify')
        dir_creation_events = set(('IN_MOVED_TO', 'IN_CREATE'))
        while self.started:
            for event in self.watcher.event_gen(timeout_s=0, yield_nones=False):
                (_, type_names, path, filename) = event
                if 'IN_ISDIR' not in type_names:
                    # despite not having IN_DELETE in the watcher's mask, the
                    # watcher sends these events when a directory is deleted.
                    if 'IN_DELETE' not in type_names:
                        full_path = os.path.join(path, filename)
                        if self.handle_file(full_path):
                            return
                elif dir_creation_events.intersection(type_names):
                    # a directory appeared: check every file it contains
                    full_path = os.path.join(path, filename)
                    for root, _, files in os.walk(full_path):
                        for file in files:
                            if self.handle_file(os.path.join(root, file)):
                                return

    def start(self):
        # run the polling loop in a daemon thread so it never blocks shutdown
        self.started = True
        self.thread = threading.Thread(target=self.run, name="odoo.service.autoreload.watcher")
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        self.started = False
        self.thread.join()
        del self.watcher  # ensures inotify watches are freed up before reexec
#----------------------------------------------------------
# Servers: Threaded, Gevented and Prefork
#----------------------------------------------------------
class CommonServer(object):
    """Base class of all server flavours (threaded, gevent, prefork).

    Holds the listen interface/port configuration and the registry of
    cleanup callbacks executed when the server stops.
    """
    # cleanup callbacks registered via on_stop(); class-level, shared by all
    # server instances
    _on_stop_funcs = []

    def __init__(self, app):
        # the WSGI application served by this server
        self.app = app
        # config
        self.interface = config['http_interface'] or '0.0.0.0'
        self.port = config['http_port']
        # runtime
        self.pid = os.getpid()

    def close_socket(self, sock):
        """ Closes a socket instance cleanly
        :param sock: the network socket to close
        :type sock: socket.socket
        """
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error as e:
            if e.errno == errno.EBADF:
                # Werkzeug > 0.9.6 closes the socket itself (see commit
                # https://github.com/mitsuhiko/werkzeug/commit/4d8ca089)
                return
            # On OSX, socket shutdowns both sides if any side closes it
            # causing an error 57 'Socket is not connected' on shutdown
            # of the other side (or something), see
            # http://bugs.python.org/issue4397
            # note: stdlib fixed test, not behavior
            if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
                raise
        sock.close()

    @classmethod
    def on_stop(cls, func):
        """ Register a cleanup function to be executed when the server stops """
        cls._on_stop_funcs.append(func)

    def stop(self):
        # run every registered cleanup; a failing callback must not prevent
        # the remaining ones from running, so exceptions are logged only
        for func in self._on_stop_funcs:
            try:
                _logger.debug("on_close call %s", func)
                func()
            except Exception:
                _logger.warning("Exception in %s", func.__name__, exc_info=True)
class ThreadedServer(CommonServer):
    """Single-process server: one thread per HTTP request plus cron threads.

    Resource limits are enforced cooperatively: ``process_limit`` flags the
    offending threads and ``run`` eventually triggers a graceful reload via
    SIGHUP once no other request is in flight (or after SLEEP_INTERVAL).
    """

    def __init__(self, app):
        super(ThreadedServer, self).__init__(app)
        self.main_thread_id = threading.current_thread().ident
        # Variable keeping track of the number of calls to the signal handler defined
        # below. This variable is monitored by ``quit_on_signals()``.
        self.quit_signals_received = 0

        #self.socket = None
        self.httpd = None
        # threads that exceeded a time/memory limit and are awaiting reload
        self.limits_reached_threads = set()
        # timestamp of the oldest still-pending limit hit, or None
        self.limit_reached_time = None

    def signal_handler(self, sig, frame):
        """Translate process signals into shutdown/restart decisions."""
        if sig in [signal.SIGINT, signal.SIGTERM]:
            # shutdown on kill -INT or -TERM
            self.quit_signals_received += 1
            if self.quit_signals_received > 1:
                # logging.shutdown was already called at this point.
                sys.stderr.write("Forced shutdown.\n")
                os._exit(0)
            # interrupt run() to start shutdown
            raise KeyboardInterrupt()
        elif hasattr(signal, 'SIGXCPU') and sig == signal.SIGXCPU:
            sys.stderr.write("CPU time limit exceeded! Shutting down immediately\n")
            sys.stderr.flush()
            os._exit(0)
        elif sig == signal.SIGHUP:
            # restart on kill -HUP
            global server_phoenix  # noqa: PLW0603
            server_phoenix = True
            self.quit_signals_received += 1
            # interrupt run() to start shutdown
            raise KeyboardInterrupt()

    def process_limit(self):
        """Flag the server and/or individual threads exceeding their limits."""
        memory = memory_info(psutil.Process(os.getpid()))
        if config['limit_memory_soft'] and memory > config['limit_memory_soft']:
            _logger.warning('Server memory limit (%s) reached.', memory)
            self.limits_reached_threads.add(threading.current_thread())

        for thread in threading.enumerate():
            thread_type = getattr(thread, 'type', None)
            if not thread.daemon and thread_type != 'websocket' or thread_type == 'cron':
                # We apply the limits on cron threads and HTTP requests,
                # websocket requests excluded.
                if getattr(thread, 'start_time', None):
                    thread_execution_time = time.time() - thread.start_time
                    thread_limit_time_real = config['limit_time_real']
                    if (getattr(thread, 'type', None) == 'cron' and
                            config['limit_time_real_cron'] and config['limit_time_real_cron'] > 0):
                        thread_limit_time_real = config['limit_time_real_cron']
                    if thread_limit_time_real and thread_execution_time > thread_limit_time_real:
                        _logger.warning(
                            'Thread %s virtual real time limit (%d/%ds) reached.',
                            thread, thread_execution_time, thread_limit_time_real)
                        self.limits_reached_threads.add(thread)
        # Clean-up threads that are no longer alive
        # e.g. threads that exceeded their real time,
        # but which finished before the server could restart.
        for thread in list(self.limits_reached_threads):
            if not thread.is_alive():
                self.limits_reached_threads.remove(thread)
        if self.limits_reached_threads:
            self.limit_reached_time = self.limit_reached_time or time.time()
        else:
            self.limit_reached_time = None

    def cron_thread(self, number):
        """Main loop of cron worker *number*: poll databases for jobs to run."""
        # Steve Reich timing style with thundering herd mitigation.
        #
        # On startup, all workers bind on a notification channel in
        # postgres so they can be woken up at will. At worst they wake
        # up every SLEEP_INTERVAL with a jitter. The jitter creates a
        # chorus effect that helps distribute on the timeline the moment
        # when individual worker wake up.
        #
        # On NOTIFY, all workers are awaken at the same time, sleeping
        # just a bit prevents they all poll the database at the exact
        # same time. This is known as the thundering herd effect.

        from odoo.addons.base.models.ir_cron import IrCron  # noqa: PLC0415

        def _run_cron(cr):
            # one LISTEN-ing postgres connection, reused until it ages out
            pg_conn = cr._cnx
            # LISTEN / NOTIFY doesn't work in recovery mode
            cr.execute("SELECT pg_is_in_recovery()")
            in_recovery = cr.fetchone()[0]
            if not in_recovery:
                cr.execute("LISTEN cron_trigger")
            else:
                _logger.warning("PG cluster in recovery mode, cron trigger not activated")
            cr.commit()

            check_all_time = 0.0  # last time that we listed databases, initialized far in the past
            all_db_names = []
            alive_time = time.monotonic()
            while config['limit_time_worker_cron'] <= 0 or (time.monotonic() - alive_time) <= config['limit_time_worker_cron']:
                # wake on NOTIFY or after the jittered sleep interval
                select.select([pg_conn], [], [], SLEEP_INTERVAL + number)
                # per-worker stagger to mitigate the thundering herd on NOTIFY
                time.sleep(number / 100)
                try:
                    pg_conn.poll()
                except Exception:
                    if pg_conn.closed:
                        # connection closed, just exit the loop
                        return
                    raise
                notified = OrderedSet(
                    notif.payload
                    for notif in pg_conn.notifies
                    if notif.channel == 'cron_trigger'
                )
                pg_conn.notifies.clear()  # free resources

                if time.time() - SLEEP_INTERVAL > check_all_time:
                    # check all databases
                    # last time we checked them was `now - SLEEP_INTERVAL`
                    check_all_time = time.time()
                    # process notified databases first, then the other ones
                    all_db_names = OrderedSet(cron_database_list())
                    db_names = [
                        *(db for db in notified if db in all_db_names),
                        *(db for db in all_db_names if db not in notified),
                    ]
                else:
                    # restrict to notified databases only
                    db_names = notified.intersection(all_db_names)
                    if not db_names:
                        continue

                _logger.debug('cron%d polling for jobs (notified: %s)', number, notified)
                for db_name in db_names:
                    # expose start_time so process_limit() can apply the
                    # cron real-time limit to this thread
                    thread = threading.current_thread()
                    thread.start_time = time.time()
                    try:
                        IrCron._process_jobs(db_name)
                    except Exception:
                        _logger.warning('cron%d encountered an Exception:', number, exc_info=True)
                    thread.start_time = None

        while True:
            # recycle the postgres connection when it reaches its max age
            conn = sql_db.db_connect('postgres')
            with contextlib.closing(conn.cursor()) as cr:
                _run_cron(cr)
                cr._cnx.close()
            _logger.info('cron%d max age (%ss) reached, releasing connection.', number, config['limit_time_worker_cron'])

    def cron_spawn(self):
        """ Start the above runner function in a daemon thread.

        The thread is a typical daemon thread: it will never quit and must be
        terminated when the main process exits - with no consequence (the processing
        threads it spawns are not marked daemon).
        """
        for i in range(config['max_cron_threads']):
            t = threading.Thread(target=self.cron_thread, args=(i,), name=f"odoo.service.cron.cron{i}")
            t.daemon = True
            t.type = 'cron'
            t.start()
            _logger.debug("cron%d started!", i)

    def http_spawn(self):
        # serve_forever runs in its own daemon thread; stop() shuts it down
        self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, self.app)
        threading.Thread(
            target=self.httpd.serve_forever,
            name="odoo.service.httpd",
            daemon=True,
        ).start()

    def start(self, stop=False):
        """Install signal handlers and memory limits, maybe start HTTP."""
        _logger.debug("Setting signal handlers")
        set_limit_memory_hard()
        if os.name == 'posix':
            signal.signal(signal.SIGINT, self.signal_handler)
            signal.signal(signal.SIGTERM, self.signal_handler)
            signal.signal(signal.SIGCHLD, self.signal_handler)
            signal.signal(signal.SIGHUP, self.signal_handler)
            signal.signal(signal.SIGXCPU, self.signal_handler)
            signal.signal(signal.SIGQUIT, dumpstacks)
            signal.signal(signal.SIGUSR1, log_ormcache_stats)
            signal.signal(signal.SIGUSR2, log_ormcache_stats)
        elif os.name == 'nt':
            import win32api
            win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)

        if config['test_enable'] or (config['http_enable'] and not stop):
            # some tests need the http daemon to be available...
            self.http_spawn()

    def stop(self):
        """ Shutdown the WSGI server. Wait for non daemon threads.
        """
        if server_phoenix:
            _logger.info("Initiating server reload")
        else:
            _logger.info("Initiating shutdown")
            _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")

        stop_time = time.time()

        if self.httpd:
            self.httpd.shutdown()

        super().stop()

        # Manually join() all threads before calling sys.exit() to allow a second signal
        # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
        # threading.Thread.join() should not mask signals (at least in python 2.5).
        me = threading.current_thread()
        _logger.debug('current thread: %r', me)
        for thread in threading.enumerate():
            _logger.debug('process %r (%r)', thread, thread.daemon)
            if (thread != me and not thread.daemon and thread.ident != self.main_thread_id and
                    thread not in self.limits_reached_threads):
                while thread.is_alive() and (time.time() - stop_time) < 1:
                    # We wait for requests to finish, up to 1 second.
                    _logger.debug('join and sleep')
                    # Need a busyloop here as thread.join() masks signals
                    # and would prevent the forced shutdown.
                    thread.join(0.05)
                    time.sleep(0.05)

        sql_db.close_all()

        current_process = psutil.Process()
        children = current_process.children(recursive=False)
        for child in children:
            _logger.info('A child process was found, pid is %s, process may hang', child)

        _logger.debug('--')
        logging.shutdown()

    def run(self, preload=None, stop=False):
        """ Start the http server and the cron thread then wait for a signal.

        The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
        a second one if any will force an immediate exit.
        """
        with Registry._lock:
            self.start(stop=stop)
            rc = preload_registries(preload)

        if stop:
            if config['test_enable']:
                # report the test results of every loaded registry
                from odoo.tests.result import _logger as logger  # noqa: PLC0415
                with Registry.registries._lock:
                    for db, registry in Registry.registries.items():
                        report = registry._assertion_report
                        log = logger.error if not report.wasSuccessful() \
                            else logger.warning if not report.testsRun \
                            else logger.info
                        log("%s when loading database %r", report, db)
            self.stop()
            return rc

        self.cron_spawn()

        # Wait for a first signal to be handled. (time.sleep will be interrupted
        # by the signal handler)
        try:
            while self.quit_signals_received == 0:
                self.process_limit()
                if self.limit_reached_time:
                    has_other_valid_requests = any(
                        not t.daemon and
                        t not in self.limits_reached_threads
                        for t in threading.enumerate()
                        if getattr(t, 'type', None) == 'http')
                    if (not has_other_valid_requests or
                            (time.time() - self.limit_reached_time) > SLEEP_INTERVAL):
                        # We wait there is no processing requests
                        # other than the ones exceeding the limits, up to 1 min,
                        # before asking for a reload.
                        _logger.info('Dumping stacktrace of limit exceeding threads before reloading')
                        dumpstacks(thread_idents=[thread.ident for thread in self.limits_reached_threads])
                        self.reload()
                        # `reload` increments `self.quit_signals_received`
                        # and the loop will end after this iteration,
                        # therefore leading to the server stop.
                        # `reload` also sets the `server_phoenix` flag
                        # to tell the server to restart the server after shutting down.
                    else:
                        time.sleep(1)
                else:
                    time.sleep(SLEEP_INTERVAL)
        except KeyboardInterrupt:
            pass

        self.stop()

    def reload(self):
        # the SIGHUP handler sets server_phoenix and bumps quit_signals_received
        os.kill(self.pid, signal.SIGHUP)
class GeventServer(CommonServer):
    """Evented (gevent) server dedicated to longpolling/websocket traffic."""

    def __init__(self, app):
        super(GeventServer, self).__init__(app)
        self.port = config['gevent_port']
        self.httpd = None

    def process_limits(self):
        """Self-terminate (SIGTERM) on reparenting or soft memory overflow."""
        restart = False
        if self.ppid != os.getppid():
            _logger.warning("Gevent Parent changed: %s", self.pid)
            restart = True
        memory = memory_info(psutil.Process(self.pid))
        limit_memory_soft = config['limit_memory_soft_gevent'] or config['limit_memory_soft']
        if limit_memory_soft and memory > limit_memory_soft:
            _logger.warning('Gevent virtual memory limit reached: %s', memory)
            restart = True
        if restart:
            # suicide !!
            os.kill(self.pid, signal.SIGTERM)

    def watchdog(self, beat=4):
        # periodic self-check greenlet; gevent.sleep yields to the hub
        import gevent
        self.ppid = os.getppid()
        while True:
            self.process_limits()
            gevent.sleep(beat)

    def start(self):
        import gevent
        try:
            from gevent.pywsgi import WSGIServer, WSGIHandler
        except ImportError:
            # older gevent releases shipped the module as gevent.wsgi
            from gevent.wsgi import WSGIServer, WSGIHandler

        class ProxyHandler(WSGIHandler):
            """ When logging requests, try to get the client address from
            the environment so we get proxyfix's modifications (if any).

            Derived from werzeug.serving.WSGIRequestHandler.log
            / werzeug.serving.WSGIRequestHandler.address_string
            """
            def _connection_upgrade_requested(self):
                # detect a websocket/connection upgrade from request headers
                if self.headers.get('Connection', '').lower() == 'upgrade':
                    return True
                if self.headers.get('Upgrade', '').lower() == 'websocket':
                    return True
                return False

            def format_request(self):
                old_address = self.client_address
                if getattr(self, 'environ', None):
                    self.client_address = self.environ['REMOTE_ADDR']
                elif not self.client_address:
                    self.client_address = '<local>'
                # other cases are handled inside WSGIHandler
                try:
                    return super().format_request()
                finally:
                    self.client_address = old_address

            def finalize_headers(self):
                # We need to make gevent.pywsgi stop dealing with chunks when the connection
                # Is being upgraded. see https://github.com/gevent/gevent/issues/1712
                super().finalize_headers()
                if self.code == 101:
                    # Switching Protocols. Disable chunked writes.
                    self.response_use_chunked = False

            def get_environ(self):
                # Add the TCP socket to environ in order for the websocket
                # connections to use it.
                environ = super().get_environ()
                environ['socket'] = self.socket
                # Disable support for HTTP chunking on reads which cause
                # an issue when the connection is being upgraded, see
                # https://github.com/gevent/gevent/issues/1712
                if self._connection_upgrade_requested():
                    environ['wsgi.input'] = self.rfile
                    environ['wsgi.input_terminated'] = False
                return environ

        set_limit_memory_hard()
        if os.name == 'posix':
            # Set process memory limit as an extra safeguard
            signal.signal(signal.SIGQUIT, dumpstacks)
            signal.signal(signal.SIGUSR1, log_ormcache_stats)
            signal.signal(signal.SIGUSR2, log_ormcache_stats)
            gevent.spawn(self.watchdog)

        self.httpd = WSGIServer(
            (self.interface, self.port), self.app,
            log=logging.getLogger('longpolling'),
            error_log=logging.getLogger('longpolling'),
            handler_class=ProxyHandler,
        )
        _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
        try:
            self.httpd.serve_forever()
        except:
            _logger.exception("Evented Service (longpolling): uncaught error during main loop")
            raise

    def stop(self):
        import gevent
        self.httpd.stop()
        super().stop()
        gevent.shutdown()

    def run(self, preload, stop):
        # the gevent server ignores preload/stop: it only serves until killed
        self.start()
        self.stop()
833class PreforkServer(CommonServer):
834 """ Multiprocessing inspired by (g)unicorn.
835 PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
836 method between workers but we plan to replace it by a more intelligent
837 dispatcher to will parse the first HTTP request line.
838 """
    def __init__(self, app):
        super().__init__(app)
        # config
        self.population = config['workers']
        self.timeout = config['limit_time_real']
        self.limit_request = config['limit_request']
        self.cron_timeout = config['limit_time_real_cron'] or None
        # -1 means "same real-time limit as http workers"
        if self.cron_timeout == -1:
            self.cron_timeout = self.timeout
        # working vars
        self.beat = 4                     # heartbeat period in seconds
        self.socket = None                # shared listen socket for workers
        self.workers_http = {}            # pid -> http worker
        self.workers_cron = {}            # pid -> cron worker
        self.workers = {}                 # pid -> any worker
        self.generation = 0               # bumped on each worker spawn
        self.queue = collections.deque()  # pending signals for the master loop
        self.long_polling_pid = None      # pid of the gevent subprocess
858 def pipe_new(self):
859 pipe = os.pipe()
860 for fd in pipe:
861 # non_blocking
862 flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
863 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
864 # close_on_exec
865 flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
866 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
867 return pipe
869 def pipe_ping(self, pipe):
870 try:
871 os.write(pipe[1], b'.')
872 except IOError as e:
873 if e.errno not in [errno.EAGAIN, errno.EINTR]:
874 raise
876 def signal_handler(self, sig, frame):
877 if len(self.queue) < 5 or sig == signal.SIGCHLD:
878 self.queue.append(sig)
879 self.pipe_ping(self.pipe)
880 else:
881 _logger.warning("Dropping signal: %s", sig)
883 def worker_spawn(self, klass, workers_registry):
884 self.generation += 1
885 worker = klass(self)
886 pid = os.fork()
887 if pid != 0:
888 worker.pid = pid
889 self.workers[pid] = worker
890 workers_registry[pid] = worker
891 return worker
892 else:
893 worker.run()
894 sys.exit(0)
896 def long_polling_spawn(self):
897 nargs = stripped_sys_argv()
898 cmd = [sys.executable, sys.argv[0], 'gevent'] + nargs[1:]
899 popen = subprocess.Popen(cmd)
900 self.long_polling_pid = popen.pid
902 def worker_pop(self, pid):
903 if pid == self.long_polling_pid:
904 self.long_polling_pid = None
905 if pid in self.workers:
906 _logger.debug("Worker (%s) unregistered", pid)
907 try:
908 self.workers_http.pop(pid, None)
909 self.workers_cron.pop(pid, None)
910 u = self.workers.pop(pid)
911 u.close()
912 except OSError:
913 return
915 def worker_kill(self, pid, sig):
916 try:
917 os.kill(pid, sig)
918 if sig == signal.SIGKILL:
919 self.worker_pop(pid)
920 except OSError as e:
921 if e.errno == errno.ESRCH:
922 self.worker_pop(pid)
924 def process_signals(self):
925 while self.queue:
926 sig = self.queue.popleft()
927 if sig in [signal.SIGINT, signal.SIGTERM]:
928 raise KeyboardInterrupt
929 elif sig == signal.SIGHUP:
930 # restart on kill -HUP
931 global server_phoenix # noqa: PLW0603
932 server_phoenix = True
933 raise KeyboardInterrupt
934 elif sig == signal.SIGQUIT:
935 # dump stacks on kill -3
936 dumpstacks()
937 elif sig in [signal.SIGUSR1, signal.SIGUSR2]:
938 # log ormcache stats on kill -SIGUSR1 or kill -SIGUSR2
939 log_ormcache_stats(sig)
940 elif sig == signal.SIGTTIN:
941 # increase number of workers
942 self.population += 1
943 elif sig == signal.SIGTTOU:
944 # decrease number of workers
945 self.population -= 1
947 def process_zombie(self):
948 # reap dead workers
949 while 1:
950 try:
951 wpid, status = os.waitpid(-1, os.WNOHANG)
952 if not wpid:
953 break
954 if (status >> 8) == 3:
955 msg = "Critial worker error (%s)"
956 _logger.critical(msg, wpid)
957 raise Exception(msg % wpid)
958 self.worker_pop(wpid)
959 except OSError as e:
960 if e.errno == errno.ECHILD:
961 break
962 raise
964 def process_timeout(self):
965 now = time.time()
966 for (pid, worker) in list(self.workers.items()):
967 if worker.watchdog_timeout is not None and \
968 (now - worker.watchdog_time) >= worker.watchdog_timeout:
969 _logger.error("%s (%s) timeout after %ss",
970 worker.__class__.__name__,
971 pid,
972 worker.watchdog_timeout)
973 self.worker_kill(pid, signal.SIGKILL)
975 def process_spawn(self):
976 # Before spawning any process, check the registry signaling
977 registries = Registry.registries.snapshot
979 def check_registries():
980 # check the registries on the first call only!
981 if not registries:
982 return
983 for registry in registries.values():
984 with registry.cursor() as cr:
985 registry.check_signaling(cr)
986 registries.clear()
987 # Close all opened cursors
988 sql_db.close_all()
990 if config['http_enable']:
991 while len(self.workers_http) < self.population:
992 check_registries()
993 self.worker_spawn(WorkerHTTP, self.workers_http)
994 if not self.long_polling_pid:
995 check_registries()
996 self.long_polling_spawn()
997 while len(self.workers_cron) < config['max_cron_threads']:
998 check_registries()
999 self.worker_spawn(WorkerCron, self.workers_cron)
1001 def sleep(self):
1002 try:
1003 # map of fd -> worker
1004 fds = {w.watchdog_pipe[0]: w for w in self.workers.values()}
1005 fd_in = list(fds) + [self.pipe[0]]
1006 # check for ping or internal wakeups
1007 ready = select.select(fd_in, [], [], self.beat)
1008 # update worker watchdogs
1009 for fd in ready[0]:
1010 if fd in fds:
1011 fds[fd].watchdog_time = time.time()
1012 empty_pipe(fd)
1013 except select.error as e:
1014 if e.args[0] not in [errno.EINTR]:
1015 raise
1017 def start(self):
1018 # wakeup pipe, python doesn't throw EINTR when a syscall is interrupted
1019 # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
1020 # signal handler to overcome this behaviour
1021 self.pipe = self.pipe_new()
1022 # set signal handlers
1023 signal.signal(signal.SIGINT, self.signal_handler)
1024 signal.signal(signal.SIGTERM, self.signal_handler)
1025 signal.signal(signal.SIGHUP, self.signal_handler)
1026 signal.signal(signal.SIGCHLD, self.signal_handler)
1027 signal.signal(signal.SIGTTIN, self.signal_handler)
1028 signal.signal(signal.SIGTTOU, self.signal_handler)
1029 signal.signal(signal.SIGQUIT, dumpstacks)
1030 signal.signal(signal.SIGUSR1, log_ormcache_stats)
1031 signal.signal(signal.SIGUSR2, log_ormcache_stats)
1033 if config['http_enable']:
1034 if config.http_socket_activation:
1035 _logger.info('HTTP service (werkzeug) running through socket activation')
1036 else:
1037 _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
1039 if os.environ.get('ODOO_HTTP_SOCKET_FD'):
1040 # reload
1041 self.socket = socket.socket(fileno=int(os.environ.pop('ODOO_HTTP_SOCKET_FD')))
1042 elif config.http_socket_activation:
1043 # socket activation
1044 SD_LISTEN_FDS_START = 3
1045 self.socket = socket.fromfd(SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM)
1046 else:
1047 # default
1048 family = socket.AF_INET
1049 if ':' in self.interface:
1050 family = socket.AF_INET6
1051 self.socket = socket.socket(family, socket.SOCK_STREAM)
1052 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1053 self.socket.setblocking(0)
1054 self.socket.bind((self.interface, self.port))
1055 self.socket.listen(8 * self.population)
1057 def fork_and_reload(self):
1058 _logger.info("Reloading server")
1059 pid = os.fork()
1060 if pid != 0:
1061 # keep the http listening socket open during _reexec() to ensure uptime
1062 http_socket_fileno = self.socket.fileno()
1063 flags = fcntl.fcntl(http_socket_fileno, fcntl.F_GETFD)
1064 fcntl.fcntl(http_socket_fileno, fcntl.F_SETFD, flags & ~fcntl.FD_CLOEXEC)
1065 os.environ['ODOO_HTTP_SOCKET_FD'] = str(http_socket_fileno)
1066 os.environ['ODOO_READY_SIGHUP_PID'] = str(pid)
1067 _reexec() # stops execution
1069 # child process handles old server shutdown
1070 _logger.info("Waiting for new server to start ...")
1071 phoenix_hatched = False
1073 def sighup_handler(sig, frame):
1074 nonlocal phoenix_hatched
1075 phoenix_hatched = True
1077 signal.signal(signal.SIGHUP, sighup_handler)
1079 reload_timeout = time.monotonic() + 60
1080 while not phoenix_hatched and time.monotonic() < reload_timeout:
1081 time.sleep(0.1)
1083 if not phoenix_hatched:
1084 _logger.error("Server reload timed out (check the updated code)")
1085 else:
1086 _logger.info("New server has started")
1088 def stop_workers_gracefully(self):
1089 _logger.info("Stopping workers gracefully")
1091 if self.long_polling_pid is not None:
1092 # FIXME make longpolling process handle SIGTERM correctly
1093 self.worker_kill(self.long_polling_pid, signal.SIGKILL)
1094 self.long_polling_pid = None
1096 # Signal workers to finish their current workload then stop
1097 for pid in self.workers:
1098 self.worker_kill(pid, signal.SIGINT)
1100 is_main_server = self.pid == os.getpid() # False if server reload, cannot reap children -> use psutil
1101 if not is_main_server:
1102 processes = {}
1103 for pid in self.workers:
1104 with contextlib.suppress(psutil.NoSuchProcess):
1105 processes[pid] = psutil.Process(pid)
1107 self.beat = 0.1
1108 while self.workers:
1109 try:
1110 self.process_signals()
1111 except KeyboardInterrupt:
1112 _logger.info("Forced shutdown.")
1113 break
1115 if is_main_server:
1116 self.process_zombie()
1117 else:
1118 for pid, proc in list(processes.items()):
1119 if not proc.is_running():
1120 self.worker_pop(pid)
1121 processes.pop(pid)
1123 self.sleep()
1124 self.process_timeout()
1126 def stop(self, graceful=True):
1127 global server_phoenix # noqa: PLW0603
1128 if server_phoenix:
1129 # PreforkServer reloads gracefully, disable outdated mechanism
1130 server_phoenix = False
1132 self.fork_and_reload()
1133 self.stop_workers_gracefully()
1135 _logger.info("Old server stopped")
1136 return
1138 if self.socket:
1139 self.socket.close()
1140 if graceful:
1141 super().stop()
1142 self.stop_workers_gracefully()
1143 else:
1144 _logger.info("Stopping forcefully")
1145 for pid in list(self.workers):
1146 self.worker_kill(pid, signal.SIGTERM)
1148 def run(self, preload, stop):
1149 self.start()
1151 rc = preload_registries(preload)
1153 if stop:
1154 self.stop()
1155 return rc
1157 # Empty the cursor pool, we dont want them to be shared among forked workers.
1158 sql_db.close_all()
1160 if os.environ.get('ODOO_READY_SIGHUP_PID'):
1161 os.kill(int(os.environ.pop('ODOO_READY_SIGHUP_PID')), signal.SIGHUP)
1163 _logger.debug("Multiprocess starting")
1164 while 1:
1165 try:
1166 #_logger.debug("Multiprocess beat (%s)",time.time())
1167 self.process_signals()
1168 self.process_zombie()
1169 self.process_timeout()
1170 self.process_spawn()
1171 self.sleep()
1172 except KeyboardInterrupt:
1173 _logger.debug("Multiprocess clean stop")
1174 self.stop()
1175 break
1176 except Exception as e:
1177 _logger.exception(e)
1178 self.stop(False)
1179 return -1
class Worker(object):
    """ Workers """
    def __init__(self, multi):
        self.multi = multi  # owning PreforkServer
        self.watchdog_time = time.time()
        # pipe this worker pings to prove liveness to the master
        self.watchdog_pipe = multi.pipe_new()
        # pipe registered with signal.set_wakeup_fd() so sleep() is interruptible
        self.eintr_pipe = multi.pipe_new()
        self.wakeup_fd_r, self.wakeup_fd_w = self.eintr_pipe
        # Can be set to None if no watchdog is desired.
        self.watchdog_timeout = multi.timeout
        self.ppid = os.getpid()  # parent pid, to detect master death
        self.pid = None
        self.alive = True
        # should we rename into lifetime ?
        self.request_max = multi.limit_request
        self.request_count = 0

    def setproctitle(self, title=""):
        """Set the OS process title so the worker is identifiable in ps."""
        setproctitle('odoo: %s %s %s' % (self.__class__.__name__, self.pid, title))

    def close(self):
        """Close both bookkeeping pipes (called by the master on worker death)."""
        os.close(self.watchdog_pipe[0])
        os.close(self.watchdog_pipe[1])
        os.close(self.eintr_pipe[0])
        os.close(self.eintr_pipe[1])

    def signal_handler(self, sig, frame):
        """SIGINT handler: finish the current work item, then exit the loop."""
        self.alive = False

    def signal_time_expired_handler(self, n, stack):
        """SIGXCPU handler: abort the current request when CPU time runs out."""
        # TODO: print actual RUSAGE_SELF (since last check_limits) instead of
        # just repeating the config setting
        _logger.info('Worker (%d) CPU time limit (%s) reached.', self.pid, config['limit_time_cpu'])
        # We dont suicide in such case
        raise Exception('CPU time limit exceeded.')

    def sleep(self):
        """Wait for work on the shared socket, or a wakeup, up to one beat."""
        try:
            select.select([self.multi.socket, self.wakeup_fd_r], [], [], self.multi.beat)
            # clear wakeup pipe if we were interrupted
            empty_pipe(self.wakeup_fd_r)
        except select.error as e:
            if e.args[0] not in [errno.EINTR]:
                raise

    def check_limits(self):
        """Enforce per-worker limits; may mark the worker for shutdown."""
        # If our parent changed suicide
        if self.ppid != os.getppid():
            _logger.info("Worker (%s) Parent changed", self.pid)
            self.alive = False
        # check for lifetime
        if self.request_count >= self.request_max:
            _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
            self.alive = False
        # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
        memory = memory_info(psutil.Process(os.getpid()))
        if config['limit_memory_soft'] and memory > config['limit_memory_soft']:
            _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, memory)
            self.alive = False  # Commit suicide after the request.

        set_limit_memory_hard()

        # update RLIMIT_CPU so limit_time_cpu applies per unit of work
        r = resource.getrusage(resource.RUSAGE_SELF)
        cpu_time = r.ru_utime + r.ru_stime
        soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
        resource.setrlimit(resource.RLIMIT_CPU, (int(cpu_time + config['limit_time_cpu']), hard))

    def process_work(self):
        """Do one unit of work; overridden by concrete worker classes."""
        pass

    def start(self):
        """Per-process setup run right after fork: title, fds, signals."""
        self.pid = os.getpid()
        self.setproctitle()
        _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
        # Reseed the random number generator
        random.seed()
        if self.multi.socket:
            # Prevent fd inheritance: close_on_exec
            flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
            fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
            # reset blocking status
            self.multi.socket.setblocking(0)

        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGXCPU, self.signal_time_expired_handler)
        # restore default behaviour for the signals the master had redirected
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGHUP, signal.SIG_DFL)
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        signal.signal(signal.SIGTTIN, signal.SIG_DFL)
        signal.signal(signal.SIGTTOU, signal.SIG_DFL)

        signal.set_wakeup_fd(self.wakeup_fd_w)

    def stop(self):
        """Per-process teardown hook; overridden by concrete worker classes."""
        pass

    def run(self):
        """Worker main: run the work loop in a daemon thread, then clean up."""
        try:
            self.start()
            t = threading.Thread(name="Worker %s (%s) workthread" % (self.__class__.__name__, self.pid), target=self._runloop)
            t.daemon = True
            t.start()
            t.join()
            _logger.info("Worker (%s) exiting. request_count: %s, registry count: %s.",
                         self.pid, self.request_count,
                         len(Registry.registries))
            self.stop()
        except Exception:
            _logger.exception("Worker (%s) Exception occurred, exiting...", self.pid)
            # should we use 3 to abort everything ?
            sys.exit(1)

    def _runloop(self):
        # block these signals in the work thread; the main thread installed
        # handlers for them in start() and should be the one to receive them
        signal.pthread_sigmask(signal.SIG_BLOCK, {
            signal.SIGXCPU,
            signal.SIGINT, signal.SIGQUIT,
            signal.SIGUSR1, signal.SIGUSR2,
        })
        try:
            while self.alive:
                self.check_limits()
                self.multi.pipe_ping(self.watchdog_pipe)
                self.sleep()
                if not self.alive:
                    break
                self.process_work()
        except:
            _logger.exception("Worker %s (%s) Exception occurred, exiting...", self.__class__.__name__, self.pid)
            sys.exit(1)
class WorkerHTTP(Worker):
    """ HTTP Request workers """
    def __init__(self, multi):
        super(WorkerHTTP, self).__init__(multi)

        # The ODOO_HTTP_SOCKET_TIMEOUT environment variable allows to control socket timeout for
        # extreme latency situations. It's generally better to use a good buffering reverse proxy
        # to quickly free workers rather than increasing this timeout to accommodate high network
        # latencies & b/w saturation. This timeout is also essential to protect against accidental
        # DoS due to idle HTTP connections.
        sock_timeout = os.environ.get("ODOO_HTTP_SOCKET_TIMEOUT")
        self.sock_timeout = float(sock_timeout) if sock_timeout else 2

    def process_request(self, client, addr):
        """Serve one HTTP request on an accepted *client* socket."""
        client.setblocking(1)
        client.settimeout(self.sock_timeout)
        client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Prevent fd inheritance: close_on_exec
        flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(client, fcntl.F_SETFD, flags)
        # do request using BaseWSGIServerNoBind monkey patched with socket
        self.server.socket = client
        # tolerate broken pipe when the http client closes the socket before
        # receiving the full reply
        try:
            self.server.process_request(client, addr)
        except IOError as e:
            if e.errno != errno.EPIPE:
                raise
        self.request_count += 1

    def process_work(self):
        """Accept one pending connection, if any, and serve it."""
        try:
            client, addr = self.multi.socket.accept()
            self.process_request(client, addr)
        except socket.error as e:
            # EAGAIN: nothing to accept on the non-blocking socket;
            # ECONNABORTED: the client went away before we accepted
            if e.errno not in (errno.EAGAIN, errno.ECONNABORTED):
                raise

    def start(self):
        """Worker setup plus a per-process WSGI server instance."""
        Worker.start(self)
        # one WSGI server per worker; its socket is swapped in per request
        self.server = BaseWSGIServerNoBind(self.multi.app)
class WorkerCron(Worker):
    """ Cron workers """

    def __init__(self, multi):
        super(WorkerCron, self).__init__(multi)
        self.alive_time = time.monotonic()  # birth time, for limit_time_worker_cron
        self.watchdog_timeout = multi.cron_timeout  # Use a distinct value for CRON Worker
        # process_work() below process a single database per call.
        # self.db_queue keeps track of the databases to process (in order, from left to right).
        self.db_queue: deque[str] = deque()
        self.db_count: int = 0

    def sleep(self):
        """Sleep until a wakeup byte, a cron NOTIFY, or the interval elapses.

        Only sleeps when the database queue is empty, i.e. between full
        passes over all databases.
        """
        # Really sleep once all the databases have been processed.
        if not self.db_queue:
            interval = SLEEP_INTERVAL + self.pid % 10  # chorus effect

            # simulate interruptible sleep with select(wakeup_fd, timeout)
            try:
                select.select([self.wakeup_fd_r, self.dbcursor._cnx], [], [], interval)
                # clear pg_conn/wakeup pipe if we were interrupted
                time.sleep(self.pid / 100 % .1)
                self.dbcursor._cnx.poll()
                empty_pipe(self.wakeup_fd_r)
            except select.error as e:
                if e.args[0] != errno.EINTR:
                    raise

    def check_limits(self):
        """Also retire the worker once it exceeds limit_time_worker_cron."""
        super().check_limits()

        if config['limit_time_worker_cron'] > 0 and (time.monotonic() - self.alive_time) > config['limit_time_worker_cron']:
            _logger.info('WorkerCron (%s) max age (%ss) reached.', self.pid, config['limit_time_worker_cron'])
            self.alive = False

    def process_work(self):
        """Process a single database."""
        _logger.debug("WorkerCron (%s) polling for jobs", self.pid)

        if not self.db_queue:
            # list databases
            db_names = OrderedSet(cron_database_list())
            pg_conn = self.dbcursor._cnx
            # databases explicitly woken up through NOTIFY cron_trigger
            notified = OrderedSet(
                notif.payload
                for notif in pg_conn.notifies
                if notif.channel == 'cron_trigger'
            )
            pg_conn.notifies.clear()  # free resources
            # add notified databases (in order) first in the queue
            self.db_queue.extend(db for db in notified if db in db_names)
            self.db_queue.extend(db for db in db_names if db not in notified)
            self.db_count = len(self.db_queue)
            if not self.db_count:
                return

        # pop the leftmost element (because notified databases appear first)
        db_name = self.db_queue.popleft()
        self.setproctitle(db_name)

        from odoo.addons.base.models.ir_cron import IrCron  # noqa: PLC0415
        IrCron._process_jobs(db_name)

        # dont keep cursors in multi database mode
        if self.db_count > 1:
            sql_db.close_db(db_name)

        self.request_count += 1
        if self.request_count >= self.request_max and self.request_max < self.db_count:
            # NOTE(review): "dabatases" is a typo for "databases" in the
            # message below; left untouched in this doc-only pass.
            _logger.error(
                "There are more dabatases to process than allowed "
                "by the `limit_request` configuration variable: %s more.",
                self.db_count - self.request_max,
            )

    def start(self):
        """Cron worker setup: lower priority, LISTEN for cron triggers."""
        os.nice(10)  # mommy always told me to be nice with others...
        Worker.start(self)
        if self.multi.socket:
            # cron workers do not serve HTTP: drop the inherited listen socket
            self.multi.socket.close()

        dbconn = sql_db.db_connect('postgres')
        self.dbcursor = dbconn.cursor()
        # LISTEN / NOTIFY doesn't work in recovery mode
        self.dbcursor.execute("SELECT pg_is_in_recovery()")
        in_recovery = self.dbcursor.fetchone()[0]
        if not in_recovery:
            self.dbcursor.execute("LISTEN cron_trigger")
        else:
            _logger.warning("PG cluster in recovery mode, cron trigger not activated")
        self.dbcursor.commit()

    def stop(self):
        """Close the PostgreSQL connection used for cron notifications."""
        super().stop()
        self.dbcursor._cnx.close()
        self.dbcursor.close()
1453#----------------------------------------------------------
1454# start/stop public api
1455#----------------------------------------------------------
# Singleton server instance created by start(): a ThreadedServer,
# GeventServer or PreforkServer depending on the configuration.
server = None
# Set to True on SIGHUP: once the server stops, the process re-execs
# itself (see start() and PreforkServer.stop()).
server_phoenix = False
def load_server_wide_modules():
    """Import every module listed in the ``server_wide_modules`` config.

    Failures are logged (with an extra hint for ``web``) but never abort
    server startup; garbage collection is disabled while importing.
    """
    from odoo.modules.module import load_openerp_module  # noqa: PLC0415
    web_hint = """
The `web` module is provided by the addons found in the `openerp-web` project.
Maybe you forgot to add those addons in your addons_path configuration."""
    with gc.disabling_gc():
        for module_name in config['server_wide_modules']:
            try:
                load_openerp_module(module_name)
            except Exception:
                hint = web_hint if module_name == 'web' else ''
                _logger.exception('Failed to load server-wide module `%s`.%s', module_name, hint)
def _reexec(updated_modules=None):
    """Replace the current process with a fresh server using (nearly) the
    same command line.

    :param updated_modules: optional module names appended as a ``-u``
        argument so the new process updates them on boot.
    """
    if osutil.is_running_as_nt_service():
        # Windows service: bounce the service instead of exec'ing in place.
        subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
    argv = stripped_sys_argv()
    if updated_modules:
        argv += ["-u", ','.join(updated_modules)]
    interpreter = os.path.basename(sys.executable)
    if not argv or argv[0] != interpreter:
        argv.insert(0, interpreter)
    # We should keep the LISTEN_* environment variabled in order to support socket activation on reexec
    os.execve(sys.executable, argv, os.environ)
def preload_registries(dbnames):
    """Preload the registries for the given databases, optionally running
    the post-install test suites.

    :param dbnames: iterable of database names to initialize (may be falsy).
    :return: 0 on success, the number of databases whose test report failed,
        or -1 when a database failed to initialize.
    """
    # TODO: move all config checks to args dont check tools.config here
    dbnames = dbnames or []
    rc = 0

    # no-op context unless ODOO_PROFILE_PRELOAD enables profiling below
    preload_profiler = contextlib.nullcontext()

    for dbname in dbnames:
        if os.environ.get('ODOO_PROFILE_PRELOAD'):
            interval = float(os.environ.get('ODOO_PROFILE_PRELOAD_INTERVAL', '0.1'))
            collectors = [profiler.PeriodicCollector(interval=interval)]
            if os.environ.get('ODOO_PROFILE_PRELOAD_SQL'):
                collectors.append('sql')
            preload_profiler = profiler.Profiler(db=dbname, collectors=collectors)
        try:
            with preload_profiler:
                threading.current_thread().dbname = dbname
                update_module = config['init'] or config['update'] or config['reinit']

                registry = Registry.new(dbname, update_module=update_module, install_modules=config['init'], upgrade_modules=config['update'], reinit_modules=config['reinit'])

                # run post-install tests
                if config['test_enable']:
                    from odoo.tests import loader  # noqa: PLC0415
                    t0 = time.time()
                    t0_sql = sql_db.sql_counter
                    # only updated modules when updating, else everything loaded
                    module_names = (registry.updated_modules if update_module else
                                    sorted(registry._init_modules))
                    _logger.info("Starting post tests")
                    tests_before = registry._assertion_report.testsRun
                    post_install_suite = loader.make_suite(module_names, 'post_install')
                    if post_install_suite.has_http_case():
                        # HTTP test cases need the web assets pre-generated
                        with registry.cursor() as cr:
                            env = api.Environment(cr, api.SUPERUSER_ID, {})
                            env['ir.qweb']._pregenerate_assets_bundles()
                    result = loader.run_suite(post_install_suite, global_report=registry._assertion_report)
                    registry._assertion_report.update(result)
                    _logger.info("%d post-tests in %.2fs, %s queries",
                                 registry._assertion_report.testsRun - tests_before,
                                 time.time() - t0,
                                 sql_db.sql_counter - t0_sql)

                    registry._assertion_report.log_stats()
                if registry._assertion_report and not registry._assertion_report.wasSuccessful():
                    rc += 1
        except Exception:
            _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True)
            return -1
    return rc
def start(preload=None, stop=False):
    """ Start the odoo http server and cron processor.

    :param preload: database names handed to the server's run() for
        registry preloading.
    :param stop: when True, the server exits right after preloading.
    :return: the server's exit code, normalized to 0 when falsy.
    """
    global server

    load_server_wide_modules()
    import odoo.http  # noqa: PLC0415

    # pick the server flavour matching the configuration
    if odoo.evented:
        server = GeventServer(odoo.http.root)
    elif config['workers']:
        if config['test_enable']:
            _logger.warning("Unit testing in workers mode could fail; use --workers 0.")

        server = PreforkServer(odoo.http.root)
    else:
        if platform.system() == "Linux" and sys.maxsize > 2**32 and "MALLOC_ARENA_MAX" not in os.environ:
            # glibc's malloc() uses arenas [1] in order to efficiently handle memory allocation of multi-threaded
            # applications. This allows better memory allocation handling in case of multiple threads that
            # would be using malloc() concurrently [2].
            # Due to the python's GIL, this optimization have no effect on multithreaded python programs.
            # Unfortunately, a downside of creating one arena per cpu core is the increase of virtual memory
            # which Odoo is based upon in order to limit the memory usage for threaded workers.
            # On 32bit systems the default size of an arena is 512K while on 64bit systems it's 64M [3],
            # hence a threaded worker will quickly reach it's default memory soft limit upon concurrent requests.
            # We therefore set the maximum arenas allowed to 2 unless the MALLOC_ARENA_MAX env variable is set.
            # Note: Setting MALLOC_ARENA_MAX=0 allow to explicitly set the default glibs's malloc() behaviour.
            #
            # [1] https://sourceware.org/glibc/wiki/MallocInternals#Arenas_and_Heaps
            # [2] https://www.gnu.org/software/libc/manual/html_node/The-GNU-Allocator.html
            # [3] https://sourceware.org/git/?p=glibc.git;a=blob;f=malloc/malloc.c;h=00ce48c;hb=0a8262a#l862
            try:
                import ctypes
                libc = ctypes.CDLL("libc.so.6")
                M_ARENA_MAX = -8
                assert libc.mallopt(ctypes.c_int(M_ARENA_MAX), ctypes.c_int(2))
            except Exception:
                _logger.warning("Could not set ARENA_MAX through mallopt()")
        server = ThreadedServer(odoo.http.root)

    # --dev=reload: watch the addons for file changes (not supported evented)
    watcher = None
    if 'reload' in config['dev_mode'] and not odoo.evented:
        if inotify:
            watcher = FSWatcherInotify()
            watcher.start()
        elif watchdog:
            watcher = FSWatcherWatchdog()
            watcher.start()
        else:
            if os.name == 'posix' and platform.system() != 'Darwin':
                module = 'inotify'
            else:
                module = 'watchdog'
            _logger.warning("'%s' module not installed. Code autoreload feature is disabled", module)

    rc = server.run(preload, stop)

    if watcher:
        watcher.stop()
    # like the legend of the phoenix, all ends with beginnings
    if server_phoenix:
        _reexec()

    return rc if rc else 0
def restart():
    """Restart the running server.

    On POSIX the master process is asked to reload by sending it SIGHUP;
    on Windows the process re-execs itself from a background thread so the
    current thread can still return a response to the caller.
    """
    if os.name != 'nt':
        os.kill(server.pid, signal.SIGHUP)
        return
    # run in a thread to let the current thread return response to the caller.
    threading.Thread(target=_reexec).start()