import csv
import os
import re
import sys

import requests
from dotenv import load_dotenv

# Read FORGEJO_API_TOKEN from a .env file (or the environment).
load_dotenv()
FORGEJO_API_TOKEN = os.getenv("FORGEJO_API_TOKEN")

# Forgejo single-issue endpoint; owner/repo/index are filled in per request.
forgejo_issue_api_string = "https://git.agaric.com/api/v1/repos/{owner}/{repo}/issues/{index}"

# Maps a Harvest project name to its (owner, repo) pair on the Forgejo
# instance, e.g. 'harvest_project': ('owner', 'repo name').
projects = {
    "MASS Continuous Improvement": ("mass", "mass"),
    "Housing Works": ("housingworks", "app-housingworks-net"),
}

# Columns appended to the report: up to three title/URL pairs per row.
issue_fields = [
    "First Issue Title", "First Issue URL",
    "Second Issue Title", "Second Issue URL",
    "Third Issue Title", "Third Issue URL",
]


def get_issue_title_and_url(issue_number):
    """Fetch one issue from the Forgejo API and return [title, html_url]."""
    # Note: currently hardcoded to the "Housing Works" project.
    owner, repo = projects["Housing Works"]
    api_url = forgejo_issue_api_string.format(owner=owner, repo=repo, index=issue_number)
    response = requests.get(api_url, params={"access_token": FORGEJO_API_TOKEN})
    response.raise_for_status()  # fail loudly on a bad token or missing issue
    json_response = response.json()
    return [json_response["title"], json_response["html_url"]]


def prompt_for_file():
    """Ask for the path to a Harvest CSV report and validate that it exists."""
    file = input("Enter harvest report: ")
    if not os.path.exists(file):
        print("THAT FILE DOES NOT EXIST, EXITING PROGRAM")
        sys.exit(1)
    print(file)
    return file


def split_issues_into_columns(issues):
    """Resolve issue numbers to titles/URLs and map them onto issue_fields."""
    titles_and_urls = []
    for issue in issues:
        titles_and_urls += get_issue_title_and_url(issue)
    # zip() stops at the shorter list, so rows with fewer than three issues
    # simply leave the remaining columns blank.
    return dict(zip(issue_fields, titles_and_urls))


def parse_notes_section(notes):
    """Extract up to three issue numbers from a Harvest notes field.

    Matches references such as "Issue 123", "issue #123", or a bare "#123".
    """
    regex_pattern = r"[Ii]ssue\s*#?\d+|#\d+"
    matches = re.findall(regex_pattern, notes)[:3]
    # Keep only the digits from each match.
    return [re.search(r"\d+", match).group() for match in matches]


def parse_harvest_csv(file=None):
    """Read a Harvest report and write modified_report.csv with issue columns."""
    if file is None:
        file = prompt_for_file()
    print("Beginning parsing for issues")
    with open(file, "r", newline="") as f:
        csv_reader = csv.DictReader(f)
        rows = list(csv_reader)
        original_fieldnames = csv_reader.fieldnames
    modified_fieldnames = original_fieldnames + issue_fields
    with open("modified_report.csv", "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=modified_fieldnames)
        writer.writeheader()
        for row_count, row in enumerate(rows):
            if row_count % 20 == 0:
                print("ON ROW:", row_count)
            issues = parse_notes_section(row["Notes"])
            row.update(split_issues_into_columns(issues))
            writer.writerow(row)


if __name__ == "__main__":
    parse_harvest_csv()
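
# Usage sketch (the filenames below are hypothetical; the script only assumes
# the Harvest CSV export includes a "Notes" column and that FORGEJO_API_TOKEN
# is set in .env or the environment):
#
#   $ python parse_harvest.py
#   Enter harvest report: harvest_report.csv
#   Beginning parsing for issues
#   ON ROW: 0
#
# The output is written to modified_report.csv: the original Harvest columns,
# plus up to three "Issue Title"/"Issue URL" column pairs per row, resolved
# from issue references like "Issue 123" or "#123" found in the Notes field.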