目录导航
SMBSR是什么
SMBSR 是一个 python 脚本,它给定一个 CIDR/IP/IP_file/HOSTNAME(s) 枚举目标中所有侦听 (445) 的 SMB 服务并尝试对它们进行身份验证;如果身份验证成功,则递归访问所有文件夹和子文件夹,以便在文件和…秘密文件中找到机密文件。为了扫描 SMB 端口打开的目标,使用 masscan 模块。SMBSR 认为一些有趣的东西基于其:
- 内容
- 扩展
- 名称
该工具应查找的有趣关键字是通过命令行定义的,以及:
- 文件扩展名黑名单(此列表在运行时根据线程抛出的异常和文件类型自动更新)
- Shares 黑名单
- 文件夹黑名单(注意,子文件夹也包括)
- 线程数
- 我应该使用masscan吗?
- 有趣的文件扩展名(我猜像 ppk、kdbx、…)
- 允许检查的最大文件大小(字节)(相信我,太大可能需要一些时间)
- 我应该将结果导出为两个漂亮的 CSV 文件吗?
- 我应该对子文件夹进行多深的调查?
- 要匹配的正则表达式单词列表
- ldap 绑定的域控制器 IP
- 其他常见和必填项
当然,所有内容都本地保存在 SQlite 数据库中。该数据库包含一个用于“希望它是 DA 密码”匹配的表,称为 smbsr,其中包含以下列:
- 文件
- 分享
- ip
- 位置
- 匹配
- 创立日期
- 最后修改日期
- 上次访问日期
- 计数(如果在同一个文件上匹配多个,则递增)
还有另一个包含以下列的有趣文件列表的表:
- 文件
- 分享
- ip
- 创立日期
- 最后修改日期
- 上次访问日期
文件支持
SMBSR 学会了如何阅读以下扩展名文件:
- .csv 通过 python 内置
- .doc 通过antiword
- .docx 通过 python-docx2txt
- .eml 通过 python 内置
- .epub 通过 ebooklib
- .gif 通过 tesseract-ocr
- .jpg 和 .jpeg 通过 tesseract-ocr
- .json 通过 python 内置
- .html 和 .htm 通过 beautifulsoup4
- .mp3 通过 sox、SpeechRecognition 和 pocketsphinx
- .msg 通过 msg-extractor
- .odt 通过 python 内置
- .ogg 通过 sox、SpeechRecognition 和 pocketsphinx
- .pdf 通过 pdftotext(默认)或 pdfminer* .six
- .png 通过 tesseract-ocr
- .pptx 通过 python-pptx
- .ps 通过 ps2text
- .rtf 通过 unrtf
- .tiff 和 .tif 通过 tesseract-ocr
- .txt 通过 python 内置
- .wav 通过 SpeechRecognition 和 pocketsphinx
- .xlsx 通过 xlrd
- .xls 通过 xlrd
LDAP
它终于来了!现在,为 SMB 连接指定的域凭据也可以用于从 Active Directory 检索计算机对象列表。
reg_gen.py
作为最后一次更新,SMBSR 已被授予查找与给定正则表达式匹配秘密的权力(请参阅包含一些要匹配的好示例的 regulars.txt 文件)。鉴于这种新的超能力,我还实现了一个新脚本,它给出了一个字典列表,它生成一个正则表达式列表,该列表与它在词表中找到的密码模式相匹配。在打印出所有内容之前,正则表达式列表是 (sort -u)-ed。如果模式连续出现两个或多个 ascii_lower,则可以优化脚本,但现在不是这样。
requirements
pip3 install -r requirements.txt
用法
例如,从项目文件夹中:
./smbsr.py -IP 127.0.0.1 -word-list-path tomatch.txt -multithread -max-size 1000 -T 2 -username OB -password '****' -domain OB -file-extensions dll,exe,bin

