PowerBI REST API

PowerBI
API
PowerBI REST API
作者

不止BI

发布于

2024年3月28日

重新刷新过去一天失败数据

import datetime

# 获取当前时间
current_time = datetime.datetime.now().time()

# 设置允许执行的时间范围
start_time = datetime.time(7, 0)  # 早上7点
end_time = datetime.time(22, 0)  # 晚上10点

# 如果当前时间在允许的时间范围内,则执行脚本
if start_time <= current_time <= end_time:
    print(datetime.datetime.now())
    print('检查PowerBI刷新情况')
    import requests
    import json
    import pandas as pd
    # 获取PowerBI API访问令牌
    client_id = "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
    client_secret = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
    tenant_id = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
    resource_url = "https://analysis.chinacloudapi.cn/powerbi/api"
    auth_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
    response = requests.post(auth_url, data={
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
        "resource": resource_url
    })
    access_token = response.json()["access_token"]
    
    # 获取所有数据集合
    groups = {"PowerBI_代驾":'e2849431-0ce5-4bf4-93bb-3c4b3ca18d3f',
    "PowerBI_企业客户事业部":'7d73438b-2eae-407c-8c6d-1adb80842c69',
    "PowerBI_内控":'58bb1549-58ac-4103-ae49-f273e7c084bb',
    "PowerBI_技术部":'cf785038-e1ae-4bc8-acd2-259423844a86',
    "PowerBI_自驾":'cd7cb0f7-2370-485a-9c13-ff24a45b09f5',
    "PowerBI_市场":'51e5a521-2d2d-4c24-8efe-ebbc2d861e14',
    "PowerBI_YDF":'c898fdec-b5e6-4857-9caa-b911286b390a'
    # "PowerBI_高级测试":"a3cc44f2-ca78-4303-98e3-27442f1aae74"
    }
    df_dataset = pd.DataFrame()
    for group_name,group_id in groups.items():
    #   print(group_name)
      api_url = f"https://api.powerbi.cn/v1.0/myorg/groups/{group_id}/datasets"
      headers = {
          "Authorization": "Bearer " + access_token,
          "Content-Type": "application/json"
      }
      response = requests.get(api_url, headers=headers)
      df_response_text = pd.DataFrame(json.loads(response.text)['value'])
      df_response_text['group_name'] = group_name
      df_response_text['group_id'] = group_id
      df_dataset = pd.concat([df_dataset,df_response_text], ignore_index=True)
      
     
    def dataset_lastrefresh(group_id,dataset_id):
      """获取数据集合最后一次刷新情况"""
      try:
        api_url = f"https://api.powerbi.cn/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}/refreshes?$top=1"
        headers = {
          "Authorization": "Bearer " + access_token,
          "Content-Type": "application/json"
        }
        response = requests.get(api_url, headers=headers)
        return json.loads(response.text)['value'][0]['status'],json.loads(response.text)['value'][0]['startTime'],json.loads(response.text)['value'][0]['endTime']
      except:
        return 'response_error','',''
    
    # 获取所有数据集最后一次刷新情况
    result_list = []
    for index, row in df_dataset.iterrows():
        result = dataset_lastrefresh(row['group_id'], row['id'])
        result_list.append(result)
    
    # 将结果存储到新的列中
    df_dataset[['result','starTime','endTime']] = pd.DataFrame(result_list)
    
    
    # 转北京时间
    from datetime import datetime
    import pytz
    
    def convert_timezone(utc_time_str, target_timezone = 'Asia/Shanghai'):
        try:
            # 解析字符串为datetime对象
            utc_time = datetime.strptime(utc_time_str, '%Y-%m-%dT%H:%M:%S.%fZ')
    
            # 设置时区为UTC
            utc_timezone = pytz.timezone('UTC')
    
            # 转换为目标时区
            target_timezone = pytz.timezone(target_timezone)
            target_time = utc_timezone.localize(utc_time).astimezone(target_timezone)
    
            return target_time
        except:
            ''
    df_dataset['starTime_Beijin'] = df_dataset['starTime'].apply(convert_timezone)
    df_dataset['endTime_Beijin'] = df_dataset['endTime'].apply(convert_timezone)
    from datetime import datetime, timedelta
    current_date = datetime.now(pytz.timezone('Asia/Shanghai'))
    previous_date = current_date - timedelta(days=1)
    # previous_date = current_date - timedelta(hours=4)
    df_dataset = df_dataset[df_dataset['endTime_Beijin'] > previous_date]
    df_dataset = df_dataset[df_dataset['result']== 'Failed']
    
    
    def refresh(group_id,dataset_id,group_name,name):
        """刷新数据集"""
        try:
            api_url = f"https://api.powerbi.cn/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}/refreshes"
            headers = {
            "Authorization": "Bearer " + access_token,
            "Content-Type": "application/json"
            }
            response = requests.post(api_url, headers=headers)
            if response.ok:
                print(f"{group_name}-{name}执行刷新")
            else:
                print(f"{group_name}-{name}执行刷新失败,失败原因:", response.content)
        except:
            ''
    # 刷新数据集合
    for index, row in df_dataset.iterrows():
        refresh(row['group_id'], row['id'],row['group_name'],row['name'])

