Source code for geonames_postcode.fetch

# -*- encoding:utf-8 -*-
#
# Copyright (C) 2018 André Wobst <project.geonames_postcode@wobsta.de>
#
# This file is part of geonames_postcode (https://github.com/wobsta/geonames_postcode).
#
# geonames_postcode is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# geonames_postcode is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with geonames_postcode; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA 

import sys

if sys.version_info[0] == 2:
    import ConfigParser as configparser
    from codecs import open
    from urllib2 import urlopen
else:
    import configparser
    from urllib.request import urlopen

import collections, datetime, importlib, io, itertools, json, os, pprint, unicodedata, zipfile

from . import dataDir, postcode_item, name_item, distance

# One row of a geonames postcode dump, reduced to the columns used below.
_geonames_item = collections.namedtuple(
    'geonames_item',
    ['postcode', 'name', 'region', 'subregion', 'subsubregion', 'latitude', 'longitude'])

# Number of bytes requested from the HTTP response per read while downloading.
_DOWNLOAD_CHUNK_SIZE = 64 * 1024


def _download(url, path):
    """Download *url* to *path*, closing the connection in every case.

    The data is first written to ``path + '.part'`` and only renamed to
    *path* once the transfer completed, so an interrupted download is never
    mistaken for a valid cache file by a later run.
    """
    partial_path = path + '.part'
    response = urlopen(url)
    try:
        with open(partial_path, 'wb') as cache:
            chunk = response.read(_DOWNLOAD_CHUNK_SIZE)
            while chunk:
                cache.write(chunk)
                chunk = response.read(_DOWNLOAD_CHUNK_SIZE)
    finally:
        response.close()
    os.rename(partial_path, path)


def _read_geonames(zip_path, country, postcode_remove_from):
    """Parse ``<country>.txt`` inside the zip archive into a list of items.

    Each line is decoded as UTF-8 and split at tabs into twelve columns
    (country, postcode, name, three region name/code pairs, latitude,
    longitude, accuracy).  Empty region names are stored as ``None``.

    :param zip_path: path of the cached geonames zip archive
    :param country: country code every line has to start with
    :param postcode_remove_from: if non-empty, each postcode is cut off at the
        first occurrence of this string; a postcode not containing it raises
        ``ValueError`` (``str.index``)
    :returns: list of ``geonames_item`` tuples in file order
    :raises AssertionError: if a line does not look as expected
    """
    items = []
    with zipfile.ZipFile(zip_path) as archive:
        with archive.open('%s.txt' % country) as f:
            for line in f:
                (verify_country, postcode, name,
                 region, region_code,
                 subregion, subregion_code,
                 subsubregion, subsubregion_code,
                 latitude, longitude, accuracy) = line.decode('utf-8').split('\t')
                assert verify_country == country
                # The accuracy column ends the line: either empty or a single digit 1-6,
                # always followed by the newline character.
                assert (len(accuracy) == 1 or (len(accuracy) == 2 and accuracy[0] in '123456')) and accuracy[-1] == '\n'
                assert postcode
                assert name
                if postcode_remove_from:
                    postcode = postcode[:postcode.index(postcode_remove_from)]
                latitude = float(latitude)
                longitude = float(longitude)
                items.append(_geonames_item(postcode, name, region or None, subregion or None,
                                            subsubregion or None, latitude, longitude))
    return items


def _mean(values):
    """Return the arithmetic mean of a non-empty iterable of numbers."""
    total = count = 0
    for value in values:
        total += value
        count += 1
    return total/count


def _matches(item, terms):
    """Return True if *item* fulfils all conditions of at least one term.

    Each term is a dict; the keys ``name``, ``name_start``, ``postcode``,
    ``latitude`` and ``longitude`` are compared against the item, any other
    key is ignored.  An empty term therefore matches every item.
    """
    for term in terms:
        for key, value in term.items():
            if ((key == 'name' and item.name != value) or
                (key == 'name_start' and not item.name.startswith(value)) or
                (key == 'postcode' and item.postcode != value) or
                (key == 'latitude' and item.latitude != value) or
                (key == 'longitude' and item.longitude != value)):
                break
        else:
            return True
    return False


def _qualified_name(item, detail_level):
    """Return the name of *item*, qualified according to *detail_level*.

    Level 0 is the plain name.  Level 1 appends the region, level 2 the
    subregion as well, level 3 the subsubregion, and each level above 3
    appends one more leading character of the postcode.  Missing (empty)
    region parts are skipped; if nothing is left to append the plain name
    is returned.
    """
    if not detail_level:
        return item.name
    details = []
    if item.region:
        details.append(item.region)
    if detail_level > 1 and item.subregion:
        details.append(item.subregion)
    if detail_level > 2 and item.subsubregion:
        details.append(item.subsubregion)
    if detail_level > 3:
        details.append(item.postcode[:detail_level-3])
    if not details:
        return item.name
    return '%s (%s)' % (item.name, ', '.join(details))


