-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy path__init__.py
More file actions
122 lines (106 loc) · 3.89 KB
/
__init__.py
File metadata and controls
122 lines (106 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import logging
import requests
import base64
from socketdev.core.classes import Response
from socketdev.dependencies import Dependencies
from socketdev.exceptions import APIKeyMissing, APIFailure, APIAccessDenied, APIInsufficientQuota, APIResourceNotFound
from socketdev.export import Export
from socketdev.fullscans import FullScans
from socketdev.npm import NPM
from socketdev.openapi import OpenAPI
from socketdev.org import Orgs
from socketdev.purl import Purl
from socketdev.quota import Quota
from socketdev.report import Report
from socketdev.repos import Repos
from socketdev.repositories import Repositories
from socketdev.sbom import Sbom
from socketdev.settings import Settings
__author__ = "socket.dev"
__version__ = "1.0.15"
__all__ = ["socketdev"]

# Base64-encoded API credential, written by encode_key() and read by do_request().
# It starts as an empty string (rather than a bare annotation, which binds nothing)
# so that do_request()'s missing-key check raises APIKeyMissing instead of NameError
# when no key has been configured yet.
encoded_key: str = ""

# Base URL that do_request() prefixes to every path.
api_url = "https://api.socket.dev/v0"

# Timeout passed to requests.request(); overridden through socketdev.set_timeout().
request_timeout = 30

# Library logger: a NullHandler is attached so the package emits nothing unless
# the application configures logging itself.
log = logging.getLogger("socketdev")
log.addHandler(logging.NullHandler())
def encode_key(token: str):
    """
    Base64-encode the token and store it in the module-level ``encoded_key``.
    :param token: String to encode (encoded to bytes with the default codec first)
    """
    global encoded_key
    raw_bytes = token.encode()
    b64_bytes = base64.b64encode(raw_bytes)
    # Base64 output is pure ASCII, so decoding with the ascii codec is lossless.
    encoded_key = str(b64_bytes, "ascii")
def do_request(
    path: str,
    headers: dict = None,
    payload: "dict | str | None" = None,
    files: list = None,
    method: str = "GET",
) -> Response:
    """
    Shared function for performing the requests against the API.

    :param path: String path of the URL, appended to ``api_url``
    :param headers: Optional dictionary of the headers to include in the request. Defaults to None,
        in which case Basic-auth, User-Agent and accept headers are generated
    :param payload: Optional dictionary or string of the payload to POST. Defaults to None
    :param files: Optional list of files to send. Defaults to None
    :param method: Optional string of the method for the Request. Defaults to GET
    :return: The response object returned by ``requests.request`` when the status code is below 400
    :raises APIKeyMissing: If no API key has been set via ``encode_key``
    :raises APIFailure: For any transport error or HTTP status >= 400. The wrapped ``Response``
        carries the error text, and the specific cause (``APIAccessDenied``,
        ``APIInsufficientQuota``, ``APIResourceNotFound``, or the underlying exception)
        is available as ``__cause__``
    """
    # Look the key up defensively: the module-level name may not be bound yet if
    # encode_key() was never called, and that should surface as APIKeyMissing.
    api_key = globals().get("encoded_key")
    if not api_key:
        raise APIKeyMissing
    if headers is None:
        headers = {
            "Authorization": f"Basic {api_key}",
            "User-Agent": f"SocketPythonScript/{__version__}",
            "accept": "application/json",
        }
    url = f"{api_url}/{path}"
    try:
        response = requests.request(
            method.upper(), url, headers=headers, data=payload, files=files, timeout=request_timeout
        )
        status = response.status_code
        # The specific statuses must be tested BEFORE the generic ">= 400" check;
        # otherwise they are unreachable and every error reports "Bad Request".
        if status == 401:
            raise APIAccessDenied("Unauthorized")
        if status == 403:
            raise APIInsufficientQuota("Insufficient max_quota for API method")
        if status == 404:
            raise APIResourceNotFound(f"Path not found {path}")
        if status == 429:
            raise APIInsufficientQuota("Insufficient quota for API route")
        if status >= 400:
            raise APIFailure("Bad Request")
    except Exception as error:
        # Callers have always received APIFailure from this function, so keep that
        # contract and expose the precise cause through explicit exception chaining.
        response = Response(text=f"{error}", error=True, status_code=500)
        raise APIFailure(response) from error
    return response
class socketdev:
    """
    Entry point of the SDK: stores the API token and exposes one client object per API area.

    Constructing an instance writes the encoded token and the timeout into module-level
    globals (via ``encode_key`` and ``set_timeout``), so those settings are shared by
    every instance rather than kept per instance.
    """

    token: str
    timeout: int
    dependencies: Dependencies
    npm: NPM
    openapi: OpenAPI
    org: Orgs
    quota: Quota
    report: Report
    sbom: Sbom
    purl: Purl
    fullscans: FullScans
    export: Export
    repositories: Repositories
    settings: Settings
    repos: Repos

    def __init__(self, token: str, timeout: int = 30):
        """
        :param token: API token; a trailing ":" is appended before it is encoded
        :param timeout: Request timeout applied module-wide. Defaults to 30
        """
        # The token is used as the user name of a "user:password" pair with an
        # empty password, hence the trailing colon.
        self.token = token + ":"
        encode_key(self.token)
        self.timeout = timeout
        socketdev.set_timeout(self.timeout)

        # Instantiate the endpoint clients in a fixed order, one attribute each.
        endpoint_clients = (
            ("dependencies", Dependencies),
            ("npm", NPM),
            ("openapi", OpenAPI),
            ("org", Orgs),
            ("quota", Quota),
            ("report", Report),
            ("sbom", Sbom),
            ("purl", Purl),
            ("fullscans", FullScans),
            ("export", Export),
            ("repositories", Repositories),
            ("repos", Repos),
            ("settings", Settings),
        )
        for attribute, client_class in endpoint_clients:
            setattr(self, attribute, client_class())

    @staticmethod
    def set_timeout(timeout: int):
        """
        Replace the module-level ``request_timeout``.
        :param timeout: New timeout value
        """
        global request_timeout
        request_timeout = timeout