"""Build a multi-sheet Excel cost report from AWS detailed billing CSV files.

The module reads "detailed line items with resources and tags" CSV exports,
groups the line items by a tag (for example ``user:Application``) and writes
an index sheet, a per-service distribution sheet and one detail sheet per tag
value with openpyxl.
"""
import calendar
import os
import string
from collections import OrderedDict, defaultdict
from datetime import datetime, date
from decimal import Decimal

import numpy as np
import pandas as pd
# NOTE: ``from numpy import object`` was dropped on purpose.  ``numpy.object``
# was only an alias of the builtin ``object`` and was removed in NumPy 1.24,
# so the import made the whole module fail to load on current NumPy.
from openpyxl import Workbook
from openpyxl import load_workbook
from openpyxl.chart import Reference, PieChart3D
from openpyxl.chart.marker import DataPoint
from openpyxl.styles import NamedStyle, Font, Border, Side, PatternFill, Alignment, colors
from openpyxl.utils import get_column_letter
from pytz import timezone

# Tag value that means "no real tag"; such items go to the catch-all bucket.
fill_null_value = 'Others'
# Display name / sheet name of the catch-all bucket.
other_display = 'OtherNullTagResource'

# Default output file name; ``Excel(..., filename=...)`` overrides it.
filename = '旺旺账单定制开发2022-4月2.xlsx'

# Product names that get their own column on the service distribution sheet.
# Earlier short-name variant, kept for reference:
# {'AmazonEC2', 'AmazonElastiCache', 'AmazonRDS', 'AmazonS3', 'AWSDataTransfer'}
display_set = {
    'Amazon Elastic Compute Cloud',
    'Amazon ElastiCache',
    'Amazon Relational Database Service',
    'Amazon Simple Storage Service',
    'AWSDataTransfer',
}

# tag value -> [display name, owning department]
application_dict = {
    'sapdr': ['sapdr', '基础架构管理处'],
    'dfsbackup': ['文件服务器云异地备份', '基础架构管理处'],
    'ka': ['KA现渠自动化管理平台', '资讯应用规划处'],
    'tpm': ['TPM系统', '资讯应用规划处'],
    'wtx': ['旺天下测试', '资讯技术研发处'],
    'newsrm': ['新供应商管理系统', '资讯技术研发处'],
    'wantmediaback': ['CC备份上传云端', '基础架构管理处'],
    'wantclub': ['微信广告', '资讯技术研发处'],
    'watergod': ['水神系统', '/神旺控股/资讯处'],
    'srm': ['企企通供应商管理系统', '资讯应用规划处'],
    'vpn server': ['VPN服务', '基础架构管理处'],
    'ecrm': ['ECRM系统', '资讯应用规划处'],
    'ytwms': ['优豚WMS系统', '企业资源应用处'],
    'sfa': ['营销通SFA系统', '资讯应用规划处'],
    'dljs': ['多啦金税系统', '企业资源应用处'],
    'firewall': ['CP测试', '基础架构管理处'],
    'awsbackup': ['GIT数据备份', '-'],
    'hrrpa': ['人资自动化机器人', '资讯应用规划处'],
    'tczs': ['华南统仓追溯系统', '资讯应用规划处'],
    'cas': ['统一认证系统', '资讯应用规划处'],
    'nyerp': ['农业ERP系统', '农业发展事业部'],
    'other': ['其他', '-'],
    'iwantwant': ['爱旺旺广告', '资讯技术研发处'],
    'wwgw': ['集团官网', '传媒服务处'],
    'awslog': ['AWS服务日志', '-'],
    'hollywant': ['售卖机', '-'],
}

# environment tag value -> display name
venv_name_dict = {
    'PRD': '生产环境',
    'UAT': '测试环境',
    'prd': '生产环境',
    'uat': '测试环境',
}

_GREY = "808080"
_NAVY = "1F4E78"
_LIGHT = "D9D9D9"
_WHITE = "FFFFFF"


def _column(title, background_color, width, font_color=None, sub_horizontal=None,
            number_format=None):
    """Build one column description dict; optional keys are added only when given."""
    spec = {'title': title, 'background_color': background_color, 'width': width}
    if font_color is not None:
        spec['font_color'] = font_color
    if sub_horizontal is not None:
        spec['sub_horizontal'] = sub_horizontal
    if number_format is not None:
        spec['format'] = number_format
    return spec


def new_round(_float, _len=2):
    """Round ``_float`` to ``_len`` decimal places, rounding a trailing 5 up.

    Parameters
    ----------
    _float : float, numpy.float64, str or any object supporting ``round``
        Strings may contain thousands separators (``','``).
    _len : int
        Number of decimal places to keep.

    Returns
    -------
    The rounded value.  Floats that already have at most ``_len`` decimals are
    returned unchanged.  ``None`` is returned when the value cannot be
    converted or rounded (best-effort behaviour the callers rely on).
    """
    try:
        if isinstance(_float, np.float64):
            return new_round(float(_float), _len)
        if isinstance(_float, float):
            text = str(_float)
            if 'e' in text.lower():
                return round(_float, _len)
            if text[::-1].find('.') <= _len:
                # Already short enough: nothing to round.
                return _float
            if text[-1] == '5':
                # Replace the trailing 5 with 6 so binary representation
                # error cannot make the value round down.
                return round(float(text[:-1] + '6'), _len)
            return round(_float, _len)
        if isinstance(_float, str):
            return new_round(float(_float.replace(',', '')), _len)
        return round(_float, _len)
    except (TypeError, ValueError, ArithmeticError):
        # Only conversion/rounding failures are tolerated; anything else is a
        # programming error and should surface.
        return None


