沉寂的博客

沉寂的博客

行政区划编码查询 For Python

2024-07-08
行政区划编码查询 For Python

起源

本文最初是2022年在52pojie论坛发表的《行政区划双向查询工具》(也不知道当时为什么没想写在博客上)今年年初对输出的格式进行了改进,并增加了城乡区划代码的显示,但仍有如下缺陷存在:

1.该程序可以展示某一行政区划的树形结构,由于获取树形结构的函数是通过递归查询某一行政区划的父级区划,大量的数据库递归查询会进行大量数据库I/O操作导致查询缓慢,尤其在使用通配符搜索行政区划名称时更为明显。

def get_tree(m_cursor, father_value, m_name):
    m_tree = [m_name]
    f_value = father_value
    while f_value != "0":
        f_rows = m_cursor.execute("SELECT * FROM xzqh WHERE value='{0}'".format(f_value)).fetchall()
        m_tree.insert(0, f_rows[0][1])
        f_value = f_rows[0][3]
    return m_tree

2.模块化程度不足,许多逻辑被写在main函数中,导致代码可读性、可维护性的下降。

改进

关于查询的改进

由于直接对数据库进行递归查询的效率实在太低下,当只对某一行政区划的全部父级区划进行递归查询时不会明显注意到,但当使用通配符对某一地名进行匹配时,由于程序需要展示搜索到的所有行政区划的父级区划,此时会进行大量的递归查询,导致每条输出需要等待,无法快速展示查询到的数据。

如下所示,在每一条数据输出后都会有短暂的等待:

输入行政区划名称 或 行政区划代码(输入[0]退出) >> %杭州%

查询到多个结果,请根据以下信息重新查询吧~
浙江省>杭州市 [3301]
天津市>市辖区>滨海新区>杭州道街道 [120116005]
新疆维吾尔自治区>乌鲁木齐市>新市区>杭州路街道 [650104012]
辽宁省>锦州市>凌河区>锦铁街道>杭州街社区 [210703009004]
江苏省>淮安市>清江浦区>水渡口街道>杭州路社区居委会 [320812011006]
浙江省>杭州市>钱塘区>前进街道>杭州经济技术开发区江东区块社区 [330114007400]
山东省>青岛市>胶州市>中云街道>杭州路社区居民委员会 [370281002014]
山东省>青岛市>平度市>东阁街道>杭州路居委会 [370283001007]
山东省>青岛市>莱西市>水集街道>杭州北路社区居委会 [370285001031]
湖北省>黄石市>下陆区>团城山街道>杭州东路社区居委会 [420204004008]
湖北省>黄石市>下陆区>团城山街道>杭州西路社区居委会 [420204004010]
四川省>眉山市>东坡区>通惠街道>杭州路社区居民委员会 [511402001007]
新疆维吾尔自治区>乌鲁木齐市>新市区>杭州路街道>杭州东街社区居委会 [650104012007]

思路壹:利用字典建立索引

我们可以在程序启动时,一次性读取数据库所有数据,并存入以行政区划编码为键的字典中。

def load_all_data(cursor):
    all_divisions = {}
    for row in cursor.execute("SELECT * FROM xzqh"):
        all_divisions[row[2]] = row
    return all_divisions

我们可以看到,上述代码先定义了一个空字典,而后将行数据放入键row[2] (行政区划编码)中,这样,索引就完成了。

而后,我们需要构建一个新的函数用于构建行政区划树。

def build_administrative_tree(all_divisions, value):
    tree = []
    while value != "0":
        row = all_divisions.get(value)
        if row is None:
            break
        tree.insert(0, row[1])
        value = row[3]
    return tree

以上代码从字典中递归查询行政区划的上级区划,直到一级行政区划为止。由于数据存储在内存中,我们可以很快得到结果。

以检索%社区%为例,此前需要长时间查询才能得到结果(没有时间数据是因为跑了十几分钟才跑了一点😓),现在仅需不到0.2秒就能完成数据库搜索和行政区划树的构建,加上输出数据到控制台的总时间在0.3秒左右。

这种方法会带来更高的内存占用,但对于效率的提升还是非常明显的。

思路贰:代替数据库查询

上面用的方法已经从数据库中读取了全部的数据,既然已经将数据都载入内存中,并且也不涉及到复杂的数据库命令,那我们不妨试试直接在字典中进行查询。同时,由于我们不再使用数据库查询语句,此时可以将原通配符%_替换为*. ,这样更符合使用习惯。

我们构建一个新的函数query_database ,用于从上面load_all_data函数返回的字典中根据输入的条件进行检索。

def query_database(all_divisions, in_val):
    pattern = '^' + in_val.replace('*', '.*') + '$'
    regex = re.compile(pattern)

    if '*' not in in_val and '.' not in in_val:
        row_by_code = all_divisions.get(in_val)
        if row_by_code:
            return [row_by_code]

        rows_by_name = [row for row in all_divisions.values() if in_val == row['name']]
        if rows_by_name:
            return rows_by_name

    rows = [row for row in all_divisions.values() if regex.match(row['name'])]
    return rows

通过以上的改进,查询的效率已经得到了很大的提升。

关于可读性的改进

主要就是抽取原本在主函数里的代码,将可以复用的代码封装成函数,过程不再赘述,完整代码如下:

import re
import sqlite3

# 定义数据库文件名常量
DATABASE_FILE_NAME = "xzqh.db"

