Skip to content

frontend-c: Add custom type extraction #2159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/fuzz_introspector/frontends/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import tree_sitter_rust

import copy
import json
import logging
import yaml

Expand Down Expand Up @@ -60,6 +61,7 @@ def __init__(self,
self.tree_sitter_lang = self.LANGUAGE.get(language,
self.LANGUAGE['cpp'])
self.parser = Parser(self.tree_sitter_lang)
self.full_type_defs: list[dict[str, Any]] = []

if source_content:
self.source_content = source_content
Expand Down Expand Up @@ -115,6 +117,22 @@ def get_report(self,

return new_report

def dump_type_definition(self,
                         report_name: str = '',
                         dump_output: bool = True) -> None:
    """Dump the custom type definitions for this project, if any exist.

    Aggregates the ``full_type_defs`` lists collected by every parsed
    source file and writes them as a single JSON array to *report_name*.

    Args:
        report_name: Path of the JSON report file to write.
        dump_output: When False, collect but skip writing (dry run).
    """
    result: list[dict] = []
    for source_code in self.source_code_files:
        result.extend(source_code.full_type_defs)

    # Nothing collected, or the caller asked for no output file.
    if not result or not dump_output:
        return

    logger.info('Dumping custom type definitions.')
    with open(report_name, 'w', encoding='utf-8') as f:
        # json.dump streams the serialization straight into the file
        # instead of materialising the whole JSON string in memory first.
        json.dump(result, f)
    logger.info('Custom type definitions dumping completed.')

def dump_module_logic(self,
report_name: str = '',
entry_function: str = '',
Expand Down
169 changes: 154 additions & 15 deletions src/fuzz_introspector/frontends/frontend_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,9 @@ def language_specific_process(self):
self.function_names = []
self.line_range_pairs = []
self.struct_defs = []
self.union_defs = []
self.enum_defs = []
self.preproc_defs = []
self.typedefs = []
self.includes = set()

Expand All @@ -563,24 +566,107 @@ def language_specific_process(self):
# Load function definitions
self._set_function_defintions()
self.extract_types()
self.process_type_defs()

def process_type_defs(self):
    """Gather every category of custom type definition into one list.

    Appends, in a fixed order, all struct, typedef, enum, union and
    preprocessor definitions extracted from this source file onto
    ``self.full_type_defs``.
    """
    # Same aggregation order as the individual extend calls this replaces.
    for type_defs in (self.struct_defs, self.typedefs, self.enum_defs,
                      self.union_defs, self.preproc_defs):
        self.full_type_defs.extend(type_defs)

def extract_types(self):
"""Extracts the types of the source code"""
# Extract all enum
enum_query = self.tree_sitter_lang.query('( enum_specifier ) @sp')
enum_query_res = enum_query.captures(self.root)
for _, enums in enum_query_res.items():
for enum in enums:
enum_name_field = enum.child_by_field_name('name')
enum_body_field = enum.child_by_field_name('body')
if not enum_name_field:
# Skip anonymous enum
continue
if not enum_body_field:
# Skip forward declaration
continue

self.enum_defs.append({
'name':
enum_name_field.text.decode(),
'definition':
enum_body_field.text.decode(),
'item_type':
'enum',
'pos': {
'source_file': self.source_file,
'line_start': enum.start_point.row,
'line_end': enum.end_point.row,
}
})

# Extract all preproc definitions
prep_query = self.tree_sitter_lang.query('( preproc_def ) @sp')
preproc_query_res = prep_query.captures(self.root)
for _, preprocs in preproc_query_res.items():
for preproc in preprocs:
preproc_name_field = preproc.child_by_field_name('name')
preproc_body_field = preproc.child_by_field_name('value')
if not preproc_name_field or not preproc_body_field:
# Skip invalid preproc definition
continue

self.preproc_defs.append({
'name':
preproc_name_field.text.decode(),
'type_or_value':
preproc_body_field.text.decode(),
'item_type':
'preproc',
'pos': {
'source_file': self.source_file,
'line_start': preproc.start_point.row,
'line_end': preproc.end_point.row,
}
})

# Extract all structs
struct_query = self.tree_sitter_lang.query('( struct_specifier ) @sp')
struct_query_res = struct_query.captures(self.root)
for _, structs in struct_query_res.items():
for struct in structs:
if struct.child_by_field_name('body') is None:
# Skip forward declaration
continue
if struct.child_by_field_name('name') is None:
continue

# Extract name for struct or anonymous struct
struct_name_field = struct.child_by_field_name('name')
if struct_name_field:
struct_name = struct.child_by_field_name(
'name').text.decode()
else:
parent = struct.parent
declarator = None
if parent and (parent.type
in ['declaration', 'type_definition']):
declarator = parent.child_by_field_name('declarator')
if declarator:
struct_name = declarator.text.decode()
else:
# Skip anonymous struct with no name
continue

# Go through each of the field declarations
fields = []
for child in struct.child_by_field_name('body').children:
if not child.child_by_field_name('declarator'):
continue
if child.type == 'field_declaration':
child_name = child.child_by_field_name(
'type').text.decode()
child_type = child.child_by_field_name(
'declarator').text.decode()
fields.append({
'type':
child.child_by_field_name('type').text.decode(),
Expand All @@ -589,40 +675,93 @@ def extract_types(self):
'declarator').text.decode()
})
self.struct_defs.append({
'name':
struct.child_by_field_name('name').text.decode(),
'fields':
fields,
'name': struct_name,
'fields': fields,
'item_type': 'struct',
'pos': {
'source_file': self.source_file,
'line_start': struct.start_point.row,
'line_end': struct.end_point.row,
}
})

# Extract all unions
union_query = self.tree_sitter_lang.query('( union_specifier ) @sp')
union_query_res = union_query.captures(self.root)
for _, unions in union_query_res.items():
for union in unions:
if union.child_by_field_name('body') is None:
# Skip forward declaration
continue

# Extract name for union or anonymous union
union_name_field = union.child_by_field_name('name')
if union_name_field:
union_name = union.child_by_field_name(
'name').text.decode()
else:
parent = union.parent
declarator = None
if parent and (parent.type
in ['declaration', 'type_definition']):
declarator = parent.child_by_field_name('declarator')
if declarator:
union_name = declarator.text.decode()
else:
# Skip anonymous union with no name
continue

# Go through each of the field declarations
fields = []
for child in union.child_by_field_name('body').children:
if not child.child_by_field_name('declarator'):
continue
if child.type == 'field_declaration':
child_name = child.child_by_field_name(
'type').text.decode()
child_type = child.child_by_field_name(
'declarator').text.decode()
fields.append({
'type': child_name,
'name': child_type,
})
self.union_defs.append({
'name': union_name,
'fields': fields,
'item_type': 'union',
'pos': {
'source_file': self.source_file,
'line_start': union.start_point.row,
'line_end': union.end_point.row,
}
})

# Extract all type definition
type_query = self.tree_sitter_lang.query('( type_definition ) @tp')
type_query_res = type_query.captures(self.root)
for _, types in type_query_res.items():
for typedef in types:
# Skip if this is an anonymous struct.
# TODO(David): handle this
# Skip if this is an anonymous type.
if typedef.child_by_field_name('declarator') is None:
continue
typedef_struct = {
'name':
typedef.child_by_field_name('declarator').text.decode()
typedef.child_by_field_name('declarator').text.decode(),
'item_type': 'typedef',
}

typedef_struct['pos'] = {
'source_file': self.source_file,
'line_start': typedef.start_point.row,
'line_end': typedef.end_point.row,
}
typedef_type = typedef.child_by_field_name('type')
if typedef_type.type == 'struct_specifier':
if typedef.child_by_field_name('name') is not None:
typedef_struct[
'type'] = typedef_type.child_by_field_name(
'name').text.decode()
# TODO(David): handle the else branch here.
if typedef_type.type in [
'struct_specifier', 'union_specifier'
]:
# Already handled in the above struct/union section
continue

elif typedef_type.type == 'primitive_type':
typedef_struct['type'] = typedef_type.text.decode()
elif typedef_type.type == 'sized_type_specifier':
Expand Down
6 changes: 6 additions & 0 deletions src/fuzz_introspector/frontends/oss_fuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def process_c_project(target_dir: str,
target = os.path.join(out, 'fuzzerLogFile-0.data.yaml')
project.dump_module_logic(target, 'no-harness-in-project', '',
target_dir, dump_output)
target = os.path.join(out, 'full_type_defs.json')
project.dump_type_definition(target, dump_output)

with open(os.path.join(out, 'fuzzerLogFile-0.data'), 'w') as f:
f.write("Call tree\n")
Expand All @@ -88,6 +90,8 @@ def process_c_project(target_dir: str,
idx = 1
target = os.path.join(out, 'report.yaml')
project.dump_module_logic(target, harness_source=target_dir)
target = os.path.join(out, 'full_type_defs.json')
project.dump_type_definition(target, dump_output)

if entrypoint != 'LLVMFuzzerTestOneInput':
calltree_source = project.get_source_code_with_target(entrypoint)
Expand All @@ -106,6 +110,8 @@ def process_c_project(target_dir: str,
target = os.path.join(out, f'fuzzerLogFile-{idx}.data.yaml')
project.dump_module_logic(target, 'LLVMFuzzerTestOneInput', '',
harness.source_file, dump_output)
target = os.path.join(out, 'full_type_defs.json')
project.dump_type_definition(target, dump_output)
logger.info('handling harness, step 2')
logger.info('Extracting calltree for %s', harness.source_file)
calltree = project.extract_calltree(source_code=harness,
Expand Down
Loading