class Book:
    """Wrapper around an openpyxl ``Workbook`` holding report-wide settings."""

    def __init__(self, _time: list, title=None, project=None, currency=None, language=True):
        """
        :param _time: two ``'YYYY-MM'`` strings (start and end month, any order)
        :param title: report title
        :param project: project name shown in sheet headings
        :param currency: currency symbol used in number formats
        :param language: True for Chinese footer labels, False for English
        :raises ValueError: when ``_time`` does not hold exactly two entries
        """
        self.wb = Workbook()
        self.wb.remove(self.wb.active)
        self.highlight = None
        self.title = title
        self.project = project
        self.currency = currency
        if len(_time) != 2:
            raise ValueError('时间数组不对')
        # Sort a copy so the caller's list is left untouched.
        first, last = sorted(_time)
        self.subtotal_str = '小计:' if language else 'subtotal:'
        self.total_str = '总计:' if language else 'total:'
        year, month = last.split('-')[0], last.split('-')[1]
        self.time = "%s_%s-%d" % (
            first + '-01', last, calendar.monthrange(int(year), int(month))[1])

    def save(self, file_name='freeze.xlsx'):
        """Write the workbook to ``file_name``."""
        self.wb.save(file_name)

    def workbook(self, name=None, index=False):
        """Create and return a worksheet.

        The index sheet (``index=True`` or no name) is called
        ``ResourcesUsage``; otherwise spaces in ``name`` become underscores.
        """
        if name is None or index is True:
            ws = self.wb.create_sheet('ResourcesUsage')
            ws.title = 'ResourcesUsage'
        else:
            name = name.replace(' ', '_')
            ws = self.wb.create_sheet(name)  # appended as the last sheet
            ws.title = name
        return ws

    def format_font(self, background_color="FFFFFF", font_color='2B2B2B', bold=True):
        """Return the shared ``highlight`` named style set to the given colours.

        One ``NamedStyle`` instance is reused and mutated on every call.
        """
        if not self.highlight:
            self.highlight = NamedStyle(name="highlight")
        self.highlight.fill = PatternFill("solid", fgColor=background_color)  # background fill
        self.highlight.font = Font(name='微软雅黑', size=11, bold=bold, italic=False,
                                   strike=False, color=font_color)
        return self.highlight


