255 lines
7.8 KiB
Python
255 lines
7.8 KiB
Python
#----------------------------------Imports-----------------------------------
|
|
import json
|
|
import requests
|
|
from datetime import timedelta, datetime, timezone
|
|
from dotenv import load_dotenv
|
|
import os
|
|
|
|
|
|
#---------------------------Set Up--------------------------------------------
|
|
# Slug of the Plane workspace this script operates on.
WORKSPACE_ID = "midwatch"

# Base URL of the workspace-scoped REST API; every request in this file is
# built relative to it.
PLANE_URL = f"https://project-management.cui-secure.us/api/v1/workspaces/{WORKSPACE_ID}"

# Must run before TOKEN is read so that values provided through a .env file
# are visible to os.getenv.
load_dotenv()

TOKEN = os.getenv("TOKEN")
if not TOKEN:
    # Formatting a missing token into the header would send the literal
    # string "None" as the API key and only fail later, at the first request.
    raise RuntimeError(
        "TOKEN is not set; define it in the environment or in a .env file."
    )

# Project identifiers. The functions visible in this file all target
# TESTING_PROJECT; PROJECT_ID is defined but not referenced by them.
PROJECT_ID = "2a3b613e-a182-4278-8547-9bd5250bf67f"
TESTING_PROJECT = "a0c36f9c-65a5-4f50-beae-b9badc8cf11d"

# Number of days added to a new sprint's start date to obtain its end date.
SPRINT_LENGTH = 7

# Headers sent with every API request.
HEADER = {
    "X-API-Key": TOKEN,
    "Content-Type": "application/json",
}
|
|
|
|
|
|
#---------------------------Testing Grounds------------------------------------
|
|
# test = requests.get(f"{PLANE_URL}/projects/{TESTING_PROJECT}/cycles", headers=HEADER)
|
|
|
|
# if test.status_code == 200:
|
|
# print("Connected to Plane API successfully.")
|
|
# print(test.json())
|
|
|
|
# else:
|
|
# print(f"API connection failed. Status code: {test.status_code}")
|
|
# print("Response:", test.text)
|
|
# exit()
|
|
|
|
#-----------------------Functionality-----------------------------------------------
|
|
|
|
def get_last_sprint():
    """Return the sprint (cycle) that a new sprint should follow.

    Fetches the cycles of TESTING_PROJECT and classifies each one against the
    current UTC time using its ``start_date`` and ``end_date``.

    Returns:
        The cycle dict with the latest end date among cycles that have
        already ended. If none has ended, the first cycle currently in
        progress. ``None`` when the request does not return HTTP 200, when
        the response holds no cycles, or when no cycle is ended or active.
    """

    def parse_date(raw):
        # "Z" is rewritten to an explicit offset because
        # datetime.fromisoformat only accepts the "Z" suffix from Python 3.11.
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        # A value without an offset (e.g. a date-only string) parses as a
        # naive datetime, which cannot be compared with the aware "now".
        # NOTE(review): such values are treated as UTC here; confirm.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    url = f"{PLANE_URL}/projects/{TESTING_PROJECT}/cycles/"
    response = requests.get(url, headers=HEADER)

    # Check the status before decoding: an error response is not guaranteed
    # to carry a JSON body, so report its raw text instead.
    if response.status_code != 200:
        print(f"Failed to fetch sprints: {response.status_code}")
        print("Response:", response.text)
        return None

    data = response.json()
    # The payload may be a bare list or an object wrapping it in "results".
    sprints = data if isinstance(data, list) else data.get("results", [])
    if not sprints:
        print("No sprints found.")
        return None

    now = datetime.now(timezone.utc)
    completed = []  # (end datetime, cycle) pairs for cycles already over
    active = []     # cycles whose date range contains "now"

    for sprint in sprints:
        raw_start = sprint.get("start_date")
        raw_end = sprint.get("end_date")
        # Cycles without both dates cannot be placed on the timeline.
        if not raw_start or not raw_end:
            continue

        start = parse_date(raw_start)
        end = parse_date(raw_end)

        if end < now:
            completed.append((end, sprint))
        elif start <= now:
            active.append(sprint)

    if completed:
        # Compare parsed datetimes rather than raw strings so that values in
        # different textual formats still order correctly.
        last = max(completed, key=lambda pair: pair[0])[1]
        print(f"Last completed sprint: {last['name']} (ended {last['end_date']})")
        return last

    if active:
        current = active[0]
        print(f"Current sprint in progress: {current['name']} (ends {current['end_date']})")
        return current

    print("No active or completed sprints found.")
    return None
