GDBを使用してLinuxプロセスにその場でパッチを適用する

Linuxでの関数フックの手法はよく知られており、インターネットで説明されています。 最も簡単な方法は、「クローン関数」を使用して動的ライブラリを作成し、LD_PRELOADメカニズムを使用して、プロセスのロード段階でインポートテーブルをオーバーライドすることです。



LD_PRELOADの欠点は、プロセスの開始を制御する必要があることです。 すでに実行中のプロセスの関数またはインポートテーブルにない関数をインターセプトするには、「スプライシング」を使用します。インターセプトされた関数の先頭でインターセプターに移動するコマンドを記録します。



Pythonには、C言語のデータや関数(つまり、Cインターフェースを備えた多数の動的ライブラリ)とやり取りできるctypes



モジュールがctypes



ことも知られています。 したがって、プロセスの機能をインターセプトし、 ctypes



を使用してCコールバックでラップされたPythonメソッドに送信することを妨げるものはありません。



制御をインターセプトし、ターゲットプロセスにコードをロードするには、Python( https://sourceware.org/gdb/current/onlinedocs/gdb/Python-API.html )で拡張モジュールの作成をサポートするGDBデバッガーを使用すると便利です。

ニュアンス
この例のコードは記事の最後に完全に記載されており、2つのファイルで構成されています。



  • pyinject.py-GDB拡張機能
  • hook.py-フック関数を備えたモジュール


GDB側では、コードはユーザーコマンドとして便利にフォーマットされます。 gdb.Command



クラスから継承することにより、新しいコマンドを作成できます。 GDBでコマンドを使用する場合、 invoke(argument, from_tty)



メソッドが呼び出されます。



gdb.Parameter



継承するカスタムパラメータを作成することもできます。 サンプル記事では、インターセプション関数でファイル名を指定するために使用されます。



実行中のPID



プロセスへの接続とモジュールのロードは、GDBを起動するとすぐに実行できます

 gdb -ex 'attach PID' -ex 'source pyinject.py' -ex 'set hookfile hook.py'
      
      



このデバッグされたプロセスのフィールドが停止し、インタラクティブなGDBコマンドラインが起動します。ここで、新しいpyinjectコマンドが使用可能になります。



傍受は3つの段階に分けることができます。
  1. Pythonインタープリターをターゲットプロセスのアドレス空間に挿入する
  2. キャプチャされた機能に関する情報のキャプチャ
  3. 実際に傍受
節1と2はデバッガー側で簡単に実行でき、節3はすでにターゲットプロセス内にあります。



Pythonインタープリターインジェクション



Python GDBインターフェースのほとんどは、デバッグ機能を拡張するように設計されています。 他のすべてについては、 gdb.execute(command, from_tty, to_string)



があります。これにより、任意のGDBコマンドを実行し、その出力を文字列として取得できます。

例:

 out = gdb.execute("info registers", False, True)
      
      



また、式を評価し、結果をgdb.Value



として返すgdb.parse_end_eval(expression)



gdb.Value



ます。



最初のステップは、Pythonライブラリをターゲットプロセスのアドレス空間にロードすることです。 これを行うには、ターゲットプロセスのコンテキストでdlopen



を呼び出します。

gdb.execute



またはgdb.parse_and_eval



call



コマンドを使用できます。

 # pyinject.py gdb.execute('call dlopen("libpython2.7.so", %d)' % RTLD_LAZY) assert long(gdb.history(0)) handle = gdb.parse_and_eval('dlopen("libpython2.7.so", %d)' % RTLD_LAZY) assert long(handle)
      
      





その後、インタープリターを初期化できます

 # pyinject.py gdb.execute('call PyEval_InitThreads()') gdb.execute('call Py_Initialize()')
      
      



最初の呼び出しはGIL(グローバルインタープリターロック)を作成し、2番目の呼び出しはPython C-APIを使用するために準備します。



そしてインターセプション関数でモジュールをロードします

 # pyinject.py fp = gdb.parse_and_eval('fopen("hook.py", "r")') assert long(fp) != 0 pyret = gdb.parse_and_eval('PyRun_AnyFileEx(%u, "hook.py", 1)' % fp)
      
      



PyRun_AnyFileEx



は、 PyRun_AnyFileEx



モジュールのコンテキストでファイルからコードを実行します。
ニュアンス
上記は、ターゲットプロセスがPythonを(メインまたはスクリプト言語として)使用しない場合にのみ機能します。 そうでない場合、すべてが非常に複雑です。 主な問題は、デバッグのためにランダムな場所で停止したプロセスでは、Python C-API関数を使用できないことです(おそらくPy_AddPendingCall



を除く)。


Hook.pyモジュール



hook.pyモジュールには、フック関数と、フック自体を実行するフッククラスが含まれています。

インターセプター関数は、デコレーターを使用して指定されます。 たとえば、標準ライブラリのopen



関数の場合、引数を出力し、 orig



フィールドに格納されている元の関数を呼び出した結果を返します

 # hook.py @hook(symbol='open', ctype=CFUNCTYPE(c_int, c_char_p, c_int)) def python_open(fname, oflag): print "open: ", fname, oflag return python_open.orig(fname, oflag)
      
      





@hook



デコレーターは2つのパラメーターを受け入れます。 デコレータはHookクラスに関数を登録し、変更せずに戻ります。

 # hook.py def hook(symbol, ctype): def deco(func): Hook.register(symbol, ctype, func) return func return deco
      
      





register



メソッドは、クラスのインスタンスを作成し、 all_hooks



辞書に格納します。 したがって、ファイルの実行後、 Hook.all_hooks



のデコレーターのおかげで、インターセプターの利用可能な機能に関するすべての情報が得られます。

 # hook.py class Hook(object): all_hooks = {} @staticmethod def register(symbol, *args): Hook.all_hooks[symbol] = Hook(symbol, *args)
      
      





1つの関数を呼び出してGDBからインターセプトするには、インターセプトを担当するHook



クラスで静的メソッドを定義すると便利です

 # hook.py class Hook(object): @staticmethod def hook(symbol, *args): h = Hook.all_hooks[symbol] if h.active: return h.install(*args)
      
      



*args



は、フックされた関数に関する追加情報をここに提供します。 どれが傍受の方法に依存します。



スプライシングの傍受方法



スプライシングは、元の関数の呼び出し方法に応じて、2つの亜種にグローバルに分割されます。



単純なフックでは、元の関数の呼び出しはいくつかのステップで構成されます。
  1. 元の関数の先頭が保存されたコピーから復元されます
  2. 電話をかける
  3. 始まりは、インターセプターへの遷移命令によって再び上書きされます
ニュアンス
欠点は明らかです。マルチスレッドプログラムでは、別のスレッドが関数の先頭を上書きするときに関数を呼び出さないことを保証できません。 これは、元の関数が呼び出されている間に他のスレッドを停止することで部分的に処理されます。 しかし、第一に、これを達成する標準的な方法はありません。第二に、mallocのような関数の呼び出しに失敗すると、デッドロックをキャッチできます。


トランポリンフックでは、元の関数の先頭が新しい場所にコピーされ、その後、元の関数の本体への遷移が記録されます。 このオプションでは、元の機能は常に新しいアドレスで使用できます。



トランポリンフックはマルチスレッドプログラムで動作しますが、インストールははるかに困難です。 整数個の命令を書き換える必要がありますが、一般的には逆アセンブラが使用されます。 x86_64アーキテクチャの出現により、 %rip



レジスタ(現在のコマンドアドレス)に対するメモリアドレス指定の偏在性により、さらに多くの問題が追加されました。

ニュアンス
GDBのopen



関数の始まりを見てみましょう。

 0x7f6cc8aa83e0 <open64+0>: 83 3d ed 33 2d 00 00 cmpl $0x0,0x2d33ed(%rip) 0x7f6cc8aa83e7 <open64+7>: 75 10 jne 0x7f6cc8aa83f9 <open64+25> 0x7f6cc8aa83e9 <__open_nocancel+0>: b8 02 00 00 00 mov $0x2,%eax 0x7f6cc8aa83ee <__open_nocancel+5>: 0f 05 syscall
      
      





最初のコマンド " cmpl $0x0,0x2d33ed(%rip)



"を別のアドレスに0x7f6cc8d7b7d4



、現在0x7f6cc8d7b7d4



指している相対アドレス0x2d33ed(%rip)



は別の場所(hi SIGSEGV)を指します。



この関数のトランポリンフックを作成するには、次のものが必要です。
  1. 関数の先頭でコマンドのサイズを決定します
  2. cmplコマンドの宛先アドレスから2 GB以下のメモリを割り当てます(オフセット0x2d33ed(%rip)



    署名付き32ビット)
  3. 先頭を新しい場所にコピーし、 cmpl



    %rip



    に関連するメモリアクセスにパッチを当てます
さらに、jumpコマンドは9バイトより短くする必要があります。 2つのエントリポイントを持つ関数であり、アドレス0x7f6cc8aa83e9



すでに__open_nocancel



です。 これは、32ビットのトランジションを許可するために、 open



の開始からスプリングボードが2 GBを超えてはならないことを意味します(64ビットのトランジションはすべて9バイトより長い)。


原則として、GDBのすべての機能( gdb.execute()



)の背後にあるため、トランポリンフックを正しく実装することを妨げるものはありませんが、簡単にするために、この記事では単純なフックを使用します。



単純なフックでは、唯一の制限はジャンプ命令の長さです。

2つのオプションがあります(メイン):



この記事では2番目の方法を使用します

 # hook.py class Hook(object): @staticmethod def get_indlongjmp(srcaddr, proxyaddr): s = struct.pack('=BBl', 0xff, 0x25, proxyaddr - srcaddr - 6) return map(ord, s)
      
      



get_indlongjmp



は、 srcaddr



アドレスからproxyaddr



QWORDに格納されているアドレスにジャンプするためのコードを返します



これで、 Hook



クラスの欠落しているメソッドを最終的に記述できます。 install



メソッドは、元の関数address



proxyaddr



補助ゾーンのアドレスを取得します。 その後、インターセプターに切り替えて、関数の先頭を書き換えます(以前はself.code



保存していself.code





 # hook.py def install(self, address, proxyaddr): self.address = address self.proxyaddr = proxyaddr proxymemory = (c_void_p * 1).from_address(self.proxyaddr) proxymemory[0] = Hook.cast_to_void_p(self.cfunc) self.jmp = self.get_indlongjmp(self.address, self.proxyaddr) self.memory = (c_ubyte * len(self.jmp)).from_address(self.address) self.code = list(self.memory) self.patchmem(self.jmp) self.pyfunc.orig = self.origfunc() self.active = True
      
      





patchmem



は、 src



データで元の関数の先頭を上書きします

 # hook.py def patchmem(self, src): for i in range(len(src)): self.memory[i] = src[i]
      
      





origfunc



は、インターセプターへの遷移を削除および設定するコードで関数呼び出しをラップします。

 # hook.py def origfunc(self): ofunc = self.ctype(self.address) def wrap(*args): self.patchmem(self.code) val = ofunc(*args) self.patchmem(self.jmp) return val return wrap
      
      





最後の仕上げ



Pythonはアドレス空間にロードされ、hook.pyファイルはPythonにロードされます。 GDBモジュールのPython側からHook.hook(symbol, address, proxyaddr)



を呼び出すことは残ります。



open



」機能のアドレスを見つける

 line = gdb.execute('info address %s' % "open" False, True) m = re.match(r'.*?(0x[0-9a-f]+)', line) addr = int(m.group(1), 16)
      
      



ニュアンス
一般に、停止したプロセスのコードを書き直すために実行する前に、このコードの途中で停止しないように(またはそれに戻るように)する必要があります。 これを行う最も簡単な方法は、 gdb.execute("thread apply all backtrace")



の出力を解析することgdb.execute("thread apply all backtrace")





addr



近くにメモリを割り当てます

 prot = PROT_READ | PROT_WRITE | PROT_EXEC flags = MAP_PRIVATE | MAP_ANONYMOUS maddr = gdb.parse_and_eval('(void*)mmap(0x%x, %d, %d, %d, -1, 0)\n' % (addr | 0x7FFFFFFF, 4096, prot, flags)) maddr = (long(maddr) & 0x00000000FFFFFFFF) | (addr & 0xFFFFFFFF00000000)
      
      



ニュアンス
最後の行は、結果の上位ビットを食い尽くすGDBのバグの回避策です。 引数(addr | 0x7FFFFFFF)



は、文書化されていないプロパティmmap



を使用して、目的のアドレスよりも小さいアドレスのメモリを(addr | 0x7FFFFFFF)



ます。



トリックなしでは、正しい方法で少し長くなりますgdb.execute('info proc mappings', False, True)



の出力を解析し、アドレス空間でaddrに最も近い穴を見つけ、 MAP_FIXED



mmapをフェッチする必要があります。 もちろん、インターセプトされた関数ごとにメモリのページ全体を割り当てる必要はありません。


元の関数の書き換えを許可(別名SIGSEGV)

 gdb.parse_and_eval('mprotect(0x%x, %u, %d)' % (addr & -0x1000, 4096*2, prot))
      
      





PyRun_SimpleString



を介してHook.hook



を呼び出しPyRun_SimpleString





 pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.hook(\\"open\\", 0x%x, 0x%x)")' % (addr, maddr))
      
      





できた! これで、ターゲットプロセスでの「 open



」の呼び出しがインターセプトされ、 python_open



からpython_openにルーティングされます。



サンプルファイル



完全なサンプルファイル(もう少し確認しますが、多くのニュアンスを考慮しません)

pyinject.py
 # pyinject.py import re import os RTLD_LAZY = 1 PROT_READ = 0x1 PROT_WRITE = 0x2 PROT_EXEC = 0x4 MAP_PRIVATE = 0x2 MAP_FIXED = 0x10 MAP_ANONYMOUS = 0x20 LIBPYTHON = 'libpython2.7.so' class ParamHookfile(gdb.Parameter): instance = None def __init__(self, default=''): super(ParamHookfile, self).__init__("hookfile", gdb.COMMAND_NONE, gdb.PARAM_FILENAME) self.value = default ParamHookfile.instance = self def get_set_string(self): return self.value def get_show_string(self, svalue): return svalue class CmdHook(gdb.Command): instance = None def __init__(self): super(CmdHook, self).__init__("pyinject", gdb.COMMAND_NONE) self.initialized = False CmdHook.instance = self def complete(self, text, word): matching = [s[4:] for s in dir(self) if s.startswith('cmd_') and s[4:].startswith(text)] return matching def invoke(self, subcmd, from_tty): self.dont_repeat() if subcmd.startswith("hook"): self.cmd_hook(*gdb.string_to_argv(subcmd)) elif subcmd.startswith("unhook"): self.cmd_unhook(*gdb.string_to_argv(subcmd)) else: gdb.write('unknown sub-command "%s"' % subcmd) def cmd_hook(self, *args): self.initialize() if not self.initialized: return pyret = gdb.parse_and_eval('PyRun_SimpleString("print Hook")') if long(pyret) != 0: hookfile = ParamHookfile.instance.value if not os.path.exists(hookfile): gdb.write('Use "set hookfile <path>"\n') return fp = gdb.parse_and_eval('fopen("%s", "r")' % hookfile) assert long(fp) != 0 pyret = gdb.parse_and_eval('PyRun_AnyFileEx(%u, "%s", 1)' % (fp, hookfile)) if long(pyret) != 0: gdb.write('Error loading "%s"\n' % hookfile) return for symbol in args: try: line = gdb.execute('info address %s' % symbol, False, True) m = re.match(r'.*?(0x[0-9a-f]+)', line) if m: addr = int(m.group(1), 16) except gdb.error: continue prot = PROT_READ | PROT_WRITE | PROT_EXEC flags = MAP_PRIVATE | MAP_ANONYMOUS # | MAP_FIXED maddr = gdb.parse_and_eval('(void*)mmap(0x%x, %d, %d, %d, -1, 0)\n' % (addr | 0x7FFFFFFF , 4096, prot, flags)) maddr = (long(maddr) & 0x00000000FFFFFFFF) | (addr & 0xFFFFFFFF00000000) gdb.write("mmap = 0x%x\n" % maddr) if maddr == 0: continue gdb.parse_and_eval('mprotect(0x%x, %u, %d)' % (addr & -0x1000, 4096*2, prot)) pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.hook(\\"%s\\", 0x%x, 0x%x)")' % (symbol, addr, maddr)) if long(pyret) == 0: gdb.write('hook "%s" OK\n' % symbol) def cmd_unhook(self, *args): for symbol in args: pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.unhook(\\"%s\\")")' % (symbol)) if long(pyret) == 0: gdb.write('unhook "%s" OK\n' % symbol) def initialize(self): if self.initialized: return handle = gdb.parse_and_eval('dlopen("%s", %d)' % (LIBPYTHON, RTLD_LAZY)) if not long(handle): gdb.write('Cannot load library %s\n' % LIBPYTHON) return if not long(gdb.parse_and_eval('Py_IsInitialized()')): gdb.execute('call PyEval_InitThreads()') gdb.execute('call Py_Initialize()') self.initialized = True if __name__ == '__main__': ParamHookfile() CmdHook()
      
      





hook.py
 # hook.py import struct from ctypes import (CFUNCTYPE, POINTER, c_ubyte, c_int, c_char_p, c_void_p) class Hook(object): all_hooks = {} @staticmethod def cast_to_void_p(pointer): return CFUNCTYPE(c_void_p, c_void_p)(lambda x: x)(pointer) @staticmethod def register(symbol, *args): Hook.all_hooks[symbol] = Hook(symbol, *args) def __init__(self, symbol, ctype, pyfunc): self.symbol = symbol self.ctype = ctype self.pyfunc = pyfunc self.cfunc = self.ctype(self.pyfunc) self.address = 0 self.proxyaddr = 0 self.jmp = None self.memory = None self.code = None self.active = False def install(self, address, proxyaddr): print "install:", hex(address) self.address = address self.proxyaddr = proxyaddr proxymemory = (c_void_p * 1).from_address(self.proxyaddr) proxymemory[0] = Hook.cast_to_void_p(self.cfunc) self.jmp = self.get_indlongjmp(self.address, self.proxyaddr) self.memory = (c_ubyte * len(self.jmp)).from_address(self.address) self.code = list(self.memory) self.patchmem(self.jmp) self.pyfunc.orig = self.origfunc() self.active = True def uninstall(self): self.patchmem(self.code) self.active = False def origfunc(self): ofunc = self.ctype(self.address) def wrap(*args): self.patchmem(self.code) val = ofunc(*args) self.patchmem(self.jmp) return val return wrap def patchmem(self, src): for i in range(len(src)): self.memory[i] = src[i] @staticmethod def get_indlongjmp(srcaddr, proxyaddr): # 64-bit indirect absolute jump (6 + 8 bytes) # ff 25 off32 jmpq *off32(%rip) try: s = struct.pack('=BBl', 0xff, 0x25, proxyaddr - srcaddr - 6) return map(ord, s) except: print hex(proxyaddr), hex(srcaddr), hex(proxyaddr - srcaddr - 6) raise @staticmethod def hook(symbol, address, proxyaddr): h = Hook.all_hooks[symbol] if h.active: return h.install(address, proxyaddr) @staticmethod def unhook(symbol): h = Hook.all_hooks[symbol] if not h.active: return h.uninstall() def hook(symbol, ctype): def deco(func): Hook.register(symbol, ctype, func) return func return deco #int open (const char *__file, int __oflag, ...) @hook(symbol='open', ctype=CFUNCTYPE(c_int, c_char_p, c_int)) def python_open(fname, oflag): print "open: ", fname, oflag return python_open.orig(fname, oflag)
      
      





例を実行する(絶対パスで改善する)

 gdb -ex 'attach PID' -ex 'source /path/pyinject.py' -ex 'set hookfile /path/hook.py' (gdb) pyinject hook open (gdb) continue
      
      






All Articles