-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathllm_client.py
340 lines (286 loc) · 12.9 KB
/
llm_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
from typing import Optional, Tuple
from config import Config
import logging
import yaml
from patch_processor import PatchInfo
import re
from openai import OpenAI
from pathlib import Path
class LLMClient:
def __init__(self, config: Config):
self.config = config
self.client = OpenAI(
api_key=config.openrouter_api_key,
base_url=config.openrouter_base_url
)
def extract_response(self, text: str) -> str:
"""Remove thinking tags and markdown formatting from LLM response."""
# Remove thinking tags
text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
# Remove markdown code block markers
text = re.sub(r'^```yaml\s*', '', text, flags=re.MULTILINE)
text = re.sub(r'^```\s*$', '', text, flags=re.MULTILINE)
# Remove any leading/trailing whitespace
return text.strip()
def clean_yaml_text(self, text: str) -> Optional[str]:
"""Clean and validate YAML text with better error handling."""
if not text:
return None
# Remove any YAML document markers
text = re.sub(r'^---\s*$', '', text, flags=re.MULTILINE)
text = re.sub(r'^\.{3}\s*$', '', text, flags=re.MULTILINE)
try:
# First try to parse the YAML
data = yaml.safe_load(text)
# Ensure we have a valid rules structure
if not isinstance(data, dict):
data = {'rules': [data] if data else []}
elif 'rules' not in data:
data = {'rules': [data]}
# Clean dump with proper formatting
return yaml.dump(data, sort_keys=False, default_flow_style=False)
except yaml.YAMLError as e:
logging.error(f"YAML parsing error: {e}")
# Try to salvage malformed YAML by wrapping in rules structure
try:
wrapped_text = f"rules:\n - {text}"
data = yaml.safe_load(wrapped_text)
return yaml.dump(data, sort_keys=False, default_flow_style=False)
except yaml.YAMLError:
return None
def validate_rule_schema(self, rule: dict) -> Tuple[bool, Optional[str]]:
"""Validate that the rule has all required fields."""
required_fields = ['id', 'pattern', 'message', 'severity', 'languages']
if not isinstance(rule, dict):
return False, "Rule must be a dictionary"
missing_fields = [field for field in required_fields if field not in rule]
if missing_fields:
return False, f"Missing required fields: {', '.join(missing_fields)}"
# Validate severity
valid_severities = ['ERROR', 'WARNING', 'INFO']
if rule['severity'] not in valid_severities:
return False, f"Invalid severity level. Must be one of: {', '.join(valid_severities)}"
# Ensure id is properly formatted
if not re.match(r'^[a-z0-9-]+$', rule['id']):
return False, "Invalid id format. Must contain only lowercase letters, numbers, and hyphens"
return True, None
def _sanitize_rule(self, rule: dict, patch_info: PatchInfo) -> dict:
"""Ensure rule has all required fields and correct format."""
if not isinstance(rule, dict):
rule = {'rules': [rule]}
# Extract the actual rule if wrapped in 'rules' list
if 'rules' in rule and isinstance(rule['rules'], list):
rule = rule['rules'][0]
# Ensure rule has an ID
if 'id' not in rule:
# Generate an ID based on the repository and commit
commit_short = patch_info.commit_id[:8]
rule['id'] = f"vuln-{patch_info.repo_name.lower()}-{commit_short}"
# Ensure rule has languages field
if 'languages' not in rule:
rule['languages'] = [patch_info.file_changes[0].language]
# Ensure rule has severity
if 'severity' not in rule:
rule['severity'] = 'ERROR'
# Ensure rule has metadata
if 'metadata' not in rule:
rule['metadata'] = {}
metadata = rule['metadata']
if 'source-url' not in metadata:
metadata['source-url'] = f"github.com/{patch_info.repo_owner}/{patch_info.repo_name}/commit/{patch_info.commit_id}"
if 'category' not in metadata:
metadata['category'] = 'security'
if 'technology' not in metadata:
metadata['technology'] = [patch_info.file_changes[0].language]
return rule
def generate_rule(self, patch_info: PatchInfo, error_feedback: Optional[str] = None) -> Optional[dict]:
"""Generate a Semgrep rule using the LLM with improved validation."""
prompt = self._build_prompt(patch_info, error_feedback)
try:
response = self.client.chat.completions.create(
model="deepseek/deepseek-chat",
messages=[
{"role": "system", "content": """You generate Semgrep rules in YAML format.
Return only the raw YAML content without any markdown formatting or additional text.
Always include these required fields: id, pattern, message, severity, languages"""},
{"role": "user", "content": prompt}
],
temperature=0.6,
)
if not response.choices:
logging.error("No response generated from LLM")
return None
# Extract and clean the response
content = response.choices[0].message.content
if not content:
logging.error("Empty response from LLM")
return None
rule_text = self.extract_response(content)
if not rule_text:
logging.error("No valid content after extracting response")
return None
rule_text = self.clean_yaml_text(rule_text)
if not rule_text:
logging.error("Failed to clean YAML text")
return None
try:
rule_data = yaml.safe_load(rule_text)
rule = self._sanitize_rule(rule_data, patch_info)
# Validate the rule schema
is_valid, error = self.validate_rule_schema(rule)
if not is_valid:
logging.error(f"Invalid rule schema: {error}")
return None
return rule
except yaml.YAMLError as e:
logging.error(f"Error parsing generated rule YAML: {e}")
return None
except Exception as e:
logging.error(f"Error generating rule: {e}")
return None
def _build_prompt(self, patch_info: PatchInfo, error_feedback: Optional[str] = None) -> str:
"""Build an enhanced prompt for rule generation with examples and detailed guidance."""
# Combine all file changes with context
all_changes = []
for file_change in patch_info.file_changes:
file_header = f"File: {file_change.file_path}\n"
all_changes.append(file_header + file_change.changes)
combined_changes = "\n\n".join(all_changes)
language = patch_info.file_changes[0].language
commit_short = patch_info.commit_id[:8]
suggested_id = f"vuln-{patch_info.repo_name.lower()}-{commit_short}"
# Language-specific example rules
EXAMPLE_RULES = {
"python": '''
rules:
- id: "unsafe-deserialization"
pattern: pickle.loads($DATA)
pattern-not: pickle.loads(trusted_data)
pattern-inside: |
def $FUNC(...):
...
languages: ["python"]
message: "Detected unsafe deserialization using pickle.loads(). This can lead to remote code execution."
severity: ERROR
metadata:
category: security
cwe: CWE-502
owasp: A8:2017-Insecure Deserialization
references:
- https://docs.python.org/3/library/pickle.html#pickle.loads
- id: "sql-injection"
patterns:
- pattern: execute($QUERY)
- pattern-not: execute("SELECT ...")
- pattern-not: execute(sanitized_query)
languages: ["python"]
message: "Potential SQL injection detected. Use parameterized queries instead."
severity: ERROR
metadata:
category: security
cwe: CWE-89
''',
"javascript": '''
rules:
- id: "xss-innerHTML"
pattern: $ELEMENT.innerHTML = $DATA
pattern-not-inside: |
$ELEMENT.innerHTML = DOMPurify.sanitize($DATA)
languages: ["javascript"]
message: "Potential XSS vulnerability using innerHTML. Use DOMPurify or safe alternatives."
severity: ERROR
metadata:
category: security
cwe: CWE-79
- id: "eval-injection"
pattern: eval($DATA)
pattern-not: eval("trusted_static_string")
languages: ["javascript"]
message: "Dangerous use of eval() detected. This can lead to code injection."
severity: ERROR
metadata:
category: security
cwe: CWE-95
''',
"java": '''
rules:
- id: "path-traversal"
pattern: new File($PATH)
pattern-not: new File(sanitized_path)
pattern-inside: |
class $CLASS {
...
}
languages: ["java"]
message: "Potential path traversal vulnerability. Validate and sanitize file paths."
severity: ERROR
metadata:
category: security
cwe: CWE-22
- id: "weak-cipher"
pattern: Cipher.getInstance("DES")
languages: ["java"]
message: "Usage of weak cryptographic algorithm DES detected. Use AES instead."
severity: ERROR
metadata:
category: security
cwe: CWE-326
'''
}
# Get examples for the current language
examples = EXAMPLE_RULES.get(language, "")
# Build the enhanced prompt
prompt = f"""Analyze the following vulnerability patch and generate a precise Semgrep rule to detect similar vulnerabilities in {language} code.
CONTEXT:
Repository: github.com/{patch_info.repo_owner}/{patch_info.repo_name}
Commit: {patch_info.commit_id}
PATCH CHANGES:
{combined_changes}
SEMGREP PATTERN SYNTAX GUIDE:
- Use $VARNAME to match any expression
- Use ... to match any sequence of statements
- Use |> to pipe patterns together
- Use pattern-inside to limit matches to specific code blocks
- Use pattern-not to exclude specific patterns (like the fixed version)
- Use metavariable-pattern to add constraints on variables
REQUIRED FIELDS:
1. id: "{suggested_id}" (must be unique and descriptive)
2. pattern: Clear pattern matching the vulnerable code structure
3. languages: ["{language}"]
4. message: Detailed description of:
- What the vulnerability is
- Why it's dangerous
- How to fix it
5. severity: One of [ERROR, WARNING, INFO]
RECOMMENDED FIELDS:
- pattern-not: Pattern that should not match (e.g., fixed version)
- pattern-inside: Context pattern for where the rule should match
- pattern-not-inside: Context pattern for where the rule should not match
- metadata:
source-url: github.com/{patch_info.repo_owner}/{patch_info.repo_name}/commit/{patch_info.commit_id}
category: security
cwe: [relevant CWE number]
owasp: [relevant OWASP category]
references: [links to documentation or standards]
technology: [{language}]
IMPORTANT GUIDELINES:
1. Make patterns as specific as possible to minimize false positives
2. Include pattern-not for the fixed version when possible
3. Add relevant metadata like CWE numbers and OWASP categories
4. Write clear, actionable messages explaining both the problem and solution
5. Consider different variations of the vulnerable pattern
EXAMPLES OF HIGH-QUALITY RULES:
{examples}
FORMAT YOUR RESPONSE AS A SINGLE YAML DOCUMENT:
rules:
- id: "{suggested_id}"
pattern: [Your pattern here]
pattern-not: [Fixed version pattern]
languages: ["{language}"]
message: [Clear description]
severity: ERROR
metadata: [Additional context]
Remember to adapt the patterns to match the specific vulnerability in the patch while keeping them general enough to catch variations of the same issue."""
if error_feedback:
prompt += f"\n\nPREVIOUS ERROR TO FIX:\n{error_feedback}\nEnsure your next attempt addresses these issues."
return prompt