view ttest/stats.py @ 2:f1929875b1b3

Bug fix: added support for the condition that the clinical vector (from which the two categorical values are drawn) might be empty for some rows
author melissacline
date Wed, 06 Aug 2014 12:34:04 -0700
parents 12bb38e187b9
children 6ca836b7e0b4
line wrap: on
line source

#!/usr/bin/env python



import argparse
import math
import re
import sys
try:
    import numpy
except:
    from galaxy import eggs
    eggs.require("numpy")
    import numpy
try:
    import scipy
except:
    from galaxy import eggs
    eggs.require( "scipy" )
    import scipy
    import scipy.stats

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("genomicData", type=str)
    parser.add_argument("clinicalVector", type=str)
    parser.add_argument("outfile", type=str)
    parser.add_argument('-a', type=str, default=None)
    parser.add_argument('-b', type=str, default=None)
    args = parser.parse_args()
    clinicalVector = readClinicalFile(args.clinicalVector)
    genomicFp = open(args.genomicData)
    headerTokens = genomicFp.readline().rstrip().split("\t")
    (columnsGroupA, columnsGroupB) = headerLineToGroups(headerTokens, 
                                                        clinicalVector,
                                                        args.a, args.b)
    print columnsGroupA, columnsGroupB
    outfile = open(args.outfile, "w")
    outfile.write("%s\tStatistic\tpValue\tMedian1\tMedian2\tDelta\n" % (headerTokens[0]))
    for row in genomicFp:
        tokens = row.rstrip().split("\t")
        symbol = tokens[0]
        groupATokens = [tokens[ii] for ii in columnsGroupA]
        groupBTokens = [tokens[ii] for ii in columnsGroupB]
        groupAValues = [float(xx) for xx in groupATokens if xx != "NA"]
        groupBValues = [float(xx) for xx in groupBTokens if xx != "NA"]
        (ttest, pvalue) = performTTest(groupAValues, groupBValues)
        if ttest == None or pvalue == None:
            outputLine = "%s\tNA\tNA" % (symbol)
        else:
            outputLine= "%s\t%f\t%f" % (symbol, ttest, pvalue)
        if len(groupAValues) == 0:
            outputLine = outputLine + "\tNA"
        else:
            medianA = numpy.median(groupAValues)
            outputLine = "%s\t%f" % (outputLine, medianA)
        if len(groupBValues) == 0:
            outputLine = outputLine + "\tNA"
        else:
            medianB = numpy.median(groupBValues)
            outputLine = "%s\t%f" % (outputLine, medianB)
        if len(groupAValues) == 0 or len(groupBValues) == 0:
            outputLine = outputLine + "\tNA"
        else:
            outputLine = "%s\t%f" % (outputLine, medianA - medianB)
        outfile.write("%s\n" % (outputLine))
    outfile.close()

def readClinicalFile(clinicalFile):
    clinicalVector = dict()
    fp = open(clinicalFile)
    fp.readline()
    for row in fp:
        tokens = row.rstrip().split("\t")
        if len(tokens) > 1:
            clinicalVector[tokens[0]] = tokens[1]
        else:
            clinicalVector[tokens[0]] = None
    fp.close()
    return(clinicalVector)

def headerLineToGroups(headerTokens, clinicalVector, optionA, optionB):
    columnsGroupA = list()
    columnsGroupB = list()
    for ii in range(len(headerTokens)):
        id = headerTokens[ii]
        if clinicalVector.has_key(id):
            if clinicalVector[id] == optionA:
                columnsGroupA.append(ii)
            elif clinicalVector[id] == optionB:
                columnsGroupB.append(ii)
    return((columnsGroupA, columnsGroupB))

def performTTest(groupA, groupB):
    from scipy import special
    
    epsilon = 1E-06
    countA = len(groupA)
    countB = len(groupB)
    meanA = numpy.mean(groupA)
    meanB = numpy.mean(groupB)
    varA = numpy.var(groupA)
    varB = numpy.var(groupB)
    degreesOfFreedom = countA + countB - 2
    sVar = ((countA - 1) * varA + (countB - 1) * varB) / degreesOfFreedom
    if sVar == 0:
        tStat = None
        prob = None
    else:
        tStat = (meanA - meanB)/math.sqrt(sVar * (1.0/(countA + epsilon)     
                                                  + 1.0/(countB + epsilon)))  
        bottom = degreesOfFreedom + tStat * tStat
        prob = special.betainc(0.5*degreesOfFreedom, 0.5, 
                               degreesOfFreedom/bottom)
    return(tStat, prob)

    


if __name__ == "__main__":
    main()