Changed cycles to modules. Functionality is working; looking for feedback on how Nick would like this to run.
195 lines
5.6 KiB
Python
195 lines
5.6 KiB
Python
#----------------------------------Imports-----------------------------------
|
|
import json
|
|
import requests
|
|
from datetime import timedelta, datetime, timezone
|
|
from dotenv import load_dotenv
|
|
import os
|
|
|
|
|
|
#---------------------------Set Up--------------------------------------------
|
|
# Base URL of the Plane REST API; every request URL below is built from it.
PLANE_URL = "https://project-management.cui-secure.us/api/v1"

# Load variables from a local .env file (if any) into the environment
# before TOKEN is read.
load_dotenv()

TOKEN = os.getenv("TOKEN")
if not TOKEN:
    # Fail fast: otherwise the missing value would be sent as the literal
    # API key "None" and only surface later as an authentication error.
    raise RuntimeError(
        "TOKEN is not set; define it in the environment or in a .env file."
    )

# NOTE(review): PROJECT_ID is not referenced by the code visible here; all
# requests target TESTING_PROJECT. Presumably the production project - confirm.
PROJECT_ID = "2a3b613e-a182-4278-8547-9bd5250bf67f"
WORKSPACE_ID = "midwatch"
TESTING_PROJECT = "a0c36f9c-65a5-4f50-beae-b9badc8cf11d"

# Sprint length in days, used when scheduling the next sprint.
SPRINT_LENGTH = 7

