programing

SIGTERM 신호를 정상적으로 처리하는 방법

randomtip 2022. 9. 23. 21:47
반응형

SIGTERM 신호를 정상적으로 처리하는 방법

python으로 이렇게 사소한 데몬을 쓴다고 가정해 봅시다.

def mainloop():
    while True:
        # 1. do
        # 2. some
        # 3. important
        # 4. job
        # 5. sleep

mainloop()

데몬화(Daemonization)를 사용하여start-stop-daemon디폴트로 송신되는SIGTERM(TERM)의 시그널 온--stop.

현재 수행 중인 단계는 다음과 같습니다.#2그리고 지금 이 순간, 우리가 보내드리는 것은TERM신호.

이 경우 실행이 즉시 종료됩니다.

다음 방법으로 신호 이벤트를 처리할 수 있습니다.signal.signal(signal.SIGTERM, handler)하지만 문제는 여전히 현재의 실행을 방해하고 제어권을handler.

그래서 제 질문은 - 현재 실행을 방해하지 않고TERM세팅할 수 있도록 분리된 스레드(?)로 신호를 보냅니다.shutdown_flag = True하도록mainloop()우아하게 멈출 기회가 있었나요?

클래스 기반 클린 사용 솔루션:

import signal
import time

class GracefulKiller:
  kill_now = False
  def __init__(self):
    signal.signal(signal.SIGINT, self.exit_gracefully)
    signal.signal(signal.SIGTERM, self.exit_gracefully)

  def exit_gracefully(self, *args):
    self.kill_now = True

if __name__ == '__main__':
  killer = GracefulKiller()
  while not killer.kill_now:
    time.sleep(1)
    print("doing something in a loop ...")
   
  print("End of the program. I was killed gracefully :)")

첫째, 두 번째 스레드가 필요한지는 잘 모르겠습니다.shutdown_flag.
SIGTERM 핸들러에서 직접 설정하지 그래요?

다른 방법으로는 예외로 하는 것이 있습니다.SIGTERM스택 위로 전파되는 핸들러입니다.적절한 예외 처리(예:with/contextmanager그리고.try: ... finally:blocks) 프로그램을 사용하는 경우와 마찬가지로 상당히 정상적인 셧다운이 되어야 합니다.

프로그램 예시signals-test.py:

#!/usr/bin/python

from time import sleep
import signal
import sys


def sigterm_handler(_signo, _stack_frame):
    # Raises SystemExit(0):
    sys.exit(0)

if sys.argv[1] == "handle_signal":
    signal.signal(signal.SIGTERM, sigterm_handler)

try:
    print "Hello"
    i = 0
    while True:
        i += 1
        print "Iteration #%i" % i
        sleep(1)
finally:
    print "Goodbye"

이제 동작을 확인합니다.

$ ./signals-test.py default
Hello
Iteration #1
Iteration #2
Iteration #3
Iteration #4
^CGoodbye
Traceback (most recent call last):
  File "./signals-test.py", line 21, in <module>
    sleep(1)
KeyboardInterrupt
$ echo $?
1

이번에 보냅니다.SIGTERM를 4회 반복한 후kill $(ps aux | grep signals-test | awk '/python/ {print $2}'):

$ ./signals-test.py default
Hello
Iteration #1
Iteration #2
Iteration #3
Iteration #4
Terminated
$ echo $?
143

이번에는 커스텀을 유효하게 합니다.SIGTERM핸들러와 송신SIGTERM:

$ ./signals-test.py handle_signal
Hello
Iteration #1
Iteration #2
Iteration #3
Iteration #4
Goodbye
$ echo $?
0

다음은 스레드 또는 클래스가 없는 간단한 예입니다.

import signal

run = True

def handler_stop_signals(signum, frame):
    global run
    run = False

signal.signal(signal.SIGINT, handler_stop_signals)
signal.signal(signal.SIGTERM, handler_stop_signals)

while run:
    pass # do stuff including other IO stuff

나는 네가 가능한 해결책에 가깝다고 생각해.

실행mainloop다른 실로 그것을 속성과 함께 연장하다shutdown_flag신호는 다음과 같이 잡을 수 있습니다.signal.signal(signal.SIGTERM, handler)(개별 스레드가 아닌) 메인 스레드 내에 있습니다.신호 핸들러는 다음과 같이 설정해야 합니다.shutdown_flag참으로 스레드가 끝날 때까지 기다립니다.thread.join()

이전 답변을 바탕으로 sigint와 sigterm으로부터 보호하는 컨텍스트 매니저를 만들었습니다.

import logging
import signal
import sys


class TerminateProtected:
    """ Protect a piece of code from being killed by SIGINT or SIGTERM.
    It can still be killed by a force kill.

    Example:
        with TerminateProtected():
            run_func_1()
            run_func_2()

    Both functions will be executed even if a sigterm or sigkill has been received.
    """
    killed = False

    def _handler(self, signum, frame):
        logging.error("Received SIGINT or SIGTERM! Finishing this block, then exiting.")
        self.killed = True

    def __enter__(self):
        self.old_sigint = signal.signal(signal.SIGINT, self._handler)
        self.old_sigterm = signal.signal(signal.SIGTERM, self._handler)

    def __exit__(self, type, value, traceback):
        if self.killed:
            sys.exit(0)
        signal.signal(signal.SIGINT, self.old_sigint)
        signal.signal(signal.SIGTERM, self.old_sigterm)


if __name__ == '__main__':
    print("Try pressing ctrl+c while the sleep is running!")
    from time import sleep
    with TerminateProtected():
        sleep(10)
        print("Finished anyway!")
    print("This only prints if there was no sigint or sigterm")

내게 가장 쉬운 방법을 찾았다.이 방법은 흐름 제어에 도움이 된다는 것을 명확히 하기 위해 포크를 사용한 예를 다음에 나타냅니다.

import signal
import time
import sys
import os

def handle_exit(sig, frame):
    raise(SystemExit)

def main():
    time.sleep(120)

signal.signal(signal.SIGTERM, handle_exit)

p = os.fork()
if p == 0:
    main()
    os._exit()

try:
    os.waitpid(p, 0)
except (KeyboardInterrupt, SystemExit):
    print('exit handled')
    os.kill(p, 15)
    os.waitpid(p, 0)

위의 답변에서 영감을 얻은 가장 간단한 솔루션은 다음과 같습니다.

class SignalHandler:

    def __init__(self):

        # register signal handlers
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

        self.logger = Logger(level=ERROR)

    def exit_gracefully(self, signum, frame):
        self.logger.info('captured signal %d' % signum)
        traceback.print_stack(frame)

        ###### do your resources clean up here! ####

        raise(SystemExit)

사용 방법의 코드 샘플signal:

#! /usr/bin/env python

import signal


def ctrl_handler(signum, frm):
    print "You can't cannot kill me"


print "Installing signal handler..."
signal.signal(signal.SIGINT, ctrl_handler)
print "done"

while True:
    # do something
    pass

를 설정할 수 있습니다.threading.Event신호를 잡을 때.

threading.Event스레드 세이프이며, 다른 장소에서도 같은 이벤트를 설정 및 클리어 할 수 있습니다.

import signal, threading

quit_event = threading.Event()
signal.signal(signal.SIGTERM, lambda *_args: quit_event.set())

while not quit_event.is_set():
    print("Working...")

언급URL : https://stackoverflow.com/questions/18499497/how-to-process-sigterm-signal-gracefully

반응형