1"""
2Description:
3 This script processes a list of patient IDs in TWO PHASES:
4 PHASE 1: Check for specific Organs at Risk (Brachial Plexus and Skin) in each patient
5 PHASE 2: Create anonymized backups with systematic naming (br1, br2, etc.)
6
7Key Features:
8 - Separates ROI checking from backup creation to avoid patient context issues
9 - Reads patient IDs from CSV file (skips 'Patient EMPI' header)
10 - Checks ROIs in 'Organs at risk' category only
11 - Excludes ROIs with 'z' or 'zz' prefixes
12 - Creates anonymized backups with consistent naming
13 - Outputs results to both console and CSV file
14"""
15
16from connect import *
17import os, sys, csv, time, re
18
19
def process_patients_from_csv(
    excel_file_path=r"-------.csv",
    backup_folder=r"--------\Backups",
    results_csv_path=r"----.csv",
):
    """
    Main function to process patients in two phases.

    Phase 1 checks the ROIs of every patient listed in the input CSV.
    Phase 2 creates one anonymized backup per patient that passed phase 1,
    numbering them consecutively (br1, br2, ...) in phase-1 order.
    Patients for which phase 1 returned nothing are left out of phase 2,
    of the printed summary and of the results CSV.

    Args:
        excel_file_path (str): CSV file holding the patient IDs.
        backup_folder (str): Directory that receives the backup files;
            created if missing.
        results_csv_path (str): Destination of the ROI results CSV.

    Returns:
        list: List of dictionaries containing processing results for each
        patient; empty when nothing could be processed or an error occurred.
    """
    try:
        # exist_ok avoids the check-then-create race of a separate exists() test
        os.makedirs(backup_folder, exist_ok=True)

        # Read patient IDs from CSV file
        patient_ids = read_patient_ids_from_csv(excel_file_path)
        if not patient_ids:
            print("No patient IDs found in the CSV file")
            return []

        # Get current database
        patient_db = get_current("PatientDB")
        if not patient_db:
            print("Failed to get current database")
            return []

        print("="*60)
        print("PHASE 1: ROI CHECKING FOR ALL PATIENTS")
        print("="*60)

        # PHASE 1: Collect ROI data for all patients
        roi_results = []
        total_ids = len(patient_ids)
        for position, patient_id in enumerate(patient_ids, start=1):
            print(f"\nChecking ROIs for patient {position}/{total_ids}: {patient_id}")
            roi_info = check_patient_rois(patient_db, patient_id, position)
            if roi_info:
                roi_results.append(roi_info)

        print("\n" + "="*60)
        print("PHASE 2: BACKUP CREATION")
        print("="*60)

        # PHASE 2: Create backups for each patient individually.
        # The backup number follows the position in roi_results, so the
        # names stay consecutive even if some patients failed phase 1.
        final_results = []
        total_checked = len(roi_results)
        for patient_number, roi_info in enumerate(roi_results, start=1):
            patient_id = roi_info['original_id']

            print(f"\nCreating backup for patient {patient_number}/{total_checked}: {patient_id}")

            backup_filename = create_single_patient_backup(
                patient_db, patient_id, patient_number, backup_folder
            )

            # Combine ROI info with backup result (copy so phase-1 data is untouched)
            final_result = roi_info.copy()
            final_result['backup_file'] = backup_filename
            final_results.append(final_result)

            # Display interim result
            print(f" ROI Check: BrachialPlexus={final_result['has_brachial_plexus']}, Skin={final_result['has_skin']}")
            print(f" Backup: {backup_filename if backup_filename else 'FAILED'}")

        # Display final results
        print("\n" + "="*60)
        print("FINAL PROCESSING RESULTS")
        print("="*60)
        for result in final_results:
            print(f"Patient ID: {result['original_id']}")
            print(f" Brachial Plexus Found: {result['has_brachial_plexus']}")
            print(f" Skin Found: {result['has_skin']}")
            print(f" Backup: {result['backup_file']}")
            print()

        # Write results to CSV file
        write_results_to_csv(final_results, results_csv_path)

        return final_results

    except Exception as e:
        # Top-level boundary of the script: report with traceback, return empty
        print(f"Error processing patients: {str(e)}")
        import traceback
        traceback.print_exc()
        return []
