Eliza ports

fukurou

the supreme coder
ADMIN
Python:
class LimUniqueResponder:
    """Bounded, duplicate-free list of reply strings.

    Replies are kept in insertion order. Once the list holds ``lim``
    entries, adding a new reply evicts the oldest one. Random picks are
    delegated to a ``UniqueRandomGenerator`` that is rebuilt whenever the
    list changes (presumably it yields indices in ``range(n)`` without
    immediate repeats -- confirm against its definition).
    """

    def __init__(self, lim: int):
        """Create an empty responder holding at most ``lim`` replies."""
        self.responses: list[str] = []
        self.lim = lim
        self.urg = UniqueRandomGenerator(0)

    def get_a_response(self) -> str:
        """Return one stored reply chosen via ``urg``, or ``""`` when empty."""
        if not self.responses:
            return ""
        return self.responses[self.urg.get_unique_random()]

    def responses_contains_str(self, item: str) -> bool:
        """Return True when ``item`` is exactly one of the stored replies."""
        return item in self.responses

    def str_contains_response(self, item: str) -> bool:
        """Return True when any non-empty stored reply is a substring of ``item``."""
        # Empty replies are skipped: "" is a substring of every string.
        return any(response and response in item for response in self.responses)

    def add_response(self, s1: str) -> None:
        """Store ``s1`` unless it is already present.

        A duplicate is a no-op. The duplicate check runs before eviction so
        that re-adding a known reply to a full list does not drop an
        unrelated entry and leave ``urg`` sized for the old length.
        """
        if s1 in self.responses:
            return
        # Make room for the new reply; the emptiness guard keeps a
        # non-positive ``lim`` from popping an empty list.
        if self.responses and len(self.responses) >= self.lim:
            self.responses.pop(0)
        self.responses.append(s1)
        # Keep the index generator in step with the new list length.
        self.urg = UniqueRandomGenerator(len(self.responses))

    def add_responses(self, *replies: str) -> None:
        """Add each reply in order via ``add_response``."""
        for value in replies:
            self.add_response(value)

    def get_savable_str(self) -> str:
        """Return all replies joined with ``"_"`` for persistence."""
        return "_".join(self.responses)

    def get_last_item(self) -> str:
        """Return the most recently added reply, or ``""`` when empty."""
        if not self.responses:
            return ""
        return self.responses[-1]
 

fukurou

the supreme coder
ADMIN
Python:
class EventChatV2:
    def __init__(self, lim: int):
        self.dic: dict[str, LimUniqueResponder] = {}
        self.modified_keys: set[str] = set()
        self.lim = lim

    def get_modified_keys(self) -> set[str]:
        return self.modified_keys

    def key_exists(self, key: str) -> bool:
        # if the key was active true is returned
        return key in self.modified_keys

    # Add items
    def add_items(self, ur: LimUniqueResponder, *args: str) -> None:
        for arg in args:
            self.dic[arg] = ur

    def add_from_db(self, key: str, value: str) -> None:
        if not value or value == "null":
            return
        values = value.split("_")  # assuming AXStringSplit splits on "_"
        if key not in self.dic:
            self.dic[key] = LimUniqueResponder(self.lim)
        for item in values:
            self.dic[key].add_response(item)

    # Add key-value pair
    def add_key_value(self, key: str, value: str) -> None:
        self.modified_keys.add(key)
        if key in self.dic:
            self.dic[key].add_response(value)
        else:
            self.dic[key] = LimUniqueResponder(self.lim)
            self.dic[key].add_response(value)

    def add_key_values(self, eliza_results: list) -> None:
        for pair in eliza_results:
            # Access the key and value of each AXKeyValuePair object
            self.add_key_value(pair.get_key(), pair.get_value())

    # Get response
    def response(self, in1: str) -> str:
        return self.dic[in1].get_a_response() if in1 in self.dic else ""

    def response_latest(self, in1: str) -> str:
        return self.dic[in1].get_last_item() if in1 in self.dic else ""

    def get_save_str(self, key: str) -> str:
        return self.dic[key].get_savable_str() if key in self.dic else ""
 

fukurou