class Sheet:
    """Base class with the drawing helpers shared by all report sheets."""

    def __init__(self, _wb: Book, tag: str, table_first_data=None, table_second_data=None,
                 table_data=None, index=False):
        self.border = None
        self._borders = {}
        self.raw_wb = _wb
        self.wb = _wb.workbook(tag.replace(':', "_"), index=index)
        self.highlight = _wb.format_font
        self.tag = tag
        self.title_first = []
        self.table_first_data = table_first_data
        self.table_data = table_data
        self.title_second = []
        self.table_second_data = table_second_data
        self.margin_left = 1
        self.margin_top = 1
        self.row_number = None

    def _currency_format(self, pattern='#,##0.00'):
        """Return the positive;negative number format prefixed by the currency symbol."""
        symbol = self.raw_wb.currency
        return f'{symbol}{pattern};{symbol}-{pattern}'

    def merge(self, left, right, top, down, color=None, value=None):
        """Merge the rectangle (1-based column/row indexes) and style its anchor cell.

        :param left: first column
        :param right: last column
        :param top: first row
        :param down: last row
        :param color: background colour of the merged area
        :param value: value written to the anchor cell; non-string values get
            the currency number format
        """
        self.set_border(left, right, top, down)
        self.wb.merge_cells(
            '%s%d:%s%d' % (get_column_letter(left), top, get_column_letter(right), down))
        anchor = self.wb.cell(row=top, column=left)
        anchor.style = self.highlight(color)
        # ``is not None`` so that a legitimate total of 0 is still written.
        if value is not None and value != '':
            anchor.value = value
            if not isinstance(value, str):
                anchor.number_format = self._currency_format()
        anchor.alignment = Alignment(horizontal='center', vertical='center')
        self.wb.row_dimensions[top].height = 20

    def fill_table(self, data: list, index_title: list, link=False, index=False):
        """Write ``data`` row by row starting at the current row.

        :param data: list of rows, each a list of cell values
        :param index_title: column descriptions (alignment and number format)
        :param link: when True (and not ``index``) the first column becomes a
            hyperlink to the sheet named after the cell value
        :param index: True when filling the index sheet
        """
        for row in data:
            for k, value in enumerate(row):
                column = index_title[k]
                cell = self.wb.cell(row=self.row_number, column=k + self.margin_left + 1)
                cell.style = self.highlight(bold=False)
                cell.alignment = Alignment(
                    horizontal=column.get('sub_horizontal', 'center'), vertical='center')
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                self.wb.row_dimensions[self.row_number].height = 20
                if column.get('format'):
                    cell.number_format = column.get('format')
                if k == 0 and link is True and not index:
                    cell.value = '=HYPERLINK("%s", "%s")' % (
                        f'#{value.replace(" ", "_")}!B2', value)
                    cell.font = Font(underline='single', color='0563C1')
                else:
                    cell.value = value
            self.row_number += 1

    def fill_footer(self, footer_data: list, color='A9D08E'):
        """Write a footer row made of merged areas.

        :param footer_data: list of ``(column_span, value)`` pairs, left to right
        :param color: background colour of the footer
        """
        left = self.margin_left + 1
        for span, value in footer_data:
            # Merge each area, then redraw the grid lines on the merged cells.
            self.merge(left, left + span - 1, self.row_number, self.row_number,
                       value=value, color=color)
            self.set_border(left, left + span - 1, self.row_number, self.row_number)
            left += span
        self.row_number += 1
        # Leave blank rows below the footer.
        # NOTE(review): the original indentation was lost; the row counter is
        # assumed to advance once per appended blank row - confirm the layout.
        for _ in range(2):
            self.wb.append([])
            self.row_number += 1

    def format_title(self, index_data: list):
        """Write the header row described by ``index_data`` and set column widths."""
        for i, title_info in enumerate(index_data):
            column = i + self.margin_left + 1
            self.wb.column_dimensions[get_column_letter(column)].width = \
                title_info.get('width', 20)
            cell = self.wb.cell(row=self.row_number, column=column)
            cell.value = title_info['title']
            cell.style = self.highlight(
                title_info['background_color'],
                font_color=title_info.get('font_color', '2B2B2B'))
            cell.alignment = Alignment(horizontal='center', vertical='center')
        self.set_border(self.margin_left + 1, len(index_data) + self.margin_left,
                        self.row_number, self.row_number)
        self.row_number += 1

    def my_border(self, t_border, b_border, l_border, r_border):
        """Return a black ``Border`` with the given side styles (cached per combination)."""
        key = (t_border, b_border, l_border, r_border)
        border = self._borders.get(key)
        if border is None:
            border = Border(top=Side(border_style=t_border, color=colors.BLACK),
                            bottom=Side(border_style=b_border, color=colors.BLACK),
                            left=Side(border_style=l_border, color=colors.BLACK),
                            right=Side(border_style=r_border, color=colors.BLACK))
            self._borders[key] = border
        self.border = border
        return border

    def redirect_index(self):
        """Put a "back to index" hyperlink into cell A1."""
        cell = self.wb.cell(row=1, column=1)
        cell.style = self.highlight(bold=False)
        cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
        # NOTE(review): this sets the height of the *current* row, not row 1;
        # kept as in the original - confirm whether row 1 was intended.
        self.wb.row_dimensions[self.row_number].height = 20
        cell.value = '=HYPERLINK("%s", "%s")' % ('#ResourcesUsage!B2', "跳回首页")
        cell.font = Font(underline='single', color='0563C1')

    def set_border(self, left, right, top, down):
        """Give every cell of the rectangle a thin border and centred alignment."""
        for row in range(top, down + 1):
            for column in range(left, right + 1):
                cell = self.wb.cell(row=row, column=column)
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                cell.alignment = Alignment(horizontal='center', vertical='center')

    def fill_index(self, location='B2', title_index: list = None, title=None, bkg=None):
        """Write the heading cell of a table and advance one row.

        :param location: cell coordinate of the heading
        :param title_index: column descriptions; only their count is used
        :param title: heading text; defaults to ``"<project>:<period>"``
        :param bkg: background colour, ``D9D9D9`` when omitted
        """
        columns = title_index or []
        if title:
            self.wb[location] = title
        else:
            self.wb[location] = self.raw_wb.project + ':' + self.raw_wb.time
        self.wb[location].style = self.highlight(background_color=bkg if bkg else 'D9D9D9')
        self.set_border(self.margin_left + 1, len(columns) + self.margin_left,
                        self.row_number, self.row_number)
        self.wb[location].border = self.my_border('thin', 'thin', 'thin', 'thin')
        self.wb[location].alignment = Alignment(horizontal='center', vertical='center')
        self.row_number += 1

    def _render_cost_tables(self):
        """Draw heading, first table, second table, both subtotals and the grand total.

        Shared by ``AppSheet`` and ``SharedSheet``.  The first-table total is
        the fourth column from the right of each row.
        """
        book = self.raw_wb
        self.fill_index(title_index=self.title_first)
        self.format_title(self.title_first)
        total_1 = new_round(sum(item[-4] for item in self.table_first_data), 6)
        self.fill_table(data=self.table_first_data, index_title=self.title_first)
        self.set_border(2, len(self.title_first) + self.margin_left,
                        self.row_number, self.row_number)
        self.fill_footer([(len(self.title_first) - 4, book.subtotal_str), (4, total_1)])

        # Second table.
        self.format_title(self.title_second)
        start = self.row_number
        total_2 = new_round(sum(item[3] for item in self.table_second_data), 6)
        total_3 = new_round(sum(item[4] for item in self.table_second_data), 6)
        self.fill_table(self.table_second_data, index_title=self.title_second)
        end = self.row_number - 1
        if end > start:
            # The name column holds the same value on every row: merge it.
            letter = get_column_letter(self.margin_left + 2)
            self.wb.merge_cells('%s%d:%s%d' % (letter, start, letter, end))
        self.fill_footer([(3, book.subtotal_str), (1, total_2), (1, total_3)])

        # Grand total; float() lets float and Decimal subtotals be added.
        self.fill_footer([
            (len(self.title_first) - 2, book.total_str),
            (2, new_round(float(total_1) + float(total_3), 2)),
        ], color='FFC000')