110
def read_patient_ids_from_csv(file_path):
    """
    Read patient IDs from the first column of a CSV file.

    Rows are parsed with the csv module, so a quoted ID such as "12345" is
    returned without its quotes. Blank rows, rows whose first cell is empty
    and the 'Patient EMPI' header cell are skipped. Every accepted ID is
    echoed to the console.

    Args:
        file_path (str): Path to the CSV file containing patient IDs

    Returns:
        list: List of patient ID strings (whitespace-stripped, in file
        order); empty if the file is missing or cannot be read
    """
    patient_ids = []

    try:
        if not os.path.exists(file_path):
            print(f"CSV file not found: {file_path}")
            return []

        # utf-8-sig drops a leading byte-order mark, which would otherwise be
        # glued to the header cell and make it look like a patient ID.
        # errors='replace' keeps non-UTF-8 bytes in other columns from
        # aborting the read; newline='' is what the csv module expects.
        with open(file_path, 'r', newline='', encoding='utf-8-sig', errors='replace') as file:
            for row in csv.reader(file):
                if not row:  # Skip empty lines
                    continue

                patient_id = row[0].strip()
                if not patient_id or patient_id == 'Patient EMPI':  # Skip header if present
                    continue

                patient_ids.append(patient_id)
                print(f"Found patient ID: {patient_id}")

        print(f"Total patient IDs loaded: {len(patient_ids)}")
        return patient_ids

    except Exception as e:
        print(f"Error reading CSV file: {str(e)}")
        return []
147
def check_patient_rois(patient_db, patient_id, patient_number):
    """
    Check ROIs for a single patient without creating backup.

    Looks the patient up, opens it, runs check_oars on it and then makes a
    best-effort attempt to unload it again. The unload is attempted whenever
    the patient was opened, including when the ROI check itself fails.

    Args:
        patient_db: RayStation PatientDB object
        patient_id (str): Patient ID to check
        patient_number (int): Sequential number for reporting (currently
            not used inside this function)

    Returns:
        dict: ROI check results with keys 'original_id',
        'has_brachial_plexus', 'has_skin' and 'backup_file' (None here),
        or None if the patient was not found, could not be opened or an
        error occurred
    """
    patient_info = None
    patient_opened = False

    try:
        # Get patient info from database
        patient_info = get_patient_by_id(patient_db, patient_id)
        if not patient_info:
            print(f"Patient {patient_id} not found in database")
            return None

        print(f" Found patient: {patient_id}")

        # Open the patient
        patient = open_patient(patient_db, patient_info)
        if not patient:
            print(f" Could not open patient: {patient_id}")
            return None

        patient_opened = True
        print(f" Loaded patient: {patient_id}")

        # Check for ROIs
        has_brachial_plexus, has_skin = check_oars(patient)

        print(f" ROI Results: BrachialPlexus={has_brachial_plexus}, Skin={has_skin}")

        return {
            'original_id': patient_id,
            'has_brachial_plexus': has_brachial_plexus,
            'has_skin': has_skin,
            'backup_file': None  # Will be filled in phase 2
        }

    except Exception as e:
        print(f" Error checking ROIs for patient {patient_id}: {str(e)}")
        return None

    finally:
        # Close the patient to free up memory/context, on success and on
        # failure alike.
        if patient_opened:
            try:
                patient_db.UnloadPatient(PatientInfo=patient_info)
                print(f" Unloaded patient: {patient_id}")
            except Exception:
                # Deliberately ignored: some versions might not support
                # UnloadPatient, and a failed unload must not lose the result.
                pass
200
def get_patient_by_id(patient_db, patient_id):
    """
    Retrieve patient information from database by Patient ID.

    The database query may return several candidates, so only an entry whose
    'PatientID' equals the requested ID (compared as whitespace-stripped
    strings) is accepted. If no candidate matches exactly, None is returned
    instead of an arbitrary first hit: the caller goes on to report ROIs and
    create a backup under the requested ID, so returning some other patient
    would silently mislabel data.

    Args:
        patient_db: RayStation PatientDB object
        patient_id (str): Patient ID to search for

    Returns:
        dict: Patient information entry as returned by the query, or None
        if no entry with exactly this ID was found
    """
    infos = patient_db.QueryPatientInfo(Filter={'PatientID': str(patient_id)})
    if not infos:
        return None

    wanted = str(patient_id).strip()
    for info in infos:
        try:
            candidate = str(info['PatientID']).strip()
        except Exception:
            # Entry without a readable PatientID cannot be confirmed; skip it
            continue
        if candidate == wanted:
            return info

    return None