暂停内核

# https://learn.microsoft.com/zh-cn/rest/api/power-bi-embedded/capacities/resume?view=rest-power-bi-embedded-2021-01-01&tabs=HTTP
import datetime
# 获取当前时间
current_time = datetime.datetime.now()

import requests
import json
import pandas as pd
def suspend_sku():
    print(current_time)
    client_id = "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
    client_secret = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
    tenant_id = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
    resource_url = "https://management.chinacloudapi.cn/"
    auth_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
    response = requests.post(auth_url, data={
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
        "resource": resource_url
    })
    access_token = response.json()["access_token"]
    subscriptionId = '9a02f6c8-5945-42b0-8b45-bfa35c216afd'
    resourceGroupName = 'PowebiBI'
    dedicatedCapacityName = 'powerbiembeddedgen2'
    api_url = f'https://management.chinacloudapi.cn/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.PowerBIDedicated/capacities/{dedicatedCapacityName}/suspend?api-version=2021-01-01'
    headers = {
        "Authorization": "Bearer " + access_token,
        "Content-Type": "application/json"
    }
    response = requests.post(api_url, headers=headers)
    return response

suspendresponse= suspend_sku()

if  suspendresponse.ok :
    print('暂停内核')

恢复内核

# https://learn.microsoft.com/zh-cn/rest/api/power-bi-embedded/capacities/resume?view=rest-power-bi-embedded-2021-01-01&tabs=HTTP
import datetime
# 获取当前时间
current_time = datetime.datetime.now()

import requests
import json
import pandas as pd
def resume_sku():
    print(current_time)
    client_id = "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
    client_secret = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
    tenant_id = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
    resource_url = "https://management.chinacloudapi.cn/"
    auth_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
    response = requests.post(auth_url, data={
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
        "resource": resource_url
    })
    access_token = response.json()["access_token"]
    subscriptionId = '9a02f6c8-5945-42b0-8b45-bfa35c216afd'
    resourceGroupName = 'PowebiBI'
    dedicatedCapacityName = 'powerbiembeddedgen2'
    api_url = f'https://management.chinacloudapi.cn/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.PowerBIDedicated/capacities/{dedicatedCapacityName}/resume?api-version=2021-01-01'
    headers = {
        "Authorization": "Bearer " + access_token,
        "Content-Type": "application/json"
    }
    response = requests.post(api_url, headers=headers)
    return response

resumeresponse = resume_sku()
if resumeresponse.ok :
    print('恢复内核')

自动缩放

# https://learn.microsoft.com/zh-cn/rest/api/monitor/metrics/list?view=rest-monitor-2018-01-01&tabs=HTTP
import datetime

# 获取当前时间
current_time = datetime.datetime.now().time()

# 设置允许执行的时间范围
start_time = datetime.time(7, 0)  # 早上7点
end_time = datetime.time(22, 0)  # 晚上10点