# 城乡分类代码字典
CX_CODE_DICT = {
    '111': '主城区',
    '112': '城乡结合区',
    '121': '镇中心区',
    '122': '镇乡结合区',
    '123': '特殊区域',
    '210': '乡中心区',
    '220': '村庄'
}


# 预加载所有数据到内存
def load_all_data():
    all_divisions = {}
    with sqlite3.connect(DATABASE_FILE_NAME) as db:
        cursor = db.cursor()
        for row in cursor.execute("SELECT * FROM xzqh"):
            all_divisions[row[2]] = row
    return all_divisions


# 查询数据库函数
def query_database(all_divisions, in_val):
    pattern = '^' + in_val.replace('*', '.*') + '$'
    regex = re.compile(pattern)

    if '*' not in in_val and '.' not in in_val:
        row_by_code = all_divisions.get(in_val)
        if row_by_code:
            return [row_by_code]

        rows_by_name = [row for row in all_divisions.values() if in_val == row[1]]
        if rows_by_name:
            return rows_by_name

    rows = [row for row in all_divisions.values() if regex.match(row[1])]
    return rows


# 显示结果函数
def display_results(rows, all_divisions):
    if len(rows) > 1:
        print("查询到多个结果,请根据以下信息重新查询吧~")
        for row in rows:
            tree = build_administrative_tree(all_divisions, row[2])
            print(f"{' > '.join(tree)} [{row[2]}]")
        print()
    elif rows:
        row = rows[0]
        tree = build_administrative_tree(all_divisions, row[2])
        cx_code = row[5]
        cx_code_text = f", 他的城乡分类代码是[{cx_code}]{CX_CODE_DICT[cx_code]}" if cx_code != '0' else ""
        print(f"{' > '.join(tree)} [{row[2]}], 这是一个{row[4]}级行政区划地区{cx_code_text}")
        show_subdivisions(all_divisions, row[2])
    else:
        print("无法查询到您的输入对应的行政区划信息,请检查后重试\n")


# 构建行政树函数
def build_administrative_tree(all_divisions, value):
    tree = []
    while value != "0":
        row = all_divisions.get(value)
        if row is None:
            break
        tree.insert(0, row[1])
        value = row[3]
    return tree


# 显示下级行政区划函数
def show_subdivisions(all_divisions, father_value):
    ll_rows = [row for row in all_divisions.values() if row[3] == father_value]
    if ll_rows:
        max_name_length = max(len(row[1]) for row in ll_rows)
        for row in ll_rows:
            cx_code = row[5]
            cx_code_text = f"  [{cx_code}]{CX_CODE_DICT[cx_code]}" if cx_code != '0' else ""
            formatted_name = row[1].ljust(max_name_length + 2, ' ')
            print(f"{formatted_name}{row[2]}{cx_code_text}")
        print()
    else:
        print("他没有下级行政区划\n")


def main():
    print("行政区划双向查询工具 By: 繁华中的沉寂\n")
    print("本程序支持通配符查询,[*]代表任意数量的任意字符,[.]代表一个任意字符。")
    print("查询优先级:通配符查询 > 行政区划编码 > 行政区划名称\n")

    # 加载所有数据到内存
    print("正在载入数据,请稍等...")
    all_divisions = load_all_data()
    print("数据载入成功,欢迎使用!\n")
    
    while True:
        try:
            in_val = input("输入行政区划名称 或 行政区划代码(输入[0]退出) >> ")
        except KeyboardInterrupt:
            exit()

        if in_val == "0":
            break
        elif not in_val:
            continue
        
        rows = query_database(all_divisions, in_val)
        
        if len(rows) > 100:
            print(f"\n查询到 {len(rows)} 条数据,数据量较大,可能导致耗时较长。")
            opt = input("输入[Y]确认继续,回车取消 >> ")
            print()
            if opt.upper() != "Y":
                continue

        print()
        display_results(rows, all_divisions)


if __name__ == "__main__":
    main()

总结

本次主要还是通过将数据库全部加载到内存的方式进行,其实也尝试了其他的方法,例如在内存中创建一个SQLite数据库,读取磁盘中的数据库写入内存中的数据库里,虽然查询效率有所提升,但实际体验效果并不理想,因而本文中没有对这个方法进行介绍。同时也尝试了从csv文件中读取全部数据,不过读取csv文件的过程似乎没有从SQLite数据库中读取来的快。如果需要这种方式来加载数据,也仅需对load_all_data 函数进行略微改动即可。

由于数据量不是特别大,数据全部载入消耗约200MB左右的内存,但带来的查询效率的提升是非常大的,所以这里采用这种方法。如果数据量非常大,可能还是需要选择性载入字段,或是利用其他的方法。

本文中所采用的数据为2023年度库,本数据库来源于国家统计局公布的最新数据,作者整理免费提供,不保证数据准确性,也不拥有对该数据的所有权,如用于重要用途,请酌情考虑,或自行对数据库进行校对。

本作品源代码部分(下称本作品)采用CC BY-NC-SA 4.0(署名-非商业性使用-相同方式共享 4.0)许可协议,您可以自由修改,但不得对本作品进行商业性使用,您应在您修改后的程序中注明本文链接。如果您再混合、转换或者基于本作品进行创作,您必须基于与本作品的相同的许可协议进行分发。

本文的许可协议(不含本文所述的源代码)与本博客采用的许可协议一致。

数据库在如下链接中可以下载。

https://wwd.lanzoue.com/b01d56dkqb密码:gbuv