Source code for irclog.archive

""":mod:`irclog.archive` --- IRC log archive
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. todo:: File encoding auto-detection.

.. data:: STRPTIME_TO_GLOB

   The table for translating :func:`strptime() <time.strptime>` directives to
   :mod:`glob` patterns.

.. data:: STRPTIME_DIRECTIVE_PATTERN

   The :mod:`re` pattern object that matches to
   :func:`strptime() <time.strptime>` directives.

   .. sourcecode:: pycon

      >>> fmtstr = "%Y-%m-%d"
      >>> STRPTIME_DIRECTIVE_PATTERN.sub(lambda m: STRPTIME_TO_GLOB[m.group(0)],
      ...                                fmtstr)
      '[0-9][0-9][0-9][0-9]-[01][0-9]-[0-3][0-9]'

"""
import re
import glob
import os.path
import functools
import datetime
try:
    import cStringIO as StringIO
except ImportError:
    import StringIO
import irclog.parser


STRPTIME_TO_GLOB = {"%a": "???",
                    "%A": "*",
                    "%b": "???",
                    "%B": "*",
                    "%c": "*",
                    "%d": "[0-3][0-9]",
                    "%f": "[0-9]" * 6,
                    "%H": "[0-2][0-9]",
                    "%I": "[01][0-9]",
                    "%j": "[0-3][0-9][0-9]",
                    "%m": "[01][0-9]",
                    "%M": "[0-6][0-9]",
                    "%p": "[APap][Mm]",
                    "%S": "[0-6][0-9]",
                    "%U": "[0-5][0-9]",
                    "%w": "[0-6]",
                    "%W": "[0-5][0-9]",
                    "%x": "*",
                    "%X": "*",
                    "%y": "[0-9]" * 2,
                    "%Y": "[0-9]" * 4,
                    "%z": "*",
                    "%Z": "*",
                    "%%": "%"}
STRPTIME_TO_PATTERN = {"%a": "[Mm]on|[Tt]u[eu]|[Ww]ed|[Ff]ri|[Ss](?:at|un)",
                       "%A": "[Mm]onday|[Tt]uesday|[Ww]ednesday|[Tt]hursday"
                             "|[Ff]riday|[Ss]aturday|[Ss]unday",
                       "%b": "[Jj](?:an|u[nl])|[Ff]eb|[Mm]a[ry]|[Aa](?:pr|ug)"
                             "|[Ss]ep|[Oo]ct|[Nn]ov|[Dd]ec",
                       "%B": "[Jj]anuary|[Ff]ebruary|[Mm]arch|[Aa]pril|[Mm]ay"
                             "|[Jj]une|[Jj]uly|[Aa]ugust|[Ss]eptember"
                             "|[Oo]ctober|[Nn]ovember|[Dd]ecember",
                       "%c": ".+?",
                       "%d": "[012][0-9]|3[01]",
                       "%f": "[0-9]{6}",
                       "%H": "[0-1][0-9]|2[0-4]",
                       "%I": "0[0-9]|1[012]",
                       "%j": "[0-2][0-9][0-9]|3[0-5][0-9]|36[0-6]",
                       "%m": "0[0-9]|1[012]",
                       "%M": "[0-5][0-9]|60",
                       "%p": "[APap][Mm]",
                       "%S": "[0-5][0-9]|6[01]",
                       "%U": "[0-4][0-9]|5[0-3]",
                       "%w": "[0-6]",
                       "%W": "[0-4][0-9]|5[0-3]",
                       "%x": ".+?",
                       "%X": ".+?",
                       "%y": "[0-9]{2}",
                       "%Y": "[0-9]{4}",
                       "%z": "|[+-](?:[0-1][0-9]|2[0-4])(?:[0-5][0-9]|60)",
                       "%Z": ".*?",
                       "%%": "%"}
STRPTIME_DIRECTIVE_PATTERN = re.compile("|".join(STRPTIME_TO_GLOB.iterkeys()))


