Updated bundled version of PyGeoIP

This commit is contained in:
Luke Rogers 2013-07-14 22:12:31 +12:00
parent 487ee03e6c
commit b0e43c0aea
6 changed files with 1718 additions and 1422 deletions

View file

@ -1,17 +1,13 @@
# -*- coding: utf-8 -*-
"""
Pure Python GeoIP API. The API is based off of U{MaxMind's C-based Python API<http://www.maxmind.com/app/python>},
but the code itself is based on the U{pure PHP5 API<http://pear.php.net/package/Net_GeoIP/>}
by Jim Winstead and Hans Lellelid.
Pure Python GeoIP API
It is mostly a drop-in replacement, except the
C{new} and C{open} methods are gone. You should instantiate the L{GeoIP} class yourself:
The API is based on MaxMind's C-based Python API, but the code itself is
ported from the Pure PHP GeoIP API by Jim Winstead and Hans Lellelid.
C{gi = GeoIP('/path/to/GeoIP.dat', pygeoip.MEMORY_CACHE)}
@author: Jennifer Ennis <zaylea@gmail.com>
@author: Jennifer Ennis <zaylea at gmail dot com>
@license:
Copyright(C) 2004 MaxMind LLC
@license: Copyright(C) 2004 MaxMind LLC
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
@ -27,39 +23,43 @@ You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/lgpl.txt>.
"""
from __future__ import with_statement, absolute_import, division
import os
import math
import socket
import mmap
import gzip
import codecs
from StringIO import StringIO
from threading import Lock
from . import const
from .util import ip2long
from .timezone import time_zone_by_country_and_region
try:
from StringIO import StringIO
except ImportError:
from io import StringIO, BytesIO
import six
from pygeoip import util, const
from pygeoip.const import PY2, PY3
from pygeoip.timezone import time_zone_by_country_and_region
STANDARD = const.STANDARD
MMAP_CACHE = const.MMAP_CACHE
MEMORY_CACHE = const.MEMORY_CACHE
STANDARD = const.STANDARD
ENCODING = const.ENCODING
class GeoIPError(Exception):
pass
class GeoIPMetaclass(type):
class GeoIPMetaclass(type):
def __new__(cls, *args, **kwargs):
"""
Singleton method to gets an instance without reparsing the db. Unique
instances are instantiated based on the filename of the db. Flags are
ignored for this, i.e. if you initialize one with STANDARD flag (default)
and then try later to initialize with MEMORY_CACHE, it will still
return the STANDARD one.
ignored for this, i.e. if you initialize one with STANDARD
flag (default) and then try later to initialize with MEMORY_CACHE, it
will still return the STANDARD one.
"""
if not hasattr(cls, '_instances'):
cls._instances = {}
@ -68,25 +68,25 @@ class GeoIPMetaclass(type):
elif 'filename' in kwargs:
filename = kwargs['filename']
if not filename in cls._instances:
if filename not in cls._instances:
cls._instances[filename] = type.__new__(cls, *args, **kwargs)
return cls._instances[filename]
GeoIPBase = GeoIPMetaclass('GeoIPBase', (object,), {})
class GeoIP(GeoIPBase):
class GeoIP(GeoIPBase):
def __init__(self, filename, flags=0):
"""
Initialize the class.
@param filename: path to a geoip database. If MEMORY_CACHE is used,
the file can be gzipped.
@param filename: Path to a geoip database.
@type filename: str
@param flags: flags that affect how the database is processed.
Currently the only supported flags are STANDARD (the default),
MEMORY_CACHE (preload the whole file into memory), and
@param flags: Flags that affect how the database is processed.
Currently supported flags are STANDARD (the default),
MEMORY_CACHE (preload the whole file into memory) and
MMAP_CACHE (access the file via mmap).
@type flags: int
"""
@ -94,42 +94,71 @@ class GeoIP(GeoIPBase):
self._flags = flags
if self._flags & const.MMAP_CACHE:
with open(filename, 'rb') as f:
self._filehandle = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
f = open(filename, 'rb')
access = mmap.ACCESS_READ
self._filehandle = mmap.mmap(f.fileno(), 0, access=access)
f.close()
elif self._flags & const.MEMORY_CACHE:
if filename.endswith('.gz'):
opener = gzip.open
else:
opener = open
f = open(filename, 'rb')
self._memoryBuffer = f.read()
iohandle = BytesIO if PY3 else StringIO
self._filehandle = iohandle(self._memoryBuffer)
f.close()
with opener(filename, 'rb') as f:
self._memoryBuffer = f.read()
self._filehandle = StringIO(self._memoryBuffer)
else:
self._filehandle = codecs.open(filename, 'rb','latin_1')
self._filehandle = codecs.open(filename, 'rb', ENCODING)
self._lock = Lock()
self._setup_segments()
def _setup_segments(self):
"""
Parses the database file to determine what kind of database is being used and setup
segment sizes and start points that will be used by the seek*() methods later.
Parses the database file to determine what kind of database is
being used and setup segment sizes and start points that will
be used by the seek*() methods later.
Supported databases:
* COUNTRY_EDITION
* COUNTRY_EDITION_V6
* REGION_EDITION_REV0
* REGION_EDITION_REV1
* CITY_EDITION_REV0
* CITY_EDITION_REV1
* CITY_EDITION_REV1_V6
* ORG_EDITION
* ISP_EDITION
* ASNUM_EDITION
* ASNUM_EDITION_V6
"""
self._databaseType = const.COUNTRY_EDITION
self._recordLength = const.STANDARD_RECORD_LENGTH
self._databaseSegments = const.COUNTRY_BEGIN
self._lock.acquire()
filepos = self._filehandle.tell()
self._filehandle.seek(-3, os.SEEK_END)
for i in range(const.STRUCTURE_INFO_MAX_SIZE):
chars = chr(255) * 3
delim = self._filehandle.read(3)
if delim == six.u(chr(255) * 3):
self._databaseType = ord(self._filehandle.read(1))
if PY3 and type(delim) is bytes:
delim = delim.decode(ENCODING)
if PY2:
chars = chars.decode(ENCODING)
if type(delim) is str:
delim = delim.decode(ENCODING)
if delim == chars:
byte = self._filehandle.read(1)
self._databaseType = ord(byte)
# Compatibility with databases from April 2003 and earlier
if (self._databaseType >= 106):
# backwards compatibility with databases from April 2003 and earlier
self._databaseType -= 105
if self._databaseType == const.REGION_EDITION_REV0:
@ -140,51 +169,29 @@ class GeoIP(GeoIPBase):
elif self._databaseType in (const.CITY_EDITION_REV0,
const.CITY_EDITION_REV1,
const.CITY_EDITION_REV1_V6,
const.ORG_EDITION,
const.ISP_EDITION,
const.ASNUM_EDITION):
const.ASNUM_EDITION,
const.ASNUM_EDITION_V6):
self._databaseSegments = 0
buf = self._filehandle.read(const.SEGMENT_RECORD_LENGTH)
if PY3 and type(buf) is bytes:
buf = buf.decode(ENCODING)
for j in range(const.SEGMENT_RECORD_LENGTH):
self._databaseSegments += (ord(buf[j]) << (j * 8))
if self._databaseType in (const.ORG_EDITION, const.ISP_EDITION):
LONG_RECORDS = (const.ORG_EDITION, const.ISP_EDITION)
if self._databaseType in LONG_RECORDS:
self._recordLength = const.ORG_RECORD_LENGTH
break
else:
self._filehandle.seek(-4, os.SEEK_CUR)
if self._databaseType == const.COUNTRY_EDITION:
self._databaseSegments = const.COUNTRY_BEGIN
self._filehandle.seek(filepos, os.SEEK_SET)
def _lookup_country_id(self, addr):
"""
Get the country index.
This method is called by the _lookupCountryCode and _lookupCountryName
methods. It looks up the index ('id') for the country which is the key
for the code and name.
@param addr: The IP address
@type addr: str
@return: network byte order 32-bit integer
@rtype: int
"""
ipnum = ip2long(addr)
if not ipnum:
raise ValueError("Invalid IP address: %s" % addr)
if self._databaseType != const.COUNTRY_EDITION:
raise GeoIPError('Invalid database type; country_* methods expect '\
'Country database')
return self._seek_country(ipnum) - const.COUNTRY_BEGIN
self._lock.release()
def _seek_country(self, ipnum):
"""
@ -196,117 +203,119 @@ class GeoIP(GeoIPBase):
@return: offset of start of record
@rtype: int
"""
offset = 0
try:
offset = 0
seek_depth = 127 if len(str(ipnum)) > 10 else 31
for depth in range(31, -1, -1):
for depth in range(seek_depth, -1, -1):
if self._flags & const.MEMORY_CACHE:
startIndex = 2 * self._recordLength * offset
endIndex = startIndex + (2 * self._recordLength)
buf = self._memoryBuffer[startIndex:endIndex]
else:
startIndex = 2 * self._recordLength * offset
readLength = 2 * self._recordLength
self._lock.acquire()
self._filehandle.seek(startIndex, os.SEEK_SET)
buf = self._filehandle.read(readLength)
self._lock.release()
if self._flags & const.MEMORY_CACHE:
startIndex = 2 * self._recordLength * offset
length = 2 * self._recordLength
endIndex = startIndex + length
buf = self._memoryBuffer[startIndex:endIndex]
else:
self._filehandle.seek(2 * self._recordLength * offset, os.SEEK_SET)
buf = self._filehandle.read(2 * self._recordLength)
if PY3 and type(buf) is bytes:
buf = buf.decode(ENCODING)
x = [0,0]
x = [0, 0]
for i in range(2):
for j in range(self._recordLength):
byte = buf[self._recordLength * i + j]
x[i] += ord(byte) << (j * 8)
if ipnum & (1 << depth):
if x[1] >= self._databaseSegments:
return x[1]
offset = x[1]
else:
if x[0] >= self._databaseSegments:
return x[0]
offset = x[0]
except:
pass
for i in range(2):
for j in range(self._recordLength):
x[i] += ord(buf[self._recordLength * i + j]) << (j * 8)
if ipnum & (1 << depth):
if x[1] >= self._databaseSegments:
return x[1]
offset = x[1]
else:
if x[0] >= self._databaseSegments:
return x[0]
offset = x[0]
raise Exception('Error traversing database - perhaps it is corrupt?')
raise GeoIPError('Corrupt database')
def _get_org(self, ipnum):
"""
Seek and return organization (or ISP) name for converted IP addr.
Seek and return organization or ISP name for ipnum.
@param ipnum: Converted IP address
@type ipnum: int
@return: org/isp name
@rtype: str
"""
seek_org = self._seek_country(ipnum)
if seek_org == self._databaseSegments:
return None
record_pointer = seek_org + (2 * self._recordLength - 1) * self._databaseSegments
read_length = (2 * self._recordLength - 1) * self._databaseSegments
self._lock.acquire()
self._filehandle.seek(seek_org + read_length, os.SEEK_SET)
buf = self._filehandle.read(const.MAX_ORG_RECORD_LENGTH)
self._lock.release()
self._filehandle.seek(record_pointer, os.SEEK_SET)
if PY3 and type(buf) is bytes:
buf = buf.decode(ENCODING)
org_buf = self._filehandle.read(const.MAX_ORG_RECORD_LENGTH)
return org_buf[:org_buf.index(chr(0))]
return buf[:buf.index(chr(0))]
def _get_region(self, ipnum):
"""
Seek and return the region info (dict containing country_code and region_name).
Seek and return the region info (dict containing country_code
and region_name).
@param ipnum: converted IP address
@param ipnum: Converted IP address
@type ipnum: int
@return: dict containing country_code and region_name
@rtype: dict
"""
country_code = ''
region = ''
country_code = ''
seek_country = self._seek_country(ipnum)
def get_region_name(offset):
region1 = chr(offset // 26 + 65)
region2 = chr(offset % 26 + 65)
return ''.join([region1, region2])
if self._databaseType == const.REGION_EDITION_REV0:
seek_country = self._seek_country(ipnum)
seek_region = seek_country - const.STATE_BEGIN_REV0
if seek_region >= 1000:
country_code = 'US'
region = ''.join([chr((seek_region // 1000) // 26 + 65), chr((seek_region // 1000) % 26 + 65)])
region = get_region_name(seek_region - 1000)
else:
country_code = const.COUNTRY_CODES[seek_region]
region = ''
elif self._databaseType == const.REGION_EDITION_REV1:
seek_country = self._seek_country(ipnum)
seek_region = seek_country - const.STATE_BEGIN_REV1
if seek_region < const.US_OFFSET:
country_code = '';
region = ''
pass
elif seek_region < const.CANADA_OFFSET:
country_code = 'US'
region = ''.join([chr((seek_region - const.US_OFFSET) // 26 + 65), chr((seek_region - const.US_OFFSET) % 26 + 65)])
elif seek_region < const.WORLD_OFFSET:
region = get_region_name(seek_region - const.US_OFFSET)
elif seek_region < const.WORLD_OFFSET:
country_code = 'CA'
region = ''.join([chr((seek_region - const.CANADA_OFFSET) // 26 + 65), chr((seek_region - const.CANADA_OFFSET) % 26 + 65)])
region = get_region_name(seek_region - const.CANADA_OFFSET)
else:
i = (seek_region - const.WORLD_OFFSET) // const.FIPS_RANGE
if i < len(const.COUNTRY_CODES):
#country_code = const.COUNTRY_CODES[(seek_region - const.WORLD_OFFSET) // const.FIPS_RANGE]
country_code = const.COUNTRY_CODES[i]
else:
country_code = ''
region = ''
elif self._databaseType in (const.CITY_EDITION_REV0, const.CITY_EDITION_REV1):
index = (seek_region - const.WORLD_OFFSET) // const.FIPS_RANGE
if index in const.COUNTRY_CODES:
country_code = const.COUNTRY_CODES[index]
elif self._databaseType in const.CITY_EDITIONS:
rec = self._get_record(ipnum)
country_code = rec['country_code'] if 'country_code' in rec else ''
region = rec['region_name'] if 'region_name' in rec else ''
region = rec.get('region_name', '')
country_code = rec.get('country_code', '')
return {'country_code' : country_code, 'region_name' : region }
return {'country_code': country_code, 'region_name': region}
def _get_record(self, ipnum):
"""
Populate location dict for converted IP.
@param ipnum: converted IP address
@param ipnum: Converted IP address
@type ipnum: int
@return: dict with country_code, country_code3, country_name,
region, city, postal_code, latitude, longitude,
@ -315,107 +324,115 @@ class GeoIP(GeoIPBase):
"""
seek_country = self._seek_country(ipnum)
if seek_country == self._databaseSegments:
return None
return {}
record_pointer = seek_country + (2 * self._recordLength - 1) * self._databaseSegments
read_length = (2 * self._recordLength - 1) * self._databaseSegments
self._lock.acquire()
self._filehandle.seek(seek_country + read_length, os.SEEK_SET)
buf = self._filehandle.read(const.FULL_RECORD_LENGTH)
self._lock.release()
self._filehandle.seek(record_pointer, os.SEEK_SET)
record_buf = self._filehandle.read(const.FULL_RECORD_LENGTH)
if PY3 and type(buf) is bytes:
buf = buf.decode(ENCODING)
record = {}
record_buf_pos = 0
char = ord(record_buf[record_buf_pos])
#char = record_buf[record_buf_pos] if six.PY3 else ord(record_buf[record_buf_pos])
record['country_code'] = const.COUNTRY_CODES[char]
record['country_code3'] = const.COUNTRY_CODES3[char]
record['country_name'] = const.COUNTRY_NAMES[char]
record_buf_pos += 1
str_length = 0
# get region
char = ord(record_buf[record_buf_pos+str_length])
while (char != 0):
str_length += 1
char = ord(record_buf[record_buf_pos+str_length])
if str_length > 0:
record['region_name'] = record_buf[record_buf_pos:record_buf_pos+str_length]
record_buf_pos += str_length + 1
str_length = 0
# get city
char = ord(record_buf[record_buf_pos+str_length])
while (char != 0):
str_length += 1
char = ord(record_buf[record_buf_pos+str_length])
if str_length > 0:
record['city'] = record_buf[record_buf_pos:record_buf_pos+str_length]
else:
record['city'] = ''
record_buf_pos += str_length + 1
str_length = 0
# get the postal code
char = ord(record_buf[record_buf_pos+str_length])
while (char != 0):
str_length += 1
char = ord(record_buf[record_buf_pos+str_length])
if str_length > 0:
record['postal_code'] = record_buf[record_buf_pos:record_buf_pos+str_length]
else:
record['postal_code'] = None
record_buf_pos += str_length + 1
str_length = 0
record = {
'dma_code': 0,
'area_code': 0,
'metro_code': '',
'postal_code': ''
}
latitude = 0
longitude = 0
buf_pos = 0
# Get country
char = ord(buf[buf_pos])
record['country_code'] = const.COUNTRY_CODES[char]
record['country_code3'] = const.COUNTRY_CODES3[char]
record['country_name'] = const.COUNTRY_NAMES[char]
record['continent'] = const.CONTINENT_NAMES[char]
buf_pos += 1
def get_data(buf, buf_pos):
offset = buf_pos
char = ord(buf[offset])
while (char != 0):
offset += 1
char = ord(buf[offset])
if offset > buf_pos:
return (offset, buf[buf_pos:offset])
return (offset, '')
offset, record['region_name'] = get_data(buf, buf_pos)
offset, record['city'] = get_data(buf, offset + 1)
offset, record['postal_code'] = get_data(buf, offset + 1)
buf_pos = offset + 1
for j in range(3):
char = ord(record_buf[record_buf_pos])
record_buf_pos += 1
char = ord(buf[buf_pos])
buf_pos += 1
latitude += (char << (j * 8))
record['latitude'] = (latitude/10000.0) - 180.0
for j in range(3):
char = ord(record_buf[record_buf_pos])
record_buf_pos += 1
char = ord(buf[buf_pos])
buf_pos += 1
longitude += (char << (j * 8))
record['longitude'] = (longitude/10000.0) - 180.0
record['latitude'] = (latitude / 10000.0) - 180.0
record['longitude'] = (longitude / 10000.0) - 180.0
if self._databaseType == const.CITY_EDITION_REV1:
if self._databaseType in (const.CITY_EDITION_REV1, const.CITY_EDITION_REV1_V6):
dmaarea_combo = 0
if record['country_code'] == 'US':
for j in range(3):
char = ord(record_buf[record_buf_pos])
record_buf_pos += 1
dmaarea_combo += (char << (j*8))
char = ord(buf[buf_pos])
dmaarea_combo += (char << (j * 8))
buf_pos += 1
record['dma_code'] = int(math.floor(dmaarea_combo/1000))
record['area_code'] = dmaarea_combo%1000
else:
record['dma_code'] = 0
record['area_code'] = 0
record['dma_code'] = int(math.floor(dmaarea_combo / 1000))
record['area_code'] = dmaarea_combo % 1000
if 'dma_code' in record and record['dma_code'] in const.DMA_MAP:
record['metro_code'] = const.DMA_MAP[record['dma_code']]
else:
record['metro_code'] = ''
if 'country_code' in record:
record['time_zone'] = time_zone_by_country_and_region(
record['country_code'], record.get('region_name')) or ''
else:
record['time_zone'] = ''
record['metro_code'] = const.DMA_MAP.get(record['dma_code'])
params = (record['country_code'], record['region_name'])
record['time_zone'] = time_zone_by_country_and_region(*params)
return record
def _gethostbyname(self, hostname):
if self._databaseType in const.IPV6_EDITIONS:
try:
response = socket.getaddrinfo(hostname, 0, socket.AF_INET6)
family, socktype, proto, canonname, sockaddr = response[0]
address, port, flow, scope = sockaddr
return address
except socket.gaierror:
return ''
else:
return socket.gethostbyname(hostname)
def id_by_addr(self, addr):
"""
Get the country index.
Looks up the index for the country which is the key for
the code and name.
@param addr: The IP address
@type addr: str
@return: network byte order 32-bit integer
@rtype: int
"""
ipnum = util.ip2long(addr)
if not ipnum:
raise ValueError("Invalid IP address: %s" % addr)
COUNTY_EDITIONS = (const.COUNTRY_EDITION, const.COUNTRY_EDITION_V6)
if self._databaseType not in COUNTY_EDITIONS:
message = 'Invalid database type, expected Country'
raise GeoIPError(message)
return self._seek_country(ipnum) - const.COUNTRY_BEGIN
def country_code_by_addr(self, addr):
"""
Returns 2-letter country code (e.g. 'US') for specified IP address.
@ -427,31 +444,38 @@ class GeoIP(GeoIPBase):
@rtype: str
"""
try:
if self._databaseType == const.COUNTRY_EDITION:
country_id = self._lookup_country_id(addr)
return const.COUNTRY_CODES[country_id]
elif self._databaseType in (const.REGION_EDITION_REV0, const.REGION_EDITION_REV1,
const.CITY_EDITION_REV0, const.CITY_EDITION_REV1):
return self.region_by_addr(addr)['country_code']
else:
raise GeoIPError('Invalid database type; country_* methods expect '\
'Country, City, or Region database')
VALID_EDITIONS = (const.COUNTRY_EDITION, const.COUNTRY_EDITION_V6)
if self._databaseType in VALID_EDITIONS:
ipv = 6 if addr.find(':') >= 0 else 4
if ipv == 4 and self._databaseType != const.COUNTRY_EDITION:
message = 'Invalid database type; expected IPv6 address'
raise ValueError(message)
if ipv == 6 and self._databaseType != const.COUNTRY_EDITION_V6:
message = 'Invalid database type; expected IPv4 address'
raise ValueError(message)
country_id = self.id_by_addr(addr)
return const.COUNTRY_CODES[country_id]
elif self._databaseType in const.REGION_CITY_EDITIONS:
return self.region_by_addr(addr).get('country_code')
message = 'Invalid database type, expected Country, City or Region'
raise GeoIPError(message)
except ValueError:
raise GeoIPError('*_by_addr methods only accept IP addresses. Use *_by_name for hostnames. (Address: %s)' % addr)
raise GeoIPError('Failed to lookup address %s' % addr)
def country_code_by_name(self, hostname):
"""
Returns 2-letter country code (e.g. 'US') for specified hostname.
Use this method if you have a Country, Region, or City database.
@param hostname: host name
@param hostname: Hostname
@type hostname: str
@return: 2-letter country code
@rtype: str
"""
addr = socket.gethostbyname(hostname)
addr = self._gethostbyname(hostname)
return self.country_code_by_addr(addr)
def country_name_by_addr(self, addr):
@ -465,34 +489,35 @@ class GeoIP(GeoIPBase):
@rtype: str
"""
try:
if self._databaseType == const.COUNTRY_EDITION:
country_id = self._lookup_country_id(addr)
VALID_EDITIONS = (const.COUNTRY_EDITION, const.COUNTRY_EDITION_V6)
if self._databaseType in VALID_EDITIONS:
country_id = self.id_by_addr(addr)
return const.COUNTRY_NAMES[country_id]
elif self._databaseType in (const.CITY_EDITION_REV0, const.CITY_EDITION_REV1):
return self.record_by_addr(addr)['country_name']
elif self._databaseType in const.CITY_EDITIONS:
return self.record_by_addr(addr).get('country_name')
else:
raise GeoIPError('Invalid database type; country_* methods expect '\
'Country or City database')
message = 'Invalid database type, expected Country or City'
raise GeoIPError(message)
except ValueError:
raise GeoIPError('*_by_addr methods only accept IP addresses. Use *_by_name for hostnames. (Address: %s)' % addr)
raise GeoIPError('Failed to lookup address %s' % addr)
def country_name_by_name(self, hostname):
"""
Returns full country name for specified hostname.
Use this method if you have a Country database.
@param hostname: host name
@param hostname: Hostname
@type hostname: str
@return: country name
@rtype: str
"""
addr = socket.gethostbyname(hostname)
addr = self._gethostbyname(hostname)
return self.country_name_by_addr(addr)
def org_by_addr(self, addr):
"""
Lookup the organization (or ISP) for given IP address.
Use this method if you have an Organization/ISP database.
Lookup Organization, ISP or ASNum for given IP address.
Use this method if you have an Organization, ISP or ASNum database.
@param addr: IP address
@type addr: str
@ -500,31 +525,30 @@ class GeoIP(GeoIPBase):
@rtype: str
"""
try:
ipnum = ip2long(addr)
ipnum = util.ip2long(addr)
if not ipnum:
raise ValueError("Invalid IP address: %s" % addr)
raise ValueError('Invalid IP address')
if self._databaseType not in (const.ORG_EDITION, const.ISP_EDITION, const.ASNUM_EDITION):
raise GeoIPError('Invalid database type; org_* methods expect '\
'Org/ISP database')
valid = (const.ORG_EDITION, const.ISP_EDITION, const.ASNUM_EDITION, const.ASNUM_EDITION_V6)
if self._databaseType not in valid:
message = 'Invalid database type, expected Org, ISP or ASNum'
raise GeoIPError(message)
return self._get_org(ipnum)
except ValueError:
raise GeoIPError('*_by_addr methods only accept IP addresses. Use *_by_name for hostnames. (Address: %s)' % addr)
raise GeoIPError('Failed to lookup address %s' % addr)
def org_by_name(self, hostname):
"""
Lookup the organization (or ISP) for hostname.
Use this method if you have an Organization/ISP database.
@param hostname: host name
@param hostname: Hostname
@type hostname: str
@return: organization or ISP name
@return: Organization or ISP name
@rtype: str
"""
addr = socket.gethostbyname(hostname)
addr = self._gethostbyname(hostname)
return self.org_by_addr(addr)
def record_by_addr(self, addr):
@ -534,38 +558,41 @@ class GeoIP(GeoIPBase):
@param addr: IP address
@type addr: str
@return: dict with country_code, country_code3, country_name,
region, city, postal_code, latitude, longitude,
dma_code, metro_code, area_code, region_name, time_zone
@return: Dictionary with country_code, country_code3, country_name,
region, city, postal_code, latitude, longitude, dma_code,
metro_code, area_code, region_name, time_zone
@rtype: dict
"""
try:
ipnum = ip2long(addr)
ipnum = util.ip2long(addr)
if not ipnum:
raise ValueError("Invalid IP address: %s" % addr)
raise ValueError('Invalid IP address')
if not self._databaseType in (const.CITY_EDITION_REV0, const.CITY_EDITION_REV1):
raise GeoIPError('Invalid database type; record_* methods expect City database')
if self._databaseType not in const.CITY_EDITIONS:
message = 'Invalid database type, expected City'
raise GeoIPError(message)
return self._get_record(ipnum)
rec = self._get_record(ipnum)
if not rec:
return None
return rec
except ValueError:
raise GeoIPError('*_by_addr methods only accept IP addresses. Use *_by_name for hostnames. (Address: %s)' % addr)
raise GeoIPError('Failed to lookup address %s' % addr)
def record_by_name(self, hostname):
"""
Look up the record for a given hostname.
Use this method if you have a City database.
@param hostname: host name
@param hostname: Hostname
@type hostname: str
@return: dict with country_code, country_code3, country_name,
region, city, postal_code, latitude, longitude,
dma_code, metro_code, area_code, region_name, time_zone
@return: Dictionary with country_code, country_code3, country_name,
region, city, postal_code, latitude, longitude, dma_code,
metro_code, area_code, region_name, time_zone
@rtype: dict
"""
addr = socket.gethostbyname(hostname)
addr = self._gethostbyname(hostname)
return self.record_by_addr(addr)
def region_by_addr(self, addr):
@ -575,37 +602,33 @@ class GeoIP(GeoIPBase):
@param addr: IP address
@type addr: str
@return: dict containing country_code, region,
and region_name
@return: Dictionary containing country_code, region and region_name
@rtype: dict
"""
try:
ipnum = ip2long(addr)
ipnum = util.ip2long(addr)
if not ipnum:
raise ValueError("Invalid IP address: %s" % addr)
raise ValueError('Invalid IP address')
if not self._databaseType in (const.REGION_EDITION_REV0, const.REGION_EDITION_REV1,
const.CITY_EDITION_REV0, const.CITY_EDITION_REV1):
raise GeoIPError('Invalid database type; region_* methods expect '\
'Region or City database')
if self._databaseType not in const.REGION_CITY_EDITIONS:
message = 'Invalid database type, expected Region or City'
raise GeoIPError(message)
return self._get_region(ipnum)
except ValueError:
raise GeoIPError('*_by_addr methods only accept IP addresses. Use *_by_name for hostnames. (Address: %s)' % addr)
raise GeoIPError('Failed to lookup address %s' % addr)
def region_by_name(self, hostname):
"""
Lookup the region for given hostname.
Use this method if you have a Region database.
@param hostname: host name
@param hostname: Hostname
@type hostname: str
@return: dict containing country_code, region,
and region_name
@return: Dictionary containing country_code, region, and region_name
@rtype: dict
"""
addr = socket.gethostbyname(hostname)
addr = self._gethostbyname(hostname)
return self.region_by_addr(addr)
def time_zone_by_addr(self, addr):
@ -613,35 +636,33 @@ class GeoIP(GeoIPBase):
Look up the time zone for a given IP address.
Use this method if you have a Region or City database.
@param hostname: IP address
@type hostname: str
@param addr: IP address
@type addr: str
@return: Time zone
@rtype: str
"""
try:
ipnum = ip2long(addr)
ipnum = util.ip2long(addr)
if not ipnum:
raise ValueError("Invalid IP address: %s" % addr)
raise ValueError('Invalid IP address')
if not self._databaseType in (const.REGION_EDITION_REV0, const.REGION_EDITION_REV1,
const.CITY_EDITION_REV0, const.CITY_EDITION_REV1):
raise GeoIPError('Invalid database type; region_* methods expect '\
'Region or City database')
if self._databaseType not in const.CITY_EDITIONS:
message = 'Invalid database type, expected City'
raise GeoIPError(message)
return self._get_record(ipnum)['time_zone']
return self._get_record(ipnum).get('time_zone')
except ValueError:
raise GeoIPError('*_by_addr methods only accept IP addresses. Use *_by_name for hostnames. (Address: %s)' % addr)
raise GeoIPError('Failed to lookup address %s' % addr)
def time_zone_by_name(self, hostname):
"""
Look up the time zone for a given hostname.
Use this method if you have a Region or City database.
@param hostname: host name
@param hostname: Hostname
@type hostname: str
@return: Time zone
@rtype: str
"""
addr = socket.gethostbyname(hostname)
addr = self._gethostbyname(hostname)
return self.time_zone_by_addr(addr)