class AppSheet(Sheet):
    """Detail sheet for one tag value: named resources plus untagged remainder."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        money = self._currency_format()
        self.title_first = [
            _column('NO.', _GREY, 5, _WHITE, 'center'),
            _column('Name', _GREY, 30, _WHITE, 'left'),
            _column('所属环境', _GREY, 30, _WHITE, 'left'),
            _column('Product', _NAVY, 30, _WHITE, 'left'),
            _column('Resource Cost', _LIGHT, 20, None, 'right', money),
            _column('Other Resource', _NAVY, 30, _WHITE),
            _column('Resource Cost', _LIGHT, 15, None, None, money),
            _column('Data Transfer', _LIGHT, 15, None, 'right', money),
            _column('Total Cost', _LIGHT, 15, None, 'right', money),
            _column('Application', _GREY, 20, _WHITE),
            _column('Owner', _GREY, 10, _WHITE),
            _column('Remarks', _GREY, 10, _WHITE),
        ]
        self.title_second = [
            _column('NO.', _GREY, 5, _WHITE),
            _column('Name', _GREY, 30, _WHITE),
            _column('Product', _NAVY, 30, _WHITE),
            _column('Resource Cost', _LIGHT, 20, None, None, money),
            _column('Total Cost', _LIGHT, 30, None, None, money),
            _column(self.tag, _GREY, 20, _WHITE),
        ]
        self.wb.sheet_view.showGridLines = False  # hide grid lines
        self.wb.freeze_panes = 'C4'  # freeze panes
        self.wb.merge_cells('B2:M2')
        self.row_number = self.margin_top + 1

    def run(self):
        """Render both tables, the totals and the link back to the index sheet."""
        self._render_cost_tables()
        self.redirect_index()


class ServiceSheet(Sheet):
    """Per-month table of cost per tag value and service, with a pie chart."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.wb.sheet_view.showGridLines = False  # hide grid lines
        self.wb.freeze_panes = 'B2'  # freeze panes
        self.row_number = self.margin_top + 1

    def run(self):
        """Write one table per month found in ``table_data``."""
        for _date, date_data in self.table_data.items():
            self.write(_date, self.get_tag_index(date_data))
        self.redirect_index()

    def get_tag_index(self, date_data: dict):
        """Return ``(tag, cost * 1.06)`` pairs with positive cost, largest first."""
        pairs = []
        for tag, data in date_data.items():
            cost = new_round(float(sum(data.values())) * 1.06, 6)
            if cost > 0:
                pairs.append((tag, cost))
        return sorted(pairs, key=lambda pair: pair[1], reverse=True)

    def get_pie(self, min_row=None, max_row=None, min_col=None, max_col=None):
        """Build a 3D pie chart: labels from ``min_col``, values from ``max_col``."""
        pie = PieChart3D()
        labels = Reference(self.wb, min_col=min_col, max_col=min_col,
                           min_row=min_row, max_row=max_row)
        # The data range starts one row higher so the header becomes the series title.
        data = Reference(self.wb, max_col=max_col, min_col=max_col,
                         min_row=min_row - 1, max_row=max_row)
        pie.add_data(data, titles_from_data=True)
        pie.set_categories(labels)
        pie.title = "项目费用分布图"
        first_slice = DataPoint(idx=0, explosion=20)
        pie.series[0].data_points = [first_slice]
        return pie

    def get_date_tag_value(self, inner_date, tag, service):
        """Return the cell value for one tag/service pair of a month.

        A service outside ``display_set`` stands for "everything else": the
        cost of all non-displayed products is summed and ``'-'`` is returned
        when it is zero.  A displayed service returns ``0`` when it has no
        cost.  Non-zero costs are multiplied by 1.06 and rounded to 6 places.
        """
        costs = self.table_data[inner_date][tag]
        if service not in display_set:
            total = sum([v for k, v in costs.items() if k not in display_set])
            if total == 0:
                return '-'
        else:
            total = costs.get(service, 0)
            if total == 0:
                return 0
        return new_round(float(total) * 1.06, 6)

    @staticmethod
    def len_byte(value):
        """Return a display length where each non-ASCII (3-byte) char counts as 2."""
        if value is None or value == "":
            return 10
        if isinstance(value, int):
            return len(str(value))
        length = len(value)
        utf8_length = len(value.encode('utf-8'))
        return int((utf8_length - length) / 2 + length)

    def write(self, inner_date, _date_tag_order):
        """Write the distribution table (and pie chart) for one month.

        :param inner_date: month key of ``table_data`` (``'YYYY-MM-01'``)
        :param _date_tag_order: ``(tag, total)`` pairs in display order
        """
        services_set = set()
        for item_dict in self.table_data[inner_date].values():
            services_set.update(name for name in item_dict if name in display_set)
        # Sorted so the column order is the same on every run (set iteration
        # order of strings changes between interpreter runs).
        services_list = [*sorted(services_set), "Others Services"]
        service_count = len(services_list)
        adapt_width = [self.get_width(item) for item in services_list]
        serve_dict = {
            'Amazon Elastic Compute Cloud': 'EC2服务器',
            'Amazon Simple Storage Service': 'S3存储桶',
            'Amazon Relational Database Service': 'RDS数据库',
            'Amazon ElastiCache': 'Redis数据库',
            'Others Services': '其他云服务',
        }
        money = self._currency_format()
        service_columns = [
            # Fall back to the raw product name so no header is left blank.
            _column(serve_dict.get(service, service), _GREY, adapt_width[k], _WHITE,
                    'center', money)
            for k, service in enumerate(services_list)
        ]
        total_width = self.get_width('Total Cost')
        self.title_first = [
            _column(self.tag, _GREY, 37, _WHITE, 'center'),
            _column('所属部门', _GREY, 37, _WHITE, 'center'),
            *service_columns,
            _column('Total Cost', _GREY, total_width, _WHITE, 'center', money),
            _column('Percentage', _GREY, total_width, _WHITE, 'center'),
        ]
        # Heading row.
        self.wb.merge_cells(
            f'B{self.row_number}:{self.digit2alphabet(len(self.title_first))}{self.row_number}')
        self.fill_index(title_index=self.title_first,
                        title=f"{inner_date[:7]} Resource Distribution Details",
                        bkg='E26B0A')
        # Column header row.
        self.format_title(self.title_first)

        table_data = []
        total_cost = new_round(sum([item[1] for item in _date_tag_order]), 6)
        for tag, total in _date_tag_order:
            application = application_dict.get(tag, [tag, '-'])
            row = [application[0], application[1]]
            row.extend(self.get_date_tag_value(inner_date, tag, service)
                       for service in services_list)
            row.append(total)
            if total_cost:
                row.append("%.2f%%" % new_round(total * 100 / total_cost, 4))
            else:
                row.append("%.2f%%" % new_round(0, 6))
            table_data.append(row)
        table_data = self.reset_order(table_data)

        # Column sums over the service and total columns.
        resource_li = ['资源费用', '-', *[0] * service_count, 0, '100%']
        for item_data in table_data:
            for k in range(2, len(item_data) - 1):
                value = item_data[k]
                if value != '-' and value != '':
                    resource_li[k] += value
        resource_num = resource_li[-2]
        table_data.append(resource_li)
        table_data.insert(
            0, ['服务费用', '-', *['-'] * service_count, resource_num * 0.106, '-'])
        total_list = self.get_total(table_data, resource_num, services_list)

        min_row = self.row_number
        self.fill_table(table_data, self.title_first, link=True)
        max_row = self.row_number - 1
        min_col = 2
        max_col = 3 + service_count + 1
        if max_row > min_row:
            pie = self.get_pie(max_row=max_row, min_row=min_row,
                               max_col=max_col, min_col=min_col)
            self.wb.add_chart(pie, f"{self.digit2alphabet(7 + service_count)}3")
        self.fill_total([total_list], self.title_first)

    def get_width(self, service):
        """Estimate a column width: upper-case and non-ASCII chars count 2, others 1."""
        narrow = string.digits + string.ascii_lowercase + string.punctuation
        return sum(1 if char in narrow else 2 for char in service)

    def fill_total(self, total_list, index_title: list):
        """Write grey total rows below the table."""
        for row in total_list:
            for k, value in enumerate(row):
                column = index_title[k]
                cell = self.wb.cell(row=self.row_number, column=k + self.margin_left + 1)
                cell.value = value
                cell.style = self.highlight(background_color='A6A6A6', bold=False)
                cell.alignment = Alignment(
                    horizontal=column.get('sub_horizontal', 'center'), vertical='center')
                cell.border = self.my_border('thin', 'thin', 'thin', 'thin')
                self.wb.row_dimensions[self.row_number].height = 20
                if column.get('format'):
                    cell.number_format = column.get('format')
            self.row_number += 1

    def get_other_total(self, table_data, services_list):
        """Return a total row that excludes the catch-all bucket.

        Not called by ``write`` at present.  Rows are expected in the layout
        ``write`` builds: name, department, one value per service, total,
        percentage string.
        """
        other_percent = 0
        others_total_list = ['Total(exclude Other resource cost)', '-',
                             *[0] * len(services_list), 0, 0]
        for item_data in table_data:
            if item_data[0] == other_display:
                # float() instead of eval(): the text is a plain number + '%'.
                other_percent = float(item_data[-1][:-1])
                continue
            for k in range(2, len(item_data) - 1):
                value = item_data[k]
                if value != '-' and value != '':
                    others_total_list[k] += value
        others_total_list[-1] = f"{new_round(100 - other_percent, 2)}%"
        return others_total_list

    def get_total(self, table_data, resource_num, services_list):
        """Return the grand-total row: ``resource_num * 1.106`` in the total column."""
        return ['Total', '-', *['-'] * len(services_list), resource_num * 1.106, '']

    def reset_order(self, data):
        """Return ``data`` with the catch-all bucket row moved to the end."""
        regular = [item for item in data if item[0] != other_display]
        others = [item for item in data if item[0] == other_display]
        if others:
            # Only the last catch-all row is kept, as before.
            regular.append(others[-1])
        return regular

    @staticmethod
    def digit2alphabet(digit):
        """Return the spreadsheet column letters for a 0-based column index.

        ``0 -> 'A'``, ``25 -> 'Z'``, ``26 -> 'AA'``.  The previous positional
        base-26 conversion returned ``'BA'`` for 26.
        """
        number = digit + 1
        letters = ''
        while number > 0:
            number, remainder = divmod(number - 1, 26)
            letters = chr(ord('A') + remainder) + letters
        return letters