# 如果当前时间在允许的时间范围内,则执行脚本
if start_time <= current_time < end_time:
    import requests
    import json
    import pandas as pd
    client_id = "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
    client_secret = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
    tenant_id = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
    def get_cpu():     
        ## 注意这里的资源url
        resource_url = "https://management.chinacloudapi.cn/"
        auth_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
        response = requests.post(auth_url, data={
            "client_id": client_id,
            "client_secret": client_secret,
            "grant_type": "client_credentials",
            "resource": resource_url
        })
        access_token = response.json()["access_token"]
        # 获取当前cpu
        scope = '/subscriptions/9a02f6c8-5945-42b0-8b45-bfa35c216afd/resourceGroups/PowebiBI/providers/Microsoft.PowerBIDedicated/capacities/powerbiembeddedgen2'
        metricnames = 'cpu_metric'
        api_url = f'https://management.chinacloudapi.cn/{scope}/providers/Microsoft.Insights/metrics?api-version=2018-01-01&metricnames={metricnames}'
        headers = {
            "Authorization": "Bearer " + access_token,
            "Content-Type": "application/json"
        }
        response = requests.get(api_url, headers=headers)
        df_monitor = pd.json_normalize(json.loads(response.text)['value'][0]['timeseries'][0]['data'])
        df_monitor = df_monitor.sort_values(ascending=False,by=['timeStamp'])
        current_cpu = df_monitor.iloc[0,1]
        return current_cpu
    def get_sku():
        # 获取当前资源容量
        resource_url = "https://analysis.chinacloudapi.cn/powerbi/api"
        auth_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
        response = requests.post(auth_url, data={
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
        "resource": resource_url
        })
        access_token = response.json()["access_token"]

        # 配置Power BI REST API的端点和访问令牌
        capacity_endpoint = 'https://api.powerbi.cn/v1.0/myorg/capacities'


        # 获取当前容量的信息
        response = requests.get(capacity_endpoint, headers={'Authorization': 'Bearer ' + access_token})
        sku_now = json.loads(response.text)['value'][0]['sku']
        return sku_now
    def update_sku(sku_update): 
        # 修改容量
            client_id = "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
            client_secret = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
            tenant_id = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
            ## 注意这里的资源url
            resource_url = "https://management.chinacloudapi.cn/"
            auth_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
            response = requests.post(auth_url, data={
                "client_id": client_id,
                "client_secret": client_secret,
                "grant_type": "client_credentials",
                "resource": resource_url
            })
            access_token = response.json()["access_token"]


            capacity_id = '53E01D15-4FF7-4C1E-85D2-ABED52C8B36E'
            resourceGroupName = 'PowebiBI'
            dedicatedCapacityName = 'powerbiembeddedgen2'
            subscriptionId = '9a02f6c8-5945-42b0-8b45-bfa35c216afd'

            # 更新容量
            capacity_url = f'https://management.chinacloudapi.cn/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.PowerBIDedicated/capacities/{dedicatedCapacityName}?api-version=2021-01-01'
            capacity_data = {
                "sku": {
                "name": sku_update,
                "tier": "PBIE_Azure"
                },
                "tags": {
                "testKey": "testValue"
                }
            }
            capacity_headers = {
                'Authorization': 'Bearer {0}'.format(access_token),
                'Content-Type': 'application/json'
            }
            capacity_response = requests.patch(capacity_url, data=json.dumps(capacity_data), headers=capacity_headers)
            return capacity_response
    def monitor_update_sku(current_cpu,sku_now,downgrade_cpu,upgrade_cpu):
        current_time = datetime.datetime.now()
        if current_cpu <= downgrade_cpu:
            if sku_now == 'A1':
                print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},无需调整')
                pass
            else:
                sku_update = sku_now[0] + str(int(sku_now[1]) - 1)
                print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},调整sku为{sku_update}')
                update_sku(sku_update)
        elif  current_cpu >= upgrade_cpu:
            if sku_now == 'A3':
                print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},已达上限,不做调整')
                pass
            else:
                sku_update = sku_now[0] + str(int(sku_now[1]) + 1)
                print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},调整sku为{sku_update}')
                update_sku(sku_update)
        else:
            print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},无需调整')
    current_cpu = get_cpu()
    sku_now = get_sku()
    monitor_update_sku(current_cpu,sku_now,50,90)
回到顶部