the supreme coder
ADMIN
Python:
class ElizaDeducer:
    """
    This class populates a special chat dictionary
    based on the matches added via its add_phrase_matcher function.
    See subclass ElizaDeducerInitializer for example:
    ed = ElizaDeducerInitializer(2)  # 2 = limit of replies per input
    """
    def __init__(self, lim: int):
        self.babble2: list['PhraseMatcher'] = []
        self.pattern_index: dict[str, list['PhraseMatcher']] = {}
        self.response_cache: dict[str, list['AXKeyValuePair']] = {}
        self.ec2 = EventChatV2(lim)  # Chat dictionary, use getter for access. Hardcoded replies can also be added

    def get_ec2(self) -> 'EventChatV2':
        return self.ec2

    def learn(self, msg: str) -> None:
        # Populate EventChat dictionary
        # Check cache first
        if msg in self.response_cache:
            self.ec2.add_key_values(list(self.response_cache[msg]))

        # Search for matching patterns
        potential_matchers = self.get_potential_matchers(msg)
        for pm in potential_matchers:
            if pm.matches(msg):
                response = pm.respond(msg)
                self.response_cache[msg] = response
                self.ec2.add_key_values(response)

    def learned_bool(self, msg: str) -> bool:
        # Same as learn method but returns true if it learned new replies
        learned = False
        # Populate EventChat dictionary
        # Check cache first
        if msg in self.response_cache:
            self.ec2.add_key_values(list(self.response_cache[msg]))
            learned = True

        # Search for matching patterns
        potential_matchers = self.get_potential_matchers(msg)
        for pm in potential_matchers:
            if pm.matches(msg):
                response = pm.respond(msg)
                self.response_cache[msg] = response
                self.ec2.add_key_values(response)
                learned = True
        return learned

    def respond(self, str1: str) -> str:
        return self.ec2.response(str1)

    def respond_latest(self, str1: str) -> str:
        # Get most recent reply/data
        return self.ec2.response_latest(str1)

    def get_potential_matchers(self, msg: str) -> list['PhraseMatcher']:
        potential_matchers = []
        for key in self.pattern_index:
            if key in msg:
                potential_matchers.extend(self.pattern_index[key])
        return potential_matchers

    def add_phrase_matcher(self, pattern: str, *kv_pairs: str) -> None:
        kvs = [AXKeyValuePair(kv_pairs[i], kv_pairs[i + 1]) for i in range(0, len(kv_pairs), 2)]
        matcher = PhraseMatcher(pattern, kvs)
        self.babble2.append(matcher)
        self.index_pattern(pattern, matcher)

    def index_pattern(self, pattern: str, matcher: 'PhraseMatcher') -> None:
        for word in pattern.split():
            self.pattern_index.setdefault(word, []).append(matcher)

class PhraseMatcher:
    """Pairs a compiled regex with key/value reply templates.

    Templates may contain ``{0}``, ``{1}``, ... placeholders which are
    replaced by the corresponding capture groups of a match.
    """

    def __init__(self, matcher: str, responses: list['AXKeyValuePair']):
        """Compile ``matcher`` and keep ``responses`` as the templates."""
        self.matcher = re.compile(matcher)
        self.responses = responses

    def matches(self, str: str) -> bool:
        """Return True when the pattern matches at the start of ``str``."""
        return self.matcher.match(str) is not None

    def respond(self, str: str) -> list['AXKeyValuePair']:
        """Return filled-in copies of the templates for ``str``.

        Returns an empty list when the pattern does not match. Keys and
        values are lower-cased after each substitution, so templates of a
        pattern without capture groups are returned unchanged.
        """
        m = self.matcher.match(str)
        result = []
        if m:
            groups = m.groups()
            for kv in self.responses:
                # Work on a copy so the stored template is never mutated.
                temp_kv = AXKeyValuePair(kv.key, kv.value)
                for i, s in enumerate(groups):
                    # The parameter ``str`` shadows the builtin here, so the
                    # placeholder is built with an f-string instead of
                    # calling str(i), which would try to call the input text.
                    placeholder = f"{{{i}}}"
                    temp_kv.key = temp_kv.key.replace(placeholder, s).lower()
                    temp_kv.value = temp_kv.value.replace(placeholder, s).lower()
                result.append(temp_kv)
        return result
 

fukurou

the supreme coder
ADMIN
Python:
class ElizaDeducerInitializer(ElizaDeducer):
    """``ElizaDeducer`` pre-loaded with a small set of phrase matchers."""

    def __init__(self, lim: int):
        """Build the deducer and register the default matchers.

        Recommended lim = 5; it is the limit of responses per key in the
        EventChat dictionary and keeps saving and loading data simple.
        """
        super().__init__(lim)
        self.initialize_babble2()

    def initialize_babble2(self) -> None:
        """Register the built-in patterns, in order, via ``add_phrase_matcher``."""
        # Each rule: (regex, flat key/value template sequence).
        default_rules = (
            (
                r"(.*) is (.*)",
                ("what is {0}", "{0} is {1}", "explain {0}", "{0} is {1}"),
            ),
            (
                r"if (.*) or (.*) than (.*)",
                ("{0}", "{2}", "{1}", "{2}"),
            ),
            (
                r"if (.*) and (.*) than (.*)",
                ("{0}", "{1}"),
            ),
            (
                r"(.*) because (.*)",
                ("{1}", "i guess {0}"),
            ),
        )
        for regex, templates in default_rules:
            self.add_phrase_matcher(regex, *templates)
 

fukurou

the supreme coder
ADMIN
Python:
class PhraseMatcher:
    """Compiled regex plus the key/value templates it fills on a match."""

    def __init__(self, matcher: str, responses: list['AXKeyValuePair']):
        """Compile the ``matcher`` pattern and store the ``responses`` templates."""
        self.responses = responses
        self.matcher = re.compile(matcher)

    def matches(self, str: str) -> bool:
        """Tell whether the pattern matches from the beginning of ``str``."""
        return self.matcher.match(str) is not None

    def respond(self, str: str) -> list['AXKeyValuePair']:
        """Build one filled-in pair per template, or ``[]`` without a match.

        ``{n}`` placeholders in template keys and values are replaced by
        capture group ``n + 1``; the text is lower-cased after each
        replacement.
        """
        found = self.matcher.match(str)
        if not found:
            return []

        captures = found.groups()
        filled = []
        for template in self.responses:
            # Copy first so the stored template stays untouched.
            pair = AXKeyValuePair(template.key, template.value)
            for idx, text in enumerate(captures):
                slot = f"{{{idx}}}"
                pair.key = pair.key.replace(slot, text).lower()
                pair.value = pair.value.replace(slot, text).lower()
            filled.append(pair)
        return filled
 
Top