|
|
|
|
|
|
def get_all_issues():
    """Return the issues of TESTING_PROJECT that are not in a completed state.

    Requests the project's issues with ``expand=state`` and keeps every issue
    whose state group is anything other than ``completed`` (compared
    case-insensitively). Issues whose state or group is missing are kept.

    Returns:
        A list of issue dicts as returned by the API.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    response = requests.get(
        f"{PLANE_URL}/projects/{TESTING_PROJECT}/issues/",
        headers=HEADER,
        params={"expand": "state"},
    )
    response.raise_for_status()
    data = response.json()

    # Accept both a bare list and an object wrapping the list in "results",
    # the same way get_last_sprint does.
    # NOTE(review): only this single response is read; if the endpoint
    # paginates, later pages are not fetched — confirm against the API docs.
    all_issues = data if isinstance(data, list) else data.get("results", [])

    def state_group(issue):
        # "state" is only a dict when the server expanded it; a null or
        # unexpanded value, or a null group, counts as "unknown".
        state = issue.get("state")
        group = state.get("group") if isinstance(state, dict) else None
        return (group or "unknown").lower()

    uncompleted = [
        issue for issue in all_issues if state_group(issue) != "completed"
    ]

    print(f"Retrieved {len(uncompleted)} uncompleted issues from project {TESTING_PROJECT}")
    return uncompleted
|
|
|
|
|
|
def get_unfinished_issues(sprint, all_issues):
    """Return the ids of the given issues that are linked to ``sprint``.

    Args:
        sprint: Cycle dict containing at least an ``id`` key, or ``None``.
        all_issues: Iterable of issue dicts to filter. The caller in this
            file passes the result of get_all_issues(), i.e. issues that are
            not completed.

    Returns:
        A list with the ``id`` of every issue in ``all_issues`` that also
        appears in the sprint's cycle-issues listing. Empty when ``sprint``
        is ``None``.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    # get_last_sprint() returns None when it finds nothing; avoid failing on
    # the subscript below in that case.
    if not sprint:
        print("No sprint supplied; no unfinished issues to collect.")
        return []

    sprint_id = sprint['id']

    response = requests.get(
        f"{PLANE_URL}/projects/{TESTING_PROJECT}/cycles/{sprint_id}/cycle-issues/",
        headers=HEADER,
    )
    response.raise_for_status()

    sprint_data = response.json()
    # The payload may be a bare list or an object wrapping it in "results".
    if isinstance(sprint_data, list):
        sprint_issues = sprint_data
    else:
        sprint_issues = sprint_data.get('results', [])

    # NOTE(review): assumes each entry's "id" is the issue id itself (not the
    # id of a cycle-issue link record) — confirm against the API response.
    sprint_issue_ids = {i.get('id') for i in sprint_issues if i.get('id')}

    print(f"Found {len(sprint_issue_ids)} issues linked to sprint {sprint_id}")

    unfinished_ids = [
        issue.get('id')
        for issue in all_issues
        if issue.get('id') in sprint_issue_ids
    ]

    print(f"Found {len(unfinished_ids)} unfinished issues in sprint {sprint_id}")
    return unfinished_ids