223
def open_patient(patient_db, patient_info):
    """
    Open patient in RayStation workspace.

    Tries patient_db.LoadPatient first and falls back to
    patient_db.OpenPatient if that call raises. If the call that succeeded
    returned None, the current "Patient" context is fetched instead.

    Args:
        patient_db: RayStation PatientDB object
        patient_info (dict): Patient information dictionary

    Returns:
        object: Patient object (may still be None if the current context
        yields nothing)

    Raises:
        Exception: Whatever OpenPatient raises when both attempts fail.
    """
    try:
        # Common in many sites
        patient = patient_db.LoadPatient(PatientInfo=patient_info)
    except Exception:
        # Alternative in some API versions; its own errors propagate to the
        # caller. Only ordinary exceptions trigger the fallback, so
        # SystemExit / KeyboardInterrupt are not swallowed.
        patient = patient_db.OpenPatient(PatientInfo=patient_info)

    if patient is None:
        patient = get_current("Patient")  # if Load/Open attached to current context
    return patient
248
def is_excluded_by_prefix(name):
    """
    Decide whether an ROI name is excluded by the 'z' prefix rule.

    The name is trimmed and lower-cased, any leading run of characters that
    are not ASCII letters is skipped, and the name is excluded when the
    first remaining character is 'z' (which covers 'z...', 'zz...', etc.).

    Args:
        name (str): ROI name to check

    Returns:
        bool: True if ROI should be excluded, False otherwise
    """
    normalized = name.strip().lower()
    # The pattern also matches the empty string, so this match never fails
    leading_non_letters = re.match(r'^[^a-zA-Z]*', normalized)
    remainder = normalized[leading_non_letters.end():]
    return remainder.startswith('z')
266
def check_oars(patient):
    """
    Check for Brachial Plexus and Skin ROIs in patient cases.

    Only ROIs with Type == 'Organ' are considered, and ROIs whose name is
    empty or rejected by is_excluded_by_prefix are ignored. Matching is a
    case-insensitive substring test on the ROI name: 'brach' or 'plexus'
    counts as brachial plexus, 'skin' counts as skin. An ROI whose
    attributes cannot be read is skipped rather than aborting the scan.

    Args:
        patient: RayStation Patient object

    Returns:
        tuple: (has_brachial_plexus, has_skin) as boolean values
    """
    has_brach = False
    has_skin = False
    if not patient.Cases:
        return has_brach, has_skin

    for case in patient.Cases:
        for roi in case.PatientModel.RegionsOfInterest:
            try:
                if roi.Type != "Organ":
                    continue  # only 'Organs at risk'
                name = roi.Name or ""
                if not name or is_excluded_by_prefix(name):
                    continue
                nlc = name.lower()
            except Exception:
                # Best effort: an unreadable ROI must not stop the scan
                continue

            # "brach" already covers "brachial plexus".
            # NOTE(review): "plexus" alone would also match other plexus
            # structures if they are contoured as organs - confirm intent.
            if "brach" in nlc or "plexus" in nlc:
                has_brach = True
            if "skin" in nlc:
                has_skin = True

            if has_brach and has_skin:
                # Nothing left to find; no need to walk the remaining ROIs
                return has_brach, has_skin

    return has_brach, has_skin
302
def _rsbak_mtimes(folder):
    """Map each .rsbak file name in folder to its modification time."""
    return {
        entry: os.path.getmtime(os.path.join(folder, entry))
        for entry in os.listdir(folder)
        if entry.lower().endswith('.rsbak')
    }


