#! /usr/bin/python3
# Copyright (C) 2024 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.

"""Summarize the most recent join/leave events found in a subscribe log."""

import argparse
import datetime
import email.utils
import re
import sys

# One log entry.  The named groups are exactly the ones main() reads back:
# date, listname, non ('non' when the entry says "nonmember"), action, user.
# NOTE(review): 'action' is taken to be the single word after the optional
# "member " and 'user' the text up to the next ',' or ';' -- confirm against
# a real log line.
LOGRE = re.compile(
    r'^(?P<date>.*?) \(\d+\) (?P<listname>[^:]*): '
    r'(?P<non>(non)?)(member )?(?P<action>[^ ]*) '
    r'(?P<user>[^,;]*)')
DATEFMT = '%b %d %H:%M:%S %Y'
DATEOUTFMT = '%b %d %Y'
ONEDAY = datetime.timedelta(1)
# Sentinel meaning "no such event seen"; every real timestamp compares greater.
MIN = datetime.datetime(datetime.MINYEAR, 1, 1)

# Position of each action's latest timestamp inside a per-user event list.
_SLOTS = {
    'unsubscribed': 0,
    'subscribed': 1,
    'nmsubscribed': 2,
    'nmunsubscribed': 3,
}


def usage(code, msg=''):
    """Print msg, if there is one, and exit the process with status code."""
    if msg:
        print(msg)
    sys.exit(code)


def parseargs():
    """Parse the command line and return the argparse namespace."""
    parser = argparse.ArgumentParser(
        description="""Analyze a Mailman subscribe log.
        Report by listname and by email address, the most recent join and/or
        leave event within the time frame defined by the BEGIN and END dates
        or the entire log.  The log is a single file but may be a
        concatenation of several subscribe log files.
        """)
    parser.add_argument('-f', '--file',
                        type=str, default='-',
                        help="""\
The path to the subscribe log. Defaults to stdin.""")
    parser.add_argument('-b', '--begin',
                        type=str, default='',
                        help="""\
If specified this is the beginning date in the form yyyy-mm-dd.
Log entries before this date are ignored.""")
    parser.add_argument('-e', '--end',
                        type=str, default='',
                        help="""\
If specified this is the ending date in the form yyyy-mm-dd.
Log entries after this date are ignored.""")
    parser.add_argument('-l', '--list',
                        type=str, action='append',
                        help="""\
Specify a fqdn listname to process. May be repeated for multiple lists.
The default is to do all lists in the log.""")
    parser.add_argument('-v', '--verbose',
                        action='store_true',
                        help="""\
Include the date of the (un)subscribe in the output.""")
    return parser.parse_args()


def event_up(event, timestamp, action):
    """Record timestamp as the latest occurrence of action.

    event is either an empty list (nothing recorded yet) or a four item list
    [left, joined, nonmember joined, nonmember left].  An empty list is
    replaced by a fresh list of MIN values; otherwise the list is updated in
    place.  The slot for action only moves forward in time, and unknown
    actions leave the list untouched.  The resulting list is returned.
    """
    if event == []:
        event = [MIN, MIN, MIN, MIN]
    slot = _SLOTS.get(action)
    if slot is not None and timestamp > event[slot]:
        event[slot] = timestamp
    return event


def _describe(user, role, joined, left, verbose):
    """Return the report line for one role of one user, or None if no event.

    role is the word used in the output ('member' or 'nonmember'); joined
    and left are the latest timestamps, MIN meaning "never seen".
    """
    has_joined = joined > MIN
    has_left = left > MIN
    if not (has_joined or has_left):
        return None
    joined_on = joined.strftime(DATEOUTFMT) if has_joined else ''
    left_on = left.strftime(DATEOUTFMT) if has_left else ''
    if has_joined and has_left:
        # Both happened: the wording follows the chronological order.
        if joined > left:
            if verbose:
                return '{}: {} left {}, joined {}'.format(
                    user, role, left_on, joined_on)
            return '{}: {} left and rejoined'.format(user, role)
        if verbose:
            return '{}: {} joined {}, left {}'.format(
                user, role, joined_on, left_on)
        return '{}: {} joined and left'.format(user, role)
    if has_joined:
        if verbose:
            return '{}: {} joined {}'.format(user, role, joined_on)
        return '{}: {} joined'.format(user, role)
    if verbose:
        return '{}: {} left {}'.format(user, role, left_on)
    return '{}: {} left'.format(user, role)


