Skip to content

Auth

CredentialProvider

Bases: ABC

Abstract class for different credential providers

Source code in src/pyfabricops/api/auth.py
 99
100
101
102
103
104
105
class CredentialProvider(ABC):
    """Abstract class for different credential providers"""

    @abstractmethod
    def get_credentials(self) -> Dict[str, str]:
        """Return the necessary credentials"""
        pass

get_credentials() abstractmethod

Return the necessary credentials

Source code in src/pyfabricops/api/auth.py
102
103
104
105
@abstractmethod
def get_credentials(self) -> Dict[str, str]:
    """Return the necessary credentials"""
    pass

EnvCredentialProvider

Bases: CredentialProvider

Environment variable credential provider

Source code in src/pyfabricops/api/auth.py
108
109
110
111
112
113
114
115
116
117
118
119
120
class EnvCredentialProvider(CredentialProvider):
    """Environment variable credential provider"""

    def get_credentials(self) -> Dict[str, str]:
        load_dotenv()
        return {
            'fab_client_id': os.getenv('FAB_CLIENT_ID'),
            'fab_client_secret': os.getenv('FAB_CLIENT_SECRET'),
            'fab_tenant_id': os.getenv('FAB_TENANT_ID'),
            'fab_username': os.getenv('FAB_USERNAME'),
            'fab_password': os.getenv('FAB_PASSWORD'),
            'github_token': os.getenv('GH_TOKEN'),
        }

OAuthProvider

OAuth interactive authentication provider

Source code in src/pyfabricops/api/auth.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class OAuthProvider:
    """OAuth interactive authentication provider"""

    def __init__(self, cache: TokenCache):
        self.cache = cache

    def get_token(
        self, audience: Literal['fabric', 'powerbi'] = 'fabric'
    ) -> Dict:
        scope = FABRIC_SCOPE if audience == 'fabric' else POWERBI_SCOPE
        token_key = f'{audience.upper()}_INTERACTIVE'

        # Check if cached token is still valid
        if self.cache.is_token_valid(token_key):
            return self.cache.get_token(token_key)

        logger.info('Opening browser for user authentication...')
        credential = InteractiveBrowserCredential()
        new_token = credential.get_token(scope)

        if not new_token:
            raise ResourceNotFoundError('Access token not found.')

        logger.success('Token retrieved successfully.')

        # Calculate expires_in based on expires_on
        expires_in = int(new_token.expires_on - time.time())
        self.cache.store_token(token_key, new_token.token, expires_in)

        return self.cache.get_token(token_key)

TokenCache

Manage the token cache in a temporary file

Source code in src/pyfabricops/api/auth.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class TokenCache:
    """Manage the token cache in a temporary file"""

    CACHE_TEMPLATE = {
        'FABRIC_SPN': {'access_token': '', 'expires_at': 0},
        'FABRIC_USER': {'access_token': '', 'expires_at': 0},
        'FABRIC_INTERACTIVE': {'access_token': '', 'expires_at': 0},
        'POWERBI_SPN': {'access_token': '', 'expires_at': 0},
        'POWERBI_USER': {'access_token': '', 'expires_at': 0},
        'POWERBI_INTERACTIVE': {'access_token': '', 'expires_at': 0},
    }

    def __init__(self, cache_file: Optional[str] = None):
        self.cache_file = cache_file or os.path.join(
            tempfile.gettempdir(), 'pf_token_cache.json'
        )
        self._init_cache()

    def _init_cache(self):
        """Initialize the cache file if it does not exist"""
        if not os.path.exists(self.cache_file):
            with open(self.cache_file, 'w') as f:
                json.dump(self.CACHE_TEMPLATE, f)

    def load_tokens(self) -> Dict:
        """Load tokens from cache"""
        try:
            with open(self.cache_file, 'r') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            self._init_cache()
            return self.CACHE_TEMPLATE.copy()

    def save_tokens(self, tokens: Dict):
        """Save tokens to cache"""
        with open(self.cache_file, 'w') as f:
            json.dump(tokens, f, indent=4)

    def get_token(self, token_key: str) -> Optional[Dict]:
        """Get a specific token from cache"""
        tokens = self.load_tokens()
        return tokens.get(token_key)

    def is_token_valid(
        self, token_key: str, buffer_seconds: int = 300
    ) -> bool:
        """Check if a token is still valid"""
        token_data = self.get_token(token_key)
        if not token_data or not token_data.get('access_token'):
            return False

        now = time.time()
        expires_at = token_data.get('expires_at', 0)
        return (expires_at - now) > buffer_seconds

    def store_token(self, token_key: str, access_token: str, expires_in: int):
        """Store a new token in cache"""
        tokens = self.load_tokens()
        tokens[token_key] = {
            'access_token': access_token,
            'expires_at': time.time() + expires_in,
        }
        self.save_tokens(tokens)

    def clear_cache(self):
        """Clear the token cache by deleting the cache file"""
        if os.path.exists(self.cache_file):
            os.remove(self.cache_file)
            logger.info(f'Token cache cleared: {self.cache_file}')
        else:
            logger.warning(f'Cache file not found: {self.cache_file}')

