jiegemena Blog

哪些事情,我相信!

SSH文件夹上传工具

这个Python脚本用于通过SSH将本地文件夹上传到远程服务器,并可以选择替换远程文件夹中的内容。它使用paramiko库实现SSH连接和SFTP文件传输,支持密码认证和密钥认证两种方式。

功能特点

  • 递归上传整个文件夹结构
  • 智能文件比较,只上传已更改的文件
  • 支持密码认证和SSH密钥认证
  • 可选择删除远程目标中本地没有的文件
  • 详细的日志输出和上传统计信息
  • 跨平台支持(Windows, Linux, macOS)
  • 命令行参数支持,便于自动化

依赖项

  • Python 3.6+
  • paramiko库

安装

  1. 确保已安装Python 3.6或更高版本
  2. 安装paramiko库:
pip install paramiko

或者使用虚拟环境:

# 激活虚拟环境
.\venv\Scripts\activate  # Windows
source venv/bin/activate  # Linux/macOS

# 安装依赖
pip install paramiko

使用方法

Windows用户

直接运行upload_ssh.bat批处理文件:

upload_ssh.bat [参数]

所有平台

使用Python直接运行脚本:

python upload_ssh.py [参数]

命令行参数

usage: upload_ssh.py [-h] [--local LOCAL] [--remote REMOTE] [--host HOST]
                     [--port PORT] [--user USER] [--key KEY] [--password]
                     [--delete] [--verbose] [--check-hash] [--force]

选项:
  -h, --help            显示帮助信息并退出
  --local LOCAL, -l LOCAL
                        本地文件夹路径 (默认: public)
  --remote REMOTE, -r REMOTE
                        远程文件夹路径 (默认: /var/www/html)
  --host HOST           服务器主机名或IP地址 (默认: example.com)
  --port PORT           SSH端口 (默认: 22)
  --user USER, -u USER  SSH用户名 (默认: root)
  --key KEY, -k KEY     SSH私钥文件路径
  --password, -p        使用密码认证 (将会提示输入密码)
  --delete, -d          删除远程目标中本地没有的文件
  --verbose, -v         显示详细输出
  --check-hash, -c      使用文件哈希值进行比较(更准确但更慢)
  --force, -f           强制上传所有文件,忽略文件比较

文件比较机制

脚本使用以下标准来判断文件是否需要上传:

  1. 文件大小:如果本地文件和远程文件大小不同,则上传
  2. 修改时间:如果本地文件和远程文件的修改时间差异超过1小时(考虑时区差异),则上传
  3. 文件哈希值(可选):如果启用--check-hash选项,将计算并比较文件的SHA-256哈希值

这种智能比较机制可以显著减少网络传输量和上传时间,特别是对于大型网站或频繁部署的情况。

使用示例

基本用法(使用默认参数)

python upload_ssh.py --host your-server.com --user your-username --password

这将上传public文件夹到远程服务器的/var/www/html目录,只上传已更改的文件。

指定本地和远程路径

python upload_ssh.py --local ./build --remote /var/www/mysite --host your-server.com --user your-username --password

使用SSH密钥认证

python upload_ssh.py --host your-server.com --user your-username --key ~/.ssh/id_rsa

删除远程目标中本地没有的文件

python upload_ssh.py --host your-server.com --user your-username --password --delete

使用文件哈希值进行更准确的比较

python upload_ssh.py --host your-server.com --user your-username --password --check-hash

强制上传所有文件(忽略文件比较)

python upload_ssh.py --host your-server.com --user your-username --password --force

显示详细输出

python upload_ssh.py --host your-server.com --user your-username --password --verbose

注意事项

  1. 如果使用--delete选项,请确保指定了正确的远程路径,以避免意外删除重要文件。
  2. 对于大型文件夹,上传可能需要较长时间,请耐心等待。
  3. 如果遇到权限问题,请确保SSH用户对远程目录有写入权限。
  4. 使用--check-hash选项会增加上传时间,但提供更准确的文件比较。
  5. 如果需要强制重新上传所有文件(例如,在远程服务器时间设置不正确的情况下),请使用--force选项。

