本博客由闲散白帽子胖胖鹏鹏胖胖鹏潜力所写,仅仅作为个人技术交流分享,不得用做商业用途。转载请注明出处,未经许可禁止将本博客内所有内容转载、商用。
最近分析的时候要去修复CFG,那就免不了要去看CFGFast的源码(有人写过了这个解析),那么从官方的解释来看呢,CFGFast采用静态分析的方法,简单来讲就是用程序序言,也就是程序开头的固定字符串匹配比如(push {xxx} / push {xxx,lr}),程序尾声匹配程序结尾 (pop{xxx} / pop{xxx,lr} /bx lr)等方式,匹配上的话,就认为是个函数的起始点。当然很多函数并不是从这里起始的,如果有其他的函数call这个地址,那么这个地址也作为一个函数加入CFG中。于是,我们就想到了一些问题,比如B.W这个指令并没有固定的格式,我举几个例子就明白了。
CF F7 CA BE B.W sub_xxxxxx
05 F0 4A B8 B.W sub_10BF56
00 F0 64 B9
00 F0 54 BC
从上面代码可以看出,第二个字节为'F7'或者'F0',最后一个字节是'Bx'。特征不是很明显,我们切换到bit级来看下这个是怎么构成的,看下官方的手册。
前1111 0sxx就是 F7 或者是F0的来源,那么BX的来源就是10 j1 1,imm10和imm11用于标识跳转地址与当前pc的偏移。这就很棘手了,很难用正则添加尾序的识别,同时还有一个问题。如图,Angr认为B.W不是一个函数调用,而是一个近距离跳转,这样在判断函数结尾时就会出现问题。(红框部分是这个函数本身的地址)
但是我们生成CFG图之后,获得节点,发现他好像真的没有出错!后继点和前继点都是对的!Size也是对的!使用function manager也是对的!,这就出现了冲突!我们需要看看他是怎么完成的
解决方法
先看看CFGFast处理调用关系的流程,在函数末尾,调用了self._analyze(),这个函数其实是在forward_analysis.py里面,我们跟进去看看这个函数。
def _analyze(self):
    """
    The main analysis routine.

    Calls the pre-analysis hook, then exactly one of the two core routines
    (chosen by whether ``self._graph_visitor`` is set), then the
    post-analysis hook.

    :return: None
    """
    self._pre_analysis()  # pre-analysis hook

    if self._graph_visitor is None:
        # There is no base graph that we can rely on. The analysis itself should generate successors for the
        # current job.
        # An example is the CFG recovery.
        self._analysis_core_baremetal()  # core analysis routine
    else:
        # We have a base graph to follow. Just handle the current job.
        self._analysis_core_graph()

    self._post_analysis()  # post-analysis hook
_pre_analysis()函数里面都是些初始化的内容,没有找到我们想要的东西。主要看看这个函数_analysis_core_baremetal()。我们一路跟踪路径,(中间不太重要的我们略过)
_analysis_core_baremetal()
->ForwardAnalysis._process_job_and_get_successors()
->CFGFast._get_successors()
->CFGFast._scan_block()
->CFGFast._scan_irsb()->CFGFast._create_jobs()
这个函数就是我们需要跟进的地方,我们有两个目标1.让Angr承认B.W是函数间跳转;2.让Angr承认B.W是个函数结尾。
def _create_jobs(self, target, jumpkind, current_function_addr, irsb, addr, cfg_node, ins_addr, stmt_idx,
fast_indirect_jump_resolution=True):
"""
Given a node and the details of one of its successors, build a list of CFGJobs.
If the successor is a call or an exit, it is also marked accordingly in the CFG.

NOTE(review): this excerpt has lost its original indentation; the statements are
reproduced in their original order with only the comments rewritten.

:param int target: Destination address of the successor.
:param str jumpkind: The jumpkind of the edge going to the successor.
:param int current_function_addr: Address of the current function.
:param pyvex.IRSB irsb: IRSB of the predecessor node.
:param int addr: Address of the predecessor.
:param CFGNode cfg_node: The CFGNode of the predecessor node.
:param int ins_addr: Address of the source instruction.
:param int stmt_idx: ID of the source statement.
:param bool fast_indirect_jump_resolution: When True and the target is not a known
    address, try `_resolve_indirect_jump_timelessly` first. The recursive call for
    each resolved target passes False.
:return: A list of CFGJobs.
:rtype: list
"""
if type(target) is pyvex.IRExpr.Const:
target_addr = target.con.value
elif type(target) in (pyvex.IRConst.U32, pyvex.IRConst.U64):
target_addr = target.value
elif type(target) in (int, long):
target_addr = target
else:
target_addr = None
jobs = [ ]
# The target is not a concrete address (an indirect jump or call): try the fast resolution path first.
if target_addr is None and (
jumpkind in ('Ijk_Boring', 'Ijk_Call') or jumpkind.startswith('Ijk_Sys'))\
and fast_indirect_jump_resolution:
# try resolving it fast
resolved, resolved_targets = self._resolve_indirect_jump_timelessly(addr, irsb, current_function_addr,
jumpkind
)
if resolved:
for t in resolved_targets:
ent = self._create_jobs(t, jumpkind, current_function_addr, irsb, addr, cfg_node, ins_addr,
stmt_idx, fast_indirect_jump_resolution=False)
jobs.extend(ent)
return jobs
# Special case.
# If a call instruction has a target that points to the immediate next instruction, we treat it as a boring jump
# Conditions checked below, in order: the architecture does not push a return address on call,
# the node has recorded instruction addresses, the source instruction is the last one of the node,
# and the target equals irsb.addr + irsb.size. If all hold, the jumpkind is rewritten to Ijk_Boring.
if jumpkind == "Ijk_Call" and \
not self.project.arch.call_pushes_ret and \
cfg_node.instruction_addrs and \
ins_addr == cfg_node.instruction_addrs[-1] and \
target_addr == irsb.addr + irsb.size:
jumpkind = "Ijk_Boring"
if jumpkind == 'Ijk_Boring':
if target_addr is not None:
# If the target address lies in another section, it is considered a jump into a new function.
# This is the important part: it answers question 1 of the article, i.e. why angr
# treats such a jump as a transition to a new function.
if not self._addrs_belong_to_same_section(addr, target_addr):
target_func_addr = target_addr
to_outside = True
else:
# It may just be a jump inside the current function.
target_func_addr = None
real_target_addr = self._real_address(self.project.arch, target_addr)
if real_target_addr in self._traced_addresses:
node = self.get_any_node(target_addr)
if node is not None:
target_func_addr = node.function_address
if target_func_addr is None:
target_func_addr = current_function_addr
to_outside = not target_func_addr == current_function_addr
# Record the new transition edge.
r = self._function_add_transition_edge(target_addr, cfg_node, current_function_addr, ins_addr=ins_addr,
stmt_idx=stmt_idx, to_outside=to_outside
)
if not r:
if cfg_node is not None:
l.debug("An angr exception occurred when adding a transition from %#x to %#x. "
"Ignore this successor.",
cfg_node.addr,
target_addr
)
else:
l.debug("SimTranslationError occurred when creating a new entry to %#x. "
"Ignore this successor.",
target_addr
)
return []
# Queue a CFGJob for this successor.
ce = CFGJob(target_addr, target_func_addr, jumpkind, last_addr=addr, src_node=cfg_node,
src_ins_addr=ins_addr, src_stmt_idx=stmt_idx)
jobs.append(ce)
else:
l.debug('(%s) Indirect jump at %#x.', jumpkind, addr)
# Add it to our set. Will process it later if user allows.
# Create an IndirectJump instance
if addr not in self.indirect_jumps:
tmp_statements = irsb.statements if stmt_idx == 'default' else irsb.statements[ : stmt_idx]
ins_addr = next(iter(stmt.addr for stmt in reversed(tmp_statements)
if isinstance(stmt, pyvex.IRStmt.IMark)), None
)
ij = IndirectJump(addr, ins_addr, current_function_addr, jumpkind, stmt_idx, resolved_targets=[ ])
self.indirect_jumps[addr] = ij
else:
ij = self.indirect_jumps[addr]
# NOTE(review): this part looks unfinished upstream, see the TODOs below.
# TODO: revisit the logic here
# TODO: - put the indirect jump reusing logic into a separate method
if ij.resolved_targets:
# has been resolved before
# directly create CFGJobs
for resolved_target in ij.resolved_targets:
ce = CFGJob(resolved_target, resolved_target, jumpkind, last_addr=resolved_target,
src_node=cfg_node, src_stmt_idx=stmt_idx, src_ins_addr=ins_addr)
jobs.append(ce)
self._function_add_call_edge(resolved_target, None, None, resolved_target,
stmt_idx=stmt_idx, ins_addr=ins_addr
)
else:
resolved_as_plt = False
if irsb and self._heuristic_plt_resolving:
# Test it on the initial state. Does it jump to a valid location?
# It will be resolved only if this is a .plt entry
resolved_as_plt = self._resolve_plt(addr, irsb, ij)
if resolved_as_plt:
jump_target = next(iter(ij.resolved_targets))
target_func_addr = jump_target # TODO: FIX THIS
r = self._function_add_transition_edge(jump_target, cfg_node, current_function_addr,
ins_addr=ins_addr, stmt_idx=stmt_idx,
to_outside=True
)
if r:
ce = CFGJob(jump_target, target_func_addr, jumpkind, last_addr=jump_target,
src_node=cfg_node, src_stmt_idx=stmt_idx, src_ins_addr=ins_addr)
jobs.append(ce)
self._function_add_call_edge(jump_target, None, None, target_func_addr,
stmt_idx=stmt_idx, ins_addr=ins_addr
)
resolved_as_plt = True
if resolved_as_plt:
# has been resolved as a PLT entry. Remove it from indirect_jumps_to_resolve
if ij.addr in self._indirect_jumps_to_resolve:
self._indirect_jumps_to_resolve.remove(ij.addr)
self._deregister_analysis_job(current_function_addr, ij)
else:
# add it to indirect_jumps_to_resolve
self._indirect_jumps_to_resolve.add(ij)
# register it as a job for the current function
self._register_analysis_job(current_function_addr, ij)
# Call and syscall jumpkinds.
# NOTE(article author): unsure whether this branch matters on ARM; it is not examined in this article.
elif jumpkind == 'Ijk_Call' or jumpkind.startswith("Ijk_Sys"):
is_syscall = jumpkind.startswith("Ijk_Sys")
if target_addr is not None:
jobs += self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr,
target_addr, jumpkind, is_syscall=is_syscall
)
else:
l.debug('(%s) Indirect jump at %#x.', jumpkind, addr)
# Add it to our set. Will process it later if user allows.
if addr not in self.indirect_jumps:
tmp_statements = irsb.statements if stmt_idx == 'default' else irsb.statements[: stmt_idx]
if self.project.arch.branch_delay_slot:
ins_addr = next(itertools.islice(iter(stmt.addr for stmt in reversed(tmp_statements)
if isinstance(stmt, pyvex.IRStmt.IMark)), 1, None
), None)
else:
ins_addr = next(iter(stmt.addr for stmt in reversed(tmp_statements)
if isinstance(stmt, pyvex.IRStmt.IMark)), None
)
ij = IndirectJump(addr, ins_addr, current_function_addr, jumpkind, stmt_idx,
resolved_targets=[])
self.indirect_jumps[addr] = ij
else:
ij = self.indirect_jumps[addr]
self._indirect_jumps_to_resolve.add(ij)
self._register_analysis_job(current_function_addr, ij)
self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr, None,
jumpkind, is_syscall=is_syscall
)
elif jumpkind == "Ijk_Ret":
if current_function_addr != -1:
self._function_exits[current_function_addr].add(addr)
self._function_add_return_site(addr, current_function_addr)
self.functions[current_function_addr].returning = True
self._add_returning_function(current_function_addr)
cfg_node.has_return = True
else:
# TODO: Support more jumpkinds
l.debug("Unsupported jumpkind %s", jumpkind)
return jobs
其实到了这里,我们已经知道Angr在后期做了处理,增加了函数间跳转判定。我们跟进_function_add_transition_edge()函数
def _function_add_transition_edge(self, addr, src_node, function_addr, to_outside=False, to_function_addr=None,
                                  stmt_idx=None, ins_addr=None):
    """
    Add a transition edge to the function transition map.

    :param int addr: Address that the control flow transits to.
    :param CFGNode src_node: The CFGNode that the control flow transits from. When None, the target block is
                             only registered as a node of the function and no edge is added.
    :param int function_addr: Address of the function the edge is recorded on.
    :param bool to_outside: When True the edge is recorded through `_add_outside_transition_to`, otherwise
                            through `_add_transition_to`.
    :param to_function_addr: Forwarded to `_add_outside_transition_to`; only used when `to_outside` is True.
    :param stmt_idx: ID of the source statement, forwarded to the function manager.
    :param ins_addr: Address of the source instruction, forwarded to the function manager.
    :return: True if the edge is added. False if SimMemoryError or SimEngineError is raised on the way (for
             example, the target address does not exist).
    :rtype: bool
    """
    try:
        # Reuse the existing CFG node for the target if there is one; otherwise build a snippet from the address.
        target_node = self._nodes.get(addr, None)
        if target_node is None:
            target_snippet = self._to_snippet(addr=addr, base_state=self._base_state)
        else:
            target_snippet = self._to_snippet(cfg_node=target_node)

        if src_node is None:
            # Add this basic block into the function manager
            self.kb.functions._add_node(function_addr, target_snippet)
        else:
            src_snippet = self._to_snippet(cfg_node=src_node)
            if not to_outside:
                # The transition stays inside the function.
                self.kb.functions._add_transition_to(function_addr, src_snippet, target_snippet,
                                                     stmt_idx=stmt_idx, ins_addr=ins_addr
                                                     )
            else:
                # The transition leaves the function: also pass along to_function_addr.
                self.kb.functions._add_outside_transition_to(function_addr, src_snippet, target_snippet,
                                                             to_function_addr=to_function_addr,
                                                             stmt_idx=stmt_idx, ins_addr=ins_addr
                                                             )
        return True
    except (SimMemoryError, SimEngineError):
        return False
_add_outside_transition_to这个函数定义在knowledge base的函数管理器中,也就是上面代码里通过self.kb.functions访问到的那个对象(FunctionManager)。
def _add_outside_transition_to(self, function_addr, from_node, to_node, to_function_addr=None, ins_addr=None,
                               stmt_idx=None):
    """
    Record a transition that leaves the function at `function_addr`.

    :param function_addr: Key into `self._function_map` of the function being left.
    :param from_node: Source of the transition. An int/long is first converted with
                      `self._kb._project.factory.snippet`.
    :param to_node: Destination of the transition. An int/long is converted the same way; if that raises
                    SimEngineError, only a jumpout site is recorded on the function and the method returns.
    :param to_function_addr: When not None, a 'transition' edge from `function_addr` to this address is added
                             to `self.callgraph` unless an identical edge already exists.
    :param ins_addr: Forwarded to the function's `_transit_to`.
    :param stmt_idx: Forwarded to the function's `_transit_to`.
    :return: None
    """
    if type(from_node) in (int, long):  # pylint: disable=unidiomatic-typecheck
        from_node = self._kb._project.factory.snippet(from_node)
    if type(to_node) in (int, long):  # pylint: disable=unidiomatic-typecheck
        try:
            to_node = self._kb._project.factory.snippet(to_node)
        except SimEngineError:
            # we cannot get the snippet, but we should at least tell the function that it's going to jump out here
            self._function_map[function_addr].add_jumpout_site(from_node)
            return

    self._function_map[function_addr]._transit_to(from_node, to_node, outside=True, ins_addr=ins_addr,
                                                  stmt_idx=stmt_idx
                                                  )

    # This is where the inter-function transition is recorded: one edge is added to the callgraph.
    if to_function_addr is not None:
        # mark it on the callgraph
        edge_data = {'type': 'transition'}
        if function_addr not in self.callgraph or \
                to_function_addr not in self.callgraph[function_addr] or \
                edge_data not in self.callgraph[function_addr][to_function_addr].values():
            self.callgraph.add_edge(function_addr, to_function_addr, **edge_data)
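其实跟到这里我们就已经知道了,Angr在处理跳转的时候,并不是完全以跳转类型(Ijk_Call、Ijk_Boring等)为准的,而是有自己的处理逻辑。对于Ijk_Boring类型的直接跳转,当目标地址与当前地址不在同一个section,或者目标地址已经被分析过且属于另一个函数时,就认为是跳转到了别的函数里面(to_outside为True),并且通过_add_outside_transition_to为这次跳转增加一条跳出函数的transition edge,同时为后继点创建CFGJob。当然,Angr中可能还会有更多其他的处理逻辑,等我们以后遇到了再深入分析。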
嗯,我们面临着第二个问题,函数的尾巴是怎么确定下来的。就是我们通过程序序言扫到了函数的开头,但是我们还需要知道函数的结尾是怎么确定的,尤其是ARM架构,经常是代码段中穿插着数据段。Angr是怎么确定函数执行完了呢?这个问题我们回到_analyze()函数中去。在执行完CFG的分析之后,我们需要把屁股擦干净,这个擦屁股的函数就是_post_analysis()。
这个函数的功能就是将每个block合并、整合、扫描函数跳转,对尾部返回指令进行处理、CFG的最终合并等等。这部分源码我们以后需要修改的时候我们再继续深入研究。因为我发现了一个有趣的现象。就是当我使用完整的代码生成CFG时,这个带B.W的函数长度是正常的,但是如果我从函数的中间开始分析的话,这个函数将和B.W之后的函数合并成为一个新的函数。
上面这段话可能表达不准确,我这样说明。假设A函数结尾用B.W调用了B函数,A函数后面紧接着是C函数。A函数的调用者是D函数。如果你从A函数开始分析,那么生成的CFG图将是A、B、C三者的结合,因为Angr不将B.W视为函数的结尾;如果你从D函数开始分析,而D函数恰巧有调用B函数的部分,那么A、B将是两个独立的函数。因为B函数被分析过,A使用B.W指令,Angr视为函数间调用。这也就解释得通,为什么我们不同的分析得到了不同的结果。但是!仔细想想我们的功能是什么?是准确恢复调用流图?还是准确识别出函数的终止地址?我们这里显然是调用流图。所以,我们不要太在乎细节。