测试接口的网站:
Webhook.site - Test, process and transform emails and HTTP requests
接口测试的软件:
postman
部分代码,其余代码见资源处
- from datetime import datetime
- from http.client import OK
- import ipaddress, requests, datetime
- from xmlrpc.client import FastParser
- import pytz
- from django.http import HttpResponse
- from django.shortcuts import get_object_or_404, render, redirect
- from .models import MessageHistory, PSBServer
- from .forms import PSBServerForm
- from http import HTTPStatus
- from django.views.decorators.csrf import csrf_exempt
-
-
- from hook.models import PSBServer
-
def save_history(device_subnet_ip, device_subnet_mask, full_path, response_code, error_text, redirect_rul, is_success):
    """Store one MessageHistory row describing how a request was handled.

    The subnet is saved as ``"<device_subnet_ip>/<device_subnet_mask>"`` and
    the row is stamped with the current time in UTC. The ``redirect_rul``
    argument (spelling kept for caller compatibility) is written to the
    record's ``redirect_url`` attribute.
    """
    record = MessageHistory()
    attribute_values = {
        'device_subnet': f'{device_subnet_ip}/{device_subnet_mask}',
        'full_path': full_path,
        'response_code': response_code,
        'error_text': error_text,
        'history_time': datetime.datetime.now(tz=pytz.utc),
        'redirect_url': redirect_rul,
        'is_success': is_success,
    }
    # Plain attribute assignment, one attribute per entry, then a single save.
    for attribute, value in attribute_values.items():
        setattr(record, attribute, value)
    record.save()
-
def get_ip_address(request):
    """Return the client IP address recorded for *request*.

    The first entry of the ``X-Forwarded-For`` header
    (``request.META['HTTP_X_FORWARDED_FOR']``) is preferred; when the header
    is absent, empty, or its first entry is blank, ``REMOTE_ADDR`` is
    returned instead (which may be ``None`` if that key is missing too).

    NOTE(review): ``X-Forwarded-For`` is supplied by the client/proxy chain,
    so the value should not be trusted for access control unless a trusted
    proxy sets it -- confirm against the deployment.
    """
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        # The header is a comma-separated list and may carry optional
        # whitespace around the separators, so strip the chosen entry.
        first_hop = forwarded_for.split(',')[0].strip()
        if first_hop:
            return first_hop
    return request.META.get('REMOTE_ADDR')
-
# Upper bound, in seconds, on how long one forwarded request may take.
# Without a timeout `requests.get` can block the worker indefinitely.
_FORWARD_TIMEOUT_SECONDS = 10


@csrf_exempt
def index(request):
    """Forward the incoming request to the PSB server named by ``location``.

    The ``location`` query parameter is compared with the ``name`` of every
    PSBServer. For the first server that matches, a GET request is sent to
    ``http://<ip_address>:<tcp_port>/<full path of this request>``.

    Returns:
        * ``'Data forwarded successfully.'`` when the upstream GET completes
          (the upstream status code is not inspected);
        * a 500 response carrying the error text when the upstream GET
          raises a ``requests`` error, after writing a history record;
        * ``'OK,no location in your web '`` when no server matches.
    """
    # Drop the leading "/" so the path can be appended after the host:port.
    full_path = request.get_full_path()[1:]
    # Name of the target server, taken from the query string.
    location = request.GET.get('location', '')

    for psb_server in PSBServer.objects.all():
        if location != psb_server.name:
            continue

        # `location` names this server: relay the request to its address.
        target_url = f'http://{psb_server.ip_address}:{psb_server.tcp_port}/{full_path}'
        try:
            requests.get(target_url, timeout=_FORWARD_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            # NOTE(review): this record is labelled NO_MATCH although a
            # server did match and the forward itself failed. The values are
            # kept unchanged because the MessageHistory field limits are not
            # visible here -- consider a dedicated code and the real error.
            save_history('0.0.0.0', '0', full_path, 'NO_MATCH', 'there is no matched psb server', 'NO_MATCH',
                         False)
            return HttpResponse('Data forwarding failed: ' + str(exc), status=500)
        return HttpResponse('Data forwarded successfully.')

    return HttpResponse('OK,no location in your web ')
-
def management(request):
    """Render the management page with all PSBServer rows under ``psbs``."""
    return render(request, 'hook/management.html', {'psbs': PSBServer.objects.all()})
-
def history(request):
    """Render the history page with up to 100 MessageHistory rows.

    Rows are ordered by primary key, highest first, and passed to the
    template under ``histories``.
    """
    newest_first = MessageHistory.objects.all().order_by('-pk')
    context = {'histories': newest_first[:100]}
    return render(request, 'hook/history.html', context)
-
def psb(request):
    """Show the PSBServer creation form and handle its submission.

    On a valid POST the form is saved and the client is redirected to
    ``hook:management``. On an invalid POST the same bound form is rendered
    again, so the submitted values and validation errors are available to
    the template instead of being replaced by a blank form. Any other method
    renders an empty form.
    """
    if request.method == 'POST':
        form = PSBServerForm(request.POST)
        if form.is_valid():
            # A plain save() also stores many-to-many data, which
            # save(commit=False) followed by instance.save() would skip.
            form.save()
            return redirect('hook:management')
    else:
        form = PSBServerForm()

    return render(request, 'hook/psb.html', {'form': form})
-
def delete_psb(request, psb_id):
    """Delete the PSBServer whose primary key is *psb_id*.

    Responds with 404 when no such row exists; otherwise redirects to
    ``hook:management`` after the delete.

    NOTE(review): there is no HTTP-method check, so the delete runs for any
    request method, including GET -- confirm this is intended.
    """
    get_object_or_404(PSBServer, pk=psb_id).delete()
    return redirect('hook:management')