Pythonの要素数を制限したキャッシュの実装-ソリューション:シンプルで複雑

タスクステートメント



リクエストに応じて情報を提供し、可能な限り迅速に情報を提供する特定のサービスが必要だとします。 これに対して普通の人は何をしますか? 最も頻繁に要求されるデータをキャッシュします。 この場合、将来について少しでも考えると、キャッシュのサイズを制限する必要があります。

Pythonの場合、実装を簡単にするために、キャッシュ内の要素の数を制限しますここでは、データのサイズがほぼ均一であると仮定し、Pythonオブジェクトが実際に占有するメモリ量を決定することは非常に重要なタスクであり、 、彼がここに来るようにしてください )、キャッシュをできるだけ頻繁に使用される情報を保持するために- 最も使用頻度の低い原則、つまり ずっと前に情報を要求するほど、キャッシュから「飛び出す」可能性が高まりました。



この記事では、2つのソリューション(より単純でより複雑な)について説明します。



哲学の隠れ家



多くの場合、Pythonで記述されたコードはメモリ消費や速度の点で最適化を示していないことに気付きましたここで、これは言語を使用するプログラマーのせいであるだけでなく、単に優れたツールキットがないか、少なくとも私は知らないことに気付くことができます(はい、私はcProfileを知っていますが、それは常に助けにはなりません、例えば、それはアタッチ/デタッチがありません。メモリリークを探すことは一般的に心の弱い人ではありません、pymplerが終了した場合...)、誰かが私に感謝したら、私は感謝します ) Pythonは通常、プログラムの実行時間やメモリ消費が重要でない場合に使用されるため、これは正しいことでもありますが、コードを記述するのにかかる時間とそのサポートの単純さが重要であるため、明らかに経済的ではありませんが、ソリューションはより便利です。

上記は、Pythonコードを「とにかく」記述する必要があることをまったく意味しませんが。 さらに、優れたプロセッサと大量のメモリを備えたサーバーマシンでコードを実行する場合でも、パフォーマンスとメモリ消費が重要になる場合があります。

まさにそのようなケース(CPU時間の不足)と、この問題を研究し、あるキャッシュクラスを別のキャッシュクラスに置き換えるという私の偉業です。



シンプルなソリューション



よく言われます-「単純な解決策は正しいものです」、しかしプログラミングの場合、これは常にそうではありませんか? ただし、キャッシュを提供するタスクがあり、速度に特別な要件がない場合(たとえば、このキャッシュを使用するものはそれ自体非常に低速であり、複雑な実装に投資する意味がありません)、簡単なソリューションでうまくいくことができます。



実装


したがって、比較的簡単なソリューション:

import weakref class SimpleCache: def __init__(self, cacheSize): self.cache = weakref.WeakValueDictionary() self.cache_LRU = [] self.cacheSize = cacheSize def __getitem__(self, key): # no exception catching - let calling code handle them if any value = self.cache[key] return self.touchCache(key, value) def __setitem__(self, key, value): self.touchCache(key, value) def touchCache(self, key, value): self.cache[key] = value if value in self.cache_LRU: self.cache_LRU.remove(value) self.cache_LRU = self.cache_LRU[-(self.cacheSize-1):] + [ value ] return value
      
      







使用する


通常の辞書と同様に、作成後にこのようなキャッシュを使用できますが、要素の読み取り/書き込み時にはキューの最後に配置され、WeakValueDictionary()とcacheSize要素を格納しないリストの組み合わせにより、数に制限があります保存されたデータ。

そのため、コードを実行した後

 cache = SimpleCache(2) cache[1] = 'a' cache[2] = 'b' cache[3] = 'c'
      
      





エントリ「b」と「c」のみがキャッシュに残ります(最も古い「a」が優先されます)。



長所と短所


利点は、比較的単純であることです(約20行のコード。WeakValueDictionaryのドキュメントを読んだ後、操作の原理が明確になります)。

欠点は作業の速度です。なぜなら、 キャッシュを「タッチ」するたびに、リスト全体を再作成し、その任意の場所から要素を削除する必要があります(実際、リストの操作には、「if value in self.cache_LRU」-リスト内の線形検索、 .remove()-もう一度線形検索、次に別のスライスが取得されます-つまり、リストのほぼ完全なコピーが作成されます)。 一言で言えば、比較的長い。



難しい決定



