Zapping Through Multicast Madness: A Fun Python Script to Keep Your IPTV Streams Rocking!
Hey there, stream wranglers! If you’re in the wild world of IPTV—think hotels, cruise ships, or sports bars blasting live channels to a gazillion screens—you know multicast UDP streams are the unsung heroes of efficient video delivery. But UDP is like that carefree friend who doesn’t check in, leaving you wondering, "Is this stream even alive?"
In this post, we’re looking at a Python script I built to help a friend whose streams randomly drop. It checks both multicast and unicast UDP streams, and if one goes down, it can (optionally) send a heads-up via Telegram. It’s got retries, scheduling, and just enough socket magic to keep things running smooth. Let’s take a look under the hood!
Table of Contents
- What’s the Deal with Multicast UDP Streams?
- What’s This Script All About?
- The Code: A Party of Sockets and Streams
- Breaking Down the Dance Moves
- Real-World Applications
- Improvements to Spice It Up
- Watch Out for These Gotchas
- Final Curtain Call
What’s the Deal with Multicast UDP Streams?
Picture this: you’re streaming the big game to 500 hotel rooms. Sending 500 separate streams? Nightmare. That’s where multicast swoops in like a superhero. Instead of spamming packets to every TV, the server sends one stream to a special multicast IP (in the 224.0.0.0 to 239.255.255.255 range). Routers, like savvy mail carriers, only copy the stream to networks where devices have RSVP’d (via IGMP, the Internet Group Management Protocol).
Here’s the multicast magic in action:
- Server: Sends a single UDP stream to, say, 239.1.1.1:1234.
- Routers: Use IGMP to figure out which subnets want in. Only those get the packets.
- Clients: TVs or set-top boxes “join” the multicast group and sip the stream.
Why’s this cool? It saves tons of bandwidth, making it perfect for IPTV, live sports, or any “one-to-many” streaming gig.
But here’s the catch: UDP is a fire-and-forget protocol. No handshakes, no “you good?” texts. A stream can seem alive just because a socket binds to its port, but no actual video data might be flowing. That’s where our script struts in, ready to separate the live streams from the ghosted ones.
What’s This Script All About?
This Python script is like a streaming detective, checking if your UDP streams (multicast or unicast) are actually delivering the goods. Here’s the vibe:
What the Script Does
- Reads a JSON config file with your stream URLs (with names!) and Telegram details.
- Parses UDP URLs to grab host and port, handling formats like udp://@.
- Binds sockets to listen for streams (and joins multicast groups if needed).
- Checks for real data (if you want the hardcore truth).
- Retries flaky streams to avoid crying wolf.
- Sends Telegram alerts if streams go dark, using channel names for clarity.
- Runs on a schedule, checking every hour (and runs immediately on startup).
The Code: A Party of Sockets and Streams
Here’s the script, packed with comments to keep things crystal clear. We’ll zoom into the juicy bits afterward.
import socket
import sys
import time
from urllib.parse import urlparse
import logging
import requests
import json
from apscheduler.schedulers.background import BackgroundScheduler

# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load configuration from JSON
try:
    with open('config.json', 'r') as f:
        config = json.load(f)
    # Telegram is optional: without credentials, alerts are logged but not sent
    telegram_config = config.get('telegram', {})
    TELEGRAM_BOT_TOKEN = telegram_config.get('bot_token')
    TELEGRAM_CHAT_ID = telegram_config.get('chat_id')
    TELEGRAM_API_URL = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    udp_streams = config['streams']['udp']
    if not udp_streams:
        logger.error("No UDP streams found in configuration")
        sys.exit(1)
except (KeyError, FileNotFoundError, json.JSONDecodeError) as e:
    logger.error(f"Failed to load configuration: {e}")
    sys.exit(1)


def send_telegram_message(message):
    # Skip quietly when the Telegram section is missing from config.json
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        logger.warning("Telegram is not configured; alert not sent")
        return
    try:
        payload = {
            'chat_id': TELEGRAM_CHAT_ID,
            'text': message
        }
        response = requests.post(TELEGRAM_API_URL, json=payload, timeout=10)
        response.raise_for_status()
        logger.info(f"Telegram message sent: {message}")
    except requests.RequestException as e:
        logger.error(f"Failed to send Telegram message: {e}")


def parse_udp_url(udp_url):
    try:
        # Some IPTV tools write multicast URLs as udp://@host:port
        if udp_url.startswith("udp://@"):
            udp_url = udp_url.replace("udp://@", "udp://", 1)
        parsed = urlparse(udp_url)
        if parsed.scheme != 'udp':
            raise ValueError(f"Invalid scheme for {udp_url}. Expected 'udp'.")
        host = parsed.hostname
        port = parsed.port
        if not host or not port:
            raise ValueError(f"Invalid UDP URL format: {udp_url}")
        return host, port
    except Exception as e:
        logger.error(f"Failed to parse UDP URL {udp_url}: {e}")
        return None, None


def check_udp_stream(host, port, timeout=10, require_data=False):
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.settimeout(timeout)
        # Multicast addresses start with 224 through 239
        is_multicast = host.startswith("224.") or host.startswith("225.") or \
            host.startswith("226.") or host.startswith("227.") or \
            host.startswith("228.") or host.startswith("229.") or \
            host.startswith("230.") or host.startswith("231.") or \
            host.startswith("232.") or host.startswith("233.") or \
            host.startswith("234.") or host.startswith("235.") or \
            host.startswith("236.") or host.startswith("237.") or \
            host.startswith("238.") or host.startswith("239.")
        logger.debug(f"Checking {host}:{port} (Multicast: {is_multicast})")
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', port))
        if is_multicast:
            import struct
            mreq = struct.pack("4sl", socket.inet_aton(host), socket.INADDR_ANY)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            logger.debug(f"Joined multicast group {host}")
        if require_data:
            try:
                logger.debug(f"Waiting for data from {host}:{port}")
                data, addr = sock.recvfrom(4096)
                if data:
                    logger.info(f"Stream {host}:{port} is ACTIVE (received {len(data)} bytes from {addr})")
                    return True
            except socket.timeout:
                logger.warning(f"Stream {host}:{port} is INACTIVE (no data received within {timeout}s)")
                return False
        else:
            logger.info(f"Stream {host}:{port} is ACTIVE (binding succeeded)")
            return True
    except Exception as e:
        logger.error(f"Error checking stream {host}:{port}: {e}")
        return False
    finally:
        if sock:
            sock.close()
            logger.debug(f"Closed socket for {host}:{port}")


def check_channels(streams, timeout=10, retry_attempts=2, retry_delay=2, require_data=False):
    results = {}
    down_streams = []
    for stream in streams:
        name = stream.get("name")
        url = stream.get("url")
        if not name or not url:
            logger.error(f"Missing name or url in stream: {stream}")
            continue
        host, port = parse_udp_url(url)
        if not host or not port:
            results[name] = {'status': 'INVALID', 'error': 'Invalid URL'}
            down_streams.append((name, 'Invalid URL'))
            continue
        for attempt in range(retry_attempts):
            logger.info(f"Checking {name} ({url}) - Attempt {attempt + 1}/{retry_attempts}")
            if check_udp_stream(host, port, timeout, require_data):
                results[name] = {'status': 'ACTIVE', 'error': None}
                break
            else:
                results[name] = {'status': 'INACTIVE', 'error': 'No response or timeout'}
                if attempt < retry_attempts - 1:
                    logger.info(f"Retrying {name} after {retry_delay} seconds...")
                    time.sleep(retry_delay)
                else:
                    # Out of retries: report the channel as down
                    down_streams.append((name, 'No response or timeout'))
    if down_streams:
        message = "🚨 IPTV Stream Alert 🚨\nThe following channels are DOWN:\n"
        for name, error in down_streams:
            message += f"- {name}: {error}\n"
        send_telegram_message(message)
    return results


def scheduled_task():
    logger.info("Starting scheduled UDP stream check...")
    results = check_channels(udp_streams, timeout=10, retry_attempts=2, retry_delay=2, require_data=True)
    logger.info("\nChannel Status Report:")
    logger.info("-" * 50)
    for name, info in results.items():
        status = info['status']
        error = info['error'] if info['error'] else 'None'
        logger.info(f"Channel: {name}\nStatus: {status}\nError: {error}\n")


def main():
    scheduler = BackgroundScheduler()
    scheduler.add_job(scheduled_task, 'interval', hours=1)
    logger.info("Starting scheduler and running first check immediately...")
    scheduled_task()  # Run immediately on startup
    scheduler.start()
    try:
        while True:
            time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        logger.info("Shutting down scheduler...")
        scheduler.shutdown()


if __name__ == "__main__":
    main()
Breaking Down the Dance Moves
Let’s shimmy through the script’s key moves and shine a spotlight on multicast and what’s happening under the hood.
Config Loading: The Guest List
with open('config.json', 'r') as f:
    config = json.load(f)
The script starts by loading a config.json file, which lists your UDP streams (with names) and Telegram credentials (for those “OMG, the stream’s down!” alerts). Here’s what it looks like:
{
  "telegram": {
    "bot_token": "443803693:AAE7CQhMNiHoOGf3IG79FVi3X1pjFxLGb1E",
    "chat_id": "-1002100841898"
  },
  "streams": {
    "udp": [
      { "name": "KBS WORLD", "url": "udp://@239.1.1.1:1234" },
      { "name": "AXN", "url": "udp://@239.1.1.2:1234" }
    ]
  }
}
Each stream has a name (e.g., “KBS WORLD”) and a URL, making logs and alerts super readable. Think of this as the VIP list for your stream party—no config, no entry!
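Since everything hangs off this file, it can help to sanity-check it before the first socket opens. Here’s a minimal sketch of that idea. The validate_config helper is hypothetical (it isn’t part of the script), and it assumes the config has already been loaded into a dict shaped like the sample above, with the Telegram section treated as optional.

```python
def validate_config(config):
    """Return a list of human-readable problems; an empty list means the config looks OK."""
    problems = []

    # The script needs at least one stream, each with a name and a url
    streams = config.get("streams", {}).get("udp", [])
    if not streams:
        problems.append("streams.udp is missing or empty")
    for index, stream in enumerate(streams):
        for key in ("name", "url"):
            if not stream.get(key):
                problems.append(f"streams.udp[{index}] is missing '{key}'")

    # Telegram is treated as optional here, but a half-filled section is flagged
    telegram = config.get("telegram")
    if telegram is not None:
        for key in ("bot_token", "chat_id"):
            if not telegram.get(key):
                problems.append(f"telegram.{key} is missing")

    return problems
```

Calling it right after json.load and logging each returned problem gives you one readable list of everything that’s wrong, instead of stopping at the first bad key.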
Parsing UDP URLs: Checking IDs at the Door
def parse_udp_url(udp_url):
    if udp_url.startswith("udp://@"):
        udp_url = udp_url.replace("udp://@", "udp://", 1)
    parsed = urlparse(udp_url)
    if parsed.scheme != 'udp':
        raise ValueError(f"Invalid scheme for {udp_url}. Expected 'udp'.")
This function is the bouncer, making sure each stream URL is legit (udp://host:port). It handles a quirky format, udp://@ (common in some IPTV setups), by stripping the @ before parsing. It grabs the host and port, and if the URL’s sketchy, it gets kicked out with a log message.
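To see which URLs make it past the bouncer, here’s a compact stand-alone variant you can paste into a REPL. It’s a sketch, not the script’s own function: split_udp_url is a hypothetical name, and it drops the logging so the accepted and rejected formats are easy to compare.

```python
from urllib.parse import urlparse


def split_udp_url(udp_url):
    """Return (host, port) for udp://host:port or udp://@host:port, else (None, None)."""
    try:
        # Strip the optional "@" that some IPTV tools put before the host
        parsed = urlparse(udp_url.replace("udp://@", "udp://", 1))
        if parsed.scheme != "udp" or not parsed.hostname or not parsed.port:
            return None, None
        return parsed.hostname, parsed.port
    except ValueError:
        # Reading .port raises ValueError for a non-numeric or out-of-range port
        return None, None
```

With this version, udp://@239.1.1.1:1234 and udp://10.0.0.5:5000 both parse, while a wrong scheme, a missing port, or a port that isn’t a number all come back as (None, None).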
Checking Streams: The Multicast VIP Check
def check_udp_stream(host, port, timeout=10, require_data=False):
This is the script’s pièce de résistance, where it decides if a stream is rocking or flopping. Let’s break it down with a focus on multicast:
Spin Up a UDP Socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.settimeout(timeout)
We create a UDP socket, ready to listen for packets. The timeout ensures we don’t wait forever for a dead stream.
❓ Is It Multicast?
is_multicast = host.startswith("224.") or host.startswith("225.") or \
    host.startswith("226.") or host.startswith("227.") or \
    host.startswith("228.") or host.startswith("229.") or \
    host.startswith("230.") or host.startswith("231.") or \
    host.startswith("232.") or host.startswith("233.") or \
    host.startswith("234.") or host.startswith("235.") or \
    host.startswith("236.") or host.startswith("237.") or \
    host.startswith("238.") or host.startswith("239.")
Multicast IPs live in 224.0.0.0 to 239.255.255.255. The script checks if the host falls in this range to determine if it’s a multicast stream.
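The same range check can be written with the standard library’s ipaddress module. This is an alternative sketch rather than what the script does: is_multicast_host is a hypothetical helper, and unlike the prefix match it treats anything that isn’t a literal IPv4 address (a hostname, say) as not multicast.

```python
import ipaddress


def is_multicast_host(host):
    """True only if host is a literal IPv4 address in 224.0.0.0/4."""
    try:
        return ipaddress.IPv4Address(host).is_multicast
    except ValueError:
        # Hostnames and malformed addresses land here
        return False
```

One practical difference: a hostname such as 224.example.com would pass the startswith test, but it returns False here because it isn’t an IP address at all.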
Bind the Socket:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', port))
Binding to '' (all interfaces) lets the socket listen for packets on the given port. SO_REUSEADDR prevents “port already in use” errors if the socket’s reused quickly.
Join the Multicast Club (if needed):
if is_multicast:
    import struct
    mreq = struct.pack("4sl", socket.inet_aton(host), socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
Here’s where multicast gets spicy. To receive multicast packets, the socket must join the multicast group: setting IP_ADD_MEMBERSHIP asks the kernel to send an IGMP membership report for that group on the socket’s behalf.
Multicast Deep Dive:
When a device joins a multicast group, it sends an IGMP “join” message to the local router. The router then forwards packets for that group to the device’s subnet. This is why your network gear needs to support IGMP snooping—without it, multicast packets might flood every port or not arrive at all.
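If you want to pick the network interface yourself, or leave the group explicitly before closing, the join can be sketched like this. These helpers are hypothetical and not part of the script. They pack the request as two 4-byte addresses ("4s4s"), which always yields 8 bytes, whereas the size of the "l" field in the original format depends on the platform. The interface_ip default of 0.0.0.0 means “let the OS choose”.

```python
import socket
import struct


def build_mreq(group_ip, interface_ip="0.0.0.0"):
    """Pack a membership request: group address followed by local interface address."""
    return struct.pack("4s4s", socket.inet_aton(group_ip), socket.inet_aton(interface_ip))


def join_group(sock, group_ip, interface_ip="0.0.0.0"):
    """Join group_ip on the given interface and return the request for a later leave."""
    mreq = build_mreq(group_ip, interface_ip)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    return mreq


def leave_group(sock, mreq):
    """Leave the group that was joined with the same request bytes."""
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, mreq)
```

On a box with several NICs, passing the IP of the NIC that faces the IPTV network as interface_ip is one way to make sure the join goes out on the right wire.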
Check for Real Data (Optional):
if require_data:
    try:
        logger.debug(f"Waiting for data from {host}:{port}")
        data, addr = sock.recvfrom(4096)
        if data:
            logger.info(f"Stream {host}:{port} is ACTIVE (received {len(data)} bytes from {addr})")
            return True
    except socket.timeout:
        logger.warning(f"Stream {host}:{port} is INACTIVE (no data received within {timeout}s)")
        return False
Binding alone doesn’t prove a stream’s alive: it’s a purely local operation and succeeds whether or not anyone is sending. If require_data=True, the script waits for actual packets (like video data) and only reports the stream as active once one arrives within the timeout.
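The “wait for one packet or give up” step is easy to try in isolation. Below is a minimal sketch with a hypothetical wait_for_datagram helper. It works on any UDP socket that’s already bound, so you can test it on loopback without a real stream.

```python
import socket


def wait_for_datagram(sock, timeout=10, bufsize=4096):
    """Return the first datagram received within `timeout` seconds, or None."""
    sock.settimeout(timeout)
    try:
        data, _addr = sock.recvfrom(bufsize)
        return data
    except socket.timeout:
        # Nothing arrived in time: the sender is silent or unreachable
        return None
```

To try it, bind one socket to ("127.0.0.1", 0), read the chosen port with getsockname(), and send a few bytes to it from a second socket. The helper returns those bytes, and returns None if you skip the send.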
Clean Up:
sock.close()
Always close the socket to avoid lingering resources. Nobody likes a messy party!
What’s Happening with Multicast?
The script is mimicking an IPTV client (like a set-top box). By joining the multicast group and listening for packets, it confirms the stream is flowing from the server through the network. If no data arrives, either the server’s down, the network’s misconfigured (e.g., IGMP issues), or the stream’s just not there.
Retries: Giving Streams a Second Chance
for attempt in range(retry_attempts):
    logger.info(f"Checking {name} ({url}) - Attempt {attempt + 1}/{retry_attempts}")
    if check_udp_stream(host, port, timeout, require_data):
        results[name] = {'status': 'ACTIVE', 'error': None}
        break
Networks can be flaky—packets drop, routers hiccup. The script retries failed streams (default: 2 attempts, 2-second delay) to avoid false alarms. It uses channel names (e.g., “KBS WORLD”) in logs, making it easy to see what’s up. If it’s still dead after retries, it’s officially down.
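The retry logic can also be pulled out into a small reusable helper. This is a sketch of the same idea, not the script’s code: check_with_retries is a hypothetical name, check is any zero-argument function that returns True or False, and the sleep parameter exists only so the helper can be tested without actually waiting.

```python
import time


def check_with_retries(check, attempts=2, delay=2, sleep=time.sleep):
    """Call check() up to `attempts` times; return True on the first success."""
    for attempt in range(1, attempts + 1):
        if check():
            return True
        # Pause between attempts, but not after the final one
        if attempt < attempts:
            sleep(delay)
    return False
```

Wrapped around the stream check, it would be called as check_with_retries(lambda: check_udp_stream(host, port, 10, True)).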
Telegram Alerts: The Drama Queen
if down_streams:
    message = "🚨 IPTV Stream Alert 🚨\nThe following channels are DOWN:\n"
    for name, error in down_streams:
        message += f"- {name}: {error}\n"
    send_telegram_message(message)
If streams go dark, the script sends a Telegram alert with channel names for clarity:
🚨 IPTV Stream Alert 🚨
The following channels are DOWN:
- KBS WORLD: No response or timeout
- AXN: No response or timeout
You can disable this by skipping the Telegram config in config.json. But who doesn’t love a little drama?
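One thing to watch if you monitor a lot of channels: Telegram caps a single message’s text at 4096 characters, so a really bad day could overflow one alert. Here’s a hedged sketch of splitting the alert into several messages. The build_alert_messages helper is hypothetical, and the header is left plain to keep the example simple.

```python
def build_alert_messages(down_streams, limit=4096):
    """Split the DOWN list into one or more messages, each at most `limit` characters."""
    header = "IPTV Stream Alert\nThe following channels are DOWN:\n"
    messages = []
    current = header
    for name, error in down_streams:
        line = f"- {name}: {error}\n"
        # Start a new message when adding this line would exceed the limit
        if len(current) + len(line) > limit and current != header:
            messages.append(current)
            current = header
        current += line
    if current != header:
        messages.append(current)
    return messages
```

Each returned string can then be passed to send_telegram_message in turn. An empty down_streams list produces no messages at all.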
⏰ Scheduling: The Party Never Stops
scheduler = BackgroundScheduler()
scheduler.add_job(scheduled_task, 'interval', hours=1)
scheduled_task() # Run immediately on startup
Using apscheduler, the script checks streams every hour to keep things efficient. It also runs a check immediately on startup, so you don’t have to wait. Hit Ctrl+C to shut it down gracefully.
Real-World Applications
This script is a Swiss Army knife for IPTV warriors:
- IPTV Operators: Run it on edge servers to ensure every channel’s live, from ESPN to local news.
- NOC Teams: Pipe logs into Grafana or Elasticsearch for sexy dashboards showing stream health.
- System Integrators: Embed it in bigger monitoring suites for hotels, stadiums, or cruise ships.
- Home Enthusiasts: Got a home IPTV setup? Keep tabs on your streams and flex your tech skills.
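For the dashboard crowd, log pipelines usually prefer one JSON object per line over free-form text. Here’s a minimal sketch that turns the results dict returned by check_channels into such lines. The results_to_json_lines helper and its field names are assumptions, so rename them to match whatever your pipeline expects.

```python
import json
from datetime import datetime, timezone


def results_to_json_lines(results, checked_at=None):
    """Turn {name: {'status': ..., 'error': ...}} into one JSON string per channel."""
    # Default to the current UTC time in ISO 8601 format
    checked_at = checked_at or datetime.now(timezone.utc).isoformat()
    return [
        json.dumps({
            "checked_at": checked_at,
            "channel": name,
            "status": info["status"],
            "error": info["error"],
        })
        for name, info in results.items()
    ]
```

Writing these lines to a file or stdout gives a log shipper something it can index without custom parsing.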
Improvements to Spice It Up
Wanna take this script to the next level? Here’s some inspo:
- Interface-Specific Binding: For servers with multiple NICs, bind to a specific interface to avoid multicast mix-ups.
- SNMP Traps or Prometheus Exporters: Integrate with enterprise monitoring for instant alerts or metrics.
- Web Dashboard: Build a Flask or FastAPI app to show real-time stream status in a browser.
- Dynamic Stream Lists: Pull streams from a database or API instead of a static config.json.
- Packet Analysis: Peek inside packets to verify they’re valid video data (e.g., MPEG-TS headers).
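As a taste of the packet-analysis idea, here’s a rough sketch that checks whether a UDP payload looks like raw MPEG-TS: packets are 188 bytes long and each one starts with the sync byte 0x47. The looks_like_mpegts helper is hypothetical, and it assumes the transport stream sits directly in the UDP payload. A stream wrapped in RTP carries an extra header in front and would fail this check.

```python
TS_PACKET_SIZE = 188  # fixed MPEG-TS packet length in bytes
TS_SYNC_BYTE = 0x47   # first byte of every MPEG-TS packet


def looks_like_mpegts(data, min_packets=1):
    """True if every complete 188-byte packet in data starts with the sync byte."""
    if len(data) < TS_PACKET_SIZE * min_packets:
        return False
    # Ignore any trailing partial packet
    whole = len(data) - len(data) % TS_PACKET_SIZE
    return all(data[i] == TS_SYNC_BYTE for i in range(0, whole, TS_PACKET_SIZE))
```

It would slot in right after recvfrom: a datagram that arrives but fails this test suggests something is sending on the port, just not the video you expected.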
Watch Out for These Gotchas
Multicast Network Woes
- Switches need IGMP snooping enabled, or multicast packets might not reach you.
- Routers must support IGMP and forward multicast traffic.
- Firewalls can block UDP or IGMP—check your rules!
Binding Isn’t Enough
- A socket can bind to a port even if no data’s flowing. Use require_data=True for the real scoop.
Port Conflicts
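- If multiple streams share a port (both sample streams use 1234), checks that run at the same time can step on each other: a socket bound to all addresses may receive packets meant for another group on that port, which can make a dead stream look alive. Binding can also fail if another process holds the port without SO_REUSEADDR. Use unique ports, or run the checks one at a time as this script does.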
Socket Lingering
- UDP sockets have no TIME_WAIT state (that’s a TCP thing), but rapid back-to-back checks can still collide with a socket that hasn’t been closed yet. SO_REUSEADDR helps, but don’t check too frequently.
Final Curtain Call
This script is your trusty sidekick for keeping IPTV streams in check—whether you're running a hotel entertainment system or a sports bar's wall of TVs.
It’s got the smarts to handle multicast madness, the tenacity to retry flaky streams, and the flair to ping you on Telegram when things go south—with named channels for extra clarity.
It’s flexible enough to fit into any monitoring setup, from a Raspberry Pi to a full-blown NOC.
Full Source Code & Project Repository: IPTV-Stream-Checker on GitHub