From de403deed73340f8068739dc240ebebfa1053872 Mon Sep 17 00:00:00 2001 From: jwijenbergh Date: Mon, 20 Mar 2023 13:02:50 +0100 Subject: Wrappers Python: Implement V2 initial API --- wrappers/python/eduvpn_common/discovery.py | 213 ------------ wrappers/python/eduvpn_common/error.py | 14 - wrappers/python/eduvpn_common/event.py | 192 ----------- wrappers/python/eduvpn_common/loader.py | 102 +----- wrappers/python/eduvpn_common/main.py | 532 +++++------------------------ wrappers/python/eduvpn_common/server.py | 441 ------------------------ wrappers/python/eduvpn_common/state.py | 32 -- wrappers/python/eduvpn_common/types.py | 242 +------------ 8 files changed, 121 insertions(+), 1647 deletions(-) delete mode 100644 wrappers/python/eduvpn_common/discovery.py delete mode 100644 wrappers/python/eduvpn_common/error.py delete mode 100644 wrappers/python/eduvpn_common/event.py delete mode 100644 wrappers/python/eduvpn_common/server.py delete mode 100644 wrappers/python/eduvpn_common/state.py diff --git a/wrappers/python/eduvpn_common/discovery.py b/wrappers/python/eduvpn_common/discovery.py deleted file mode 100644 index 1d59c00..0000000 --- a/wrappers/python/eduvpn_common/discovery.py +++ /dev/null @@ -1,213 +0,0 @@ -from ctypes import CDLL, POINTER, c_void_p, cast -from typing import List, Optional - -from eduvpn_common.types import ( - cDiscoveryOrganizations, - cDiscoveryServers, - get_ptr_list_strings, -) - - -class DiscoOrganization: - """The class that represents an organization from discovery - - :param: display_name: str: The display name of the organizations - :param: org_id: str: The organization ID - :param: secure_internet_home: str: Indicating which server is the secure internet home server - :param: keyword_list: The list of strings that the users gets to search on to find the server - """ - - def __init__( - self, - display_name: str, - org_id: str, - secure_internet_home: str, - keyword_list: List[str], - ): - self.display_name = display_name - self.org_id = org_id - self.secure_internet_home = secure_internet_home - self.keyword_list = keyword_list - - def __str__(self): - return self.display_name - - -class DiscoOrganizations: - """The class that represents the list of disco organizations from discovery. - Additionally it has provided a version which indicates which exact 'version' was used from discovery - - :param: version: int: The version of the list as returned by Discovery - :param: organizations: List[DiscoOrganization]: The actual list of discovery organizations - """ - - def __init__(self, version: int, organizations: List[DiscoOrganization]): - self.version = version - self.organizations = organizations - - -class DiscoServer: - """The class that represents a discovery server, this can be an institute access or secure internet server - - :param: authentication_url_template: str: The OAuth template to use to skip WAYF - :param: base_url: str: The base URL of the server - :param: country_code: str: The country code of the server - :param: display_name: str: The display name of the server - :param: keyword_list: List[str]: The list of keywords that the user can use to find the server - :param: public_keys: List[str]: The list of public keys - :param: server_type: str: The server type as a string - :param: support_contacts: List[str]: The list of support contacts - """ - - def __init__( - self, - authentication_url_template: str, - base_url: str, - country_code: str, - display_name: str, - keyword_list: List[str], - public_keys: List[str], - server_type: str, - support_contacts: List[str], - ): - self.authentication_url_template = authentication_url_template - self.base_url = base_url - self.country_code = country_code - self.display_name = display_name - self.keyword_list = keyword_list - self.public_keys = public_keys - self.server_type = server_type - self.support_contacts = support_contacts - - def __str__(self): - return self.display_name - - -class DiscoServers: - """This class represents the list of discovery servers. - The version indicates which exact 'version' from Discovery was used. - - :param: version: int: The version of the list as returned by Discovery - :param: servers: List[DiscoServers]: The list of discovery servers - """ - - def __init__(self, version: int, servers: List[DiscoServer]): - self.version = version - self.servers = servers - - -def get_disco_organization(ptr) -> Optional[DiscoOrganization]: - """Gets a discovery organization from the Go library in a C structure and returns a Python usable structure - - :param ptr: The pointer returned by the go library that contains a discovery organization - - :meta private: - - :return: The Discovery Organization if there is one - :rtype: Optional[DiscoOrganization] - """ - if not ptr: - return None - - current_organization = ptr.contents - display_name = current_organization.display_name.decode("utf-8") - org_id = current_organization.org_id.decode("utf-8") - secure_internet_home = current_organization.secure_internet_home.decode("utf-8") - keyword_list = current_organization.keyword_list.decode("utf-8") - return DiscoOrganization(display_name, org_id, secure_internet_home, keyword_list) - - -def get_disco_server(lib: CDLL, ptr) -> Optional[DiscoServer]: - """Gets a discovery server from the Go library in a C structure and returns a Python usable structure - - :param lib: CDLL: The Go shared library - :param ptr: The pointer to a discovery server returned by the Go library - - :meta private: - - :return: The Discovery Server if there is one - :rtype: Optional[DiscoServer] - """ - if not ptr: - return None - - current_server = ptr.contents - authentication_url_template = current_server.authentication_url_template.decode( - "utf-8" - ) - base_url = current_server.base_url.decode("utf-8") - country_code = current_server.country_code.decode("utf-8") - display_name = current_server.display_name.decode("utf-8") - keyword_list = current_server.keyword_list.decode("utf-8") - public_keys = get_ptr_list_strings( - lib, current_server.public_key_list, current_server.total_public_keys - ) - server_type = current_server.server_type.decode("utf-8") - support_contacts = get_ptr_list_strings( - lib, current_server.support_contact, current_server.total_support_contact - ) - return DiscoServer( - authentication_url_template, - base_url, - country_code, - display_name, - keyword_list, - public_keys, - server_type, - support_contacts, - ) - - -def get_disco_servers(lib: CDLL, ptr: c_void_p) -> Optional[DiscoServers]: - """Gets servers from the Go library in a C structure and returns a Python usable structure - - :param lib: CDLL: The Go shared library - :param ptr: c_void_p: The pointer returned by the Go library for the discovery servers - - :meta private: - - :return: The Discovery Servers if there are any - :rtype: Optional[DiscoServers] - """ - if ptr: - svrs = cast(ptr, POINTER(cDiscoveryServers)).contents - - servers = [] - - if svrs.servers: - for i in range(svrs.total_servers): - current = get_disco_server(lib, svrs.servers[i]) - - if current is None: - continue - servers.append(current) - disco_version = svrs.version - lib.FreeDiscoServers(ptr) - return DiscoServers(disco_version, servers) - return None - - -def get_disco_organizations(lib: CDLL, ptr: c_void_p) -> Optional[DiscoOrganizations]: - """Gets organizations from the Go library in a C structure and returns a Python usable structure - - :param lib: CDLL: The Go shared library - :param ptr: c_void_p: The pointer returned by the Go library for the discovery organizations - - :meta private: - - :return: The Discovery Organizations if there are any - :rtype: Optional[DiscoOrganizations] - """ - if ptr: - orgs = cast(ptr, POINTER(cDiscoveryOrganizations)).contents - organizations = [] - if orgs.organizations: - for i in range(orgs.total_organizations): - current = get_disco_organization(orgs.organizations[i]) - if current is None: - continue - organizations.append(current) - disco_version = orgs.version - lib.FreeDiscoOrganizations(ptr) - return DiscoOrganizations(disco_version, organizations) - return None diff --git a/wrappers/python/eduvpn_common/error.py b/wrappers/python/eduvpn_common/error.py deleted file mode 100644 index 3d84331..0000000 --- a/wrappers/python/eduvpn_common/error.py +++ /dev/null @@ -1,14 +0,0 @@ -from enum import Enum - - -class WrappedError(Exception): - """An exception returned by the Go library - - :param: traceback: str: The traceback of the error including newlines - :param: cause: str: The cause of the error as a message - """ - - def __init__(self, traceback: str, cause: str): - super(WrappedError, self).__init__(cause) - self.traceback = traceback - self.cause = cause diff --git a/wrappers/python/eduvpn_common/event.py b/wrappers/python/eduvpn_common/event.py deleted file mode 100644 index 746a0c9..0000000 --- a/wrappers/python/eduvpn_common/event.py +++ /dev/null @@ -1,192 +0,0 @@ -from ctypes import CDLL -from typing import Any, Callable, Dict, List, Tuple - -from eduvpn_common.server import ( - get_locations, - get_servers, - get_transition_profiles, - get_transition_server, -) -from eduvpn_common.state import State, StateType -from eduvpn_common.types import get_ptr_string - -# The attribute that callback functions get -EDUVPN_CALLBACK_PROPERTY = "_eduvpn_property_callback" - -# A state transition decorator for classes -# To use this, make sure to register the class with `register_class_callbacks` -def class_state_transition(state: int, state_type: StateType) -> Callable: - """A decorator to be internally by classes to register the event - - :param state: int: The state of the transition - :param state_type: StateType: The type of transition - - :meta private: - """ - - def wrapper(func): - """ - - :param func: The function to set the internal attribute for - - """ - setattr(func, EDUVPN_CALLBACK_PROPERTY, (state, state_type)) - return func - - return wrapper - - -def convert_data(lib: CDLL, state: int, data: Any) -> None: - """The function that converts the C structure from the Go library to a Python structure - - :param lib: CDLL: The Go shared library - :param state: int: The state to convert the data for - :param data: Any: The data itself that has to be converted - - :meta private: - """ - if not data: - return None - if state is State.NO_SERVER: - return get_servers(lib, data) - if state is State.OAUTH_STARTED: - return get_ptr_string(lib, data) - if state is State.ASK_LOCATION: - return get_locations(lib, data) - if state is State.ASK_PROFILE: - return get_transition_profiles(lib, data) - if state in [ - State.DISCONNECTED, - State.DISCONNECTING, - State.CONNECTING, - State.CONNECTED, - ]: - return get_transition_server(lib, data) - - -class EventHandler(object): - """The class that neatly handles event callbacks from the internal Go FSM""" - - def __init__(self, lib: CDLL): - self.handlers: Dict[Tuple[int, StateType], List[Callable]] = {} - self.lib = lib - - def change_class_callbacks(self, cls: Any, add: bool = True) -> None: - """The function that is used to change class callbacks - - :param cls: Any: The class to change the callbacks for - :param add: bool: (Default value = True): Whether or not to add or remove the event. If true the event gets added - - :meta private: - """ - # Loop over method names - for method_name in dir(cls): - try: - # Get the method - method = getattr(cls, method_name) - except: - # Unable to get a value, go to the next - continue - - # If it has a callback defined, add it to the events - method_value = getattr(method, EDUVPN_CALLBACK_PROPERTY, None) - if method_value: - state, state_type = method_value - - if add: - self.add_event(state, state_type, method) - else: - self.remove_event(state, state_type, method) - - def remove_event(self, state: int, state_type: StateType, func: Callable) -> None: - """Removes an event - - :param state: int: The state to remove the event for - :param state_type: StateType: The state type to remove the event for - :param func: Callable: The function that needs to be removed from the event - - :meta private: - """ - for key, values in self.handlers.copy().items(): - if key == (state, state_type): - values.remove(func) - if not values: - del self.handlers[key] - else: - self.handlers[key] = values - - def add_event(self, state: int, state_type: StateType, func: Callable) -> None: - """Adds an event - - :param state: int: The state to add the event for - :param state_type: StateType: The state type to add the event for - :param func: Callable: The function that needs to be added to the event - - :meta private: - """ - if (state, state_type) not in self.handlers: - self.handlers[(state, state_type)] = [] - self.handlers[(state, state_type)].append(func) - - def on(self, state: int, state_type: StateType) -> Callable: - """The decorator for standalone functions - - :param state: int: The state of the event - :param state_type: StateType: The state type of the event - - :meta private: - """ - - def wrapped_f(func): - """ - - :param func: The function to add the event for - - """ - self.add_event(state, state_type, func) - return func - - return wrapped_f - - def run_state( - self, state: int, other_state: int, state_type: StateType, data: str - ) -> bool: - """The function that runs the callback for a specific event - - :param state: int: The state of the event - :param other_state: int: The other state of the event - :param state_type: StateType: The state type of the event - :param data: str: The data that gets passed to the function callback when the event is ran - - :meta private: - """ - if (state, state_type) not in self.handlers: - return False - for func in self.handlers[(state, state_type)]: - func(other_state, data) - return True - - def run( - self, old_state: int, new_state: int, data: Any, convert: bool = True - ) -> bool: - """Run a specific event. - It converts the data and then runs the event for all state types - - :param old_state: int: The previous state for running the event - :param new_state: int: The new state for running the event - :param data: Any: The data that gets passed to the event - :param convert: bool: (Default value = True): Whether or not to convert the data further - - """ - # First run leave transitions, then enter - # The state is done when the wait event finishes - converted = data - if convert: - converted = convert_data(self.lib, new_state, data) - self.run_state(old_state, new_state, StateType.LEAVE, converted) - # We decide handled based on enter transitions - handled = self.run_state(new_state, old_state, StateType.ENTER, converted) - # Only run wait transitions if the enter transition is handled - if handled: - self.run_state(new_state, old_state, StateType.WAIT, converted) - return handled diff --git a/wrappers/python/eduvpn_common/loader.py b/wrappers/python/eduvpn_common/loader.py index 673d180..c5709fb 100644 --- a/wrappers/python/eduvpn_common/loader.py +++ b/wrappers/python/eduvpn_common/loader.py @@ -4,13 +4,7 @@ from collections import defaultdict from ctypes import CDLL, c_char_p, c_int, c_void_p, cdll from eduvpn_common import __version__ -from eduvpn_common.types import ( - cToken, - DataError, - ReadRxBytes, - UpdateToken, - VPNStateChange, -) +from eduvpn_common.types import DataError, ReadRxBytes, VPNStateChange def load_lib() -> CDLL: @@ -63,111 +57,51 @@ def initialize_functions(lib: CDLL) -> None: # Exposed functions # We have to use c_void_p instead of c_char_p to free it properly # See https://stackoverflow.com/questions/13445568/python-ctypes-how-to-free-memory-getting-invalid-pointer-error - lib.CancelOAuth.argtypes, lib.CancelOAuth.restype = [c_char_p], c_void_p - lib.ChangeSecureLocation.argtypes, lib.ChangeSecureLocation.restype = [ - c_char_p - ], c_void_p - lib.Deregister.argtypes, lib.Deregister.restype = [c_char_p], None - lib.FreeConfig.argtypes, lib.FreeConfig.restype = [c_void_p], None - lib.FreeTokens.argtypes, lib.FreeTokens.restype = [c_void_p], None - lib.FreeDiscoOrganizations.argtypes, lib.FreeDiscoOrganizations.restype = [ - c_void_p - ], None - lib.FreeDiscoServers.argtypes, lib.FreeDiscoServers.restype = [c_void_p], None - lib.FreeError.argtypes, lib.FreeError.restype = [c_void_p], None - lib.FreeProfiles.argtypes, lib.FreeProfiles.restype = [c_void_p], None - lib.FreeSecureLocations.argtypes, lib.FreeSecureLocations.restype = [c_void_p], None - lib.FreeServer.argtypes, lib.FreeServer.restype = [c_void_p], None - lib.FreeServers.argtypes, lib.FreeServers.restype = [c_void_p], None + lib.CancelOAuth.argtypes, lib.CancelOAuth.restype = [], c_void_p + lib.Deregister.argtypes, lib.Deregister.restype = [], None + lib.ExpiryTimes.argtypes, lib.ExpiryTimes.restype = [], DataError lib.FreeString.argtypes, lib.FreeString.restype = [c_void_p], None - lib.GetConfigCustomServer.argtypes, lib.GetConfigCustomServer.restype = [ + lib.DiscoOrganizations.argtypes, lib.DiscoOrganizations.restype = [], DataError + lib.DiscoServers.argtypes, lib.DiscoServers.restype = [], DataError + lib.GetConfig.argtypes, lib.GetConfig.restype = [ c_char_p, c_char_p, c_int, - cToken, + c_char_p, ], DataError - lib.GetConfigInstituteAccess.argtypes, lib.GetConfigInstituteAccess.restype = [ + lib.AddServer.argtypes, lib.AddServer.restype = [ c_char_p, c_char_p, - c_int, - cToken, - ], DataError - lib.GetConfigSecureInternet.argtypes, lib.GetConfigSecureInternet.restype = [ + ], c_char_p + lib.CurrentServer.argtypes, lib.CurrentServer.restype = [], DataError + lib.RemoveServer.argtypes, lib.RemoveServer.restype = [ c_char_p, c_char_p, - c_int, - cToken, - ], DataError - lib.GetDiscoOrganizations.argtypes, lib.GetDiscoOrganizations.restype = [ - c_char_p - ], DataError - lib.GetDiscoServers.argtypes, lib.GetDiscoServers.restype = [c_char_p], DataError - lib.GetCurrentServer.argtypes, lib.GetCurrentServer.restype = [c_char_p], DataError - lib.GetSavedServers.argtypes, lib.GetSavedServers.restype = [c_char_p], DataError - lib.GoBack.argtypes, lib.GoBack.restype = [c_char_p], None - lib.InFSMState.argtypes, lib.InFSMState.restype = [c_void_p, c_int], int + ], c_char_p + lib.ServerList.argtypes, lib.ServerList.restype = [], DataError lib.Register.argtypes, lib.Register.restype = [ c_char_p, c_char_p, c_char_p, - c_char_p, VPNStateChange, c_int, ], c_void_p - lib.SetTokenUpdater.argtypes, lib.SetTokenUpdater.restype = [ - c_char_p, - UpdateToken, - ], c_void_p - lib.RemoveCustomServer.argtypes, lib.RemoveCustomServer.restype = [ - c_char_p, - c_char_p, - ], c_void_p - lib.AddInstituteAccess.argtypes, lib.AddInstituteAccess.restype = [ - c_char_p, - c_char_p, - ], c_void_p - ( - lib.AddSecureInternetHomeServer.argtypes, - lib.AddSecureInternetHomeServer.restype, - ) = [ - c_char_p, - c_char_p, - ], c_void_p - lib.AddCustomServer.argtypes, lib.AddCustomServer.restype = [ - c_char_p, - c_char_p, - ], c_void_p - lib.RemoveInstituteAccess.argtypes, lib.RemoveInstituteAccess.restype = [ - c_char_p, - c_char_p, - ], c_void_p - lib.RemoveSecureInternet.argtypes, lib.RemoveSecureInternet.restype = [ - c_char_p - ], c_void_p - lib.RenewSession.argtypes, lib.RenewSession.restype = [c_char_p], c_void_p - lib.SetConnected.argtypes, lib.SetConnected.restype = [c_char_p], c_void_p - lib.SetConnecting.argtypes, lib.SetConnecting.restype = [c_char_p], c_void_p + lib.RenewSession.argtypes, lib.RenewSession.restype = [], c_void_p lib.Cleanup.argtypes, lib.Cleanup.restype = [ c_char_p, - cToken, ], c_void_p - lib.SetDisconnected.argtypes, lib.SetDisconnected.restype = [c_char_p], c_void_p - lib.SetDisconnecting.argtypes, lib.SetDisconnecting.restype = [c_char_p], c_void_p - lib.SetProfileID.argtypes, lib.SetProfileID.restype = [c_char_p, c_char_p], c_void_p - lib.SetSearchServer.argtypes, lib.SetSearchServer.restype = [c_char_p], c_void_p + lib.SetProfileID.argtypes, lib.SetProfileID.restype = [c_char_p], c_void_p lib.SetSecureLocation.argtypes, lib.SetSecureLocation.restype = [ c_char_p, - c_char_p, ], c_void_p + lib.SecureLocationList.argtypes, lib.SecureLocationList.restype = [], DataError lib.SetSupportWireguard.argtypes, lib.SetSupportWireguard.restype = [ - c_char_p, c_int, ], c_void_p lib.ShouldRenewButton.argtypes, lib.ShouldRenewButton.restype = [], int lib.StartFailover.argtypes, lib.StartFailover.restype = [ - c_char_p, c_char_p, c_int, ReadRxBytes, ], DataError - lib.CancelFailover.argtypes, lib.CancelFailover.restype = [c_char_p], c_void_p + lib.CancelFailover.argtypes, lib.CancelFailover.restype = [], c_void_p diff --git a/wrappers/python/eduvpn_common/main.py b/wrappers/python/eduvpn_common/main.py index 74ff52d..f22e4d3 100644 --- a/wrappers/python/eduvpn_common/main.py +++ b/wrappers/python/eduvpn_common/main.py @@ -1,36 +1,19 @@ -import threading -from ctypes import POINTER, cast, c_void_p, c_int, pointer -from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple - -from eduvpn_common.discovery import ( - DiscoOrganizations, - DiscoServers, - get_disco_organizations, - get_disco_servers, -) -from eduvpn_common.event import EventHandler +from typing import Any, Callable, Iterator, Optional + from eduvpn_common.loader import initialize_functions, load_lib -from eduvpn_common.server import ( - Profiles, - Config, - Token, - encode_tokens, - get_config, - Server, - get_tokens, - get_transition_server, - get_servers, -) -from eduvpn_common.state import State, StateType -from eduvpn_common.types import ( - VPNStateChange, - ReadRxBytes, - UpdateToken, - decode_res, - encode_args, - get_data_error, - get_bool, -) +from eduvpn_common.types import (ReadRxBytes, VPNStateChange, decode_res, + encode_args, get_bool, get_data_error) + + +class WrappedError(Exception): + pass + + +def forwardError(error: bytes | str): + # TODO: HACK, remove this + if isinstance(error, str): + raise WrappedError(error) + raise WrappedError(error.decode("utf-8")) class EduVPN(object): @@ -40,52 +23,18 @@ class EduVPN(object): :param name: str: The name of the client. For commonly used names, see https://git.sr.ht/~fkooman/vpn-user-portal/tree/v3/item/src/OAuth/ClientDb.php. E.g. org.eduvpn.app.linux, if this name has "letsconnect" in it, then it is a Let's Connect! variant :param version: str: The version number of the client as a string, max 10 characters :param config_directory: str: The directory (absolute/relative) where to store the files - :param language: str: The language of the client, e.g. en """ - def __init__(self, name: str, version: str, config_directory: str, language: str): + def __init__(self, name: str, version: str, config_directory: str): self.name = name self.version = version self.config_directory = config_directory - self.language = language # Load the library self.lib = load_lib() initialize_functions(self.lib) - self.event_handler = EventHandler(self.lib) - self.token_callback = None - - # Callbacks that need to wait for specific events - - # The ask profile callback needs to wait for the UI thread to select a profile - # This is stored in the profile_event - self.profile_event: Optional[threading.Event] = None - self.location_event: Optional[threading.Event] = None - - @self.event.on(State.ASK_PROFILE, StateType.WAIT) - def wait_profile_event(old_state: int, profiles: Profiles): - """This functions waits until the ask location thread event is finished - - :param old_state: int: The old state of the profiles event - :param profiles: Profiles: The profiles - - """ - if self.profile_event: - self.profile_event.wait() - - @self.event.on(State.ASK_LOCATION, StateType.WAIT) - def wait_location_event(old_state: int, locations: List[str]): - """This functions waits until the location thread event is finished - - :param old_state: int: The old state of the location event - :param locations: List[str]: The locations - - """ - if self.location_event: - self.location_event.wait() - def go_function( self, func: Any, *args: Iterator, decode_func: Optional[Callable] = None ) -> Any: @@ -99,8 +48,8 @@ class EduVPN(object): :meta private: """ # The functions all have at least one arg type which is the name of the client - args_gen = encode_args(list(args), func.argtypes[1:]) - res = func(self.name.encode("utf-8"), *(args_gen)) + args_gen = encode_args(list(args), func.argtypes) + res = func(*(args_gen)) if decode_func is None: return decode_res(func.restype)(self.lib, res) else: @@ -111,348 +60,143 @@ class EduVPN(object): cancel_oauth_err = self.go_function(self.lib.CancelOAuth) if cancel_oauth_err: - raise cancel_oauth_err + forwardError(cancel_oauth_err) def deregister(self) -> None: """Deregister the Go shared library. This removes the object from internal bookkeeping and saves the configuration """ self.go_function(self.lib.Deregister) - remove_as_global_object(self) + global callback_object + callback_object = None - def register(self, debug: bool = False) -> None: + def register(self, handler: Optional[Callable] = None, debug: bool = False) -> None: """Register the Go shared library. This makes sure the FSM is initialized and that we can call Go functions :param debug: bool: (Default value = False): Whether or not we want to enable debug logging """ - if not add_as_global_object(self): + global callback_object + if callback_object is not None: raise Exception("Already registered") - + callback_object = handler register_err = self.go_function( self.lib.Register, + self.name, self.version, self.config_directory, - self.language, state_callback, debug, ) if register_err: - raise register_err - - def set_token_updater(self, updater: Callable): - self.token_callback = updater - updater_err = self.go_function( - self.lib.SetTokenUpdater, - token_callback, - ) - - if updater_err: - raise updater_err - - def get_disco_servers(self) -> Optional[DiscoServers]: - """Get the discovery servers - - :raises WrappedError: An error by the Go library - - :return: The disco Servers if any - :rtype: Optional[DiscoServers] - """ - servers, _ = self.go_function( - self.lib.GetDiscoServers, - decode_func=lambda lib, x: get_data_error(lib, x, get_disco_servers), - ) - - return servers - - def get_disco_organizations(self) -> Optional[DiscoOrganizations]: - """Get the discovery organizations - - :raises WrappedError: An error by the Go library - - :return: The discovery Organizations if any - :rtype: Optional[DiscoOrganizations] - """ - organizations, _ = self.go_function( - self.lib.GetDiscoOrganizations, - decode_func=lambda lib, x: get_data_error(lib, x, get_disco_organizations), - ) - - return organizations - - def add_institute_access(self, url: str) -> None: - """Add an institute access server - - :param url: str: The URL for the institute access server. Use the exact base_url as returned by Discovery - - :raises WrappedError: An error by the Go library - """ - add_err = self.go_function(self.lib.AddInstituteAccess, url) - - if add_err: - raise add_err - - def add_secure_internet_home(self, org_id: str) -> None: - """Add a secure internet server - - :param org_id: str: The organization ID of the secure internet server. Use the exact organization as returned by Discovery - - :raises WrappedError: An error by the Go library - """ - self.location_event = threading.Event() - add_err = self.go_function(self.lib.AddSecureInternetHomeServer, org_id) - - if add_err: - raise add_err + forwardError(register_err) - def add_custom_server(self, url: str) -> None: - """Add a custom server + def add_server(self, _type: str, _id: str) -> None: + """Add a server - :param url: str: The base URL of the server + :param _type: str: The type of server: "institute_access", "secure_internet" or "custom_server" + :param _id: str: The identifier of the server, e.g. "https://vpn.example.com/" :raises WrappedError: An error by the Go library """ - add_err = self.go_function(self.lib.AddCustomServer, url) + add_err = self.go_function(self.lib.AddServer, _type, _id) if add_err: - raise add_err + forwardError(add_err) - def remove_secure_internet(self) -> None: - """Remove the secure internet server + def get_expiry_times(self) -> Optional[str]: + expiry, expiry_err = self.go_function(self.lib.ExpiryTimes) + if expiry_err: + forwardError(expiry_err) + return expiry - :raises WrappedError: An error by the Go library - """ - remove_err = self.go_function(self.lib.RemoveSecureInternet) - - if remove_err: - raise remove_err + def get_current_server(self) -> Optional[str]: + server, server_err = self.go_function(self.lib.CurrentServer) + if server_err: + forwardError(server_err) + return server - def remove_institute_access(self, url: str) -> None: - """Remove an institute access server + def get_secure_locations(self) -> Optional[str]: + locs, locs_err = self.go_function(self.lib.SecureLocationList) + if locs_err: + forwardError(locs_err) + return locs - :param url: str: The URL for the institute access server. Use the exact base_url as returned by Discovery + def get_disco_organizations(self) -> Optional[str]: + orgs, _ = self.go_function(self.lib.DiscoOrganizations) + # TODO: Log error + return orgs - :raises WrappedError: An error by the Go library - """ - remove_err = self.go_function(self.lib.RemoveInstituteAccess, url) + def get_disco_servers(self) -> Optional[str]: + servers, _ = self.go_function(self.lib.DiscoServers) + # TODO: Log error + return servers - if remove_err: - raise remove_err + def get_servers(self) -> Optional[str]: + servers, servers_err = self.go_function(self.lib.ServerList) + if servers_err: + forwardError(servers_err) + return servers - def remove_custom_server(self, url: str) -> None: - """Remove a custom server + def remove_server(self, _type: str, _id: str) -> None: + """Remove a server - :param url: str: The base URL of the server + :param _type: str: The type of server: "institute_access", "secure_internet" or "custom_server" + :param _id: str: The identifier of the server, e.g. "https://vpn.example.com/" :raises WrappedError: An error by the Go library """ - remove_err = self.go_function(self.lib.RemoveCustomServer, url) + remove_err = self.go_function(self.lib.RemoveServer, _type, _id) if remove_err: - raise remove_err + forwardError(remove_err) def get_config( - self, - identifier: str, - func: Any, - prefer_tcp: bool = False, - tokens: Optional[Token] = None, - ) -> Optional[Config]: + self, _type: str, identifier: str, prefer_tcp: bool = False, tokens: str = "{}" + ) -> Optional[str]: """Get an OpenVPN/WireGuard configuration from the server + :param _type: str: The type of server: "institute_access", "secure_internet" or "custom_server" :param identifier: str: The identifier of the server, e.g. URL or ORG ID - :param func: Any: The Go function to call :param prefer_tcp: bool: (Default value = False): Whether or not to prefer TCP - :param tokens: Optional[Token] (Default value = None): The OAuth tokens if available + :param tokens: str (Defualt value = ""): The OAuth tokens if available :meta private: :raises WrappedError: An error by the Go library - :return: The configuration and configuration type ('openvpn' or 'wireguard') - :rtype: Config + :return: The configuration and configuration type ('openvpn' or 'wireguard') as a JSON string + :rtype: str """ # Because it could be the case that a profile callback is started, store a threading event # In the constructor, we have defined a wait event for Ask_Profile, this waits for this event to be set # The event is set in self.set_profile - self.profile_event = threading.Event() - config, config_err = self.go_function( - func, + self.lib.GetConfig, + _type, identifier, prefer_tcp, - encode_tokens(tokens), - decode_func=lambda lib, x: get_data_error(lib, x, get_config), + tokens, ) - self.profile_event = None - self.location_event = None - if config_err: - raise config_err + forwardError(config_err) return config - def get_config_custom_server( - self, url: str, prefer_tcp: bool = False, tokens: Optional[Token] = None - ) -> Optional[Config]: - """Get an OpenVPN/WireGuard configuration from a custom server - - :param url: str: The URL of the custom server - :param prefer_tcp: bool: (Default value = False): Whether or not to prefer TCP - :param tokens: Optional[Token] (Default value = None): The OAuth tokens if available - - :raises WrappedError: An error by the Go library - - :return: The configuration and configuration type ('openvpn' or 'wireguard') - :rtype: Config - """ - return self.get_config(url, self.lib.GetConfigCustomServer, prefer_tcp, tokens) - - def get_config_institute_access( - self, url: str, prefer_tcp: bool = False, tokens: Optional[Token] = None - ) -> Optional[Config]: - """Get an OpenVPN/WireGuard configuration from an institute access server - - :param url: str: The URL of the institute access server. Use the one from Discovery - :param prefer_tcp: bool: (Default value = False): Whether or not to prefer TCP - :param tokens: Optional[Token] (Default value = None): The OAuth tokens if available - - :raises WrappedError: An error by the Go library - - :return: The configuration and configuration type ('openvpn' or 'wireguard') - :rtype: Config - """ - return self.get_config( - url, self.lib.GetConfigInstituteAccess, prefer_tcp, tokens - ) - - def get_config_secure_internet( - self, org_id: str, prefer_tcp: bool = False, tokens: Optional[Token] = None - ) -> Optional[Config]: - """Get an OpenVPN/WireGuard configuration from a secure internet server - - :param org_id: str: The organization ID of the secure internet server. Use the one from Discovery - :param prefer_tcp: bool: (Default value = False): Whether or not to prefer TCP - :param tokens: Optional[Token] (Default value = None): The OAuth tokens if available - - :raises WrappedError: An error by the Go library - - :return: The configuration and configuration type ('openvpn' or 'wireguard') - :rtype: Config - """ - return self.get_config( - org_id, self.lib.GetConfigSecureInternet, prefer_tcp, tokens - ) - - def go_back(self) -> None: - """Go back in the FSM""" - # Ignore the error - self.go_function(self.lib.GoBack) - - def set_connected(self) -> None: - """Set the FSM to connected - - :raises WrappedError: An error by the Go library - """ - connect_err = self.go_function(self.lib.SetConnected) - - if connect_err: - raise connect_err - - def set_disconnecting(self) -> None: - """Set the FSM to disconnecting - - :raises WrappedError: An error by the Go library - """ - disconnecting_err = self.go_function(self.lib.SetDisconnecting) - - if disconnecting_err: - raise disconnecting_err - - def set_connecting(self) -> None: - """Set the FSM to connecting - - :raises WrappedError: An error by the Go library - """ - connecting_err = self.go_function(self.lib.SetConnecting) - - if connecting_err: - raise connecting_err - - def cleanup(self, tokens: Optional[Token] = None) -> None: + def cleanup(self, tokens: str = "") -> None: """Cleanup the vpn connection - :param tokens: Optional[Token] (Default value = None): The OAuth tokens if available + :param tokens: str (Default value = ""): The OAuth tokens if available :raises WrappedError: An error by the Go library """ - cleanup_err = self.go_function(self.lib.Cleanup, encode_tokens(tokens)) + cleanup_err = self.go_function(self.lib.Cleanup, tokens) if cleanup_err: - raise cleanup_err - - def set_disconnected( - self, - ) -> None: - """Set the FSM to disconnected - - :param cleanup: bool: (Default value = True): Whether or not to call /disconnect to the server. This invalidates the OpenVPN/WireGuard configuration - :param tokens: Optional[Token] (Default value = None): The OAuth tokens if available - - :raises WrappedError: An error by the Go library - """ - disconnect_err = self.go_function(self.lib.SetDisconnected) - - if disconnect_err: - raise disconnect_err - - def set_search_server(self) -> None: - """Set the FSM to search server - - :raises WrappedError: An error by the Go library - """ - search_err = self.go_function(self.lib.SetSearchServer) - - if search_err: - raise search_err - - def remove_class_callbacks(self, cls: Any) -> None: - """Remove class callbacks - - :param cls: Any: The class to remove callbacks for - - """ - self.event_handler.change_class_callbacks(cls, add=False) - - def register_class_callbacks(self, cls: Any) -> None: - """Register class callbacks - - :param cls: Any: The class to register callbacks for - - """ - self.event_handler.change_class_callbacks(cls) - - @property - def event(self) -> EventHandler: - """The property that gets the event handler - - :return: The event handler - :rtype: EventHandler - """ - return self.event_handler - - def callback(self, old_state: State, new_state: State, data: Any) -> bool: - """Run an event callback - - :param old_state: State: The previous state - :param new_state: State: The new state - :param data: Any: The data to pass to the event - - """ - return self.event.run(old_state, new_state, data) + forwardError(cleanup_err) def token_calback(self, srv: Server, tok: Token): if self.token_callback is None: @@ -471,23 +215,8 @@ class EduVPN(object): # If there is a profile event, set it so that the wait callback finishes # And so that the Go code can move to the next state - if self.profile_event: - self.profile_event.set() - if profile_err: - raise profile_err - - def change_secure_location(self) -> None: - """Change the secure location. This calls the necessary events - - :raises WrappedError: An error by the Go library - """ - # Set the location by country code - self.location_event = threading.Event() - location_err = self.go_function(self.lib.ChangeSecureLocation) - - if location_err: - raise location_err + forwardError(profile_err) def set_secure_location(self, country_code: str) -> None: """Set the secure location @@ -501,11 +230,8 @@ class EduVPN(object): # If there is a location event, set it so that the wait callback finishes # And so that the Go code can move to the next state - if self.location_event: - self.location_event.set() - if location_err: - raise location_err + forwardError(location_err) def renew_session(self) -> None: """Renew the session. This invalidates the tokens and runs the necessary callbacks to log back in @@ -515,7 +241,7 @@ class EduVPN(object): renew_err = self.go_function(self.lib.RenewSession) if renew_err: - raise renew_err + forwardError(renew_err) def set_support_wireguard(self, support: bool) -> None: """Indicates whether or not the OS supports WireGuard connections. @@ -527,7 +253,7 @@ class EduVPN(object): support_err = self.go_function(self.lib.SetSupportWireguard, support) if support_err: - raise support_err + forwardError(support_err) def should_renew_button(self) -> bool: """Whether or not the UI should show the renew button @@ -537,48 +263,6 @@ class EduVPN(object): """ return self.go_function(self.lib.ShouldRenewButton) - def in_fsm_state(self, state_id: State) -> bool: - """Check whether or not the FSM is in the provided state - - :param state_id: State: The state to check for - - :return: Whether or not the FSM is in the provided state - :rtype: bool - """ - return self.go_function(self.lib.InFSMState, state_id) - - def get_current_server(self) -> Optional[Server]: - """Get the current server - - :return: The current servers if there is any - :rtype: Optional[List[Servers]] - """ - server, server_err = self.go_function( - self.lib.GetCurrentServer, - decode_func=lambda lib, x: get_data_error(lib, x, get_transition_server), - ) - - if server_err: - raise server_err - - return server - - def get_saved_servers(self) -> Optional[List[Server]]: - """Get a list of saved servers - - :return: The list of Servers if there are any - :rtype: Optional[List[Servers]] - """ - servers, servers_err = self.go_function( - self.lib.GetSavedServers, - decode_func=lambda lib, x: get_data_error(lib, x, get_servers), - ) - - if servers_err: - raise servers_err - - return servers - def start_failover( self, gateway: str, wg_mtu: int, readrxbytes: ReadRxBytes ) -> bool: @@ -590,16 +274,16 @@ class EduVPN(object): decode_func=lambda lib, x: get_data_error(lib, x, get_bool), ) if dropped_err: - raise dropped_err + forwardError(dropped_err) return dropped def cancel_failover(self): cancel_err = self.go_function(self.lib.CancelFailover) if cancel_err: - raise cancel_err + forwardError(cancel_err) -eduvpn_objects: Dict[str, EduVPN] = {} +callback_object: Optional[Callable] = None @UpdateToken @@ -616,50 +300,18 @@ def token_callback(name: bytes, srv, tok): @VPNStateChange -def state_callback(name: bytes, old_state: int, new_state: int, data: Any) -> int: +def state_callback(old_state: int, new_state: int, data: str) -> int: """The internal callback that is passed to the Go library - :param name: bytes: The name of the client :param old_state: int: The old state :param new_state: int: The new state - :param data: Any: The data that still needs to be converted + :param data: str: The data that still needs to be converted by parsing the JSON :meta private: """ - name_decoded = name.decode() - if name_decoded not in eduvpn_objects: + if callback_object is None: return 0 - handled = eduvpn_objects[name_decoded].callback( - State(old_state), State(new_state), data - ) + handled = callback_object(old_state, new_state, data.decode("utf-8")) if handled: return 1 return 0 - - -def add_as_global_object(eduvpn: EduVPN) -> bool: - """Add the provided parameter to the global objects lists so we can call the callback - - :param eduvpn: EduVPN: The class to add - - :meta private: - - :return: Whether or not the object was added - :rtype: bool - """ - global eduvpn_objects - if eduvpn.name not in eduvpn_objects: - eduvpn_objects[eduvpn.name] = eduvpn - return True - return False - - -def remove_as_global_object(eduvpn: EduVPN) -> None: - """Remove the provided parameter from the global objects list - - :param eduvpn: EduVPN: The class to remove - - :meta private: - """ - global eduvpn_objects - eduvpn_objects.pop(eduvpn.name, None) diff --git a/wrappers/python/eduvpn_common/server.py b/wrappers/python/eduvpn_common/server.py deleted file mode 100644 index cd3ad15..0000000 --- a/wrappers/python/eduvpn_common/server.py +++ /dev/null @@ -1,441 +0,0 @@ -from ctypes import CDLL, POINTER, c_void_p, cast -from datetime import datetime -from typing import List, Optional, Type - -from eduvpn_common.types import ( - cConfig, - cServer, - cServerLocations, - cServerProfiles, - cServers, - cToken, -) - - -class Profile: - """The class that represents a server profile. - - :param: identifier: str: The identifier (id) of the profile - :param: display_name: str: The display name of the profile - :param: default_gateway: str: Whether or not this profile should have the default gateway set - """ - - def __init__(self, identifier: str, display_name: str, default_gateway: bool): - self.identifier = identifier - self.display_name = display_name - self.default_gateway = default_gateway - - def __str__(self): - return self.display_name - - -class Token: - """The class that represents oauth Tokens - - :param: access: str: The access token - :param: refresh: str: The refresh token - :param: expired: int: The expire unix time - """ - - def __init__(self, access: str, refresh: str, expired: int): - self.access = access - self.refresh = refresh - self.expires = expired - - -class Config: - """The class that represents an OpenVPN/WireGuard config - - :param: config: str: The config string - :param: config_type: str: The type of config, openvpn/wireguard - :param: tokens: Optional[Token]: The tokens - """ - - def __init__(self, config: str, config_type: str, tokens: Optional[Token]): - self.config = config - self.config_type = config_type - self.tokens = tokens - - def __str__(self): - return self.config - - -class Profiles: - """The class that represents a list of profiles - - :param: profiles: List[Profile]: A list of profiles - :param: current: int: The current profile index - """ - - def __init__(self, profiles: List[Profile], current: int): - self.profiles = profiles - self.current_index = current - - @property - def current(self) -> Optional[Profile]: - """Get the current profile if there is any - - :return: The profile if there is a current one (meaning the index is valid) - :rtype: Optional[Profile] - """ - if self.current_index < len(self.profiles): - return self.profiles[self.current_index] - return None - - -class Server: - """The class that represents a server. Use this for a custom server - - :param: url: str: The base URL of the server. In case of secure internet (supertype) this is the organisation ID URL - :param: display_name: str: The display name of the server - :param: profiles: Optional[Profiles]: The profiles if there are any already obtained, defaults to None - :param: expire_time: int: The expiry time in a Unix timestamp, defaults to 0 - """ - - def __init__( - self, - url: str, - display_name: str, - profiles: Optional[Profiles] = None, - expire_time: int = 0, - ): - self.url = url - self.display_name = display_name - self.profiles = profiles - self.expire_time = datetime.fromtimestamp(expire_time) - - def __str__(self): - return self.display_name - - @property - def category(self) -> str: - """Return the category of the server as a string - - :return: The category string, "Custom Server" - :rtype: str - """ - return "Custom Server" - - -class InstituteServer(Server): - """The class that represents an Institute Access Server - - :param: url: str: The base URL of the Institute Access Server - :param: display_name: str: The display name of the Institute Access Server - :param: support_contact: List[str]: The list of support contacts - :param: profiles: Optional[Profiles]: The profiles of the server if there are any - :param: expire_time: int: The expiry time in a Unix timestamp - """ - - def __init__( - self, - url: str, - display_name: str, - support_contact: List[str], - profiles: Optional[Profiles], - expire_time: int, - ): - super().__init__(url, display_name, profiles, expire_time) - self.support_contact = support_contact - - @property - def category(self) -> str: - """Return the category of the institute server as a string - - :return: The category string, "Institute Access Server" - :rtype: str - """ - return "Institute Access Server" - - -class SecureInternetServer(Server): - """The class that represents a Secure Internet Server - - :param: org_id: str: The organization ID of the Secure Internet Server as returned by Discovery - :param: display_name: str: The display name of the server - :param: support_contact: List[str]: The list of support contacts of the server - :param: locations: List[str]: The list of secure internet locations - :param: profiles: Optional[Profiles]: The list of profiles if there are any - :param: expire_time: int: The expiry time in a Unix timestamp - :param: country_code: str: The country code of the server - """ - - def __init__( - self, - org_id: str, - display_name: str, - support_contact: List[str], - locations: List[str], - profiles: Optional[Profiles], - expire_time: int, - country_code: str, - ): - super().__init__(org_id, display_name, profiles, expire_time) - self.org_id = org_id - self.support_contact = support_contact - self.locations = locations - self.country_code = country_code - - @property - def category(self) -> str: - """Return the category of the secure internet server as a string - - :return: The category string, "Secure Internet Server" - :rtype: str - """ - return "Secure Internet Server" - - -def get_type_for_str(type_str: str) -> Type[Server]: - """Get the right class type for a certain string input - - :param type_str: str: The string that represents the type of server, one of secure_internet, institute_access, custom_server - - :return: The server, defaults to Institute Server if an invalid input is given - :rtype: Type[Server] - """ - if type_str == "secure_internet": - return SecureInternetServer - if type_str == "custom_server": - return Server - return InstituteServer - - -def get_locations_from_ptr(ptr) -> List[str]: - """Get the locations from the Go shared library and convert it to a Python usable structure - - :param ptr: The pointer to the List[str] locations as returned by the Go library - - :meta private: - - :return: Locations if there are any - :rtype: List[str] - """ - if not ptr: - return [] - locations = cast(ptr, POINTER(cServerLocations)).contents - location_list = [] - for i in range(locations.total_locations): - location_list.append(locations.locations[i].decode("utf-8")) - return location_list - - -def get_profiles(ptr) -> Optional[Profiles]: - """Get the profiles from the Go shared library and convert it to a Python usable structure - - :param ptr: The pointer to the Profiles as returned by the Go library - - :meta private: - - :return: Profiles if there are any - :rtype: Optional[Profiles] - """ - if not ptr: - return None - profiles = [] - _profiles = ptr.contents - current_profile = _profiles.current - if not _profiles.profiles: - return None - for i in range(_profiles.total_profiles): - if not _profiles.profiles[i]: - continue - profile = _profiles.profiles[i].contents - profiles.append( - Profile( - profile.identifier.decode("utf-8"), - profile.display_name.decode("utf-8"), - profile.default_gateway == 1, - ) - ) - return Profiles(profiles, current_profile) - - -def get_server(ptr, _type: Optional[str] = None) -> Optional[Server]: - """Get the server from the Go shared library and convert it to a Python usable structure - - :param ptr: The pointer as returned by the Go library - :param _type: (Default value = None): The optional parameter that represents whether or not the type is enforced to the input. If None it is automatically determined - - :meta private: - - :return: Server if there is any - :rtype: Optional[Server] - """ - if not ptr: - return None - - current_server = ptr.contents - if _type is None: - _type = get_type_for_str(current_server.server_type.decode("utf-8")) - - identifier = current_server.identifier.decode("utf-8") - display_name = current_server.display_name.decode("utf-8") - - if _type is not Server: - support_contact = [] - for i in range(current_server.total_support_contact): - support_contact.append(current_server.support_contact[i].decode("utf-8")) - locations = get_locations_from_ptr(current_server.locations) - profiles = get_profiles(current_server.profiles) - if profiles is None: - profiles = Profiles([], 0) - if _type is SecureInternetServer: - return SecureInternetServer( - identifier, - display_name, - support_contact, - locations, - profiles, - current_server.expire_time, - current_server.country_code.decode("utf-8"), - ) - if _type is InstituteServer: - return InstituteServer( - identifier, - display_name, - support_contact, - profiles, - current_server.expire_time, - ) - return Server(identifier, display_name, profiles, current_server.expire_time) - - -def get_transition_server(lib: CDLL, ptr: c_void_p) -> Optional[Server]: - """Get a server from a transition event - - :param lib: CDLL: The Go shared library - :param ptr: c_void_p: The Go's returned C pointer that represents the Server - - :meta private: - - :return: The server if there is any - :rtype: Optional[Server] - """ - if ptr: - server = get_server(cast(ptr, POINTER(cServer))) - lib.FreeServer(ptr) - return server - return None - - -def get_transition_profiles(lib: CDLL, ptr: c_void_p) -> Optional[Profiles]: - """Get profiles from a transition event - - :param lib: CDLL: The Go shared library - :param ptr: c_void_p: The Go's returned C pointer that represents the profiles - - :meta private: - - :return: The profiles if there is any - :rtype: Optional[Profiles] - """ - if ptr: - profiles = get_profiles(cast(ptr, POINTER(cServerProfiles))) - lib.FreeProfiles(ptr) - return profiles - return None - - -def get_servers(lib: CDLL, ptr: c_void_p) -> Optional[List[Server]]: - """Get servers from the Go library as a C structure and return a Python usable structure - - :param lib: CDLL: The Go shared library - :param ptr: c_void_p: The C pointer to the servers structure - - :meta private: - - :return: The list of Servers if there is any - :rtype: Optional[List[Server]] - """ - if ptr: - returned = [] - servers = cast(ptr, POINTER(cServers)).contents - if servers.custom_servers: - for i in range(servers.total_custom): - current = get_server(servers.custom_servers[i], Server) - if current is None: - continue - returned.append(current) - - if servers.institute_servers: - for i in range(servers.total_institute): - current = get_server(servers.institute_servers[i], InstituteServer) - if current is None: - continue - returned.append(current) - - if servers.secure_internet: - current = get_server(servers.secure_internet, SecureInternetServer) - if current is not None: - returned.append(current) - lib.FreeServers(ptr) - return returned - return None - - -def get_locations(lib: CDLL, ptr: c_void_p) -> Optional[List[str]]: - """Get locations from the Go library as a C structure and return a Python usable structure - - :param lib: CDLL: The Go shared library - :param ptr: c_void_p: The C pointer to the locations structure - - :meta private: - - :return: The list of servers if there are any - :rtype: Optional[List[str]] - """ - if ptr: - location_list = get_locations_from_ptr(ptr) - lib.FreeSecureLocations(ptr) - return location_list - return None - - -def get_tokens(lib: CDLL, ptr: c_void_p) -> Optional[Token]: - if ptr: - toks = cast(ptr, POINTER(cToken)).contents - access = toks.access.decode("utf-8") - refresh = toks.refresh.decode("utf-8") - expired = toks.expired - lib.FreeTokens(ptr) - return Token( - access, refresh, expired - ) - return None - - -def get_config(lib: CDLL, ptr: c_void_p) -> Optional[Config]: - """Get the config from the Go library as a C structure and return a Python usable structure - - :param lib: CDLL: The Go shared library - :param ptr: c_void_p: The C pointer to the confg structure - - :meta private: - - :return: The configuration if there is any - :rtype: Optional[Config] - """ - if ptr: - config = cast(ptr, POINTER(cConfig)).contents - cfg = config.config.decode("utf-8") - cfg_type = config.config_type.decode("utf-8") - tokens = None - if config.token: - token_struct = config.token.contents - tokens = Token( - token_struct.access.decode("utf-8"), - token_struct.refresh.decode("utf-8"), - token_struct.expired, - ) - - config_class = Config(cfg, cfg_type, tokens) - lib.FreeConfig(ptr) - return config_class - return None - - -def encode_tokens(arg: Optional[Token]) -> cToken: - if arg is None: - return cToken("".encode("utf-8"), "".encode("utf-8"), 0) - return cToken(arg.access.encode("utf-8"), arg.refresh.encode("utf-8"), arg.expires) diff --git a/wrappers/python/eduvpn_common/state.py b/wrappers/python/eduvpn_common/state.py deleted file mode 100644 index 21573f7..0000000 --- a/wrappers/python/eduvpn_common/state.py +++ /dev/null @@ -1,32 +0,0 @@ -from enum import IntEnum - - -class StateType(IntEnum): - """ - The State Type enum. Wait types are mostly used for internal code - """ - - ENTER = 1 - LEAVE = 2 - WAIT = 3 - - -class State(IntEnum): - """ - The State enum. Each state here also exists in the Go library - """ - - DEREGISTERED = 0 - NO_SERVER = 1 - ASK_LOCATION = 2 - SEARCH_SERVER = 3 - LOADING_SERVER = 4 - CHOSEN_SERVER = 5 - OAUTH_STARTED = 6 - AUTHORIZED = 7 - REQUEST_CONFIG = 8 - ASK_PROFILE = 9 - DISCONNECTED = 10 - DISCONNECTING = 11 - CONNECTING = 12 - CONNECTED = 13 diff --git a/wrappers/python/eduvpn_common/types.py b/wrappers/python/eduvpn_common/types.py index 32a7a00..ea5bbd2 100644 --- a/wrappers/python/eduvpn_common/types.py +++ b/wrappers/python/eduvpn_common/types.py @@ -1,186 +1,6 @@ -from ctypes import ( - CDLL, - CFUNCTYPE, - POINTER, - Structure, - c_char_p, - c_int, - c_size_t, - c_ulonglong, - c_void_p, - cast, - pointer, -) -from typing import Any, Callable, Iterator, List, Optional, Tuple - -from eduvpn_common.error import WrappedError - - -class cToken(Structure): - """The C type that represents the Token as forwarded to the Go library - - :meta private: - """ - - _fields_ = [ - ("access", c_char_p), - ("refresh", c_char_p), - ("expired", c_ulonglong), - ] - - -class cConfig(Structure): - """The C type that represents the data that gets by the Go library returned when a config is obtained - - :meta private: - """ - - _fields_ = [ - ("config", c_char_p), - ("config_type", c_char_p), - ("token", POINTER(cToken)), - ] - - -class cError(Structure): - """The C type that represents the Error as returned by the Go library - - :meta private: - """ - - _fields_ = [ - ("traceback", c_char_p), - ("cause", c_char_p), - ] - - -class cServerLocations(Structure): - """The C type that represents the Server Locations as returned by the Go library - - :meta private: - """ - - _fields_ = [("locations", POINTER(c_char_p)), ("total_locations", c_size_t)] - - -class cDiscoveryOrganization(Structure): - """The C type that represents a Discovery Organization as returned by the Go library - - :meta private: - """ - - _fields_ = [ - ("display_name", c_char_p), - ("org_id", c_char_p), - ("secure_internet_home", c_char_p), - ("keyword_list", c_char_p), - ] - - -class cDiscoveryOrganizations(Structure): - """The C type that represents Discovery Organizations as returned by the Go library - - :meta private: - """ - - _fields_ = [ - ("version", c_ulonglong), - ("organizations", POINTER(POINTER(cDiscoveryOrganization))), - ("total_organizations", c_size_t), - ] - - -class cDiscoveryServer(Structure): - """The C type that represents a Discovery Server as returned by the Go library - - :meta private: - """ - - _fields_ = [ - ("authentication_url_template", c_char_p), - ("base_url", c_char_p), - ("country_code", c_char_p), - ("display_name", c_char_p), - ("keyword_list", c_char_p), - ("public_key_list", POINTER(c_char_p)), - ("total_public_keys", c_size_t), - ("server_type", c_char_p), - ("support_contact", POINTER(c_char_p)), - ("total_support_contact", c_size_t), - ] - - -class cDiscoveryServers(Structure): - """The C type that represents Discovery Servers as returned by the Go library - - :meta private: - """ - - _fields_ = [ - ("version", c_ulonglong), - ("servers", POINTER(POINTER(cDiscoveryServer))), - ("total_servers", c_size_t), - ] - - -class cServerProfile(Structure): - """The C type that represents a Server Profile as returned by the Go library - - :meta private: - """ - - _fields_ = [ - ("identifier", c_char_p), - ("display_name", c_char_p), - ("default_gateway", c_int), - ] - - -class cServerProfiles(Structure): - """The C type that represents Server Profiles as returned by the Go library - - :meta private: - """ - - _fields_ = [ - ("current", c_int), - ("profiles", POINTER(POINTER(cServerProfile))), - ("total_profiles", c_size_t), - ] - - -class cServer(Structure): - """The C type that represents a Server as returned by the Go library - - :meta private: - """ - - _fields_ = [ - ("identifier", c_char_p), - ("display_name", c_char_p), - ("server_type", c_char_p), - ("country_code", c_char_p), - ("support_contact", POINTER(c_char_p)), - ("total_support_contact", c_size_t), - ("locations", POINTER(cServerLocations)), - ("profiles", POINTER(cServerProfiles)), - ("expire_time", c_ulonglong), - ] - - -class cServers(Structure): - """The C type that represents Servers as returned by the Go library - - :meta private: - """ - - _fields_ = [ - ("custom_servers", POINTER(POINTER(cServer))), - ("total_custom", c_size_t), - ("institute_servers", POINTER(POINTER(cServer))), - ("total_institute", c_size_t), - ("secure_internet", POINTER(cServer)), - ] +from ctypes import (CDLL, CFUNCTYPE, Structure, c_char_p, c_int, c_ulonglong, + c_void_p, cast) +from typing import Any, Iterator, List, Tuple class DataError(Structure): @@ -193,7 +13,7 @@ class DataError(Structure): # The type for a Go state change callback -VPNStateChange = CFUNCTYPE(c_int, c_char_p, c_int, c_int, c_void_p) +VPNStateChange = CFUNCTYPE(c_int, c_int, c_int, c_char_p) ReadRxBytes = CFUNCTYPE(c_ulonglong) UpdateToken = CFUNCTYPE(None, c_char_p, c_void_p, c_void_p) @@ -231,7 +51,7 @@ def decode_res(res: Any) -> Any: """ decode_map = { c_int: get_bool, - c_void_p: get_error, + c_void_p: get_ptr_string, DataError: get_data_error, } return decode_map.get(res, lambda lib, x: x) @@ -257,62 +77,22 @@ def get_ptr_string(lib: CDLL, ptr: c_void_p) -> str: return "" -def get_ptr_list_strings(lib: CDLL, strings: pointer, total_strings: int) -> List[str]: - """Convert a list of C strings to a Python usable list of strings - This list is not freed here but is later freed in the Go library for convenience - - :param lib: CDLL: The Go shared library - :param strings: pointer: The C pointer to the strings list - :param total_strings: int: The total strings in the list - - :meta private: - - :return: The list of strings converted to Python - :rtype: List[str] - """ - if strings: - strings_list = [] - for i in range(total_strings): - strings_list.append(strings[i].decode("utf-8")) - return strings_list - return [] - - -def get_error(lib: CDLL, ptr: c_void_p) -> Optional[WrappedError]: - """Convert a C error structure to a Python usable error structure - - :param lib: CDLL: The Go shared library - :param ptr: c_void_p: The pointer to the C error struct - - :meta private: - - :return: The error if there is one - :rtype: Optional[WrappedError] - """ - if not ptr: - return None - err = cast(ptr, POINTER(cError)).contents - wrapped = WrappedError(err.traceback.decode("utf-8"), err.cause.decode("utf-8")) - lib.FreeError(ptr) - return wrapped - - def get_data_error( - lib: CDLL, data_error: DataError, data_conv: Callable = get_ptr_string -) -> Tuple[Any, Optional[WrappedError]]: + lib: CDLL, data_error: DataError, data_conv: Any = get_ptr_string +) -> Tuple[str, str]: """Convert a C data+error structure to a Python usable data+error structure :param lib: CDLL: The Go shared library :param data_error: DataError: The data error C structure - :param data_conv: Callable: (Default value = get_ptr_string): The function that converts the data part + :param data_conv: Any: The function to convert the data :meta private: - :return: The data and optional error - :rtype: Tuple[Any, Optional[WrappedError]] + :return: The data and error + :rtype: Tuple[str, str] """ data = data_conv(lib, data_error.data) - error = get_error(lib, data_error.error) + error = get_ptr_string(lib, data_error.error) return data, error -- cgit v1.2.3