IPTV有效源检测
接着发现,在 GitHub 上获取到的,国内的 IPTV 的源,基本上,都是用不了的,气死了。因此折腾了半天,我在想,为什么不写一个程序先检测是否有效,再用呢?而且不同渠道获取的 M3U 源,格式也不一样,有些无法在管理系统里直接使用的,所以就想着写个 Python 程序解决,也就当做重温当年写爬虫的技术了。
使用 Python 编写程序来检测 M3U 文件中 IPTV 直播源的有效性。整个需求和代码迭代过程,分为几个步骤:
- 基础程序构建:首先,我们创建了一个基本的Python脚本,它能读取链接中的直播流URL,并利用
requests
库检查每个URL的有效性。import requests
def check_stream(url):
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
print(f"Stream is active: {url}")
else:
print(f"Stream is inactive: {url}")
except requests.RequestException as e:
print(f"Error checking stream {url}: {e}")
# 主程序
stream_url = 'http://your_iptv_stream_address.m3u'
check_stream(stream_url)
- 文件读取:想到我的都是下载好的M3U文件,因此,需要加入文件读取的功能。注意,在Windows环境下,路径是两个反斜杠的。如:I:\\Python\\IPTV.m3u
import requests
def check_stream(url):
try:
response = requests.get(url, timeout=10)
return response.status_code == 200
except requests.RequestException:
return False
def check_m3u_file(file_path):
with open(file_path, 'r') as file:
for line in file:
line = line.strip()
if line.startswith('http'):
if check_stream(line):
print(f"Stream is active: {line}")
else:
print(f"Stream is inactive: {line}")
# 主程序
m3u_file_path = 'I:\\Python\\IPTV.m3u'
check_m3u_file(m3u_file_path)
然后,在文件读取的过程中,发现了以下的报错,好像是文件里文本字符集的问题。
UnicodeDecodeError: 'gbk' codec can't decode byte 0xaa in position 685: illegal multibyte sequence
因此需要指定字符集格式读取。
with open(file_path, 'r', encoding='utf-8') as file:
-
用户代理伪装:考虑到之前的爬虫,需要进行伪装,所以考虑某些直播源也可能需要浏览器伪装,我们更新了代码,添加了Chrome用户代理的头部信息。同时担心频繁的访问,就又加入了一个随机延时的函数。在最后想了想,发现应该是用不上的,因为我每个链接也就请求一次。所以,最后整个函数如下。
# 检查单个直播流是否有效的函数
def check_stream(name_url_pair):
# 分离出频道名称和URL
name, url = name_url_pair
# 设置HTTP请求头,伪装成Chrome浏览器
headers = {'User-Agent': 'Mozilla/5.0 ... Chrome/58.0.3029.110 Safari/537.3'}
try:
# 向URL发送请求
response = requests.get(url, headers=headers, timeout=10)
# 随机延时
time.sleep(random.randint(1, 5))
# 检查响应状态码,判断是否有效
if response.status_code == 200:
print(f"{name}: 有效源")
return True
else:
print(f"{name}: 无效源")
return False
except requests.RequestException:
print(f"{name}: 无效源")
return False
-
并行处理引入:后面想到了,Python 都是单线程的操作,所以一条一条的检测,会非常慢。为了提高效率,我们引入了
concurrent.futures
库,使用多线程并行检查直播流,大大加快了处理速度。# 一次32个并发,检查多个直播流
def check_streams(urls, max_workers=32):
# 使用线程池并行处理
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {executor.submit(check_stream, url): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url, is_active = future.result()
print(f"{'有效源' if is_active else '无效源'}: {url}")
-
进度条功能添加:程序运行的过程中,一点反应都看不见,然后就想到,是不是可以增加进度条显示之类的功能?所以在这里,为了看到程序执行的进度,又加入了
tqdm
库,提供了一个直观的进度条。# 一次32个并发,检查多个直播流
def check_streams(urls, max_workers=32):
# 使用线程池并行处理
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 创建一个进度条
tasks = {executor.submit(check_stream, url): url for url in urls}
for future in tqdm(concurrent.futures.as_completed(tasks), total=len(urls), desc="Checking Streams"):
url, is_active = future.result()
print(f"{'有效源' if is_active else '无效源'}: {url}")
-
结果实时输出:如果我想把进度条,不再按整体任务显示,而是针对每个线程的。每完成一个检测,就输出一个结果,可以根据多线程进行并列显示进检测度条。程序被进一步修改,以便在检测每个链接后立即输出其有效性。所以需要的调整一下循环的嵌套,每个流的检查是通过在
check_stream
函数内部使用tqdm
来独立跟踪的。每完成一个任务,就会更新进度条并打印出该任务的结果。def check_stream(name, url):
# 创建一个进度条
with tqdm(total=1, desc=f"Checking {name}") as progress_bar:
try:
# 设置HTTP请求头,伪装成Chrome浏览器
headers = {'User-Agent': 'Mozilla/5.0 ... Chrome/58.0.3029.110 Safari/537.3'}
# 向URL发送请求
response = requests.get(url, headers=headers, timeout=10)
# 随机延时
time.sleep(random.randint(1, 5))
# 完成任务,更新进度条
progress_bar.update(1)
return name, response.status_code == 200
except requests.RequestException:
progress_bar.update(1)
return name, False
-
输出有效流列表:最后,我们修改了程序,在所有任务完成后,输出一个包含所有有效流的总列表,并将这些信息写入一个新的M3U文件。
def check_streams(name_url_pairs, max_workers=10):
active_streams = [] # 存储有效流的列表
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_name_url = {executor.submit(check_stream, name, url): (name, url) for name, url in name_url_pairs}
for future in future_to_name_url:
name, url = future_to_name_url[future]
is_active = future.result()[1]
if is_active:
active_streams.append((name, url)) # 如果流有效,添加到列表
# 打印所有有效的流
print("\n有效的流:")
for name, url in active_streams:
print(f"{name}: {url}")
-
逐个写入:然后在运行的过程中,发现,如果我的 M3U 文件过大,就无法存储那么多的列表,所以就把程序又改成了每发现一个有效的流,就往里面写入一条。同时还是需要按照我指定的格式写入到文件里。
# 将有效的链接写入新的M3U文件
with open('有效的源.m3u', 'w', encoding='utf-8') as f:
for name, url in active_streams:
f.write(f"{name},{url}\n")
最后就是附上中文注释的完整代码:
import requests
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import random
import time
import re
# 检查单个直播流是否有效的函数
def check_stream(name_url_pair):
name, url = name_url_pair # 分离出频道名称和URL
# 设置HTTP请求头,伪装成浏览器
headers = {'User-Agent': 'Mozilla/5.0 ... Chrome/58.0.3029.110 Safari/537.3'}
try:
response = requests.get(url, headers=headers, timeout=10) # 向URL发送请求
time.sleep(random.randint(1, 5)) # 随机延时,模拟人类用户
# 检查响应状态码,判断是否有效
if response.status_code == 200:
print(f"{name}: 有效")
# 将有效的流写入文件
with open('有效的源.m3u', 'a', encoding='utf-8') as f:
f.write(f"{name},{url}\n")
return True
else:
print(f"{name}: 无效")
return False
except requests.RequestException:
print(f"{name}: 无效")
return False
# 检查多个直播流
def check_streams(name_url_pairs, max_workers=100):
# 使用线程池并行处理
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# tqdm显示进度条
list(tqdm(executor.map(check_stream, name_url_pairs), total=len(name_url_pairs)))
# 从M3U文件读取并解析每个直播流的名称和URL
def read_m3u_file(file_path):
name_url_pairs = []
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
# 正则表达式匹配频道名称
if line.startswith('#EXTINF'):
match = re.search(r'tvg-id="([^"]+)"', line)
if match:
current_name = match.group(1)
# 获取URL
elif line.startswith('http'):
url = line.strip()
name_url_pairs.append((current_name, url))
return name_url_pairs
# 主程序执行
m3u_file_path = 'index.nsfw.m3u' # M3U文件路径
name_url_pairs = read_m3u_file(m3u_file_path) # 读取并解析M3U文件
check_streams(name_url_pairs) # 检查并输出有效的直播流
此代码首先读取并解析 M3U 文件中的每个直播流的名称和 URL,然后并行检查每个流的有效性,并输出结果。最后,它会逐个将有效的流的信息写入一个新的 M3U 文件。