Kubernetes - How to Configure Docker Repository to Pull Image and Configure Secret
Introduction In most of cases, you are not pulling images from docker hub public…
May 09, 2022
We often require to execute in timed manner, i.e. to specify a max timeout after which function should get terminated.
We will write a decorator function in python, which we can use as annotation in any code where want this behavior.
import signal
import time
class TimedFunction(Exception):
"""
Its an exception thrown when timeout happens.
# NOTE: this will break in multithreading
# it should work fine in multiprocessing
"""
prev = list()
def __init__(self, timeout=10):
"""
:param int timeout: timeout in seconds, default 10s
"""
self._timeout = timeout
self._started = 0
self._prev = []
super(TimedFunction, self).__init__(repr(self))
def __repr__(self):
return f'Timeout: {self._timeout} seconds'
def restore(self, ended=False):
"""
this method restores the original alarm signal handler, or sets up
the next timer on the pushdown stack.
"""
if not ended and signal.getitimer(signal.ITIMER_REAL)[0] > 0:
return
while self._prev and self in self._prev:
self._prev.pop()
if self._prev:
prev = self._prev[-1]
time_diff = time.time() - prev.started
time_remaining = prev.timeout - time_diff
if time_remaining > 0:
signal.signal(signal.SIGALRM, prev.fire_timer)
signal.setitimer(signal.ITIMER_REAL, time_remaining)
else:
prev.restore()
else:
signal.setitimer(signal.ITIMER_REAL, 0)
signal.signal(signal.SIGALRM, signal.SIG_DFL)
def fire_timer(self, *_sig_param):
"""
when an itimer fires, execution enters this method
which either clears timers, sets up the next timer in a nest
as specified by the options.
After the timers are handled, this method raises the TimedFunction
exception.
"""
self.restore()
raise self
def __enter__(self):
"""
the logic that starts the timers is normally fired by the with
keyword though, with just calls this __enter__ function. The timers
are started here.
"""
self._prev.append(self)
signal.signal(signal.SIGALRM, self.fire_timer)
self._started = time.time()
signal.setitimer(signal.ITIMER_REAL, self._timeout)
return self
def __exit__(self, e_type, e_obj, e_tb):
"""
when the code leaves the a TimedFunction with block, execution enters this __exit__
method. It attempts to clean up any remaining timers.
"""
self.restore(ended=True)
def timedfunction_wrapper(**t_kw):
"""
wrap decroated function in a with TimedFunction block and guard against exceptions
The options are roughly the same as for TimedFunction with a minor exception.
options:
"""
def _decorator(actual):
def _wrapper(*a, **kw):
with TimedFunction(**t_kw):
return actual(*a, **kw)
return _wrapper
return _decorator
Note: I placed this code in a folder: python_utils/timed_function.py
import time
from python_utils.timed_function import timedfunction_wrapper
@timedfunction_wrapper(timeout=2)
def test():
print('In Test')
for i in range(1, 100):
print(f'Sleeping - {i}')
time.sleep(1)
test()
The github link for this code is: https://github.com/goravsingal/python_utils
Thanks for reading.
Introduction In most of cases, you are not pulling images from docker hub public…
This is due to our web server are configured to deny accessing this directory…
Introduction In your backend and frontend projects, you always need to deal with…
Introduction I had to create many repositories in an Github organization. I…
Introduction I had to develop a small automation to query some old mysql data…
Introduction Npm has a tool called: npm audit which reports if your packages or…
Introduction So you have a Django project, and want to run it using docker image…
Introduction It is very important to introduce few process so that your code and…
Introduction In this post, we will see a sample Jenkin Pipeline Groovy script…
Introduction In some of the cases, we need to specify namespace name along with…
Introduction In most of cases, you are not pulling images from docker hub public…
Introduction I was trying to integrate Okta with Spring, and when I deploy the…