Issue
We have a requirement where we need to run all the JSON payloads in different files in a directory in our project (Airflow).
Each JSON payload exists in a separate JSON file in a directory structure JSON/payloads.
I am trying to write a python function to read all JSON payloads and send REST calls with payload from these files.
def read_payloads(category):
payload_dict = {}
directory = './json' + "/" + 'payload' + "/" + category
for filename in os.listdir(directory):
if os.path.isfile(filename):
f = open(filename)
payload_dict.update({filename, json.load(f)})
f.close()
payload_dict
I am trying to create a dictionary with key as file names and value as JSON payload.
I need to pass this dictionary to a python function which will use the JSON payload from these values and make REST calls one by one.
While trying to execute above function, getting error:
No such file or directory:
It seems it is due to absolute / relative path issue.
Also how can I extract the JSON payload from this dictionary using for loop and make REST call with each payload.
Have a small sample code for REST API.
def server_call(url, token, category):
payload_dict = read_payloads(category)
/*
code to make REST call for one payload
*/
How can I fix the above function and how to iterate the payload dictionary to make REST calls by using JSON payloads from the values of this dictionary?
Solution
Ok I was able to figure this one out.
import json
import os
from pathlib import Path
def read_payload(category):
category_dict = {}
directory = Path(__file__).parent.parent / "payloads/" / category
for filename in os.listdir(directory):
filepath = Path(directory) / filename
if os.path.isfile(filepath):
f = open(filepath)
category_dict[filename.replace(".json","")] = json.load(f)
f.close()
else:
print(filepath + " is not a valid file.")
return category_dict
def server_call(BACKEND_URL, token, category):
url = os.path.join(BACKEND_URL, 'api/v1/category-creation')
payload_dict = read_payloads(category)
/*
code to make REST call for one payload
*/
headers = {
"token": f'{token}',
"Content-Type": "application/json",
"accept": "application/json"
}
for category_name, category_payload in payload_dict.items():
json_payload = json.dumps(payload_payload)
response = requests.request("POST", url, headers=headers, data=json_payload)
##########################
## Load as string and parsing
response_data = json.loads(response.text)
print(response_data)
category_id = response_data['id']
message = 'The category with id: ' + str(category_id) + ' is created successfully. '
logging.info(message)
return "Categories created successfully."
Answered By - azaveri7
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.