需要在数据库内,动态添加某些地址,标记上“禁用”等。但后续 SQL 需要用行政区划代码,而不是人类可读的地址。
不想一级一级下拉列表选/填写完全名,或填写完整 12 位区划代码,想只填写部分地址,就能自动扩展全名。
初始化队列,放入【根节点】地区
取出一个地区,在该地区所有后代范围内,模糊搜索【包含给定地址前两个字】的所有地区,并为每个地区截掉【给定地址出现在该地区的最长前缀】,作为该地区后续匹配的【给定地址】。最后全部添加进队列中。
如,给定地址 “新疆维吾尔阿克苏温宿县……” 出现在 “新疆维吾尔自治区” 的最长前缀为 “新疆维吾尔”。
截掉后,给定地址为 “阿克苏温宿县……”,作为后续在 “新疆维吾尔自治区” 范围内继续搜索的【给定地址】。
队列若不为空,则跳转回第 2 步。
检查所有曾经匹配出的结果,取截掉【给定地址】最多、首次匹配层级最高(省 > 市 > 区 > …) 的地区,作为结果。
目前识别一个 33 字符的长地址,需要 0.02 秒左右。(测试 SQL 如下,耗时 20 秒左右)
WITH
-- Sample of up to 1000 regions with their full "prov - city - ..." names
full_names(area_id, full_name) AS (
    SELECT
        lv5.id,
        TRIM(FORMAT('%s - %s - %s - %s - %s', lv1.name, lv2.name, lv3.name, lv4.name, lv5.name), ' -')
    FROM area lv5
    LEFT JOIN area lv4 ON lv4.id = lv5.pid
    LEFT JOIN area lv3 ON lv3.id = lv4.pid
    LEFT JOIN area lv2 ON lv2.id = lv3.pid
    LEFT JOIN area lv1 ON lv1.id = lv2.pid
    LIMIT 1000
)
-- Inserting (addr, NULL) fires the recognition trigger once per row
INSERT INTO test
SELECT full_name, NULL
FROM full_names;
SQLite 实现:执行下列 shell 代码,等待约 1 分钟,即可在同目录下生成约 40 MB 的 data.db。
#!/bin/bash
# No database file argument: sqlite3 runs against a transient in-memory
# database; the VACUUM INTO at the bottom persists the result to data.db.
sqlite3 <<'EOF'
-- Do everything in memory (temp tables/indices never touch disk)
PRAGMA temp_store = memory;
-- Region table. id is the full 12-digit administrative division code.
-- data packs two facts (computed on import below): low 3 bits = 5 - level,
-- high bits = number of hierarchy tiers skipped between this code and its
-- nearest actual ancestor. Everything else is derived via generated columns.
CREATE TABLE area (
id INTEGER PRIMARY KEY,
name TEXT,
data INT,
-- depth in the hierarchy: 1 = province ... 5 = village (per the import below)
level AS (5 - (data & 7)),
-- level of the nearest actual ancestor (accounts for skipped tiers)
plevel AS (level - 1 - (data >> 3)),
-- smallest id a descendant could have (NULL at leaf level 5)
cid_begin AS (IIF(level < 5, id + 1, NULL)),
-- largest id a descendant could have, from the remaining code digits
cid_end AS (id + CASE level WHEN 1 THEN 9999999999 WHEN 2 THEN 99999999 WHEN 3 THEN 999999 WHEN 4 THEN 999 END),
-- parent id: zero out the digits below the parent's level
pid AS (id - id % CASE plevel WHEN 1 THEN 10000000000 WHEN 2 THEN 100000000 WHEN 3 THEN 1000000 WHEN 4 THEN 1000 END)
);
-- Inverted index over 2-character windows ("words") of region names.
-- Each row stores one run of matching region ids in delta-compressed form:
-- begin_id is the run's first id; deltas is NULL (single-id run), a scalar
-- (two-id run), or a JSON array of gaps-minus-one (see the build step below).
CREATE TABLE area_idx (
word TEXT,
begin_id INT,
deltas ANY,
PRIMARY KEY (word, begin_id)
) WITHOUT ROWID;
-- Input table for testing: write a (partial) address into addr, or a
-- division code into code, and the triggers below fill in the other column.
CREATE TABLE test (
addr TEXT,
code INT
);
-- Temp staging table matching the CSV layout (id, name, level, pid)
CREATE TEMP TABLE csv_tbl (
id INTEGER PRIMARY KEY,
name TEXT,
level INT,
pid INT
);
-- Import the region CSV (downloaded and decompressed on the fly via a pipe)
.import --csv '| curl "https://github.com/adyliu/china_area/raw/master/area_code_2023.csv.gz" | gzip -dc' csv_tbl
-- If the URL is unreachable, or curl/gzip are missing, download and extract
-- the file manually and use this command instead:
-- .import --csv 'area_code_2023.csv' csv_tbl
-- Extract what we need from the CSV into the region table.
-- Third value is the packed `data` byte: high bits = level - 1 - parent's
-- level (tiers skipped; the parent's level is derived from the number of
-- trailing zero digits of pid), low 3 bits = 5 - level.
INSERT OR IGNORE INTO area
SELECT id, name, ((level - 1 - IIF(pid % 1000000, 4, IIF(pid % 100000000, 3, IIF(pid % 10000000000, 2, pid != 0)))) << 3) | (5 - level)
FROM csv_tbl
UNION ALL
VALUES
-- The upstream data appears to be missing these three records
-- (data byte 3 => level 2, no skipped tiers)
(441900000000, '东莞市', 3),
(442000000000, '中山市', 3),
(460400000000, '儋州市', 3);
-- Build the inverted index: word -> delta-compressed runs of region ids.
WITH
-- Every 2-character window of every region name, paired with the region id.
-- The json_each trick enumerates window starts 0 .. LENGTH(name)-2: it
-- builds a JSON array of LENGTH(name)-1 zeros and iterates its keys.
gen_idx(word, id) AS (
SELECT DISTINCT SUBSTR(name, key + 1, 2), area.id
FROM area
JOIN json_each(REPLACE(FORMAT(REPLACE('[%*.*s]', '*', MAX(LENGTH(name) - 1, 0)), '0'), ' ', '0,'))
),
-- Number each id within its word (0-based) so runs can be chunked below.
gen_num(word, id, num) AS (
SELECT *, ROW_NUMBER() OVER (PARTITION BY word ORDER BY id) - 1
FROM gen_idx
),
split(word, ids) AS (
SELECT word, JSON_GROUP_ARRAY(id)
FROM gen_num
-- Split into a separate row every 89 codes: this makes the overall index
-- smallest (20 MB), with granularity neither too coarse nor too fine.
GROUP BY word, num / 89
-- Storing each code as its own row should be fastest, but costs ~60 MB.
),
-- Delta-compress each run: deltas[0] is the first id itself; every later
-- entry is (gap to the previous id) - 1.
diff(word, begin_id, deltas) AS (
SELECT
word,
ids ->> 0, (
SELECT JSON_GROUP_ARRAY(delta)
FROM (
SELECT value - 1 - LAG(value, 1, -1) OVER win delta
FROM json_each(ids)
WINDOW win AS (ORDER BY value)))
FROM split
)
-- Shrink storage further: 1-element runs store NULL, 2-element runs store
-- the single delta as a scalar, longer runs keep the JSON array.
INSERT INTO area_idx
SELECT
word,
deltas ->> 0,
CASE JSON_ARRAY_LENGTH(deltas)
WHEN 1 THEN NULL
WHEN 2 THEN deltas ->> 1
ELSE JSON_REMOVE(deltas, '$[0]')
END
FROM diff;
-- Triggers for the test table.
-- SQLite cannot merge INSERT and UPDATE triggers, so after an insert we
-- re-UPDATE the freshly inserted row to fire the UPDATE trigger below.
-- This relies on recursive triggers being disabled (SQLite's default).
CREATE TRIGGER trig_test_ains AFTER INSERT ON test
BEGIN
-- Self-assignment: no data change, but fires trig_test_aupd once.
UPDATE test
SET addr = NEW.addr,
code = NEW.code
WHERE rowid = NEW.rowid;
END;
-- If the division code changed, prefer it and regenerate the matching full
-- region name from it. Otherwise, try to recognize the code from the new
-- (possibly partial) address via a breadth-first search over the hierarchy.
CREATE TRIGGER trig_test_aupd AFTER UPDATE OF addr, code ON test
WHEN IFNULL(NEW.addr, NEW.code) IS NOT NULL
BEGIN
-- The dummy (SELECT 0) keeps the UPDATE row present even when nothing
-- matches, so both columns are then set to NULL.
UPDATE test
SET code = t5.id,
addr = NULLIF(TRIM(FORMAT('%s - %s - %s - %s - %s', t1.name, t2.name, t3.name, t4.name, t5.name), ' -'), '')
FROM (SELECT 0)
LEFT JOIN area t5 ON t5.id = IIF(
-- Bit mask: 4 = addr unchanged, 2 = code unchanged, 1 = code is NULL.
-- Masks 0/4/5/6 mean the code (not the address) drives this update.
-- This is the condition discussed in the previous post; still looking
-- for a cleaner formulation.
((NEW.addr IS OLD.addr) << 2) |
((NEW.code IS OLD.code) << 1) |
((NEW.code IS NULL)) IN (0, 4, 5, 6),
NEW.code, (
-- BFS queue. Each row: best region so far, level of the first match, the
-- descendant id range still to search, and the unmatched address suffix.
WITH RECURSIVE
matches(id, first_level, cid_end, cid_begin, is_final, skips, rest_addr) AS (
-- Seed: full signed-64-bit id range (the whole database), whole address.
SELECT NULL, NULL, 0x7FFFFFFFFFFFFFFF, 0x8000000000000000, FALSE, 0, TRIM(NEW.addr)
UNION ALL
SELECT
IFNULL(this.id, prev.id),
IFNULL(first_level, this.plevel),
IIF(this.id, this.cid_end, prev.cid_end),
IIF(this.id, this.cid_begin, prev.cid_begin),
-- a branch ends when nothing matched and under 2 chars remain
(this.id IS NULL AND LENGTH(rest_addr) < 2),
-- no match but text remains: count it as one skipped character
(this.id IS NULL AND LENGTH(rest_addr) > 0) + skips,
-- Cut off the longest prefix of rest_addr found in this.name: the
-- subquery returns the start of the first prefix length (>= 3) NOT
-- contained in the name; the x'FFFF' sentinel guarantees termination.
-- On no match, drop a single leading character instead.
SUBSTR(rest_addr, IIF(this.id, (
SELECT 3 + key
FROM json_each(REPLACE(FORMAT(REPLACE('[%*.*s]', '*', MAX(LENGTH(rest_addr) - 1, 0)), '0'), ' ', '0,'))
WHERE NOT INSTR(this.name, SUBSTR(rest_addr || x'FFFF', 1, 3 + key))
LIMIT 1
), 2))
FROM matches prev
-- From the inverted index, find all index runs that contain the first
-- two characters and overlap the current descendant range.
LEFT JOIN area_idx
ON LENGTH(rest_addr) >= 2
AND word = SUBSTR(rest_addr, 1, 2)
AND begin_id BETWEEN IFNULL((
-- the run covering cid_begin may start before it
SELECT MAX(begin_id) id
FROM area_idx inr
WHERE word = SUBSTR(rest_addr, 1, 2)
AND begin_id <= prev.cid_begin
), 0) AND prev.cid_end
-- SQLite has no OUTER APPLY; this table-valued join emulates it by
-- expanding the delta-compressed run back into absolute ids.
-- Repeated (de)serialization, but still faster than an IN subquery.
LEFT JOIN (json_each((
SELECT json_group_array(id)
FROM (
SELECT begin_id id
UNION ALL
SELECT begin_id + SUM(value + 1) OVER win
FROM json_each(deltas)
WINDOW win AS (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
WHERE id BETWEEN prev.cid_begin AND prev.cid_end))) cand
LEFT JOIN area this ON this.id = cand.value
WHERE NOT is_final
)
-- Best candidate: fewest skipped characters, then highest first level
-- (province beats city beats district, ...).
SELECT id
FROM matches
WHERE is_final
ORDER BY skips, first_level
LIMIT 1))
-- Walk up the ancestry to assemble the full display name.
LEFT JOIN area t4 ON t4.id = t5.pid
LEFT JOIN area t3 ON t3.id = t4.pid
LEFT JOIN area t2 ON t2.id = t3.pid
LEFT JOIN area t1 ON t1.id = t2.pid
WHERE rowid = NEW.rowid;
END;
-- Gather planner statistics (enables better query plans later), then write
-- a defragmented, compact copy of the in-memory database to data.db.
ANALYZE;
VACUUM INTO 'data.db';
EOF
能用纯SQL实现
要我的话肯定得用常规编程语言,甚至上人工智能