Py小工具和功能性方法

author:Ian

python

邮件 #

发送邮件时携带附件 中文名称附件

def send_email_week_report(htm, recipients, copys, file_name, file_path):
    port = 465
    # str_today = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    str_today = datetime.now().strftime("%Y-%m-%d")
    mail_host = "mail.{demo}.com"
    mail_user = "{user name}"
    mail_pass = "{passwd}"
    sender = '{name}@{demo}.com'
    mail_from = "{name} <{name}@{demo}.com>"
    msg_Bcc = '{name}@{demo}.com'
    subject = f"{title}-{str_today}"  # 标题
    msg = MIMEMultipart()
    msg.attach(MIMEText(htm, 'html', 'utf-8'))  # 这里可以传html内容 也可以传普通文字

    # 创建附件
    with open(file_path, 'rb') as attachment:
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(attachment.read())
        encoders.encode_base64(part)
        part.add_header(
            'Content-Disposition',
            'attachment',
            filename=("utf-8", "", file_name),
        )
        msg.attach(part)

    for root, _, files in os.walk(os.path.join(BASE_DIR, '{path}/images')):
        for f in files:
            if f in htm:
                with open(os.path.join(root, f), 'rb') as fp:
                    image = MIMEImage(fp.read())
                    image.add_header('Content-ID', f'<{f}>')
                    msg.attach(image)

    msg['From'] = mail_from  # 发件人
    msg['To'] = ";".join(recipients)  # 收件人多个逗号分隔
    msg["Cc"] = ";".join(copys)
    msg["Bcc"] = msg_Bcc
    subject = subject
    msg['Subject'] = Header(subject)
    try:
        smtpObj = smtplib.SMTP_SSL(mail_host)
        if is_partner_node == 1:
            smtpObj = smtplib.SMTP(mail_host)
        smtpObj.connect(mail_host, port)
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, recipients + copys + [sender], msg.as_string())
        return True
    except smtplib.SMTPException as e:
        return False

这种内容插入附件会有文件名称长度限制(19好像或者16)

# 创建附件
with open(file_path, 'rb') as attachment:
    part = MIMEBase('application', 'octet-stream')
    part.set_payload(attachment.read())
    encoders.encode_base64(part)
    if len(file_name) > 18:
        file_name = file_name[:9] + file_name[-9:]
    part.add_header(
        'Content-Disposition',
        f'attachment; filename={Header(file_name, "utf-8").encode()}',
    )
    msg.attach(part)

docx 插入附件 插入文件 #

取巧的办法 通过替换文件实现插入

"""
__name__: main.py
替换掉word模板中的excel文件内容

所需文件:
    - demo.docx(其中已经插入了一个xlsx文件)
    - demo_aim.xlsx
"""

import os
import shutil
import zipfile


class DocxEmbeddingFile:
    def __init__(self, src_docx_fn, temp_path, extract_folder='extrated'):
        self.src_docx_fn = src_docx_fn
        self.temp_path = temp_path
        self.extract_folder = extract_folder

    def unzip_docx(self):
        shutil.rmtree(self.extract_folder, ignore_errors=True)
        os.mkdir(self.extract_folder)
        os.chdir(self.extract_folder)
        with zipfile.ZipFile(self.src_docx_fn) as azip:
            azip.extractall()

    def change_ip_details_xlsx(self, path):
        """# 第一个插入的文件命名规则为按插入顺序递增:
            - Microsoft_Excel_Worksheet.xlsx
            - Microsoft_Excel_Worksheet1.xlsx
            - Microsoft_Word_Document.docx
        如果要测试其他类型文件,可将docx解压后去以下路径去查看
        """
        shutil.copy(path, os.path.join(self.temp_path, self.extract_folder, "word", "embeddings", "Microsoft_Excel_Worksheet.xlsx"))
        return True

    def zip_docx(self):
        os.chdir(os.path.join(self.temp_path, self.extract_folder))
        with zipfile.ZipFile(self.src_docx_fn, 'w') as azip:
            for i in os.walk('.'):
                for j in i[2]:
                    azip.write(os.path.join(i[0], j), compress_type=zipfile.ZIP_DEFLATED)

