You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
103 lines
3.4 KiB
103 lines
3.4 KiB
import abc |
|
from urllib.parse import urlencode |
|
|
|
import requests |
|
|
|
from cms.models import Article |
|
from crossposting_backend.settings import promoter_secrets |
|
|
|
|
|
class PromoteError(Exception): |
|
pass |
|
|
|
|
|
class Promoter(abc.ABC): |
|
def __init__(self, article: Article): |
|
self.article = article |
|
|
|
def promote(self): |
|
raise NotImplementedError |
|
|
|
|
|
class TelegramPromoter(Promoter): |
|
def promote(self): |
|
long_text = f'{self.article.body}\n{self.article.link}' |
|
channel_id = promoter_secrets['TELEGRAM_CHAT_ID'] |
|
bot_token = promoter_secrets['TELEGRAM_BOT_TOKEN'] |
|
querystring = (('chat_id', channel_id), ('text', long_text)) |
|
encoded_querystring = urlencode(querystring) |
|
|
|
send_message_url = f'https://api.telegram.org/bot{bot_token}/sendMessage?{encoded_querystring}' |
|
|
|
response = requests.get(send_message_url) |
|
result = response.json() |
|
if not result['ok']: |
|
raise PromoteError('Посылка сообщения в телеграм провалилась. Подробности %s' % str(result)) |
|
|
|
|
|
class VkontaktePromoter(Promoter): |
|
def promote(self): |
|
vk_owner_id = promoter_secrets['VK_OWNER_ID'] |
|
vk_token = promoter_secrets['VK_TOKEN'] |
|
from_group = 1 |
|
|
|
try: |
|
requests.post('https://api.vk.com/method/wall.post', |
|
params={ |
|
'access_token': vk_token, |
|
'owner_id': vk_owner_id, |
|
'from_group': from_group, |
|
'message': self.article.body, |
|
'attachment': self.article.link, |
|
'signed': 0, |
|
'v': '5.131' |
|
}) |
|
except Exception as exc: |
|
raise PromoteError(exc) |
|
|
|
|
|
class OdnoklassnikiPromoter(Promoter): |
|
def promote(self): |
|
from json import JSONEncoder |
|
import ok_api |
|
|
|
ok_access_token = promoter_secrets['OK_ACCESS_TOKEN'] |
|
ok_application_key = promoter_secrets['OK_APPLICATION_KEY'] |
|
ok_application_secret_key = promoter_secrets['OK_APPLICATION_SECRET_KEY'] |
|
ok_group_id = promoter_secrets['OK_GROUP_ID'] |
|
|
|
session = ok_api.OkApi(access_token=ok_access_token, |
|
application_key=ok_application_key, |
|
application_secret_key=ok_application_secret_key) |
|
attachments = { |
|
'media': [ |
|
{ |
|
'type': 'text', |
|
'text': self.article.body, |
|
}, |
|
{ |
|
'type': 'link', |
|
'url': self.article.link, |
|
}, |
|
] |
|
} |
|
encoded_attachments = JSONEncoder().encode(attachments) |
|
try: |
|
session.mediatopic.post(type='GROUP_THEME', |
|
gid=ok_group_id, |
|
attachment=encoded_attachments) |
|
except ok_api.OkApiException as exc: |
|
raise PromoteError(exc) |
|
|
|
|
|
class Marketer: |
|
def __init__(self, article: Article): |
|
self.promoters = [ |
|
TelegramPromoter(article), |
|
VkontaktePromoter(article), |
|
OdnoklassnikiPromoter(article), |
|
] |
|
|
|
def promote(self): |
|
for promoter in self.promoters: |
|
promoter.promote()
|
|
|