class SharedSheet(Sheet):
    """Detail sheet for shared resources (same layout as ``AppSheet``, no env column)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        money = self._currency_format()
        self.title_first = [
            _column('NO.', _GREY, 5, _WHITE, 'center'),
            _column('Name', _GREY, 30, _WHITE, 'left'),
            _column('Product', _NAVY, 30, _WHITE, 'left'),
            _column('Resource Cost', _LIGHT, 20, None, 'right', money),
            _column('Other Resource', _NAVY, 30, _WHITE),
            _column('Resource Cost', _LIGHT, 15, None, None, money),
            _column('Data Transfer', _LIGHT, 15, None, 'right'),
            _column('Total Cost', _LIGHT, 15, None, 'right', money),
            _column(self.tag, _GREY, 20, _WHITE),
            _column('Owner', _GREY, 10, _WHITE),
            _column('Remarks', _GREY, 10, _WHITE),
        ]
        self.title_second = [
            _column('NO.', _GREY, 5, _WHITE),
            _column('Name', _GREY, 30, _WHITE),
            _column('Product', _NAVY, 30, _WHITE),
            _column('Resource Cost', _LIGHT, 20, None, None, money),
            _column('Total Cost', _LIGHT, 30, None, None, money),
            _column('Application', _GREY, 20, _WHITE),
        ]
        self.wb.sheet_view.showGridLines = False  # hide grid lines
        self.wb.freeze_panes = 'C4'  # freeze panes
        self.wb.merge_cells('B2:L2')
        self.row_number = self.margin_top + 1

    def run(self):
        """Render both tables and the totals."""
        self._render_cost_tables()


class IndexSheet(Sheet):
    """First sheet: total cost and percentage per tag value."""

    def __init__(self, *args, **kwargs):
        kwargs['index'] = True
        super().__init__(*args, **kwargs)
        self.title_index = [
            _column('NO.', _GREY, 20, _WHITE, 'center'),
            _column(self.tag, _GREY, 30, _WHITE),
            _column('Total Cost', _GREY, 20, _WHITE, 'center',
                    self._currency_format('#,##0')),
            _column('Percentage', _GREY, 20, _WHITE, 'center'),
        ]
        self.margin_top = 4
        self.wb.sheet_view.showGridLines = False  # hide grid lines
        self.wb.merge_cells('B5:E5')
        self.row_number = self.margin_top + 1
        a2 = self.wb.cell(row=2, column=1)
        a2.value = 'Billing duration:'
        a2.style = self.highlight(background_color='FFFF00', bold=False)
        b2 = self.wb.cell(row=2, column=2)
        b2.style = self.highlight(background_color='FFFF00', bold=False)
        a3 = self.wb.cell(row=3, column=1)
        a3.value = 'tag=%s' % self.tag
        c2 = self.wb.cell(row=2, column=3)
        c2.value = self.raw_wb.time
        self.wb.sheet_properties.tabColor = "FF0000"

    def run(self):
        """Render the index table.

        Every row of ``table_data`` is changed in place: the cost (index 2) is
        multiplied by 1.06 and a percentage string is appended.
        """
        self.fill_index(location="B5", title_index=self.title_index)
        self.format_title(self.title_index)
        for row in self.table_data:
            row[2] = float(row[2]) * 1.06
        total_cost = new_round(sum(row[2] for row in self.table_data), 6)
        for row in self.table_data:
            if total_cost:
                row.append("%.2f%%" % new_round(row[2] * 100 / total_cost, 4))
            else:
                row.append("%.2f%%" % new_round(0, 6))
        self.fill_table(data=self.table_data, index_title=self.title_index, index=True)
        self.set_border(2, len(self.title_index) + self.margin_left,
                        self.row_number, self.row_number)
        self.fill_footer([(2, self.raw_wb.total_str), (1, total_cost)])

    def insert(self):
        """Placeholder; does nothing."""
        pass


class OtherSheet(Sheet):
    """Flat list of line items (account, product, description, cost)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.title_index = [
            _column('LinkedAccountId', _GREY, 20, _WHITE, 'left'),
            _column('Product', _GREY, 40, _WHITE, 'left'),
            _column('ItemDescription', _GREY, 60, _WHITE, 'left'),
            _column('UnBlendedCost', _GREY, 20, _WHITE, 'right', self._currency_format()),
        ]
        self.margin_top = 1
        self.wb.sheet_view.showGridLines = False  # hide grid lines
        self.row_number = self.margin_top + 1
        self.wb.freeze_panes = 'B3'  # freeze panes

    def run(self, ):
        """Render the header, the rows and the total footer."""
        self.format_title(self.title_index)
        self.fill_table(data=self.table_data, index_title=self.title_index)
        self.set_border(2, len(self.title_index) + self.margin_left,
                        self.row_number, self.row_number)
        self.fill_footer([(3, self.raw_wb.total_str),
                          (1, sum([item[3] for item in self.table_data]))])