if __name__ == '__main__':
    temp_path = os.path.dirname(os.path.abspath(__file__))
    report_path = temp_path
    src_docx_fn = os.path.join(report_path, "demo.docx")
    docxef = DocxEmbeddingFile(src_docx_fn=src_docx_fn, temp_path=temp_path)
    docxef.unzip_docx()
    docxef.change_ip_details_xlsx(os.path.join(temp_path, "demo_aim.xlsx"))
    docxef.zip_docx()

word 插入文件附件 #

class DocxEmbeddingFile:
    def __init__(self, src_docx_fn, temp_path, extract_folder='extrated'):
        self.src_docx_fn = src_docx_fn
        self.temp_path = temp_path
        self.extract_folder = extract_folder

    def unzip_docx(self):
        original_dir = os.getcwd()
        shutil.rmtree(os.path.join(self.temp_path, self.extract_folder), ignore_errors=True)
        os.mkdir(os.path.join(self.temp_path, self.extract_folder))
        dir_path, file_name = os.path.split(self.src_docx_fn)
        docx_file_path = self.src_docx_fn
        if file_name.endswith(".zip"):
            os.chdir(self.temp_path)
            with zipfile.ZipFile(self.src_docx_fn) as azip:
                azip.extractall()
            file_name = file_name.replace("zip", "docx")
            docx_file_path = os.path.join(dir_path, file_name)
        elif file_name.endswith("docx"):
            pass
        else:
            os.chdir(original_dir)
            return False
        os.chdir(os.path.join(self.temp_path, self.extract_folder))
        with zipfile.ZipFile(docx_file_path) as azip:
            azip.extractall()
        os.chdir(original_dir)
        return True

    def change_ip_details_xlsx(self, path):
	    # 通过替换文件实现文件插入
        target_path = os.path.join(self.temp_path, self.extract_folder, "word", "embeddings", "Microsoft_Excel_Worksheet.xlsx")
        if not path or not os.path.exists(target_path):
            return False
        shutil.copy(path, target_path)
        return True

    def zip_docx(self):
        original_dir = os.getcwd()
        os.chdir(os.path.join(self.temp_path, self.extract_folder))
        docx_file_path = self.src_docx_fn
        dir_path, file_name = os.path.split(self.src_docx_fn)
        if file_name.endswith(".zip"):
            file_name = file_name.replace("zip", "docx")
            docx_file_path = os.path.join(dir_path, file_name)
        with zipfile.ZipFile(docx_file_path, 'w') as azip:
            for i in os.walk('.'):
                for j in i[2]:
                    azip.write(os.path.join(i[0], j), compress_type=zipfile.ZIP_DEFLATED)
        os.chdir(original_dir)
        return docx_file_path

生成Excel文件 #

