前几年笔者写过一篇使用EC20模块配合asterisk及freepbx实现短信转发和网络电话的文章。当时的目标是把一张手机卡从长期插电的旧手机里解放出来,让它可以通过Asterisk接打电话,并用Telegram等工具转发短信。几年后,笔者在某频道看到了拼多多上有商家在清仓DJI的一代增强图传模块,于是花30多元购买了一块。它的USB ID显示为2ca3:4006 DJI Technology Co., Ltd. Baiwang,但从网上的资料和AT命令看,它本质上仍然是高通平台上的EG25/EC25一类模块,不过无法刷入标准的Quectel固件。

在本文中,笔者尝试了通过修改PVID的方式,让linux的驱动将其作为标准的EC25设备对待,并请llm重写了收发短信的配置,以期得到更加稳定的短信体验。


TL;DR

  • DJI/Baiwang模块出厂的USB ID是2ca3:4006,Linux不会自动把它的AT串口接口绑定到option驱动,需要先临时加new_id,再通过AT命令把模块的USB ID改成Quectel常见的2c7c:0125,改完之后后续的操作就和EC25一样了。
  • AT+QCFG="usbnet",1可以把网卡模式设成ECM。这个设置对调试有用,但如果插的是漫游卡,要小心NetworkManager自动DHCP并加默认路由,导致偷跑流量。
  • 关于VoLTE,AT+QCFG="ims"返回+QCFG: "ims",1,0说明IMS被强制启用,但VoLTE session不可用。但如果只收发短信,不一定需要VoLTE注册成功。这次实测在VoLTE没有注册的情况下,模块仍然能收到短信,并被Asterisk通过chan_quectel读出。
  • chan_quectel手工配置audio=data=端口时,并不太关心USB设备的manufacturer/product字符串是不是Quectel。这块模块显示BAIWANG/Baiwang叫这个名字估计是因为DJI最开始的办公楼在深圳南山区的百旺科技园),仍然可以正常作为quectel1接入。
  • Telegram Bot建议按设备拆成tg-sms@quectel0tg-sms@quectel1这样的systemd template service。每张卡一个bot token,共用同一份Python脚本和SQLite数据库,这样比较不容易误发。

识别DJI/Baiwang模块

插上模块后,一开始lsusb看到的是:

Bus 002 Device 003: ID 2ca3:4006 DJI Technology Co., Ltd. Baiwang

同时lsusb -t里可以看到它有多个接口,但接口0-3没有被option驱动绑定,只有网卡接口被cdc_ether认出来:

|__ Port 2: Dev 3, If 0, Class=Vendor Specific Class, Driver=
|__ Port 2: Dev 3, If 1, Class=Vendor Specific Class, Driver=
|__ Port 2: Dev 3, If 2, Class=Vendor Specific Class, Driver=
|__ Port 2: Dev 3, If 3, Class=Vendor Specific Class, Driver=
|__ Port 2: Dev 3, If 4, Class=Communications, Driver=cdc_ether
|__ Port 2: Dev 3, If 5, Class=CDC Data, Driver=cdc_ether

因为Linux的option驱动支持动态添加USB ID,所以我们可以先执行:

echo 2ca3 4006 > /sys/bus/usb-serial/drivers/option1/new_id

随后会多出几个串口,比如(如果机器上只有一台quectel设备,理论上ttyUSB是1/2/3/4,控制串口是ttyUSB2):

/dev/ttyUSB4  if00
/dev/ttyUSB5  if01
/dev/ttyUSB6  if02
/dev/ttyUSB7  if03

在这台机器上,/dev/ttyUSB6/dev/ttyUSB7都能响应AT命令。ATI返回如下:

ATI
Baiwang
QDC507
Revision: QDC507GLEFM21

OK

搜索固件的revision,能找到一些quectel论坛上的帖子,看发帖时间估计都是在2026年6月份左右买了这个模块的人发的。

修改USB ID和usbnet模式

使用minicom -D /dev/ttyUSB6连接串口后,先查询原始配置:

AT+QCFG="usbnet"
+QCFG: "usbnet",3

AT+QCFG="usbid"
+QCFG: 11427,16390

11427,16390换成十六进制就是2ca3:4006。为了让内核和chan_quectel正确识别,可以把它改成Quectel EC25常见的2c7c:0125。十进制是11388,293

AT+QCFG="usbnet",1
OK

AT+QCFG="usbid",11388,293
OK

AT+QCFG="usbnet"
+QCFG: "usbnet",1

AT+QCFG="usbid"
+QCFG: 11388,293

然后重启模块:

AT+CFUN=1,1

这块模块在CFUN重启后并没有立刻把外层USB descriptor切换过来,笔者又对它所在的USB端口做了一次unbind/bind:

echo 2-2 > /sys/bus/usb/drivers/usb/unbind
sleep 2
echo 2-2 > /sys/bus/usb/drivers/usb/bind

