-
Notifications
You must be signed in to change notification settings - Fork 2
/
AdAccountReader.py
96 lines (74 loc) · 3.35 KB
/
AdAccountReader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from samba import drs_utils
from samba import net
from samba.dcerpc import drsuapi, misc
from typing import List
class AdAccountReader:
SecurityAttributesIds = [drsuapi.DRSUAPI_ATTID_supplementalCredentials, drsuapi.DRSUAPI_ATTID_unicodePwd, drsuapi.DRSUAPI_ATTID_ntPwdHistory]
def __init__(self, dc, lp, creds, samdb):
self.lp = lp
self.creds = creds
self.dc: str = dc
self._samdb = samdb
self._drsuapi_connection = None
@property
def samdb(self) :
return self._samdb
@property
def drsuapi_connection(self):
if self._drsuapi_connection is None:
self._drsuapi_connection = drs_utils.drsuapi_connect(self.dc, self.lp, self.creds)
return self._drsuapi_connection
def _get_account_attributes(self, account_dn, attributes_ids: List[int] = None):
destination_dsa_guid = misc.GUID(self.samdb.get_ntds_GUID())
source_dsa_invocation_id = misc.GUID()
bind, handle, _ = self.drsuapi_connection
req = drsuapi.DsGetNCChangesRequest8()
req.destination_dsa_guid = destination_dsa_guid
req.source_dsa_invocation_id = source_dsa_invocation_id
naming_context = drsuapi.DsReplicaObjectIdentifier()
naming_context.dn = account_dn
req.naming_context = naming_context
hwm = drsuapi.DsReplicaHighWaterMark()
hwm.tmp_highest_usn = 0
hwm.reserved_usn = 0
hwm.highest_usn = 0
req.highwatermark = hwm
req.uptodateness_vector = None
req.replica_flags = 0
req.max_object_count = 1
req.max_ndr_size = 402116
req.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET
if attributes_ids:
partial_attribute_set = drsuapi.DsPartialAttributeSet()
partial_attribute_set.version = 1
partial_attribute_set.attids = attributes_ids
partial_attribute_set.num_attids = len(attributes_ids)
req.partial_attribute_set = partial_attribute_set
else:
req.partial_attribute_set = None
req.partial_attribute_set_ex = None
req.mapping_ctr.num_mappings = 0
req.mapping_ctr.mappings = None
_, ctr = bind.DsGetNCChanges(handle, 8, req)
identifier = ctr.first_object.object.identifier
attributes = ctr.first_object.object.attribute_ctr.attributes
return bind, identifier, attributes
def get_account_attributes(self, account_dn, decrypted: bool = True, attributes_ids: List[int] = SecurityAttributesIds) -> List[drsuapi.DsReplicaAttribute]:
bind, identifier, attributes = self._get_account_attributes(str(account_dn), attributes_ids)
if decrypted:
rid = identifier.sid.split()[1]
net_ctx = net.Net(self.creds)
for attr in attributes:
net_ctx.replicate_decrypt(bind, attr, rid)
return attributes
@staticmethod
def get_attribute_blob(attributes, attribute_id: int) -> bytes:
for attr in attributes:
if attr.attid == attribute_id:
if attr.value_ctr.num_values != 1:
return None
return attr.value_ctr.values[0].blob
return None
@staticmethod
def get_unicodePwd(attributes):
return AdAccountReader.get_attribute_blob(attributes, drsuapi.DRSUAPI_ATTID_unicodePwd)