clear_cache()

Clear the token cache by deleting the cache file

Source code in src/pyfabricops/api/auth.py
90
91
92
93
94
95
96
def clear_cache(self):
    """Clear the token cache by deleting the cache file"""
    if os.path.exists(self.cache_file):
        os.remove(self.cache_file)
        logger.info(f'Token cache cleared: {self.cache_file}')
    else:
        logger.warning(f'Cache file not found: {self.cache_file}')

get_token(token_key)

Get a specific token from cache

Source code in src/pyfabricops/api/auth.py
64
65
66
67
def get_token(self, token_key: str) -> Optional[Dict]:
    """Get a specific token from cache"""
    tokens = self.load_tokens()
    return tokens.get(token_key)

is_token_valid(token_key, buffer_seconds=300)

Check if a token is still valid

Source code in src/pyfabricops/api/auth.py
69
70
71
72
73
74
75
76
77
78
79
def is_token_valid(
    self, token_key: str, buffer_seconds: int = 300
) -> bool:
    """Check if a token is still valid"""
    token_data = self.get_token(token_key)
    if not token_data or not token_data.get('access_token'):
        return False

    now = time.time()
    expires_at = token_data.get('expires_at', 0)
    return (expires_at - now) > buffer_seconds

load_tokens()

Load tokens from cache

Source code in src/pyfabricops/api/auth.py
50
51
52
53
54
55
56
57
def load_tokens(self) -> Dict:
    """Load tokens from cache"""
    try:
        with open(self.cache_file, 'r') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        self._init_cache()
        return self.CACHE_TEMPLATE.copy()

save_tokens(tokens)

Save tokens to cache

Source code in src/pyfabricops/api/auth.py
59
60
61
62
def save_tokens(self, tokens: Dict):
    """Save tokens to cache"""
    with open(self.cache_file, 'w') as f:
        json.dump(tokens, f, indent=4)

store_token(token_key, access_token, expires_in)

Store a new token in cache

Source code in src/pyfabricops/api/auth.py
81
82
83
84
85
86
87
88
def store_token(self, token_key: str, access_token: str, expires_in: int):
    """Store a new token in cache"""
    tokens = self.load_tokens()
    tokens[token_key] = {
        'access_token': access_token,
        'expires_at': time.time() + expires_in,
    }
    self.save_tokens(tokens)

TokenManager

Main token and authentication manager

Source code in src/pyfabricops/api/auth.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class TokenManager:
    """Main token and authentication manager"""

    def __init__(self, auth_provider: Literal['env', 'oauth'] = 'env'):
        self.cache = TokenCache()
        self.auth_provider = auth_provider
        self._credential_providers = {
            'env': EnvCredentialProvider(),
        }
        self.oauth_provider = OAuthProvider(self.cache)

    def set_auth_provider(self, source: Literal['env', 'oauth'] = 'env'):
        """Define the authentication provider"""
        if source not in ['env', 'oauth']:
            raise OptionNotAvailableError(
                f'Source not available. Available: env, oauth. Got: {source}'
            )
        self.auth_provider = source

    def _build_token_payload(
        self,
        audience: Literal['fabric', 'powerbi'],
        credential_type: Literal['spn', 'user'],
        credentials: Dict[str, str],
    ) -> Dict:
        """Construct the payload for token request"""
        payload = {
            'client_id': credentials['fab_client_id'],
            'client_secret': credentials['fab_client_secret'],
            'tenant_id': credentials['fab_tenant_id'],
            'grant_type': 'client_credentials'
            if credential_type == 'spn'
            else 'password',
            'scope': FABRIC_SCOPE if audience == 'fabric' else POWERBI_SCOPE,
        }

        if credential_type == 'user':
            payload['username'] = credentials['fab_username']
            payload['password'] = credentials['fab_password']

        return payload

    def _retrieve_token_from_api(
        self,
        audience: Literal['fabric', 'powerbi'],
        credential_type: Literal['spn', 'user'],
    ) -> Dict:
        """Makes an HTTP request to retrieve the token"""
        if self.auth_provider not in self._credential_providers:
            raise OptionNotAvailableError(
                f'Invalid auth provider: {self.auth_provider}'
            )

        credentials = self._credential_providers[
            self.auth_provider
        ].get_credentials()
        tenant_id = credentials['fab_tenant_id']
        url = TOKEN_TEMPLATE.format(tenant_id=tenant_id)

        payload = self._build_token_payload(
            audience, credential_type, credentials
        )

        try:
            resp = requests.post(url, data=payload)
            if resp.status_code == 200:
                return resp.json()
            else:
                raise AuthenticationError(
                    f'Token request failed: {resp.status_code} - {resp.text}'
                )
        except Exception as e:
            raise AuthenticationError(f'Failed to retrieve token: {str(e)}')

    def get_token(
        self,
        audience: Literal['fabric', 'powerbi'] = 'fabric',
        credential_type: Literal['spn', 'user'] = 'spn',
    ) -> Dict:
        """Get a valid token, using cache when possible"""

        # OAuth uses a different flow
        if self.auth_provider == 'oauth':
            return self.oauth_provider.get_token(audience)

        # For env, use cache + API
        token_key = f'{audience.upper()}_{credential_type.upper()}'

        # Check if cached token is still valid
        if self.cache.is_token_valid(token_key):
            return self.cache.get_token(token_key)

        # Fetch new token from API
        token_response = self._retrieve_token_from_api(
            audience, credential_type
        )
        if not token_response:
            raise ResourceNotFoundError('Access token not found.')

        # Store in cache
        self.cache.store_token(
            token_key,
            token_response['access_token'],
            token_response['expires_in'],
        )

        return self.cache.get_token(token_key)

