import datetime
# 获取当前时间
= datetime.datetime.now().time()
current_time
# 设置允许执行的时间范围
= datetime.time(7, 0) # 早上7点
start_time = datetime.time(22, 0) # 晚上10点
end_time
# 如果当前时间在允许的时间范围内,则执行脚本
if start_time <= current_time <= end_time:
print(datetime.datetime.now())
print('检查PowerBI刷新情况')
import requests
import json
import pandas as pd
# 获取PowerBI API访问令牌
= "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
client_id = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
client_secret = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
tenant_id = "https://analysis.chinacloudapi.cn/powerbi/api"
resource_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
auth_url = requests.post(auth_url, data={
response "client_id": client_id,
"client_secret": client_secret,
"grant_type": "client_credentials",
"resource": resource_url
})= response.json()["access_token"]
access_token
# 获取所有数据集合
= {"PowerBI_代驾":'e2849431-0ce5-4bf4-93bb-3c4b3ca18d3f',
groups "PowerBI_企业客户事业部":'7d73438b-2eae-407c-8c6d-1adb80842c69',
"PowerBI_内控":'58bb1549-58ac-4103-ae49-f273e7c084bb',
"PowerBI_技术部":'cf785038-e1ae-4bc8-acd2-259423844a86',
"PowerBI_自驾":'cd7cb0f7-2370-485a-9c13-ff24a45b09f5',
"PowerBI_市场":'51e5a521-2d2d-4c24-8efe-ebbc2d861e14',
"PowerBI_YDF":'c898fdec-b5e6-4857-9caa-b911286b390a'
# "PowerBI_高级测试":"a3cc44f2-ca78-4303-98e3-27442f1aae74"
}= pd.DataFrame()
df_dataset for group_name,group_id in groups.items():
# print(group_name)
= f"https://api.powerbi.cn/v1.0/myorg/groups/{group_id}/datasets"
api_url = {
headers "Authorization": "Bearer " + access_token,
"Content-Type": "application/json"
}= requests.get(api_url, headers=headers)
response = pd.DataFrame(json.loads(response.text)['value'])
df_response_text 'group_name'] = group_name
df_response_text['group_id'] = group_id
df_response_text[= pd.concat([df_dataset,df_response_text], ignore_index=True)
df_dataset
def dataset_lastrefresh(group_id,dataset_id):
"""获取数据集合最后一次刷新情况"""
try:
= f"https://api.powerbi.cn/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}/refreshes?$top=1"
api_url = {
headers "Authorization": "Bearer " + access_token,
"Content-Type": "application/json"
}= requests.get(api_url, headers=headers)
response return json.loads(response.text)['value'][0]['status'],json.loads(response.text)['value'][0]['startTime'],json.loads(response.text)['value'][0]['endTime']
except:
return 'response_error','',''
# 获取所有数据集最后一次刷新情况
= []
result_list for index, row in df_dataset.iterrows():
= dataset_lastrefresh(row['group_id'], row['id'])
result
result_list.append(result)
# 将结果存储到新的列中
'result','starTime','endTime']] = pd.DataFrame(result_list)
df_dataset[[
# 转北京时间
from datetime import datetime
import pytz
def convert_timezone(utc_time_str, target_timezone = 'Asia/Shanghai'):
try:
# 解析字符串为datetime对象
= datetime.strptime(utc_time_str, '%Y-%m-%dT%H:%M:%S.%fZ')
utc_time
# 设置时区为UTC
= pytz.timezone('UTC')
utc_timezone
# 转换为目标时区
= pytz.timezone(target_timezone)
target_timezone = utc_timezone.localize(utc_time).astimezone(target_timezone)
target_time
return target_time
except:
''
'starTime_Beijin'] = df_dataset['starTime'].apply(convert_timezone)
df_dataset['endTime_Beijin'] = df_dataset['endTime'].apply(convert_timezone)
df_dataset[from datetime import datetime, timedelta
= datetime.now(pytz.timezone('Asia/Shanghai'))
current_date = current_date - timedelta(days=1)
previous_date # previous_date = current_date - timedelta(hours=4)
= df_dataset[df_dataset['endTime_Beijin'] > previous_date]
df_dataset = df_dataset[df_dataset['result']== 'Failed']
df_dataset
def refresh(group_id,dataset_id,group_name,name):
"""刷新数据集"""
try:
= f"https://api.powerbi.cn/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}/refreshes"
api_url = {
headers "Authorization": "Bearer " + access_token,
"Content-Type": "application/json"
}= requests.post(api_url, headers=headers)
response if response.ok:
print(f"{group_name}-{name}执行刷新")
else:
print(f"{group_name}-{name}执行刷新失败,失败原因:", response.content)
except:
''
# 刷新数据集合
for index, row in df_dataset.iterrows():
'group_id'], row['id'],row['group_name'],row['name']) refresh(row[
PowerBI REST API
PowerBI
API
PowerBI REST API
重新刷新过去一天失败数据
暂停内核
# https://learn.microsoft.com/zh-cn/rest/api/power-bi-embedded/capacities/resume?view=rest-power-bi-embedded-2021-01-01&tabs=HTTP
import datetime
# 获取当前时间
= datetime.datetime.now()
current_time
import requests
import json
import pandas as pd
def suspend_sku():
print(current_time)
= "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
client_id = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
client_secret = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
tenant_id = "https://management.chinacloudapi.cn/"
resource_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
auth_url = requests.post(auth_url, data={
response "client_id": client_id,
"client_secret": client_secret,
"grant_type": "client_credentials",
"resource": resource_url
})= response.json()["access_token"]
access_token = '9a02f6c8-5945-42b0-8b45-bfa35c216afd'
subscriptionId = 'PowebiBI'
resourceGroupName = 'powerbiembeddedgen2'
dedicatedCapacityName = f'https://management.chinacloudapi.cn/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.PowerBIDedicated/capacities/{dedicatedCapacityName}/suspend?api-version=2021-01-01'
api_url = {
headers "Authorization": "Bearer " + access_token,
"Content-Type": "application/json"
}= requests.post(api_url, headers=headers)
response return response
= suspend_sku()
suspendresponse
if suspendresponse.ok :
print('暂停内核')
恢复内核
# https://learn.microsoft.com/zh-cn/rest/api/power-bi-embedded/capacities/resume?view=rest-power-bi-embedded-2021-01-01&tabs=HTTP
import datetime
# 获取当前时间
= datetime.datetime.now()
current_time
import requests
import json
import pandas as pd
def resume_sku():
print(current_time)
= "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
client_id = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
client_secret = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
tenant_id = "https://management.chinacloudapi.cn/"
resource_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
auth_url = requests.post(auth_url, data={
response "client_id": client_id,
"client_secret": client_secret,
"grant_type": "client_credentials",
"resource": resource_url
})= response.json()["access_token"]
access_token = '9a02f6c8-5945-42b0-8b45-bfa35c216afd'
subscriptionId = 'PowebiBI'
resourceGroupName = 'powerbiembeddedgen2'
dedicatedCapacityName = f'https://management.chinacloudapi.cn/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.PowerBIDedicated/capacities/{dedicatedCapacityName}/resume?api-version=2021-01-01'
api_url = {
headers "Authorization": "Bearer " + access_token,
"Content-Type": "application/json"
}= requests.post(api_url, headers=headers)
response return response
= resume_sku()
resumeresponse if resumeresponse.ok :
print('恢复内核')
自动缩放
# https://learn.microsoft.com/zh-cn/rest/api/monitor/metrics/list?view=rest-monitor-2018-01-01&tabs=HTTP
import datetime
# 获取当前时间
= datetime.datetime.now().time()
current_time
# 设置允许执行的时间范围
= datetime.time(7, 0) # 早上7点
start_time = datetime.time(22, 0) # 晚上10点
end_time
# 如果当前时间在允许的时间范围内,则执行脚本
if start_time <= current_time < end_time:
import requests
import json
import pandas as pd
= "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
client_id = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
client_secret = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
tenant_id def get_cpu():
## 注意这里的资源url
= "https://management.chinacloudapi.cn/"
resource_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
auth_url = requests.post(auth_url, data={
response "client_id": client_id,
"client_secret": client_secret,
"grant_type": "client_credentials",
"resource": resource_url
})= response.json()["access_token"]
access_token # 获取当前cpu
= '/subscriptions/9a02f6c8-5945-42b0-8b45-bfa35c216afd/resourceGroups/PowebiBI/providers/Microsoft.PowerBIDedicated/capacities/powerbiembeddedgen2'
scope = 'cpu_metric'
metricnames = f'https://management.chinacloudapi.cn/{scope}/providers/Microsoft.Insights/metrics?api-version=2018-01-01&metricnames={metricnames}'
api_url = {
headers "Authorization": "Bearer " + access_token,
"Content-Type": "application/json"
}= requests.get(api_url, headers=headers)
response = pd.json_normalize(json.loads(response.text)['value'][0]['timeseries'][0]['data'])
df_monitor = df_monitor.sort_values(ascending=False,by=['timeStamp'])
df_monitor = df_monitor.iloc[0,1]
current_cpu return current_cpu
def get_sku():
# 获取当前资源容量
= "https://analysis.chinacloudapi.cn/powerbi/api"
resource_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
auth_url = requests.post(auth_url, data={
response "client_id": client_id,
"client_secret": client_secret,
"grant_type": "client_credentials",
"resource": resource_url
})= response.json()["access_token"]
access_token
# 配置Power BI REST API的端点和访问令牌
= 'https://api.powerbi.cn/v1.0/myorg/capacities'
capacity_endpoint
# 获取当前容量的信息
= requests.get(capacity_endpoint, headers={'Authorization': 'Bearer ' + access_token})
response = json.loads(response.text)['value'][0]['sku']
sku_now return sku_now
def update_sku(sku_update):
# 修改容量
= "e9e6e972-eb00-4eb3-bb1d-92da464b2a4d"
client_id = "6Nv_OjFQKEtW1.42mGL.RYCw6fU~ICnJc."
client_secret = "b93daeb8-cf25-4ad4-997b-2cedebef6925"
tenant_id ## 注意这里的资源url
= "https://management.chinacloudapi.cn/"
resource_url = "https://login.chinacloudapi.cn/" + tenant_id + "/oauth2/token"
auth_url = requests.post(auth_url, data={
response "client_id": client_id,
"client_secret": client_secret,
"grant_type": "client_credentials",
"resource": resource_url
})= response.json()["access_token"]
access_token
= '53E01D15-4FF7-4C1E-85D2-ABED52C8B36E'
capacity_id = 'PowebiBI'
resourceGroupName = 'powerbiembeddedgen2'
dedicatedCapacityName = '9a02f6c8-5945-42b0-8b45-bfa35c216afd'
subscriptionId
# 更新容量
= f'https://management.chinacloudapi.cn/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.PowerBIDedicated/capacities/{dedicatedCapacityName}?api-version=2021-01-01'
capacity_url = {
capacity_data "sku": {
"name": sku_update,
"tier": "PBIE_Azure"
},"tags": {
"testKey": "testValue"
}
}= {
capacity_headers 'Authorization': 'Bearer {0}'.format(access_token),
'Content-Type': 'application/json'
}= requests.patch(capacity_url, data=json.dumps(capacity_data), headers=capacity_headers)
capacity_response return capacity_response
def monitor_update_sku(current_cpu,sku_now,downgrade_cpu,upgrade_cpu):
= datetime.datetime.now()
current_time if current_cpu <= downgrade_cpu:
if sku_now == 'A1':
print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},无需调整')
pass
else:
= sku_now[0] + str(int(sku_now[1]) - 1)
sku_update print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},调整sku为{sku_update}')
update_sku(sku_update)elif current_cpu >= upgrade_cpu:
if sku_now == 'A3':
print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},已达上限,不做调整')
pass
else:
= sku_now[0] + str(int(sku_now[1]) + 1)
sku_update print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},调整sku为{sku_update}')
update_sku(sku_update)else:
print(f'{current_time},当前cpu为{current_cpu},当前sku为{sku_now},无需调整')
= get_cpu()
current_cpu = get_sku()
sku_now 50,90) monitor_update_sku(current_cpu,sku_now,