# Source code for cf_rules.cf

import os

import requests

from .error import Error
from .utils import Utils

class DomainObject(dict):
    """Dictionary describing a domain whose keys are also readable as attributes.

    ``obj.name`` and ``obj["name"]`` refer to the same value because the
    instance uses itself as its attribute namespace.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Alias the attribute dict to the mapping itself: keys <-> attributes.
        self.__dict__ = self

class RuleObject(dict):
    """Dictionary describing a rule whose keys are also readable as attributes.

    ``obj.description`` and ``obj["description"]`` refer to the same value
    because the instance uses itself as its attribute namespace.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Alias the attribute dict to the mapping itself: keys <-> attributes.
        self.__dict__ = self

class RulesetObject(dict):
    """Dictionary describing a ruleset whose keys are also readable as attributes.

    ``obj.source`` and ``obj["source"]`` refer to the same value because the
    instance uses itself as its attribute namespace.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Alias the attribute dict to the mapping itself: keys <-> attributes.
        self.__dict__ = self



[docs] class Cloudflare:
[docs] def __init__(self, folder: str | None = None): """Initialize Cloudflare class Specify a folder argument where expressions will be saved >>> cf = Cloudflare("my_expressions") """ self.utils = Utils(folder) self.error = Error() self.plan = "free" self.max_rules = 5 self.active_rules = 0
[docs] def auth_key(self, email: str, key: str) -> dict: """Get your global API Key through cloudflare profile (API Keys section) :exception Error: Email or key is not provided .. warning:: This will grant all domains access to this API library, prefer using :func:`auth_token` https://dash.cloudflare.com/profile/api-tokens * email -> Email account * key -> Global API Key >>> cf.auth_key("cloudflare@example.com", "your-global-api-key") >>> {"success": True, "result": {"id": "a1b2c3", "email": "cloudflare@example.com", ...}} # OR >>> {"success": False, "errors": [{"code": 6003, "message": "Invalid request headers", ...}]} """ if not email: raise Error("You must provide an email") if not key: raise Error("You must provide an API key") self._headers = { "X-Auth-Email": email, "X-Auth-Key": key, "Content-Type": "application/json" } r = requests.get("https://api.cloudflare.com/client/v4/user", headers=self._headers, timeout=5) return r.json()
[docs] def auth_token(self, bearer_token: str) -> dict: """Generate a specific token through cloudflare profile (API Tokens section) :exception Error: Bearer token is not provided .. note:: This will grant only specific domains/permissions related access to this API library https://dash.cloudflare.com/profile/api-tokens * bearer_token -> API Token >>> cf.auth_token("your-specific-bearer-token") >>> {"success": True, "result": {"id": "a1b2c3", "status": "active"}, ...} # OR >>> {"success": False, "errors": [{"code": 1000, "message": "Invalid API token"}], ...} """ if not bearer_token: raise Error("You must provide a bearer token") self._headers = { "Authorization": "Bearer " + bearer_token, "Content-Type": "application/json" } r = requests.get("https://api.cloudflare.com/client/v4/user/tokens/verify", headers=self._headers, timeout=5) return r.json()
[docs] def get_domains(self: str) -> dict: """Get all domains :exception Error: If not authenticated (use :func:`auth_key(email, key) <auth_key>` or :func:`auth_token(bearer_token) <auth_token>`) >>> cf.get_domains() >>> {"count": 2, "domains": ["example.com", "example.fr"], "result": [{"id": "a1b2c3", "name": "example.com", ...}, ...]} """ if not hasattr(self, "_headers"): raise Error("You must authenticate first, use cf.auth_key(email, key) or cf.auth_token(bearer_token)") r = requests.get("https://api.cloudflare.com/client/v4/zones", headers=self._headers, timeout=5) zones = self.error.handle(r.json(), ["result"]) if not zones: raise Error("No domain found") return { "count": len(zones), "domains": [x["name"] for x in zones], "result": zones, }
@property def domains(self) -> list[DomainObject]: """Get all domains as a list of :class:`DomainObject` Access any value of the object with the dot operator Better handling compared to :func:`get_domains`, return directly the result key of the function >>> cf.domains >>> [{"id": "a1b2c3", "name": "example.com", ...}, {"id": "d4e5f6", "name": "example.fr", ...}] """ return [DomainObject(x) for x in self.get_domains()["result"]]
[docs] def get_domain(self, domain_name: str) -> DomainObject: """Get a specific domain as :class:`DomainObject` :exception Error: If not authenticated (use :func:`auth_key(email, key) <auth_key>` or :func:`auth_token(bearer_token) <auth_token>`) :exception Error: If domain is not found (list all domains using cf.domains) .. important:: This function is the "core" for all other functions, it is needed for every other function to work >>> cf.get_domain("example.com") >>> {"id": "a1b2c3", "name": "example.com", ...} >>> domain = cf.get_domain("example.fr") >>> domain.name # OR domain["name"] >>> "example.fr" """ if not hasattr(self, "_headers"): raise Error("You must authenticate first, use cf.auth_key(email, key) or cf.auth_token(bearer_token)") r = requests.get(f"https://api.cloudflare.com/client/v4/zones?name={domain_name}", headers=self._headers, timeout=5) domain = self.error.handle(r.json(), ["result"]) if not domain: raise Error(f"Domain '{domain_name}' not found") domain = domain[0] if "error" in domain: raise Error(domain["error"]) return DomainObject(domain)
[docs] def set_plan(self, domain_name: str): """Save current website plan .. note:: Will define the current plan of the website in the instance of the class >>> cf.set_plan("example.com") # Now the maximum available rules for this domain depends on the current plan """ self.plan = self.get_domain(domain_name)["plan"]["legacy_id"] match self.plan: case "free": self.max_rules = 5 case "pro": self.max_rules = 20 case "business": self.max_rules = 100 case "enterprise": self.max_rules = 1000
[docs] def get_rulesets(self, domain_name: str) -> dict: """Get all rulesets from a specific domain >>> cf.get_rulesets("example.com") >>> {"count": 4, "rulesets": ["default", "Cloudflare Normalization Ruleset", ...], "result": [{"id": "a1b2c3", "name": "default", ...}]} """ zone = self.get_domain(domain_name) zone_id = zone["id"] r = requests.get(f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets", headers=self._headers, timeout=5) rulesets = self.error.handle(r.json(), ["result"]) return { "zone_id": zone_id, "count": len(rulesets), "rulesets": [x["name"] for x in rulesets], "result": rulesets, }
[docs] def rulesets(self, domain_name: str) -> list[RulesetObject]: """Get all rulesets as a list of :class:`RulesetObject` Access any value of the object with the dot operator Better handling compared to :func:`get_rulesets()`, return directly the result key of the function >>> cf.rulesets >>> [{"id": "a1b2c3", "name": "default", ...}, {"id": "d4e5f6", "name": "Cloudflare Normalization Ruleset", ...}] """ return [RulesetObject(x) for x in self.get_rulesets(domain_name)["result"]]
[docs] def get_custom_ruleset(self, domain_name: str) -> RulesetObject: """Get the custom ruleset from a specific domain as :class:`RulesetObject` It should be the only ruleset with the source "firewall_custom" as per Cloudflare's documentation. This is the ruleset where all custom rules are stored. :exception Error: If no custom ruleset is found >>> cf.get_custom_ruleset("example.com") >>> {"id": "a1b2c3", "name": "default", "source": "firewall_custom", ...} """ rulesets = self.get_rulesets(domain_name) custom_ruleset = [x for x in rulesets["result"] if x.get("source") == "firewall_custom"] if not custom_ruleset: raise Error("No custom ruleset found") custom_ruleset = custom_ruleset[0] if "error" in custom_ruleset: raise Error(custom_ruleset["error"]) custom_ruleset["zone_id"] = rulesets["zone_id"] return RulesetObject(custom_ruleset)
[docs] def get_rules(self, domain_name: str) -> dict: """Get all rules from a specific domain :exception Error: If no rules are found >>> cf.get_rules("example.com") >>> {"count": 3, "rules": ["Bad Bots", "Bad IP", "Bad AS"], "result": [{"id": "a1b2c3", "description": "Bad Bots", ...}, ...]} """ ruleset = self.get_custom_ruleset(domain_name) zone_id = ruleset["zone_id"] custom_ruleset_id = ruleset["id"] r = requests.get(f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{custom_ruleset_id}", headers=self._headers, timeout=5) rules = self.error.handle(r.json(), ["result", "rules"]) if not rules: raise Error("No rules found") # No rules found (handle silently fails), return empty list if isinstance(rules, dict): rules = [] self.active_rules = len(rules) return { "zone_id": zone_id, "custom_ruleset_id": custom_ruleset_id, "count": len(rules), "rules": [x["description"] for x in rules], "result": rules, }
[docs] def rules(self, domain_name: str) -> list[RuleObject]: """Get all rules as a list of :class:`RuleObject` Access any value of the object with the dot operator Better handling compared to :func:`get_rules()`, return directly the result key of the function >>> cf.rules >>> [{"id": "a1b2c3", "description": "Bad Bots", ...}, {"id": "d4e5f6", "description": "Bad IP", ...}, ...] """ return [RuleObject(x) for x in self.get_rules(domain_name)["result"]]
[docs] def get_rule(self, domain_name: str, *, rule_name: str | None = None, rule_id: str | None = None) -> RuleObject: """Get a specific rule by name or ID from a specific domain as :class:`RuleObject` :exception Error: Rule name or rule ID is not provided :exception Error: If no rule is found with the specified name >>> cf.get_rule("example.com", rule_name="Bad Bots") >>> {"id": "a1b2c3", "enabled": True, "action": "block", "description": "Bad Bots", "expression": "(http.user_agent contains "DotBot")", ...} """ if rule_id: rules = self.get_rules(domain_name) rule = [x for x in rules["result"] if x["id"] == rule_id] elif rule_name: rules = self.get_rules(domain_name) rule = [x for x in rules["result"] if x["description"] == rule_name] else: raise Error("You must provide a rule_name or rule_id") if not rule: raise Error(f"Rule '{rule_name}' not found") rule = rule[0] if "error" in rule: raise Error(rule["error"]) rule["zone_id"] = rules["zone_id"] rule["custom_ruleset_id"] = rules["custom_ruleset_id"] return RuleObject(rule)
[docs] def export_rules(self, domain_name: str) -> True: """Export all expressions from a specific domain .. note:: Will save all expressions into multiple files in the folder specified in Cloudflare's constructor >>> cf.export_rules("example.com") # "Bad Bots.txt", "Bad IP.txt", "Bad AS.txt" files created in "my_expressions" folder """ rules = self.get_rules(domain_name) for rule in rules["result"]: print(f"Exporting {rule['description']}...") header = { "id": rule["id"], "action": rule["action"], "enabled": rule["enabled"], } rule_expression = self.utils.beautify(rule["expression"]) self.utils.write_expression(rule["description"], rule_expression, header=header) return True
[docs] def export_rule(self, domain_name: str, *, rule_name: str | None = None, rule_id: str | None = None) -> True: """Export the expression of a rule in a txt file :exception Error: Rule name or rule ID is not provided .. note:: Will save the expression into a file in the folder specified in Cloudflare's constructor >>> cf.export_rule("example.com", "Bad Bots") # "Bad Bots.txt" file created in "my_expressions" folder """ if rule_id: rule = self.get_rule(domain_name, rule_id=rule_id) elif rule_name: rule = self.get_rule(domain_name, rule_name=rule_name) else: raise Error("You must provide a rule_name or rule_id") header = { "id": rule["id"], "action": rule["action"], "enabled": rule["enabled"], } rule_expression = self.utils.beautify(rule["expression"]) self.utils.write_expression(rule_name, rule_expression, header=header) return True
[docs] def create_rule(self, domain_name: str, rule_file: str, rule_name: str | None = None, action: str | None = None) -> bool: """Create a rule with a specific expression * action -> Please refer to https://developers.cloudflare.com/ruleset-engine/rules-language/actions/ Available actions as string: `managed_challenge, js_challenge, challenge, block, skip, log` Action is read from the header of the file by default, but you can specify it manually. Else it will be "managed_challenge" :exception Error: Rule file is not found :exception Error: Rule already exists in remote WAF :exception Error: Cannot create more rules (5 used / 5 available depending on the current plan) >>> cf.create_rule("example.com", "Bad URL.txt", action="managed_challenge") # Create a rule with the expression in "Bad URL.txt" and override the action to "managed_challenge" >>> cf.create_rule("example.com", "Bad IP.txt", "Not allowed IP") # Create a rule named "Not allowed IP" with the expression in "Bad IP.txt" with the action in the header of the file """ if not rule_name: rule_name = rule_file.strip(".txt") rules = self.get_rules(domain_name) zone_id = rules["zone_id"] custom_ruleset_id = rules["custom_ruleset_id"] header, expression = self.utils.read_expression(rule_file) if not expression: raise Error(f"No such file in folder '{self.utils.directory}'") if rule_file.strip(".txt") in rules["rules"]: raise Error(f"Rule '{rule_file.strip('.txt')}' already exists") new_rule = { "description": rule_name, "expression": expression, } if header: if action or "action" in header: new_rule["action"] = action or header["action"] else: new_rule["action"] = "managed_challenge" if "enabled" in header: new_rule["enabled"] = header["enabled"] else: new_rule["action"] = action or "managed_challenge" if self.active_rules < self.max_rules: r = requests.post(f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{custom_ruleset_id}/rules", headers=self._headers, json=new_rule, timeout=5) else: raise 
Error(f"Cannot create more rules ({self.active_rules} used / {self.max_rules} available)\n" "\t\t\tIf you have a better plan, please register the domain plan using cf.set_plan(\"<your-domain>\")") return self.error.handle(r.json(), ["success"])
[docs] def update_rule(self, domain_name: str, rule_file: str, rule_name: str | None = None, action: str | None = None) -> bool: """Update a rule with a specific expression :exception Error: Rule file is not found .. todo:: First modify "Bad Bots.txt" by changing the expression or adding a new rule >>> cf.update_rule("example.com", "Bad Bots.txt") # Will update the remote rule "Bad Bots" with the expression in "Bad Bots.txt" >>> cf.update_rule("example.com", "Bad IP.txt", "Not allowed IP", "block") # Will update the remote rule "Not allowed IP" with the expression in "Bad IP.txt" and override the action to "block" """ if not rule_name: rule_name = rule_file.strip(".txt") rule = self.get_rule(domain_name, rule_name=rule_name) zone_id = rule["zone_id"] custom_ruleset_id = rule["custom_ruleset_id"] rule_id = rule["id"] updated_rule = rule.copy() del updated_rule["zone_id"] del updated_rule["custom_ruleset_id"] del updated_rule["id"] header, expression = self.utils.read_expression(rule_file) if not expression: raise Error(f"No such file in folder '{self.utils.directory}'") if header: if action or "action" in header: updated_rule["action"] = action or header["action"] if "enabled" in header: updated_rule["enabled"] = header["enabled"] updated_rule["expression"] = expression r = requests.patch(f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{custom_ruleset_id}/rules/{rule_id}", headers=self._headers, json=updated_rule, timeout=5) return self.error.handle(r.json(), ["success"])
[docs] def delete_rule(self, domain_name: str, rule_name: str) -> bool: """Delete a rule from a specific domain >>> cf.delete_rule("example.com", "Bad AS") # Will delete the rule "Bad AS" remotely from the domain "example.com" """ rule = self.get_rule(domain_name, rule_name=rule_name) zone_id = rule["zone_id"] custom_ruleset_id = rule["custom_ruleset_id"] rule_id = rule["id"] r = requests.delete(f"https://api.cloudflare.com/client/v4/zones/{zone_id}/rulesets/{custom_ruleset_id}/rules/{rule_id}", headers=self._headers, timeout=5) return self.error.handle(r.json(), ["success"])
[docs] def purge_rules(self, domain_name: str) -> bool: """Purge all rules from a specific domain >>> cf.purge_rules("example.com") # Will delete all rules remotely from the domain "example.com" """ rules = self.get_rules(domain_name)["rules"] for rule in rules: self.delete_rule(domain_name, rule) self.active_rules = 0 return True
[docs] def import_rules(self, domain_name: str, actions_all: str | None = None) -> bool: """Import all expressions from all txt file * actions_all -> Set the same action for all imported rules, \ please refer to https://developers.cloudflare.com/ruleset-engine/rules-language/actions/ Available actions as string: `managed_challenge, js_challenge, challenge, block, skip, log` :exception Error: Cannot create more rules (5 used / 5 available depending on the current plan) .. note:: If you have a better plan, please register your plan using the method :func:`set_plan(domain_name) <set_plan>` >>> cf.import_rules("example.com") # Will use the action in the header specific for every file >>> cf.import_rules("example.com", "block") # Will import all rules and use the "block" action """ files = os.listdir(self.utils.directory) rules = self.get_rules(domain_name)["rules"] self.active_rules = len(rules) for file in files: if file.endswith(".txt"): print(f"Importing {file}...") if file.strip(".txt") not in rules: if self.active_rules < self.max_rules: if actions_all: self.import_rule(domain_name, file, action=actions_all) else: self.import_rule(domain_name, file) self.active_rules += 1 else: raise Error(f"Cannot create more rules ({self.active_rules} used / {self.max_rules} available)\n" "\t\t\tIf you have a better plan, please register the domain plan using cf.set_plan(\"<your-domain>\")") return True
import_rule = create_rule """Import a rule with a specific expression Alias for :func:`create_rule` >>> cf.import_rule("example.com", "Bad URL.txt", action="managed_challenge") # Import a rule with the expression in "Bad URL.txt", will use the action in the header if specified or force it using the action argument """