Module pywander.file.run_py_file

本模块采用文件hash比对的方法,因为预先假设文件夹里面的都是一些零碎的小文件,总的文件数目也不是很多。

批量运行某个文件夹下的python脚本

Functions

def batch_run_python_script(root='.', out_suffix='.out', err_suffix='.err')
Expand source code
def batch_run_python_script(root='.', out_suffix='.out', err_suffix='.err'):
    need_to_process_file = collect_need_to_process_file(root=root, out_suffix=out_suffix, err_suffix=err_suffix)

    for file_path in need_to_process_file:
        file_out_path = file_path + out_suffix
        file_error_path = file_path + err_suffix

        try:
            # 打开文件以写入标准输出
            with open(file_out_path, 'wt', encoding='utf8') as stdout_file:
                # 打开文件以写入标准错误
                with open(file_error_path, 'wt', encoding='utf8') as stderr_file:
                    # 运行子进程,并将标准输出和标准错误分别重定向到文件
                    result = subprocess.run(f'python {file_path}', stdout=stdout_file, stderr=stderr_file,
                                            text=True, shell=True)

            # 检查返回码
            if result.returncode == 0:
                print(f"脚本 {file_path} 执行成功,标准输出已保存到 {stdout_file.name}")
                sync_file_hash(file_path, root=root)
            else:
                print(f"脚本 {file_path} 执行失败,错误信息已保存到 {stderr_file.name}")

        except FileNotFoundError:
            print("错误: 要运行的脚本文件未找到。")
        except Exception as e:
            print(f"发生未知错误: {e}")

    print(f'all job done.')
def build_file_hash_key(file_path)
Expand source code
def build_file_hash_key(file_path):
    file_hash_cache_key = 'file_hash' + file_path
    return file_hash_cache_key
def collect_need_to_process_file(root='.', out_suffix='.out', err_suffix='.err')
Expand source code
def collect_need_to_process_file(root='.', out_suffix='.out', err_suffix='.err'):
    """
    收集需要处理的文件
    """
    file_group = set()
    cachedb = get_cachedb(root=root)

    # 检查没有输出的脚本
    for file_path in gen_all_file(root, 'py$', exclude_folder_name=['__pycache__']):
        file_out_path = file_path + out_suffix

        if not os.path.exists(file_out_path):
            file_group.add(file_path)

    for file_path in gen_all_file(root, 'py$', exclude_folder_name=['__pycache__']):
        # 计算文件的哈希值
        file_hash = calculate_file_hash(file_path)

        # 获取文件的哈希缓存
        file_hash_cache_key = build_file_hash_key(file_path)
        if cachedb.has_key(file_hash_cache_key):
            file_hash_cache = cachedb.get(file_hash_cache_key)
        else:
            file_hash_cache = file_hash
            cachedb.set(file_hash_cache_key, file_hash)

        # 判断文件是否被修改
        if file_hash != file_hash_cache:
            file_group.add(file_path)

    return list(file_group)

收集需要处理的文件

def sync_file_hash(file_path, root='.')
Expand source code
def sync_file_hash(file_path, root='.'):
    cachedb = get_cachedb(root=root)
    file_hash_cache_key = build_file_hash_key(file_path)
    file_hash = calculate_file_hash(file_path)
    cachedb.set(file_hash_cache_key, file_hash)