137 lines
5.0 KiB
Python
137 lines
5.0 KiB
Python
#!/usr/bin/env python
|
|
|
|
import json
|
|
import os
|
|
|
|
from flask import Flask, Response, request
|
|
from json import JSONEncoder
|
|
|
|
from aman.AMAN import AMAN
|
|
from aman.config.AirportSequencing import AirportSequencing
|
|
from aman.config.RunwaySequencing import RunwaySequencing
|
|
|
|
class InboundEncoder(JSONEncoder):
|
|
def default(self, o):
|
|
pta = str(o.PlannedArrivalTime)
|
|
delimiter = pta.find('.')
|
|
if -1 == delimiter:
|
|
delimiter = pta.find('+')
|
|
return { 'callsign' : o.Callsign, 'fixed' : o.FixedSequence, 'runway' : o.PlannedRunway.Name, 'pta' : pta[0:delimiter] }
|
|
|
|
class RunwaySequencingEncoder(JSONEncoder):
|
|
def default(self, o):
|
|
return { 'runway' : o.Runway.Name, 'spacing' : o.Spacing }
|
|
|
|
os.environ['AMAN_CONFIG_PATH'] = 'C:\\Repositories\\VATSIM\\AMAN\\config'
|
|
aman = AMAN()
|
|
app = Flask('AMAN')
|
|
|
|
if __name__ == '__main__':
|
|
app.run()
|
|
|
|
@app.route('/aman/configuration/<icao>')
|
|
def configuration(icao):
|
|
airport = aman.findAirport(icao.upper())
|
|
if None == airport:
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
|
|
config = airport.SequencingConfiguration
|
|
dependencies = []
|
|
for dependency in config.RunwayDependencies:
|
|
rwy0 = config.runway(dependency[0])
|
|
rwy1 = config.runway(dependency[1])
|
|
cand1 = [ rwy0.Name, rwy1.Name ]
|
|
cand2 = [ rwy1.Name, rwy0.Name ]
|
|
found = False
|
|
|
|
for dep in dependencies:
|
|
if cand1 == dep or cand2 == dep:
|
|
found = True
|
|
break
|
|
|
|
if False == found:
|
|
dependencies.append(cand1)
|
|
|
|
dictionary = {
|
|
'airport' : airport.Icao,
|
|
'useShallShouldMay' : config.UseShallShouldMay,
|
|
'activeRunways' : config.ActiveArrivalRunways,
|
|
'dependentRunways' : dependencies
|
|
}
|
|
data = json.dumps(dictionary, ensure_ascii=True, cls=RunwaySequencingEncoder)
|
|
return Response(data, status=200, mimetype='application/json')
|
|
|
|
@app.route('/aman/sequence/<icao>')
|
|
def sequence(icao):
|
|
airport = aman.findAirport(icao.upper())
|
|
if None == airport:
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
|
|
# convert the timestamp
|
|
stamp = str(airport.SequencingConfiguration.LastUpdateTimestamp)
|
|
delimiter = stamp.find('.')
|
|
if -1 == delimiter:
|
|
delimiter = stamp.find('+')
|
|
|
|
dictionary = {
|
|
'airport': airport.Icao,
|
|
'lastConfigurationUpdate': stamp[0:delimiter],
|
|
'sequence': airport.inboundSequence()
|
|
}
|
|
data = json.dumps(dictionary, ensure_ascii=True, cls=InboundEncoder)
|
|
return Response(data, status=200, mimetype='application/json')
|
|
|
|
@app.route('/aman/configure', methods=['POST'])
|
|
def configure():
|
|
data = request.get_json()
|
|
|
|
# validate that the airport exists
|
|
if 'airport' not in data:
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
airport = aman.findAirport(data['airport'].upper())
|
|
if None == airport:
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
|
|
# check that all top-level information are available
|
|
if 'useShallShouldMay' not in data or 'activeRunways' not in data or 'dependentRunways' not in data:
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
if False == isinstance(data['useShallShouldMay'], bool) or 0 == len(data['activeRunways']):
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
|
|
# create the toplevel information
|
|
config = AirportSequencing(airport.Icao)
|
|
config.Airport = data['airport'].upper()
|
|
config.UseShallShouldMay = data['useShallShouldMay']
|
|
|
|
# parse the active runways
|
|
for activeRunway in data['activeRunways']:
|
|
if 'runway' not in activeRunway or 'spacing' not in activeRunway:
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
if False == isinstance(activeRunway['runway'], str) or False == isinstance(activeRunway['spacing'], int):
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
|
|
gngRunway = None
|
|
for runway in airport.Configuration.GngData.Runways[airport.Icao]:
|
|
if runway.Name == activeRunway['runway']:
|
|
gngRunway = runway
|
|
break
|
|
|
|
# could not find the runway
|
|
if None == gngRunway:
|
|
return None
|
|
|
|
runway = RunwaySequencing(gngRunway)
|
|
runway.Spacing = activeRunway['spacing']
|
|
config.activateRunway(runway)
|
|
|
|
# parse the dependent runways
|
|
for dependency in data['dependentRunways']:
|
|
if 2 != len(dependency) or False == isinstance(dependency[0], str) or False == isinstance(dependency[1], str):
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
if False == config.addDependency(dependency[0], dependency[1]):
|
|
return Response('{}', status=404, mimetype='application/json')
|
|
|
|
airport.configure(config)
|
|
|
|
return Response('{}', status=200, mimetype='application/json')
|