Input is a CSV/spreadsheet file with the number of winners to be selected in the first cell. On subsequent lines, the first column contains the names of the candidates (no blanks), with each additional column representing a ballot. Details are given here. Little error checking is done, so be sure to review the trace output. The source code is available here.
$Id: vote.py 75 2009-01-08 23:29:13Z jtkorb $
# Compute Hare Ballot
#
# Tim Korb (jtk@cs.purdue.edu), May 2007
#
# A few definitions and notes...
#
# tally: dictionary in which the keys are candidate names and the
# values are lists of ballots currently assigned to that candidate
#
# ballot: list of candidates in the order determined by the voter
#
# winners: list of candidates that have reached the quota of ballots
# or have remained in the running long enough to be declared a winner
#
# losers: list of candidates that have been eliminated from the running
#
# Note that plurals are generally used to indicate lists of other
# items, e.g., ballots is a list of ballot items.
#
# To download the complete source distribution, use Subversion:
#
# % svn co http://www.bikmort.com/hare
#
# $Id: tally.py 75 2009-01-08 23:29:13Z jtkorb $
import sys
import math
import copy
import random
fdOut = None
traceLevel = 0 # generate trace output if trace >= trace parameter
fParallel = 0 # track parallel universes instead of randomization if possible
class Universe:
id = 0 # id of last created Universe
pendingUniverses = []
results = {}
dropped = 0
def __reset(self):
Universe.id = 0
Universe.pendingUniverses = []
Universe.results = {}
Universe.dropped = 0
def __init__(self, p, nWinners, nBallots, quota):
if p == 1.0:
self.__reset()
Universe.id += 1
self.id = Universe.id
self.p = p
self.nWinners = nWinners
self.nBallots = nBallots
self.quota = quota
self.winners = []
self.losers = []
self.tally = {}
self.pendingItem = None
self.pendingType = None
self.pendingData = None
def forkPendingWinners(self, lWin):
return self.__forkPendingList(lWin, "winner")
def forkPendingLosers(self, lMin):
return self.__forkPendingList(lMin, "loser")
def forkPendingRedist(self, lRedist, winner):
return self.__forkPendingList(lRedist, "redist", winner)
def __forkPendingList(self, lItems, type, data=None):
global fParallel
if len(lItems) == 1:
return lItems[0]
if not fParallel:
trace(1, "\tmaking random choice among %d alternatives" %
len(lItems))
return random.choice(lItems)
# Split into parallel universes unless too many with small probability
#
if Universe.id > 10000 and self.p/len(lItems) < 1.0/10000:
trace(1, "\tpassing on %d universes (p = %f)" %
(len(lItems)-1, self.p/len(lItems)))
trace(1, "\tmaking random choice among %d alternatives" %
len(lItems))
Universe.dropped += len(lItems)-1
return random.choice(lItems)
self.p /= len(lItems)
trace(1, "\tcreating %d universes (p = %f)" % (len(lItems)-1, self.p))
for item in lItems[1:]:
u = Universe(self.p, self.nWinners, self.nBallots, self.quota)
u.winners = self.winners[:]
u.losers = self.losers[:]
u.tally = copy.deepcopy(self.tally)
u.pendingItem = item
u.pendingType = type
u.pendingData = data
Universe.pendingUniverses.append(u)
return lItems[0]
def combinations(l, n):
if n == 0:
return [[]]
else:
lResult = []
for i in range(len(l)):
for lsub in combinations(l[i+1:], n-1):
lResult.append([l[i]] + lsub)
return lResult
def trace(t, s):
global traceLevel, fdOut
if traceLevel >= t: print >>fdOut, s
return
def findNextWinner(u):
cWin = u.quota # number of votes for highest winner
lWin = [] # list of candidates with highest votes
for c in u.tally.keys():
if c not in u.losers and c not in u.winners and len(u.tally[c]) >= cWin:
if len(u.tally[c]) == cWin:
lWin.append(c)
else:
lWin = [c]
cWin = len(u.tally[c])
if len(lWin) > 0:
return u.forkPendingWinners(lWin)
# Check to see if only enough candidates remain
# TODO: sort by len(tally[c]) to choose larger winners first
# create multiple universes if some have equal votes
n = 0
last = ""
for c in u.tally.keys():
if c not in u.winners and c not in u.losers:
last = c
n = n + 1
if u.nWinners - len(u.winners) >= n:
trace(1, "\tremaining winner(s) have fewer than quota (%d) ballots" %
u.quota)
return last
return
def redistributeWinner(winner, u):
excess = len(u.tally[winner]) - u.quota
if excess <= 0:
trace(1, "\tno excess ballots to redistribute")
else:
lEffective = gatherEffectiveBallots(winner, u)
nRedist = min(excess, len(lEffective))
trace(1, "\tredistributing %d effective ballots, %d at a time" %
(len(lEffective), nRedist))
for ballot in u.forkPendingRedist(combinations(lEffective, nRedist), winner):
u.tally[winner].remove(ballot)
redistributeBallot(ballot, u)
traceTally(1, u)
def gatherEffectiveBallots(winner, u):
lEffective = []
for ballot in u.tally[winner]:
for candidateTo in ballot:
if candidateTo not in u.winners and candidateTo not in u.losers:
lEffective.append(ballot)
break
return lEffective
def redistributeBallot(ballot, u):
for candidateTo in ballot:
if candidateTo not in u.winners and candidateTo not in u.losers:
trace(1, "\tto %s: %s" % (candidateTo, ballot))
if not u.tally.has_key(candidateTo): u.tally[candidateTo] = []
u.tally[candidateTo].append(ballot)
ballot = ""
break
if ballot:
trace(1, "\tineffective ballot dropped: %s" % ballot)
def findNextLoser(u):
cMin = sys.maxint # least number of votes for candidate loser
lMin = [] # list of candidates with least votes
for c in u.tally.keys():
if c not in u.losers and c not in u.winners and len(u.tally[c]) <= cMin:
if len(u.tally[c]) == cMin:
lMin.append(c)
else:
lMin = [c]
cMin = len(u.tally[c])
if len(lMin) == 0:
return None
else:
return u.forkPendingLosers(lMin)
def redistributeLoser(loser, u):
excess = len(u.tally[loser])
if excess <= 0:
trace(1, "\tno ballots to redistribute")
else:
trace(1, "\tredistributing %d ballot(s) from %s" % (excess, loser))
while len(u.tally[loser]) > 0:
ballot = u.tally[loser][0]
u.tally[loser] = u.tally[loser][1:]
redistributeBallot(ballot, u)
traceTally(1, u)
return
def traceTally(t, u):
global traceLevel
if traceLevel < t: return
trace(t, "\nCURRENT ASSIGNMENT OF BALLOTS (%d needed to win)" % u.quota)
for candidate in u.tally.keys():
trace(t, "\t%s:" % candidate)
for ballot in u.tally[candidate]:
trace(t, "\t\t%s" % ballot)
return
# The basic Single Transferable Vote algorithm with Hare quota
#
# while winners < nWinners:
# if a candidate has more than quota votes:
# redistribute excess votes to next priority candidate
# else:
# eliminate lowest ranking candidate
# redistribute wasted votes to next priority candidate
#
def dotally(nWinners, ballots):
global fParallel
nBallots=len(ballots)
quota=int(math.ceil((nBallots + 1.0)/(nWinners + 1)))
u = Universe(1.0, nWinners, nBallots, quota)
trace(0, "INPUT SUMMARY")
trace(0, "\t%d ballots" % u.nBallots)
trace(0, "\tChoosing %s winners" % u.nWinners)
trace(0, "\tNeed ceil((%d + 1)/(%d + 1)) = %d ballots to win" %
(u.nBallots, u.nWinners, u.quota))
# Create initial tally and statistics
#
for ballot in ballots:
candidate = ballot[0]
if not u.tally.has_key(candidate):
u.tally[candidate] = []
u.tally[candidate].append(ballot)
total1st = {}
total2nd = {}
totalMrk = {}
for ballot in ballots:
for c in ballot:
total1st[c] = 0
total2nd[c] = 0
totalMrk[c] = 0
for ballot in ballots:
total1st[ballot[0]] += 1
if len(ballot) > 1:
total2nd[ballot[1]] += 1
for c in ballot:
totalMrk[c] += 1
trace(0, "\n\tBALLOT MARKS\t1ST\t2ND\tNONE")
candidates = totalMrk.keys()
candidates.sort()
for c in candidates:
trace(0, "\t%-16s%3d\t%3d\t%4d" %
(c, total1st[c], total2nd[c], len(ballots)-totalMrk[c]))
traceTally(0, u)
Universe.pendingUniverses.append(u)
while len(Universe.pendingUniverses) > 0:
u = Universe.pendingUniverses.pop()
trace(1, "POPPED UNIVERSE %d (p = %f, %d pending):" %
(u.id, u.p, len(Universe.pendingUniverses)))
if u.pendingType == "redist":
trace(1, "\tredistributing %d ballots" % len(u.pendingItem))
winner = u.pendingData
assert(winner in u.winners)
for ballot in u.pendingItem:
u.tally[winner].remove(ballot)
redistributeBallot(ballot, u)
elif u.pendingType == "winner":
trace(1, "\tfinishing pending winner %s" % u.pendingItem)
winner = u.pendingItem
assert(winner not in u.winners)
u.winners.append(winner)
trace(1, "\nSELECTION #%d: %s" % (len(u.winners), winner))
redistributeWinner(winner, u)
elif u.pendingType == "loser":
trace(1, "\tfinishing pending loser %s" % u.pendingItem)
loser = u.pendingItem
u.losers.append(loser)
trace(1, "\nELIMINATED: %s" % loser)
redistributeLoser(loser, u)
u.pendingType = None
while len(u.winners) < u.nWinners:
winner = findNextWinner(u)
if winner:
u.winners.append(winner)
trace(1, "\nSELECTION #%d: %s" % (len(u.winners), winner))
redistributeWinner(winner, u)
else:
loser = findNextLoser(u)
if loser:
u.losers.append(loser)
trace(1, "\nELIMINATED: %s" % loser)
redistributeLoser(loser, u)
else:
trace(1, "Not enough chosen candidates to fill all positions")
break
winners = str(u.winners)
if fParallel:
trace(1, "RESULTS FOR UNIVERSE %d (p = %f): %s" % (u.id, u.p, winners))
if winners not in Universe.results: Universe.results[winners] = 0.0
Universe.results[winners] += u.p
if fParallel:
trace(0, "\nTracked %d alternative decisions (skipped %d with low probability)" %
(Universe.id, Universe.dropped))
trace(0, "\nFINAL RESULTS")
lWinners = []
for winner in Universe.results.keys():
if fParallel:
lWinners.append("%f: %s" % (Universe.results[winner], winner))
else:
lWinners.append("%s" % winner)
lWinners.sort()
lWinners.reverse()
for winner in lWinners:
trace(0, winner)
return lWinners