# Headers sent with every Plane API request.
HEADER = {
    "X-API-Key": TOKEN,
    "Content-Type": "application/json",
}
|
|
|
|
|
|
#---------------------------Testing Grounds------------------------------------
|
|
# test = requests.get(f"{PLANE_URL}/workspaces/midwatch/projects/{TESTING_PROJECT}/modules", headers=HEADER)
|
|
|
|
# if test.status_code == 200:
|
|
# print("Connected to Plane API successfully.")
|
|
# print(test.json())
|
|
|
|
# else:
|
|
# print(f"API connection failed. Status code: {test.status_code}")
|
|
# print("Response:", test.text)
|
|
# exit()
|
|
|
|
#-----------------------Functionality-----------------------------------------------
|
|
|
|
def get_last_sprint():
    """Return the most recently ended module ("sprint") of the project.

    Fetches the modules of TESTING_PROJECT and selects the one with the
    latest ``target_date`` that already lies in the past (UTC).

    Returns:
        The module dict of the last completed sprint, or None when the
        request fails, the project has no modules, or no module has a
        target date in the past.
    """
    url = (
        f"{PLANE_URL}/workspaces/{WORKSPACE_ID}"
        f"/projects/{TESTING_PROJECT}/modules"
    )
    response = requests.get(url, headers=HEADER, timeout=30)

    # Check the status before decoding: an error body need not be JSON.
    if response.status_code != 200:
        print(f"Failed to fetch sprints: {response.status_code}")
        print("Response:", response.text)
        return None

    data = response.json()
    # The endpoint may answer with a bare list or a {"results": [...]} page.
    sprints = data if isinstance(data, list) else data.get("results", [])
    if not sprints:
        print("No sprints found.")
        return None

    now = datetime.now(timezone.utc)

    completed = []
    for sprint in sprints:
        raw = sprint.get("target_date")
        if not raw:
            # A module without a target date cannot be ordered in time.
            continue
        # target_date is a plain "YYYY-MM-DD" string; it is interpreted as
        # midnight UTC, so a sprint counts as ended from the start of its
        # target day.
        end = datetime.strptime(raw, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        if end < now:
            completed.append((end, sprint))

    if not completed:
        print("No active or completed sprints found.")
        return None

    # max() keeps the first of several sprints sharing the latest date.
    _, last = max(completed, key=lambda pair: pair[0])
    print(f"Last completed sprint: {last['name']} (ended {last['target_date']})")
    return last
|
|
|
|
|
|
def get_all_issues():
    """Return every issue of the project whose state group is not "completed".

    Issues are requested with ``expand=state`` so each issue carries its
    state as a nested dict with a ``group`` field.

    Returns:
        A list of issue dicts. Issues whose state is missing, not expanded,
        or has no ``group`` are treated as group "unknown" and therefore
        included.

    Raises:
        requests.HTTPError: if any page request returns an error status.
    """
    url = (
        f"{PLANE_URL}/workspaces/{WORKSPACE_ID}"
        f"/projects/{TESTING_PROJECT}/issues"
    )
    params = {"expand": "state"}

    all_issues = []
    seen_cursors = set()
    while True:
        response = requests.get(url, headers=HEADER, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()

        # A bare list means the endpoint did not paginate.
        if isinstance(data, list):
            all_issues.extend(data)
            break
        all_issues.extend(data.get("results", []))

        # Plane list endpoints are cursor-paginated: keep following
        # next_cursor while the page reports that more results exist.
        # A response without these fields ends the loop after one page.
        cursor = data.get("next_cursor")
        if not data.get("next_page_results") or not cursor:
            break
        if cursor in seen_cursors:
            # Defensive: never loop forever on a repeated cursor.
            break
        seen_cursors.add(cursor)
        params["cursor"] = cursor

    uncompleted = []
    for issue in all_issues:
        state = issue.get("state")
        # The state is only a dict when the API honoured expand=state.
        group = state.get("group") if isinstance(state, dict) else None
        # NOTE(review): issues in the "cancelled" group also pass this
        # filter and will be carried over - confirm that is intended.
        if str(group or "unknown").lower() != "completed":
            uncompleted.append(issue)

    print(f"Retrieved {len(uncompleted)} uncompleted issues from project {TESTING_PROJECT}")
    return uncompleted
|
|
|
|
def get_unfinished_issues(sprint, all_issues):
    """Return ids of ``all_issues`` that belong to ``sprint``; mark it completed.

    Side effect: after collecting the ids, the sprint (module) is PATCHed
    to status "completed". A failure of that PATCH is reported on stdout
    but does not raise.

    Args:
        sprint: Module dict of the sprint to inspect; only ``"id"`` is read.
        all_issues: Iterable of issue dicts. The caller passes the
            uncompleted issues from get_all_issues(), which is what makes
            the result "unfinished".

    Returns:
        A list of issue ids present both in ``all_issues`` and in the sprint.

    Raises:
        ValueError: if ``sprint`` is None (no last sprint was found).
        requests.HTTPError: if fetching the sprint's issues fails.
    """
    if sprint is None:
        raise ValueError("sprint is required, got None (no last sprint found)")

    sprint_id = sprint['id']
    module_url = (
        f"{PLANE_URL}/workspaces/{WORKSPACE_ID}"
        f"/projects/{TESTING_PROJECT}/modules/{sprint_id}"
    )

    sprint_issues = []
    params = {}
    seen_cursors = set()
    while True:
        response = requests.get(
            f"{module_url}/module-issues/",
            headers=HEADER,
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        sprint_data = response.json()

        # A bare list means the endpoint did not paginate.
        if isinstance(sprint_data, list):
            sprint_issues.extend(sprint_data)
            break
        sprint_issues.extend(sprint_data.get('results', []))

        # Follow Plane's cursor pagination; a response without these
        # fields ends the loop after one page.
        cursor = sprint_data.get('next_cursor')
        if not sprint_data.get('next_page_results') or not cursor:
            break
        if cursor in seen_cursors:
            # Defensive: never loop forever on a repeated cursor.
            break
        seen_cursors.add(cursor)
        params['cursor'] = cursor

    sprint_issue_ids = {i.get('id') for i in sprint_issues if i.get('id')}
    print(f"Found {len(sprint_issue_ids)} issues linked to sprint {sprint_id}")

    unfinished_ids = [
        issue.get('id')
        for issue in all_issues
        if issue.get('id') in sprint_issue_ids
    ]
    print(f"Found {len(unfinished_ids)} unfinished issues in sprint {sprint_id}")

    # NOTE(review): a function named "get_..." that also closes the sprint is
    # surprising; kept here because existing callers rely on it.
    archive_last_sprint = requests.patch(
        f"{module_url}/",
        headers=HEADER,
        json={"status": "completed"},
        timeout=30,
    )
    print(f"Patch Last Sprint: {archive_last_sprint.status_code}")
    if not archive_last_sprint.ok:
        # Best effort: report the failure body instead of raising.
        print("Patch Last Sprint failed:", archive_last_sprint.text)

    return unfinished_ids
|
|
|
|
|
|
def create_new_sprint(last_sprint, unresolved_issue_ids):
    """Create the module ("sprint") that follows ``last_sprint``.

    The new sprint starts the day after ``last_sprint["target_date"]`` and
    spans SPRINT_LENGTH calendar days, start and target date inclusive.
    Any ``unresolved_issue_ids`` are then attached to it.

    Args:
        last_sprint: Module dict of the previous sprint; its
            ``"target_date"`` must be an ISO date string.
        unresolved_issue_ids: List of issue ids to attach; may be empty.

    Returns:
        ``{"cycle_id": <id of the new module>, "attached_issues": <ids>}``.
        The key is still called "cycle_id" for backward compatibility.

    Raises:
        ValueError: if ``last_sprint`` is missing or has no target date.
        requests.HTTPError: if creating the module or attaching issues fails.
    """
    if not last_sprint or not last_sprint.get("target_date"):
        raise ValueError(
            "last_sprint with a target_date is required to schedule the next sprint"
        )

    last_end = datetime.fromisoformat(last_sprint["target_date"])
    new_start = last_end + timedelta(days=1)
    # Both dates are inclusive, so the sprint ends SPRINT_LENGTH - 1 days
    # after it starts. Adding the full length made each sprint one day too
    # long and shifted the start weekday with every rollover.
    new_end = new_start + timedelta(days=SPRINT_LENGTH - 1)

    starting_str = new_start.strftime("%Y-%m-%d")
    ending_str = new_end.strftime("%Y-%m-%d")

    modules_url = (
        f"{PLANE_URL}/workspaces/{WORKSPACE_ID}"
        f"/projects/{TESTING_PROJECT}/modules"
    )

    print("Creating module...")
    create_payload = {
        "name": f"Auto-Sprint {starting_str}-{ending_str}",
        "description": f"This is an Automated Sprint for the dates {starting_str} to {ending_str}",
        "start_date": starting_str,
        "target_date": ending_str,
        "status": "in-progress",
    }
    create_res = requests.post(
        f"{modules_url}/",
        headers=HEADER,
        json=create_payload,
        timeout=30,
    )
    # Print the body before raising so a rejected payload can be diagnosed.
    print("Create response:", create_res.text)
    create_res.raise_for_status()

    module_id = create_res.json()["id"]
    print("Module created:", module_id)

    if unresolved_issue_ids:
        attach_payload = {"issues": unresolved_issue_ids}
        print("Attaching issues:", attach_payload)
        attach_res = requests.post(
            f"{modules_url}/{module_id}/module-issues/",
            headers=HEADER,
            json=attach_payload,
            timeout=30,
        )
        print("Attach response:", attach_res.text)
        attach_res.raise_for_status()
    else:
        print("No unresolved issues to attach.")

    return {
        "cycle_id": module_id,
        "attached_issues": unresolved_issue_ids,
    }
|
|
|
|
|
|
|
|
|
|
def main():
    """Roll the project over from the last completed sprint to a new one.

    Collects the uncompleted issues, finds the last completed sprint,
    determines which of those issues belonged to it (this also marks that
    sprint completed), then creates the next sprint with them attached and
    prints the result.
    """
    all_uncompleted = get_all_issues()

    last_sprint = get_last_sprint()
    if last_sprint is None:
        # get_last_sprint() has already printed the reason.
        print("No completed sprint to roll over from; nothing to do.")
        return

    # Call this once and reuse the result: each call re-fetches the sprint's
    # issues and re-sends the PATCH that marks the sprint completed.
    unfinished_ids = get_unfinished_issues(last_sprint, all_uncompleted)

    print(create_new_sprint(last_sprint, unfinished_ids))


if __name__ == "__main__":
    main()
|