def report(events, verbose):
    """Print the per-list, per-address summary of events to stdout.

    events maps listname -> address -> [left, joined, nonmember joined,
    nonmember left].  With verbose true the dates are included.  Prints
    'None found' when events is empty.  An address that has both member and
    nonmember events gets one line for each role.
    """
    if not events:
        print('None found')
        return
    for listname in sorted(events):
        n_nmjoined = n_joined = n_nmleft = n_left = 0
        print('List {}:'.format(listname))
        for user, val in sorted(events[listname].items()):
            left, joined, nmjoined, nmleft = val
            n_left += left > MIN
            n_joined += joined > MIN
            n_nmjoined += nmjoined > MIN
            n_nmleft += nmleft > MIN
            for outline in (
                    _describe(user, 'member', joined, left, verbose),
                    _describe(user, 'nonmember', nmjoined, nmleft, verbose)):
                if outline is not None:
                    print(' ' + outline)
        # The trailing '' plus print()'s own newline leaves a blank line
        # between lists.
        print('\n'.join([
            'For this list,',
            '    nonmember joined = {}'.format(n_nmjoined),
            '    member joined = {}'.format(n_joined),
            '    nonmember left = {}'.format(n_nmleft),
            '    member left = {}'.format(n_left),
            '',
        ]))


def _scan(lines, begin, end, wanted):
    """Collect the latest events from an iterable of log lines.

    Only entries with begin <= timestamp < end are kept.  wanted is a set of
    lower-cased listnames to keep, or None to keep every list.  Lines that
    do not look like a (un)subscribe entry, or whose date cannot be parsed,
    are skipped.  Returns listname -> address -> event list.
    """
    events = {}
    for line in lines:
        mo = LOGRE.search(line)
        if not mo:
            continue
        action = mo.group('action')
        if mo.group('non'):
            action = 'nm' + action
        if action not in _SLOTS:
            continue
        try:
            timestamp = datetime.datetime.strptime(mo.group('date'), DATEFMT)
        except ValueError:
            # Not a real log timestamp; treat like any other unmatched line.
            continue
        if timestamp < begin or timestamp >= end:
            continue
        listname = mo.group('listname')
        if wanted is not None and listname.lower() not in wanted:
            continue
        address = email.utils.parseaddr(mo.group('user'))[1]
        per_list = events.setdefault(listname, {})
        per_list[address] = event_up(
            per_list.get(address, []), timestamp, action)
    return events


def main():
    """Parse the options, read the log and print the report."""
    ns = parseargs()
    begin = MIN
    if ns.begin:
        try:
            begin = datetime.datetime.strptime(ns.begin, '%Y-%m-%d')
        except ValueError as e:
            usage(1, e)
    # end is exclusive: midnight after the requested last day.
    end = datetime.datetime.max
    if ns.end:
        try:
            end = datetime.datetime.strptime(ns.end, '%Y-%m-%d') + ONEDAY
        except (ValueError, OverflowError) as e:
            usage(1, e)
    if begin >= end:
        usage(1, "Start date can't be greater than end date.")
    # Build the list filter once instead of once per log line.
    wanted = {x.lower() for x in ns.list} if ns.list else None
    if ns.file == '-':
        events = _scan(sys.stdin, begin, end, wanted)
    else:
        with open(ns.file) as in_file:
            events = _scan(in_file, begin, end, wanted)
    report(events, ns.verbose)


if __name__ == '__main__':
    main()