Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions xgitguard/common/github_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,61 @@ def get_github_enterprise_commits(self, user_name, repo_name, file_path, header)
except Exception as e:
logger.error(f"Github API commit content get Error: {e}")
return {}

def check_public_branch_exists(self, user_name, repo_name, branch):
    """
    Check if a branch exists in a public GitHub repository
        Calls the "get a branch" endpoint and treats HTTP 200 as "exists".
        Any other outcome returns False; outcomes other than 404 are logged
        so that auth / rate-limit / server failures are not mistaken for a
        missing branch.
    params: user_name - string - repository owner
    params: repo_name - string - repository name
    params: branch - string - branch name to look up
    returns: True if branch exists, False otherwise
    """
    # Local import keeps this method self-contained within the file.
    from urllib.parse import quote

    logger.debug("<<<< 'Current Executing Function' >>>>")
    token_var = "GITHUB_TOKEN"
    token = os.getenv(token_var)
    if not token:
        logger.error(
            f"GitHub API Token Environment variable '{token_var}' not set."
        )
        return False
    if not branch:
        # An empty name would address the "list branches" endpoint instead
        # of a single branch, so it can never be a valid existence check.
        logger.error("Branch existence check skipped: empty branch name.")
        return False
    try:
        time.sleep(self._throttle_time)
        # The configured base URL points at the code-search endpoint;
        # strip that suffix to get the API root.
        base = self._base_url.replace("/search/code", "")
        # Escape characters such as '#', '?' or spaces that would otherwise
        # truncate or corrupt the path; '/' is kept because branch names
        # like "feature/x" are addressed with literal slashes.
        encoded_branch = quote(branch, safe="/")
        url = f"{base}/repos/{user_name}/{repo_name}/branches/{encoded_branch}"
        response = requests.get(url, auth=("token", token), timeout=10)
    except Exception as e:
        logger.error(f"Public branch existence check failed for '{user_name}/{repo_name}' branch '{branch}': {e}")
        return False

    status_code = response.status_code
    if status_code == 200:
        return True
    if status_code != 404:
        # Not a plain "not found": surface it so the caller's generic
        # "branch not found" message can be diagnosed.
        logger.warning(
            f"Public branch existence check for '{user_name}/{repo_name}' "
            f"branch '{branch}' returned unexpected HTTP status {status_code}"
        )
    return False

def check_enterprise_branch_exists(self, user_name, repo_name, branch, header):
    """
    Check if a branch exists in an enterprise GitHub repository
        Calls the "get a branch" endpoint and treats HTTP 200 as "exists".
        Any other outcome returns False; outcomes other than 404 are logged
        so that auth / rate-limit / server failures are not mistaken for a
        missing branch.
    params: user_name - string - repository owner
    params: repo_name - string - repository name
    params: branch - string - branch name to look up
    params: header - dict - extra HTTP headers sent with the request
    returns: True if branch exists, False otherwise
    """
    # Local import keeps this method self-contained within the file.
    from urllib.parse import quote

    logger.debug("<<<< 'Current Executing Function' >>>>")
    token_var = "GITHUB_ENTERPRISE_TOKEN"
    token = os.getenv(token_var)
    if not token:
        logger.error(
            f"GitHub API Token Environment variable '{token_var}' not set."
        )
        return False
    if not branch:
        # An empty name would address the "list branches" endpoint instead
        # of a single branch, so it can never be a valid existence check.
        logger.error("Branch existence check skipped: empty branch name.")
        return False
    try:
        time.sleep(self._throttle_time)
        # NOTE(review): this assumes self._base_url is the enterprise
        # ".../search/code" URL for this instance - confirm with the caller.
        base = self._base_url.replace("/search/code", "")
        # Escape characters such as '#', '?' or spaces that would otherwise
        # truncate or corrupt the path; '/' is kept because branch names
        # like "feature/x" are addressed with literal slashes.
        encoded_branch = quote(branch, safe="/")
        url = f"{base}/repos/{user_name}/{repo_name}/branches/{encoded_branch}"
        response = requests.get(
            url,
            auth=("token", token),
            headers=header,
            timeout=10,
        )
    except Exception as e:
        logger.error(f"Enterprise branch existence check failed for '{user_name}/{repo_name}' branch '{branch}': {e}")
        return False

    status_code = response.status_code
    if status_code == 200:
        return True
    if status_code != 404:
        # Not a plain "not found": surface it so the caller's generic
        # "branch not found" message can be diagnosed.
        logger.warning(
            f"Enterprise branch existence check for '{user_name}/{repo_name}' "
            f"branch '{branch}' returned unexpected HTTP status {status_code}"
        )
    return False