class SheetGenerater:
    t_risky_info = "1_2"
    t_failed_info = u"key1信息"
    t_simple = "simple"
    def __init__(self, key, title, _type, sheeter):
        self.key = key
        self.title = title
        self.type = _type
        self.sheeter = sheeter
        self.dict_title = {
            u"key1信息": [u"本周期内第三列明细如下:", u"本周期内第三列告警详情如下:"],
            u"key2信息": [u"本周期内风险主机明细如下:", u"详情"]
        }
    
    def __get_list_dict_value(self, n, key, data):
        if len(data) <= n:
            return ''
        value = data[n][key]
        if isinstance(value, str) or isinstance(value, unicode):
            pass
        elif value and (isinstance(value, list) or isinstance(value, tuple)) and (isinstance(value[0], str) or isinstance(value[0], unicode)):
            value = u','.join(value)
        else:
            value = str(value)
        return value

    def full_sheet_data_failed(self, sheet_data):
        """
        key1信息 sheet信息填充
        实现方式与数据长度强相关,如果需要向表格中添加数据,需要手动调整各个数值
        """
        # set column names for sheet1 and fill in data
        col_names = [u"序号", u"第三列", u"第四列", u"备注"]
        col_names_2 = [u"序号", u"告警时间", u"事件名称", u"源IP", "目的IP"]
        if self.dict_title.get(self.title):
            self.sheeter.merge_cells('A1:D1')
            self.sheeter.cell(1, 1).value = self.dict_title[self.title][0]
            self.sheeter.cell(1, 1).fill = PatternFill(start_color='00b050', end_color='00b050', fill_type='solid')
            
            self.sheeter.merge_cells('F1:J1')
            self.sheeter.cell(1, 2+len(col_names)).value = self.dict_title[self.title][-1]
            self.sheeter.cell(1, 2+len(col_names)).fill = PatternFill(start_color='00b050', end_color='00b050', fill_type='solid')
        self.sheeter.append(col_names + [""] + col_names_2)
        # self.sheeter.append([u"序号", u"状态", u"第三列", u"第四列", u"备注"])
        longer = sheet_data["key1"] if len(sheet_data["key1"]) > len(sheet_data["failed_host_warning_info"]) else sheet_data["failed_host_warning_info"]
        for n, _ in enumerate(longer):
            # item["loop_state"] # 状态列,暂时不做自动填充,此列直接删除
            self.sheeter.append([
                str(n+1) if self.__get_list_dict_value(n, "host", sheet_data["key1"]) else '',
                self.__get_list_dict_value(n, "host", sheet_data["key1"]),
                self.__get_list_dict_value(n, "event", sheet_data["key1"]),
                self.__get_list_dict_value(n, "remark", sheet_data["key1"]),
                "",
                str(n+1) if self.__get_list_dict_value(n, "datetime", sheet_data["failed_host_warning_info"]) else '',
                self.__get_list_dict_value(n, "datetime", sheet_data["failed_host_warning_info"]),
                self.__get_list_dict_value(n, "event", sheet_data["failed_host_warning_info"]),
                self.__get_list_dict_value(n, "source_ip", sheet_data["failed_host_warning_info"]),
                self.__get_list_dict_value(n, "destined_ip", sheet_data["failed_host_warning_info"]),
            ])
        for n, cell in enumerate(self.sheeter[2]):
            if n == len(col_names):
                continue
            cell.fill = openpyxl.styles.PatternFill(start_color="92D050", end_color="92D050", fill_type="solid")
        self.autosize_table()
        return True
    
    def full_sheet_data_risky(self, sheet_data):
        """
        key2信息 sheet信息填充
        """
        # set column names for sheet1 and fill in data
        col_names = [u"序号", u"风险主机", u"第四列", u"备注"]
        if self.dict_title.get(self.title):
            self.sheeter.merge_cells(start_row=1, start_column=1, end_row=1, end_column=len(col_names))
            self.sheeter.cell(row=1, column=1, value=self.dict_title[self.title][0])
            fill = PatternFill(fill_type='solid', fgColor="00b050")
            self.sheeter.cell(row=1, column=1).fill = fill
            self.sheeter.cell(row=1, column=1).alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)

        if self.dict_title.get(self.title):
            self.sheeter.merge_cells('A1:D1')
            self.sheeter.cell(1, 1).value = self.dict_title[self.title][0]
            self.sheeter.cell(1, 1).fill = PatternFill(start_color='00b050', end_color='00b050', fill_type='solid')
            
            # self.sheeter.merge_cells('F1:J1')
            self.sheeter.cell(1, 2+len(col_names)).value = self.dict_title[self.title][-1]
            self.sheeter.cell(1, 2+len(col_names)).fill = PatternFill(start_color='00b050', end_color='00b050', fill_type='solid')
        
        self.sheeter.append(col_names)
        # self.sheeter.append([u"序号", u"状态", u"第三列", u"第四列", u"备注"])
        sheet_data = sheet_data["key2"]
        for n, _ in enumerate(sheet_data):
            # item["loop_state"] # 状态列,暂时不做自动填充,此列直接删除
            # self.sheeter.append([str(n+1), item["host"], item["event"], item["remark"]])
            self.sheeter.append([str(n+1),
                self.__get_list_dict_value(n, "host", sheet_data),
                self.__get_list_dict_value(n, "event", sheet_data),
                self.__get_list_dict_value(n, "remark", sheet_data)
            ])
            # set background color for column headers
        for n, cell in enumerate(self.sheeter[2]):
            if n == len(col_names):
                break
            cell.fill = openpyxl.styles.PatternFill(start_color="92D050", end_color="92D050", fill_type="solid")
        self.autosize_table()
        return True
    
    def full_sheet_data_simple(self, sheet_data):
        # set column names and fill in data for sheet3
        self.sheeter.append(sheet_data["column_name"])
        for row in sheet_data["table"]:
            self.sheeter.append(row)
            # set background color for column headers
        for cell in self.sheeter[1]:
            cell.fill = openpyxl.styles.PatternFill(start_color="92D050", end_color="92D050", fill_type="solid")
        self.autosize_table()
        return True
    
    def autosize_table(self):
        # 生效了
        self.sheeter.column_dimensions['B'].width = 20
        if self.title == u"key3信息":
            self.sheeter.column_dimensions['C'].width = 40
        else:
            self.sheeter.column_dimensions['C'].width = 60
        return
        # for column in self.sheeter.columns:
        #     column_letter = get_column_letter(column[0].column)
        #     self.sheeter.column_dimensions[column_letter].auto_size = True
            # self.sheeter.column_dimensions[column_letter].max_width = 500

    def full_sheet(self, data):
        if self.type == SheetGenerater.t_risky_info:
            # print('excel_utils -------135 {}'.format(data))
            return self.full_sheet_data_risky(data)
        if self.type == SheetGenerater.t_failed_info:
            # print('excel_utils -------138 {}'.format(data))
            return self.full_sheet_data_failed(data)
        elif self.type == SheetGenerater.t_simple:
            # print('excel_utils -------141 {}'.format(data))
            return self.full_sheet_data_simple(data)
        return False

