Mercurial > repos > sanbi-uwc > assess_poliovirus_alignment
comparison assess_alignment.py @ 13:bf0bebcb6bb1 draft default tip
planemo upload for repository https://github.com/pvanheus/polio_report commit 753aad311378b064f2152c8e99e7c8097c7f4321-dirty
| author | sanbi-uwc |
|---|---|
| date | Fri, 11 Nov 2022 06:18:15 +0000 |
| parents | ee98dcd9aad4 |
| children |
comparison
equal
deleted
inserted
replaced
| 12:ee98dcd9aad4 | 13:bf0bebcb6bb1 |
|---|---|
| 112 # for mismatch store [pos_in_genome, pos_in_vp1, reference_base, sequenced_base] | 112 # for mismatch store [pos_in_genome, pos_in_vp1, reference_base, sequenced_base] |
| 113 mismatch_list.append( | 113 mismatch_list.append( |
| 114 [i + 1, i - offset + 1, reference["align"][i], mismatch_bases[i]] | 114 [i + 1, i - offset + 1, reference["align"][i], mismatch_bases[i]] |
| 115 ) | 115 ) |
| 116 if vp1only: | 116 if vp1only: |
| 117 # we have trim consensus bases before vp1 region | 117 # a note on gap trimming: if the consensus is smaller than VP1, we might |
| 118 if base_start <= offset: | 118 # get gaps on either side. these will of course be trimmed. |
| 119 cons_start = offset - base_start | 119 # we don't expect gaps inside the consensus but in the rare event that |
| 120 else: | 120 # these occur they are also discarded. |
| 121 cons_start = 0 | 121 consensus = data['gappedConsensus'][offset:offset+length].replace('-','') |
| 122 consensus = data['gapFreeConsensus'][cons_start:cons_start + length] | |
| 123 else: | 122 else: |
| 123 # TODO: sometimes there is "orphan" sequence from false calls far in front | |
| 124 # or behind the consensus region. these should be removed somehow | |
| 124 consensus = data['gapFreeConsensus'] | 125 consensus = data['gapFreeConsensus'] |
| 125 return [conflicts, matches, mismatches, mismatch_list, consensus] | 126 return [conflicts, matches, mismatches, mismatch_list, consensus] |
| 126 | 127 |
| 127 | 128 |
| 128 def analyse_trace_quality(json_file: TextIO) -> float: | 129 def analyse_trace_quality(json_file: TextIO) -> float: |
| 130 # TODO: consider clipping this to VP1 region or at least aligned region | |
| 129 data = load_json(json_file) | 131 data = load_json(json_file) |
| 130 | 132 |
| 131 traces = data["gappedTraces"] | 133 traces = data["gappedTraces"] |
| 132 overall_avg = 0 | 134 overall_avg = 0 |
| 133 for trace in traces: | 135 for trace in traces: |
| 137 avg_ratio = 0 | 139 avg_ratio = 0 |
| 138 for base in ("A", "C", "G", "T"): | 140 for base in ("A", "C", "G", "T"): |
| 139 calls = trace["peak" + base][start : end + 1] | 141 calls = trace["peak" + base][start : end + 1] |
| 140 min_call = min(calls) | 142 min_call = min(calls) |
| 141 max_call = max(calls) | 143 max_call = max(calls) |
| 144 if len(calls) == 0: | |
| 145 # no calls for this base to deal with, skip it | |
| 146 continue | |
| 142 avg_call = sum(calls) / len(calls) | 147 avg_call = sum(calls) / len(calls) |
| 143 ratio = max_call / avg_call | 148 if avg_call == 0: |
| 149 # zero average base quality! | |
| 150 ratio = 0 | |
| 151 else: | |
| 152 ratio = max_call / avg_call | |
| 144 call_quality["avg" + base] = avg_call | 153 call_quality["avg" + base] = avg_call |
| 145 call_quality["min" + base] = min_call | 154 call_quality["min" + base] = min_call |
| 146 call_quality["max" + base] = max_call | 155 call_quality["max" + base] = max_call |
| 147 call_quality["ratio" + base] = ratio | 156 call_quality["ratio" + base] = ratio |
| 148 avg_ratio += ratio | 157 avg_ratio += ratio |
| 149 avg_ratio = avg_ratio / 4 | 158 avg_ratio = avg_ratio / 4 |
| 150 overall_avg += avg_ratio | 159 overall_avg += avg_ratio |
| 151 overall_avg = overall_avg / len(traces) | 160 if len(traces) > 0: |
| 161 overall_avg = overall_avg / len(traces) | |
| 162 else: | |
| 163 overall_avg = 0 | |
| 152 return overall_avg | 164 return overall_avg |
| 153 | 165 |
| 154 | 166 |
| 155 def comma_split(args: str) -> list[str]: | 167 def comma_split(args: str) -> list[str]: |
| 156 return args.split(",") | 168 return args.split(",") |
| 196 min_mismatches = mismatches | 208 min_mismatches = mismatches |
| 197 best_match_mismatch_list = mismatch_list | 209 best_match_mismatch_list = mismatch_list |
| 198 best_match_quality = quality | 210 best_match_quality = quality |
| 199 best_match_reference = dataset_name | 211 best_match_reference = dataset_name |
| 200 best_consensus = consensus | 212 best_consensus = consensus |
| 201 percent_mismatches = round(min_mismatches / lengths[best_match_reference] * 100, 2) | 213 percent_mismatches = round(min_mismatches / lengths[best_match_reference] * 100, 2), |
| 214 vp1_coverage_perc = round(len(consensus) / lengths[best_match_reference] * 100) | |
| 202 | 215 |
| 203 info = { | 216 info = { |
| 204 "sample_name": args.sample_name, | 217 "sample_name": args.sample_name, |
| 205 "best_reference": best_match_reference, | 218 "best_reference": best_match_reference, |
| 206 "mismatches": min_mismatches, | 219 "mismatches": min_mismatches, |
| 207 "mismatch_list": best_match_mismatch_list, | 220 "mismatch_list": best_match_mismatch_list, |
| 208 "quality": best_match_quality, | 221 "quality": best_match_quality, |
| 209 "perc_mismatches": percent_mismatches, | 222 "perc_mismatches": percent_mismatches, |
| 210 "consensus": best_consensus | 223 "consensus": best_consensus, |
| 211 } | 224 "vp1_coverage_perc": vp1_coverage_perc |
| 225 } | |
| 212 json.dump(info, open(args.output_filename, "w")) | 226 json.dump(info, open(args.output_filename, "w")) |
| 213 | 227 |
| 214 open(args.consensus_output_filename, "w").write(f'>{args.sample_name}\n' + fill(best_consensus) + '\n') | 228 if len(best_consensus): |
| 229 consensus_record = f'>{args.sample_name}\n' + fill(best_consensus) + '\n' | |
| 230 else: | |
| 231 # better to write an empty FASTA file than one with just a header | |
| 232 consensus_record = '' | |
| 233 open(args.consensus_output_filename, "w").write(consensus_record) |