1 change: 1 addition & 0 deletions xgitguard/config/xgg_configs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ github:
# GitHub Public
public_api_url: "https://api.github.com/search/code"
public_commits_url: "https://api.github.com/repos/%s/%s/commits?path=%s"
public_pre_url: "https://api.github.com/repos/"

# GitHub Enterprise - For Open Source
enterprise_api_url: "https://github.<< Enterprise Name >>.com/api/v3/search/code"
Expand Down
39 changes: 35 additions & 4 deletions xgitguard/github-enterprise/enterprise_cred_detections.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ def check_existing_detections(org_url_list, url_list, search_query):
return new_org_url_list, new_urls_list, new_hashed_urls


def process_search_results(search_response_lines, search_query, ml_prediction):
def process_search_results(search_response_lines, search_query, ml_prediction, branch=""):
"""
For each search response items, process as below
Get the html urls from the search response
Expand All @@ -387,6 +387,7 @@ def process_search_results(search_response_lines, search_query, ml_prediction):
params: search_response_lines - list
params: search_query - string
params: ml_prediction - boolean
params: branch - string - optional

returns: detection_writes_per_query - int - Total detections written to file
returns: new_results_per_query - int - No of new urls per query
Expand All @@ -413,6 +414,9 @@ def process_search_results(search_response_lines, search_query, ml_prediction):
+ "/contents/"
+ line["path"]
)
# If branch is specified, add ref parameter to the contents API URL
if branch:
html_url = html_url + "?ref=" + branch
url_list.append(html_url)

if url_list:
Expand Down Expand Up @@ -540,7 +544,7 @@ def format_search_query_list(secondary_keywords):


def run_detection(
secondary_keywords=[], extensions=[], ml_prediction=False, org=[], repo=[]
secondary_keywords=[], extensions=[], ml_prediction=False, org=[], repo=[], branch=""
):
"""
Run GitHub detections
Expand Down Expand Up @@ -655,7 +659,7 @@ def run_detection(
new_results_per_query,
detections_per_query,
) = process_search_results(
search_response_lines, search_query, ml_prediction
search_response_lines, search_query, ml_prediction, branch
)
logger.info(
f"Detection writes in current search query: {detection_writes_per_query}"
Expand Down Expand Up @@ -804,6 +808,16 @@ def arg_parser():
help="Pass the Console Logging as Yes or No. Default is Yes",
)

argparser.add_argument(
"-b",
"--branch",
metavar="Branch Name",
action="store",
type=str,
default="",
help="Pass the Branch name to scan. If branch does not exist, falls back to default branch",
)

args = argparser.parse_args()

if args.secondary_keywords:
Expand Down Expand Up @@ -848,6 +862,8 @@ def arg_parser():
else:
console_logging = False

branch = args.branch.strip() if args.branch else ""

return (
secondary_keywords,
extensions,
Expand All @@ -857,6 +873,7 @@ def arg_parser():
repo,
log_level,
console_logging,
branch,
)


Expand All @@ -871,6 +888,7 @@ def arg_parser():
repo,
log_level,
console_logging,
branch,
) = arg_parser()

# Setting up Logger
Expand All @@ -896,6 +914,19 @@ def arg_parser():
)
sys.exit(1)

run_detection(secondary_keywords, extensions, ml_prediction, org, repo)
# Validate branch if specified
if branch and repo:
repo_parts = repo[0].split("/")
if len(repo_parts) == 2:
header = configs.xgg_configs["github"]["enterprise_header"]
if githubCalls.check_enterprise_branch_exists(repo_parts[0], repo_parts[1], branch, header):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please raise an error if the requested branch does not exist.

