geolocation.py (Código fuente)

# -*- coding: utf-8 -*-

"""
Calculate a position from an IP or Address and save it into a file.

Usage:
  geolocation.py [(-v | --verbose)] [--no-wait] (-m | --me)
  geolocation.py [(-v | --verbose)] (-a | --address) [(-r | --remove)] [(-p | --previous-city)] <address>
  geolocation.py [(-v | --verbose)] (-s | --symlinks)
  geolocation.py [(-v | --verbose)] --activate
  geolocation.py [(-v | --verbose)] --deactivate
  geolocation.py (-h | --help)
  geolocation.py --version

Options:
  <address>            Address to be calculated.
  -r --remove          Remove this point from the json file.
  -p --previous-city   Save the calculated point into the previous cities file.
  -s --symlinks        Create symlinks in output directory to upload on deploy.
  -h --help            Show this screen.
  -v --verbose         Show the log in the standard output.
  --version            Show version.
"""

import collections
import json
import logging
import os
import time
import webbrowser
from logging.handlers import RotatingFileHandler

import configobj  # fades
import geocoder  # fades
from docopt import docopt  # fades

logger = logging.getLogger('geolocation')

DIRNAME = os.path.dirname(os.path.abspath(__file__))
SYMLINKS_DIR = os.path.join(DIRNAME, 'output/assets/data')
GPX_FILES = [
    os.path.join(DIRNAME, 'geodata/0-etapa.gpx'),
    os.path.join(DIRNAME, 'geodata/primera-etapa.gpx'),
    os.path.join(DIRNAME, 'geodata/segunda-etapa.gpx'),
    os.path.join(DIRNAME, 'geodata/tercera-etapa.gpx'),
]
CITIES_FILENAME = os.path.join(DIRNAME, 'geodata/cities.json')
MY_POSITION_FILENAME = os.path.join(DIRNAME, 'geodata/my-position.json')
SYMLINK_FILES = [
    CITIES_FILENAME,
    MY_POSITION_FILENAME,
] + GPX_FILES
WAIT_BEFORE_QUERY = 5
MAP_ZOOM = 14
CONF_FILE = os.path.join('~', '.geolocation.ini')


config = configobj.ConfigObj(
    infile=CONF_FILE,
    encoding='utf-8',
)


def setup_logging(verbose):
    logfile = os.path.join(DIRNAME, 'geodata/geolocation.log')
    handler = RotatingFileHandler(logfile, maxBytes=1e6, backupCount=10, encoding='utf-8')
    logger.addHandler(handler)
    formatter = logging.Formatter("%(asctime)s  %(name)-10s  "
                                  "%(levelname)-8s %(message)s")
    handler.setFormatter(formatter)
    logger.setLevel(logging.DEBUG)

    if verbose:
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        logger.addHandler(handler)


def save_json(data, output):
    logger.info('Saving file...')
    with open(output, 'w') as fh:
        data = json.dumps(
            data,
            indent=4,
            sort_keys=True
        )
        fh.write(data)


def load_json(output):
    data = open(output, 'r').read()
    cities = json.loads(
        data,
        object_pairs_hook=collections.OrderedDict
    )
    return cities


def setup_output(output):
    dirname = os.path.dirname(output)
    if not os.path.exists(dirname):
        os.makedirs(dirname)

    if not os.path.exists(output):
        # touch file with an empty dict
        init = {
            'next': [],
            'previous': [],
        }

        # TODO: add 'etapa' info so we can use the same data in
        # "Argentina en Python" page

        #     {
        #         'first': [],
        #         'second': [],
        #         'third': [],
        #     },
        # }

        save_json(init, output)


def osmurl_invalid():
    logger.info('The URL is not correct. Quitting...')


def is_osmurl_valid(response):
    url = 'http://www.openstreetmap.org/#map={zoom}/{lat}/{lng}'.format(
        zoom=MAP_ZOOM,
        lat=response.lat,
        lng=response.lng,
    )
    logger.info('OSMUrl: %s', url)
    answer = None
    webbrowser.open_new_tab(url)
    while answer not in ('y', 'yes', 'n', 'no'):
        answer = input('Is this URL correct?\n    {}\n[y/n]: '.format(url))
    if answer in ('y', 'yes'):
        return True
    return False