def create_single_patient_backup(patient_db, patient_id, patient_number, backup_folder):
    """
    Create backup for a single patient - called separately after ROI checking.

    The patient is looked up and opened again, backed up with anonymization
    into backup_folder, and the file produced by that backup is renamed to
    br<patient_number>.rsbak (replacing an existing file of that name).
    The produced file is identified by comparing the folder's .rsbak files
    before and after the backup call, so an older backup belonging to
    another patient is never picked up and renamed by mistake. The patient
    is unloaded (best effort) whenever it was opened.

    Args:
        patient_db: RayStation PatientDB object
        patient_id (str): Patient ID to backup
        patient_number (int): Sequential number for anonymized naming
        backup_folder (str): Directory path for backup storage

    Returns:
        str: Path to the created backup file or empty string if failed
    """
    patient_info = None
    patient_opened = False

    try:
        # Generate anonymized name
        anonymized_name = f"br{patient_number}"
        new_backup_path = os.path.join(backup_folder, f"{anonymized_name}.rsbak")

        # Check if backup already exists
        if os.path.exists(new_backup_path):
            print(f" Backup file already exists: {os.path.basename(new_backup_path)}")
            print(" Will be overwritten with new backup...")

        # Create backup folder if it doesn't exist
        os.makedirs(backup_folder, exist_ok=True)

        print(f" Creating backup with anonymization: {anonymized_name}")

        # Get fresh patient info for backup
        patient_info = get_patient_by_id(patient_db, patient_id)
        if not patient_info:
            print(f" Patient {patient_id} not found for backup")
            return ""

        # Load the patient specifically for backup
        patient = open_patient(patient_db, patient_info)
        if not patient:
            print(f" Could not load patient {patient_id} for backup")
            return ""

        patient_opened = True
        print(f" Patient loaded for backup: {patient_id}")

        anonymization_settings = {
            'Anonymize': True,
            'AnonymizedName': anonymized_name,
            'AnonymizedID': anonymized_name,
            'RetainDates': False,
            'RetainDeviceIdentity': False,
            'RetainInstitutionIdentity': False,
            'RetainUIDs': False,
            'RetainSafePrivateAttributes': False
        }

        # Remember what is already in the folder so the new file can be told
        # apart from earlier backups afterwards.
        mtimes_before = _rsbak_mtimes(backup_folder)

        # Create the backup
        patient_db.BackupPatient(
            PatientInfo=patient_info,
            TargetPath=backup_folder,
            AnonymizationSettings=anonymization_settings
        )

        # Wait for backup to complete and handle file renaming
        time.sleep(2.0)
        mtimes_after = _rsbak_mtimes(backup_folder)

        # A file counts as produced by this backup if it is new or was rewritten
        produced = [
            entry for entry, mtime in mtimes_after.items()
            if mtimes_before.get(entry) != mtime
        ]
        if not produced:
            print("Backup created but could not find .rsbak file")
            return ""

        latest_backup = max(produced, key=mtimes_after.get)
        original_backup_path = os.path.join(backup_folder, latest_backup)

        if original_backup_path != new_backup_path:
            # os.replace overwrites an existing target in a single step
            os.replace(original_backup_path, new_backup_path)
            print(f"Backup created and renamed: {os.path.basename(new_backup_path)}")
            return new_backup_path

        print(f"Backup created: {os.path.basename(original_backup_path)}")
        return original_backup_path

    except Exception as e:
        print(f"Error creating backup for {patient_id}: {e}")
        return ""

    finally:
        # Unload on every exit path once the patient was opened
        if patient_opened:
            try:
                patient_db.UnloadPatient(PatientInfo=patient_info)
                print(f"Unloaded patient after backup: {patient_id}")
            except Exception:
                # Deliberately ignored: unloading is best effort and must not
                # turn a finished backup into a failure.
                pass
395
def write_results_to_csv(results, csv_file_path):
    """
    Save the ROI check results as a CSV file.

    The file starts with the header row ID, has_brach, Has_skin and then
    holds one row per entry of results, taken from the keys 'original_id',
    'has_brachial_plexus' and 'has_skin'. Any error is reported on the
    console instead of being raised.

    Args:
        results (list): List of patient result dictionaries
        csv_file_path (str): Path where CSV file will be saved
    """
    header = ['ID', 'has_brach', 'Has_skin']
    row_keys = ('original_id', 'has_brachial_plexus', 'has_skin')

    try:
        with open(csv_file_path, 'w', newline='') as out_file:
            out = csv.writer(out_file)
            out.writerow(header)
            # One data row per patient, columns in header order
            out.writerows([entry[key] for key in row_keys] for entry in results)

        print(f"\nROI check results saved to: {csv_file_path}")
        print(f" Total patients processed: {len(results)}")

    except Exception as e:
        print(f"Error writing CSV file: {e}")
425
# Script entry point: run both phases and print a short summary
if __name__ == "__main__":
    print("Starting TWO-PHASE patient processing from CSV...")
    results = process_patients_from_csv()

    if not results:
        print("\nNo patients were processed successfully")
    else:
        # A backup counts as successful when a non-empty file path was recorded
        successful_backups = len([r for r in results if r['backup_file']])
        print("\nProcessing completed!")
        print(f" Patients processed: {len(results)}")
        print(f" Successful backups: {successful_backups}")
        print(f" Failed backups: {len(results) - successful_backups}")