重新枚举后,lsusb里两块模块都变成了2c7c:0125

Bus 002 Device 005: ID 2c7c:0125 Quectel Wireless Solutions Co., Ltd. EC25 LTE modem
Bus 002 Device 002: ID 2c7c:0125 Quectel Wireless Solutions Co., Ltd. EC25 LTE modem

虽然USB ID变成了Quectel,但manufacturer/product还是BAIWANG/Baiwang。不过这并不影响后续手工指定串口。

防止偷跑流量(适用于漫游卡,无流量的卡等)

模块改成ECM模式后,Debian上的NetworkManager可能会把模块网卡当成普通有线网卡,自动DHCP到192.168.225.x,并添加一条默认路由:

default via 192.168.225.1 dev enx...

如果卡是漫游卡,这可能会产生完全没必要的流量费用。笔者这台机器只把模块用于短信,因此直接让NetworkManager忽略所有modem网卡:

# /etc/NetworkManager/conf.d/99-usb-modem-unmanaged.conf
[keyfile]
unmanaged-devices=interface-name:enx*;interface-name:usb*;interface-name:wwan*

然后关掉已经拉起来的连接:

nmcli general reload

nmcli connection down "Wired connection 6"
nmcli connection down "Wired connection 7"

for i in enx* usb* wwan*; do
    [ -e "/sys/class/net/$i" ] || continue
    nmcli device set "$i" managed no
    ip route del default dev "$i" 2>/dev/null || true
    ip link set "$i" down
done

最后确认默认路由只剩主网卡:

ip route show

default via 192.168.0.1 dev eth0

网络注册

检查注册状态:

AT+COPS?
+COPS: 0,0,"Verizon Wireless",13

AT+CEREG?
+CEREG: 0,5

AT+QNWINFO
+QNWINFO: "FDD LTE","311480","LTE BAND 4",2050

CEREG: 0,5表示已经以漫游状态注册到EPS网络。信号可以用AT+QCSQ看:

AT+QCSQ
+QCSQ: "LTE",59,-94,169,-14

这里的-94大致是RSRP,作为收短信是够用的。

关于VoLTE和短信

根据Quectel的IMS Application NoteAT+QCFG="ims"返回格式为:

+QCFG: "ims",<IMS_conf>,<VoLTE_cap>

其中第二个值VoLTE_cap1时,表示VoLTE session可用;为0时,说明当前VoLTE不可用。本次模块的状态是:

AT+QCFG="ims"
+QCFG: "ims",1,0

笔者尝试过ROW_Generic_3GPPhVoLTE-Verizon等MBN,IMS都没有注册成功:

AT+QMBNCFG="Select"
+QMBNCFG: "Select",ROW_Generic_3GPP

AT+QCFG="ims"
+QCFG: "ims",1,0

这意味着它不适合拿来做VoLTE通话,chan_quectel也会提示:

[quectel1] Dongle has NO voice support

但这并不代表模块不能收短信。实测在IMS/VoLTE不可用时,模块仍然能收到短信通知:

+CMTI: "SM",0
+CMTI: "SM",1
+CMTI: "SM",2

接入Asterisk后,这些短信也可以被chan_quectel读出并送进dialplan。因此,如果需求只是收发短信,VoLTE配置不上也是没有关系的。真正需要VoLTE的情况是语音通话,或者某些运营商只允许SMS over IMS。

配置Asterisk里的第二个Quectel设备