def create_symlinks(dirname=SYMLINKS_DIR):
    if not os.path.exists(dirname):
        logger.info('Creating directory: %s', dirname)
        os.makedirs(dirname)

    def get_output_path(filename):
        return os.path.join(
            dirname,
            os.path.basename(filename)
        )

    def get_abs_path(filename):
        return os.path.abspath(filename)

    for filename in SYMLINK_FILES:
        destination = get_output_path(filename)
        if not os.path.exists(destination):
            source = get_abs_path(filename)
            logger.info('Creating symlink: %s', destination)
            os.symlink(source, destination)


def calc_my_position_ip(output=MY_POSITION_FILENAME):
    setup_output(output)
    logger.info('Waiting %s seconds...', WAIT_BEFORE_QUERY)
    time.sleep(WAIT_BEFORE_QUERY)
    logger.info('Querying the server about my ip...')
    response = geocoder.ip('me')
    logger.info('LatLng: %s', response.latlng)
    logger.info('Place: %s', response.address)

    return calc_my_position_address(response.address, output)


def calc_my_position_address(address, output, upload=True):
    logger.info('Querying the server about "%s"...', address)
    response = geocoder.osm(address)
    logger.info('LatLng: %s', response.latlng)
    logger.info('Place: %s', response.address)

    if upload:
        save_json(response.latlng, output)
        upload_my_position()

    return response


def upload_my_position():
    command = ' '.join([
        'runuser',
        '-l',
        'humitos',
        '-c',
        '"scp',
        MY_POSITION_FILENAME,
        'elblogdehumitos.com:~/apps/argentinaenpython.com.ar/assets/data/"',
    ])

    logger.debug(command)
    logger.info('Uploading new "my-position.json" file...')
    os.system(command)
    logger.info('Upload Finished!')


def calc_address(address, when, output=CITIES_FILENAME):
    setup_output(output)

    logger.info('Querying the server for: "%s" ...', address)
    response = geocoder.osm(address)
    logger.info('Got an answer!')

    if not is_osmurl_valid(response):
        osmurl_invalid()
        return

    logger.info('Loading old cities from: %s', output)
    cities = load_json(output)

    osm_id = response.json['osm_id']
    last_osm_id = None
    if cities[when]:
        last_osm_id = cities[when][-1]['osm_id']

    if not cities[when] or osm_id != last_osm_id:
        logger.info('Adding new city: "%s"', response.address)
        cities[when].append(response.json)
        save_json(cities, output)
    else:
        logger.info('This city is already saved. Excluding...')

    return response


def remove_address(address, output=CITIES_FILENAME):
    logger.info('Querying the server for: "%s" ...', address)
    response = geocoder.osm(address)
    logger.info('Got an answer!')
    logger.info('Place: %s', response.address)

    logger.info('Loading old cities from: %s', output)
    cities = load_json(output)

    removed = False
    next_cities = cities['next']
    for city in next_cities:
        if city['osm_id'] == response.json['osm_id']:
            logger.info('City: %s removed!', city['address'])
            next_cities.remove(city)
            removed = True
            break

    if removed:
        save_json(cities, output)
    else:
        logger.info('City not found!')

    return response


def activate(activate=True):
    config['activated'] = activate
    config.write()


def deactivate():
    activate(False)


if __name__ == '__main__':
    arguments = docopt(__doc__, version='Geolocation 0.1')

    setup_logging(arguments['--verbose'])
    if arguments['--no-wait']:
        WAIT_BEFORE_QUERY = 0

    if arguments['--activate']:
        activate()
    elif arguments['--deactivate']:
        deactivate()

    if arguments['-a'] or arguments['--address']:
        q = arguments['<address>']  # .decode('utf8')

        if arguments['--remove']:
            response = remove_address(q)
        else:
            if arguments['--previous-city']:
                when = 'previous'
            else:
                when = 'next'
            calc_address(q, when)

    if arguments['-m'] or arguments['--me']:
        calc_my_position_ip()

    if arguments['--symlinks']:
        create_symlinks()

    create_symlinks()

    logger.info('Finished!')