logger.info(f"Branch '{branch}' exists in repo '{repo[0]}'. Scanning branch '{branch}'.")
else:
logger.error(f"Branch '{branch}' not found in repo '{repo[0]}'. Please provide a valid branch name")
sys.exit(1)
elif branch and not repo:
logger.info(f"Branch '{branch}' specified. Will attempt to scan files on this branch.")

run_detection(secondary_keywords, extensions, ml_prediction, org, repo, branch)

logger.info("xGitGuard Credentials Detection Process Completed")
39 changes: 35 additions & 4 deletions xgitguard/github-enterprise/enterprise_key_detections.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def check_existing_detections(org_url_list, url_list, search_query):
return new_org_url_list, new_urls_list, new_hashed_urls


def process_search_results(search_response_lines, search_query, ml_prediction):
def process_search_results(search_response_lines, search_query, ml_prediction, branch=""):
"""
For each search response items, process as below
Get the html urls from the search response
Expand All @@ -366,6 +366,7 @@ def process_search_results(search_response_lines, search_query, ml_prediction):
params: search_response_lines - list
params: search_query - string
params: ml_prediction - boolean
params: branch - string - optional

returns: detection_writes_per_query - int - Total detections written to file
returns: new_results_per_query - int - No of new urls per query
Expand All @@ -392,6 +393,9 @@ def process_search_results(search_response_lines, search_query, ml_prediction):
+ "/contents/"
+ line["path"]
)
# If branch is specified, add ref parameter to the contents API URL
if branch:
html_url = html_url + "?ref=" + branch
url_list.append(html_url)

if url_list:
Expand Down Expand Up @@ -519,7 +523,7 @@ def format_search_query_list(secondary_keywords):


def run_detection(
secondary_keywords=[], extensions=[], ml_prediction=False, org=[], repo=[]
secondary_keywords=[], extensions=[], ml_prediction=False, org=[], repo=[], branch=""
):
"""
Run GitHub detections
Expand Down Expand Up @@ -634,7 +638,7 @@ def run_detection(
new_results_per_query,
detections_per_query,
) = process_search_results(
search_response_lines, search_query, ml_prediction
search_response_lines, search_query, ml_prediction, branch
)
logger.info(
f"Detection writes in current search query: {detection_writes_per_query}"
Expand Down Expand Up @@ -784,6 +788,16 @@ def arg_parser():
help="Pass the Console Logging as Yes or No. Default is Yes",
)

argparser.add_argument(
"-b",
"--branch",
metavar="Branch Name",
action="store",
type=str,
default="",
help="Pass the Branch name to scan. If branch does not exist, falls back to default branch",
)

args = argparser.parse_args()

if args.secondary_keywords:
Expand Down Expand Up @@ -828,6 +842,8 @@ def arg_parser():
else:
console_logging = False

branch = args.branch.strip() if args.branch else ""

return (
secondary_keywords,
extensions,
Expand All @@ -837,6 +853,7 @@ def arg_parser():
repo,
log_level,
console_logging,
branch,
)


Expand All @@ -851,6 +868,7 @@ def arg_parser():
repo,
log_level,
console_logging,
branch,
) = arg_parser()

# Setting up Logger
Expand All @@ -876,6 +894,19 @@ def arg_parser():
)
sys.exit(1)

run_detection(secondary_keywords, extensions, ml_prediction, org, repo)
# Validate branch if specified
if branch and repo:
repo_parts = repo[0].split("/")
if len(repo_parts) == 2:
header = configs.xgg_configs["github"]["enterprise_header"]
if githubCalls.check_enterprise_branch_exists(repo_parts[0], repo_parts[1], branch, header):
logger.info(f"Branch '{branch}' exists in repo '{repo[0]}'. Scanning branch '{branch}'.")
else:
logger.error(f"Branch '{branch}' not found in repo '{repo[0]}'. Please provide a valid branch name")
sys.exit(1)
elif branch and not repo:
logger.info(f"Branch '{branch}' specified. Will attempt to scan files on this branch.")

run_detection(secondary_keywords, extensions, ml_prediction, org, repo, branch)

logger.info("xGitGuard Enterprise Keys and Token Detection Process Completed")
Loading