x초마다 함수를 반복 실행하는 방법은 무엇입니까?
나는 목표 C의 NSTimer나 JS의 setTimeout처럼 60초마다 Python에서 기능을 반복적으로 실행하고 싶습니다.이 코드는 데몬으로 실행되며 사용자가 설정할 필요 없이 크론을 사용하여 매 분마다 파이썬 스크립트를 호출하는 것과 효과적입니다.
Python에 구현된 cron에 대한 이 질문에서 솔루션은 x초 동안 효과적으로 sleep()만 하는 것으로 보입니다.저는 그런 고급 기능이 필요하지 않기 때문에 아마도 이런 것이 작동할 것입니다.
while True:
# Code executed here
time.sleep(60)
이 코드에 예측 가능한 문제가 있습니까?
프로그램에 이벤트 루프가 아직 없는 경우 범용 이벤트 스케줄러를 구현하는 스케줄링된 모듈을 사용합니다.
import sched, time
def do_something(scheduler):
# schedule the next call first
scheduler.enter(60, 1, do_something, (scheduler,))
print("Doing stuff...")
# then do your stuff
my_scheduler = sched.scheduler(time.time, time.sleep)
my_scheduler.enter(60, 1, do_something, (my_scheduler,))
my_scheduler.run()
에는 다음과 같이 하십시오.asyncio,trio,tkinter,PyQt5,gobject,kivy기타 많은 작업 - 대신 기존 이벤트 루프 라이브러리의 방법을 사용하여 작업을 예약합니다.
다음과 같이 시스템 시계에 타임 루프를 잠급니다.
import time
starttime = time.time()
while True:
print("tick")
time.sleep(60.0 - ((time.time() - starttime) % 60.0))
무한 루프를 차단하는 대신 비차단 방식으로 기능을 주기적으로 실행하려면 스레드 타이머를 사용합니다.이렇게 하면 코드가 계속 실행되고 다른 작업을 수행할 수 있으며 n초마다 함수가 호출됩니다.저는 CPU/디스크/네트워크 집약적인 긴 작업에 대한 진행 상황 정보를 인쇄할 때 이 기술을 많이 사용합니다.
다음은 start()와 stop() 컨트롤을 사용하여 비슷한 질문에 올린 코드입니다.
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
용도:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
특징:
- 표준 라이브러리 전용, 외부 종속성 없음
start()그리고.stop()시작된 번 안전합니다.- 호출할 함수는 위치 및 명명된 인수를 가질 수 있습니다.
- 을 변경할 수 .
interval언제든지 다음 실행 후에 유효합니다.에도 마찬가지입니다.args,kwargsㅠㅠfunction!
Reactor Pattern을 구현하는 Python 네트워킹 라이브러리인 Twisted를 고려할 수 있습니다.
from twisted.internet import task, reactor
timeout = 60.0 # Sixty seconds
def doWork():
#do work here
pass
l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds
reactor.run()
"사실: sleep(60)"은 아마도 작동할 것입니다. 트위스트는 아마도 최종적으로 필요하게 될 많은 기능(데몬화, 로깅 또는 보빈스가 지적한 예외 처리)을 이미 구현했을 것이며, 아마도 더 강력한 솔루션이 될 것입니다.
다음은 메스트레리온의 코드에 대한 업데이트로, 시간이 지남에 따라 표류하는 것을 참조하십시오.
여기서 RepeatedTimer 클래스는 OP의 요청에 따라 주어진 함수를 "간격" 초마다 호출합니다. 일정은 함수가 실행되는 데 걸리는 시간에 따라 달라지지 않습니다.저는 이 솔루션이 외부 라이브러리 의존성이 없기 때문에 마음에 듭니다. 이것은 순수한 파이썬입니다.
import threading
import time
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
샘플 사용량(MestreLion의 답변에서 복사):
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
import time, traceback
def every(delay, task):
next_time = time.time() + delay
while True:
time.sleep(max(0, next_time - time.time()))
try:
task()
except Exception:
traceback.print_exc()
# in production code you might want to have this instead of course:
# logger.exception("Problem while executing repetitive task.")
# skip tasks if we are behind schedule:
next_time += (time.time() - next_time) // delay * delay + delay
def foo():
print("foo", time.time())
every(5, foo)
나머지 코드를 차단하지 않고 이 작업을 수행하려면 이를 사용하여 자체 스레드에서 실행되도록 할 수 있습니다.
import threading
threading.Thread(target=lambda: every(5, foo)).start()
이 솔루션은 다른 솔루션에서는 거의 찾아볼 수 없는 몇 가지 기능을 결합합니다.
- 예외 처리: 이 수준에서 가능한 한 예외가 적절하게 처리됩니다. 즉, 프로그램을 중단하지 않고 디버깅 목적으로 로그에 기록됩니다.
- 체인 없음:(다음 이벤트를 예약하기 위한) 많은 답변에서 흔히 볼 수 있는 체인과 같은 구현은 스케줄링 메커니즘 내에서 뭔가 잘못될 경우 취약합니다.
threading.Timer또는 무엇이든), 이렇게 하면 체인이 종료됩니다.문제의 원인이 이미 해결된 경우에도 더 이상의 실행은 발생하지 않습니다.간단한 루프와 간단한 대기sleep()이에 비해 훨씬 강력합니다. - 표류 없음: 제 솔루션은 실행 예정 시간을 정확하게 추적합니다.실행 시간에 따른 드리프트는 없습니다(다른 많은 솔루션에서와 마찬가지로).
- 건너뛰기:한 번의 실행에 너무 많은 시간이 걸리는 경우(예: 5초마다 X를 수행하지만 X는 6초가 소요됨) 이 솔루션에서는 작업을 건너뜁니다.이것이 표준 cron 동작입니다(그리고 타당한 이유가 있습니다).다른 많은 솔루션은 지연 없이 작업을 여러 번 연속적으로 실행합니다.대부분의 경우(예: 정리 작업)에서는 이 작업을 수행하지 않습니다.원하는 경우 간단히 사용
next_time += delay대신.
더 쉬운 방법은 다음과 같습니다.
import time
def executeSomething():
#code here
time.sleep(60)
while True:
executeSomething()
이런 식으로 코드가 실행된 다음 60초 동안 기다렸다가 다시 실행되고, 대기하고, 실행하는 등의 작업을 수행합니다.복잡할 필요 없음:d
저는 결국 스케줄 모듈을 사용하게 되었습니다.API가 좋네요.
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
대안적인 유연성 솔루션은 A 스케줄러입니다.
pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
pass
sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds
sched.start()
또한, 스케줄러는 다음과 같은 많은 스케줄러를 제공합니다.
BlockingScheduler: 프로세스에서 스케줄러가 유일하게 실행되는 경우 사용합니다.
BackgroundScheduler: 아래 프레임워크를 사용하지 않고 응용 프로그램 내부의 백그라운드에서 스케줄러를 실행할 때 사용합니다.
AsyncIOScheduler: 응용 프로그램에서 asyncio 모듈을 사용하는 경우 사용
GeventScheduler: 응용 프로그램이 gevent를 사용하는 경우 사용
Tornado Scheduler: Tornado 애플리케이션을 구축하는 경우 사용
TwistedScheduler: Twisted 응용 프로그램을 빌드하는 경우 사용
QtScheduler: Qt 애플리케이션을 빌드할 때 사용합니다.
드리프트가 문제가 되지 않는 경우
import threading, time
def print_every_n_seconds(n=2):
while True:
print(time.ctime())
time.sleep(n)
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()
비동기식으로 출력합니다.
#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018
실행 중인 작업에 상당한 시간이 소요되면 간격이 2초 + 작업 시간이 되므로 정확한 스케줄링이 필요한 경우에는 이 작업을 수행할 수 없습니다.
에 하십시오.daemon=Trueflag는 이 스레드가 앱 종료를 차단하지 않음을 의미합니다.예를 들어, 다음과 같은 문제가 있었습니다.pytest테스트를 실행한 후 이 광고가 중지될 때까지 무기한 중단됩니다.
저는 얼마 전에 비슷한 문제에 직면했습니다.http://cronus.readthedocs.org 가 도움이 될까요?
v0.2의 경우 다음 스니펫이 작동합니다.
import cronus.beat as beat
beat.set_rate(2) # run twice per second
while beat.true():
# do some time consuming work here
beat.sleep() # total loop duration would be 0.5 sec
그것과 cron의 주요 차이점은 예외가 데몬을 영원히 죽일 것이라는 것입니다.예외 포수와 로거로 포장하는 것이 좋습니다.
간단히 사용
import time
while True:
print("this will run after every 30 sec")
#Your code here
time.sleep(30)
한 가지 가능한 대답:
import time
t=time.time()
while True:
if time.time()-t>10:
#run your task here
t=time.time()
저는 () 메서드 이후에 Tkinter를 사용합니다. 이 메서드는 "게임을 훔치지" 않습니다. 즉, 다른 작업을 병렬로 실행할 수 있습니다.
import Tkinter
def do_something1():
global n1
n1 += 1
if n1 == 6: # (Optional condition)
print "* do_something1() is done *"; return
# Do your stuff here
# ...
print "do_something1() "+str(n1)
tk.after(1000, do_something1)
def do_something2():
global n2
n2 += 1
if n2 == 6: # (Optional condition)
print "* do_something2() is done *"; return
# Do your stuff here
# ...
print "do_something2() "+str(n2)
tk.after(500, do_something2)
tk = Tkinter.Tk();
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()
do_something1()그리고.do_something2()병렬 및 임의의 간격 속도로 실행할 수 있습니다.여기서 두 번째 것은 두 배로 빠르게 실행됩니다.또한 두 기능 중 하나를 종료하기 위한 조건으로 간단한 카운터를 사용했습니다.프로그램이 종료될 때까지 실행할 기능(예: 시계)을 사용할 경우 원하는 다른 조건을 사용하거나 사용하지 않을 수 있습니다.
여기 메스트레 라이온의 코드를 각색한 버전이 있습니다.이 코드는 원래 기능 외에도 다음과 같습니다.
특정 시간에 타이머를 실행하는 데 사용되는 first_timeout을 추가합니다(first_timeout을 계산하고 전달해야 합니다).
인종 조건을 원래 코드로 해결합니다.원래 코드에서 제어 스레드가 실행 중인 타이머를 취소하지 못한 경우("타이머를 중지하고, 타이머 동작의 실행을 취소합니다.이것은 타이머가 여전히 대기 단계에 있을 때만 작동합니다."라고 https://docs.python.org/2/library/threading.html), 에서 인용한 바와 같이 타이머는 끝없이 실행됩니다.
class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
self.timer = None
self.first_interval = first_interval
self.interval = interval
self.func = func
self.args = args
self.kwargs = kwargs
self.running = False
self.is_started = False
def first_start(self):
try:
# no race-condition here because only control thread will call this method
# if already started will not start again
if not self.is_started:
self.is_started = True
self.timer = Timer(self.first_interval, self.run)
self.running = True
self.timer.start()
except Exception as e:
log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
raise
def run(self):
# if not stopped start again
if self.running:
self.timer = Timer(self.interval, self.run)
self.timer.start()
self.func(*self.args, **self.kwargs)
def stop(self):
# cancel current timer in case failed it's still OK
# if already stopped doesn't matter to stop again
if self.timer:
self.timer.cancel()
self.running = False
추가 라이브러리를 사용하지 않는 다른 솔루션이 있습니다.
def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
"""Delay using a boolean callable function.
`condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
It can break early if condition is met.
Args:
condition_fn - a callable boolean function
interval_in_sec - wait time between calling `condition_fn`
timeout_in_sec - maximum time to run
Returns: None
"""
start = last_call = time.time()
while time.time() - start < timeout_in_sec:
if (time.time() - last_call) > interval_in_sec:
if condition_fn() is True:
break
last_call = time.time()
이를 사용하여 시간당 60개의 이벤트를 발생시키고 대부분의 이벤트는 1분 후 동일한 초에 발생합니다.
import math
import time
import random
TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging
def set_timing():
now = time.time()
elapsed = now - info['begin']
minutes = math.floor(elapsed/TICK)
tick_elapsed = now - info['completion_time']
if (info['tick']+1) > minutes:
wait = max(0,(TICK_TIMING-(time.time() % TICK)))
print ('standard wait: %.2f' % wait)
time.sleep(wait)
elif tick_elapsed < TICK_MINIMUM:
wait = TICK_MINIMUM-tick_elapsed
print ('minimum wait: %.2f' % wait)
time.sleep(wait)
else:
print ('skip set_timing(); no wait')
drift = ((time.time() - info['begin']) - info['tick']*TICK -
TICK_TIMING + info['begin']%TICK)
print ('drift: %.6f' % drift)
info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK
while 1:
set_timing()
print('hello world')
#random real world event
time.sleep(random.random()*TICK_MINIMUM)
info['tick'] += 1
info['completion_time'] = time.time()
실제 조건에 따라 길이의 눈금이 표시될 수 있습니다.
60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.
그러나 60분이 지나면 60개의 눈금이 표시되며, 대부분의 눈금은 원하는 시간에 맞춰 정확한 오프셋으로 표시됩니다.
시스템에서 수정이 필요할 때까지 1/20초 미만의 일반적인 드리프트를 얻습니다.
이 방법의 장점은 클럭 드리프트를 해결할 수 있다는 것입니다. 이는 체크 표시당 하나의 항목을 추가하고 시간당 60개의 항목이 추가될 것으로 예상되는 경우 문제가 발생할 수 있습니다.드리프트를 고려하지 않으면 이동 평균과 같은 2차 징후가 발생하여 데이터가 과거로 너무 깊이 들어가 잘못된 출력을 초래할 수 있습니다.
예: 현재 현지 시간 표시
import datetime
import glib
import logger
def get_local_time():
current_time = datetime.datetime.now().strftime("%H:%M")
logger.info("get_local_time(): %s",current_time)
return str(current_time)
def display_local_time():
logger.info("Current time is: %s", get_local_time())
return True
# call every minute
glib.timeout_add(60*1000, display_local_time)
timeed-count는 시스템 클럭에 동기화되므로 고정밀(즉, 1ms 미만)로 이를 수행할 수 있습니다.시간에 따라 표류하지 않으며 코드 실행 시간의 길이에 영향을 받지 않습니다(물론 간격 기간보다 작은 경우).
간단한 차단 예제:
from timed_count import timed_count
for count in timed_count(60):
# Execute code here exactly every 60 seconds
...
스레드에서 실행하면 쉽게 비차단 상태로 만들 수 있습니다.
from threading import Thread
from timed_count import timed_count
def periodic():
for count in timed_count(60):
# Execute code here exactly every 60 seconds
...
thread = Thread(target=periodic)
thread.start()
''' tracking number of times it prints'''
import threading
global timeInterval
count=0
def printit():
threading.Timer(timeInterval, printit).start()
print( "Hello, World!")
global count
count=count+1
print(count)
printit
if __name__ == "__main__":
timeInterval= int(input('Enter Time in Seconds:'))
printit()
당신이 무엇을 하고 싶은지에 따라 다르다고 생각하고 당신의 질문은 많은 세부 사항을 명시하지 않았습니다.
저는 이미 멀티 스레드화된 프로세스 중 하나에서 값비싼 작업을 수행하고 싶습니다.그래서 저는 리더 프로세스가 시간을 확인하고 그녀만이 값비싼 작업(딥 러닝 모델 체크포인트)을 하도록 합니다.이를 위해 카운터를 늘려 5초마다 10초, 15초가 경과했는지 확인합니다(또는 math.floor와 함께 모듈식 산술을 사용합니다).
def print_every_5_seconds_have_passed_exit_eventually():
"""
https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
:return:
"""
opts = argparse.Namespace(start=time.time())
next_time_to_print = 0
while True:
current_time_passed = time.time() - opts.start
if current_time_passed >= next_time_to_print:
next_time_to_print += 5
print(f'worked and {current_time_passed=}')
print(f'{current_time_passed % 5=}')
print(f'{math.floor(current_time_passed % 5) == 0}')
starting __main__ at __init__
worked and current_time_passed=0.0001709461212158203
current_time_passed % 5=0.0001709461212158203
True
worked and current_time_passed=5.0
current_time_passed % 5=0.0
True
worked and current_time_passed=10.0
current_time_passed % 5=0.0
True
worked and current_time_passed=15.0
current_time_passed % 5=0.0
True
나에게 필요한 것은 if 명세서의 확인입니다.이미 복잡한 멀티프로세싱 멀티 gpu 코드에 스레드, 스케줄러가 있다는 것은 내가 피할 수 있고 피할 수 있을 것 같다면 추가하고 싶은 복잡성이 아닙니다.작업자 ID를 확인하면 한 프로세스만 이 작업을 수행하는지 쉽게 확인할 수 있습니다.
참고 정확한 시간을 확인하는 것은 분명히 효과가 없을 것이기 때문에 저는 진짜 인쇄문을 사용하여 모듈식 산술 트릭이 효과가 있는지 확인했습니다!하지만 바닥이 속임수를 썼다는 사실에 저는 매우 놀랐습니다.
언급URL : https://stackoverflow.com/questions/474528/how-to-repeatedly-execute-a-function-every-x-seconds
'programing' 카테고리의 다른 글
| VB.NET에서 자체 서명된 TLS/SSL 인증서 수락 (0) | 2023.06.03 |
|---|---|
| SimpleNamespace와 빈 클래스 정의의 차이점은 무엇입니까? (0) | 2023.06.03 |
| iOS 앱이 감옥에서 고장난 전화기에서 실행되고 있는지 어떻게 감지합니까? (0) | 2023.06.03 |
| Sqlite용 앱 내 데이터베이스 마이그레이션 모범 사례 (0) | 2023.06.03 |
| Git의 파일에 chmod 권한을 추가하는 방법은 무엇입니까? (0) | 2023.06.03 |