SMBSR下载地址
①GitHub:
https://github.com/oldboy21/SMBSR/archive/refs/heads/master.zip
②云中转网盘:
https://yzzpan.com/#sharefile=VIpKedSj_31487
解压密码:www.ddosi.org
③直接复制保存以下文件即可
clean.sh
#!/bin/bash
rm smbsr.log
rm smbsr.db
rm *.csv
reg_gen.py
import string
import sys
def checkChar(x,counter):
lower = set(string.ascii_lowercase)
upper = set(string.ascii_uppercase)
if x.isspace():
return "(?=^.{" + str(counter) + "}[\s])"
elif x.isalnum() is False:
return "(?=^.{" + str(counter) + """}[!@#&_()–\[\{\}\]:;'%,?\/\*~\$\^\+="<>])"""
elif x in lower:
return "(?=^.{" + str(counter) + "}[a-z])"
elif x in upper:
return "(?=^.{" + str(counter) + "}[A-Z])"
else:
return "(?=^.{" + str(counter) + "}[\d])"
filepath = input("Wordlist path, please: ")
final = []
lines = []
result = ""
try:
with open(filepath, 'rb') as f:
for line in f:
try:
lines.append(line.strip(b'\n'))
except Exception as e:
print ('Error while reading line in the wordlist' + str(e))
continue
f.close()
except Exception as e:
print (e)
sys.exit(1)
for line in lines:
result = ""
try:
line = line.decode("utf-8")
except Exception as e:
continue
for element in range(0, len(line)):
result += checkChar(line[element],element)
final.append(result)
final = list( dict.fromkeys(final) )
with open("regulars.txt", "a") as f:
for x in final:
f.write(x + "\n")
f.close()
regulars.txt
^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]{8,10}$
^(?=.*[A-Za-z])(?=.*\d)[A-Za-z\d]{8,}$
^(?=.*[A-Za-z])(?=.*\d)(?=.*[@$!%*#?&])[A-Za-z\d@$!%*#?&]{8,}$
^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)[a-zA-Z\d]{8,}$
^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]{8,}$
^(?=.*[0-9])(?=.*[a-z])(?=.*[A-Z])(?=.*[!@#&_()–\[\{\}\]:;'%,?\/\*~\$\^\+"=<>]).{8,20}$
requirements.txt
pysmb==1.2.7
python_masscan==0.1.6
textract
smbsr.py
#!/usr/bin/python3
#
# Author:
# @oldboy21
# https://github.com/oldboy21/smbsr/
import socket
import argparse
import logging
import sys
import ipaddress
import urllib
import tempfile
import re
from smb import *
from smb.SMBConnection import SMBConnection
from smb.SMBHandler import SMBHandler
from io import BytesIO
import masscan
import _thread
import threading
from threading import Lock
from threading import Thread
import random
import uuid
import sys
import os
import sqlite3
import csv
from itertools import compress
import datetime
import faulthandler
import concurrent.futures
import gc
import time
import io
import string
import textract
import smbsrldap
class Database:
def __init__(self,db_file):
self.db_file=db_file
def connect_database(self):
self.conn = sqlite3.connect(self.db_file, check_same_thread=False)
self.cursor = self.conn.cursor()
self.lock = threading.Lock()
def create_database(self):
self.connect_database()
try:
smb_match_table = """ CREATE TABLE IF NOT EXISTS smbsr (
id integer PRIMARY KEY AUTOINCREMENT,
file text NOT NULL,
share text NOT NULL,
ip text NOT NULL,
position text NOT NULL,
matchedWith text NOT NULL,
tsCreated text NOT NULL,
tsModified text NOT NULL,
tsAccessed text NOT NULL,
count integer NOT NULL
); """
smb_files_table = """ CREATE TABLE IF NOT EXISTS smbfile (
id integer PRIMARY KEY AUTOINCREMENT,
file text NOT NULL,
share text NOT NULL,
ip text NOT NULL,
tsCreated text NOT NULL,
tsModified text NOT NULL,
tsAccessed text NOT NULL
); """
if self.cursor is not None:
self.create_table(smb_match_table)
self.create_table(smb_files_table)
except Exception as e:
logger.error("Encountered error while creating the database: " + str(e))
sys.exit(1)
def exportToCSV(self):
cursor = self.cursor
exportQuery = "SELECT * from smbsr"
exportQueryFile = "SELECT * from smbfile"
sr = cursor.execute(exportQuery)
with open('smbsr_results.csv', 'w') as f:
writer = csv.writer(f)
writer.writerows(sr)
sf = cursor.execute(exportQueryFile)
with open('smbsrfile_results.csv', 'w') as g:
writer = csv.writer(g)
writer.writerows(sf)
def commit(self):
self.conn.commit()
def create_table(self, create_table_sql):
try:
self.cursor.execute(create_table_sql)
except Exception as e:
logger.info(e)
def insertFinding(self, filename, share, ip, line, matched_with ,times):
try:
self.lock.acquire(True)
cursor = self.cursor
checkQuery = 'SELECT id FROM smbsr WHERE ip=\'{ip}\' AND share=\'{share}\' AND file=\'{filename}\''.format(ip=ip, share=share, filename=filename)
results = cursor.execute(checkQuery).fetchall()
if len(results) == 0:
insertFindingQuery = "INSERT INTO smbsr (file, share, ip, position, matchedWith, tsCreated, tsModified, tsAccessed, count) VALUES (?,?,?,?,?,?,?,?,?)"
cursor.execute(insertFindingQuery, (filename, share, ip, line, matched_with, times[0], times[1], times[2], 1))
self.commit()
else:
updateQuery = 'UPDATE smbsr SET count = count + 1 WHERE ip=\'{ip}\' AND share=\'{share}\' AND file=\'{filename}\''.format(ip=ip, share=share, filename=filename)
cursor.execute(updateQuery)
self.commit()
finally:
self.lock.release()
def insertFileFinding(self, filename, share, ip, times):
try:
self.lock.acquire(True)
cursor = self.cursor
insertFindingQuery = "INSERT INTO smbfile (file, share, ip, tsCreated, tsModified, tsAccessed) VALUES (?,?,?,?,?,?)"
cursor.execute(insertFindingQuery, (filename, share, ip, times[0], times[1], times[2]))
self.commit()
finally:
self.lock.release()
class HW(object):
def __init__(self, options, db):
super(HW, self).__init__()
self.options = options
self.conn = SMBConnection(options.username,options.password,options.fake_hostname,'netbios-server-name',options.domain,use_ntlm_v2=True,is_direct_tcp=True)
self.db = db
def retrieveTextSpecial(self, file_object):
try:
#os.rename(file_object.name, file_object.name + ".docx")
text = textract.process(file_object.name)
return text
except Exception as e:
os.remove(file_object.name)
logger.error("Error while parsing special file " + file_object.name + " with exception: " + str(e))
return "textractfailed"
def get_bool(self,prompt):
while True:
try:
return {"y":True,"n":False}[input(prompt).lower()]
except KeyError:
print("Invalid input please enter [y/n]")
def retrieveTimes(self, share, filename):
times = []
attributes = self.conn.getAttributes(share, filename)
ts_created = datetime.datetime.fromtimestamp(attributes.create_time).strftime('%Y-%m-%d %H:%M:%S')
ts_accessed = datetime.datetime.fromtimestamp(attributes.last_access_time).strftime('%Y-%m-%d %H:%M:%S')
ts_modified = datetime.datetime.fromtimestamp(attributes.last_write_time).strftime('%Y-%m-%d %H:%M:%S')
times.append(ts_created)
times.append(ts_modified)
times.append(ts_accessed)
return times
def passwordHW(self,text, filename,to_match, counter, IP, share):
results = []
output = False
words =to_match["words"]
regex = to_match["regex"]
for substring in words:
results.append(substring.lower() in text.lower())
output=any(results)
if output:
m = [i for i, x in enumerate(results) if x]
for z in m:
logger.info("Found interesting match in " + filename + " with " + words[z] +", line: " + str(counter))
self.db.insertFinding(filename, share, IP, str(counter), words[z], self.retrieveTimes(share,filename))
return True
if len(regex) > 0:
for i in regex:
if re.search(i, text):
logger.info("Found interesting match in " + filename + " with regex " + i +", line: " + str(counter))
self.db.insertFinding(filename, share, IP, str(counter), i, self.retrieveTimes(share,filename))
return True
return False
def parse(self, share, filename, to_match, IP):
line_counter = 0
hits = 0
file_obj = tempfile.NamedTemporaryFile()
file_ext = (filename.split('/')[-1]).split('.')[-1] or "empty"
#file_ext_double = (filename.split('/')[-1]).split('.')[-2] or "empty"
# or file_ext_double.lower() in self.options.file_extensions_black.split(',')
if file_ext.lower() in self.options.file_extensions_black.split(','):
logger.info("This extensions is blacklisted")
else:
if file_ext.lower() in self.options.file_interesting.split(','):
logger.info("Found interesting file: " + filename)
self.db.insertFileFinding(filename, share, IP, self.retrieveTimes(share,filename))
if (filename.split('/')[-1]).split('.')[0].lower() in to_match["words"]:
logger.info("Found interesting file named " + filename)
self.db.insertFileFinding(filename, share, IP, self.retrieveTimes(share,filename))
filesize = (self.conn.getAttributes(share, filename)).file_size
if filesize > self.options.max_size:
logger.info("Skipping file " + filename + ", it is too big and you said i can't handle it")
else:
file_attributes, filesize = self.conn.retrieveFile(share, filename, file_obj)
#here the extension check for office files
if file_ext.lower() in ['docx','doc','docx','eml','epub','gif','jpg','mp3','msg','odt','ogg','pdf','png','pptx','ps','rtf','tiff','tif','wav','xlsx','xls']:
specialfile = open(str(''.join(random.choices(string.ascii_uppercase, k = 5))) + "." +file_ext , "ab")
file_attributes, filesize = self.conn.retrieveFile(share, filename, specialfile)
#this mightbe empty AND GENERATE AND EXCEPTION
lines = (self.retrieveTextSpecial(specialfile))
specialfile.close()
if lines != "textractfailed":
lines = lines.split(b' ')
try:
os.remove(specialfile.name)
except Exception as e:
logger.warning("Error deleting the temp file: " + specialfile.name)
else:
file_obj.seek(0)
lines = file_obj.readlines()
#need to work on the lines here bcs the strip with bytes does not work apparently
if len(lines) > 0 and lines != "textractfailed":
for line in lines:
line_counter+=1
try:
if self.passwordHW((line.decode('utf-8')).strip('\n'), filename,to_match, line_counter, IP, share):
hits += 1
if hits >= options.hits:
logger.info("Reached max hits for " + filename)
break
except Exception as e:
logger.warning("Encountered exception while reading file: " + file_ext + " | Exception: " + str(e))
if isinstance(file_obj, (io.RawIOBase, io.BufferedIOBase)): #using filetype different from none?
self.options.file_extensions_black = self.options.file_extensions_black + "," + file_ext
break
file_obj.close()
def walk_path(self,path,shared_folder,IP,to_match):
#print (depth)
try:
for p in self.conn.listPath(shared_folder, path):
if p.filename!='.' and p.filename!='..':
parentPath = path
if not parentPath.endswith('/'):
parentPath += '/'
if p.isDirectory:
if p.filename.lower() in self.options.folder_black.split(','):
logger.info('Skipping ' + p.filename + " since blacklisted")
continue
else:
if parentPath.count('/') <= self.options.depth:
logger.info("Visiting subfolder " + str(p.filename))
self.walk_path(parentPath+p.filename,shared_folder,IP,to_match)
else:
logger.info("Skipping " + str(parentPath+p.filename) + ", too deep")
continue
else:
logger.info( 'File: '+ parentPath+p.filename )
self.parse(shared_folder, parentPath+p.filename, to_match, IP)
except Exception as e:
logger.warning("Error while listing paths in shares: " + str(e))
def shareAnalyze(self,IPaddress, to_match):
for ip in IPaddress:
logger.info("Checking SMB share on: " + ip)
#domain_name = 'domainname'
#conn = SMBConnection(options.username,options.password,options.fake_hostname,'netbios-server-name','SECRET.LOCAL',use_ntlm_v2=True,is_direct_tcp=True)
try:
self.conn.connect(ip, 445)
except Exception as e:
logger.warning("Detected error while connecting to " + str(ip) + " with message " + str(e))
continue
try:
shares = self.conn.listShares()
except Exception as e:
logger.warning("Detected error while listing shares on " + str(ip) + " with message " + str(e))
continue
for share in shares:
if not share.isSpecial and share.name not in ['NETLOGON', 'IPC$'] and share.name not in self.options.share_black.split(','):
logger.info('Listing file in share: ' + share.name)
try:
sharedfiles = self.conn.listPath(share.name, '/')
except Exception as e:
logger.warning("Detected error while listing shares on " + str(ip) + " with message " + str(e))
continue
self.walk_path("/",share.name,ip, to_match)
self.conn.close()
def shareAnalyzeLightning(self,to_analyze, to_match):
ip = to_analyze.pop(0)
logger.info("Checking SMB share on: " + ip)
#domain_name = 'domainname'
#conn = SMBConnection(options.username,options.password,options.fake_hostname,'netbios-server-name','SECRET.LOCAL',use_ntlm_v2=True,is_direct_tcp=True)
try:
self.conn.connect(ip, 445)
shares = self.conn.listShares()
for share in shares:
if not share.isSpecial and share.name not in ['NETLOGON', 'IPC$'] and share.name not in self.options.share_black.split(','):
logger.info('Listing file in share: ' + share.name)
self.walk_path("/",share.name,ip, to_match)
except Exception as e:
logger.info("Detected error while listing shares on " + str(ip) + " with message " + str(e))
self.conn.close()
def extractCIDR(self,final ):
cidr = []
for target in final:
ipcheck = re.match("^([0-9]{1,3}\.){3}[0-9]{1,3}(\/([0-9]{1,2}))$", target)
if ipcheck:
cidr.append(target)
return cidr
def scanNetwork(self):
target = self.options.IP
file_target = self.options.ip_list_path
temp = []
final = []
ldap_targets = []
to_analyze = []
#here it goes the LDAP check function
if self.options.ldap:
logger.info("Retrieving computer objects from LDAP")
ldapoptions =self.options
ldapoptions.username = self.options.domain + "\\" + self.options.username
ldap_targets = smbsrldap.run(self.options)
if file_target != "unset":
with open(file_target) as f:
temp = [line.rstrip() for line in f]
f.close()
else:
temp.append(target)
temp = temp + ldap_targets
temp = list( dict.fromkeys(temp) )
for i in temp:
try:
valid = re.match("^([0-9]{1,3}\.){3}[0-9]{1,3}($|\/([0-9]{1,2}))$", i)
except Exception as e:
logger.warning("exception reading from initial list")
continue
if not valid:
logger.info("You entered an hostname, looking up " + i)
try:
final.append(socket.gethostbyname(i))
except socket.gaierror:
logger.warning("Hostname could not be resolved: " + i)
else:
final.append(i)
cidrs = list(dict.fromkeys(self.extractCIDR(final)))
final = list(dict.fromkeys(final))
for i in cidrs:
if i in final:
final.remove(i)
if not self.options.masscan:
for x in final:
ipcheck = re.match("^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$", x)
if not ipcheck:
logger.error("Hey there, if you do not use masscan you can't give me CIDR as input")
sys.exit(1)
return final
logger.info('Starting masscan to discover SMB ports open')
mass = masscan.PortScanner()
if len(cidrs) > 0:
for ni in cidrs:
try:
mass.scan(ni, ports='445', arguments='--rate 1000')
for key in mass.scan_result['scan']:
if mass.scan_result['scan'][key]['tcp'][445]['state'] == 'open':
to_analyze.append(key)
except Exception as e:
logger.error("masscan failed with error: " + str(e) + " for range: " + str(ni))
if len(final) > 0:
ranges = ','.join(final)
try:
mass.scan(ranges, ports='445', arguments='--rate 1000')
except Exception as e:
logger.error("masscan failed with error: " + str(e))
sys.exit(1)
for key in mass.scan_result['scan']:
if mass.scan_result['scan'][key]['tcp'][445]['state'] == 'open':
to_analyze.append(key)
return to_analyze
def readMatches(self):
filepath = self.options.word_list_path
file_regular = self.options.regular_exp
rlines = []
try:
with open(filepath) as f:
lines = [line.rstrip() for line in f]
f.close()
#return lines
except Exception as e:
logger.error("Exception while reading the file " + str(e))
sys.exit(1)
if file_regular != 'unset':
try:
with open(file_regular) as r:
rlines = [line.rstrip() for line in r]
r.close()
except Exception as e:
logger.error("Exception while reading the regular expression file " + str(e))
to_match_dict = {
"words" : lines,
"regex" : rlines
}
return to_match_dict
class smbworker (threading.Thread):
def __init__(self, name, options, ip_list, to_match, db):
threading.Thread.__init__(self)
#self.threadID = threadID
self.name = name
self.options = options
self.ip = ip_list
self.to_match = to_match
self.db = db
def run(self):
logger.info("Starting " + self.name)
smbHW = HW(self.options, self.db)
logger.info("Tasks queue lenght: " + str(len(self.ip)))
while (len(self.ip) > 0):
smbHW.shareAnalyzeLightning(self.ip, self.to_match)
logger.info("Tasks queue lenght: " + str(len(self.ip)))
logger.info("Exiting " + self.name)
def setupPersistence(db, dbfile):
if not os.path.exists(dbfile):
logger.info("Database not found, creating ...")
db.create_database()
logger.info("Database created successfully")
else:
logger.info("Database already existing")
db.connect_database()
if __name__ == '__main__':
parser = argparse.ArgumentParser(add_help=True, description="SMB Password Revealer ")
parser.add_argument('-username', action='store', default='anonymous',type=str, help='Username for authenticated scan')
parser.add_argument('-password', action='store', default='s3cret', type=str, help='Password for authenticated scan')
parser.add_argument('-domain', action='store', default='SECRET.LOCAL', help='Domain for authenticated scan')
parser.add_argument('-fake-hostname', action='store', default='localhost', help='Computer hostname SMB connection will be from')
parser.add_argument('-word-list-path', action="store", type=str, help="File containing the string to look for", required=True)
parser.add_argument('-max-size', action="store", default=50000 ,type=int, help="Maximum size of the file to be considered for scanning (bytes)")
parser.add_argument('-file-extensions-black', action='store', type=str, default='none', help='Comma separated file extensions to skip while secrets harvesting')
parser.add_argument('-multithread', action='store_true', default=False, help="Assign a thread to any IP to scan")
parser.add_argument('-masscan', action='store_true', default=False, help="Scan for 445 before trying to analyze the share")
parser.add_argument('-T', action='store', default=10, type=int, help="Define the number of thread to use")
parser.add_argument('-logfile', action='store', default='smbsr.log', type=str, help='Log file name')
parser.add_argument('-dbfile', action='store', default='./smbsr.db', type=str, help='Log file name')
parser.add_argument('-file-interesting', action='store', default='none', type=str, help='Comma separated file extensions you want to be notified about')
parser.add_argument('-folder-black', action='store', default='none', type=str, help='Comma separated folder names to skip during the analysis, keep in mind subfolders are also skipped')
parser.add_argument('-csv', action='store_true', default=False, help='Export results to CSV files in the project folder')
parser.add_argument('-depth', action='store', default=100000000, type=int, help='How recursively deep you want to go while looking for secrets')
parser.add_argument('-regular-exp', action="store", default='unset' ,type=str, help="File containing regex expression to match")
parser.add_argument('-share-black', action='store', type=str, default='none', help='Comma separated share names to skip while secrets harvesting')
group = parser.add_mutually_exclusive_group()
group.add_argument('-ip-list-path', action="store", default="unset", type=str, help="File containing IP to scan")
group.add_argument('-IP',action="store", help='IP address, CIDR or hostname')
#ldapgroup = parser.add_mutually_inclusive_group()
parser.add_argument('-ldap', action='store_true', default=False, help='Query LDAP to retrieve the list of computer objects in a given domain')
parser.add_argument('-dc-ip', action='store', help='DC IP to bind to for LDAP authentication')
parser.add_argument('-hits', action='store',default=5000 ,type=int, help='Max findings per file')
options = parser.parse_args()
faulthandler.enable()
formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
logger = logging.getLogger('logger')
#cleaning handlers
logging.getLogger().handlers = []
logger.handlers = []
logger.setLevel(logging.INFO)
infoHandler = logging.FileHandler(options.logfile)
infoHandler.setLevel(logging.INFO)
infoHandler.setFormatter(formatter)
stdoutHandler = logging.StreamHandler(sys.stdout)
stdoutHandler.setLevel(logging.INFO)
stdoutHandler.setFormatter(formatter)
logger.addHandler(stdoutHandler)
logger.addHandler(infoHandler)
if len(sys.argv)==1:
parser.print_help()
print ("\nExamples: ")
print("\t./smb-secrets-revealer.py -IP 127.0.0.1/localhost -word-list-path tomatch.txt\n")
sys.exit(1)
db = Database(options.dbfile)
setupPersistence(db, options.dbfile)
smbHW = HW(options, db)
to_match = smbHW.readMatches()
to_analyze = smbHW.scanNetwork()
logger.info("Total amounts of targets: " + str(len(to_analyze)))
threads = []
if options.multithread is True:
logger.info("Lighting!!")
if len(to_analyze) < options.T:
options.T = len(to_analyze)
for i in range(options.T):
try:
worker = smbworker("Worker-" + str(i+1), options, to_analyze, to_match, db)
worker.start()
threads.append(worker)
except Exception as e:
logger.error("Error while multithreading: " + str(e))
sys.exit(1)
for thread in threads:
thread.join()
else:
smbHW.shareAnalyze(to_analyze, to_match)
if options.csv:
db.exportToCSV()
print ("Hope you found something good mate!")
smbsrldap.py
#!/usr/bin/env python3
import ldap
import argparse
import getpass
import sys
import re
import string
from datetime import datetime
import base64
import csv
class LDAPSearchResult(object):
"""A helper class to work with raw search results
Copied from here: https://www.packtpub.com/books/content/configuring-and-securing-python-ldap-applications-part-2
"""
dn = ''
def __init__(self, entry_tuple):
(dn, attrs) = entry_tuple
if dn:
self.dn = dn
else:
return
self.attrs = ldap.cidict.cidict(attrs)
def get_attributes(self):
return self.attrs
def has_attribute(self, attr_name):
return attr_name in self.attrs
def get_attr_values(self, key):
return self.attrs[key]
def get_attr_names(self):
return self.attrs.keys()
def get_dn(self):
return self.dn
def get_print_value(self, value):
isprintable = False
try:
dec_value = value.decode()
isprintable = dec_value.isprintable()
if isprintable:
value = dec_value
except UnicodeDecodeError:
pass
if not isprintable:
value = base64.b64encode(value).decode()
return value
def pretty_print(self):
attrs = self.attrs.keys()
final_list = []
try:
values = self.get_attr_values('dNSHostName')
except Exception as e:
values = self.get_attr_values('cn')
return self.get_print_value(values[0]).strip('\n')
class LDAPSession(object):
def __init__(self, dc_ip='', username='', password='', domain=''):
if dc_ip:
self.dc_ip = dc_ip
else:
self.get_set_DC_IP(domain)
self.username = username
self.password = password
self.domain = domain
self.con = self.initializeConnection()
self.domainBase = ''
self.is_binded = False
def initializeConnection(self):
con = ldap.initialize('ldap://{}'.format(self.dc_ip))
con.set_option(ldap.OPT_REFERRALS, 0)
return con
def unbind(self):
self.con.unbind()
self.is_binded = False
def getDefaultNamingContext(self):
try:
newCon = ldap.initialize('ldap://{}'.format(self.dc_ip))
newCon.simple_bind_s('', '')
res = newCon.search_s("", ldap.SCOPE_BASE, '(objectClass=*)')
rootDSE = res[0][1]
except ldap.LDAPError as e:
print("[!] Error retrieving the root DSE")
print("[!] {}".format(e))
sys.exit(1)
if 'defaultNamingContext' not in rootDSE:
print("[!] No defaultNamingContext found!")
sys.exit(1)
defaultNamingContext = rootDSE['defaultNamingContext'][0].decode()
self.domainBase = defaultNamingContext
newCon.unbind()
return defaultNamingContext
def do_bind(self):
try:
self.con.simple_bind_s(self.username, self.password)
self.is_binded = True
return True
except ldap.INVALID_CREDENTIALS:
print("[!] Error: invalid credentials")
sys.exit(1)
except ldap.LDAPError as e:
print("[!] {}".format(e))
sys.exit(1)
def whoami(self):
try:
current_dn = self.con.whoami_s()
except ldap.LDAPError as e:
print("[!] {}".format(e))
sys.exit(1)
return current_dn
def do_ldap_query(self, base_dn, subtree, objectFilter, attrs, page_size=1000):
"""
actually perform the ldap query, with paging
copied from another LDAP search script I found: https://github.com/CroweCybersecurity/ad-ldap-enum
found this script well after i'd written most of this one. oh well
"""
more_pages = True
cookie = None
ldap_control = ldap.controls.SimplePagedResultsControl(True, size=page_size, cookie='')
allResults = []
while more_pages:
msgid = self.con.search_ext(base_dn, subtree, objectFilter, attrs, serverctrls=[ldap_control])
result_type, rawResults, message_id, server_controls = self.con.result3(msgid)
allResults += rawResults
# Get the page control and get the cookie from the control.
page_controls = [c for c in server_controls if
c.controlType == ldap.controls.SimplePagedResultsControl.controlType]
if page_controls:
cookie = page_controls[0].cookie
if not cookie:
more_pages = False
else:
ldap_control.cookie = cookie
return allResults
def get_search_results(self, results):
# takes raw results and returns a list of helper objects
res = []
arr = []
if type(results) == tuple and len(results) == 2:
(code, arr) = results
elif type(results) == list:
arr = results
if len(results) == 0:
return res
for item in arr:
resitem = LDAPSearchResult(item)
if resitem.dn: # hack to workaround "blank" results
res.append(resitem)
return res
def getFunctionalityLevel(self):
objectFilter = '(objectclass=*)'
attrs = ['domainFunctionality', 'forestFunctionality', 'domainControllerFunctionality']
try:
# rawFunctionality = self.do_ldap_query('', ldap.SCOPE_BASE, objectFilter, attrs)
rawData = self.con.search_s('', ldap.SCOPE_BASE, "(objectclass=*)", attrs)
functionalityLevels = rawData[0][1]
except Error as e:
print("[!] Error retrieving functionality level")
print("[!] {}".format(e))
sys.exit(1)
return functionalityLevels
def getAllComputers(self, attrs=''):
if not attrs:
attrs = ['cn', 'dNSHostName', 'operatingSystem', 'operatingSystemVersion', 'operatingSystemServicePack']
objectFilter = '(objectClass=Computer)'
base_dn = self.domainBase
try:
rawComputers = self.do_ldap_query(base_dn, ldap.SCOPE_SUBTREE, objectFilter, attrs)
except ldap.LDAPError as e:
print("[!] Error retrieving computers")
print("[!] {}".format(e))
sys.exit(1)
return self.get_search_results(rawComputers), attrs
def prettyPrintResults(results, showDN=False):
final_list = []
for result in results:
final_list.append(result.pretty_print())
return final_list
def printFunctionalityLevels(levels):
for name, level in levels.items():
print("[+]\t {}: {}".format(name, FUNCTIONALITYLEVELS[level[0]]))
def run(args):
startTime = datetime.now().strftime("%Y%m%d-%H:%M:%S")
if not args.username:
print("[+] No username provided. I'm not going to try the anonymous bind.")
sys.exit(1)
else:
username = args.username
if args.username and not args.password:
password = getpass.getpass("Password for {}: ".format(args.username))
elif args.password:
password = args.password
ldapSession = LDAPSession(dc_ip=args.dc_ip, username=username, password=password, domain=args.domain)
print("[+] Using Domain Controller at: {}".format(ldapSession.dc_ip))
print("[+] Getting defaultNamingContext from Root DSE")
print("[+]\tFound: {}".format(ldapSession.getDefaultNamingContext()))
print("[+] Attempting bind")
ldapSession.do_bind()
if ldapSession.is_binded:
print("[+]\t...success! Binded as: ")
print("[+]\t {}".format(ldapSession.whoami()))
attrs = ''
attrs = ['dNSHostName', 'cn']
print("\n[+] Enumerating all AD computers")
allComputers, searchAttrs = ldapSession.getAllComputers(attrs=attrs)
if not allComputers:
bye(ldapSession)
print("[+]\tFound {} computers: \n".format(len(allComputers)))
finallist = prettyPrintResults(allComputers)
return finallist
bye(ldapSession)
def isValidDN(testdn):
# super lazy regex way to see if what they entered is a DN
dnRegex = re.compile('(DC=[^,"]+)+')
return dnRegex.search(testdn)
def selectResult(results):
print("[+] Found {} results:\n".format(len(results)))
for number, result in enumerate(results):
print("{}: {}".format(number, result.dn))
print("")
response = input("Which DN do you want to use? : ")
return results[int(response)]
def bye(ldapSession):
ldapSession.unbind()
项目地址
GitHub:
https://github.com/oldboy21/SMBSR
转载请注明出处及链接