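A few years ago I wrote an article about using an EC20 module together with Asterisk and FreePBX for SMS forwarding and VoIP calls. The goal back then was to free a SIM card from an old phone that had to stay plugged in permanently, so that it could place and receive calls through Asterisk and forward SMS through tools such as Telegram. A few years later, I saw in a channel that sellers on Pinduoduo were clearing out DJI's first-generation enhanced image transmission modules, so I bought one for a little over 30 yuan. Its USB ID shows up as 2ca3:4006 DJI Technology Co., Ltd. Baiwang, but judging from material online and from its AT commands, it is essentially still an EG25/EC25-class module on a Qualcomm platform, although it cannot be flashed with standard Quectel firmware.
In this article I change the module's USB VID/PID so that the Linux driver treats it as a standard EC25 device, and I had an LLM rewrite the SMS send/receive setup in the hope of a more stable SMS experience.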
TL;DR
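- The DJI/Baiwang module ships with the USB ID 2ca3:4006. Linux does not automatically bind its AT serial interfaces to the option driver, so you first add a temporary new_id and then use an AT command to change the module's USB ID to 2c7c:0125, the value commonly used by Quectel. After that, everything works the same as with an EC25.
- AT+QCFG="usbnet",1 switches the network interface to ECM mode. This is useful for debugging, but with a roaming SIM you have to watch out for NetworkManager running DHCP automatically and adding a default route, which leaks data traffic.
- On VoLTE: when AT+QCFG="ims" returns +QCFG: "ims",1,0, IMS is force-enabled but the VoLTE session is unavailable. If you only send and receive SMS, VoLTE registration does not necessarily have to succeed. In this test, with VoLTE unregistered, the module still received SMS, and Asterisk read them through chan_quectel.
- When the audio= and data= ports are configured by hand, chan_quectel does not much care whether the USB device's manufacturer/product strings say Quectel. This module reports BAIWANG/Baiwang (probably because DJI's first office was in the Baiwang Science Park in Nanshan, Shenzhen) and still attaches normally as quectel1.
- For the Telegram bot, I suggest splitting by device into systemd template service instances such as tg-sms@quectel0 and tg-sms@quectel1. Each card gets its own bot token, while all instances share the same Python script and SQLite database, which makes sending from the wrong card less likely.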
Identifying the DJI/Baiwang module
Right after the module is plugged in, lsusb shows:
Bus 002 Device 003: ID 2ca3:4006 DJI Technology Co., Ltd. Baiwang
lsusb -t also shows that it has several interfaces, but interfaces 0-3 are not bound to the option driver; only the network interface is recognized by cdc_ether:
|__ Port 2: Dev 3, If 0, Class=Vendor Specific Class, Driver=
|__ Port 2: Dev 3, If 1, Class=Vendor Specific Class, Driver=
|__ Port 2: Dev 3, If 2, Class=Vendor Specific Class, Driver=
|__ Port 2: Dev 3, If 3, Class=Vendor Specific Class, Driver=
|__ Port 2: Dev 3, If 4, Class=Communications, Driver=cdc_ether
|__ Port 2: Dev 3, If 5, Class=CDC Data, Driver=cdc_ether
Because the Linux option driver supports adding USB IDs dynamically, we can first run:
echo 2ca3 4006 > /sys/bus/usb-serial/drivers/option1/new_id
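Several more serial ports then appear, for example (if the machine has only one Quectel device, the ports would in theory be ttyUSB0/1/2/3, with ttyUSB2 as the control port):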
/dev/ttyUSB4 if00
/dev/ttyUSB5 if01
/dev/ttyUSB6 if02
/dev/ttyUSB7 if03
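The ttyUSB-to-interface mapping above can also be read from sysfs rather than worked out by hand. The following is a minimal sketch, assuming the usual Linux layout in which /sys/class/tty/ttyUSBn/device points at the usb-serial port directory, whose parent is the USB interface directory containing bInterfaceNumber. The function name tty_interface_numbers is hypothetical.

```python
import glob
import os


def tty_interface_numbers(sys_tty: str = "/sys/class/tty") -> dict[str, int]:
    """Map ttyUSB names to USB interface numbers (assumes the usual sysfs layout)."""
    result = {}
    for dev_link in sorted(glob.glob(os.path.join(sys_tty, "ttyUSB*", "device"))):
        tty_name = os.path.basename(os.path.dirname(dev_link))
        # Resolve the symlink to the usb-serial port dir, e.g. .../2-2:1.2/ttyUSB6
        port_dir = os.path.realpath(dev_link)
        iface_file = os.path.join(os.path.dirname(port_dir), "bInterfaceNumber")
        try:
            with open(iface_file, encoding="ascii") as handle:
                # bInterfaceNumber is stored as two hex digits, e.g. "02"
                result[tty_name] = int(handle.read().strip(), 16)
        except (OSError, ValueError):
            continue  # layout differs or the device vanished; skip it
    return result


if __name__ == "__main__":
    for name, number in tty_interface_numbers().items():
        print(f"/dev/{name} if{number:02d}")
```

On a machine like the one described here, interface 2 is the port that carries AT commands, so the entry with number 2 is the one to open in minicom.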
On this machine, both /dev/ttyUSB6 and /dev/ttyUSB7 respond to AT commands. ATI returns:
ATI
Baiwang
QDC507
Revision: QDC507GLEFM21
OK
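Searching for the firmware revision turns up a few posts on the Quectel forum. Judging by their dates, they were probably written by people who bought this module around June 2026.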
Changing the USB ID and usbnet mode
After connecting to the serial port with minicom -D /dev/ttyUSB6, first query the original configuration:
AT+QCFG="usbnet"
+QCFG: "usbnet",3
AT+QCFG="usbid"
+QCFG: 11427,16390
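In hexadecimal, 11427,16390 is 2ca3:4006. So that the kernel and chan_quectel recognize the module correctly, it can be changed to 2c7c:0125, the ID commonly used by the Quectel EC25, which is 11388,293 in decimal: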
AT+QCFG="usbnet",1
OK
AT+QCFG="usbid",11388,293
OK
AT+QCFG="usbnet"
+QCFG: "usbnet",1
AT+QCFG="usbid"
+QCFG: 11388,293
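The decimal values passed to AT+QCFG="usbid" can be double-checked with a few lines of Python. This is only a convenience sketch; the helper name usbid_to_decimal is made up for illustration and is not part of any tool mentioned in this article.

```python
def usbid_to_decimal(usb_id: str) -> tuple[int, int]:
    """Convert an lsusb-style 'vvvv:pppp' hex ID into decimal (VID, PID)."""
    vid_hex, pid_hex = usb_id.split(":")
    return int(vid_hex, 16), int(pid_hex, 16)


if __name__ == "__main__":
    for usb_id in ("2ca3:4006", "2c7c:0125"):
        vid, pid = usbid_to_decimal(usb_id)
        # Print the AT command form that takes decimal arguments
        print(f'{usb_id} -> AT+QCFG="usbid",{vid},{pid}')
```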
Then restart the module:
AT+CFUN=1,1
After the CFUN restart, this module did not switch its outer USB descriptor right away, so I also ran an unbind/bind on the USB port it sits on:
echo 2-2 > /sys/bus/usb/drivers/usb/unbind
sleep 2
echo 2-2 > /sys/bus/usb/drivers/usb/bind
After re-enumeration, both modules show up in lsusb as 2c7c:0125:
Bus 002 Device 005: ID 2c7c:0125 Quectel Wireless Solutions Co., Ltd. EC25 LTE modem
Bus 002 Device 002: ID 2c7c:0125 Quectel Wireless Solutions Co., Ltd. EC25 LTE modem
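Although the USB ID is now Quectel's, manufacturer/product are still BAIWANG/Baiwang. This does not affect specifying the serial ports by hand later on.
Preventing stray data traffic (for roaming SIMs, SIMs without a data plan, and so on)
Once the module is in ECM mode, NetworkManager on Debian may treat the module's network interface as an ordinary wired NIC, obtain a 192.168.225.x address over DHCP automatically, and add a default route: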
default via 192.168.225.1 dev enx...
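With a roaming SIM, this can cause entirely unnecessary data charges. This machine uses the modules only for SMS, so I simply make NetworkManager ignore all modem network interfaces: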
# /etc/NetworkManager/conf.d/99-usb-modem-unmanaged.conf
[keyfile]
unmanaged-devices=interface-name:enx*;interface-name:usb*;interface-name:wwan*
Then take down the connections that are already up:
nmcli general reload
nmcli connection down "Wired connection 6"
nmcli connection down "Wired connection 7"
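for path in /sys/class/net/enx* /sys/class/net/usb* /sys/class/net/wwan*; do
    # Glob under /sys/class/net so the patterns match interfaces, not files in the current directory
    [ -e "$path" ] || continue
    i=${path##*/}
    nmcli device set "$i" managed no
    ip route del default dev "$i" 2>/dev/null || true
    ip link set "$i" down
done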
Finally, confirm that the only default route left is on the main NIC:
ip route show
default via 192.168.0.1 dev eth0
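The same check can be automated so that a stray modem default route is noticed before it costs money. Below is a minimal sketch that parses the output of ip route show; the function name modem_default_routes and the prefix list are assumptions that mirror the NetworkManager rule above, not part of the original setup.

```python
import subprocess

# Same interface-name patterns as the unmanaged-devices rule
MODEM_PREFIXES = ("enx", "usb", "wwan")


def modem_default_routes(route_output: str) -> list[str]:
    """Return interfaces that carry a default route and look like modem NICs."""
    offenders = []
    for line in route_output.splitlines():
        fields = line.split()
        if not fields or fields[0] != "default" or "dev" not in fields:
            continue
        dev_index = fields.index("dev") + 1
        if dev_index >= len(fields):
            continue  # malformed line without an interface name
        iface = fields[dev_index]
        if iface.startswith(MODEM_PREFIXES):
            offenders.append(iface)
    return offenders


if __name__ == "__main__":
    output = subprocess.run(
        ["ip", "route", "show"], text=True, capture_output=True, check=True
    ).stdout
    leaks = modem_default_routes(output)
    print("modem default routes:", leaks or "none")
```

Run from cron or a systemd timer, a non-empty result is the signal to repeat the cleanup loop.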
Network registration
Check the registration status:
AT+COPS?
+COPS: 0,0,"Verizon Wireless",13
AT+CEREG?
+CEREG: 0,5
AT+QNWINFO
+QNWINFO: "FDD LTE","311480","LTE BAND 4",2050
CEREG: 0,5 means the module is registered on the EPS network in roaming state. The signal can be checked with AT+QCSQ:
AT+QCSQ
+QCSQ: "LTE",59,-94,169,-14
The -94 here is roughly the RSRP, which is good enough for receiving SMS.
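For scripted health checks, the +CEREG query response can be decoded into text. The sketch below assumes the <stat> meanings defined for +CEREG in 3GPP TS 27.007 and only handles the query form "+CEREG: <n>,<stat>"; the helper name parse_cereg is hypothetical.

```python
import re

# <stat> values as defined for +CEREG in 3GPP TS 27.007
CEREG_STATUS = {
    0: "not registered, not searching",
    1: "registered, home network",
    2: "not registered, searching",
    3: "registration denied",
    4: "unknown",
    5: "registered, roaming",
}


def parse_cereg(line: str) -> tuple[int, str]:
    """Parse a '+CEREG: <n>,<stat>[,...]' query response into (stat, description)."""
    match = re.match(r"\+CEREG:\s*(\d+),(\d+)", line.strip())
    if not match:
        raise ValueError(f"not a +CEREG query response: {line!r}")
    stat = int(match.group(2))
    return stat, CEREG_STATUS.get(stat, "other value")


if __name__ == "__main__":
    print(parse_cereg("+CEREG: 0,5"))
```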
VoLTE and SMS
According to Quectel's IMS Application Note, AT+QCFG="ims" returns its result in this format:
+QCFG: "ims",<IMS_conf>,<VoLTE_cap>
The second value, VoLTE_cap, is 1 when a VoLTE session is available and 0 when VoLTE is currently unavailable. This module's state is:
AT+QCFG="ims"
+QCFG: "ims",1,0
I tried MBNs such as ROW_Generic_3GPP and hVoLTE-Verizon, and IMS did not register with any of them:
AT+QMBNCFG="Select"
+QMBNCFG: "Select",ROW_Generic_3GPP
AT+QCFG="ims"
+QCFG: "ims",1,0
This means the module is not suitable for VoLTE calls, and chan_quectel reports as much:
[quectel1] Dongle has NO voice support
But this does not mean the module cannot receive SMS. In my test, with IMS/VoLTE unavailable, the module still received SMS notifications:
+CMTI: "SM",0
+CMTI: "SM",1
+CMTI: "SM",2
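Once the module is attached to Asterisk, chan_quectel can read these messages and pass them into the dialplan. So if all you need is sending and receiving SMS, it does not matter that VoLTE cannot be configured. VoLTE is really needed for voice calls, or when a carrier only permits SMS over IMS.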
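The two responses this section relies on are easy to decode in a monitoring script. The following is a minimal sketch that follows the response formats quoted above; the function names parse_ims_config and parse_cmti are illustrative and do not come from chan_quectel.

```python
import re


def parse_ims_config(line: str) -> dict:
    """Parse '+QCFG: "ims",<IMS_conf>,<VoLTE_cap>' into a small dict."""
    match = re.match(r'\+QCFG:\s*"ims",(\d+),(\d+)', line.strip())
    if not match:
        raise ValueError(f"not an ims QCFG response: {line!r}")
    return {
        "ims_conf": int(match.group(1)),
        "volte_available": match.group(2) == "1",
    }


def parse_cmti(line: str) -> tuple[str, int]:
    """Parse a '+CMTI: "<storage>",<index>' new-message notification."""
    match = re.match(r'\+CMTI:\s*"([^"]+)",(\d+)', line.strip())
    if not match:
        raise ValueError(f"not a +CMTI notification: {line!r}")
    return match.group(1), int(match.group(2))


if __name__ == "__main__":
    print(parse_ims_config('+QCFG: "ims",1,0'))
    print(parse_cmti('+CMTI: "SM",2'))
```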
Configuring the second Quectel device in Asterisk
This machine already had a working EG25:
/dev/ttyUSB0 if00
/dev/ttyUSB1 if01 audio
/dev/ttyUSB2 if02 AT/data
/dev/ttyUSB3 if03
The DJI/Baiwang module is on the second physical USB port:
/dev/ttyUSB4 if00
/dev/ttyUSB5 if01 audio
/dev/ttyUSB6 if02 AT/data
/dev/ttyUSB7 if03
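To avoid trouble when the ttyUSB numbers change, we can specify the device through /dev/serial/by-path. chan_quectel calls realpath() before opening a port, so using a symlink does not interfere with its lock-file logic.
Add the following to /etc/asterisk/quectel.conf: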
[quectel0]
audio=/dev/ttyUSB1
data=/dev/ttyUSB2
[quectel1]
audio=/dev/serial/by-path/pci-0000:07:1b.0-usb-0:2:1.1-port0
data=/dev/serial/by-path/pci-0000:07:1b.0-usb-0:2:1.2-port0
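To find the right by-path names for a given module, it helps to list every symlink next to the device node it currently resolves to. This is a minimal sketch using only the standard library; the function name resolve_by_path is hypothetical.

```python
import os


def resolve_by_path(base: str = "/dev/serial/by-path") -> dict[str, str]:
    """Map each by-path symlink name to the real device node it points to."""
    mapping = {}
    if not os.path.isdir(base):
        return mapping  # no USB serial devices present, or udev not populated
    for name in sorted(os.listdir(base)):
        mapping[name] = os.path.realpath(os.path.join(base, name))
    return mapping


if __name__ == "__main__":
    for name, target in resolve_by_path().items():
        print(f"{target}  <-  {name}")
```

The names ending in :1.1-port0 and :1.2-port0 correspond to interfaces 1 and 2, which is how the audio= and data= entries above were chosen.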
Then reload from the Asterisk CLI:
asterisk -rx "quectel reload now"
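During the first initialization, this roaming SIM spent a long time on AT+COPS=0,0, and chan_quectel timed out at one point. Once the module finished automatic network selection, a later retry succeeded: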
ID Group State RSSI Provider Name Model Firmware Number
quectel0 0 Free 22 Ultra EG25 EG25... ...
quectel1 0 Free 99 AT&T QDC507 QDC507GLEFM21 ...
The status of quectel1 shows:
SMS : Yes
Voice : No
GSM Registration Status : Registered, roaming
Provider Name : AT&T
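For the purposes of this article, SMS: Yes is enough.
A safer inbound SMS dialplan
In the previous article I saved SMS directly in the dialplan with System(echo ...). That works, but when I asked an LLM about it, the LLM pointed out that the SMS body comes from outside, so splicing it straight into a shell command is inelegant and carries an injection risk. This time I switched to an AGI entry point: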
[incoming-mobile]
exten => sms,1,Verbose(Incoming SMS from ${CALLERID(num)} on ${QUECTELNAME})
exten => sms,n,AGI(/opt/tg-sms/bin/tg_sms_gateway.py,ingest-agi)
exten => sms,n,Hangup()
exten => ussd,1,Verbose(Incoming USSD: ${BASE64_DECODE(${USSD_BASE64})})
exten => ussd,n,System(echo '${STRFTIME(${EPOCH},,%Y-%m-%d %H:%M:%S)} - ${QUECTELNAME}: ${BASE64_DECODE(${USSD_BASE64})}' >> /var/log/asterisk/ussd.txt)
exten => ussd,n,Hangup()
The AGI script reads the Asterisk variable through GET VARIABLE SMS_BASE64, base64-decodes it in Python, and writes the result to SQLite:
def run_agi_ingest(_args):
    agi_env = agi_read_environment()
    init_db()
    sms_base64 = agi_get_variable("SMS_BASE64")
    device = agi_get_variable("QUECTELNAME") or configured_device()
    phone = agi_env.get("agi_callerid") or "unknown"
    body = decode_sms_base64(sms_base64)
    sms_id = record_inbound(device, phone, body)
    agi_verbose(f"tg-sms queued inbound SMS #{sms_id} from {phone} on {device}", 1)
This way the SMS body never appears as a shell argument, and the originating QUECTELNAME is recorded naturally.
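To see why this is safer, consider a message body full of shell metacharacters. In the sketch below the body travels as base64 and is stored through bound SQL parameters, so it is never interpreted by a shell or by the SQL parser. The table is a cut-down stand-in for the real schema, and store_sms and demo are hypothetical names.

```python
import base64
import sqlite3


def store_sms(conn: sqlite3.Connection, device: str, phone: str, sms_base64: str) -> int:
    """Decode a base64 SMS body and insert it using bound parameters."""
    body = base64.b64decode(sms_base64).decode("utf-8", errors="replace")
    cur = conn.execute(
        "INSERT INTO sms(device, phone, body) VALUES(?, ?, ?)",
        (device, phone, body),
    )
    conn.commit()
    return cur.lastrowid


def demo() -> str:
    """Store a hostile-looking body and read it back unchanged."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE sms (id INTEGER PRIMARY KEY, device TEXT, phone TEXT, body TEXT)"
    )
    hostile = "'; touch /tmp/pwned; echo '"
    encoded = base64.b64encode(hostile.encode("utf-8")).decode("ascii")
    sms_id = store_sms(conn, "quectel1", "+10000000000", encoded)
    row = conn.execute("SELECT body FROM sms WHERE id = ?", (sms_id,)).fetchone()
    conn.close()
    return row[0]


if __name__ == "__main__":
    print(repr(demo()))
```

With the old System(echo '...') approach, the same body would have closed the single quote and run a command of the sender's choosing.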
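Splitting the Telegram bot by device
At first I used a single bot to handle all devices. I later found that with several cards, the clearest approach is one bot per card, all sharing the same code.
The systemd template is as follows: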
[Unit]
Description=Telegram SMS gateway for chan_quectel device %i
Wants=network-online.target
After=network-online.target asterisk.service
[Service]
Type=simple
User=asterisk
Group=asterisk
Environment=TG_SMS_DEVICE=%i
EnvironmentFile=/etc/tg-sms/%i.env
WorkingDirectory=/opt/tg-sms
ExecStart=/opt/tg-sms/venv/bin/python3 /opt/tg-sms/bin/tg_sms_gateway.py daemon
Restart=on-failure
RestartSec=5
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=full
ProtectHome=true
ReadWritePaths=/var/lib/tg-sms /var/log/asterisk
[Install]
WantedBy=multi-user.target
Each card gets its own env file:
# /etc/tg-sms/quectel0.env
TG_SMS_BOT_TOKEN=123456:replace-me
TG_SMS_BOT_BASE_URL=https://api.telegram.org/bot
TG_SMS_ALLOWED_CHAT_IDS=11111111
TG_SMS_DEFAULT_DEVICE=quectel0
TG_SMS_DEVICE=quectel0
TG_SMS_DB=/var/lib/tg-sms/sms.db
TG_SMS_ASTERISK_CMD=/usr/sbin/asterisk
TG_SMS_LOG_LEVEL=INFO
For quectel1.env, only the token and the device need to change:
TG_SMS_BOT_TOKEN=654321:replace-me
TG_SMS_DEFAULT_DEVICE=quectel1
TG_SMS_DEVICE=quectel1
Enable the services:
systemctl enable --now tg-sms@quectel0.service
systemctl enable --now tg-sms@quectel1.service
Note: the two instances must use different Telegram bot tokens, otherwise Telegram's long polling reports:
Conflict: terminated by other getUpdates request
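A quick pre-flight check can catch a copied token before both services start. The sketch below scans the env directory for files that share a TG_SMS_BOT_TOKEN; the function names read_env_file and duplicate_tokens are hypothetical, and the parser only handles simple KEY=value lines like the ones shown above.

```python
import glob
import os
from collections import defaultdict


def read_env_file(path: str) -> dict[str, str]:
    """Parse simple KEY=value lines, ignoring blanks and comments."""
    values = {}
    with open(path, encoding="utf-8") as handle:
        for raw in handle:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            values[key.strip()] = value.strip()
    return values


def duplicate_tokens(env_dir: str = "/etc/tg-sms") -> dict[str, list[str]]:
    """Return {token: [env files]} for every token used by more than one file."""
    seen = defaultdict(list)
    for path in sorted(glob.glob(os.path.join(env_dir, "*.env"))):
        token = read_env_file(path).get("TG_SMS_BOT_TOKEN")
        if token:
            seen[token].append(os.path.basename(path))
    return {token: files for token, files in seen.items() if len(files) > 1}


if __name__ == "__main__":
    for token, files in duplicate_tokens().items():
        # Print only a prefix so the full token does not end up in logs
        print(f"token {token[:6]}... is shared by: {', '.join(files)}")
```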
Python script
TG SMS Python Script
#!/opt/tg-sms/venv/bin/python3
import argparse
import asyncio
import base64
import datetime as dt
import logging
import os
import re
import sqlite3
import subprocess
import sys
from contextlib import contextmanager

DEFAULT_DB = "/var/lib/tg-sms/sms.db"
DEFAULT_ASTERISK = "/usr/sbin/asterisk"
DEFAULT_DEVICE = "quectel0"
DEFAULT_BASE_URL = "https://api.telegram.org/bot"
PHONE_RE = re.compile(r"^\+?\d{3,20}$")
DEVICE_RE = re.compile(r"^[A-Za-z0-9_.-]+$")


def utc_now():
    return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()


def getenv(name, default=None):
    value = os.environ.get(name)
    if value is None or value == "":
        return default
    return value


def parse_allowed_chat_ids():
    raw = getenv("TG_SMS_ALLOWED_CHAT_IDS", "")
    ids = []
    for item in raw.replace(";", ",").split(","):
        item = item.strip()
        if not item:
            continue
        try:
            ids.append(int(item))
        except ValueError:
            raise SystemExit(f"Invalid chat id in TG_SMS_ALLOWED_CHAT_IDS: {item!r}")
    if not ids:
        raise SystemExit("TG_SMS_ALLOWED_CHAT_IDS is empty")
    return ids


def normalize_phone(phone):
    phone = (phone or "").strip()
    phone = re.sub(r"[\s().-]+", "", phone)
    if not PHONE_RE.fullmatch(phone):
        raise ValueError("phone number must be 3-20 digits, optionally prefixed by +")
    return phone


def normalize_device(device):
    device = (device or DEFAULT_DEVICE).strip()
    if not DEVICE_RE.fullmatch(device):
        raise ValueError("invalid Quectel device id")
    return device


def configured_device():
    return normalize_device(getenv("TG_SMS_DEVICE", getenv("TG_SMS_DEFAULT_DEVICE", DEFAULT_DEVICE)))


def split_message(text, limit=3900):
    text = text or ""
    if len(text) <= limit:
        return [text]
    parts = []
    while text:
        parts.append(text[:limit])
        text = text[limit:]
    return parts


def asterisk_quote(value):
    value = (value or "").replace("\r", " ").replace("\n", " ")
    value = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{value}"'


def connect_db():
    db_path = getenv("TG_SMS_DB", DEFAULT_DB)
    conn = sqlite3.connect(db_path, timeout=30)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA busy_timeout=30000")
    return conn


def init_db():
    with connect_db() as conn:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS sms (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                direction TEXT NOT NULL CHECK(direction IN ('inbound', 'outbound')),
                created_at TEXT NOT NULL,
                device TEXT NOT NULL,
                phone TEXT NOT NULL,
                body TEXT NOT NULL,
                status TEXT NOT NULL,
                telegram_chat_id INTEGER,
                telegram_message_id INTEGER,
                reply_to_sms_id INTEGER,
                asterisk_result TEXT,
                error TEXT
            );
            CREATE TABLE IF NOT EXISTS telegram_notifications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sms_id INTEGER NOT NULL REFERENCES sms(id) ON DELETE CASCADE,
                chat_id INTEGER NOT NULL,
                message_id INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                UNIQUE(chat_id, message_id)
            );
            CREATE INDEX IF NOT EXISTS idx_sms_pending
                ON sms(direction, status, id);
            CREATE INDEX IF NOT EXISTS idx_telegram_notifications_lookup
                ON telegram_notifications(chat_id, message_id);
            """
        )


@contextmanager
def db_cursor():
    conn = connect_db()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def record_inbound(device, phone, body):
    device = normalize_device(device)
    phone = (phone or "unknown").strip() or "unknown"
    body = body or ""
    with db_cursor() as conn:
        cur = conn.execute(
            """
            INSERT INTO sms(direction, created_at, device, phone, body, status)
            VALUES('inbound', ?, ?, ?, ?, 'new')
            """,
            (utc_now(), device, phone, body),
        )
        return cur.lastrowid


def record_outbound(device, phone, body, status, chat_id=None, reply_to_sms_id=None, result=None, error=None):
    with db_cursor() as conn:
        cur = conn.execute(
            """
            INSERT INTO sms(
                direction, created_at, device, phone, body, status,
                telegram_chat_id, reply_to_sms_id, asterisk_result, error
            )
            VALUES('outbound', ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (utc_now(), device, phone, body, status, chat_id, reply_to_sms_id, result, error),
        )
        return cur.lastrowid


def send_sms_via_asterisk(phone, body, device=None):
    phone = normalize_phone(phone)
    device = normalize_device(device or getenv("TG_SMS_DEFAULT_DEVICE", DEFAULT_DEVICE))
    if not body or not body.strip():
        raise ValueError("SMS body is empty")
    asterisk_bin = getenv("TG_SMS_ASTERISK_CMD", DEFAULT_ASTERISK)
    command = f"quectel sms {device} {phone} {asterisk_quote(body)}"
    proc = subprocess.run(
        [asterisk_bin, "-rx", command],
        text=True,
        capture_output=True,
        timeout=45,
    )
    output = (proc.stdout or "") + (proc.stderr or "")
    output = output.strip()
    if proc.returncode != 0:
        raise RuntimeError(output or f"asterisk exited with code {proc.returncode}")
    return output or "OK"


def agi_read_environment():
    env = {}
    while True:
        line = sys.stdin.readline()
        if line == "":
            break
        line = line.rstrip("\n")
        if line == "":
            break
        key, _, value = line.partition(":")
        env[key.strip()] = value.strip()
    return env


def agi_command(command):
    print(command, flush=True)
    return sys.stdin.readline().strip()


def agi_escape(value):
    return (value or "").replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def agi_verbose(message, level=1):
    agi_command(f'VERBOSE "{agi_escape(message)}" {int(level)}')


def agi_get_variable(name):
    response = agi_command(f"GET VARIABLE {name}")
    match = re.search(r"result=1 \((.*)\)", response)
    if match:
        return match.group(1)
    return ""


def decode_sms_base64(value):
    value = (value or "").strip()
    if not value:
        return ""
    try:
        data = base64.b64decode(value, validate=False)
    except Exception:
        data = base64.b64decode(value + "=" * (-len(value) % 4), validate=False)
    return data.decode("utf-8", errors="replace")


def run_agi_ingest(_args):
    agi_env = agi_read_environment()
    try:
        init_db()
        sms_base64 = agi_get_variable("SMS_BASE64")
        device = agi_get_variable("QUECTELNAME") or getenv("TG_SMS_DEFAULT_DEVICE", DEFAULT_DEVICE)
        phone = agi_env.get("agi_callerid") or agi_env.get("agi_calleridname") or "unknown"
        body = decode_sms_base64(sms_base64)
        sms_id = record_inbound(device, phone, body)
        agi_verbose(f"tg-sms queued inbound SMS #{sms_id} from {phone} on {device}", 1)
        return 0
    except Exception as exc:
        print(f"tg-sms AGI ingest failed: {exc}", file=sys.stderr)
        try:
            agi_verbose(f"tg-sms ingest failed: {exc}", 1)
        except Exception:
            pass
        return 1


class SmsBot:
    def __init__(self):
        from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
        from telegram.ext import (
            Application,
            ApplicationBuilder,
            CallbackContext,
            CallbackQueryHandler,
            CommandHandler,
            ContextTypes,
            MessageHandler,
            filters,
        )

        self.InlineKeyboardButton = InlineKeyboardButton
        self.InlineKeyboardMarkup = InlineKeyboardMarkup
        self.Update = Update
        self.Application = Application
        self.ApplicationBuilder = ApplicationBuilder
        self.CallbackContext = CallbackContext
        self.CallbackQueryHandler = CallbackQueryHandler
        self.CommandHandler = CommandHandler
        self.ContextTypes = ContextTypes
        self.MessageHandler = MessageHandler
        self.filters = filters
        token = getenv("TG_SMS_BOT_TOKEN")
        if not token:
            raise SystemExit("TG_SMS_BOT_TOKEN is empty")
        base_url = getenv("TG_SMS_BOT_BASE_URL", DEFAULT_BASE_URL)
        self.allowed_chat_ids = parse_allowed_chat_ids()
        self.device = configured_device()
        self.default_device = self.device
        self.pending = {}
        self.worker_task = None
        self.app = (
            ApplicationBuilder()
            .token(token)
            .base_url(base_url)
            .post_init(self.post_init)
            .post_shutdown(self.post_shutdown)
            .build()
        )

    def is_allowed(self, update):
        message = getattr(update, "effective_message", None)
        if not message:
            return False
        return message.chat_id in self.allowed_chat_ids

    async def require_allowed(self, update):
        if self.is_allowed(update):
            return True
        message = getattr(update, "effective_message", None)
        if message:
            await message.reply_text("您没有权限使用这个 bot。")
        return False

    async def post_init(self, app):
        init_db()
        self.worker_task = asyncio.create_task(self.inbound_worker(app), name="inbound-worker")
        logging.info("telegram sms gateway started for device %s", self.device)

    async def post_shutdown(self, _app):
        if self.worker_task:
            self.worker_task.cancel()
            try:
                await self.worker_task
            except asyncio.CancelledError:
                pass

    def help_text(self):
        return "\n".join(
            [
                f"SMS 网关已在线:{self.device}",
                "",
                "/send <手机号> <短信内容> - 发送前确认",
                "/send <手机号> - 进入分步发送",
                "直接回复一条入站短信通知 - 回短信给原号码",
                "/last [数量] - 最近短信",
                "/status - Quectel 状态",
                "/cancel - 取消当前发送",
            ]
        )

    async def cmd_start(self, update, _context):
        if not await self.require_allowed(update):
            return
        await update.message.reply_text(self.help_text())

    async def cmd_cancel(self, update, _context):
        if not await self.require_allowed(update):
            return
        self.pending.pop(update.message.chat_id, None)
        await update.message.reply_text("已取消。")

    async def cmd_status(self, update, _context):
        if not await self.require_allowed(update):
            return

        def get_status():
            asterisk_bin = getenv("TG_SMS_ASTERISK_CMD", DEFAULT_ASTERISK)
            proc = subprocess.run(
                [asterisk_bin, "-rx", "quectel show devices"],
                text=True,
                capture_output=True,
                timeout=20,
            )
            text = ((proc.stdout or "") + (proc.stderr or "")).strip() or "没有输出。"
            lines = text.splitlines()
            if len(lines) > 1:
                selected = [line for line in lines[1:] if line.split(None, 1)[0:1] == [self.device]]
                if selected:
                    return "\n".join(lines[:1] + selected)
            return text

        try:
            text = await asyncio.to_thread(get_status)
        except Exception as exc:
            text = f"状态查询失败:{exc}"
        await update.message.reply_text(text[:3900])

    async def cmd_last(self, update, context):
        if not await self.require_allowed(update):
            return
        limit = 5
        if context.args:
            try:
                limit = min(max(int(context.args[0]), 1), 20)
            except ValueError:
                await update.message.reply_text("用法:/last [1-20]")
                return
        rows = await asyncio.to_thread(self.fetch_last, limit)
        if not rows:
            await update.message.reply_text("还没有短信记录。")
            return
        lines = []
        for row in rows:
            arrow = "in" if row["direction"] == "inbound" else "out"
            body = row["body"].replace("\n", " ")
            if len(body) > 180:
                body = body[:177] + "..."
            lines.append(
                f"#{row['id']} {arrow} {row['created_at']} {row['device']} {row['phone']} [{row['status']}]\n{body}"
            )
        await update.message.reply_text("\n\n".join(lines)[:3900])

    def fetch_last(self, limit):
        with db_cursor() as conn:
            return conn.execute(
                """
                SELECT id, direction, created_at, device, phone, body, status
                FROM sms
                WHERE device = ?
                ORDER BY id DESC
                LIMIT ?
                """,
                (self.device, limit),
            ).fetchall()

    async def cmd_send(self, update, context):
        if not await self.require_allowed(update):
            return
        if not context.args:
            await update.message.reply_text("用法:/send <手机号> [短信内容]")
            return
        try:
            phone = normalize_phone(context.args[0])
        except ValueError as exc:
            await update.message.reply_text(f"手机号格式不对:{exc}")
            return
        body = " ".join(context.args[1:]).strip()
        if not body:
            self.pending[update.message.chat_id] = {"phone": phone, "device": self.device}
            await update.message.reply_text(f"设备:{self.device}\n收件人:{phone}\n请输入短信内容,或发送 /cancel。")
            return
        await self.ask_send_confirmation(update.message, phone, body, device=self.device)

    async def ask_send_confirmation(self, message, phone, body, reply_to_sms_id=None, device=None):
        device = normalize_device(device or self.device)
        self.pending[message.chat_id] = {
            "device": device,
            "phone": phone,
            "body": body,
            "reply_to_sms_id": reply_to_sms_id,
        }
        preview = body if len(body) <= 900 else body[:900] + "..."
        keyboard = self.InlineKeyboardMarkup(
            [
                [
                    self.InlineKeyboardButton("发送", callback_data="send_confirm"),
                    self.InlineKeyboardButton("取消", callback_data="send_cancel"),
                ]
            ]
        )
        await message.reply_text(f"确认从 {device} 发送到 {phone}:\n{preview}", reply_markup=keyboard)

    async def callback_send(self, update, _context):
        query = update.callback_query
        if query.message.chat_id not in self.allowed_chat_ids:
            await query.answer("无权限", show_alert=True)
            return
        state = self.pending.pop(query.message.chat_id, None)
        if query.data == "send_cancel":
            await query.answer()
            await query.edit_message_text("已取消。")
            return
        if not state:
            await query.answer("没有待确认的短信。", show_alert=True)
            return
        await query.answer("正在发送...")
        device = state.get("device", self.device)
        phone = state["phone"]
        body = state["body"]
        reply_to_sms_id = state.get("reply_to_sms_id")
        try:
            result = await asyncio.to_thread(send_sms_via_asterisk, phone, body, device)
            sms_id = await asyncio.to_thread(
                record_outbound,
                device,
                phone,
                body,
                "sent",
                query.message.chat_id,
                reply_to_sms_id,
                result,
                None,
            )
            await query.edit_message_text(f"已从 {device} 发送 #{sms_id} 到 {phone}。\n{result}")
        except Exception as exc:
            sms_id = await asyncio.to_thread(
                record_outbound,
                device,
                phone,
                body,
                "failed",
                query.message.chat_id,
                reply_to_sms_id,
                None,
                str(exc),
            )
            await query.edit_message_text(f"{device} 发送失败 #{sms_id}:{exc}")

    async def handle_text(self, update, _context):
        if not await self.require_allowed(update):
            return
        chat_id = update.message.chat_id
        text = update.message.text or ""
        state = self.pending.get(chat_id)
        if state and "body" not in state:
            await self.ask_send_confirmation(
                update.message,
                state["phone"],
                text,
                device=state.get("device", self.device),
            )
            return
        reply = update.message.reply_to_message
        if reply:
            inbound = await asyncio.to_thread(self.find_inbound_by_telegram_reply, chat_id, reply.message_id)
            if inbound:
                await self.ask_send_confirmation(
                    update.message,
                    inbound["phone"],
                    text,
                    reply_to_sms_id=inbound["id"],
                    device=inbound["device"],
                )
                return

    def find_inbound_by_telegram_reply(self, chat_id, message_id):
        with db_cursor() as conn:
            return conn.execute(
                """
                SELECT sms.id, sms.phone, sms.device
                FROM telegram_notifications n
                JOIN sms ON sms.id = n.sms_id
                WHERE n.chat_id = ? AND n.message_id = ? AND sms.direction = 'inbound' AND sms.device = ?
                """,
                (chat_id, message_id, self.device),
            ).fetchone()

    def pending_inbound(self, limit=20):
        with db_cursor() as conn:
            return conn.execute(
                """
                SELECT id, created_at, device, phone, body
                FROM sms
                WHERE direction = 'inbound' AND device = ? AND status IN ('new', 'notify_failed')
                ORDER BY id
                LIMIT ?
                """,
                (self.device, limit),
            ).fetchall()

    def mark_inbound_status(self, sms_id, status, error=None):
        with db_cursor() as conn:
            conn.execute(
                "UPDATE sms SET status = ?, error = ? WHERE id = ?",
                (status, error, sms_id),
            )

    def add_notification(self, sms_id, chat_id, message_id):
        with db_cursor() as conn:
            conn.execute(
                """
                INSERT OR IGNORE INTO telegram_notifications(sms_id, chat_id, message_id, created_at)
                VALUES(?, ?, ?, ?)
                """,
                (sms_id, chat_id, message_id, utc_now()),
            )
            conn.execute(
                """
                UPDATE sms
                SET telegram_chat_id = COALESCE(telegram_chat_id, ?),
                    telegram_message_id = COALESCE(telegram_message_id, ?)
                WHERE id = ?
                """,
                (chat_id, message_id, sms_id),
            )

    def format_inbound(self, row):
        return "\n".join(
            [
                f"收到短信 #{row['id']}",
                f"设备:{row['device']}",
                f"来自:{row['phone']}",
                f"时间:{row['created_at']}",
                "",
                row["body"],
            ]
        )

    async def inbound_worker(self, app):
        logging.info("inbound notification worker started for device %s", self.device)
        while True:
            try:
                rows = await asyncio.to_thread(self.pending_inbound)
                for row in rows:
                    text = self.format_inbound(row)
                    sent_any = False
                    last_error = None
                    for chat_id in self.allowed_chat_ids:
                        try:
                            for part in split_message(text):
                                msg = await app.bot.send_message(chat_id=chat_id, text=part)
                                await asyncio.to_thread(self.add_notification, row["id"], chat_id, msg.message_id)
                            sent_any = True
                        except Exception as exc:
                            last_error = str(exc)
                            logging.warning("failed to notify chat %s for SMS #%s: %s", chat_id, row["id"], exc)
                    await asyncio.to_thread(
                        self.mark_inbound_status,
                        row["id"],
                        "notified" if sent_any else "notify_failed",
                        last_error,
                    )
            except asyncio.CancelledError:
                raise
            except Exception:
                logging.exception("inbound worker loop failed")
            await asyncio.sleep(2)

    def run(self):
        app = self.app
        app.add_handler(self.CommandHandler(["start", "help"], self.cmd_start))
        app.add_handler(self.CommandHandler("cancel", self.cmd_cancel))
        app.add_handler(self.CommandHandler("status", self.cmd_status))
        app.add_handler(self.CommandHandler("last", self.cmd_last))
        app.add_handler(self.CommandHandler("send", self.cmd_send))
        app.add_handler(self.CallbackQueryHandler(self.callback_send, pattern=r"^send_(confirm|cancel)$"))
        app.add_handler(self.MessageHandler(self.filters.TEXT & ~self.filters.COMMAND, self.handle_text))
        app.run_polling(drop_pending_updates=True)


def run_bot(_args):
    logging.basicConfig(
        level=getenv("TG_SMS_LOG_LEVEL", "INFO"),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("telegram.request").setLevel(logging.WARNING)
    SmsBot().run()
    return 0


def run_send_cli(args):
    init_db()
    body = " ".join(args.body)
    result = send_sms_via_asterisk(args.phone, body, args.device)
    sms_id = record_outbound(normalize_device(args.device), normalize_phone(args.phone), body, "sent", result=result)
    print(f"sent #{sms_id}: {result}")
    return 0


def main():
    parser = argparse.ArgumentParser(description="Telegram SMS gateway for chan_quectel")
    sub = parser.add_subparsers(dest="command", required=True)
    daemon_parser = sub.add_parser("daemon")
    daemon_parser.set_defaults(func=run_bot)
    agi_parser = sub.add_parser("ingest-agi")
    agi_parser.set_defaults(func=run_agi_ingest)
    send_parser = sub.add_parser("send")
    send_parser.add_argument("phone")
    send_parser.add_argument("body", nargs="+")
    send_parser.add_argument("--device", default=configured_device())
    send_parser.set_defaults(func=run_send_cli)
    args = parser.parse_args()
    return args.func(args)


if __name__ == "__main__":
    raise SystemExit(main())
The overall structure is:
/opt/tg-sms/bin/tg_sms_gateway.py
├── daemon # Telegram bot daemon
├── ingest-agi # Asterisk AGI entry point
└── send # command-line entry point for test-sending SMS
The database tables look roughly like this:
CREATE TABLE IF NOT EXISTS sms (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    direction TEXT NOT NULL CHECK(direction IN ('inbound', 'outbound')),
    created_at TEXT NOT NULL,
    device TEXT NOT NULL,
    phone TEXT NOT NULL,
    body TEXT NOT NULL,
    status TEXT NOT NULL,
    telegram_chat_id INTEGER,
    telegram_message_id INTEGER,
    reply_to_sms_id INTEGER,
    asterisk_result TEXT,
    error TEXT
);
CREATE TABLE IF NOT EXISTS telegram_notifications (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    sms_id INTEGER NOT NULL REFERENCES sms(id) ON DELETE CASCADE,
    chat_id INTEGER NOT NULL,
    message_id INTEGER NOT NULL,
    created_at TEXT NOT NULL,
    UNIQUE(chat_id, message_id)
);
Each bot instance only looks at its own device:
def configured_device():
    return normalize_device(
        os.environ.get("TG_SMS_DEVICE")
        or os.environ.get("TG_SMS_DEFAULT_DEVICE")
        or "quectel0"
    )

def pending_inbound(self, limit=20):
    # Method of SmsBot; db_cursor() is the context manager from the full script
    with db_cursor() as conn:
        return conn.execute(
            """
            SELECT id, created_at, device, phone, body
            FROM sms
            WHERE direction = 'inbound'
              AND device = ?
              AND status IN ('new', 'notify_failed')
            ORDER BY id
            LIMIT ?
            """,
            (self.device, limit),
        ).fetchall()
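When replying to an SMS, the original inbound message is looked up by its Telegram message id, and the reply is sent back through the original device: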
SELECT sms.id, sms.phone, sms.device
FROM telegram_notifications n
JOIN sms ON sms.id = n.sms_id
WHERE n.chat_id = ?
AND n.message_id = ?
AND sms.direction = 'inbound'
AND sms.device = ?
Sending does not use shell=True; Python executes asterisk -rx directly:
def send_sms_via_asterisk(phone, body, device):
    command = f'quectel sms {device} {phone} {asterisk_quote(body)}'
    proc = subprocess.run(
        ["/usr/sbin/asterisk", "-rx", command],
        text=True,
        capture_output=True,
        timeout=45,
    )
Although asterisk -rx itself still needs the whole command as a single string, at least it no longer passes through a layer of shell expansion.
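The effect is easiest to see by building the argument vector without running anything. In the sketch below, asterisk_quote repeats the quoting from the script, while build_sms_argv is a hypothetical helper added only for illustration.

```python
def asterisk_quote(value: str) -> str:
    """Flatten newlines, escape backslashes and quotes, then wrap in double quotes."""
    value = (value or "").replace("\r", " ").replace("\n", " ")
    value = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{value}"'


def build_sms_argv(
    device: str, phone: str, body: str, asterisk_bin: str = "/usr/sbin/asterisk"
) -> list[str]:
    """Return the argv list that subprocess.run would receive (nothing is executed)."""
    command = f"quectel sms {device} {phone} {asterisk_quote(body)}"
    return [asterisk_bin, "-rx", command]


if __name__ == "__main__":
    argv = build_sms_argv("quectel1", "+10000000000", 'hi $(reboot) "x"\nbye')
    for index, arg in enumerate(argv):
        print(index, repr(arg))
```

The whole CLI command stays in a single argv element, so text such as $(reboot) reaches Asterisk as literal characters instead of being expanded by a shell.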
Real-world test
As soon as quectel1 came online, Asterisk read several backlogged SMS from the SIM card:
[quectel1] Got full SMS from #SIMWorld: ...
[quectel1] Got full SMS from +86...: 'hello from xxx at 20260704'
The dialplan then invoked the AGI:
Executing [sms@incoming-mobile:1] Verbose(... "Incoming SMS from +86... on quectel1")
/opt/tg-sms/bin/tg_sms_gateway.py,ingest-agi: tg-sms queued inbound SMS #5 from +86... on quectel1
SQLite shows:
id direction device phone status
5 inbound quectel1 +86... new
4 inbound quectel1 #SIMWorld new
3 inbound quectel1 #SIMWorld new
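Because the second Telegram bot token, the one for quectel1, had not been configured yet, these messages stayed in the new state at first. Once tg-sms@quectel1 starts, that instance pushes out the messages with device=quectel1.
Summary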
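- If you only need to send and receive SMS, there is no rush to configure VoLTE (besides, the module ships with few VoLTE profiles, mainly for some carriers in China, the US, Australia, Western Europe, and South Korea). A module whose AT+QCFG="ims" state is 1,0 is of course unsuitable for calls, but that does not mean SMS is unavailable.
- When debugging with a roaming SIM, keep a close eye on ip route. With usbnet=1 the module easily turns into a "wired NIC" that runs DHCP automatically.
- On machines with several modules, prefer /dev/serial/by-path over hard-coding ttyUSB6. When modules of the same model come up in a different order after a restart, the ttyUSB numbers may change.
- chan_quectel's automatic discovery works, but for a module with non-standard manufacturer/product strings like this one, writing audio= and data= by hand gives more control.
- The Telegram bot is best split by card. With one bot per card, commands and notifications are clearer, and sending from the wrong card is avoided.
- For inbound SMS, do not call System(echo '${BASE64_DECODE(...)}' ...) directly in the dialplan. The SMS body comes from outside, and handing it to an AGI script is much cleaner.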