Source code for pyplanet.god.process

import threading
import multiprocessing

from colorlog import ColoredFormatter


def _run(name, queue, options):
	"""
	The actual process that runs the separate controller instance.

	:param name: name of the process
	:param queue: Queue of the binding parent.
	:param options: Custom Options
	:type name: str
	"""
	from pyplanet.core.instance import Controller
	from pyplanet.utils.log import initiate_logger, QueueHandler
	import logging

	# Tokio Asyncio (EXPERIMENTAL).
	if 'tokio' in options and options['tokio'] is True:
		import asyncio
		import tokio
		policy = tokio.TokioLoopPolicy()
		asyncio.set_event_loop_policy(policy)
		asyncio.set_event_loop(tokio.new_event_loop())
		logging.warning('Using experimental Tokio Asyncio Loop!')

	# Logging to queue.
	if multiprocessing.get_start_method() != 'fork':  # pragma: no cover
		initiate_logger()
		root_logger = logging.getLogger()
		formatter = ColoredFormatter(
			'%(log_color)s%(levelname)-8s%(reset)s %(yellow)s[%(threadName)s][%(name)s]%(reset)s %(blue)s%(message)s'
		)
		queue_handler = QueueHandler(queue)
		queue_handler.setFormatter(formatter)
		root_logger.addHandler(queue_handler)

	logging.getLogger(__name__).info('Starting pool process for \'{}\'...'.format(name))

	# Setting thread name to our process name.
	threading.main_thread().setName(name)

	# Initiate instance.
	instance = Controller.prepare(name).instance
	instance._queue = queue

	# Start and loop instance.
	instance.start()


[docs]class InstanceProcess: """ The InstanceProcess is the encapsulation around the real controller instance. .. warning:: This code is still being executed at the main process!! """ def __init__(self, queue, environment_name='default', pool=None, options=None): """ Create an environment process of the controller itself. :param queue: Queue to hook on. :param environment_name: Name of environment. :param pool: Pool. :param options: Custom options. :type queue: multiprocessing.Queue :type environment_name: str :type pool: multiprocessing.Pool :type options: dict """ self.queue = queue self.name = environment_name self.options = options or dict() self.max_restarts = 1 self.restarts = 0 self.process = multiprocessing.Process(target=_run, kwargs=dict( name=self.name, queue=self.queue, options=self.options, )) self.__last_state = True @property def did_die(self): """ Boolean determinating if the process did die. """ if not self.is_alive() and self.__last_state: self.__last_state = False return True return False @property def exitcode(self): """ Exit code of process. :return: Exit code. """ return self.process.exitcode @property def will_restart(self): """ Boolean: Is the process able to restart (not reached max_restarts). """ return self.restarts < self.max_restarts
[docs] def is_alive(self): """ Call process method is_alive() """ return self.process.is_alive()
[docs] def start(self): """ Start the process. """ return self.process.start()
[docs] def shutdown(self): """ Shutdown (terminate) process. """ try: return self.process.terminate() except: pass return None
[docs] def graceful(self): """ Graceful shutdown the process. """ self.process.join(timeout=10)