"""
Szurubooru API
~~~~~~~~~~~~~~

Szurubooru API is used for communicating with a Szurubooru instance through python.

:copyright (c) 2023 Colin Wilk.
:license: MIT, see LICENSE for more details.
"""

import functools
import json
import math
from typing import List
from urllib.parse import quote

import requests as r

from szuruboorupy.dataclasses import Tag
from szuruboorupy.exceptions import SzurubooruException, TagNotFoundError


class Szurubooru:
    """Client class for communicating with the Szurubooru instance.

    Every request helper counts the request in the session statistics
    (``requests_get``, ``requests_post``, ``requests_put``,
    ``requests_delete``) and converts error responses into exceptions.

    Raises:
        SzurubooruException: When the Szurubooru REST API returns any kinds of
            errors. See: SzurubooruException for more information.
    """

    url: str
    token: str
    verify: bool
    header: dict[str, str]
    # Seconds passed to ``requests`` as the timeout of every call.
    timeout: float = 15

    # Session Stats
    requests_get: int = 0
    requests_post: int = 0
    requests_put: int = 0
    requests_delete: int = 0

    def __init__(
        self, url: str, token: str, verify: bool = True, timeout: float = 15
    ):
        """Store the connection settings and build the request headers.

        Args:
            url (str): Base URL of the Szurubooru instance. A trailing slash
                is tolerated.
            token (str): Value sent after ``Token`` in the ``Authorization``
                header.
            verify (bool, optional): Forwarded to ``requests`` as ``verify``.
                Defaults to True.
            timeout (float, optional): Timeout in seconds for every request.
                Defaults to 15.
        """
        self.url = url
        self.token = token
        self.verify = verify
        self.timeout = timeout
        self.header = {
            "Accept": "application/json",
            # Request bodies are JSON documents (see json.dumps below).
            "Content-Type": "application/json",
            "Authorization": f"Token {self.token}",
        }

    @staticmethod
    def __raise_szurubooru_exception(func):
        """Decorate a request helper so that error responses are raised.

        The wrapped function's return value is handed to
        ``SzurubooruException.response_is_exception``; when that reports an
        error, the exception built by ``SzurubooruException.map_exception``
        is raised instead of returning the response.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            res = func(self, *args, **kwargs)
            if SzurubooruException.response_is_exception(res):
                raise SzurubooruException.map_exception(res)
            return res

        return wrapper

    @staticmethod
    def __count_stats(get: int = 0, post: int = 0, put: int = 0, delete: int = 0):
        """Build a decorator that adds the given amounts to the session stats.

        The counters are increased before the wrapped function runs, so a
        request that fails is still counted.
        """

        def function_wrapper(func):
            @functools.wraps(func)
            def wrapper(self, *args, **kwargs):
                self.requests_get += get
                self.requests_post += post
                self.requests_put += put
                self.requests_delete += delete
                return func(self, *args, **kwargs)

            return wrapper

        return function_wrapper

    def __endpoint(self, path: str) -> str:
        """Join the base URL and an API path without doubling the slash."""
        return f"{self.url.rstrip('/')}{path}"

    @__count_stats(get=1)
    @__raise_szurubooru_exception
    def __get(self, path: str) -> r.Response:
        """Send a GET request to ``path`` on the Szurubooru instance."""
        return r.get(
            self.__endpoint(path),
            timeout=self.timeout,
            verify=self.verify,
            headers=self.header,
        )

    @__count_stats(post=1)
    @__raise_szurubooru_exception
    def __post(self, path: str, params: dict) -> r.Response:
        """Send a POST request with ``params`` serialized as the JSON body."""
        return r.post(
            self.__endpoint(path),
            timeout=self.timeout,
            verify=self.verify,
            headers=self.header,
            data=json.dumps(params),
        )

    @__count_stats(put=1)
    @__raise_szurubooru_exception
    def __put(self, path: str, params: dict) -> r.Response:
        """Send a PUT request with ``params`` serialized as the JSON body."""
        return r.put(
            self.__endpoint(path),
            timeout=self.timeout,
            verify=self.verify,
            headers=self.header,
            data=json.dumps(params),
        )

    @__count_stats(delete=1)
    @__raise_szurubooru_exception
    def __delete(self, path: str, params: dict) -> r.Response:
        """Send a DELETE request with ``params`` serialized as the JSON body."""
        return r.delete(
            self.__endpoint(path),
            timeout=self.timeout,
            verify=self.verify,
            headers=self.header,
            data=json.dumps(params),
        )

    def __map_tag_resource_response(self, res: r.Response, name: str = "") -> Tag:
        """Map the requests response from the Szurubooru REST API to the Tag dataclass.

        This function is similar to __map_tag_resource and calls it internally.
        Instead of receiving a dictionary of the Szurubooru Tag it operates on
        the requests Response of the same form.

        To illustrate, both calls yield the same result:

        .. code-block:: python

            __map_tag_resource_response(res, name)
            __map_tag_resource(json.loads(res.content), name)

        Args:
            res (r.Response): The requests Response from a Szurubooru REST API
                endpoint that returns a Szurubooru tag resource in a form like:

                .. code-block:: JSON

                    {
                        "version":      <version>,
                        "names":        <names>,
                        "category":     <category>,
                        "implications": <implications>,
                        "suggestions":  <suggestions>,
                        "creationTime": <creation-time>,
                        "lastEditTime": <last-edit-time>,
                        "usages":       <usage-count>,
                        "description":  <description>
                    }

            name (str, optional): If you provide a name here, the mapper will
                use that name for the Tag dataclass instead of fetching it from
                the alias names. This is useful if you have multiple aliases or
                names in Szurubooru.

        Returns:
            Tag: A Tag dataclass. The version parameter is included if the
                Szurubooru REST API response contained a version field,
                otherwise it defaults to -1. The name will be the first name
                returned by the Szurubooru REST API or the name that was passed
                to the function as an optional parameter.
        """
        return self.__map_tag_resource(json.loads(res.content), name)

    def __map_tag_resource(self, c: dict, name: str = "") -> Tag:
        """Map the dict from the Szurubooru REST API to the Tag dataclass.

        This function is similar to __map_tag_resource_response and is called
        by it. Instead of receiving a requests Response it operates on the
        dictionary of the same form.

        To illustrate, both calls yield the same result:

        .. code-block:: python

            __map_tag_resource_response(res, name)
            __map_tag_resource(json.loads(res.content), name)

        Args:
            c (dict): The dict object coming from a Szurubooru REST API tag
                resource in a form like:

                .. code-block:: JSON

                    {
                        "version":      <version>,
                        "names":        <names>,
                        "category":     <category>,
                        "implications": <implications>,
                        "suggestions":  <suggestions>,
                        "creationTime": <creation-time>,
                        "lastEditTime": <last-edit-time>,
                        "usages":       <usage-count>,
                        "description":  <description>
                    }

            name (str, optional): If you provide a name here, the mapper will
                use that name for the Tag dataclass instead of fetching it from
                the alias names. This is useful if you have multiple aliases or
                names in Szurubooru.

        Returns:
            Tag: A Tag dataclass. The version parameter is included if the
                Szurubooru REST API response contained a version field,
                otherwise it defaults to -1. The name will be the first name
                returned by the Szurubooru REST API or the name that was passed
                to the function as an optional parameter.
        """
        return Tag(
            name=name if name != "" else c["names"][0],
            version=c.get("version", -1),
            category=c["category"],
            description=c["description"],
            # Related tags are reduced to their primary (first) name.
            implications=[related["names"][0] for related in c["implications"]],
            suggestions=[related["names"][0] for related in c["suggestions"]],
        )

    def __fetch_tag_page(self, offset: int, limit: int, query: str) -> dict:
        """Fetch one page of the tag listing and return the decoded JSON body.

        The callers in this class read the ``results`` and ``total`` keys of
        the returned dictionary.
        """
        path = f"/api/tags?offset={offset}&limit={limit}&query={quote(query)}"
        return json.loads(self.__get(path=path).content)

    def tag_exists(self, name: str) -> bool:
        """Check if the tag exists on Szurubooru.

        Args:
            name (str): Name of the tag.

        Raises:
            SzurubooruException: For any API error other than the tag not
                being found.

        Returns:
            bool: True if tag already exists. False if there was no matching
                tag on Szurubooru.
        """
        try:
            res = self.__get(path=f"/api/tag/{quote(name)}")
        except TagNotFoundError:
            return False
        return res.status_code == 200

    def tag_len(self, query: str = "") -> int:
        """Check how many tags there are on the Szurubooru instance.

        Args:
            query (str, optional): Only count tags matching this query. With
                an empty string (the default) all tags are counted.
                Defaults to ''.

        Returns:
            int: The number of tags that exist on the Szurubooru instance
        """
        # Only the "total" field is needed, so ask for the smallest page.
        return self.__fetch_tag_page(offset=0, limit=1, query=query)["total"]

    def tag_list(self, limit: int = 100, offset: int = 0, query: str = "") -> List[Tag]:
        """List tags on the Szurubooru instance.

        Args:
            limit (int, optional): Limits the amount of tags displayed on one
                page. Setting a smaller limit may help with query speed.
                Usually the maximum allowed limit is 100. Defaults to 100.
            offset (int, optional): Offset where to start the listing. Required
                for paging the data when searching through many tags. If you
                have a page size of 100 you need to set an offset of 100 to see
                results on the 2. page. So your offset should be
                PAGE_NUM * LIMIT. Defaults to 0.
            query (str, optional): You can use a custom query to filter through
                tags better. If you use an empty string (the default) you will
                search through all tags on the Szurubooru instance.
                Defaults to ''.

        Returns:
            List[Tag]: List of Tag objects in the current page.
        """
        page = self.__fetch_tag_page(offset=offset, limit=limit, query=query)
        return [self.__map_tag_resource(resource) for resource in page["results"]]

    def tag_list_all(self, query: str = "") -> List[Tag]:
        """List every available Tag (matching the query) on the Szurubooru instance.

        WARNING: This method calls the Szurubooru REST API multiple times
        depending on how many tags there are. It can take a while and can put
        significant load on the Szurubooru instance. Use with caution.

        In essence this method does the paging for you in tag_list and returns
        the full batch of Tags.

        Args:
            query (str, optional): You can use a custom query to filter through
                tags better. If you use an empty string (the default) you will
                search through all tags on the Szurubooru instance.
                Defaults to ''.

        Returns:
            List[Tag]: List of all Tag objects matching the query on the
                Szurubooru instance.
        """
        page_size = 100
        tags: List[Tag] = []
        offset = 0
        while True:
            page = self.__fetch_tag_page(offset=offset, limit=page_size, query=query)
            resources = page["results"]
            tags.extend(self.__map_tag_resource(resource) for resource in resources)
            # Advance by what was actually delivered and stop on the total
            # reported for *this query*; an empty page also ends the loop so
            # a shrinking result set cannot cause endless paging.
            offset += len(resources)
            if not resources or offset >= page["total"]:
                return tags

    def tag_get(self, name: str) -> Tag:
        """Get a tag from the Szurubooru instance by name.

        Args:
            name (str): Name of the tag.

        Raises:
            TagNotFoundError: Tag could not be found on the Szurubooru instance.

        Returns:
            Tag: Tag object of the requested tag.
        """
        res = self.__get(path=f"/api/tag/{quote(name)}")
        return self.__map_tag_resource_response(res, name=name)

    def tag_create(self, tag: Tag) -> Tag:
        """Create a tag on the Szurubooru instance.

        Args:
            tag (Tag): Tag dataclass representing the Tag that should be
                created.

        Returns:
            Tag: Resulting Tag dataclass that was returned by the Szurubooru
                REST API representing the created tag as it was created on the
                Szurubooru instance.
        """
        res = self.__post(
            path="/api/tags",
            params={
                "names": [tag.name],
                "category": tag.category,
                "description": tag.description,
                "implications": tag.implications,
                "suggestions": tag.suggestions,
            },
        )
        return self.__map_tag_resource_response(res, name=tag.name)

    def tag_update(self, tag: Tag) -> Tag:
        """Update a tag on the Szurubooru instance.

        Args:
            tag (Tag): Tag dataclass representing the Tag that should be
                updated. The name is the identifying parameter for the
                Szurubooru instance.

        Returns:
            Tag: Resulting Tag dataclass that was returned by the Szurubooru
                REST API representing the updated version of the tag as it is
                now on the Szurubooru instance.
        """
        res = self.__put(
            path=f"/api/tag/{quote(tag.name)}",
            params={
                "version": tag.version,
                "category": tag.category,
                "description": tag.description,
                "implications": tag.implications,
                "suggestions": tag.suggestions,
            },
        )
        return self.__map_tag_resource_response(res, name=tag.name)

    def tag_delete(self, tag: Tag) -> None:
        """Delete the given tag on the Szurubooru instance.

        Args:
            tag (Tag): Tag dataclass of the tag that should be deleted. Needs
                to contain the current version number.
        """
        self.__delete(
            path=f"/api/tag/{quote(tag.name)}", params={"version": tag.version}
        )

    def tag_merge(self, tag_to_merge: Tag, tag_to_merge_into: Tag) -> Tag:
        """Remove tag_to_merge and merge all uses into tag_to_merge_into.

        Removes source tag and merges all of its usages, suggestions and
        implications to the target tag. Other tag properties such as category
        and aliases do not get transferred and are discarded.

        Args:
            tag_to_merge (Tag): Tag dataclass representing the Tag that will be
                removed and merged into the second Tag. The version parameter
                is required.
            tag_to_merge_into (Tag): Tag dataclass representing the Tag that
                the first tag will be merged into. The version parameter is
                required.

        Returns:
            Tag: Resulting Tag dataclass that was returned by the Szurubooru
                REST API representing the new (merged) tag on the Szurubooru
                instance.
        """
        res = self.__post(
            path="/api/tag-merge",
            params={
                "removeVersion": tag_to_merge.version,
                "remove": tag_to_merge.name,
                "mergeToVersion": tag_to_merge_into.version,
                "mergeTo": tag_to_merge_into.name,
            },
        )
        return self.__map_tag_resource_response(res, name=tag_to_merge_into.name)

    def tag_list_siblings(self, tag: Tag) -> List[dict[str, int]]:
        """List siblings of the given Tag.

        Lists siblings of given tag, e.g. tags that were used in the same posts
        as the given tag. `occurrences` field signifies how many times a given
        sibling appears with given tag. Results are sorted by occurrences count
        and the list is truncated to the first 50 elements. Doesn't use paging.

        Args:
            tag (Tag): Tag that will be queried. Only uses the tag.name
                parameter.

        Returns:
            List[dict[str, int]]: Returns a List of dictionaries with the
                primary tag name as the key and the number of occurrences as
                the value.
        """
        # The name is percent-encoded like in every other tag endpoint so that
        # reserved characters cannot corrupt the request path.
        res = self.__get(path=f"/api/tag-siblings/{quote(tag.name)}")
        siblings = json.loads(res.content)["results"]
        return [
            {sibling["tag"]["names"][0]: sibling["occurrences"]}
            for sibling in siblings
        ]