Files
android_kernel_samsung_sm87…/qcom/opensource/tools/linux-ramdump-parser-v2/utasklib.py
2025-08-12 23:12:57 +02:00

317 lines
12 KiB
Python
Executable File

#SPDX-License-Identifier: GPL-2.0-only
#Copyright (c) 2023-2024 Qualcomm Innovation Center, Inc. All rights reserved.
from print_out import print_out_str
from parser_util import cleanupString
import maple_tree
from mmu import Armv8MMU, Armv7MMU
import struct
from kstructlib import StructParser
from collections import namedtuple
class UTaskInfo:
def __init__(self):
self.vmalist=[]
self.name = ""
self.task_addr = 0
self.mmu = None
def __repr__(self):
olist = []
olist.append("Task {} 0x{}".format(self.name, hex(self.task_addr) if self.task_addr else 0))
for vma in self.vmalist:
olist.append(str(vma))
return "\n".join(olist)
class Vma:
def __init__(self):
self.vm_start = 0
self.vm_end = 0
self.flags = 0
self.file = 0
self.file_name = ""
self.vm_pgoff = ""
self.vm_addr = 0
def __repr__(self):
return "0x{0:x}-0x{1:x} flags:0x{2:x} file_name:{3}".format(self.vm_start, self.vm_end, self.flags, self.file_name)
class ProcessNotFoundExcetion(Exception):
pass
class UTaskLib:
def __init__(self, ramdump):
self.offset_comm = ramdump.field_offset('struct task_struct', 'comm')
self.mm_offset = ramdump.field_offset('struct task_struct', 'mm')
self.f_path_offset = ramdump.field_offset('struct file', 'f_path')
self.dentry_offset = ramdump.field_offset('struct path', 'dentry')
self.d_iname_offset = ramdump.field_offset('struct dentry', 'd_iname')
self.active_mm_offset = ramdump.field_offset('struct task_struct', 'active_mm')
self.pid_offset = ramdump.field_offset('struct task_struct', 'pid')
self.ramdump = ramdump
def get_utask_info(self, process_name, logging=False):
'''
support get task info by process name or process id
process_name: str --> process name
int -->process id
'''
for task in self.ramdump.for_each_process():
task_name = cleanupString(self.ramdump.read_cstring(task + self.offset_comm, 16))
if isinstance(process_name, str):
if task_name != process_name:
continue
else:
pid = self.ramdump.read_u32(task + self.pid_offset)
if pid != process_name:
continue
mm_addr = self.ramdump.read_word(task + self.mm_offset)
if process_name == "init" and mm_addr == 0:
mm_addr = self.ramdump.read_word(task + self.active_mm_offset)
if mm_addr == 0:
if logging is True:
print_out_str("mm for {} is null\n".format(process_name))
_utask = UTaskInfo()
_utask.name = task_name
_utask.task_addr = task
return _utask
pgd = self.ramdump.read_structure_field(mm_addr, 'struct mm_struct',
'pgd')
pgdp = self.ramdump.virt_to_phys(pgd)
if self.ramdump.arm64:
mmu = Armv8MMU(self.ramdump, pgdp)
else:
mmu = Armv7MMU(self.ramdump, pgdp)
vmalist = []
for vmaobj in self.for_each_vma_list(mm_addr):
vmalist.append(vmaobj)
_utask = UTaskInfo()
_utask.name = task_name
_utask.vmalist = vmalist
_utask.task_addr = task
_utask.mmu = mmu
return _utask
raise ProcessNotFoundExcetion("Process:{} was not found!".format(process_name))
@staticmethod
def read_bytes(ramdump, mmu, addr, len1, zram_parser=None):
vmaddr = addr
addr = mmu.virt_to_phys(addr)
s = ramdump.read_physical(addr, len1)
if s is None and zram_parser:
pte = mmu.get_swap_pte(vmaddr)
if pte:
zdata = zram_parser.read_data(vmaddr, pte)
if zdata and len1 <= len(zdata):
#print("read content from ptr:0x%x in zram memory"% addr)
s = zdata[0:len1]
if (s is None) or (s == ''):
return None
if len1 == 8:
s = struct.unpack('<Q', s)
elif len1 == 4:
s = struct.unpack('<I', s)
elif len1 == 2:
s = struct.unpack('<H', s)
elif len1 == 1:
s = struct.unpack('<B', s)
else:
print_out_str("This api used to unpack 1/2/4/8 bytes data, check the len\n")
exit()
return s[0]
@staticmethod
def read_binary(ramdump, mmu, addr, length, zram_parser=None):
"""Reads binary data of specified length from addr_or_name."""
min = 0
msg = b''
while length > 0:
msg_binary = None
addr = addr + min
# msg located in the same page
if length < (0x1000 - addr % 0x1000):
min = length
# msg separated in two pages
else:
min = 0x1000 - addr % 0x1000
length = length - min
addr_phys = mmu.virt_to_phys(addr)
if addr_phys is None and zram_parser:
pte = mmu.get_swap_pte(addr)
if pte:
zdata = zram_parser.read_data(addr, pte)
if zdata and min <= len(zdata):
#print("read content from ptr:0x%x in zram memory"% addr)
msg_binary = zdata[0:min]
else:
msg_binary = ramdump.read_physical(addr_phys, min)
if msg_binary is None or msg_binary == '':
msg = msg + b'\x00' * min
else:
msg = msg + msg_binary
return msg
def for_each_vma_list(self, mm_addr, count=None):
self.read_count = 0
if (self.ramdump.kernel_version) < (6, 1, 0):
# struct vm_area_struct *mmap
mmap = self.ramdump.read_structure_field(mm_addr, 'struct mm_struct',
'mmap')
_map = mmap
while _map:
self.read_count +=1
if count and self.read_count > count:
break
vma_obj = self._make_vma_object(_map)
_map = self.ramdump.read_structure_field(
_map, 'struct vm_area_struct', 'vm_next') # next loop
yield vma_obj
else:
mm_mt = self.ramdump.struct_field_addr(mm_addr, 'struct mm_struct',
'mm_mt')
mt_walk = maple_tree.MapleTreeWalker(self.ramdump)
vmalist = []
try:
mt_walk.walk(mm_mt, self._save_vma_list, vmalist, count)
except:
pass
for vmaobj in vmalist:
yield vmaobj
def _make_vma_object(self, vma_addr):
'''
read vma info from vma_addr. then save it to vma object
'''
tmpstartVm = self.ramdump.read_structure_field(
vma_addr, 'struct vm_area_struct', 'vm_start')
tmpsEndVm = self.ramdump.read_structure_field(
vma_addr, 'struct vm_area_struct', 'vm_end')
flags = self.ramdump.read_structure_field(
vma_addr, 'struct vm_area_struct', 'vm_flags')
file = self.ramdump.read_structure_field(
vma_addr, 'struct vm_area_struct', 'vm_file')
vm_pgoff = self.ramdump.read_structure_field(
vma_addr, 'struct vm_area_struct', 'vm_pgoff')
file_name = ""
if file:
dentry = self.ramdump.read_word(file + self.f_path_offset +
self.dentry_offset)
file_name = cleanupString(self.ramdump.read_cstring(
dentry + self.d_iname_offset, 32))
vma_obj = Vma()
vma_obj.vm_start = tmpstartVm
vma_obj.vm_end = tmpsEndVm
vma_obj.flags = flags
vma_obj.file = file
vma_obj.file_name = file_name
vma_obj.vm_pgoff = vm_pgoff
vma_obj.vm_addr = vma_addr
return vma_obj
def _save_vma_list(self, node, vmalist, count=None):
if node:
self.read_count +=1
if count and self.read_count > count:
raise StopIteration
vmalist.append(self._make_vma_object(node))
def __dentry_d_name(self, dentry):
dentry_struct = self.struct_parser.read_struct(dentry, 'struct dentry', ['d_name', 'd_parent'])
d_parent = dentry_struct.d_parent
hash_len = dentry_struct.d_name.hash_len
d_len = hash_len >> 32
d_name_addr = dentry_struct.d_name.name
if (d_len == 0 or d_name_addr == 0):
return ''
d_name = self.ramdump.read_cstring(d_name_addr, d_len)
return d_parent, d_name
def __get_path_name(self, dentry, vfsmnt):
tmp_vfsmnt = vfsmnt
d_parent = dentry
path_name = ''
### first step to find out file path
max_count = 8
index = 0
while True:
index += 1
if index > max_count:
# make an error
break
tmp_dentry = d_parent
d_parent, d_name = self.__dentry_d_name(tmp_dentry)
if d_name == "/":
break
if (tmp_dentry != d_parent):
path_name = d_name + '/' + path_name if path_name else d_name
else:
break
if tmp_vfsmnt == 0:
return path_name
### second step to find out mount path
tmp_mount = tmp_vfsmnt - self.mnt_offset
index = 0
while True:
index += 1
if index > max_count:
# make an error
break
mount_struct = self.struct_parser.read_struct(tmp_mount, 'struct mount', ['mnt_parent', 'mnt_mountpoint'])
if tmp_mount == mount_struct.mnt_parent:
break
_, d_name = self.__dentry_d_name(mount_struct.mnt_mountpoint)
tmp_mount = mount_struct.mnt_parent
if d_name != "/":
path_name = d_name + '/' + path_name
break
return path_name
def for_each_cmdline(self):
'''
As task_struct->comm[16] stored a small part of process name.
developer can't distinct which process it is.
this method is to dump full cmdline for each user space process
'''
if not hasattr(self, "struct_parser"):
self.struct_parser = StructParser(self.ramdump)
cmdline_obj = namedtuple("TaskCmdline", ["pid", "comm", "cmdline"])
self.mnt_offset = self.ramdump.field_offset('struct mount', 'mnt')
for task in self.ramdump.for_each_process():
mm_addr = self.ramdump.read_word(task + self.mm_offset)
if mm_addr == 0:
# skip kernel process
continue
mm_addr = self.ramdump.read_word(task + self.mm_offset)
pid = self.ramdump.read_u32(task + self.pid_offset)
task_name = cleanupString(self.ramdump.read_cstring(task + self.offset_comm, 16))
path_struct = None
for vmaobj in self.for_each_vma_list(mm_addr, count=1):# only read the first vma
if vmaobj.file != 0:
path_struct = self.struct_parser.read_struct(
vmaobj.file + self.f_path_offset, 'struct path', ['dentry', 'mnt'])
if not path_struct:
continue
dentry_addr = path_struct.dentry
vfsmnt = path_struct.mnt
if (dentry_addr != 0x0):
file_name = self.__get_path_name(dentry_addr, vfsmnt)
if task_name in file_name:
yield cmdline_obj(pid, task_name, file_name)