class Excel:
    """Read the billing CSV files and generate the Excel report."""

    ROUND_NUM = 6

    def __init__(self, **kwargs):
        """
        Required keys: ``regions``, ``project``, ``project_id``, ``title``,
        ``tag``, ``linked_accounts``, ``currency``, ``create_time``,
        ``file_path``.  Optional keys: ``shared`` (product names treated as
        shared), ``language`` (``'zh'`` for Chinese labels), ``filename``
        (output file, defaults to the module-level ``filename``) and
        ``tag_file`` (workbook mapping resource ids to tags).
        """
        self.raw_kwargs = kwargs
        self.regions = kwargs['regions']  # type: list
        self.project = kwargs['project']
        self.project_id = kwargs['project_id']
        self.title = kwargs['title']
        self.tag = kwargs['tag']
        self.linked_accounts = kwargs['linked_accounts']
        self.shared = set(kwargs.get('shared', []))
        self.start_date = []
        self.currency = kwargs['currency']
        self.create_time = kwargs['create_time']
        self.file_paths = kwargs['file_path']
        self.tag_file = kwargs.get('tag_file', '旺旺EBStag信息.xlsx')

    def get_file(self):
        """Load the tag workbook and every CSV; return all line items as dicts.

        Also sets ``self.start_date`` from the ``YYYY-MM`` part at the end of
        the first file name.
        """
        if not self.file_paths:
            raise ValueError('file_path must contain at least one CSV file')
        month = self.file_paths[0][-11:-4]
        self.start_date = [month, month]
        v_tag = dict()
        wb = load_workbook(self.tag_file, read_only=False, data_only=True)
        try:
            # Rows come back as A, B, C, ...; the first row is the header.
            for row in list(wb['Sheet1'].rows)[1:]:
                tags = list(row)
                v_tag[tags[0].value] = {
                    'user:Name': tags[1].value,
                    'user:Application': tags[2].value,
                    # column 3 ('user:appenv') is intentionally not read
                    'user:Vendor': tags[4].value,
                }
        finally:
            wb.close()
        res_data = []
        for file in self.file_paths:
            res_data += self.handle(file, v_tag)
        return res_data

    def handle(self, file_path, res):
        """Parse one billing CSV into a list of line-item dicts.

        :param file_path: CSV path; a path containing ``'Ningxia'`` is
            attributed to ``cn-northwest-1``, anything else to ``cn-north-1``
        :param res: mapping ``ResourceId -> tag dict`` merged into each item
        :return: list of dicts; ``UnBlendedCost`` is a ``Decimal``, every
            other value a ``str``; empty values are dropped
        """
        region = 'cn-northwest-1' if 'Ningxia' in file_path else 'cn-north-1'
        _type = {
            'UnBlendedCost': object,
            'ProductName': object,
            'UsageStartDate': object,
            'ResourceId': object,
            'RateId': object,
            'InvoiceID': object,
        }
        # Probe the header with a separate one-row read.  Pulling the probe
        # row out of the chunk iterator itself dropped the first data row.
        probe = pd.read_csv(file_path, sep=',', nrows=1, dtype=_type)
        # A single-column header means the first line is a banner to skip.
        skiprows = [0] if len(probe.columns) == 1 else None
        reader = pd.read_csv(file_path, iterator=True, sep=',', skiprows=skiprows,
                             dtype=_type, chunksize=10000, low_memory=False)
        result = []
        for iter_num, data in enumerate(reader, start=1):
            print(iter_num)
            start_days = pd.to_datetime(data['UsageStartDate']).dt.strftime('%Y-%m-%d')
            data.loc[:, 'UsageStartDate'] = start_days
            data.insert(1, 'usage_start_date', start_days)
            data.insert(1, 'usage_end_date',
                        pd.to_datetime(data['UsageEndDate']).dt.strftime('%Y-%m-%d'))
            data = data[(data['RecordType'] == 'LineItem') & (data['ProductName'].notna())]
            data = data[data['LinkedAccountId'].isin(self.linked_accounts)]
            data = data.reset_index(drop=True)
            index = list(data.columns)
            for i in range(data.shape[0]):
                pack = dict()
                for k, v in enumerate(data.loc[i]):
                    if str(v).strip() == '*':
                        print(i)
                        v = 'Others'
                    pack[index[k]] = v
                pack["region"] = region
                _temp = self.pop_empty(pack)
                for k, v in _temp.items():
                    _temp[k] = Decimal(v) if k == 'UnBlendedCost' else str(v)
                # Normalise to the first day of the month.
                _temp['usage_start_date'] = _temp['usage_start_date'][:7] + "-01"
                _temp.update(res.get(_temp.get('ResourceId', ''), {}))
                result.append(_temp)
        return result

    def run(self):
        """Generate the whole report and save it to the output file."""
        currency = '$' if self.currency == 'USD' else '¥'
        response = self.get_file()
        start_date = self.get_range_date(self.start_date)
        response = [item for item in response if item.get('region', '') in self.regions]
        service_data = self.get_service_data(start_date, response)

        # Split shared items from the rest, then group the rest by tag value.
        temp_dict = defaultdict(list)
        shared_data = []
        other = []
        for item in response:
            tag_value = item.get(self.tag)
            if tag_value == 'shared' or item.get('ProductName', '') in self.shared:
                shared_data.append(item)
            elif tag_value and tag_value != fill_null_value:
                temp_dict[tag_value.strip()].append(item)
            else:
                other.append(item)
        temp_dict['sharing resources'] = shared_data
        temp_dict[other_display] = other

        sheet = Book(
            title=self.title,
            _time=self.start_date,
            project=self.project,
            currency=currency,
            language=self.raw_kwargs.get("language") == 'zh',
        )
        table_data = [
            [0, k, new_round(
                sum([item.get('UnBlendedCost', Decimal("0.0")) for item in items]),
                self.ROUND_NUM)]
            for k, items in temp_dict.items()
        ]
        # Most expensive first, then number the rows.
        table_data = sorted(table_data, key=lambda _item: _item[2], reverse=True)
        for i, row in enumerate(table_data):
            row[0] = i + 1
        # Index sheet.
        IndexSheet(sheet, self.tag, table_data=table_data).run()
        # Service distribution sheet.
        ServiceSheet(sheet, self.tag, table_data=service_data).run()

        order_dict = OrderedDict()
        for tag_value in [row[1] for row in table_data]:
            application = application_dict.get(tag_value, [tag_value, '-'])[0]
            order_dict[application] = temp_dict[tag_value]
        # One sheet per tag value.
        for key, items in order_dict.items():
            # NOTE(review): the catch-all bucket is keyed ``other_display``,
            # so with the current constants it takes the AppSheet branch and
            # this branch only fires for a display name equal to
            # ``fill_null_value`` - confirm which one was intended.
            if key == fill_null_value:
                self._write_other_sheet(sheet, items)
            else:
                self._write_app_sheet(sheet, key, items)
        output_name = self.raw_kwargs.get('filename', filename)
        sheet.save(os.path.join('./', output_name))

    def _write_other_sheet(self, sheet, items):
        """Write the flat line-item sheet, merging rows with equal account/product/text."""
        table_data = [
            [
                item.get('LinkedAccountId'),
                item.get('ProductName'),
                item.get('ItemDescription'),
                new_round(float(item.get('UnBlendedCost', '0.0')), self.ROUND_NUM),
            ]
            for item in items
        ]
        table_data = sorted(table_data,
                            key=lambda xx: "%s%s%s" % (xx[0], xx[1], xx[2]), reverse=True)
        merge_table_data = []
        for row in table_data:
            if merge_table_data and merge_table_data[-1][:3] == row[:3]:
                merge_table_data[-1][3] += row[3]
            else:
                merge_table_data.append(row)
        if len(table_data) != 1:
            merge_table_data = sorted(merge_table_data, key=lambda xx: xx[3], reverse=True)
        OtherSheet(sheet, other_display, table_data=merge_table_data).run()

    def _write_app_sheet(self, sheet, key, items):
        """Write the detail sheet for one tag value (or for the shared resources)."""
        is_key = key == 'sharing resources'
        app = AppSheet(sheet, key)
        blow_data = []  # items without a usable resource name -> second table
        venv_dict = {}
        front_table = defaultdict(dict)  # resource name -> {product: cost}
        for item in items:
            name = item.get('user:Name')
            if is_key:
                named = bool(name) and item.get(self.tag) == 'shared' and \
                    item.get('ProductName') not in self.shared
            else:
                named = bool(name) and name != fill_null_value
            if not named:
                blow_data.append(item)
                continue
            product = item.get('ProductName', '')
            costs = front_table[name]
            # Read and write the same key so the cost really accumulates.
            costs[product] = costs.get(product, Decimal("0")) + \
                item.get('UnBlendedCost', Decimal("0"))
            venv_dict[name] = item.get('user:appenv', '')

        # First table: one row per named resource.
        table_first_data = []
        for name, values in front_table.items():
            venv_name = venv_name_dict.get(venv_dict.get(name, '').strip(), '')
            temp = [Decimal("0"), name, venv_name, '', Decimal("0"), '', Decimal("0"),
                    Decimal("0"), Decimal("0"), self.tag, '', '', ]
            main_product_seen = False
            for product, cost in values.items():
                if 'transfer' in product.lower():
                    temp[7] = cost
                elif not main_product_seen:
                    # The first non-transfer product is the main product.
                    temp[3] = product
                    temp[4] += cost
                    main_product_seen = True
                else:
                    temp[5] = product
                    temp[6] += cost
            temp[8] = temp[4] + temp[6] + temp[7]
            for column in (4, 6, 7, 8):
                temp[column] = new_round(temp[column], self.ROUND_NUM)
            table_first_data.append(temp)

        # Second table: remaining items summed per product.
        per_product = defaultdict(lambda: defaultdict(list))
        for item in blow_data:
            bucket = per_product[item.get('ProductName', '')]
            # Non-cost values are stored as str, so convert before summing.
            bucket['CostBeforeTax'].append(Decimal(str(item.get('CostBeforeTax', '0'))))
            bucket['UnBlendedCost'].append(item.get('UnBlendedCost', Decimal("0")))
        table_second_data = [
            [
                0,
                fill_null_value,
                product,
                new_round(sum(bucket['CostBeforeTax']), self.ROUND_NUM),
                new_round(sum(bucket['UnBlendedCost']), self.ROUND_NUM),
                'shared' if is_key else self.tag,
            ]
            for product, bucket in per_product.items()
        ]

        table_first_data = sorted(table_first_data, key=lambda _item: _item[1] + _item[2])
        for i, row in enumerate(table_first_data):
            row[0] = i + 1
            # NOTE(review): only columns 4 and 8 get the 1.06 factor, columns
            # 6 and 7 do not - kept as in the original, confirm.
            row[4] = float(row[4]) * 1.06
            row[8] = float(row[8]) * 1.06
        app.table_first_data = table_first_data
        table_second_data = sorted(table_second_data, key=lambda _item: _item[2])
        for i, row in enumerate(table_second_data):
            row[0] = i + 1
            row[4] = float(row[4]) * 1.06
        app.table_second_data = table_second_data
        app.run()

    @staticmethod
    def pop_empty(data: dict):
        """Return a copy of ``data`` without NaN/None/empty values.

        ``numpy.float64`` values are converted to the string of their integer
        part.
        """
        raw_data = {}
        for k, v in data.items():
            if v is np.nan or str(v) == 'nan' or str(v) == 'None':
                continue
            if isinstance(v, np.float64):
                raw_data[k] = str(int(v))
                continue
            if v != "":
                raw_data[k] = v
        return raw_data

    def get_service_data(self, date_list, response):
        """Return ``{month: {tag value: {product: cost}}}`` for the given months.

        Items whose tag is missing or equals ``fill_null_value``, and tagged
        items whose product is not in ``display_set``, are collected under
        ``other_display``.
        """
        date_dict = OrderedDict()
        for inner_date in date_list:
            temp_dict = defaultdict(list)
            other = []
            for item in response:
                if item['usage_start_date'] != inner_date:
                    continue
                tag_value = item.get(self.tag)
                # NOTE(review): nesting reconstructed from a file whose
                # indentation was lost; tagged items of non-displayed products
                # are assumed to fall through to the catch-all - confirm.
                if tag_value and tag_value != fill_null_value \
                        and item['ProductName'] in display_set:
                    temp_dict[tag_value.strip()].append(item)
                else:
                    other.append(item)
            temp_dict[other_display] = other
            res = defaultdict(lambda: defaultdict(lambda: Decimal('0')))
            for app, data_list in temp_dict.items():
                for item in data_list:
                    res[app][item.get('ProductName', 'others')] += \
                        item.get('UnBlendedCost', Decimal('0'))
            date_dict[inner_date] = res
        return date_dict

    @staticmethod
    def get_range_date(date_list: list):
        """Expand ``['YYYY-MM', 'YYYY-MM']`` into every month as ``'YYYY-MM-01'``.

        Returns ``date_list`` unchanged when it does not hold two entries and
        an empty list when the start is after the end.
        """
        if len(date_list) != 2:
            return date_list
        # date() validates year and month.
        start_date = date(year=int(date_list[0][:4]), month=int(date_list[0][5:7]), day=1)
        end_date = date(year=int(date_list[1][:4]), month=int(date_list[1][5:7]), day=1)
        first = start_date.year * 12 + start_date.month - 1
        last = end_date.year * 12 + end_date.month - 1
        return [
            "%02d-%02d-01" % (months // 12, months % 12 + 1)
            for months in range(first, last + 1)
        ]


if __name__ == '__main__':
    file_path = [
        '190508708273-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-2022-04.csv',
        '190508708273-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-Ningxia-2022-04.csv',
        # '190841870523-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-2022-04.csv',
        # '190841870523-aws-billing-detailed-line-items-with-resources-and-tags-ACTS-Ningxia-2022-04.csv',
    ]
    Excel(**{
        "project": '旺旺',
        "project_id": '3419daba-0ff1-4463-8069-34b8df839c78',
        "title": '------',
        "tag": 'user:Application',
        "regions": ['cn-north-1', 'cn-northwest-1'],
        "linked_accounts": {
            # 295524321018,
            # 538779945615,
            # 559133870160,
            # 559312183200,
            # 123192574065,
            # 284149824167,
            # 557490265248,
            694341593692,  # 0.0001272639, reported separately
            # 123502220024  # the 0523 account, reported separately
        },
        # "start_date": ['2021-03', '2021-03'],
        'create_time': str(datetime.now(tz=timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S")),
        'language': 'zh',
        'currency': '¥',
        'file_path': file_path,
        'filename': filename,
    }).run()