如果需要安装asterisk相关的chan,请参考笔者之前的文章,或对应的github repo ( https://github.com/IchthysMaranatha/asterisk-chan-quectel )或笔者的fork ( https://github.com/sparkcyf/asterisk-chan-quectel/tree/fix-smsdb-lock , 修复了打电话同时收短信会让电话崩溃,和USSD不能正确收发的bug)

这台机器上原本已经有一块正常的EG25:

/dev/ttyUSB0  if00
/dev/ttyUSB1  if01  audio
/dev/ttyUSB2  if02  AT/data
/dev/ttyUSB3  if03

DJI/Baiwang模块在第二个USB物理口:

/dev/ttyUSB4  if00
/dev/ttyUSB5  if01  audio
/dev/ttyUSB6  if02  AT/data
/dev/ttyUSB7  if03

为了避免ttyUSB序号变化,我们可以用/dev/serial/by-path制定设备。chan_quectel打开端口时会先realpath(),所以用symlink不会影响它的锁文件逻辑。

/etc/asterisk/quectel.conf里新增:

[quectel0]
audio=/dev/ttyUSB1
data=/dev/ttyUSB2

[quectel1]
audio=/dev/serial/by-path/pci-0000:07:1b.0-usb-0:2:1.1-port0
data=/dev/serial/by-path/pci-0000:07:1b.0-usb-0:2:1.2-port0

然后在Asterisk CLI里reload:

asterisk -rx "quectel reload now"

第一次初始化时,这张漫游卡在AT+COPS=0,0上花了比较久,chan_quectel一度超时。但模块自动选网完成后,后续重试成功了:

ID        Group State RSSI Provider Name Model  Firmware       Number
quectel0 0     Free  22   Ultra         EG25   EG25...        ...
quectel1 0     Free  99   AT&T          QDC507 QDC507GLEFM21  ...

quectel1的状态里可以看到:

SMS                     : Yes
Voice                   : No
GSM Registration Status : Registered, roaming
Provider Name           : AT&T

对本文的目标来说,SMS: Yes已经足够。

更安全的短信入站dialplan

上一篇文章里,笔者直接在dialplan中用System(echo ...)保存短信。这种写法能用,但询问llm后,llm指出短信正文来自外部,直接拼进shell命令并不优雅,也有注入风险。因此这次改成AGI入口:

[incoming-mobile]
exten => sms,1,Verbose(Incoming SMS from ${CALLERID(num)} on ${QUECTELNAME})
exten => sms,n,AGI(/opt/tg-sms/bin/tg_sms_gateway.py,ingest-agi)
exten => sms,n,Hangup()

exten => ussd,1,Verbose(Incoming USSD: ${BASE64_DECODE(${USSD_BASE64})})
exten => ussd,n,System(echo '${STRFTIME(${EPOCH},,%Y-%m-%d %H:%M:%S)} - ${QUECTELNAME}: ${BASE64_DECODE(${USSD_BASE64})}' >> /var/log/asterisk/ussd.txt)
exten => ussd,n,Hangup()

AGI脚本通过GET VARIABLE SMS_BASE64读取Asterisk变量,在Python里base64解码,然后写入SQLite:

def run_agi_ingest(_args):
    agi_env = agi_read_environment()
    init_db()
    sms_base64 = agi_get_variable("SMS_BASE64")
    device = agi_get_variable("QUECTELNAME") or configured_device()
    phone = agi_env.get("agi_callerid") or "unknown"
    body = decode_sms_base64(sms_base64)
    sms_id = record_inbound(device, phone, body)
    agi_verbose(f"tg-sms queued inbound SMS #{sms_id} from {phone} on {device}", 1)

这样短信正文不再作为shell参数出现,也能自然记录来自哪个QUECTELNAME

Telegram Bot按设备拆分

一开始笔者使用的是一个bot同时处理所有设备。后来发现如果有多张卡,最清晰的方式是每张卡一个bot,但共用同一份代码:

systemd template如下:

[Unit]
Description=Telegram SMS gateway for chan_quectel device %i
Wants=network-online.target
After=network-online.target asterisk.service

[Service]
Type=simple
User=asterisk
Group=asterisk
Environment=TG_SMS_DEVICE=%i
EnvironmentFile=/etc/tg-sms/%i.env
WorkingDirectory=/opt/tg-sms
ExecStart=/opt/tg-sms/venv/bin/python3 /opt/tg-sms/bin/tg_sms_gateway.py daemon
Restart=on-failure
RestartSec=5
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=full
ProtectHome=true
ReadWritePaths=/var/lib/tg-sms /var/log/asterisk

[Install]
WantedBy=multi-user.target

每张卡一个env:

# /etc/tg-sms/quectel0.env
TG_SMS_BOT_TOKEN=123456:replace-me
TG_SMS_BOT_BASE_URL=https://api.telegram.org/bot
TG_SMS_ALLOWED_CHAT_IDS=11111111
TG_SMS_DEFAULT_DEVICE=quectel0
TG_SMS_DEVICE=quectel0
TG_SMS_DB=/var/lib/tg-sms/sms.db
TG_SMS_ASTERISK_CMD=/usr/sbin/asterisk
TG_SMS_LOG_LEVEL=INFO

quectel1.env只需要把token和device改掉:

TG_SMS_BOT_TOKEN=654321:replace-me
TG_SMS_DEFAULT_DEVICE=quectel1
TG_SMS_DEVICE=quectel1

启用服务:

systemctl enable --now [email protected]
systemctl enable --now [email protected]

注意:两个实例必须使用不同的Telegram bot token,否则Telegram的long polling会报:

Conflict: terminated by other getUpdates request

Python脚本

TG SMS Python Script
#!/opt/tg-sms/venv/bin/python3
import argparse
import asyncio
import base64
import datetime as dt
import logging
import os
import re
import sqlite3
import subprocess
import sys
from contextlib import contextmanager


DEFAULT_DB = "/var/lib/tg-sms/sms.db"
DEFAULT_ASTERISK = "/usr/sbin/asterisk"
DEFAULT_DEVICE = "quectel0"
DEFAULT_BASE_URL = "https://api.telegram.org/bot"

PHONE_RE = re.compile(r"^\+?\d{3,20}$")
DEVICE_RE = re.compile(r"^[A-Za-z0-9_.-]+$")


def utc_now():
    return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()


def getenv(name, default=None):
    value = os.environ.get(name)
    if value is None or value == "":
        return default
    return value


def parse_allowed_chat_ids():
    raw = getenv("TG_SMS_ALLOWED_CHAT_IDS", "")
    ids = []
    for item in raw.replace(";", ",").split(","):
        item = item.strip()
        if not item:
            continue
        try:
            ids.append(int(item))
        except ValueError:
            raise SystemExit(f"Invalid chat id in TG_SMS_ALLOWED_CHAT_IDS: {item!r}")
    if not ids:
        raise SystemExit("TG_SMS_ALLOWED_CHAT_IDS is empty")
    return ids


def normalize_phone(phone):
    phone = (phone or "").strip()
    phone = re.sub(r"[\s().-]+", "", phone)
    if not PHONE_RE.fullmatch(phone):
        raise ValueError("phone number must be 3-20 digits, optionally prefixed by +")
    return phone


def normalize_device(device):
    device = (device or DEFAULT_DEVICE).strip()
    if not DEVICE_RE.fullmatch(device):
        raise ValueError("invalid Quectel device id")
    return device


def configured_device():
    return normalize_device(getenv("TG_SMS_DEVICE", getenv("TG_SMS_DEFAULT_DEVICE", DEFAULT_DEVICE)))


def split_message(text, limit=3900):
    text = text or ""
    if len(text) <= limit:
        return [text]
    parts = []
    while text:
        parts.append(text[:limit])
        text = text[limit:]
    return parts


def asterisk_quote(value):
    value = (value or "").replace("\r", " ").replace("\n", " ")
    value = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{value}"'


def connect_db():
    db_path = getenv("TG_SMS_DB", DEFAULT_DB)
    conn = sqlite3.connect(db_path, timeout=30)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA busy_timeout=30000")
    return conn


def init_db():
    with connect_db() as conn:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS sms (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                direction TEXT NOT NULL CHECK(direction IN ('inbound', 'outbound')),
                created_at TEXT NOT NULL,
                device TEXT NOT NULL,
                phone TEXT NOT NULL,
                body TEXT NOT NULL,
                status TEXT NOT NULL,
                telegram_chat_id INTEGER,
                telegram_message_id INTEGER,
                reply_to_sms_id INTEGER,
                asterisk_result TEXT,
                error TEXT
            );

            CREATE TABLE IF NOT EXISTS telegram_notifications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sms_id INTEGER NOT NULL REFERENCES sms(id) ON DELETE CASCADE,
                chat_id INTEGER NOT NULL,
                message_id INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                UNIQUE(chat_id, message_id)
            );

            CREATE INDEX IF NOT EXISTS idx_sms_pending
                ON sms(direction, status, id);
            CREATE INDEX IF NOT EXISTS idx_telegram_notifications_lookup
                ON telegram_notifications(chat_id, message_id);
            """
        )


@contextmanager
def db_cursor():
    conn = connect_db()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def record_inbound(device, phone, body):
    device = normalize_device(device)
    phone = (phone or "unknown").strip() or "unknown"
    body = body or ""
    with db_cursor() as conn:
        cur = conn.execute(
            """
            INSERT INTO sms(direction, created_at, device, phone, body, status)
            VALUES('inbound', ?, ?, ?, ?, 'new')
            """,
            (utc_now(), device, phone, body),
        )
        return cur.lastrowid


def record_outbound(device, phone, body, status, chat_id=None, reply_to_sms_id=None, result=None, error=None):
    with db_cursor() as conn:
        cur = conn.execute(
            """
            INSERT INTO sms(
                direction, created_at, device, phone, body, status,
                telegram_chat_id, reply_to_sms_id, asterisk_result, error
            )
            VALUES('outbound', ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (utc_now(), device, phone, body, status, chat_id, reply_to_sms_id, result, error),
        )
        return cur.lastrowid


