# Random Forest User
#
# From Control Systems Technology Group

import csv
import re
from ast import literal_eval
from csv import reader
from random import randrange
from random import seed

# Import the random forest

# Load the stored forest. Every CSV row becomes one dict whose cell texts are
# parsed back into Python values with literal_eval; the resulting list is what
# bagging_predict later receives as its trees.
# newline='' is what the csv module asks for on files it reads, so that line
# breaks inside quoted fields are passed through untouched.
with open('A forest.csv', 'r', newline='') as file:
    Forest = [{k: literal_eval(v) for k, v in row.items()}
              for row in csv.DictReader(file, skipinitialspace=True)]
   

def load_csv(filename):
    """Read a CSV file into a list of rows, skipping empty lines.

    Args:
        filename: Path of the file to read.

    Returns:
        A list with one entry per non-empty row; each entry is the list of
        string fields the csv reader produced for that row.
    """
    # newline='' leaves line-ending handling to the csv module, as its
    # documentation requires for files passed to csv.reader.
    with open(filename, 'r', newline='') as file:
        return [row for row in reader(file) if row]

# Makes an array of the data. Each row is a point in time and
# each column is a channel, except for the last column, which contains
# the desired output.

def make_data(dataname, startname, labelname):
    """Build the labelled data matrix from the three recording files.

    Args:
        dataname: File with one line per point in time; the line holds the
            whitespace-separated channel values.
        startname: File with one trial start index (a row number of the data
            file) per line.
        labelname: File with one class label per line; a line containing
            "NaN" marks a trial whose label is unknown (a test trial).

    Returns:
        A list of rows. Each row holds the channel values as floats followed
        by the label of the trial the row falls in. Rows before the first
        trial start, and rows of trials with an unknown label, get label 0.

    Raises:
        ValueError: If the number of trial starts and labels differ.
    """
    # Only the first CSV field of every line is used; the channel values are
    # separated by whitespace inside it.
    data = [[float(value) for value in row[0].split()]
            for row in load_csv(dataname)]
    starttimes = [int(row[0]) for row in load_csv(startname)]
    # Any label containing "NaN" is mapped to 0, with or without trailing
    # whitespace, so a bare "NaN" no longer reaches int().
    labels = [0 if "NaN" in row[0] else int(row[0])
              for row in load_csv(labelname)]

    if len(starttimes) != len(labels):
        raise ValueError(
            "number of trial starts (%d) does not match number of labels (%d)"
            % (len(starttimes), len(labels)))

    # Cut the recording into segments at the trial starts. The rows before the
    # first start belong to no trial (label 0); the segment opened by start k
    # gets label k. This assumes each start index marks the beginning of the
    # trial whose label sits at the same position in the label file
    # -- TODO confirm against the data set description.
    boundaries = [0] + starttimes + [len(data)]
    segment_labels = [0] + labels
    for label, begin, end in zip(segment_labels, boundaries, boundaries[1:]):
        for j in range(begin, end):
            data[j].append(label)
    return data
# Delete the rows with an unknown desired output

def delete_test_trials(data):
    """Return the rows of data whose last entry (the label) is not 0.

    Args:
        data: List of rows with the label stored in the last position.

    Returns:
        A new list containing the kept rows, in their original order.
    """
    return [sample for sample in data if sample[-1] != 0]
# Make a smaller set without replacement

def smaller_set(data, n_rows):
    """Draw n_rows rows from data at random, without replacement.

    Args:
        data: List of rows to draw from; it is left unmodified.
        n_rows: Number of rows to draw.

    Returns:
        A new list holding the drawn rows, in the order they were drawn.

    Raises:
        ValueError: If n_rows is larger than the number of rows in data.
    """
    if n_rows > len(data):
        raise ValueError(
            "cannot draw %d rows from %d without replacement"
            % (n_rows, len(data)))
    # Draw from a shallow copy so the caller's list keeps all of its rows.
    remaining = list(data)
    new_data = []
    while len(new_data) < n_rows:
        # pop(index) takes out exactly the drawn row, without the value-based
        # search that list.remove would do.
        new_data.append(remaining.pop(randrange(0, len(remaining))))
    return new_data
# The methods needed for predicting an outcome:
# Make a prediction with a list of bagged trees

def bagging_predict(trees, row):
    """Predict the outcome for row by majority vote over trees.

    Args:
        trees: The decision trees to consult; each one is handed to predict.
        row: The sample to classify, passed unchanged to predict.

    Returns:
        The prediction made by the largest number of trees.
    """
    predictions = [predict(tree, row) for tree in trees]
    # The value predicted most often wins; among tied values, max keeps the
    # one the set happens to yield first.
    return max(set(predictions), key=predictions.count)

# Make a prediction with a decision tree

def predict(node, row):
    """Walk a decision tree from node down to a leaf and return that leaf.

    Args:
        node: Dict with the keys 'index', 'value', 'left' and 'right'. The
            'left' and 'right' entries are either another such dict or a
            final prediction.
        row: Sequence of feature values, indexed by the 'index' of each node.

    Returns:
        The first non-dict child reached while descending the tree.
    """
    current = node
    while True:
        # Values strictly below the split value go left, all others go right.
        if row[current['index']] < current['value']:
            child = current['left']
        else:
            child = current['right']
        if not isinstance(child, dict):
            return child
        current = child
       
# Testing to see if we can get an outcome

# Run the imported forest over every row with a known label and report how
# often it is wrong.
seed(2)

dataname = 'k3b_s.txt'
startname = 'k3b_HDR_TRIG.txt'
labelname = 'k3b_HDR_Classlabel.txt'

data = make_data(dataname, startname, labelname)
train = delete_test_trials(data)
faults = 0    # rows whose rounded prediction differs from the stored label
misses = 0    # rows whose stored label is not an int
first = True  # True until the first such row has been reported

for i, sample in enumerate(train):
    prediction = bagging_predict(Forest, sample)
    actual = sample[-1]
    #print("Prediction: " + str(prediction) + ", Actual: " + str(actual))
    if round(prediction) != actual:
        faults = faults + 1
    if type(actual) is not int:
        misses = misses + 1
        # Print only the position of the first row with a non-int label.
        if first:
            print(i)
        first = False

# Fractions of all evaluated rows.
print(faults/len(train))
print(misses/len(train))