それでは、考えてみましょう-そのようなクラスをどのように加速できますか? 明らかに、私たちの主な問題はcache_LRUを最新に保つことです-私が言ったように、それを検索してからアイテムを削除して再作成するのは高価な操作です。 ここでは、 双方向リンクリストなどが役立ちます-「最後に使用-最後に」サポートを提供しますが、WeakValueDictionary()はこのリストをすばやく検索するのに役立ちます(辞書検索はPython内であるため、線形列挙よりもはるかに高速です辞書はキーハッシュによってバイナリツリーのようなものを実装します(ここではOstapに苦しみました-正直に言って、ソースを見ていないと言うことができるので、辞書検索がどのように機能するかを推測するだけです)



実装


まず、クラス(リストアイテム)を作成します。リストアイテムでは、保存されたデータをラップします。

  class Element(object): __slots__ = ['prev', 'next', 'value', '__init__', '__weakref__'] def __init__(self, value): self.prev, self.next, self.value = None, None, value
      
      





ここで、__ slots__のようなものを使用することに注意する必要があります。これにより、メモリとビットを大幅に節約できます-同じクラスの実装と比較してパフォーマンスが向上しますが、この属性はありません( 公式ドキュメントで何が使用されているか)。

キャッシュクラスを作成します(要素クラスを「避けるため」に固定します):

 import weakref class FastCache: class Element(object): __slots__ = ['prev', 'next', 'value', '__init__', '__weakref__'] def __init__(self, value): self.prev, self.next, self.value = None, None, value def __init__(self, maxCount): self.dict = weakref.WeakValueDictionary() self.head = None self.tail = None self.count = 0 self.maxCount = maxCount def _removeElement(self, element): prev, next = element.prev, element.next if prev: assert prev.next == element prev.next = next elif self.head == element: self.head = next if next: assert next.prev == element next.prev = prev elif self.tail == element: self.tail = prev element.prev, element.next = None, None assert self.count >= 1 self.count -= 1 def _appendElement(self, element): if element is None: return element.prev, element.next = self.tail, None if self.head is None: self.head = element if self.tail is not None: self.tail.next = element self.tail = element self.count += 1 def get(self, key, *arg): element = self.dict.get(key, None) if element: self._removeElement(element) self._appendElement(element) return element.value elif len(*arg): return arg[0] else: raise KeyError("'%s' is not found in the dictionary", str(key)) def __len__(self): return len(self.dict) def __getitem__(self, key): element = self.dict[key] # At this point the element is not None self._removeElement(element) self._appendElement(element) return element.value def __setitem__(self, key, value): try: element = self.dict[key] self._removeElement(element) except KeyError: if self.count == self.maxCount: self._removeElement(self.head) element = FastCache.Element(value) self._appendElement(element) self.dict[key] = element def __del__(self): while self.head: self._removeElement(self.head)
      
      





ここでは、次の重要/興味深い点に注意を払うことができます。



使用する


以前の実装と完全に類似しています(内部のロジックも同様です。Pythonに組み込まれたリストタイプのみを使用するのではなく、 チェスと詩人による双方向リスト実装します )。



優れたパフォーマンスを備えた別のシンプルなソリューション。



ヒントをありがとう、 seriyPS

最初のソリューションと同じくらい簡単に見える(さらに簡単でないとしても)同時に、ほとんど同じくらい速く動作する別のソリューションが登場しました。 実装:

 from collections import OrderedDict class ODCache(OrderedDict): def __init__(self, cacheSize): self.cacheSize = cacheSize OrderedDict.__init__(self) def __getitem__(self, key): value = OrderedDict.__getitem__(self, key) return self._touchCache(key, value) def __setitem__(self, key, value): self._touchCache(key, value) def _touchCache(self, key, value): try: OrderedDict.__delitem__(self, key) except KeyError: pass OrderedDict.__setitem__(self, key, value) toDel = len(self) - self.cacheSize if toDel > 0: for k in OrderedDict.keys(self)[:toDel]: OrderedDict.__delitem__(self, k) return value
      
      







別の解決策はパフォーマンスの向上です。



コメントからの解決策(私が使用したテストによると、速度のリーダーです)、 tbdに感謝します:

 class ListDictBasedCache(object): __slots__ = ['__key2value', '__maxCount', '__weights'] def __init__(self, maxCount): self.__maxCount = maxCount self.__key2value = {}# key->value self.__weights = []# keys ordered in LRU def __updateWeight(self, key): try: self.__weights.remove(key) except ValueError: pass self.__weights.append(key)# add key to end if len(self.__weights) > self.__maxCount: _key = self.__weights.pop(0)# remove first key self.__key2value.pop(_key) def __getitem__(self, key): try: value = self.__key2value[key] self.__updateWeight(key) return value except KeyError: raise KeyError("key %s not found" % key) def __setitem__(self, key, value): self.__key2value[key] = value self.__updateWeight(key) def __str__(self): return str(self.__key2value)
      
      







比較



申し立ては一つのことであり、公平な人物は別のことです。 したがって、私は反対の言葉を口にしないでください。これらのクラスを測定します。

ここでは、Pythonに組み込まれたtimeitモジュールが役立ちます。

 class StubClass: def __init__(self, something): self.something = something def testCache(cacheClass, cacheSize, repeat): d = cacheClass(cacheSize) for i in xrange(repeat * cacheSize): d[i] = StubClass(i) import random class testCacheReadGen: def __init__(self, cacheClass, cacheSize): d = cacheClass(cacheSize) for i in xrange(cacheSize): d[i] = StubClass(i) self.d = d self.s = cacheSize def __call__(self, repeat): cacheSize, d = self.s, self.d for i in xrange(cacheSize * repeat): tmp = d[random.randint(0, cacheSize-1)] def minMaxAvg(lst): return min(lst), max(lst), 1.0 * sum(lst) / len(lst) import timeit def testAllCaches(classes, cacheSize, repeats): templ = '%s: min %.5f, max %.5f, avg %.5f' genmsg = lambda cls, res: templ % ((cls.__name__.ljust(20),) + tuple(minMaxAvg(res))) for cls in classes: t = timeit.Timer(lambda: testCache(cls, cacheSize, repeats[0])) print genmsg(cls, t.repeat(*repeats[1:])) def testAllCachesRead(classes, cacheSize, repeats): templ = '%s: min %.5f, max %.5f, avg %.5f' genmsg = lambda cls, res: templ % ((cls.__name__.ljust(20),) + tuple(minMaxAvg(res))) for cls in classes: tst = testCacheReadGen(cls, cacheSize) t = timeit.Timer(lambda: tst(repeats[0])) print genmsg(cls, t.repeat(*repeats[1:])) if __name__ == '__main__': print 'write' testAllCaches((SimpleCache, FastCache, ODCache, ListDictBasedCache), 100, (100, 3, 3)) print 'read' testAllCachesRead((SimpleCache, FastCache, ODCache, ListDictBasedCache), 100, (100, 3, 3))
      
      







私のマシンでの起動の結果(Intel Core i5 2540M 2.6GHz、Win7 64ビット、ActivePython 2.7.2 x64ビット):

 書きます
 SimpleCache:最小9.36119、最大9.49077、平均9.42536
 FastCache:最小0.39449、最大0.41835、平均0.40880
 ODCache:最小0.79536、最大0.82727、平均0.81482
 ListDictBasedCache:最小0.25135、最大0.27334、平均0.26000
読む
 SimpleCache:最小9.61617、最大9.73143、平均9.66337
 FastCache:最小0.19294、最大0.21941、平均0.20552
 ODCache:最小0.22270、最大0.25816、平均0.23911
 ListDictBasedCache:最小0.16475、最大0.17725、平均0.16911 


単純なソリューションと複雑なソリューションの違いは非常に顕著で、約20倍です! OrderedDictの決定は、パフォーマンスの点では遅れていますが、ほんのわずかです。 さらに結論を出すには、キャッシュタスクの特性を反映した、より複雑な測定を行う必要があります。上記のように、線形ではなく、任意の情報にすばやくアクセスできます。



メモリの消費について-別のメインセクションで前のスクリプトを実行し、タスクマネージャーでPythonインタープリターのメモリ使用量を確認します。

 if __name__ == '__main__': import time print 'measure me - no cache' try: while True: time.sleep(10) except: # let user interrupt this with Ctrl-C pass testCache(FastCache, 1000, 1) print 'measure me - cached' while True: time.sleep(10) exit()
      
      





測定結果は表にあります:

キャッシュクラス キャッシュを作成する前に キャッシュを作成した後 キャッシュ消費
Simplecache 4228K 4768K 540K
Fastcache 4232K 4636K 404K
ODCache 4496K 4936K 440K
ListDictBasedCache 4500K 4880K 38万


ご覧のように、複雑なソリューションでは、要素が大幅に高速化(〜20倍)されるだけでなく、単純なソリューションよりもメモリ消費量がわずかに少なくなります。



このようなキャッシュを作成することで解決した特定のタスクで、それを置き換えることで、応答時間を90秒から約70秒に短縮できました(より高密度のプロファイリングと応答生成ロジックのほぼすべての書き換えにより、応答時間が30秒になりましたが、これはすでに別の話)。



All Articles