def send_sms_via_asterisk(phone, body, device=None):
    phone = normalize_phone(phone)
    device = normalize_device(device or getenv("TG_SMS_DEFAULT_DEVICE", DEFAULT_DEVICE))
    if not body or not body.strip():
        raise ValueError("SMS body is empty")

    asterisk_bin = getenv("TG_SMS_ASTERISK_CMD", DEFAULT_ASTERISK)
    command = f"quectel sms {device} {phone} {asterisk_quote(body)}"
    proc = subprocess.run(
        [asterisk_bin, "-rx", command],
        text=True,
        capture_output=True,
        timeout=45,
    )
    output = (proc.stdout or "") + (proc.stderr or "")
    output = output.strip()
    if proc.returncode != 0:
        raise RuntimeError(output or f"asterisk exited with code {proc.returncode}")
    return output or "OK"


def agi_read_environment():
    env = {}
    while True:
        line = sys.stdin.readline()
        if line == "":
            break
        line = line.rstrip("\n")
        if line == "":
            break
        key, _, value = line.partition(":")
        env[key.strip()] = value.strip()
    return env


def agi_command(command):
    print(command, flush=True)
    return sys.stdin.readline().strip()


def agi_escape(value):
    return (value or "").replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def agi_verbose(message, level=1):
    agi_command(f'VERBOSE "{agi_escape(message)}" {int(level)}')