get_token(audience='fabric', credential_type='spn')

Get a valid token, using cache when possible

Source code in src/pyfabricops/api/auth.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def get_token(
    self,
    audience: Literal['fabric', 'powerbi'] = 'fabric',
    credential_type: Literal['spn', 'user'] = 'spn',
) -> Dict:
    """Get a valid token, using cache when possible"""

    # OAuth uses a different flow
    if self.auth_provider == 'oauth':
        return self.oauth_provider.get_token(audience)

    # For env, use cache + API
    token_key = f'{audience.upper()}_{credential_type.upper()}'

    # Check if cached token is still valid
    if self.cache.is_token_valid(token_key):
        return self.cache.get_token(token_key)

    # Fetch new token from API
    token_response = self._retrieve_token_from_api(
        audience, credential_type
    )
    if not token_response:
        raise ResourceNotFoundError('Access token not found.')

    # Store in cache
    self.cache.store_token(
        token_key,
        token_response['access_token'],
        token_response['expires_in'],
    )

    return self.cache.get_token(token_key)

set_auth_provider(source='env')

Define the authentication provider

Source code in src/pyfabricops/api/auth.py
166
167
168
169
170
171
172
def set_auth_provider(self, source: Literal['env', 'oauth'] = 'env'):
    """Define the authentication provider"""
    if source not in ['env', 'oauth']:
        raise OptionNotAvailableError(
            f'Source not available. Available: env, oauth. Got: {source}'
        )
    self.auth_provider = source

clear_token_cache()

Clear the token cache by deleting the cache file.

This will force all subsequent token requests to retrieve new tokens from the authentication provider.

Returns:

Type Description
None

None

Examples:

from pyfabricops.api.auth import clear_token_cache

# Clear all cached tokens
clear_token_cache()
Source code in src/pyfabricops/api/auth.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def clear_token_cache() -> None:
    """
    Clear the token cache by deleting the cache file.

    This will force all subsequent token requests to retrieve new tokens
    from the authentication provider.

    Returns:
        None

    Examples:
        ```python
        from pyfabricops.api.auth import clear_token_cache

        # Clear all cached tokens
        clear_token_cache()
        ```
    """
    global _token_manager
    _token_manager.cache.clear_cache()

set_auth_provider(source='env')

Set the authentication provider for token retrieval.

Parameters:

Name Type Description Default
source str

The provider of credentials. Can be "env" or "oauth".

'env'

Returns:

Type Description
None

None

Raises:

Type Description
OptionNotAvailableError

If the source is not one of the available options.

Examples:

Environment variables (.env, GitHub Secrets, Ado Secrets...)
set_auth_provider("env")
OAuth (Interactive)
set_auth_provider("oauth")
Source code in src/pyfabricops/api/auth.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def set_auth_provider(source: Literal['env', 'oauth'] = 'env') -> None:
    """
    Set the authentication provider for token retrieval.

    Args:
        source (str): The provider of credentials. Can be "env" or "oauth".

    Returns:
        None

    Raises:
        OptionNotAvailableError: If the source is not one of the available options.

    Examples:
        ### Environment variables (.env, GitHub Secrets, Ado Secrets...)
        ```python
        set_auth_provider("env")
        ```

        ### OAuth (Interactive)
        ```python
        set_auth_provider("oauth")
        ```
    """
    global _token_manager
    _token_manager.set_auth_provider(source)