This is the twelfth selection of Python tips and programming from my @pythonetc feed.
←
Previous collections
You cannot change closure variables with a simple assignment. Python regards assignment as a definition within the body of a function and generally does not make a closure.
Works fine, displays
2
:
def make_closure(x): def closure(): print(x) return closure make_closure(2)()
And this code throws an
UnboundLocalError: local variable 'x' referenced before assignment
:
def make_closure(x): def closure(): print(x) x *= 2 print(x) return closure make_closure(2)()
To make the code work, use
nonlocal
. This explicitly tells the interpreter not to consider the assignment as a definition:
def make_closure(x): def closure(): nonlocal x print(x) x *= 2 print(x) return closure make_closure(2)()
Sometimes during an iteration you need to find out which element is being processed, first or last. This can be easily determined using an explicit flag:
def sparse_list(iterable, num_of_zeros=1): result = [] zeros = [0 for _ in range(num_of_zeros)] first = True for x in iterable: if not first: result += zeros result.append(x) first = False return result assert sparse_list([1, 2, 3], 2) == [ 1, 0, 0, 2, 0, 0, 3, ]
Of course, you could handle the first element outside the loop. This looks cleaner, but leads to partial code duplication. In addition, it will not be so easy to do this when working with abstract
iterable
:
def sparse_list(iterable, num_of_zeros=1): result = [] zeros = [0 for _ in range(num_of_zeros)] iterator = iter(iterable) try: result.append(next(iterator)) except StopIteration: return [] for x in iterator: result += zeros result.append(x) return result
You can also use
enumerate
and perform the
i == 0
check (it works only to determine the first element, not the last), however, the best solution would be a generator that returns the
first
and
last
flags together with the
iterable
element:
def first_last_iter(iterable): iterator = iter(iterable) first = True last = False while not last: if first: try: current = next(iterator) except StopIteration: return else: current = next_one try: next_one = next(iterator) except StopIteration: last = True yield (first, last, current) first = False
Now the original function might look like this:
def sparse_list(iterable, num_of_zeros=1): result = [] zeros = [0 for _ in range(num_of_zeros)] for first, last, x in first_last_iter(iterable): if not first: result += zeros result.append(x) return result
If you need to measure the time elapsed between two events, then use
time.monotonic()
instead of
time.time()
.
time.monotonic()
never changes in the smaller direction, even when updating the system clock:
from contextlib import contextmanager import time @contextmanager def timeit(): start = time.monotonic() yield print(time.monotonic() - start) def main(): with timeit(): time.sleep(2) main()
Nested context managers usually do not know that they are nested. You can tell them about this by creating internal managers using an external one:
from contextlib import AbstractContextManager import time class TimeItContextManager(AbstractContextManager): def __init__(self, name, parent=None): super().__init__() self._name = name self._parent = parent self._start = None self._substracted = 0 def __enter__(self): self._start = time.monotonic() return self def __exit__(self, exc_type, exc_value, traceback): delta = time.monotonic() - self._start if self._parent is not None: self._parent.substract(delta) print(self._name, 'total', delta) print(self._name, 'outer', delta - self._substracted) return False def child(self, name): return type(self)(name, parent=self) def substract(self, n): self._substracted += n timeit = TimeItContextManager def main(): with timeit('large') as large_t: with large_t.child('medium') as medium_t: with medium_t.child('small-1'): time.sleep(1) with medium_t.child('small-2'): time.sleep(1) time.sleep(1) time.sleep(1) main()
When you need to pass information on a chain of calls, the first thing that comes to mind is to pass data in the form of function arguments.
In some cases, it may be much more convenient to modify all the functions in the chain to transfer a new piece of data. Instead, you can specify a context that will be used by all functions in the chain. How to do it?
The easiest solution is to use a global variable. In Python, you can also use modules and classes as context keepers because, strictly speaking, they are also global variables. You probably already do this regularly, for example, for journaling.
If your application is multithreaded, then ordinary global variables will not work for you, since they are not thread safe. At each point in time, you can have several chains of calls, and each of them needs its own context. The
threading
module will help you, it provides a
threading.local()
object, which is thread safe. You can store data in it with a simple access to the attributes:
threading.local().symbol = '@'
.
However, both described approaches are not concurrency-safe, that is, they are not suitable for the Corutin call chain, in which the system not only calls functions, but also expects them to be executed. When a coroutine executes
await
, an event flow may trigger another coroutine from a different chain. This will not work:
import asyncio import sys global_symbol = '.' async def indication(timeout): while True: print(global_symbol, end='') sys.stdout.flush() await asyncio.sleep(timeout) async def sleep(t, indication_t, symbol='.'): loop = asyncio.get_event_loop() global global_symbol global_symbol = symbol task = loop.create_task( indication(indication_t) ) await asyncio.sleep(t) task.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather( sleep(1, 0.1, '0'), sleep(1, 0.1, 'a'), sleep(1, 0.1, 'b'), sleep(1, 0.1, 'c'), ))
You can fix this by forcing the cycle to set and restore the context each time you switch between coroutines. You can implement this behavior using the
contextvars
module, which has been available since Python 3.7.
import asyncio import sys import contextvars global_symbol = contextvars.ContextVar('symbol') async def indication(timeout): while True: print(global_symbol.get(), end='') sys.stdout.flush() await asyncio.sleep(timeout) async def sleep(t, indication_t, symbol='.'): loop = asyncio.get_event_loop() global_symbol.set(symbol) task = loop.create_task(indication(indication_t)) await asyncio.sleep(t) task.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather( sleep(1, 0.1, '0'), sleep(1, 0.1, 'a'), sleep(1, 0.1, 'b'), sleep(1, 0.1, 'c'), ))