forked from robinhood/faust
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
101 lines (75 loc) · 2.93 KB
/
auth.py
File metadata and controls
101 lines (75 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Authentication Credentials."""
import ssl
from typing import Any, Optional, Union
from faust.types.auth import AuthProtocol, CredentialsT, SASLMechanism
# Names exported by ``from <this module> import *``.
__all__ = [
    'Credentials',
    'SASLCredentials',
    'GSSAPICredentials',
    'SSLCredentials',
]
class Credentials(CredentialsT):
    """Common base class for the credential types defined in this module."""
class SASLCredentials(Credentials):
    """Username/password credentials for SASL authentication.

    The class-level ``protocol`` and ``mechanism`` act as defaults; an
    instance only shadows them when the corresponding constructor argument
    is supplied.

    Args:
        username: Stored unchanged on the instance.
        password: Stored unchanged on the instance.
        ssl_context: Stored unchanged.  When it is not ``None`` the
            instance's ``protocol`` becomes ``AuthProtocol.SASL_SSL``.
        mechanism: Optional override for the mechanism; the value is
            passed through ``SASLMechanism(...)`` before being stored.
    """

    protocol = AuthProtocol.SASL_PLAINTEXT
    mechanism: SASLMechanism = SASLMechanism.PLAIN

    username: Optional[str]
    password: Optional[str]
    ssl_context: Optional[ssl.SSLContext]

    def __init__(self, *,
                 username: str = None,
                 password: str = None,
                 ssl_context: ssl.SSLContext = None,
                 mechanism: Union[str, SASLMechanism] = None) -> None:
        self.username, self.password = username, password
        self.ssl_context = ssl_context

        # Leave the class-level defaults in effect unless an override
        # was actually provided.
        if ssl_context is not None:
            self.protocol = AuthProtocol.SASL_SSL
        if mechanism is not None:
            self.mechanism = SASLMechanism(mechanism)

    def __repr__(self) -> str:
        # The password is intentionally not part of the representation.
        return '<{}: username={}>'.format(type(self).__name__, self.username)
class GSSAPICredentials(Credentials):
    """Describe GSSAPI credentials over SASL.

    The class-level ``protocol`` and ``mechanism`` act as defaults; an
    instance only shadows them when the corresponding constructor argument
    is supplied.

    Args:
        kerberos_service_name: Stored unchanged on the instance
            (defaults to ``'kafka'``).
        kerberos_domain_name: Stored unchanged on the instance.
        ssl_context: Stored unchanged.  When it is not ``None`` the
            instance's ``protocol`` becomes ``AuthProtocol.SASL_SSL``.
        mechanism: Optional override for the mechanism; the value is
            passed through ``SASLMechanism(...)`` before being stored.
    """

    protocol = AuthProtocol.SASL_PLAINTEXT
    mechanism: SASLMechanism = SASLMechanism.GSSAPI

    kerberos_service_name: str
    kerberos_domain_name: Optional[str]
    ssl_context: Optional[ssl.SSLContext]

    def __init__(self, *,
                 kerberos_service_name: str = 'kafka',
                 kerberos_domain_name: Optional[str] = None,
                 ssl_context: Optional[ssl.SSLContext] = None,
                 mechanism: Optional[Union[str, SASLMechanism]] = None,
                 ) -> None:
        self.kerberos_service_name = kerberos_service_name
        self.kerberos_domain_name = kerberos_domain_name
        self.ssl_context = ssl_context
        # Leave the class-level defaults in effect unless an override
        # was actually provided.
        if ssl_context is not None:
            self.protocol = AuthProtocol.SASL_SSL
        if mechanism is not None:
            self.mechanism = SASLMechanism(mechanism)

    def __repr__(self) -> str:
        # The closing '>' was missing from the original format string,
        # which produced an unbalanced representation.
        return '<{0}: kerberos service={1!r} domain={2!r}>'.format(
            type(self).__name__,
            self.kerberos_service_name,
            self.kerberos_domain_name,
        )
class SSLCredentials(Credentials):
    """Describe SSL credentials/settings.

    Args:
        context: Ready-made SSL context.  When given it is stored as-is
            and every other argument is ignored.
        purpose: ``ssl.Purpose`` member forwarded to
            :func:`ssl.create_default_context` when no ``context`` is
            given.  ``None`` selects ``ssl.Purpose.SERVER_AUTH``, the
            standard library's own default.
        cafile: Forwarded to :func:`ssl.create_default_context`.
        capath: Forwarded to :func:`ssl.create_default_context`.
        cadata: Forwarded to :func:`ssl.create_default_context`.

    Raises:
        TypeError: Raised by :func:`ssl.create_default_context` if
            ``purpose`` is neither ``None`` nor an ``ssl.Purpose`` member.
    """

    protocol = AuthProtocol.SSL
    context: ssl.SSLContext

    def __init__(self, context: Optional[ssl.SSLContext] = None, *,
                 purpose: Any = None,
                 cafile: Optional[str] = None,
                 capath: Optional[str] = None,
                 cadata: Optional[str] = None) -> None:
        if context is None:
            # create_default_context() raises TypeError for purpose=None,
            # so the "not specified" default must be mapped to a real
            # ssl.Purpose before the call.
            if purpose is None:
                purpose = ssl.Purpose.SERVER_AUTH
            context = ssl.create_default_context(
                purpose=purpose,
                cafile=cafile,
                capath=capath,
                cadata=cadata,
            )
        self.context = context

    def __repr__(self) -> str:
        return f'<{type(self).__name__}: context={self.context}>'