def agi_get_variable(name):
    response = agi_command(f"GET VARIABLE {name}")
    match = re.search(r"result=1 \((.*)\)", response)
    if match:
        return match.group(1)
    return ""


def decode_sms_base64(value):
    value = (value or "").strip()
    if not value:
        return ""
    try:
        data = base64.b64decode(value, validate=False)
    except Exception:
        data = base64.b64decode(value + "=" * (-len(value) % 4), validate=False)
    return data.decode("utf-8", errors="replace")


def run_agi_ingest(_args):
    agi_env = agi_read_environment()
    try:
        init_db()
        sms_base64 = agi_get_variable("SMS_BASE64")
        device = agi_get_variable("QUECTELNAME") or getenv("TG_SMS_DEFAULT_DEVICE", DEFAULT_DEVICE)
        phone = agi_env.get("agi_callerid") or agi_env.get("agi_calleridname") or "unknown"
        body = decode_sms_base64(sms_base64)
        sms_id = record_inbound(device, phone, body)
        agi_verbose(f"tg-sms queued inbound SMS #{sms_id} from {phone} on {device}", 1)
        return 0
    except Exception as exc:
        print(f"tg-sms AGI ingest failed: {exc}", file=sys.stderr)
        try:
            agi_verbose(f"tg-sms ingest failed: {exc}", 1)
        except Exception:
            pass
        return 1