|
|
|
|
|
|
def create_cycle_minimal(name="Auto-Sprint"):
    """Create a cycle in TESTING_PROJECT that carries nothing but a name.

    Args:
        name: Name given to the new cycle.

    Returns:
        The decoded JSON body of the creation response.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    endpoint = f"{PLANE_URL}/projects/{TESTING_PROJECT}/cycles/"
    body = {"name": name}

    print("Creating new cycle:", body)
    response = requests.post(endpoint, json=body, headers=HEADER)
    # The raw body is printed before the status check so that error details
    # are visible even when the call below raises.
    print("Create response:", response.text)
    response.raise_for_status()

    return response.json()
|
|
|
|
|
|
def update_cycle_details(cycle_id, start_date, end_date, description):
    """Set the dates, description and name of an existing cycle.

    The cycle is renamed to ``Auto-Sprint <start_date>``.

    Args:
        cycle_id: Id of the cycle to update.
        start_date: Start date value sent to the API as-is.
        end_date: End date value sent to the API as-is.
        description: Description text for the cycle.

    Returns:
        The decoded JSON body of the PATCH response.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    # The trailing slash matches the other write endpoints in this file.
    # NOTE(review): without it, a server that redirects to the
    # slash-terminated URL can cause the PATCH body to be dropped while the
    # redirect is followed — presumed reason patches did not apply; confirm.
    url = f"{PLANE_URL}/projects/{TESTING_PROJECT}/cycles/{cycle_id}/"
    payload = {
        "start_date": start_date,
        "end_date": end_date,
        "description": description,
        "name": f"Auto-Sprint {start_date}",
    }

    print(f"Updating cycle {cycle_id}:", payload)
    res = requests.patch(url, headers=HEADER, json=payload)
    # Printed before the status check so error details are always visible.
    print("Patch response:", res.text)
    res.raise_for_status()
    return res.json()