upload_ssh.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
通过SSH上传文件夹到服务器并替换现有文件夹
使用paramiko库实现SSH连接和SFTP文件传输
支持密码认证和密钥认证两种方式
只上传已更改的文件,提高上传效率
"""

import os
import sys
import time
import hashlib
import argparse
import getpass
import paramiko
import logging
from stat import S_ISDIR
from datetime import datetime, timezone

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class UploadStats:
    """上传统计信息"""
    def __init__(self):
        self.total_files = 0
        self.uploaded_files = 0
        self.skipped_files = 0
        self.deleted_files = 0
        self.deleted_dirs = 0
        self.errors = 0

def parse_arguments():
    """解析命令行参数"""
    parser = argparse.ArgumentParser(description='通过SSH上传文件夹到服务器')
    parser.add_argument('--local', '-l', required=False, 
                        default='public',
                        help='本地文件夹路径 (默认: public)')
    parser.add_argument('--remote', '-r', required=False,
                        default='/var/www/html',
                        help='远程文件夹路径 (默认: /var/www/html)')
    parser.add_argument('--host', required=False,
                        default='example.com',
                        help='服务器主机名或IP地址')
    parser.add_argument('--port', type=int, required=False,
                        default=22,
                        help='SSH端口 (默认: 22)')
    parser.add_argument('--user', '-u', required=False,
                        default='root',
                        help='SSH用户名 (默认: root)')
    parser.add_argument('--key', '-k', required=False,
                        help='SSH私钥文件路径')
    parser.add_argument('--password', '-p', action='store_true',
                        help='使用密码认证 (将会提示输入密码)')
    parser.add_argument('--delete', '-d', action='store_true',
                        help='删除远程目标中本地没有的文件')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='显示详细输出')
    parser.add_argument('--check-hash', '-c', action='store_true',
                        help='使用文件哈希值进行比较(更准确但更慢)')
    parser.add_argument('--force', '-f', action='store_true',
                        help='强制上传所有文件,忽略文件比较')
    
    return parser.parse_args()

def calculate_file_hash(file_path, block_size=65536):
    """计算文件的SHA-256哈希值"""
    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        for block in iter(lambda: f.read(block_size), b""):
            sha256_hash.update(block)
    return sha256_hash.hexdigest()

def get_remote_file_hash(sftp, remote_path):
    """计算远程文件的SHA-256哈希值"""
    try:
        with sftp.open(remote_path, 'rb') as f:
            sha256_hash = hashlib.sha256()
            while True:
                block = f.read(65536)
                if not block:
                    break
                sha256_hash.update(block)
            return sha256_hash.hexdigest()
    except Exception as e:
        logger.debug(f"计算远程文件哈希值失败 {remote_path}: {e}")
        return None

def should_upload_file(sftp, local_path, remote_path, check_hash=False, verbose=False):
    """
    判断是否需要上传文件
    
    比较标准:
    1. 文件大小
    2. 修改时间(考虑1小时的时区差异)
    3. 可选的文件哈希值
    """
    try:
        local_stat = os.stat(local_path)
        try:
            remote_stat = sftp.stat(remote_path)
        except FileNotFoundError:
            if verbose:
                logger.debug(f"远程文件不存在: {remote_path}")
            return True
        
        # 比较文件大小
        if local_stat.st_size != remote_stat.st_size:
            if verbose:
                logger.debug(f"文件大小不同: {local_path}")
            return True
        
        # 比较修改时间(允许1小时的时区差异)
        local_mtime = datetime.fromtimestamp(local_stat.st_mtime, timezone.utc)
        remote_mtime = datetime.fromtimestamp(remote_stat.st_mtime, timezone.utc)
        time_diff = abs((local_mtime - remote_mtime).total_seconds())
        if time_diff > 3600:  # 允许1小时的时区差异
            if verbose:
                logger.debug(f"文件修改时间差异过大: {local_path}")
            return True
        
        # 如果启用了哈希值检查
        if check_hash:
            local_hash = calculate_file_hash(local_path)
            remote_hash = get_remote_file_hash(sftp, remote_path)
            if local_hash != remote_hash:
                if verbose:
                    logger.debug(f"文件哈希值不同: {local_path}")
                return True
        
        return False
    except Exception as e:
        logger.error(f"比较文件失败 {local_path}: {e}")
        return True

def upload_folder_via_sftp(sftp, local_path, remote_path, stats, delete=False, check_hash=False, force=False, verbose=False):
    """
    递归上传文件夹到远程服务器
    
    Args:
        sftp: SFTP客户端对象
        local_path: 本地文件夹路径
        remote_path: 远程文件夹路径
        stats: 上传统计信息对象
        delete: 是否删除远程目标中本地没有的文件
        check_hash: 是否使用文件哈希值进行比较
        force: 是否强制上传所有文件
        verbose: 是否显示详细输出
    """
    # 确保本地路径存在
    if not os.path.exists(local_path):
        raise FileNotFoundError(f"本地路径不存在: {local_path}")
    
    # 确保远程路径存在,如果不存在则创建
    try:
        sftp.stat(remote_path)
    except FileNotFoundError:
        logger.info(f"创建远程目录: {remote_path}")
        sftp.mkdir(remote_path)
    
    # 如果启用了删除选项,先获取远程文件列表
    remote_items = set()
    if delete:
        try:
            remote_items = set(sftp.listdir(remote_path))
        except Exception as e:
            logger.error(f"获取远程文件列表失败: {e}")
    
    # 获取本地文件列表
    local_items = os.listdir(local_path)
    local_items_set = set(local_items)
    
    # 如果启用了删除选项,删除远程目标中本地没有的文件
    if delete:
        for item in remote_items - local_items_set:
            remote_item_path = os.path.join(remote_path, item).replace('\\', '/')
            try:
                try:
                    # 检查是否是目录
                    sftp.stat(remote_item_path)
                    is_dir = S_ISDIR(sftp.stat(remote_item_path).st_mode)
                    if is_dir:
                        # 递归删除目录
                        _rmdir_recursive(sftp, remote_item_path, stats)
                        stats.deleted_dirs += 1
                    else:
                        # 删除文件
                        sftp.remove(remote_item_path)
                        stats.deleted_files += 1
                        if verbose:
                            logger.info(f"已删除远程文件: {remote_item_path}")
                except FileNotFoundError:
                    pass
            except Exception as e:
                logger.error(f"删除远程项目失败 {remote_item_path}: {e}")
                stats.errors += 1
    
    # 上传本地文件到远程
    for item in local_items:
        local_item_path = os.path.join(local_path, item)
        remote_item_path = os.path.join(remote_path, item).replace('\\', '/')
        
        if os.path.isdir(local_item_path):
            # 递归上传子目录
            upload_folder_via_sftp(sftp, local_item_path, remote_item_path, stats, delete, check_hash, force, verbose)
        else:
            stats.total_files += 1
            # 检查是否需要上传文件
            try:
                if force or should_upload_file(sftp, local_item_path, remote_item_path, check_hash, verbose):
                    if verbose:
                        logger.info(f"上传: {local_item_path} -> {remote_item_path}")
                    sftp.put(local_item_path, remote_item_path)
                    stats.uploaded_files += 1
                else:
                    if verbose:
                        logger.info(f"跳过: {local_item_path} (未更改)")
                    stats.skipped_files += 1
            except Exception as e:
                logger.error(f"上传文件失败 {local_item_path}: {e}")
                stats.errors += 1

def _rmdir_recursive(sftp, path, stats):
    """递归删除远程目录"""
    for filename in sftp.listdir(path):
        filepath = os.path.join(path, filename).replace('\\', '/')
        try:
            is_dir = S_ISDIR(sftp.stat(filepath).st_mode)
            if is_dir:
                _rmdir_recursive(sftp, filepath, stats)
            else:
                sftp.remove(filepath)
                stats.deleted_files += 1
        except Exception as e:
            logger.error(f"删除远程文件失败 {filepath}: {e}")
            stats.errors += 1
    
    sftp.rmdir(path)
    logger.info(f"已删除远程目录: {path}")

def main():
    """主函数"""
    args = parse_arguments()
    
    # 设置日志级别
    if args.verbose:
        logger.setLevel(logging.DEBUG)
    
    # 本地和远程路径
    local_path = os.path.abspath(args.local)
    remote_path = args.remote
    
    # 服务器信息
    host = args.host
    port = args.port
    username = args.user
    
    # 认证信息
    password = None
    key_filename = args.key
    
    # 如果使用密码认证,提示输入密码
    if args.password:
        password = getpass.getpass(f"请输入 {username}@{host} 的密码: ")
    
    # 如果既没有指定密钥文件也没有使用密码认证,提示用户选择认证方式
    if not key_filename and not args.password:
        auth_method = input("请选择认证方式 [1: 密码, 2: 密钥文件] (默认: 1): ").strip() or "1"
        if auth_method == "1":
            password = getpass.getpass(f"请输入 {username}@{host} 的密码: ")
        else:
            key_filename = input("请输入SSH私钥文件路径: ").strip()
    
    logger.info(f"准备上传 {local_path} 到 {host}:{remote_path}")
    if args.force:
        logger.info("强制上传模式:将上传所有文件")
    elif args.check_hash:
        logger.info("启用文件哈希值比较:上传过程可能较慢")
    
    # 创建SSH客户端
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        # 连接到服务器
        logger.info(f"正在连接到 {host}:{port} 作为 {username}...")
        if key_filename:
            client.connect(host, port, username, key_filename=key_filename)
        else:
            client.connect(host, port, username, password=password)
        
        # 创建SFTP客户端
        sftp = client.open_sftp()
        
        # 创建统计信息对象
        stats = UploadStats()
        
        # 记录开始时间
        start_time = time.time()
        
        # 上传文件夹
        logger.info(f"开始上传文件夹...")
        upload_folder_via_sftp(sftp, local_path, remote_path, stats,
                             args.delete, args.check_hash, args.force, args.verbose)
        
        # 计算耗时
        elapsed_time = time.time() - start_time
        
        # 显示统计信息
        logger.info("\n上传完成!")
        logger.info(f"总文件数: {stats.total_files}")
        logger.info(f"已上传: {stats.uploaded_files}")
        logger.info(f"已跳过 (未更改): {stats.skipped_files}")
        if args.delete:
            logger.info(f"已删除文件: {stats.deleted_files}")
            logger.info(f"已删除目录: {stats.deleted_dirs}")
        if stats.errors > 0:
            logger.info(f"错误数: {stats.errors}")
        logger.info(f"耗时: {elapsed_time:.2f} 秒")
        
    except Exception as e:
        logger.error(f"上传失败: {e}")
        return 1
    finally:
        # 关闭连接
        if 'sftp' in locals():
            sftp.close()
        client.close()
    
    return 0

if __name__ == "__main__":
    sys.exit(main())