def generate_excel(data, output_path):
    # create new workbook and worksheets
    workbook = openpyxl.Workbook()
    sheet1 = workbook.active
    sheet_gennerater_list = [
        SheetGenerater(key="key1", title=u"key1信息", _type=SheetGenerater.t_failed_info, sheeter=None),
        SheetGenerater(key="key2", title=u"key2信息", _type=SheetGenerater.t_risky_info, sheeter=None),
        SheetGenerater(key="key3", title=u"key3信息", _type=SheetGenerater.t_simple, sheeter=None),
    ]
    for n, sheet_gennerater in enumerate(sheet_gennerater_list):
        if data.get(sheet_gennerater.key) and sheet1.title == "Sheet":
            sheet1.title = sheet_gennerater.title
            sheet_gennerater_list[n].sheeter = sheet1
        elif data.get(sheet_gennerater.key):
            _sheet = workbook.create_sheet(sheet_gennerater.title)
            sheet_gennerater_list[n].sheeter = _sheet

    for sheet_gennerate in sheet_gennerater_list:
        if sheet_gennerate.sheeter:
            sheet_gennerate.full_sheet(data[sheet_gennerate.key])

    # save workbook to output file
    workbook.save(output_path)

计算md5 #

import hashlib

def calculate_md5(input_string):
    md5_hash = hashlib.md5()
    md5_hash.update(input_string.encode('utf-8'))
    return md5_hash.hexdigest()

input_string = "Hello, World!"
md5_value = calculate_md5(input_string)
print("MD5 值为:", md5_value)

定时任务 #

 pip install apscheduler

可以参看: 我以前写的定时任务