class SmsBot:
    def __init__(self):
        from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
        from telegram.ext import (
            Application,
            ApplicationBuilder,
            CallbackContext,
            CallbackQueryHandler,
            CommandHandler,
            ContextTypes,
            MessageHandler,
            filters,
        )

        self.InlineKeyboardButton = InlineKeyboardButton
        self.InlineKeyboardMarkup = InlineKeyboardMarkup
        self.Update = Update
        self.Application = Application
        self.ApplicationBuilder = ApplicationBuilder
        self.CallbackContext = CallbackContext
        self.CallbackQueryHandler = CallbackQueryHandler
        self.CommandHandler = CommandHandler
        self.ContextTypes = ContextTypes
        self.MessageHandler = MessageHandler
        self.filters = filters

        token = getenv("TG_SMS_BOT_TOKEN")
        if not token:
            raise SystemExit("TG_SMS_BOT_TOKEN is empty")
        base_url = getenv("TG_SMS_BOT_BASE_URL", DEFAULT_BASE_URL)
        self.allowed_chat_ids = parse_allowed_chat_ids()
        self.device = configured_device()
        self.default_device = self.device
        self.pending = {}
        self.worker_task = None
        self.app = (
            ApplicationBuilder()
            .token(token)
            .base_url(base_url)
            .post_init(self.post_init)
            .post_shutdown(self.post_shutdown)
            .build()
        )

    def is_allowed(self, update):
        message = getattr(update, "effective_message", None)
        if not message:
            return False
        return message.chat_id in self.allowed_chat_ids

    async def require_allowed(self, update):
        if self.is_allowed(update):
            return True
        message = getattr(update, "effective_message", None)
        if message:
            await message.reply_text("您没有权限使用这个 bot。")
        return False

    async def post_init(self, app):
        init_db()
        self.worker_task = asyncio.create_task(self.inbound_worker(app), name="inbound-worker")
        logging.info("telegram sms gateway started for device %s", self.device)

    async def post_shutdown(self, _app):
        if self.worker_task:
            self.worker_task.cancel()
            try:
                await self.worker_task
            except asyncio.CancelledError:
                pass

    def help_text(self):
        return "\n".join(
            [
                f"SMS 网关已在线:{self.device}",
                "",
                "/send <手机号> <短信内容> - 发送前确认",
                "/send <手机号> - 进入分步发送",
                "直接回复一条入站短信通知 - 回短信给原号码",
                "/last [数量] - 最近短信",
                "/status - Quectel 状态",
                "/cancel - 取消当前发送",
            ]
        )

    async def cmd_start(self, update, _context):
        if not await self.require_allowed(update):
            return
        await update.message.reply_text(self.help_text())

    async def cmd_cancel(self, update, _context):
        if not await self.require_allowed(update):
            return
        self.pending.pop(update.message.chat_id, None)
        await update.message.reply_text("已取消。")

    async def cmd_status(self, update, _context):
        if not await self.require_allowed(update):
            return

        def get_status():
            asterisk_bin = getenv("TG_SMS_ASTERISK_CMD", DEFAULT_ASTERISK)
            proc = subprocess.run(
                [asterisk_bin, "-rx", "quectel show devices"],
                text=True,
                capture_output=True,
                timeout=20,
            )
            text = ((proc.stdout or "") + (proc.stderr or "")).strip() or "没有输出。"
            lines = text.splitlines()
            if len(lines) > 1:
                selected = [line for line in lines[1:] if line.split(None, 1)[0:1] == [self.device]]
                if selected:
                    return "\n".join(lines[:1] + selected)
            return text

        try:
            text = await asyncio.to_thread(get_status)
        except Exception as exc:
            text = f"状态查询失败:{exc}"
        await update.message.reply_text(text[:3900])

    async def cmd_last(self, update, context):
        if not await self.require_allowed(update):
            return
        limit = 5
        if context.args:
            try:
                limit = min(max(int(context.args[0]), 1), 20)
            except ValueError:
                await update.message.reply_text("用法:/last [1-20]")
                return
        rows = await asyncio.to_thread(self.fetch_last, limit)
        if not rows:
            await update.message.reply_text("还没有短信记录。")
            return
        lines = []
        for row in rows:
            arrow = "in" if row["direction"] == "inbound" else "out"
            body = row["body"].replace("\n", " ")
            if len(body) > 180:
                body = body[:177] + "..."
            lines.append(
                f"#{row['id']} {arrow} {row['created_at']} {row['device']} {row['phone']} [{row['status']}]\n{body}"
            )
        await update.message.reply_text("\n\n".join(lines)[:3900])

    def fetch_last(self, limit):
        with db_cursor() as conn:
            return conn.execute(
                """
                SELECT id, direction, created_at, device, phone, body, status
                FROM sms
                WHERE device = ?
                ORDER BY id DESC
                LIMIT ?
                """,
                (self.device, limit),
            ).fetchall()

    async def cmd_send(self, update, context):
        if not await self.require_allowed(update):
            return
        if not context.args:
            await update.message.reply_text("用法:/send <手机号> [短信内容]")
            return
        try:
            phone = normalize_phone(context.args[0])
        except ValueError as exc:
            await update.message.reply_text(f"手机号格式不对:{exc}")
            return
        body = " ".join(context.args[1:]).strip()
        if not body:
            self.pending[update.message.chat_id] = {"phone": phone, "device": self.device}
            await update.message.reply_text(f"设备:{self.device}\n收件人:{phone}\n请输入短信内容,或发送 /cancel。")
            return
        await self.ask_send_confirmation(update.message, phone, body, device=self.device)

    async def ask_send_confirmation(self, message, phone, body, reply_to_sms_id=None, device=None):
        device = normalize_device(device or self.device)
        self.pending[message.chat_id] = {
            "device": device,
            "phone": phone,
            "body": body,
            "reply_to_sms_id": reply_to_sms_id,
        }
        preview = body if len(body) <= 900 else body[:900] + "..."
        keyboard = self.InlineKeyboardMarkup(
            [
                [
                    self.InlineKeyboardButton("发送", callback_data="send_confirm"),
                    self.InlineKeyboardButton("取消", callback_data="send_cancel"),
                ]
            ]
        )
        await message.reply_text(f"确认从 {device} 发送到 {phone}\n{preview}", reply_markup=keyboard)

    async def callback_send(self, update, _context):
        query = update.callback_query
        if query.message.chat_id not in self.allowed_chat_ids:
            await query.answer("无权限", show_alert=True)
            return
        state = self.pending.pop(query.message.chat_id, None)
        if query.data == "send_cancel":
            await query.answer()
            await query.edit_message_text("已取消。")
            return
        if not state:
            await query.answer("没有待确认的短信。", show_alert=True)
            return
        await query.answer("正在发送...")
        device = state.get("device", self.device)
        phone = state["phone"]
        body = state["body"]
        reply_to_sms_id = state.get("reply_to_sms_id")
        try:
            result = await asyncio.to_thread(send_sms_via_asterisk, phone, body, device)
            sms_id = await asyncio.to_thread(
                record_outbound,
                device,
                phone,
                body,
                "sent",
                query.message.chat_id,
                reply_to_sms_id,
                result,
                None,
            )
            await query.edit_message_text(f"已从 {device} 发送 #{sms_id}{phone}\n{result}")
        except Exception as exc:
            sms_id = await asyncio.to_thread(
                record_outbound,
                device,
                phone,
                body,
                "failed",
                query.message.chat_id,
                reply_to_sms_id,
                None,
                str(exc),
            )
            await query.edit_message_text(f"{device} 发送失败 #{sms_id}{exc}")

    async def handle_text(self, update, _context):
        if not await self.require_allowed(update):
            return
        chat_id = update.message.chat_id
        text = update.message.text or ""

        state = self.pending.get(chat_id)
        if state and "body" not in state:
            await self.ask_send_confirmation(
                update.message,
                state["phone"],
                text,
                device=state.get("device", self.device),
            )
            return

        reply = update.message.reply_to_message
        if reply:
            inbound = await asyncio.to_thread(self.find_inbound_by_telegram_reply, chat_id, reply.message_id)
            if inbound:
                await self.ask_send_confirmation(
                    update.message,
                    inbound["phone"],
                    text,
                    reply_to_sms_id=inbound["id"],
                    device=inbound["device"],
                )
                return

    def find_inbound_by_telegram_reply(self, chat_id, message_id):
        with db_cursor() as conn:
            return conn.execute(
                """
                SELECT sms.id, sms.phone, sms.device
                FROM telegram_notifications n
                JOIN sms ON sms.id = n.sms_id
                WHERE n.chat_id = ? AND n.message_id = ? AND sms.direction = 'inbound' AND sms.device = ?
                """,
                (chat_id, message_id, self.device),
            ).fetchone()

    def pending_inbound(self, limit=20):
        with db_cursor() as conn:
            return conn.execute(
                """
                SELECT id, created_at, device, phone, body
                FROM sms
                WHERE direction = 'inbound' AND device = ? AND status IN ('new', 'notify_failed')
                ORDER BY id
                LIMIT ?
                """,
                (self.device, limit),
            ).fetchall()

    def mark_inbound_status(self, sms_id, status, error=None):
        with db_cursor() as conn:
            conn.execute(
                "UPDATE sms SET status = ?, error = ? WHERE id = ?",
                (status, error, sms_id),
            )

    def add_notification(self, sms_id, chat_id, message_id):
        with db_cursor() as conn:
            conn.execute(
                """
                INSERT OR IGNORE INTO telegram_notifications(sms_id, chat_id, message_id, created_at)
                VALUES(?, ?, ?, ?)
                """,
                (sms_id, chat_id, message_id, utc_now()),
            )
            conn.execute(
                """
                UPDATE sms
                SET telegram_chat_id = COALESCE(telegram_chat_id, ?),
                    telegram_message_id = COALESCE(telegram_message_id, ?)
                WHERE id = ?
                """,
                (chat_id, message_id, sms_id),
            )

    def format_inbound(self, row):
        return "\n".join(
            [
                f"收到短信 #{row['id']}",
                f"设备:{row['device']}",
                f"来自:{row['phone']}",
                f"时间:{row['created_at']}",
                "",
                row["body"],
            ]
        )

    async def inbound_worker(self, app):
        logging.info("inbound notification worker started for device %s", self.device)
        while True:
            try:
                rows = await asyncio.to_thread(self.pending_inbound)
                for row in rows:
                    text = self.format_inbound(row)
                    sent_any = False
                    last_error = None
                    for chat_id in self.allowed_chat_ids:
                        try:
                            for part in split_message(text):
                                msg = await app.bot.send_message(chat_id=chat_id, text=part)
                                await asyncio.to_thread(self.add_notification, row["id"], chat_id, msg.message_id)
                                sent_any = True
                        except Exception as exc:
                            last_error = str(exc)
                            logging.warning("failed to notify chat %s for SMS #%s: %s", chat_id, row["id"], exc)
                    await asyncio.to_thread(
                        self.mark_inbound_status,
                        row["id"],
                        "notified" if sent_any else "notify_failed",
                        last_error,
                    )
            except asyncio.CancelledError:
                raise
            except Exception:
                logging.exception("inbound worker loop failed")
            await asyncio.sleep(2)

    def run(self):
        app = self.app
        app.add_handler(self.CommandHandler(["start", "help"], self.cmd_start))
        app.add_handler(self.CommandHandler("cancel", self.cmd_cancel))
        app.add_handler(self.CommandHandler("status", self.cmd_status))
        app.add_handler(self.CommandHandler("last", self.cmd_last))
        app.add_handler(self.CommandHandler("send", self.cmd_send))
        app.add_handler(self.CallbackQueryHandler(self.callback_send, pattern=r"^send_(confirm|cancel)$"))
        app.add_handler(self.MessageHandler(self.filters.TEXT & ~self.filters.COMMAND, self.handle_text))
        app.run_polling(drop_pending_updates=True)


