-
-
Save arubdesu/c4f60e653cbe851827bd6e6023a1b396 to your computer and use it in GitHub Desktop.
The auditing project I discussed at Philly MacAdmins, April 21 2016 (unfortunately crashed and burned on older python...)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
"""Collect inventory via osquery, etc""" | |
import CoreFoundation | |
import csv | |
import datetime | |
import os | |
import platform | |
import plistlib | |
import subprocess | |
import sys | |
from xml.etree import ElementTree as et | |
CWD = os.path.dirname(__file__) | |
sys.path.append(CWD + '/osquery') | |
try: | |
import osquery | |
except ImportError: | |
pass | |
sys.path.append(CWD + '/gmacpyutil') | |
_CD_APP = os.path.join(CWD, "gmacpyutil/gmacpyutil/CocoaDialog.app") | |
_CD = '%s/Contents/MacOS/CocoaDialog' % _CD_APP | |
from gmacpyutil import cocoadialog | |
#pylint: disable=no-name-in-module | |
from SystemConfiguration import SCNetworkInterfaceCopyAll, \ | |
SCNetworkInterfaceGetLocalizedDisplayName, SCNetworkInterfaceGetBSDName, \ | |
SCNetworkInterfaceGetHardwareAddressString, SCDynamicStoreCreate, \ | |
SCDynamicStoreCopyValue, SCDynamicStoreCopyConsoleUser | |
from Foundation import NSKeyedUnarchiver | |
from struct import unpack | |
def extract_share(bookmark_data): | |
'''pulls out mount_URL from bookmark data - enter the frogor''' | |
content_offset, = unpack('I', bookmark_data[12:16]) | |
#pylint: disable=C0103, W0612 | |
first_TOC, = unpack('I', bookmark_data[content_offset:content_offset+4]) | |
first_TOC += content_offset | |
TOC_len, rec_type, level, next_TOC, record_count = unpack('IIIII', \ | |
bookmark_data[first_TOC:first_TOC+20]) | |
TOC_cursor = first_TOC + 20 | |
record_offsets = {} | |
for i in range(record_count): | |
record_id, offset = unpack('<IQ', bookmark_data[TOC_cursor:TOC_cursor+12]) | |
record_offsets[record_id] = offset + content_offset | |
TOC_cursor += 12 | |
mount_record = record_offsets[0x2050] | |
mount_length, rec_type = unpack('II', bookmark_data[mount_record:mount_record+8]) | |
mount_record += 8 | |
mount_URL = (bookmark_data[mount_record:mount_record+mount_length]).decode('utf-8') | |
return mount_URL | |
def get_recentservers(sfl_file_path): | |
'''Read the com.apple.LSSharedFileList.RecentServers.sfl file''' | |
with open(sfl_file_path, 'rb') as f: | |
raw_data = f.read() | |
recent_servers = NSKeyedUnarchiver.unarchiveObjectWithData_(buffer(raw_data)) | |
server_URLs = [] | |
for x in sorted(recent_servers['items'], lambda y, _: int(y.order())): | |
server_URLs.append(extract_share(x.bookmark()[:].tobytes())) | |
return server_URLs | |
def pull_plistval(path, val): | |
'''Reads binary and XML plists, returns stanza for further parsing''' | |
return CoreFoundation.CFPreferencesCopyAppValue(val, path) | |
def get_adinfo(net_config): | |
'''Returns the ad directory info''' | |
ad_info = SCDynamicStoreCopyValue(net_config, \ | |
"com.apple.opendirectoryd.ActiveDirectory") | |
if ad_info: | |
return ad_info | |
else: | |
return {} | |
def get_adadmins(): | |
'''Returns list of ad-connector-configured admin groups''' | |
try: | |
nested_admins = \ | |
plistlib.readPlistFromString(subprocess.check_output( \ | |
['/usr/sbin/dsconfigad', '-show', '-xml'])) | |
if nested_admins: | |
return nested_admins['Administrative']['Allowed admin groups'] | |
except: | |
return {} | |
def get_currentuser(): | |
pass | |
def get_computername(net_config): | |
'''Returns the ComputerName of this Mac''' | |
sys_info = SCDynamicStoreCopyValue(net_config, "Setup:/System") | |
try: | |
return sys_info['ComputerName'] | |
except: | |
return 'compnamenotset' | |
def get_printers(): | |
'''Returns name, ppd, and uri of printers''' | |
full_prints = \ | |
plistlib.readPlistFromString(subprocess.check_output( \ | |
['/usr/sbin/system_profiler', 'SPPrintersDataType', '-xml'])) | |
all_printdicts = full_prints[0]['_items'] | |
just_prints = {} | |
for printer in all_printdicts: | |
just_prints[printer.get('_name', None)] = [printer.get('ppd', None), | |
printer.get('uri', None)] | |
return just_prints | |
def get_dnsinfo(net_config): | |
'''Returns the currently considered primary network interface''' | |
dns_info = SCDynamicStoreCopyValue(net_config, "State:/Network/Global/DNS") | |
return dict(dns_info) | |
def get_primaryinterface(net_config): | |
'''Returns the currently considered primary network interface''' | |
states = SCDynamicStoreCopyValue(net_config, "State:/Network/Global/IPv4") | |
if states: | |
return states['PrimaryInterface'] | |
else: | |
return None | |
def lookup_guids(net_config, guid): | |
'''Returns net iface for IPv4 guid''' | |
netname = SCDynamicStoreCopyValue(net_config, "Setup:/Network/Service/%s" % | |
guid) | |
if netname: | |
return netname['UserDefinedName'] | |
else: | |
return None | |
def get_is_dhcp(net_config, guid): | |
'''Returns net iface for IPv4 guid''' | |
config = SCDynamicStoreCopyValue(net_config, | |
"Setup:/Network/Service/%s/IPv4" % | |
guid) | |
if config: | |
if ['ConfigMethod'] == 'DHCP': | |
return True | |
else: | |
return False | |
def get_ip_address(net_config, iface): | |
'''Returns IP for provided interface''' | |
try: | |
addresses = SCDynamicStoreCopyValue(net_config, | |
"State:/Network/Interface/%s/IPv4" % | |
iface) | |
return addresses['Addresses'][0] | |
except TypeError: | |
return None | |
def get_networkinterfacelist(net_config): | |
'''Returns a list of all network interface names/MACs''' | |
all_media = {} | |
names_forguids = [] | |
guids = {} | |
network_interfaces = SCNetworkInterfaceCopyAll() | |
for interface in network_interfaces: | |
bsdname = SCNetworkInterfaceGetBSDName(interface) | |
disp_name = SCNetworkInterfaceGetLocalizedDisplayName(interface) | |
mac_addy = SCNetworkInterfaceGetHardwareAddressString(interface) | |
if not disp_name == 'Bluetooth PAN': | |
all_media[disp_name] = [get_ip_address(net_config, bsdname), | |
mac_addy, bsdname] | |
names_forguids.append(disp_name) | |
guid_list = SCDynamicStoreCopyValue(net_config, "Setup:/Network/Global/IPv4") | |
for guid in guid_list['ServiceOrder']: | |
if guid: | |
lookd_up = lookup_guids(net_config, guid) | |
guids[lookd_up] = guid | |
for iface_key in names_forguids: | |
associated_guid = guids.get(iface_key, None) | |
check_dhcp = get_is_dhcp(net_config, associated_guid) | |
if check_dhcp: | |
all_media[iface_key].append('DHCP') | |
return all_media.items() | |
def parse_groups(group, result_dict): | |
'''takes group to parse and current output, returns filtered list''' | |
parsed = [] | |
stripd_usrs = result_dict.get('users') | |
group_usrs = result_dict.get(group) | |
for user_dict in stripd_usrs: | |
new_dict = {} | |
this_uid = user_dict.get('uid') | |
for grp_uid in group_usrs: | |
if this_uid == grp_uid.get('uid'): | |
new_dict[user_dict.get('username')] = this_uid | |
parsed.append(new_dict) | |
return parsed | |
def manual_sysinfo(net_config): | |
'''for my 10.7 and 10.8 brethren''' | |
sysinfo_dict = {} | |
sysinfo_dict['hostname'] = SCDynamicStoreCopyValue(net_config, "Setup:/System") | |
model_cmd = ['/usr/sbin/sysctl', '-n', 'hw.model'] | |
memsize_cmd = ['/usr/sbin/sysctl', '-n', 'hw.memsize'] | |
machdep_cmd = ['/usr/sbin/sysctl', '-n', 'machdep.cpu.brand_string'] | |
sysinfo_dict['hardware_model'] = subprocess.check_output(model_cmd) | |
sysinfo_dict['physical_memory'] = subprocess.check_output(memsize_cmd) | |
sysinfo_dict['cpu_brand'] = subprocess.check_output(machdep_cmd) | |
serial_cmd = ['/usr/sbin/system_profiler', 'SPHardwareDataType', '-xml'] | |
serial_plist = subprocess.check_output(serial_cmd) | |
serial = (plistlib.readPlistFromString(serial_plist))[0]['_items'][0]['serial_number'] | |
sysinfo_dict['cpu_serial'] = serial | |
return sysinfo_dict | |
def manual_fw(): | |
'''return fw state dict''' | |
fw_dict = {} | |
fw_dict['globalstate'] = pull_plistval('globalstate', | |
'/Library/Preferences/com.apple.alf.plist') | |
return fw_dict | |
def manual_fvtwo(): | |
'''return filevault status''' | |
fvtwo = [] | |
try: | |
fvtwo.append(subprocess.check_output(['/usr/bin/fdesetup', 'status'])) | |
except: | |
pass | |
return fvtwo | |
def manual_admins(): | |
'''gets admin group membership''' | |
admins, ssh, ard = [], [], [] | |
ssh_grp = '/Groups/com.apple.access_ssh' | |
ard_grp = '/Groups/com.apple.access_screensharing' | |
admin_dsclist = ['/usr/bin/dscl', '.', 'read', '/Groups/admin', | |
'GroupMembership'] | |
admins.append((subprocess.check_output(admin_dsclist)).strip()) | |
ssh_dsclist = ['/usr/bin/dscl', '.', 'read', ssh_grp, 'GroupMembership', | |
'NestedGroups'] | |
try: | |
ssh.append((subprocess.check_output(ssh_dsclist)).strip()) | |
except: | |
pass | |
ard_dsclist = ['/usr/bin/dscl', '.', 'read', ard_grp, 'GroupMembership', | |
'NestedGroups'] | |
try: | |
ard.append((subprocess.check_output(ard_dsclist)).strip()) | |
except: | |
pass | |
return admins, ssh, ard | |
def old_inventory(net_config): | |
'''mem/cpu/name/serial/model/fwState/dsclARD/ssh/admin/fv2/disk''' | |
all_sys_info = {} | |
all_sys_info['system_info'] = manual_sysinfo(net_config) | |
all_sys_info['alf'] = manual_fw() | |
all_sys_info['disk_encryption'] = manual_fvtwo() | |
admins, ssh, ard = manual_admins() | |
all_sys_info['ARD'] = ard | |
all_sys_info['ssh'] = ssh | |
all_sys_info['admins'] = admins | |
return all_sys_info | |
def run_osquery(sql_dict): | |
"""take sql command you'd like output from osquery for, returns...""" | |
result_dict = {} | |
instance = osquery.SpawnInstance(path='%s/osqueryd' % CWD) | |
instance.open() | |
for name, quer in sql_dict.items(): | |
results = instance.client.query(quer) | |
result_dict[name] = results.response | |
return result_dict | |
def pokemon(): | |
"""returns inventory items""" | |
query_dict = {} | |
result_dict = {} | |
inventory = [('hostname, cpu_serial, hardware_model, physical_memory, \ | |
cpu_brand', 'system_info'), | |
('major, minor, patch, build', 'os_version'), | |
('global_state', 'alf'), | |
('name, path, bundle_identifier, bundle_name, \ | |
bundle_short_version, bundle_version, copyright, \ | |
bundle_executable', 'apps') | |
] | |
for query in inventory: | |
sql_string = ''.join(['select ', query[0], ' from ', query[1]]) | |
query_dict[query[1]] = sql_string | |
complex_qs = [("select uid from user_groups where gid like '80' and uid \ | |
not like '0'", 'admins'), | |
("select uid from user_groups where gid like '398' and uid \ | |
not like '0'", 'ARD'), | |
("select uid from user_groups where gid like '399' and uid \ | |
not like '0'", 'ssh'), | |
("select uid, gid, username, description, directory from \ | |
users where shell not like '/usr/%' and uid not like '0' \ | |
and uid not like '248'", 'users'), | |
("select * from disk_encryption where encrypted like '1'", | |
'disk_encryption'), | |
("select device, path, blocks, blocks_size, blocks_free, \ | |
blocks_available from mounts where device not like 'map%' \ | |
and device not like 'dev%'", 'mounts'), | |
("select name, label from block_devices where type not like \ | |
'Virtual Interface' and label like '%Media'", 'block_devices') | |
] | |
for quer in complex_qs: | |
query_dict[quer[1]] = quer[0] | |
result_dict = run_osquery(query_dict) | |
result_dict['admins'] = parse_groups('admins', result_dict) | |
result_dict['ARD'] = parse_groups('ARD', result_dict) | |
result_dict['ssh'] = parse_groups('ssh', result_dict) | |
result_dict['system_info'] = result_dict.get('system_info')[0] | |
result_dict['alf'] = result_dict.get('alf')[0] | |
return result_dict | |
def post_approve(net_config): | |
'''Build most of the non-osq dicts''' | |
extend = {} | |
extend['ad_info'] = dict(get_adinfo(net_config)) | |
extend['ad_groups'] = get_adadmins() | |
extend['current_user'] = SCDynamicStoreCopyConsoleUser(None, None, None)[0] | |
extend['network_interfaces'] = get_networkinterfacelist(net_config) | |
extend['primary_interface'] = get_primaryinterface(net_config) | |
extend['printers'] = get_printers() | |
return extend | |
def curler(filepath, email, descript): | |
'''curl our csv to Box''' | |
curl = '/usr/bin/curl' | |
url = 'https://upload.box.com/api/1.0/not/showing/therest' | |
frst_txt = 'new_file_1=@' | |
middle_txt = 'check_name_conflict_folder_option=1' | |
eml_blrb = 'uploader_email=' | |
late_txt = 'description=' | |
command = [curl, url, '-F', frst_txt + filepath, '-F', middle_txt, '-F', | |
eml_blrb + email, '-F', str(late_txt + descript)] | |
print command | |
try: | |
response = subprocess.check_output(command) | |
xmldata = et.fromstring(response) | |
status = xmldata.find("status").text | |
if status == 'upload_ok': | |
return 'Completed successfully' | |
else: | |
return 'Something went wrong' | |
except subprocess.CalledProcessError as err: | |
print err | |
return 'Something went wrong' | |
def cleanup(result): | |
'''catch errors with feedback, or show success''' | |
if result == 'Something went wrong': | |
tooold_msg = cocoadialog.MsgBox(title="Sorry!") | |
tooold_msg.SetText("There was an error submitting your inventory. Quitting") | |
tooold_msg.Show() | |
sys.exit(0) | |
# else: | |
# complete_msg = cocoadialog.MsgBox(title="Complete") | |
# complete_msg.SetText("Thank you for your help!") | |
# button_pressed = complete_msg.Show() | |
# button = button_pressed.split('\n')[0] | |
# if button: | |
# sys.exit(0) | |
def how_proceed(this_os, filename_now, net_config): | |
cd_title = "Org Device Registration" | |
email_prompt = \ | |
cocoadialog.Standard_InputBox(title=cd_title) | |
email_prompt.SetInformativeText("Please enter your email address, you'll then be taken to a webpage for more information.") | |
email_prompt.SetText("@college.edu") | |
email_entered = email_prompt.Show() | |
button, email = email_entered.split('\n')[:2] | |
print button | |
if button == 'Cancel': | |
sys.exit(0) | |
machine_primary = get_primaryinterface(net_config) | |
das_url = 'http://registration.url' | |
macproc = subprocess.Popen(['/usr/sbin/networksetup', '-getmacaddress', | |
machine_primary], stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE) | |
stdout, stderr = macproc.communicate() | |
mac_addy = (stdout[18:35]).replace(':', '-') | |
utf8_compyname = get_computername(net_config) | |
compyname = utf8_compyname.encode('ascii', 'ignore').decode('ascii') | |
userid = os.getlogin() | |
subprocess.Popen(['/usr/bin/open', das_url + mac_addy + '&email=' + email | |
+ '&userid=' + userid + '&computername=' + | |
compyname]) | |
if this_os < 7: | |
result_dict = {} | |
cmd = ['/usr/sbin/system_profiler', '-xml', 'SPHardwareDataType'] | |
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
try: | |
output = proc.communicate()[0] | |
plist = plistlib.readPlistFromString(output) | |
result_dict['serial'] = plist[0]['_items'][0]['serial_number'] | |
result_dict['current_user'] = SCDynamicStoreCopyConsoleUser(None, None, None)[0] | |
our_filepath = os.path.join('/tmp/', result_dict.get('serial') + '_' \ | |
+ filename_now + '.csv') | |
with open(our_filepath, 'w') as target: | |
writer = csv.writer(target) | |
writer.writerows(zip(result_dict.keys(), result_dict.values())) | |
response = curler(our_filepath, email, result_dict.get('current_user')) | |
cleanup(response) | |
except: | |
cleanup('Something went wrong') | |
else: | |
return email | |
def main(): | |
'''gimme some main''' | |
this_os = int(platform.mac_ver()[0].split('.')[1]) | |
now = str(datetime.datetime.now())[:-7] | |
filename_now = (now.replace(':', '-')).replace(' ', '_') | |
net_config = SCDynamicStoreCreate(None, "net", None, None) | |
email = how_proceed(this_os, filename_now, net_config) | |
extend = post_approve(net_config) | |
extend['DateCollected'] = now | |
extend['captured_email'] = email | |
descript = extend.get('current_user') | |
if this_os < 9: | |
result_dict = old_inventory(net_config) | |
result_dict['recent_servers'] = pull_plistval('/Users/' + descript + \ | |
'/Library/Preferences/com.apple.sidebarlists.plist', 'favoriteservers') | |
else: | |
result_dict = pokemon() | |
stanza = result_dict.get('os_version') | |
if stanza[0]['minor'] == '11': | |
#pylint: disable=C0103 | |
path_str = 'Library/Application Support/com.apple.sharedfilelist/com.apple.LSSharedFileList.RecentServers.sfl' | |
try: | |
result_dict['recent_servers'] = list(get_recentservers(str(os.path.join('/Users', descript, path_str)))) | |
except IOError: | |
pass | |
else: | |
quear = "select subkey, value from preferences where key = \ | |
'favoriteservers' and subkey = 'CustomListItems//URL' and \ | |
path = '/Users/" + descript + \ | |
"/Library/Preferences/com.apple.sidebarlists.plist';" | |
new_dict = {'recent_servers': str(quear)} | |
result_dict['recent_servers'] = run_osquery(new_dict) | |
pars_serial = result_dict.get('system_info') | |
serial = pars_serial.get('cpu_serial') | |
result_dict.update(extend) | |
print result_dict | |
our_filepath = os.path.join('/tmp/', serial + '_' + filename_now + '.csv') | |
print our_filepath | |
with open(our_filepath, 'w') as target: | |
writer = csv.writer(target) | |
writer.writerows(zip(result_dict.keys(), result_dict.values())) | |
response = curler(our_filepath, email, descript) | |
cleanup(response) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment