๐๐ฅ๐๐๐ข ๐ก๐๐ง๐ช๐ข๐ฅ๐
Kanalga Telegramโda oโtish
๐ช๐๐๐๐ข๐ ๐ ๐ง๐ข ๐๐ฅ๐๐๐ข ๐ก๐๐ง๐ช๐ข๐ฅ๐ ๐๐ป ๐ง๐๐๐ฆ ๐๐๐๐ก๐ก๐๐ ๐๐ฆ ๐ ๐๐๐ ๐๐ข๐ฅ ๐ข๐ก๐๐ฌ ๐๐ก๐ง๐๐ฅ๐ง๐๐๐ก๐ ๐๐ก๐ง ๐๐ก๐ ๐๐๐จ๐๐๐ง๐๐ข๐ก ๐ฃ๐จ๐ฅ๐ฃ๐ข๐ฆ๐ ๐ข๐ก๐๐ฌ โ ๏ธ ๐๐ข๐๐ก ๐ข๐จ๐ฅ ๐๐๐๐ก๐ก๐๐ ๐๐ข๐ฅ ๐๐๐๐๐ฌ ๐จ๐ฃ๐๐๐ง๐๐ฆ โ
Ko'proq ko'rsatishMamlakat belgilanmaganToif belgilanmagan
899
Obunachilar
+10024 soatlar
+1227 kunlar
+46330 kunlar
Ma'lumot yuklanmoqda...
O'xshash kanallar
Ma'lumot yo'q
Muammo bormi? Iltimos, sahifani yangilang yoki bizning qo'llab-quvvatlash boshqaruvchimizga murojaat qiling>.
Taglar buluti
Ma'lumot yo'q
Muammo bormi? Iltimos, sahifani yangilang yoki bizning qo'llab-quvvatlash boshqaruvchimizga murojaat qiling>.
Kirish va chiqish esdaliklari
---
---
---
---
---
---
Obunachilarni jalb qilish
Iyul '26
Iyul '26
+193
2 kanalda
Iyun '26
+391
6 kanalda
Get PRO
May '26
+61
3 kanalda
Get PRO
Aprel '26
+316
3 kanalda
Get PRO
Mart '260
1 kanalda
Get PRO
Fevral '26
+17
1 kanalda
| Sana | Obunachilarni jalb qilish | Esdaliklar | Kanallar | |
| 02 Iyul | +93 | |||
| 01 Iyul | +100 |
Kanal postlari
| 2 | WHEN 1K HITS VPS GIVEAWAY | 60 |
| 3 | WTF FROM 901 TO 897 SUBS ๐๐ | 22 |
| 4 | sticker.webp | 1 |
| 5 | sticker.webp | 1 |
| 6 | ๐๐๐
Jษชแด ๐บษข -๐ปษขโก๏ธ
Non-Reacharge ๐
โญ๏ธ Uษดสษชแดษชแดแดแด
Fสแดแดษดแดแด โ๏ธ
โญ๏ธ Ssส Mแดแดสแดแด
โ๏ธ
โญ๏ธ Aแดษด - Bothโ๏ธ
โญ๏ธ Jษชแด ๐บษข - ๐ปษข Bสแดแดssโ๏ธ
โญ๏ธ Oแดแด Aแดแดs Wแดสแดษชษดษข โ๏ธ
CLICK HERE TO GET FILE | 60 |
| 7 | ๐๐๐
6 hours gcp old
Jษชแด ๐บษข -๐ปษขโก๏ธ
Non-Reacharge ๐
โญ๏ธ Uษดสษชแดษชแดแดแด
Fสแดแดษดแดแด โ๏ธ
โญ๏ธ Ssส Mแดแดสแดแด
โ๏ธ
โญ๏ธ Aแดษด - Bothโ๏ธ
โญ๏ธ Jษชแด ๐บษข - ๐ปษข Bสแดแดssโ๏ธ
โญ๏ธ Oแดแด Aแดแดs Wแดสแดษชษดษข โ๏ธ
CLICK HERE TO GET FILE | 69 |
| 8 | PREMIUM GCP SERVER ๐
4G NO CAP ๐
GIVE MAXIMUM REACTION FOR NEW FILES ๐ฆ
GIVE FEEDBACK AND SPEED SS ๐ | 149 |
| 9 | i don't know working or not but I get good response if anyone want check dm @atp_network only if data is over | 2 |
| 10 | Matn yo'q... | 26 |
| 11 | fully working | 70 |
| 12 | @atp_wheather_bot
this is a wheather bot this have ability to check area stats | 65 |
| 13 | subdomain finder under maintenance | 200 |
| 14 | @atp_subdomain_finder_bot
if anyone want source code dm @atp_network | 235 |
| 15 | if csrf:
r = sess.post("https://dnsdumpster.com/", data={"csrfmiddlewaretoken": csrf.group(1), "targetip": domain}, timeout=10)
for sub in re.findall(r'<td class="col-md-4">([^<]+\.' + re.escape(domain) + r')', r.text):
cleaned = self.clean_subdomain(sub)
if cleaned: self.found_subs.add(cleaned)
if name == "main":
print("ATP Network Premium Command Subdomain Platform successfully launched.")
app.run() | 1 |
| 16 | # ==========================================
# SUBDOMAIN HELPER ENGINE (15+ API SOURCES)
# ==========================================
class SubdomainHelper:
def init(self):
self.found_subs = set()
def clean_subdomain(self, dname):
if not dname: return None
dname = dname.strip().lower().lstrip('*.')
if re.match(r'^[a-z0-9][a-z0-9.-]+\.[a-z]{2,}$', dname): return dname
return None
async def fetch_all(self, domain):
self.found_subs = set()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._run_scrapers, domain)
return sorted(list(self.found_subs))
def _run_scrapers(self, domain):
scrapers = [self._crtsh, self._alienvault, self._bufferover, self._threatcrowd, self._anubis, self._hackertarget, self._rapiddns, self._dnsdumpster]
for scraper in scrapers:
try: scraper(domain)
except: continue
def _crtsh(self, domain):
r = requests.get(f"https://crt.sh/?q=%25.{domain}&output=json", timeout=12)
if r.status_code == 200:
for entry in r.json():
for sub in entry.get('name_value', '').split('\n'):
cleaned = self.clean_subdomain(sub)
if cleaned and cleaned.endswith(f".{domain}"): self.found_subs.add(cleaned)
def _alienvault(self, domain):
r = requests.get(f"https://otx.alienvault.com/api/v1/indicators/domain/{domain}/passive_dns", timeout=10)
if r.status_code == 200:
for item in r.json().get('passive_dns', []):
cleaned = self.clean_subdomain(item.get('hostname', ''))
if cleaned and cleaned.endswith(f".{domain}"): self.found_subs.add(cleaned)
def _bufferover(self, domain):
r = requests.get(f"https://dns.bufferover.run/dns?q={domain}", timeout=10)
if r.status_code == 200:
for entry in r.json().get('FDNS_A', []) + r.json().get('RDNS', []):
if isinstance(entry, list) and len(entry) > 1:
cleaned = self.clean_subdomain(entry[1])
if cleaned and cleaned.endswith(f".{domain}"): self.found_subs.add(cleaned)
def _threatcrowd(self, domain):
r = requests.get(f"https://www.threatcrowd.org/searchApi/v2/domain/report/?domain={domain}", timeout=10)
if r.status_code == 200:
for sub in r.json().get('subdomains', []):
cleaned = self.clean_subdomain(sub)
if cleaned: self.found_subs.add(cleaned)
def _anubis(self, domain):
r = requests.get(f"https://jldc.me/anubis/subdomains/{domain}", timeout=10)
if r.status_code == 200:
for sub in r.json():
cleaned = self.clean_subdomain(sub)
if cleaned: self.found_subs.add(cleaned)
def _hackertarget(self, domain):
r = requests.get(f"https://api.hackertarget.com/hostsearch/?q={domain}", timeout=10)
if r.status_code == 200:
for line in r.text.strip().split('\n'):
if ',' in line:
cleaned = self.clean_subdomain(line.split(',')[0])
if cleaned and cleaned.endswith(f".{domain}"): self.found_subs.add(cleaned)
def _rapiddns(self, domain):
r = requests.get(f"https://rapiddns.io/subdomain/{domain}?full=1", timeout=12)
if r.status_code == 200:
for sub in re.findall(r'<td>([a-zA-Z0-9.-]+\.' + re.escape(domain) + r')</td>', r.text):
cleaned = self.clean_subdomain(sub)
if cleaned: self.found_subs.add(cleaned)
def _dnsdumpster(self, domain):
sess = requests.Session()
r = sess.get("https://dnsdumpster.com/", timeout=10)
csrf = re.search(r'csrfmiddlewaretoken" value="([^"]+)"', r.text) | 1 |
| 17 | @app.on_message(filters.command("start"))
async def start_cmd(client: Client, message: Message):
user_id = message.from_user.id if message.from_user else message.chat.id
if user_id in BANNED_IDS:
return # Complete silent drop for banned users
track_user(user_id)
reply_id = message.id if message.chat.type != ChatType.PRIVATE else None
await message.reply_text(
"๐ฅ Welcome to ATP Subdomain Finder Bot\n\n"
"Easily find and extract all active subdomains of any target domain website instantly."
f"{SUPPORT_FOOTER}",
reply_markup=get_main_keyboard(),
reply_to_message_id=reply_id
)
@app.on_message(filters.text)
async def process_inputs(client: Client, message: Message):
if message.from_user: user_id = message.from_user.id
elif message.sender_chat: user_id = message.sender_chat.id
else: return
# Intercept and terminate access instantly if banned
if user_id in BANNED_IDS:
return
track_user(user_id)
text = message.text.strip()
reply_id = message.id if message.chat.type != ChatType.PRIVATE else None
# Handle hidden text routing for internal panel macros (Owner level overrides)
if user_id == OWNER_ID:
if text == "/stats":
await shared_stats(client, message)
return
elif text == "/check":
await shared_check_users(client, message)
return
elif text == "/reset":
await owner_reset(client, message)
return
elif text == "/stop":
await owner_stop(client, message)
return
elif text == "/use":
await owner_use(client, message)
return
# Handle text routing for Admin level overrides
elif user_id in ADMIN_IDS:
if text == "/stats":
await shared_stats(client, message)
return
elif text == "/check":
await shared_check_users(client, message)
return
if text == "Subdomain Finder":
USER_STATE[user_id] = {"action": "tool_sub"}
await message.reply_text(
"๐ ATP Subdomain Finder\n\n"
"Please send the target domain name (e.g., google.com):",
reply_to_message_id=reply_id
)
return
# Evaluating Active State Machine Loops
if user_id in USER_STATE and "action" in USER_STATE[user_id]:
state = USER_STATE[user_id]["action"]
if state == "tool_sub":
status_msg = await message.reply_text(
"๐ *Scanning target endpoints... please wait a moment.*",
reply_to_message_id=reply_id
)
domain = text.replace("http://", "").replace("https://", "").split("/")[0].strip()
helper = SubdomainHelper()
found_list = await helper.fetch_all(domain)
await status_msg.delete()
if found_list:
res_file = os.path.join(DOWNLOAD_DIR, f"subs_{user_id}_{domain}.txt")
with open(res_file, "w") as f: f.write("\n".join(found_list))
await message.reply_document(
res_file,
caption=f"๐ฏ ATP Subdomain Finder Report\n\n"
f"๐ Target Domain: {domain}\n"
f"๐ฅ Total Found: {len(found_list)} subdomains extracted.",
reply_to_message_id=reply_id
)
os.remove(res_file)
else:
# Appends support branding only on failure / bot not working scenarios
await message.reply_text(
"โ No subdomains found, or the target platform rejected our request."
f"{SUPPORT_FOOTER}",
reply_to_message_id=reply_id
)
USER_STATE.pop(user_id, None)
return | 1 |
| 18 | @app.on_message(filters.command("admins") & filters.user(OWNER_ID))
async def list_admins_cmd(client: Client, message: Message):
if not ADMIN_IDS:
await message.reply_text("๐ฅ ATP Network Admins: No administrators are currently configured.")
return
admin_list = "\n".join([f"โข {uid}" for uid in ADMIN_IDS])
await message.reply_text(f"๐ฅ **ATP Network Dynamic Admin Pool ({len(ADMIN_IDS)}):**\n\n{admin_list}")
@app.on_message(filters.command("ban") & filters.user(OWNER_ID))
async def ban_user_cmd(client: Client, message: Message):
parts = message.text.split()
if len(parts) < 2 or not parts[1].isdigit():
await message.reply_text(Format Mismatch:h:** Please use /ban <user_id>")
return
target_id = int(parts[1])
if target_id == OWNER_ID:
await message.reply_text(Operation Denied:d:** You cannot ban yourself.")
return
BANNED_IDS.add(target_id)
# Remove from state or admins if applicable
USER_STATE.pop(target_id, None)
if target_id in ADMIN_IDS:
ADMIN_IDS.remove(target_id)
await message.reply_text(fBlacklisted:d:** User ID {target_id} has been banned from accessing the bot.")
@app.on_message(filters.command("stop") & filters.user(OWNER_ID))
async def owner_stop(client: Client, message: Message):
await message.reply_text(Bot operational thread pool has been paused successfully by owner.r.**")
@app.on_message(filters.command("reset") & filters.user(OWNER_ID))
async def owner_reset(client: Client, message: Message):
USER_STATE.clear()
await message.reply_text(In-memory dictionary cache successfully wiped clean.n.**")
@app.on_message(filters.command("use") & filters.user(OWNER_ID))
async def owner_use(client: Client, message: Message):
await message.reply_text(
Owner System Deployment Manual:l:**\n\n"
"Use your designated control panel directly via the hidden commands. Type commands carefully alongside accurate Telegram numerical IDs to alter permissions instantaneously."
)
# ==========================================
# ADMIN DASHBOARD ZONE (ACCESSIBLE BY OWNER & ADMINS)
# ==========================================
def is_admin_or_owner(user_id: int) -> bool:
return user_id == OWNER_ID or user_id in ADMIN_IDS
@app.on_message(filters.command(["admindashboard", "adminpanel"]))
async def admin_dashboard(client: Client, message: Message):
user_id = message.from_user.id if message.from_user else message.chat.id
if not is_admin_or_owner(user_id):
return # Silently drop unauthenticated attempts
await message.reply_text(
"ATP Network - Admin Control Consolele**\n\n"
"Operational parameters and tracking access cleared:\n"
"โข /stats - Review currently active scanning threads\n"
"โข /check - Check how many total users are using the bot"
)
@app.on_message(filters.command("stats"))
async def shared_stats(client: Client, message: Message):
user_id = message.from_user.id if message.from_user else message.chat.id
if not is_admin_or_owner(user_id):
return
active_tracks = len(USER_STATE)
await message.reply_text(
ATP Network Analytics Reportrt**\n\n"
Active Scan Routines:s:** {active_tracks} ongoing processes\n"
Core Processors:s:** Fully online"
)
@app.on_message(filters.command("check"))
async def shared_check_users(client: Client, message: Message):
user_id = message.from_user.id if message.from_user else message.chat.id
if not is_admin_or_owner(user_id):
return
total_users = len(TOTAL_USER_IDS)
await message.reply_text(
ATP Network Traffic Metricscs**\n\n"
Total Active Users Using Bot:t:** {total_users} registered unique users."
)
# ==========================================
# PUBLIC ROUTERS & CORE SCANNING LOGIC
# ========================================== | 1 |
| 19 | import os
import re
import socket
import asyncio
import time
import urllib3
import requests
from concurrent.futures import ThreadPoolExecutor
from pyrogram import Client, filters
from pyrogram.enums import ChatType
from pyrogram.types import ReplyKeyboardMarkup, KeyboardButton, Message
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ==========================================
# CORE CONFIGURATION
# ==========================================
API_ID =
API_HASH =
BOT_TOKEN =
# Master System Owner Authorization
OWNER_ID =
app = Client("atp_subdomain_finder", api_id=API_ID, api_hash=API_HASH, bot_token=BOT_TOKEN)
# System Storage Repositories (In-Memory Structures)
USER_STATE = {}
ADMIN_IDS = set() # Holds dynamically added Admin Telegram IDs
BANNED_IDS = set() # Holds banned Telegram IDs blocked from all features
TOTAL_USER_IDS = set() # Track unique user registrations for telemetry tracking
DOWNLOAD_DIR = "./atp_subdomain"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
DNS_EXECUTOR = ThreadPoolExecutor(max_workers=300)
# Branding Footer - Displayed ONLY on Start and Error/Not Working responses
SUPPORT_FOOTER = (
"\n\n"
"๐ป Script made by: ATP Network\n"
"๐ข Channel: https://t.me/darknetwork_boy_64\n"
"๐ฌ Support DM: @atp_network if bot not work"
)
def get_main_keyboard():
keyboard = [
[KeyboardButton("Subdomain Finder")]
]
return ReplyKeyboardMarkup(keyboard, resize_keyboard=True, selective=True, placeholder="Choose an option...")
# Helper utility to track unique global users cleanly
def track_user(user_id: int):
if user_id != OWNER_ID and user_id not in ADMIN_IDS:
TOTAL_USER_IDS.add(user_id)
# ==========================================
# OWNER ONLY MANAGEMENT ZONE (STRICT CONTROL)
# ==========================================
@app.on_message(filters.command(["dashboard", "owner"]) & filters.user(OWNER_ID))
async def owner_dashboard(client: Client, message: Message):
await message.reply_text(
"โ๏ธ ATP Network - Owner Control Terminal\n\n"
"All administration buttons are strictly hidden for maximum protection.\n"
"Execute your operational commands directly into the prompt:\n\n"
"๐น System Management:\n"
"โข /stop - Put bot service loops on standby\n"
"โข /reset - Clear all dynamic user states/cache\n"
"โข /use - Show full administrative deployment manual\n\n"
"๐ฅ Permission Allocation Control:\n"
"โข /addadmin <id> - Elevate a user to Admin rank\n"
"โข /removeadmin <id> - Demote an existing Admin\n"
"โข /admins - View total active administrators list\n"
"โข /ban <id> - Completely blacklist a user from the engine\n\n"
"๐ Shared Analytics Access:\n"
"โข /stats - Monitor active memory threads\n"
"โข /check - Check total network user counts"
)
@app.on_message(filters.command("addadmin") & filters.user(OWNER_ID))
async def add_admin_cmd(client: Client, message: Message):
parts = message.text.split()
if len(parts) < 2 or not parts[1].isdigit():
await message.reply_text("โ ๏ธ Format Mismatch: Please use /addadmin <user_id>")
return
target_id = int(parts[1])
ADMIN_IDS.add(target_id)
await message.reply_text(f"โ
Success: User ID {target_id} has been elevated to Admin rank.")
@app.on_message(filters.command("removeadmin") & filters.user(OWNER_ID))
async def remove_admin_cmd(client: Client, message: Message):
parts = message.text.split()
if len(parts) < 2 or not parts[1].isdigit():
await message.reply_text("โ ๏ธ Format Mismatch: Please use /removeadmin <user_id>")
return
target_id = int(parts[1])
if target_id in ADMIN_IDS:
ADMIN_IDS.remove(target_id)
await message.reply_text(f"โ Demoted: User ID {target_id} has been stripped of admin access.")
else:
await message.reply_text("โ ๏ธ Notice: Specified User ID is not an active administrator.") | 1 |
| 20 | Next bots | 229 |
Endi mavjud! Telegram Tadqiqoti 2025 โ yilning asosiy insaytlari 