[docs]class FilenamePattern(object): """The glob-like, but specialized to IRC log files, filename pattern matcher. .. sourcecode:: pycon >>> pattern = FilenamePattern("/logs/<server>" ... "/<channel>.<date:%Y-%m-%d>.log") >>> pattern.glob_pattern_string() '/logs/*/*.[0-9][0-9][0-9][0-9]-[01][0-9]-[0-3][0-9].log' >>> pattern.glob_pattern_string(server="Freenode") '/logs/Freenode/*.[0-9][0-9][0-9][0-9]-[01][0-9]-[0-3][0-9].log' .. data:: REPLACER_PATTERN The pattern of replacer e.g. ``<date:%Y-%m-%d>``, ``<channel>``. .. productionlist:: replacer: "<" `name` [ ":" `format` ] ">" name: /[A-Za-z_]+/ format: /[^>]*/ """ REPLACER_PATTERN = re.compile(r"<(?P<name>[A-Za-z_]+)" r"(?::(?P<format>[^>]*))?>") __slots__ = "pattern", "_re_pattern" def __init__(self, pattern): self.pattern = pattern self._re_pattern = None @property def replacers(self): """The list of replacers. .. sourcecode:: pycon >>> pattern = FilenamePattern("/logs/<server>" ... "/<channel>.<date:%Y-%m-%d>.log") >>> list(pattern.replacers) ['server', 'channel', 'date'] """ for m in self.REPLACER_PATTERN.finditer(self.pattern): yield m.group("name") @property def replacer_pairs(self): """The list of replacers' ``(name, format)``. (For replacers have no format, ``(name, None)``.) .. sourcecode:: pycon >>> pattern = FilenamePattern("/logs/<server>" ... "/<channel>.<date:%Y-%m-%d>.log") >>> list(pattern.replacer_pairs) [('server', None), ('channel', None), ('date', '%Y-%m-%d')] """ for m in self.REPLACER_PATTERN.finditer(self.pattern): yield m.group("name"), m.group("format") or None @property def replacer_dict(self): """The :class:`dict` of replacers. .. sourcecode:: pycon >>> pattern = FilenamePattern("/logs/<server>" ... "/<channel>.<date:%Y-%m-%d>.log") >>> pattern.replacer_dict["server"] >>> pattern.replacer_dict["date"] '%Y-%m-%d' >>> pattern.replacer_dict["xyz"] Traceback (most recent call last): ... KeyError: 'xyz' """ return dict(self.replacer_pairs)
[docs] def fill_replacers(self, replacers, escape=None): """Fills replacers with given values. .. sourcecode:: pycon >>> pattern = FilenamePattern("/logs/<server>" ... "/<channel>.<date:%Y-%m-%d>.log") >>> import datetime >>> pattern.fill_replacers({"server": "Freenode", ... "channel": "#hongminhee", ... "date": datetime.date(2010, 8, 4)}) '/logs/Freenode/#hongminhee.2010-08-04.log' When ``escape`` function has given, non-replacers are applied into the function: .. sourcecode:: pycon >>> pattern.fill_replacers({"server": "Freenode", ... "channel": "#hongminhee", ... "date": datetime.date(2010, 8, 4)}, ... escape=lambda x: x.upper()) '/LOGS/Freenode/#hongminhee.2010-08-04.LOG' It may raise :exc:`KeyError` when too few replacers has given: .. sourcecode:: pycon >>> pattern.fill_replacers({"server": "Freenode"}) Traceback (most recent call last): ... KeyError: 'channel' :param replacers: a mapping object of ``(replacer_name, str_to_replace)`` :type replacers: :class:`dict`, mapping object :returns: a filled filename """ if not isinstance(replacers, dict): replacers = dict(replacers) pos = 0 buffer = StringIO.StringIO() matches = self.REPLACER_PATTERN.finditer(self.pattern) for m in matches: if pos < m.start(): part = self.pattern[pos:m.start()] buffer.write(escape(part) if escape else part) pos = m.end() value = replacers[m.group("name")] if m.group("format") and not isinstance(value, basestring): value = format(value, m.group("format")) buffer.write(value) part = self.pattern[pos:] buffer.write(escape(part) if escape else part) return buffer.getvalue()
[docs] def glob_pattern_string(self, **replacers): """Generates a :mod:`glob` pattern string. It takes keyword arguments of replacers to fill also. .. sourcecode:: pycon >>> pattern = FilenamePattern("/logs/<server>" ... "/<channel>.<date:%Y-%m-%d>.log") >>> pattern.glob_pattern_string() '/logs/*/*.[0-9][0-9][0-9][0-9]-[01][0-9]-[0-3][0-9].log' >>> pattern.glob_pattern_string(server="Freenode") '/logs/Freenode/*.[0-9][0-9][0-9][0-9]-[01][0-9]-[0-3][0-9].log' >>> pattern.glob_pattern_string(channel="#hongminhee") '/logs/*/#hongminhee.[0-9][0-9][0-9][0-9]-[01][0-9]-[0-3][0-9].log' >>> import datetime >>> pattern.glob_pattern_string(date=datetime.date(2010, 8, 4)) '/logs/*/*.2010-08-04.log' :param \*\*replacers: replacers to fill. keywords go replacer names and values fills them :returns: a glob pattern string """ for name, form in self.replacer_pairs: if name not in replacers: if form: replacers[name] = STRPTIME_DIRECTIVE_PATTERN.sub( lambda m: STRPTIME_TO_GLOB[m.group(0)], form ) else: replacers[name] = "*" return self.fill_replacers(replacers)
[docs] def glob(self, **replacers): """Globs with the pattern. :param \*\*replacers: replacers to fill. keywords go replacer names and values fills them :returns: a :class:`list` of all matched paths .. seealso:: - Module :mod:`glob` - Function :func:`glob.glob()` """ return glob.glob(self.glob_pattern_string(**replacers))
[docs] def iglob(self, **replacers): """Case-insensitive version of :meth:`glob()`. :param \*\*replacers: replacers to fill. keywords go replacer names and values fills them :returns: a :class:`list` of all matched paths .. seealso:: - Module :mod:`glob` - Function :func:`glob.iglob()` """ return glob.glob(self.glob_pattern_string(**replacers))
[docs] def re_pattern_string(self, **replacers): r"""Generates a :mod:`re` pattern string. It takes keyword arguments of replacers to fill also. .. sourcecode:: pycon >>> import datetime >>> pattern = FilenamePattern("/<server>/<channel>.<date:%Y%m%d>") >>> pattern.re_pattern_string(date=datetime.date(2010, 8, 4)) '^\\/(?P<server>.+?)\\/(?P<channel>.+?)\\.20100804$' :param \*\*replacers: replacers to fill. keywords go replacer names and values fills them :returns: a :mod:`re` pattern string """ def replace(m): return "(?:" + STRPTIME_TO_PATTERN[m.group(0)] + ")" for name, form in self.replacer_pairs: if name not in replacers: if form: val = STRPTIME_DIRECTIVE_PATTERN.sub(replace, form) val = "(?P<{0}>{1})".format(name, val) else: val = "(?P<{0}>.+?)".format(name) replacers[name] = val return "^" + self.fill_replacers(replacers, escape=re.escape) + "$"
[docs] def re_pattern(self, **replacers): """Generates a :mod:`re` pattern object. It takes keyword arguments of replacers to fill also. :param \*\*replacers: replacers to fill. keywords go replacer names and values fills them :returns: a :mod:`re` pattern .. note:: This method is the same to following code:: re.compile(file_pattern.re_pattern_string(**replacers)) """ if not self._re_pattern: self._re_pattern = re.compile(self.re_pattern_string(**replacers)) return self._re_pattern
def __str__(self): return self.pattern def __repr__(self): t = type(self) mod = "" if t.__module__ == "__main__" else t.__module__ + "." return "{0}{1}({2!r})".format(mod, t.__name__, self.pattern)
[docs]class BaseArchive(object): """Abstract base class for :class:`Archive`, :class:`Server` and :class:`Channel`. .. data:: ELEMENT_CLASS Should be implemented in the subclass. .. data:: ELEMNENT_TAG Should be implemented in the subclass. .. attribute:: pattern_replacers Should be implemented in the subclass. """ pattern_replacers = {} def __iter__(self): regex = self.pattern.re_pattern(**self.pattern_replacers) files = self.pattern.glob(**self.pattern_replacers) files.sort() elements = [] for path in files: match = regex.match(path) if match: try: el = self.decode_element_key(match.group(self.ELEMENT_TAG)) except IndexError: continue if el not in elements: elements.append(el) yield self.ELEMENT_CLASS(self, el)
[docs] def decode_element_key(self, element_key): """Decodes an element key string to element key. :param element_key: an element key string :type element_key: :class:`basestring` :returns: an element key """ return element_key
[docs] def encode_element_key(self, element_key): """Encodes am element key to element key string. :param element_key: an element key :returns: an element key string """ form = self.pattern.replacer_dict[self.ELEMENT_TAG] if form: return format(element_key, form) return str(element_key)
def __contains__(self, element): replacers = dict(self.pattern_replacers) replacers[self.ELEMENT_TAG] = self.encode_element_key(element) paths = self.pattern.glob(**replacers) return bool(paths) def __getitem__(self, element): if element in self: return self.ELEMENT_CLASS(self, element) raise KeyError(element) def __len__(self): cnt = 0 for _ in self: cnt += 1 return cnt
[docs]class Archive(BaseArchive): """Log archive. :param pattern: logs filename pattern e.g. ``"/logs/<server>/<channel>.<date:%Y-%m-%d>.log"`` :type pattern: :class:`FilenamePattern`, :class:`basestring` """ ELEMENT_CLASS = lambda *a, **k: Server(*a, **k) ELEMENT_TAG = "server" __slots__ = "pattern", def __init__(self, pattern): if not isinstance(pattern, FilenamePattern): pattern = FilenamePattern(pattern) self.pattern = pattern def __repr__(self): t = type(self) mod = "" if t.__module__ == "__main__" else t.__module__ + "." return "{0}{1}({2!r})".format(mod, t.__name__, str(self.pattern))
[docs]class Server(BaseArchive): """IRC server. :param archive: an archive :type archive: :class:`Archive` :param server: a server name :type server: :class:`basestring` """ ELEMENT_CLASS = lambda *a, **k: Channel(*a, **k) ELEMENT_TAG = "channel" __slots__ = "archive", "server" def __init__(self, archive, server): self.archive = archive self.server = server @property def pattern(self): return self.archive.pattern @property def pattern_replacers(self): replacers = dict(self.archive.pattern_replacers) replacers["server"] = self.server return replacers def __eq__(self, other): return self.archive == other.archive and self.server == other.server def __ne__(self, other): return not (self == other) def __str__(self): return self.server def __repr__(self): t = type(self) mod = "" if t.__module__ == "__main__" else t.__module__ + "." clsname = mod + t.__name__ return "{0}({1!r}, {2!r})".format(clsname, self.archive, self.server)
[docs]class Channel(BaseArchive): """IRC channel or nick. :param server: a server :type server: :class:`Server` :param channel: a channel name or a nick :type channel: :class:`basestring` """ ELEMENT_CLASS = lambda *a, **k: Log(*a, **k) ELEMENT_TAG = "date" __slots__ = "server", "channel" def __init__(self, server, channel): self.server = server self.channel = channel @property def archive(self): return self.server.archive @property def pattern(self): return self.archive.pattern @property def pattern_replacers(self): replacers = dict(self.server.pattern_replacers) replacers["channel"] = self.channel return replacers
[docs] def decode_element_key(self, element_key): """Decodes an element key string to element key. :param element_key: an element key string :type element_key: :class:`basestring` :returns: an element key """ form = self.pattern.replacer_dict[self.ELEMENT_TAG] or "%Y-%m-%d" time = datetime.datetime.strptime(element_key, form) return time.date()
[docs] def encode_element_key(self, element_key): """Encodes am element key to element key string. :param element_key: an element key :returns: an element key string """ if not isinstance(element_key, datetime.date): raise TypeError("expected a datetime.date instance, " "not " + repr(element_key)) return super(Channel, self).encode_element_key(element_key)
def __contains__(self, date): return True def __eq__(self, other): return self.server == other.server and self.channel == other.channel def __ne__(self, other): return not (self == other) def __str__(self): return self.channel def __unicode__(self): try: return self.channel.decode("utf-8") except UnicodeDecodeError: from os import environ try: _, enc = environ["LANG"].split(".") except (LookupError, ValueError): return self.channel.decode("utf-8", "replace") return self.channel.decode(enc, "replace") def __repr__(self): t = type(self) mod = "" if t.__module__ == "__main__" else t.__module__ + "." clsname = mod + t.__name__ return "{0}({1!r}, {2!r})".format(clsname, self.server, self.channel)
[docs]class Log(object): """IRC log. :param channel: a channel logged :type channel: :class:`Channel` :param date: a date logged :type date: :class:`datetime.date` """ __slots__ = "channel", "date" def __init__(self, channel, date): self.channel = channel self.date = date @property def server(self): """The server.""" return self.channel.server @property def archive(self): """The archive.""" return self.server.archive @property def pattern(self): """The filename pattern.""" return self.archive.pattern @property def yesterday_log(self): """The yesterday log of the same channel.""" return self.channel[self.date - datetime.timedelta(days=1)] @property def tomorrow_log(self): """The tomorrow log of the same channel.""" return self.channel[self.date + datetime.timedelta(days=1)] @property def filename(self): """The filename of the log. If not exists, it could be ``None``.""" replacers = dict(self.channel.pattern_replacers) replacers["date"] = self.channel.encode_element_key(self.date) files = self.pattern.glob(**replacers) if files: return files[0] @property def file_size(self): """The size of the log file in bytes.""" filename = self.filename if filename: return os.path.getsize(filename) return 0
[docs] def is_logged(self): """Returns ``True`` if the channel of the day has logged. :returns: ``True`` or ``False`` """ return bool(self.filename)
def __eq__(self, other): return self.channel == other.channel and self.date == other.date def __ne__(self, other): return not (self == other) def __iter__(self): filename = self.filename if not filename: return with open(filename) as file: for msg in irclog.parser.parse(file, self.date): yield msg def __repr__(self): t = type(self) mod = "" if t.__module__ == "__main__" else t.__module__ + "." clsname = mod + t.__name__ return "{0}({1!r}, {2!r})".format(clsname, self.channel, self.date)
Archive.ELEMENT_CLASS = Server Server.ELEMENT_CLASS = Channel Channel.ELEMENT_CLASS = Log