|
|
|
|
|
|
def attach_issues_to_cycle(cycle_id, issue_ids):
    """Link the given issues to a cycle of TESTING_PROJECT.

    Args:
        cycle_id: Id of the cycle that receives the issues.
        issue_ids: Issue ids, sent to the API under the ``issues`` key.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    endpoint = (
        f"{PLANE_URL}/projects/{TESTING_PROJECT}"
        f"/cycles/{cycle_id}/cycle-issues/"
    )
    body = {"issues": issue_ids}

    print(f"Attaching issues to cycle {cycle_id}:", body)
    response = requests.post(endpoint, json=body, headers=HEADER)
    # The raw body is printed before the status check so that error details
    # are visible even when the call below raises.
    print("Attach response:", response.text)
    response.raise_for_status()

    return response.json()
|
|
|
|
|
|
def create_new_sprint(last_sprint, unresolved_issue_ids):
    """Create the sprint that follows ``last_sprint`` and carry issues over.

    Works in three requests: create a cycle with only a name, patch in its
    dates and description, then attach the unresolved issues (skipped when
    the list is empty). The new sprint starts one day after
    ``last_sprint`` ends and ends SPRINT_LENGTH days after it starts.

    Args:
        last_sprint: Cycle dict with an ``end_date`` ISO string.
        unresolved_issue_ids: Ids of the issues to attach to the new cycle.

    Returns:
        A dict with ``cycle_id`` (id of the new cycle), ``updated`` (decoded
        PATCH response) and ``attached_issues`` (the ids passed in).

    Raises:
        ValueError: if ``last_sprint`` is missing or has no ``end_date``.
        requests.HTTPError: if any API call answers with an error status.
    """
    if not last_sprint or not last_sprint.get("end_date"):
        raise ValueError(
            "create_new_sprint needs a previous sprint with an end_date."
        )

    # "Z" is rewritten to an explicit offset because
    # datetime.fromisoformat only accepts the "Z" suffix from Python 3.11.
    last_end = datetime.fromisoformat(last_sprint["end_date"].replace("Z", "+00:00"))
    new_start = last_end + timedelta(days=1)
    # NOTE(review): if the end date is inclusive, the sprint spans
    # SPRINT_LENGTH + 1 calendar days — confirm the intended length.
    new_end = new_start + timedelta(days=SPRINT_LENGTH)

    start_str = new_start.strftime("%Y-%m-%dT00:00:00Z")
    # Date-only strings, the same shape as the hard-coded values this
    # replaces. NOTE(review): confirm the date format the API expects.
    start_date = new_start.strftime("%Y-%m-%d")
    end_date = new_end.strftime("%Y-%m-%d")

    print("Creating cycle with name only...")

    # Name by date only, matching update_cycle_details; formatting the
    # datetime object itself would embed the time and UTC offset as well.
    create_payload = {
        "name": f"Auto-Sprint {start_date}"
    }

    create_res = requests.post(
        f"{PLANE_URL}/projects/{TESTING_PROJECT}/cycles/",
        headers=HEADER,
        json=create_payload,
    )

    print("Create response:", create_res.text)
    create_res.raise_for_status()

    cycle = create_res.json()
    cycle_id = cycle["id"]
    print("Cycle created:", cycle_id)

    # Send the computed dates (previously fixed debugging values were sent,
    # so every new sprint received the same two dates).
    patch_payload = {
        "start_date": start_date,
        "end_date": end_date,
        "description": f"Automated sprint starting {start_str}"
    }

    print("➡ Patching cycle with:", patch_payload)
    # The trailing slash matches the other write endpoints in this file.
    # NOTE(review): without it, a redirect to the slash-terminated URL can
    # cause the PATCH body to be dropped — confirm against the server.
    patch_res = requests.patch(
        f"{PLANE_URL}/projects/{TESTING_PROJECT}/cycles/{cycle_id}/",
        headers=HEADER,
        json=patch_payload,
    )
    print("Patch response:", patch_res.text)
    patch_res.raise_for_status()
    patched_cycle = patch_res.json()

    if unresolved_issue_ids:
        attach_payload = {"issues": unresolved_issue_ids}
        print("Attaching issues:", attach_payload)
        attach_res = requests.post(
            f"{PLANE_URL}/projects/{TESTING_PROJECT}/cycles/{cycle_id}/cycle-issues/",
            headers=HEADER,
            json=attach_payload,
        )
        print("Attach response:", attach_res.text)
        attach_res.raise_for_status()
    else:
        print("No unresolved issues to attach.")

    return {
        "cycle_id": cycle_id,
        "updated": patched_cycle,
        "attached_issues": unresolved_issue_ids
    }
|
|
|
|
|
|
|
|
|
|
# Script entry point: roll the unfinished issues of the previous sprint over
# into a newly created one. Guarded so that importing this module does not
# create a sprint as a side effect.
if __name__ == "__main__":
    all_uncompleted = get_all_issues()

    last_sprint = get_last_sprint()
    if last_sprint is None:
        # get_last_sprint() has already printed the reason; without a
        # previous sprint there is no end date to schedule the next one from.
        raise SystemExit("No previous sprint available; no new sprint created.")

    # Looked up once and reused: each call issues an HTTP request.
    unfinished_ids = get_unfinished_issues(last_sprint, all_uncompleted)

    print(create_new_sprint(last_sprint, unfinished_ids))
|
|
|
|
# patch_try = requests.patch(f"{PLANE_URL}/projects/{TESTING_PROJECT}/cycles/52261070-73ae-4ac6-aeed-805dfa5fb3d0",
|
|
# headers=HEADER,
|
|
# json= {
|
|
# "name": "Auto-Sprint 2025-11-15",
|
|
# "start_date": "2015-11-17",
|
|
# "end_date": "2025-11-19"
|
|
# })
|
|
|
|
# print(f"Status: {patch_try.status_code}")
|
|
# print(f"Response: {patch_try.text}") |