12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- #!/usr/bin/env python
- import json
- import requests
- import sys
- from json import JSONEncoder
- from requests.adapters import HTTPAdapter
- from requests.packages.urllib3.util.retry import Retry
- from requests.structures import CaseInsensitiveDict
- from aman.config.Server import Server
class InboundEncoder(JSONEncoder):
    """JSON encoder that turns an inbound object into a plain dictionary.

    The encoder is passed as ``cls`` to ``json.dumps`` and is invoked for
    every object that the standard encoder cannot serialize on its own.
    """

    def default(self, o):
        """Return the JSON-serializable representation of the inbound *o*.

        The object is expected to provide ``Callsign``,
        ``PlannedRunway.Name`` and ``PlannedArrivalTime``. The arrival time
        is converted with ``str()`` and cut at the first ``'.'``; if there is
        no ``'.'`` it is cut at the first ``'+'``. If neither character is
        present the string is used unchanged.

        Returns:
            A dict with the keys ``'callsign'``, ``'runway'`` and ``'pta'``.
        """
        pta = str(o.PlannedArrivalTime)

        # Only truncate when a separator is really present. A failed search
        # must not be used as a slice end, otherwise the last character of
        # an already clean timestamp would be dropped.
        for separator in ('.', '+'):
            head, found, _ = pta.partition(separator)
            if found:
                pta = head
                break

        return {
            'callsign': o.Callsign,
            'runway': o.PlannedRunway.Name,
            'pta': pta,
        }
class WebUI:
    """Client that pushes sequence information to the web UI over HTTP.

    The instance is inert until ``acquire`` has stored a server
    configuration; ``sendSequence`` reads the target URL from it.
    """

    def __init__(self):
        """Create an unconfigured instance."""
        self.Config = None
        self.Aman = None

    def __del__(self):
        """Call ``release`` when the instance is garbage-collected."""
        self.release()

    def acquire(self, config: Server, aman):
        """Store the server configuration and the AMAN instance."""
        self.Config = config
        self.Aman = aman

    def release(self):
        """Release held resources; currently a no-op."""
        return

    @staticmethod
    def createSession():
        """Return a new ``requests.Session`` with a small retry policy.

        The same adapter (one total retry, one read retry, one connect
        retry) is mounted for both ``http://`` and ``https://`` URLs.
        The caller owns the returned session and must close it.
        """
        session = requests.Session()
        retry = Retry(total=1, read=1, connect=1)
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session

    def requestConfiguration(self, airport):
        """Placeholder for requesting an airport configuration; no-op."""
        return

    def sendSequence(self, airport, inbounds):
        """Send the inbound sequence of *airport* to the web UI.

        The payload ``{'airport': ..., 'sequence': ...}`` is serialized with
        ``InboundEncoder`` and sent as an HTTP PATCH request to
        ``Config.WebUiUrl + Config.WebUiSequenceNotification`` with a
        timeout of two seconds.

        A connect timeout is ignored silently. Any other exception raised
        by ``requests`` propagates to the caller.
        """
        # prepare the HTTP header
        headers = CaseInsensitiveDict()
        headers['Accept'] = 'application/json'
        headers['Content-Type'] = 'application/json'

        # prepare the relevant information
        url = self.Config.WebUiUrl + self.Config.WebUiSequenceNotification
        data = json.dumps({ 'airport': airport, 'sequence': inbounds }, ensure_ascii=True, cls=InboundEncoder)

        # send to the server; the context manager closes the session and
        # its pooled connections on every exit path
        with WebUI.createSession() as session:
            try:
                session.patch(url, headers=headers, data=data, timeout=2)
            except requests.exceptions.ConnectTimeout:
                # the web UI is optional: an unreachable server is not an error
                return
|