def _fetch_country(config, country):
    """Create the data module for a single *country* using the settings in *config*."""
    # The section must exist for config.get() to work at all; options not set
    # for the country are then looked up in the parser defaults
    # (presumably a [DEFAULT] section in fetch.ini -- verify against that file).
    if country not in config.sections():
        config.add_section(country)
    assert len(country) == 2
    assert country.isupper()

    url = 'http://download.geonames.org/export/zip/%s.zip' % country
    zip_path = os.path.join(os.path.dirname(__file__), 'data', '%s.zip' % country)
    if not os.path.exists(zip_path):
        print('Downloading geonames postcode data from %s ...' % url)
        print('PLEASE RESPECT THE LICENCE OF THIS DATA (CREATIVE COMMONS ATTRIBUTION 4.0 LICENSE), SEE GEONAMES.ORG FOR DETAILS.')
        _download(url, zip_path)
        print('(Download is cached at %s.)' % zip_path)

    geonames_data = _read_geonames(zip_path, country, config.get(country, 'postcode_remove_from'))
    regions = set(item.region for item in geonames_data if item.region is not None)

    def alphabetical(s):
        # Sort key: lower case, German umlauts transliterated for DE, all
        # remaining non-ASCII characters reduced to their ASCII base or dropped.
        r = s.lower()
        if country == 'DE':
            r = r.replace(u'ä', 'ae').replace(u'ö', 'oe').replace(u'ü', 'ue').replace(u'ß', 'ss')
        r = unicodedata.normalize('NFKD', r).encode('ASCII', 'ignore')
        return r

    # Postcode -> names, regions and mean position of all entries sharing that postcode.
    items_per_postcode = collections.defaultdict(list)
    for item in geonames_data:
        items_per_postcode[item.postcode].append(item)
    postcodes = {}
    for postcode, items in items_per_postcode.items():
        postcodes[postcode] = postcode_item(
            sorted(set(item.name for item in items), key=alphabetical),
            sorted(set(item.region for item in items if item.region is not None), key=alphabetical),
            _mean(item.latitude for item in items),
            _mean(item.longitude for item in items))

    # Name -> postcodes and mean position.  Entries sharing a name are only
    # combined when all of them are close enough to their common mean position;
    # otherwise the name is retried in the next round with more details added.
    skip_terms = json.loads(config.get(country, 'skip_for_names'))  # parsed once, not per item
    remaining_geonames_data = [item for item in geonames_data if not _matches(item, skip_terms)]
    max_distance = config.getfloat(country, 'max_distance')
    add_distance_per_item = config.getfloat(country, 'add_distance_per_item')
    names = {}
    failed_names = {}  # stays defined even if the loop below does not run at all
    for add_details_to_name in range(4+config.getint(country, 'name_postcode_chars')):
        items_per_name = collections.defaultdict(list)
        for item in remaining_geonames_data:
            items_per_name[_qualified_name(item, add_details_to_name)].append(item)
        failed_names = {}
        for name, items in items_per_name.items():
            mean_latitude = _mean(item.latitude for item in items)
            mean_longitude = _mean(item.longitude for item in items)
            spread = max(distance(item.latitude, item.longitude, mean_latitude, mean_longitude)
                         for item in items)
            if spread > max_distance + add_distance_per_item*len(items):
                failed_names[name] = items
            else:
                names[name.lower()] = name_item(sorted(set(item.postcode for item in items)),
                                                mean_latitude, mean_longitude)
        if not failed_names:
            break  # everything is placed, further rounds would have nothing to do
        remaining_geonames_data = [item for items in failed_names.values() for item in items]

    if failed_names:
        print('Combining of the following name and points is out of bound:')
        for name, items in failed_names.items():
            print(name)
            for item in items:
                print('%f,%f,%s' % (item.latitude, item.longitude, item.postcode))
        print('-> abort')
        sys.exit(1)

    # The result is written as Python source and imported below, so it is read
    # and written as UTF-8 explicitly instead of relying on the locale.
    with open(os.path.join(dataDir, 'template'), 'r', encoding='utf-8') as f:
        template = f.read()
    with open(os.path.join(dataDir, '%s.py' % country.lower()), 'w', encoding='utf-8') as f:
        f.write(template.format(url=url,
                                now=datetime.datetime.utcnow().isoformat(),
                                regions=pprint.pformat(sorted(list(regions), key=alphabetical)),
                                postcodes=pprint.pformat(postcodes),
                                names=pprint.pformat(names)))
    # Importing the freshly written module checks that it is loadable.
    importlib.import_module('geonames_postcode.data.%s' % country.lower())


def fetch(*countries):
    """Create the postcode data module for each of the given countries.

    Settings are read from ``fetch.ini`` next to this module.  For every
    country the geonames postcode archive is downloaded, unless a copy is
    already cached as ``data/<country>.zip`` next to this module.  The data
    is aggregated per postcode and per place name, written to
    ``<country>.py`` (lower case) in ``dataDir`` by filling the ``template``
    file found there, and finally imported as
    ``geonames_postcode.data.<country>``.

    :param countries: country codes of two characters in upper case
    :raises AssertionError: for an invalid country code or unexpected data
    :raises SystemExit: with status 1 if entries sharing a name are still too
        far apart after all disambiguation rounds (no module is written for
        that country)
    """
    config = configparser.ConfigParser()
    config.read(os.path.join(os.path.dirname(__file__), 'fetch.ini'))
    for country in countries:
        _fetch_country(config, country)
def main():
    """Command line entry point: fetch the data for all country codes given as arguments."""
    country_codes = sys.argv[1:]
    fetch(*country_codes)


if __name__ == '__main__':
    main()