def run_bot(_args):
    logging.basicConfig(
        level=getenv("TG_SMS_LOG_LEVEL", "INFO"),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("telegram.request").setLevel(logging.WARNING)
    SmsBot().run()
    return 0


def run_send_cli(args):
    init_db()
    body = " ".join(args.body)
    result = send_sms_via_asterisk(args.phone, body, args.device)
    sms_id = record_outbound(normalize_device(args.device), normalize_phone(args.phone), body, "sent", result=result)
    print(f"sent #{sms_id}: {result}")
    return 0


def main():
    parser = argparse.ArgumentParser(description="Telegram SMS gateway for chan_quectel")
    sub = parser.add_subparsers(dest="command", required=True)

    daemon_parser = sub.add_parser("daemon")
    daemon_parser.set_defaults(func=run_bot)

    agi_parser = sub.add_parser("ingest-agi")
    agi_parser.set_defaults(func=run_agi_ingest)

    send_parser = sub.add_parser("send")
    send_parser.add_argument("phone")
    send_parser.add_argument("body", nargs="+")
    send_parser.add_argument("--device", default=configured_device())
    send_parser.set_defaults(func=run_send_cli)

    args = parser.parse_args()
    return args.func(args)


if __name__ == "__main__":
    raise SystemExit(main())

主要结构如下:

/opt/tg-sms/bin/tg_sms_gateway.py
├── daemon              # Telegram bot daemon
├── ingest-agi          # Asterisk AGI入口
└── send                # 命令行发短信测试入口

数据库表大致如下:

CREATE TABLE IF NOT EXISTS sms (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    direction TEXT NOT NULL CHECK(direction IN ('inbound', 'outbound')),
    created_at TEXT NOT NULL,
    device TEXT NOT NULL,
    phone TEXT NOT NULL,
    body TEXT NOT NULL,
    status TEXT NOT NULL,
    telegram_chat_id INTEGER,
    telegram_message_id INTEGER,
    reply_to_sms_id INTEGER,
    asterisk_result TEXT,
    error TEXT
);

CREATE TABLE IF NOT EXISTS telegram_notifications (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    sms_id INTEGER NOT NULL REFERENCES sms(id) ON DELETE CASCADE,
    chat_id INTEGER NOT NULL,
    message_id INTEGER NOT NULL,
    created_at TEXT NOT NULL,
    UNIQUE(chat_id, message_id)
);

每个bot实例只看自己的设备:

def configured_device():
    return normalize_device(
        os.environ.get("TG_SMS_DEVICE")
        or os.environ.get("TG_SMS_DEFAULT_DEVICE")
        or "quectel0"
    )

def pending_inbound(self, limit=20):
    return conn.execute(
        """
        SELECT id, created_at, device, phone, body
        FROM sms
        WHERE direction = 'inbound'
          AND device = ?
          AND status IN ('new', 'notify_failed')
        ORDER BY id
        LIMIT ?
        """,
        (self.device, limit),
    ).fetchall()

回复短信时,通过Telegram message id反查原入站短信,然后用原来的device回发:

SELECT sms.id, sms.phone, sms.device
FROM telegram_notifications n
JOIN sms ON sms.id = n.sms_id
WHERE n.chat_id = ?
  AND n.message_id = ?
  AND sms.direction = 'inbound'
  AND sms.device = ?

发短信时没有使用shell=True,而是让Python直接执行asterisk -rx

def send_sms_via_asterisk(phone, body, device):
    command = f'quectel sms {device} {phone} {asterisk_quote(body)}'
    proc = subprocess.run(
        ["/usr/sbin/asterisk", "-rx", command],
        text=True,
        capture_output=True,
        timeout=45,
    )

虽然asterisk -rx本身仍然需要一整条字符串命令,但至少没有再经过一层shell展开。

实际测试

quectel1上线后,Asterisk立刻从SIM卡里读出了几条积压短信:

[quectel1] Got full SMS from #SIMWorld: ...
[quectel1] Got full SMS from +86...: 'hello from xxx at 20260704'

随后dialplan调用AGI:

Executing [sms@incoming-mobile:1] Verbose(... "Incoming SMS from +86... on quectel1")
/opt/tg-sms/bin/tg_sms_gateway.py,ingest-agi: tg-sms queued inbound SMS #5 from +86... on quectel1

SQLite里可以看到:

id  direction  device    phone       status
5   inbound    quectel1  +86...      new
4   inbound    quectel1  #SIMWorld   new
3   inbound    quectel1  #SIMWorld   new

因为当时quectel1的第二个Telegram bot token还没配置,所以这些短信先停留在new状态。等tg-sms@quectel1启动后,对应实例会把device=quectel1的短信推送出去。

总结

  • 如果只需要收发短信,可以不着急配VoLTE(况且模块提供的volte配置也不多,主要只有中国,美国,澳大利亚,西欧,韩国的一些运营商)。AT+QCFG="ims",1,0当然不适合做通话,但不代表短信不可用。
  • 漫游卡调试时一定要盯着ip routeusbnet=1后模块很容易变成一块会自动DHCP的“有线网卡”。
  • 多模块机器上尽量用/dev/serial/by-path,不要直接写死ttyUSB6。同型号模块重启顺序变化后,ttyUSB序号可能会变。
  • chan_quectel的自动discovery能用,但遇到这种非标准manufacturer/product的模块,手工写audio=data=更可控。
  • Telegram bot最好按卡拆分。一个bot管一张卡,命令和通知都更清楚,也能避免误从另一张卡发短信。
  • 入站短信不要在dialplan里直接System(echo '${BASE64_DECODE(...)}。短信正文来自外部,交给AGI脚本处理会干净很多。

参考资料