#!/usr/bin/env python import json import requests import sys from json import JSONEncoder from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry from requests.structures import CaseInsensitiveDict from aman.config.Server import Server class InboundEncoder(JSONEncoder): def default(self, o): pta = str(o.PlannedArrivalTime) delimiter = pta.find('.') if -1 == delimiter: delimiter = pta.find('+') return { 'callsign' : o.Callsign, 'runway' : o.PlannedRunway.Name, 'pta' : pta[0:delimiter] } class WebUI: def __init__(self): self.Config = None self.Aman = None def __del__(self): self.release() def acquire(self, config : Server, aman): self.Config = config self.Aman = aman def release(self): return def createSession(): # prepare the session session = requests.Session() retry = Retry(total=1, read=1, connect=1) adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) return session def requestConfiguration(self, airport): return def sendSequence(self, airport, inbounds): # prepare the HTTP header and session headers = CaseInsensitiveDict() headers['Accept'] = 'application/json' headers['Content-Type'] = 'application/json' session = WebUI.createSession() # prepare the relevant information url = self.Config.WebUiUrl + self.Config.WebUiSequenceNotification data = json.dumps({ 'airport': airport, 'sequence': inbounds }, ensure_ascii=True, cls=InboundEncoder) # send to the server try: response = session.patch(url, headers=headers, data=data, timeout=2) except requests.exceptions.ConnectTimeout: return